lellm_graph/node_context.rs
1//! NodeContext + LeafContext — 节点能力视图。
2//!
3//! 职责分离:
4//! - `LeafContext<'a, S>` — 只读视图,Leaf 节点使用(`&S`,不能修改 State)
5//! - `NodeContext<'a, S>` — 可变视图,向后兼容(`&mut S`,可通过 `replace_state()` 修改)
6//!
7//! ExecutionEngine 和相关 trait 定义在 [`execution_engine`] 模块中。
8
9use tokio_util::sync::CancellationToken;
10
11use crate::event::FlowEvent;
12use crate::execution_engine::{ExecutionControl, NodeMetadata};
13use crate::state::State;
14use crate::stream_chunk::StreamChunk;
15use crate::stream_emitter::StreamSink;
16use crate::workflow_state::WorkflowState;
17
18// ─── Backward Compat Re-exports ──────────────────────────────
19
20/// 向后兼容 — `ExecutionContext` 已迁移到 [`execution_engine`] 模块。
21pub use crate::execution_engine::ExecutionContext;
22
23// ─── LeafContext (Borrowed View) ──────────────────────────────
24
25/// Leaf 节点能力视图 — 纯借用,不拥有任何状态。
26///
27/// 设计原则:
28/// - **只能读 State**(`&S`,不可变引用)
29/// - **只能 emit Mutation**(借用在 Engine 的 mutations buffer)
30/// - **只能 emit Stream / FlowEvent**
31/// - **不能 replace_state / clone_state / fork / merge**
32///
33/// 与 NodeContext 的区别:
34/// - NodeContext 持有 `&mut S`(可变引用),Composite 节点可用 replace_state()
35/// - LeafContext 持有 `&S`(只读引用),编译期保证不能修改 State
36pub struct LeafContext<'a, S: WorkflowState = State> {
37 /// 类型化状态 — 只读引用
38 pub(crate) state: &'a S,
39 /// 数据面发射器 — 可选(阻塞模式 = None)
40 pub(crate) stream: Option<&'a dyn StreamSink>,
41 /// 取消令牌
42 pub(crate) cancel: &'a CancellationToken,
43 /// 控制信号 — 节点写入,Executor 读取
44 pub(crate) control: &'a mut ExecutionControl,
45 /// 节点元数据 — 节点写入
46 pub(crate) metadata: &'a mut NodeMetadata,
47 /// Mutation 缓冲 — 借用在 ExecutionEngine 的 buffer
48 pub(crate) mutations: &'a mut Vec<S::Mutation>,
49 /// FlowEvent 缓冲 — 借用在 ExecutionEngine 的 buffer
50 pub(crate) flow_events: &'a mut Vec<FlowEvent>,
51}
52
53impl<S: WorkflowState> LeafContext<'_, S> {
54 // ─── 读 State ─────────────────────────────────────────────
55
56 /// 获取类型化状态(只读)。
57 pub fn state(&self) -> &S {
58 self.state
59 }
60
61 // ─── 记录 Mutation ────────────────────────────────────────
62
63 /// 记录一个 Mutation(强类型状态变更命令)到缓冲。
64 ///
65 /// 这是 Leaf 节点变更状态的**唯一入口**。
66 pub fn record(&mut self, mutation: S::Mutation) {
67 self.mutations.push(mutation);
68 }
69
70 // ─── 数据面发射 ───────────────────────────────────────────
71
72 /// 发射数据面事件(无 stream 则静默丢弃)。
73 pub fn emit(&self, chunk: StreamChunk) {
74 if let Some(stream) = self.stream {
75 stream.emit(chunk);
76 }
77 }
78
79 /// 发射控制面 FlowEvent(缓冲到 ExecutionEngine,供 Executor 收集转发)。
80 pub fn emit_flow_event(&mut self, event: FlowEvent) {
81 self.flow_events.push(event);
82 }
83
84 // ─── 取消检查 ─────────────────────────────────────────────
85
86 /// 检查是否已取消。
87 pub fn is_cancelled(&self) -> bool {
88 self.cancel.is_cancelled()
89 }
90
91 /// 获取取消令牌引用。
92 pub fn cancel_token(&self) -> &CancellationToken {
93 self.cancel
94 }
95
96 // ─── 控制信号 ─────────────────────────────────────────────
97
98 /// 跳转到指定节点。
99 pub fn goto(&mut self, target: impl Into<String>) {
100 self.control.goto(target);
101 }
102
103 /// 结束执行。
104 pub fn end(&mut self) {
105 self.control.end();
106 }
107
108 /// Barrier 挂起。
109 pub fn pause(
110 &mut self,
111 barrier_id: crate::event::BarrierId,
112 timeout: Option<std::time::Duration>,
113 ) {
114 self.control.pause(barrier_id, timeout);
115 }
116
117 // ─── 元数据 ───────────────────────────────────────────────
118
119 /// 设置 token 成本。
120 pub fn set_token_cost(&mut self, cost: f64) {
121 self.metadata.token_cost = cost;
122 }
123
124 /// 标记有副作用。
125 pub fn set_has_side_effects(&mut self) {
126 self.metadata.has_side_effects = true;
127 }
128}
129
130// ─── NodeContext ──────────────────────────────────────────────
131
132/// 节点能力视图 — 向后兼容,节点能做的三件事:读 State、记录 Mutation、发射 Stream。
133///
134/// # 设计原则
135///
136/// NodeContext 是 Runtime 的能力视图,不是 Runtime 的拥有者。
137/// - 节点只借用,不拥有。零复制透传给子组件
138/// - 禁止放入:RuntimeEventEmitter、TraceId、SpanId、GraphHandle、ExecutorConfig
139/// - **不提供 `state_mut()`** — 节点只能通过 `record()` 声明变更意图
140/// - 组合节点(如 ParallelNode)使用 `replace_state()` 整体替换状态
141///
142/// # 泛型参数
143///
144/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
145pub struct NodeContext<'a, S: WorkflowState = State> {
146 /// 类型化状态 — 可变引用(仅组合节点如 ParallelNode 需要写权限)
147 pub(crate) state: &'a mut S,
148 /// 数据面发射器 — 可选(阻塞模式 = None)
149 pub(crate) stream: Option<&'a dyn StreamSink>,
150 /// 取消令牌
151 pub(crate) cancel: &'a CancellationToken,
152 /// 控制信号 — 节点写入,Executor 读取
153 pub(crate) control: &'a mut ExecutionControl,
154 /// 节点元数据 — 节点写入
155 pub(crate) metadata: &'a mut NodeMetadata,
156 /// Mutation 缓冲 — 节点产生的强类型领域事件
157 pub(crate) mutations: &'a mut Vec<S::Mutation>,
158 /// FlowEvent 缓冲 — 节点产生的控制面事件
159 pub(crate) flow_events: &'a mut Vec<FlowEvent>,
160}
161
162impl<S: WorkflowState> NodeContext<'_, S> {
163 // ─── 读 State ─────────────────────────────────────────────
164
165 /// 获取类型化状态(只读)。
166 pub fn state(&self) -> &S {
167 &self.state
168 }
169
170 /// 替换整个状态(仅组合节点使用,如 ParallelNode)。
171 ///
172 /// 这是组合节点合并子分支结果后的 sanctioned API。
173 /// 普通节点应使用 `record()` 声明变更意图。
174 ///
175 /// 替换后,Engine 持有的状态直接变为 `new_state`。
176 /// 不会触发 Mutation 记录(因为这是整体替换,不是增量变更)。
177 pub fn replace_state(&mut self, new_state: S) {
178 *self.state = new_state;
179 }
180
181 // ─── 记录 Mutation ────────────────────────────────────────
182
183 /// 记录一个 Mutation(强类型状态变更命令)到缓冲。
184 ///
185 /// 这是节点变更状态的**唯一入口**。
186 /// 零序列化开销 — 直接存储 `S::Mutation`。
187 ///
188 /// Executor 在节点执行后统一消费并 apply 到 State。
189 pub fn record(&mut self, mutation: S::Mutation) {
190 self.mutations.push(mutation);
191 }
192
193 // ─── 数据面发射 ───────────────────────────────────────────
194
195 /// 发射数据面事件(无 stream 则静默丢弃)。
196 pub fn emit(&self, chunk: StreamChunk) {
197 if let Some(stream) = self.stream {
198 stream.emit(chunk);
199 }
200 }
201
202 /// 发射控制面 FlowEvent(缓冲到 ExecutionContext,供 Executor 收集转发)。
203 pub fn emit_flow_event(&mut self, event: FlowEvent) {
204 self.flow_events.push(event);
205 }
206
207 // ─── 取消检查 ─────────────────────────────────────────────
208
209 /// 检查是否已取消。
210 pub fn is_cancelled(&self) -> bool {
211 self.cancel.is_cancelled()
212 }
213
214 /// 获取取消令牌引用。
215 pub fn cancel_token(&self) -> &CancellationToken {
216 self.cancel
217 }
218
219 // ─── 控制信号 ─────────────────────────────────────────────
220
221 /// 跳转到指定节点。
222 pub fn goto(&mut self, target: impl Into<String>) {
223 self.control.goto(target);
224 }
225
226 /// 结束执行。
227 pub fn end(&mut self) {
228 self.control.end();
229 }
230
231 /// Barrier 挂起。
232 pub fn pause(
233 &mut self,
234 barrier_id: crate::event::BarrierId,
235 timeout: Option<std::time::Duration>,
236 ) {
237 self.control.pause(barrier_id, timeout);
238 }
239
240 // ─── 元数据 ───────────────────────────────────────────────
241
242 /// 设置 token 成本。
243 pub fn set_token_cost(&mut self, cost: f64) {
244 self.metadata.token_cost = cost;
245 }
246
247 /// 标记有副作用。
248 pub fn set_has_side_effects(&mut self) {
249 self.metadata.has_side_effects = true;
250 }
251}