Skip to main content

lellm_graph/
stream_emitter.rs

1//! StreamSink — 数据面发射抽象。
2//!
3//! Graph 层只知道 `StreamSink` trait,不知道 channel、WebSocket、Logger。
4//! 所有消费端实现都在 Agent/Provider 层。
5//!
6//! 设计原则:
7//! - 同步 `emit` — Node 永远不阻塞(O(1))
8//! - Producer Push 模型 — 生产者推送,不感知消费者
9//! - 取消 = 消费者离开(不是背压)
10
11use std::sync::Arc;
12
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15
16use crate::stream_chunk::StreamChunk;
17
18// ─── StreamSink Trait ─────────────────────────────────────────
19
20/// 数据面发射抽象。
21///
22/// Graph 层唯一的流式依赖。Node 通过 `ctx.emit(chunk)` 推送数据。
23/// 实现者决定如何处理(channel、WebSocket、文件、丢弃)。
24pub trait StreamSink: Send + Sync {
25    /// 发射一个数据面事件。
26    ///
27    /// 同步调用,永远不阻塞。
28    fn emit(&self, chunk: StreamChunk);
29}
30
31// ─── BufferedSink ─────────────────────────────────────────────
32
33/// 基于大缓冲队列的 StreamSink 实现。
34///
35/// 用于 Agent 层:将 StreamChunk 推入队列,
36/// 由 Forward Task 异步消费并转发到 mpsc channel。
37///
38/// ```text
39/// LLMNode
40///    ↓ emit() — O(1), 固定成本
41/// BufferedSink (large buffer mpsc)
42///    ↓
43/// Forward Task (spawn)
44///    ↓
45/// mpsc::Sender<StreamChunk> (bounded, backpressure)
46///    ↓
47/// Consumer
48/// ```
49pub struct BufferedSink {
50    tx: mpsc::UnboundedSender<StreamChunk>,
51}
52
53impl BufferedSink {
54    /// 创建 BufferedSink(无界队列)。
55    ///
56    /// 取消机制负责清理:消费者离开 → cancel → Node 停止。
57    pub fn new() -> (Self, mpsc::UnboundedReceiver<StreamChunk>) {
58        let (tx, rx) = mpsc::unbounded_channel();
59        (Self { tx }, rx)
60    }
61}
62
63impl StreamSink for BufferedSink {
64    fn emit(&self, chunk: StreamChunk) {
65        // unbounded send only fails if receiver is dropped
66        let _ = self.tx.send(chunk);
67    }
68}
69
70impl Clone for BufferedSink {
71    fn clone(&self) -> Self {
72        Self {
73            tx: self.tx.clone(),
74        }
75    }
76}
77
78// ─── ChannelSink ──────────────────────────────────────────────
79
80/// 直接写入 mpsc channel 的 StreamSink。
81///
82/// 用于测试或简单场景——不需要 Forward Task。
83/// channel full 时静默丢弃(消费者会触发 cancel)。
84pub struct ChannelSink {
85    tx: mpsc::Sender<StreamChunk>,
86}
87
88impl ChannelSink {
89    pub fn new(tx: mpsc::Sender<StreamChunk>) -> Self {
90        Self { tx }
91    }
92}
93
94impl StreamSink for ChannelSink {
95    fn emit(&self, chunk: StreamChunk) {
96        // try_send 失败 = channel full 或消费者已断开
97        // 消费者断开时,cancel 会触发,Node 会停止
98        let _ = self.tx.try_send(chunk);
99    }
100}
101
102// ─── NoopSink ─────────────────────────────────────────────────
103
104/// 丢弃所有事件的 StreamSink。
105///
106/// 用于阻塞执行模式(sink=None 的等价实现)和测试。
107#[derive(Clone, Copy, Debug, Default)]
108pub struct NoopSink;
109
110impl StreamSink for NoopSink {
111    fn emit(&self, _chunk: StreamChunk) {
112        // no-op
113    }
114}
115
116// ─── Arc<dyn StreamSink> helpers ──────────────────────────────
117
118/// 创建 `Arc<dyn StreamSink>` 的便捷函数。
119pub fn sink_arc<S: StreamSink + 'static>(sink: S) -> Arc<dyn StreamSink> {
120    Arc::new(sink)
121}
122
123/// 创建 NoopSink 的 Arc。
124pub fn noop_sink() -> Arc<dyn StreamSink> {
125    Arc::new(NoopSink)
126}
127
128// ─── Forward Task ─────────────────────────────────────────────
129
130/// 启动 Forward Task:从 BufferedSink 读取,转发到 mpsc channel。
131///
132/// 消费者断开(Receiver dropped)时,task 退出并触发 CancellationToken。
133pub fn spawn_forward_task(
134    mut buffer_rx: mpsc::UnboundedReceiver<StreamChunk>,
135    tx: mpsc::Sender<StreamChunk>,
136    cancel: CancellationToken,
137) -> tokio::task::JoinHandle<()> {
138    tokio::spawn(async move {
139        loop {
140            tokio::select! {
141                _ = cancel.cancelled() => break,
142                chunk = buffer_rx.recv() => {
143                    let chunk = match chunk {
144                        Some(c) => c,
145                        None => break, // sender dropped
146                    };
147
148                    // 发送失败 = 消费者断开
149                    if tx.send(chunk).await.is_err() {
150                        break;
151                    }
152                }
153            }
154        }
155        // Forward task 退出 → 触发取消
156        cancel.cancel();
157    })
158}