use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::stream_chunk::StreamChunk;
pub trait StreamSink: Send + Sync {
fn emit(&self, chunk: StreamChunk);
}
pub struct BufferedSink {
tx: mpsc::UnboundedSender<StreamChunk>,
}
impl BufferedSink {
pub fn new() -> (Self, mpsc::UnboundedReceiver<StreamChunk>) {
let (tx, rx) = mpsc::unbounded_channel();
(Self { tx }, rx)
}
}
impl StreamSink for BufferedSink {
fn emit(&self, chunk: StreamChunk) {
let _ = self.tx.send(chunk);
}
}
impl Clone for BufferedSink {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}
pub struct ChannelSink {
tx: mpsc::Sender<StreamChunk>,
}
impl ChannelSink {
pub fn new(tx: mpsc::Sender<StreamChunk>) -> Self {
Self { tx }
}
}
impl StreamSink for ChannelSink {
fn emit(&self, chunk: StreamChunk) {
let _ = self.tx.try_send(chunk);
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopSink;
impl StreamSink for NoopSink {
fn emit(&self, _chunk: StreamChunk) {
}
}
pub struct StreamHub {
sinks: Vec<Arc<dyn StreamSink>>,
}
impl StreamHub {
pub fn new() -> Self {
Self { sinks: Vec::new() }
}
pub fn add_sink(&mut self, sink: Arc<dyn StreamSink>) {
self.sinks.push(sink);
}
pub fn from_sink(sink: Arc<dyn StreamSink>) -> Self {
Self { sinks: vec![sink] }
}
pub fn is_empty(&self) -> bool {
self.sinks.is_empty()
}
}
impl StreamSink for StreamHub {
fn emit(&self, chunk: StreamChunk) {
for sink in &self.sinks {
sink.emit(chunk.clone());
}
}
}
impl Clone for StreamHub {
fn clone(&self) -> Self {
Self {
sinks: self.sinks.clone(),
}
}
}
pub fn sink_arc<S: StreamSink + 'static>(sink: S) -> Arc<dyn StreamSink> {
Arc::new(sink)
}
pub fn noop_sink() -> Arc<dyn StreamSink> {
Arc::new(NoopSink)
}
pub fn spawn_forward_task(
mut buffer_rx: mpsc::UnboundedReceiver<StreamChunk>,
tx: mpsc::Sender<StreamChunk>,
cancel: CancellationToken,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => break,
chunk = buffer_rx.recv() => {
let chunk = match chunk {
Some(c) => c,
None => break, };
if tx.send(chunk).await.is_err() {
break;
}
}
}
}
cancel.cancel();
})
}