lellm_graph/node/node_context.rs
1//! NodeContext + LeafContext — 节点能力视图。
2//!
3//! 职责分离:
4//! - `LeafContext<'a, S>` — 只读视图,Leaf 节点使用(`&S`,不能修改 State)
5//! - `NodeContext<'a, S>` — 可变视图,向后兼容(`&mut S`,可通过 `replace_state()` 修改)
6//!
7//! ExecutionEngine 和相关 trait 定义在 [`execution_engine`] 模块中。
8
9use tokio_util::sync::CancellationToken;
10
11use crate::exec::execution_engine::{ExecutionControl, NodeMetadata};
12use crate::state::State;
13use crate::state::workflow_state::WorkflowState;
14use crate::stream_chunk::StreamChunk;
15use crate::stream_emitter::StreamSink;
16
17// ─── Backward Compat Re-exports ──────────────────────────────
18
19/// 向后兼容 — `ExecutionContext` 已迁移到 [`execution_engine`] 模块。
20pub use crate::exec::execution_engine::ExecutionContext;
21
22// ─── LeafContext (Borrowed View) ──────────────────────────────
23
24/// Leaf 节点能力视图 — 纯借用,不拥有任何状态。
25///
26/// 设计原则:
27/// - **只能读 State**(`&S`,不可变引用)
28/// - **只能 emit Mutation**(借用在 Engine 的 mutations buffer)
29/// - **只能 emit Stream**
30/// - **不能 replace_state / clone_state / fork / merge**
31///
32/// 与 NodeContext 的区别:
33/// - NodeContext 持有 `&mut S`(可变引用),Composite 节点可用 replace_state()
34/// - LeafContext 持有 `&S`(只读引用),编译期保证不能修改 State
35pub struct LeafContext<'a, S: WorkflowState = State> {
36 /// 类型化状态 — 只读引用
37 pub(crate) state: &'a S,
38 /// 数据面发射器 — 可选(阻塞模式 = None)
39 pub(crate) stream: Option<&'a dyn StreamSink>,
40 /// 取消令牌
41 pub(crate) cancel: &'a CancellationToken,
42 /// 控制信号 — 节点写入,Executor 读取
43 pub(crate) control: &'a mut ExecutionControl,
44 /// 节点元数据 — 节点写入
45 pub(crate) metadata: &'a mut NodeMetadata,
46 /// Mutation 缓冲 — 借用在 ExecutionEngine 的 buffer
47 pub(crate) mutations: &'a mut Vec<S::Mutation>,
48}
49
50impl<S: WorkflowState> LeafContext<'_, S> {
51 // ─── 读 State ─────────────────────────────────────────────
52
53 /// 获取类型化状态(只读)。
54 pub fn state(&self) -> &S {
55 self.state
56 }
57
58 // ─── 记录 Mutation ────────────────────────────────────────
59
60 /// 记录一个 Mutation(强类型状态变更命令)到缓冲。
61 ///
62 /// 这是 Leaf 节点变更状态的**唯一入口**。
63 pub fn record(&mut self, mutation: S::Mutation) {
64 self.mutations.push(mutation);
65 }
66
67 // ─── 数据面发射 ───────────────────────────────────────────
68
69 /// 发射数据面事件(无 stream 则静默丢弃)。
70 pub fn emit(&self, chunk: StreamChunk) {
71 if let Some(stream) = self.stream {
72 stream.emit(chunk);
73 }
74 }
75
76 // ─── 取消检查 ─────────────────────────────────────────────
77
78 /// 检查是否已取消。
79 pub fn is_cancelled(&self) -> bool {
80 self.cancel.is_cancelled()
81 }
82
83 /// 获取取消令牌引用。
84 pub fn cancel_token(&self) -> &CancellationToken {
85 self.cancel
86 }
87
88 // ─── 控制信号 ─────────────────────────────────────────────
89
90 /// 跳转到指定节点。
91 pub fn goto(&mut self, target: impl Into<String>) {
92 self.control.goto(target);
93 }
94
95 /// 结束执行。
96 pub fn end(&mut self) {
97 self.control.end();
98 }
99
100 /// Barrier 挂起。
101 pub fn pause(
102 &mut self,
103 barrier_id: crate::event::BarrierId,
104 timeout: Option<std::time::Duration>,
105 ) {
106 self.control.pause(barrier_id, timeout);
107 }
108
109 // ─── 元数据 ───────────────────────────────────────────────
110
111 /// 设置 token 成本。
112 pub fn set_token_cost(&mut self, cost: f64) {
113 self.metadata.token_cost = cost;
114 }
115
116 /// 标记有副作用。
117 pub fn set_has_side_effects(&mut self) {
118 self.metadata.has_side_effects = true;
119 }
120}
121
122// ─── NodeContext ──────────────────────────────────────────────
123
124/// 节点能力视图 — 向后兼容,节点能做的三件事:读 State、记录 Mutation、发射 Stream。
125///
126/// # 设计原则
127///
128/// NodeContext 是 Runtime 的能力视图,不是 Runtime 的拥有者。
129/// - 节点只借用,不拥有。零复制透传给子组件
130/// - 禁止放入:RuntimeEventEmitter、TraceId、SpanId、GraphHandle、ExecutorConfig
131/// - **不提供 `state_mut()`** — 节点只能通过 `record()` 声明变更意图
132/// - 组合节点(如 ParallelNode)使用 `replace_state()` 整体替换状态
133///
134/// # 泛型参数
135///
136/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
137pub struct NodeContext<'a, S: WorkflowState = State> {
138 /// 类型化状态 — 可变引用(仅组合节点如 ParallelNode 需要写权限)
139 pub(crate) state: &'a mut S,
140 /// 数据面发射器 — 可选(阻塞模式 = None)
141 pub(crate) stream: Option<&'a dyn StreamSink>,
142 /// 取消令牌
143 pub(crate) cancel: &'a CancellationToken,
144 /// 控制信号 — 节点写入,Executor 读取
145 pub(crate) control: &'a mut ExecutionControl,
146 /// 节点元数据 — 节点写入
147 pub(crate) metadata: &'a mut NodeMetadata,
148 /// Mutation 缓冲 — 节点产生的强类型领域事件
149 pub(crate) mutations: &'a mut Vec<S::Mutation>,
150}
151
152impl<S: WorkflowState> NodeContext<'_, S> {
153 // ─── 读 State ─────────────────────────────────────────────
154
155 /// 获取类型化状态(只读)。
156 pub fn state(&self) -> &S {
157 &self.state
158 }
159
160 /// 替换整个状态(仅组合节点使用,如 ParallelNode)。
161 ///
162 /// 这是组合节点合并子分支结果后的 sanctioned API。
163 /// 普通节点应使用 `record()` 声明变更意图。
164 ///
165 /// 替换后,Engine 持有的状态直接变为 `new_state`。
166 /// 不会触发 Mutation 记录(因为这是整体替换,不是增量变更)。
167 pub fn replace_state(&mut self, new_state: S) {
168 *self.state = new_state;
169 }
170
171 // ─── 记录 Mutation ────────────────────────────────────────
172
173 /// 记录一个 Mutation(强类型状态变更命令)到缓冲。
174 ///
175 /// 这是节点变更状态的**唯一入口**。
176 /// 零序列化开销 — 直接存储 `S::Mutation`。
177 ///
178 /// Executor 在节点执行后统一消费并 apply 到 State。
179 pub fn record(&mut self, mutation: S::Mutation) {
180 self.mutations.push(mutation);
181 }
182
183 // ─── 数据面发射 ───────────────────────────────────────────
184
185 /// 发射数据面事件(无 stream 则静默丢弃)。
186 pub fn emit(&self, chunk: StreamChunk) {
187 if let Some(stream) = self.stream {
188 stream.emit(chunk);
189 }
190 }
191
192 // ─── 取消检查 ─────────────────────────────────────────────
193
194 /// 检查是否已取消。
195 pub fn is_cancelled(&self) -> bool {
196 self.cancel.is_cancelled()
197 }
198
199 /// 获取取消令牌引用。
200 pub fn cancel_token(&self) -> &CancellationToken {
201 self.cancel
202 }
203
204 // ─── 控制信号 ─────────────────────────────────────────────
205
206 /// 跳转到指定节点。
207 pub fn goto(&mut self, target: impl Into<String>) {
208 self.control.goto(target);
209 }
210
211 /// 结束执行。
212 pub fn end(&mut self) {
213 self.control.end();
214 }
215
216 /// Barrier 挂起。
217 pub fn pause(
218 &mut self,
219 barrier_id: crate::event::BarrierId,
220 timeout: Option<std::time::Duration>,
221 ) {
222 self.control.pause(barrier_id, timeout);
223 }
224
225 // ─── 元数据 ───────────────────────────────────────────────
226
227 /// 设置 token 成本。
228 pub fn set_token_cost(&mut self, cost: f64) {
229 self.metadata.token_cost = cost;
230 }
231
232 /// 标记有副作用。
233 pub fn set_has_side_effects(&mut self) {
234 self.metadata.has_side_effects = true;
235 }
236}