lellm_graph/stream_emitter.rs
1//! StreamSink — 数据面发射抽象。
2//!
3//! Graph 层只知道 `StreamSink` trait,不知道 channel、WebSocket、Logger。
4//! 所有消费端实现都在 Agent/Provider 层。
5//!
6//! 设计原则:
7//! - 同步 `emit` — Node 永远不阻塞(O(1))
8//! - Producer Push 模型 — 生产者推送,不感知消费者
9//! - 取消 = 消费者离开(不是背压)
10
11use std::sync::Arc;
12
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15
16use crate::stream_chunk::StreamChunk;
17
18// ─── StreamSink Trait ─────────────────────────────────────────
19
20/// 数据面发射抽象。
21///
22/// Graph 层唯一的流式依赖。Node 通过 `ctx.emit(chunk)` 推送数据。
23/// 实现者决定如何处理(channel、WebSocket、文件、丢弃)。
24pub trait StreamSink: Send + Sync {
25 /// 发射一个数据面事件。
26 ///
27 /// 同步调用,永远不阻塞。
28 fn emit(&self, chunk: StreamChunk);
29}
30
31// ─── BufferedSink ─────────────────────────────────────────────
32
33/// 基于大缓冲队列的 StreamSink 实现。
34///
35/// 用于 Agent 层:将 StreamChunk 推入队列,
36/// 由 Forward Task 异步消费并转发到 mpsc channel。
37///
38/// ```text
39/// LLMNode
40/// ↓ emit() — O(1), 固定成本
41/// BufferedSink (large buffer mpsc)
42/// ↓
43/// Forward Task (spawn)
44/// ↓
45/// mpsc::Sender<StreamChunk> (bounded, backpressure)
46/// ↓
47/// Consumer
48/// ```
49pub struct BufferedSink {
50 tx: mpsc::UnboundedSender<StreamChunk>,
51}
52
53impl BufferedSink {
54 /// 创建 BufferedSink(无界队列)。
55 ///
56 /// 取消机制负责清理:消费者离开 → cancel → Node 停止。
57 pub fn new() -> (Self, mpsc::UnboundedReceiver<StreamChunk>) {
58 let (tx, rx) = mpsc::unbounded_channel();
59 (Self { tx }, rx)
60 }
61}
62
63impl StreamSink for BufferedSink {
64 fn emit(&self, chunk: StreamChunk) {
65 // unbounded send only fails if receiver is dropped
66 let _ = self.tx.send(chunk);
67 }
68}
69
70impl Clone for BufferedSink {
71 fn clone(&self) -> Self {
72 Self {
73 tx: self.tx.clone(),
74 }
75 }
76}
77
78// ─── ChannelSink ──────────────────────────────────────────────
79
80/// 直接写入 mpsc channel 的 StreamSink。
81///
82/// 用于测试或简单场景——不需要 Forward Task。
83/// channel full 时静默丢弃(消费者会触发 cancel)。
84pub struct ChannelSink {
85 tx: mpsc::Sender<StreamChunk>,
86}
87
88impl ChannelSink {
89 pub fn new(tx: mpsc::Sender<StreamChunk>) -> Self {
90 Self { tx }
91 }
92}
93
94impl StreamSink for ChannelSink {
95 fn emit(&self, chunk: StreamChunk) {
96 // try_send 失败 = channel full 或消费者已断开
97 // 消费者断开时,cancel 会触发,Node 会停止
98 let _ = self.tx.try_send(chunk);
99 }
100}
101
102// ─── NoopSink ─────────────────────────────────────────────────
103
104/// 丢弃所有事件的 StreamSink。
105///
106/// 用于阻塞执行模式(sink=None 的等价实现)和测试。
107#[derive(Clone, Copy, Debug, Default)]
108pub struct NoopSink;
109
110impl StreamSink for NoopSink {
111 fn emit(&self, _chunk: StreamChunk) {
112 // no-op
113 }
114}
115
116// ─── StreamHub (Fan-out Bus) ─────────────────────────────────
117
118/// 扇出总线 — 将每个 emit 广播到所有注册的 sinks。
119///
120/// ParallelNode 的子分支 clone 父级的 `Arc<StreamHub>`,
121/// 子分支的 emit 自动汇聚到同一个消费者集合。
122///
123/// ```text
124/// ExecutionEngine
125/// │
126/// ├── StreamHub (fan-out bus)
127/// │ ├── CLI sink
128/// │ ├── Trace sink
129/// │ └── AgentEvent sink
130/// │
131/// └── Parallel 子分支 clone Arc<StreamHub>
132/// ├── Branch A → emit → → StreamHub → all sinks
133/// └── Branch B → emit → → StreamHub → all sinks
134/// ```
135/// StreamSink 不要求 Debug,故 StreamHub 也不 derive Debug。
136pub struct StreamHub {
137 sinks: Vec<Arc<dyn StreamSink>>,
138}
139
140impl StreamHub {
141 pub fn new() -> Self {
142 Self { sinks: Vec::new() }
143 }
144
145 /// 注册一个 sink。
146 pub fn add_sink(&mut self, sink: Arc<dyn StreamSink>) {
147 self.sinks.push(sink);
148 }
149
150 /// 从单个 sink 创建 StreamHub(便捷构造)。
151 pub fn from_sink(sink: Arc<dyn StreamSink>) -> Self {
152 Self { sinks: vec![sink] }
153 }
154
155 /// 是否为空(无 sink)。
156 pub fn is_empty(&self) -> bool {
157 self.sinks.is_empty()
158 }
159}
160
161impl StreamSink for StreamHub {
162 fn emit(&self, chunk: StreamChunk) {
163 for sink in &self.sinks {
164 sink.emit(chunk.clone());
165 }
166 }
167}
168
169impl Clone for StreamHub {
170 /// Clone 只克隆 sink 列表的 Arc 引用(浅拷贝)。
171 fn clone(&self) -> Self {
172 Self {
173 sinks: self.sinks.clone(),
174 }
175 }
176}
177
178// ─── Arc<dyn StreamSink> helpers ──────────────────────────────
179
180/// 创建 `Arc<dyn StreamSink>` 的便捷函数。
181pub fn sink_arc<S: StreamSink + 'static>(sink: S) -> Arc<dyn StreamSink> {
182 Arc::new(sink)
183}
184
185/// 创建 NoopSink 的 Arc。
186pub fn noop_sink() -> Arc<dyn StreamSink> {
187 Arc::new(NoopSink)
188}
189
190// ─── Forward Task ─────────────────────────────────────────────
191
192/// 启动 Forward Task:从 BufferedSink 读取,转发到 mpsc channel。
193///
194/// 消费者断开(Receiver dropped)时,task 退出并触发 CancellationToken。
195pub fn spawn_forward_task(
196 mut buffer_rx: mpsc::UnboundedReceiver<StreamChunk>,
197 tx: mpsc::Sender<StreamChunk>,
198 cancel: CancellationToken,
199) -> tokio::task::JoinHandle<()> {
200 tokio::spawn(async move {
201 loop {
202 tokio::select! {
203 _ = cancel.cancelled() => break,
204 chunk = buffer_rx.recv() => {
205 let chunk = match chunk {
206 Some(c) => c,
207 None => break, // sender dropped
208 };
209
210 // 发送失败 = 消费者断开
211 if tx.send(chunk).await.is_err() {
212 break;
213 }
214 }
215 }
216 }
217 // Forward task 退出 → 触发取消
218 cancel.cancel();
219 })
220}