1use bytes::Bytes;
4use tokio::sync::mpsc;
5
6use crate::error::{SdkError, SdkResult};
7
8#[derive(Debug)]
13pub struct StreamSender {
14 tx: mpsc::Sender<Bytes>,
15 ended: std::sync::atomic::AtomicBool,
16}
17
18impl StreamSender {
19 pub fn new(tx: mpsc::Sender<Bytes>) -> Self {
24 Self {
25 tx,
26 ended: std::sync::atomic::AtomicBool::new(false),
27 }
28 }
29
30 pub fn is_ended(&self) -> bool {
32 self.ended.load(std::sync::atomic::Ordering::Relaxed)
33 }
34
35 pub async fn emit(&self, data: Bytes) -> SdkResult<()> {
37 if self.is_ended() {
38 return Err(SdkError::StreamEnded);
39 }
40 self.tx
41 .send(data)
42 .await
43 .map_err(|e| SdkError::ChannelSend(e.to_string()))
44 }
45
46 pub async fn emit_str(&self, data: &str) -> SdkResult<()> {
48 self.emit(Bytes::from(data.to_owned())).await
49 }
50
51 pub async fn emit_json<T: serde::Serialize>(&self, data: &T) -> SdkResult<()> {
53 let json = serde_json::to_vec(data)
54 .map_err(|e| SdkError::Other(format!("json serialization error: {e}")))?;
55 self.emit(Bytes::from(json)).await
56 }
57
58 pub fn end(&self) {
60 self.ended
61 .store(true, std::sync::atomic::Ordering::Relaxed);
62 }
63}