Skip to main content

lellm_graph/
node_context.rs

1//! NodeContext + LeafContext — 节点能力视图。
2//!
3//! 职责分离:
4//! - `LeafContext<'a, S>` — 只读视图,Leaf 节点使用(`&S`,不能修改 State)
5//! - `NodeContext<'a, S>` — 可变视图,向后兼容(`&mut S`,可通过 `replace_state()` 修改)
6//!
7//! ExecutionEngine 和相关 trait 定义在 [`execution_engine`] 模块中。
8
9use tokio_util::sync::CancellationToken;
10
11use crate::event::FlowEvent;
12use crate::execution_engine::{ExecutionControl, NodeMetadata};
13use crate::state::State;
14use crate::stream_chunk::StreamChunk;
15use crate::stream_emitter::StreamSink;
16use crate::workflow_state::WorkflowState;
17
18// ─── Backward Compat Re-exports ──────────────────────────────
19
20/// 向后兼容 — `ExecutionContext` 已迁移到 [`execution_engine`] 模块。
21pub use crate::execution_engine::ExecutionContext;
22
23// ─── LeafContext (Borrowed View) ──────────────────────────────
24
25/// Leaf 节点能力视图 — 纯借用,不拥有任何状态。
26///
27/// 设计原则:
28/// - **只能读 State**(`&S`,不可变引用)
29/// - **只能 emit Mutation**(借用在 Engine 的 mutations buffer)
30/// - **只能 emit Stream / FlowEvent**
31/// - **不能 replace_state / clone_state / fork / merge**
32///
33/// 与 NodeContext 的区别:
34/// - NodeContext 持有 `&mut S`(可变引用),Composite 节点可用 replace_state()
35/// - LeafContext 持有 `&S`(只读引用),编译期保证不能修改 State
36pub struct LeafContext<'a, S: WorkflowState = State> {
37    /// 类型化状态 — 只读引用
38    pub(crate) state: &'a S,
39    /// 数据面发射器 — 可选(阻塞模式 = None)
40    pub(crate) stream: Option<&'a dyn StreamSink>,
41    /// 取消令牌
42    pub(crate) cancel: &'a CancellationToken,
43    /// 控制信号 — 节点写入,Executor 读取
44    pub(crate) control: &'a mut ExecutionControl,
45    /// 节点元数据 — 节点写入
46    pub(crate) metadata: &'a mut NodeMetadata,
47    /// Mutation 缓冲 — 借用在 ExecutionEngine 的 buffer
48    pub(crate) mutations: &'a mut Vec<S::Mutation>,
49    /// FlowEvent 缓冲 — 借用在 ExecutionEngine 的 buffer
50    pub(crate) flow_events: &'a mut Vec<FlowEvent>,
51}
52
53impl<S: WorkflowState> LeafContext<'_, S> {
54    // ─── 读 State ─────────────────────────────────────────────
55
56    /// 获取类型化状态(只读)。
57    pub fn state(&self) -> &S {
58        self.state
59    }
60
61    // ─── 记录 Mutation ────────────────────────────────────────
62
63    /// 记录一个 Mutation(强类型状态变更命令)到缓冲。
64    ///
65    /// 这是 Leaf 节点变更状态的**唯一入口**。
66    pub fn record(&mut self, mutation: S::Mutation) {
67        self.mutations.push(mutation);
68    }
69
70    // ─── 数据面发射 ───────────────────────────────────────────
71
72    /// 发射数据面事件(无 stream 则静默丢弃)。
73    pub fn emit(&self, chunk: StreamChunk) {
74        if let Some(stream) = self.stream {
75            stream.emit(chunk);
76        }
77    }
78
79    /// 发射控制面 FlowEvent(缓冲到 ExecutionEngine,供 Executor 收集转发)。
80    pub fn emit_flow_event(&mut self, event: FlowEvent) {
81        self.flow_events.push(event);
82    }
83
84    // ─── 取消检查 ─────────────────────────────────────────────
85
86    /// 检查是否已取消。
87    pub fn is_cancelled(&self) -> bool {
88        self.cancel.is_cancelled()
89    }
90
91    /// 获取取消令牌引用。
92    pub fn cancel_token(&self) -> &CancellationToken {
93        self.cancel
94    }
95
96    // ─── 控制信号 ─────────────────────────────────────────────
97
98    /// 跳转到指定节点。
99    pub fn goto(&mut self, target: impl Into<String>) {
100        self.control.goto(target);
101    }
102
103    /// 结束执行。
104    pub fn end(&mut self) {
105        self.control.end();
106    }
107
108    /// Barrier 挂起。
109    pub fn pause(
110        &mut self,
111        barrier_id: crate::event::BarrierId,
112        timeout: Option<std::time::Duration>,
113    ) {
114        self.control.pause(barrier_id, timeout);
115    }
116
117    // ─── 元数据 ───────────────────────────────────────────────
118
119    /// 设置 token 成本。
120    pub fn set_token_cost(&mut self, cost: f64) {
121        self.metadata.token_cost = cost;
122    }
123
124    /// 标记有副作用。
125    pub fn set_has_side_effects(&mut self) {
126        self.metadata.has_side_effects = true;
127    }
128}
129
130// ─── NodeContext ──────────────────────────────────────────────
131
132/// 节点能力视图 — 向后兼容,节点能做的三件事:读 State、记录 Mutation、发射 Stream。
133///
134/// # 设计原则
135///
136/// NodeContext 是 Runtime 的能力视图,不是 Runtime 的拥有者。
137/// - 节点只借用,不拥有。零复制透传给子组件
138/// - 禁止放入:RuntimeEventEmitter、TraceId、SpanId、GraphHandle、ExecutorConfig
139/// - **不提供 `state_mut()`** — 节点只能通过 `record()` 声明变更意图
140/// - 组合节点(如 ParallelNode)使用 `replace_state()` 整体替换状态
141///
142/// # 泛型参数
143///
144/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
145pub struct NodeContext<'a, S: WorkflowState = State> {
146    /// 类型化状态 — 可变引用(仅组合节点如 ParallelNode 需要写权限)
147    pub(crate) state: &'a mut S,
148    /// 数据面发射器 — 可选(阻塞模式 = None)
149    pub(crate) stream: Option<&'a dyn StreamSink>,
150    /// 取消令牌
151    pub(crate) cancel: &'a CancellationToken,
152    /// 控制信号 — 节点写入,Executor 读取
153    pub(crate) control: &'a mut ExecutionControl,
154    /// 节点元数据 — 节点写入
155    pub(crate) metadata: &'a mut NodeMetadata,
156    /// Mutation 缓冲 — 节点产生的强类型领域事件
157    pub(crate) mutations: &'a mut Vec<S::Mutation>,
158    /// FlowEvent 缓冲 — 节点产生的控制面事件
159    pub(crate) flow_events: &'a mut Vec<FlowEvent>,
160}
161
162impl<S: WorkflowState> NodeContext<'_, S> {
163    // ─── 读 State ─────────────────────────────────────────────
164
165    /// 获取类型化状态(只读)。
166    pub fn state(&self) -> &S {
167        &self.state
168    }
169
170    /// 替换整个状态(仅组合节点使用,如 ParallelNode)。
171    ///
172    /// 这是组合节点合并子分支结果后的 sanctioned API。
173    /// 普通节点应使用 `record()` 声明变更意图。
174    ///
175    /// 替换后,Engine 持有的状态直接变为 `new_state`。
176    /// 不会触发 Mutation 记录(因为这是整体替换,不是增量变更)。
177    pub fn replace_state(&mut self, new_state: S) {
178        *self.state = new_state;
179    }
180
181    // ─── 记录 Mutation ────────────────────────────────────────
182
183    /// 记录一个 Mutation(强类型状态变更命令)到缓冲。
184    ///
185    /// 这是节点变更状态的**唯一入口**。
186    /// 零序列化开销 — 直接存储 `S::Mutation`。
187    ///
188    /// Executor 在节点执行后统一消费并 apply 到 State。
189    pub fn record(&mut self, mutation: S::Mutation) {
190        self.mutations.push(mutation);
191    }
192
193    // ─── 数据面发射 ───────────────────────────────────────────
194
195    /// 发射数据面事件(无 stream 则静默丢弃)。
196    pub fn emit(&self, chunk: StreamChunk) {
197        if let Some(stream) = self.stream {
198            stream.emit(chunk);
199        }
200    }
201
202    /// 发射控制面 FlowEvent(缓冲到 ExecutionContext,供 Executor 收集转发)。
203    pub fn emit_flow_event(&mut self, event: FlowEvent) {
204        self.flow_events.push(event);
205    }
206
207    // ─── 取消检查 ─────────────────────────────────────────────
208
209    /// 检查是否已取消。
210    pub fn is_cancelled(&self) -> bool {
211        self.cancel.is_cancelled()
212    }
213
214    /// 获取取消令牌引用。
215    pub fn cancel_token(&self) -> &CancellationToken {
216        self.cancel
217    }
218
219    // ─── 控制信号 ─────────────────────────────────────────────
220
221    /// 跳转到指定节点。
222    pub fn goto(&mut self, target: impl Into<String>) {
223        self.control.goto(target);
224    }
225
226    /// 结束执行。
227    pub fn end(&mut self) {
228        self.control.end();
229    }
230
231    /// Barrier 挂起。
232    pub fn pause(
233        &mut self,
234        barrier_id: crate::event::BarrierId,
235        timeout: Option<std::time::Duration>,
236    ) {
237        self.control.pause(barrier_id, timeout);
238    }
239
240    // ─── 元数据 ───────────────────────────────────────────────
241
242    /// 设置 token 成本。
243    pub fn set_token_cost(&mut self, cost: f64) {
244        self.metadata.token_cost = cost;
245    }
246
247    /// 标记有副作用。
248    pub fn set_has_side_effects(&mut self) {
249        self.metadata.has_side_effects = true;
250    }
251}