use bytes::Bytes;
use tokio::sync::mpsc;
use crate::error::{SdkError, SdkResult};
#[derive(Debug)]
pub struct StreamSender {
tx: mpsc::Sender<Bytes>,
ended: std::sync::atomic::AtomicBool,
}
impl StreamSender {
pub fn new(tx: mpsc::Sender<Bytes>) -> Self {
Self {
tx,
ended: std::sync::atomic::AtomicBool::new(false),
}
}
pub fn is_ended(&self) -> bool {
self.ended.load(std::sync::atomic::Ordering::Relaxed)
}
pub async fn emit(&self, data: Bytes) -> SdkResult<()> {
if self.is_ended() {
return Err(SdkError::StreamEnded);
}
self.tx
.send(data)
.await
.map_err(|e| SdkError::ChannelSend(e.to_string()))
}
pub async fn emit_str(&self, data: &str) -> SdkResult<()> {
self.emit(Bytes::from(data.to_owned())).await
}
pub async fn emit_json<T: serde::Serialize>(&self, data: &T) -> SdkResult<()> {
let json = serde_json::to_vec(data)
.map_err(|e| SdkError::Other(format!("json serialization error: {e}")))?;
self.emit(Bytes::from(json)).await
}
pub fn end(&self) {
self.ended.store(true, std::sync::atomic::Ordering::Relaxed);
}
}