Skip to main content

lellm_graph/
stream_emitter.rs

1//! StreamSink — 数据面发射抽象。
2//!
3//! Graph 层只知道 `StreamSink` trait,不知道 channel、WebSocket、Logger。
4//! 所有消费端实现都在 Agent/Provider 层。
5//!
6//! 设计原则:
7//! - 同步 `emit` — Node 永远不阻塞(O(1))
8//! - Producer Push 模型 — 生产者推送,不感知消费者
9//! - 取消 = 消费者离开(不是背压)
10
11use std::sync::Arc;
12
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15
16use crate::stream_chunk::StreamChunk;
17
18// ─── StreamSink Trait ─────────────────────────────────────────
19
20/// 数据面发射抽象。
21///
22/// Graph 层唯一的流式依赖。Node 通过 `ctx.emit(chunk)` 推送数据。
23/// 实现者决定如何处理(channel、WebSocket、文件、丢弃)。
24pub trait StreamSink: Send + Sync {
25    /// 发射一个数据面事件。
26    ///
27    /// 同步调用,永远不阻塞。
28    fn emit(&self, chunk: StreamChunk);
29}
30
31// ─── BufferedSink ─────────────────────────────────────────────
32
33/// 基于大缓冲队列的 StreamSink 实现。
34///
35/// 用于 Agent 层:将 StreamChunk 推入队列,
36/// 由 Forward Task 异步消费并转发到 mpsc channel。
37///
38/// ```text
39/// LLMNode
40///    ↓ emit() — O(1), 固定成本
41/// BufferedSink (large buffer mpsc)
42///    ↓
43/// Forward Task (spawn)
44///    ↓
45/// mpsc::Sender<StreamChunk> (bounded, backpressure)
46///    ↓
47/// Consumer
48/// ```
49pub struct BufferedSink {
50    tx: mpsc::UnboundedSender<StreamChunk>,
51}
52
53impl BufferedSink {
54    /// 创建 BufferedSink(无界队列)。
55    ///
56    /// 取消机制负责清理:消费者离开 → cancel → Node 停止。
57    pub fn new() -> (Self, mpsc::UnboundedReceiver<StreamChunk>) {
58        let (tx, rx) = mpsc::unbounded_channel();
59        (Self { tx }, rx)
60    }
61}
62
63impl StreamSink for BufferedSink {
64    fn emit(&self, chunk: StreamChunk) {
65        // unbounded send only fails if receiver is dropped
66        let _ = self.tx.send(chunk);
67    }
68}
69
70impl Clone for BufferedSink {
71    fn clone(&self) -> Self {
72        Self {
73            tx: self.tx.clone(),
74        }
75    }
76}
77
78// ─── ChannelSink ──────────────────────────────────────────────
79
80/// 直接写入 mpsc channel 的 StreamSink。
81///
82/// 用于测试或简单场景——不需要 Forward Task。
83/// channel full 时静默丢弃(消费者会触发 cancel)。
84pub struct ChannelSink {
85    tx: mpsc::Sender<StreamChunk>,
86}
87
88impl ChannelSink {
89    pub fn new(tx: mpsc::Sender<StreamChunk>) -> Self {
90        Self { tx }
91    }
92}
93
94impl StreamSink for ChannelSink {
95    fn emit(&self, chunk: StreamChunk) {
96        // try_send 失败 = channel full 或消费者已断开
97        // 消费者断开时,cancel 会触发,Node 会停止
98        let _ = self.tx.try_send(chunk);
99    }
100}
101
102// ─── NoopSink ─────────────────────────────────────────────────
103
104/// 丢弃所有事件的 StreamSink。
105///
106/// 用于阻塞执行模式(sink=None 的等价实现)和测试。
107#[derive(Clone, Copy, Debug, Default)]
108pub struct NoopSink;
109
110impl StreamSink for NoopSink {
111    fn emit(&self, _chunk: StreamChunk) {
112        // no-op
113    }
114}
115
116// ─── StreamHub (Fan-out Bus) ─────────────────────────────────
117
118/// 扇出总线 — 将每个 emit 广播到所有注册的 sinks。
119///
120/// ParallelNode 的子分支 clone 父级的 `Arc<StreamHub>`,
121/// 子分支的 emit 自动汇聚到同一个消费者集合。
122///
123/// ```text
124/// ExecutionEngine
125///     │
126///     ├── StreamHub (fan-out bus)
127///     │     ├── CLI sink
128///     │     ├── Trace sink
129///     │     └── AgentEvent sink
130///     │
131///     └── Parallel 子分支 clone Arc<StreamHub>
132///           ├── Branch A → emit → → StreamHub → all sinks
133///           └── Branch B → emit → → StreamHub → all sinks
134/// ```
135/// StreamSink 不要求 Debug,故 StreamHub 也不 derive Debug。
136pub struct StreamHub {
137    sinks: Vec<Arc<dyn StreamSink>>,
138}
139
140impl StreamHub {
141    pub fn new() -> Self {
142        Self { sinks: Vec::new() }
143    }
144
145    /// 注册一个 sink。
146    pub fn add_sink(&mut self, sink: Arc<dyn StreamSink>) {
147        self.sinks.push(sink);
148    }
149
150    /// 从单个 sink 创建 StreamHub(便捷构造)。
151    pub fn from_sink(sink: Arc<dyn StreamSink>) -> Self {
152        Self { sinks: vec![sink] }
153    }
154
155    /// 是否为空(无 sink)。
156    pub fn is_empty(&self) -> bool {
157        self.sinks.is_empty()
158    }
159}
160
161impl StreamSink for StreamHub {
162    fn emit(&self, chunk: StreamChunk) {
163        for sink in &self.sinks {
164            sink.emit(chunk.clone());
165        }
166    }
167}
168
169impl Clone for StreamHub {
170    /// Clone 只克隆 sink 列表的 Arc 引用(浅拷贝)。
171    fn clone(&self) -> Self {
172        Self {
173            sinks: self.sinks.clone(),
174        }
175    }
176}
177
178// ─── Arc<dyn StreamSink> helpers ──────────────────────────────
179
180/// 创建 `Arc<dyn StreamSink>` 的便捷函数。
181pub fn sink_arc<S: StreamSink + 'static>(sink: S) -> Arc<dyn StreamSink> {
182    Arc::new(sink)
183}
184
185/// 创建 NoopSink 的 Arc。
186pub fn noop_sink() -> Arc<dyn StreamSink> {
187    Arc::new(NoopSink)
188}
189
190// ─── Forward Task ─────────────────────────────────────────────
191
192/// 启动 Forward Task:从 BufferedSink 读取,转发到 mpsc channel。
193///
194/// 消费者断开(Receiver dropped)时,task 退出并触发 CancellationToken。
195pub fn spawn_forward_task(
196    mut buffer_rx: mpsc::UnboundedReceiver<StreamChunk>,
197    tx: mpsc::Sender<StreamChunk>,
198    cancel: CancellationToken,
199) -> tokio::task::JoinHandle<()> {
200    tokio::spawn(async move {
201        loop {
202            tokio::select! {
203                _ = cancel.cancelled() => break,
204                chunk = buffer_rx.recv() => {
205                    let chunk = match chunk {
206                        Some(c) => c,
207                        None => break, // sender dropped
208                    };
209
210                    // 发送失败 = 消费者断开
211                    if tx.send(chunk).await.is_err() {
212                        break;
213                    }
214                }
215            }
216        }
217        // Forward task 退出 → 触发取消
218        cancel.cancel();
219    })
220}