Skip to main content

lellm_graph/node/
node_context.rs

1//! NodeContext + LeafContext — 节点能力视图。
2//!
3//! 职责分离:
4//! - `LeafContext<'a, S>` — 只读视图,Leaf 节点使用(`&S`,不能修改 State)
5//! - `NodeContext<'a, S>` — 可变视图,向后兼容(`&mut S`,可通过 `replace_state()` 修改)
6//!
7//! ExecutionEngine 和相关 trait 定义在 [`execution_engine`] 模块中。
8
9use tokio_util::sync::CancellationToken;
10
11use crate::exec::execution_engine::{ExecutionControl, NodeMetadata};
12use crate::state::State;
13use crate::state::workflow_state::WorkflowState;
14use crate::stream_chunk::StreamChunk;
15use crate::stream_emitter::StreamSink;
16
17// ─── Backward Compat Re-exports ──────────────────────────────
18
19/// 向后兼容 — `ExecutionContext` 已迁移到 [`execution_engine`] 模块。
20pub use crate::exec::execution_engine::ExecutionContext;
21
22// ─── LeafContext (Borrowed View) ──────────────────────────────
23
24/// Leaf 节点能力视图 — 纯借用,不拥有任何状态。
25///
26/// 设计原则:
27/// - **只能读 State**(`&S`,不可变引用)
28/// - **只能 emit Mutation**(借用在 Engine 的 mutations buffer)
29/// - **只能 emit Stream**
30/// - **不能 replace_state / clone_state / fork / merge**
31///
32/// 与 NodeContext 的区别:
33/// - NodeContext 持有 `&mut S`(可变引用),Composite 节点可用 replace_state()
34/// - LeafContext 持有 `&S`(只读引用),编译期保证不能修改 State
35pub struct LeafContext<'a, S: WorkflowState = State> {
36    /// 类型化状态 — 只读引用
37    pub(crate) state: &'a S,
38    /// 数据面发射器 — 可选(阻塞模式 = None)
39    pub(crate) stream: Option<&'a dyn StreamSink>,
40    /// 取消令牌
41    pub(crate) cancel: &'a CancellationToken,
42    /// 控制信号 — 节点写入,Executor 读取
43    pub(crate) control: &'a mut ExecutionControl,
44    /// 节点元数据 — 节点写入
45    pub(crate) metadata: &'a mut NodeMetadata,
46    /// Mutation 缓冲 — 借用在 ExecutionEngine 的 buffer
47    pub(crate) mutations: &'a mut Vec<S::Mutation>,
48}
49
50impl<S: WorkflowState> LeafContext<'_, S> {
51    // ─── 读 State ─────────────────────────────────────────────
52
53    /// 获取类型化状态(只读)。
54    pub fn state(&self) -> &S {
55        self.state
56    }
57
58    // ─── 记录 Mutation ────────────────────────────────────────
59
60    /// 记录一个 Mutation(强类型状态变更命令)到缓冲。
61    ///
62    /// 这是 Leaf 节点变更状态的**唯一入口**。
63    pub fn record(&mut self, mutation: S::Mutation) {
64        self.mutations.push(mutation);
65    }
66
67    // ─── 数据面发射 ───────────────────────────────────────────
68
69    /// 发射数据面事件(无 stream 则静默丢弃)。
70    pub fn emit(&self, chunk: StreamChunk) {
71        if let Some(stream) = self.stream {
72            stream.emit(chunk);
73        }
74    }
75
76    // ─── 取消检查 ─────────────────────────────────────────────
77
78    /// 检查是否已取消。
79    pub fn is_cancelled(&self) -> bool {
80        self.cancel.is_cancelled()
81    }
82
83    /// 获取取消令牌引用。
84    pub fn cancel_token(&self) -> &CancellationToken {
85        self.cancel
86    }
87
88    // ─── 控制信号 ─────────────────────────────────────────────
89
90    /// 跳转到指定节点。
91    pub fn goto(&mut self, target: impl Into<String>) {
92        self.control.goto(target);
93    }
94
95    /// 结束执行。
96    pub fn end(&mut self) {
97        self.control.end();
98    }
99
100    /// Barrier 挂起。
101    pub fn pause(
102        &mut self,
103        barrier_id: crate::event::BarrierId,
104        timeout: Option<std::time::Duration>,
105    ) {
106        self.control.pause(barrier_id, timeout);
107    }
108
109    // ─── 元数据 ───────────────────────────────────────────────
110
111    /// 设置 token 成本。
112    pub fn set_token_cost(&mut self, cost: f64) {
113        self.metadata.token_cost = cost;
114    }
115
116    /// 标记有副作用。
117    pub fn set_has_side_effects(&mut self) {
118        self.metadata.has_side_effects = true;
119    }
120}
121
122// ─── NodeContext ──────────────────────────────────────────────
123
124/// 节点能力视图 — 向后兼容,节点能做的三件事:读 State、记录 Mutation、发射 Stream。
125///
126/// # 设计原则
127///
128/// NodeContext 是 Runtime 的能力视图,不是 Runtime 的拥有者。
129/// - 节点只借用,不拥有。零复制透传给子组件
130/// - 禁止放入:RuntimeEventEmitter、TraceId、SpanId、GraphHandle、ExecutorConfig
131/// - **不提供 `state_mut()`** — 节点只能通过 `record()` 声明变更意图
132/// - 组合节点(如 ParallelNode)使用 `replace_state()` 整体替换状态
133///
134/// # 泛型参数
135///
136/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
137pub struct NodeContext<'a, S: WorkflowState = State> {
138    /// 类型化状态 — 可变引用(仅组合节点如 ParallelNode 需要写权限)
139    pub(crate) state: &'a mut S,
140    /// 数据面发射器 — 可选(阻塞模式 = None)
141    pub(crate) stream: Option<&'a dyn StreamSink>,
142    /// 取消令牌
143    pub(crate) cancel: &'a CancellationToken,
144    /// 控制信号 — 节点写入,Executor 读取
145    pub(crate) control: &'a mut ExecutionControl,
146    /// 节点元数据 — 节点写入
147    pub(crate) metadata: &'a mut NodeMetadata,
148    /// Mutation 缓冲 — 节点产生的强类型领域事件
149    pub(crate) mutations: &'a mut Vec<S::Mutation>,
150}
151
152impl<S: WorkflowState> NodeContext<'_, S> {
153    // ─── 读 State ─────────────────────────────────────────────
154
155    /// 获取类型化状态(只读)。
156    pub fn state(&self) -> &S {
157        &self.state
158    }
159
160    /// 替换整个状态(仅组合节点使用,如 ParallelNode)。
161    ///
162    /// 这是组合节点合并子分支结果后的 sanctioned API。
163    /// 普通节点应使用 `record()` 声明变更意图。
164    ///
165    /// 替换后,Engine 持有的状态直接变为 `new_state`。
166    /// 不会触发 Mutation 记录(因为这是整体替换,不是增量变更)。
167    pub fn replace_state(&mut self, new_state: S) {
168        *self.state = new_state;
169    }
170
171    // ─── 记录 Mutation ────────────────────────────────────────
172
173    /// 记录一个 Mutation(强类型状态变更命令)到缓冲。
174    ///
175    /// 这是节点变更状态的**唯一入口**。
176    /// 零序列化开销 — 直接存储 `S::Mutation`。
177    ///
178    /// Executor 在节点执行后统一消费并 apply 到 State。
179    pub fn record(&mut self, mutation: S::Mutation) {
180        self.mutations.push(mutation);
181    }
182
183    // ─── 数据面发射 ───────────────────────────────────────────
184
185    /// 发射数据面事件(无 stream 则静默丢弃)。
186    pub fn emit(&self, chunk: StreamChunk) {
187        if let Some(stream) = self.stream {
188            stream.emit(chunk);
189        }
190    }
191
192    // ─── 取消检查 ─────────────────────────────────────────────
193
194    /// 检查是否已取消。
195    pub fn is_cancelled(&self) -> bool {
196        self.cancel.is_cancelled()
197    }
198
199    /// 获取取消令牌引用。
200    pub fn cancel_token(&self) -> &CancellationToken {
201        self.cancel
202    }
203
204    // ─── 控制信号 ─────────────────────────────────────────────
205
206    /// 跳转到指定节点。
207    pub fn goto(&mut self, target: impl Into<String>) {
208        self.control.goto(target);
209    }
210
211    /// 结束执行。
212    pub fn end(&mut self) {
213        self.control.end();
214    }
215
216    /// Barrier 挂起。
217    pub fn pause(
218        &mut self,
219        barrier_id: crate::event::BarrierId,
220        timeout: Option<std::time::Duration>,
221    ) {
222        self.control.pause(barrier_id, timeout);
223    }
224
225    // ─── 元数据 ───────────────────────────────────────────────
226
227    /// 设置 token 成本。
228    pub fn set_token_cost(&mut self, cost: f64) {
229        self.metadata.token_cost = cost;
230    }
231
232    /// 标记有副作用。
233    pub fn set_has_side_effects(&mut self) {
234        self.metadata.has_side_effects = true;
235    }
236}