Skip to main content

lellm_graph/
node_context.rs

1//! NodeContext + ExecutionControl — v04 核心类型。
2//!
3//! NodeContext 是 Runtime Handle(运行时句柄),节点只借用,不拥有。
4//! 节点通过 NodeContext 读写 State、发射数据面事件、发出控制信号。
5//!
6//! v0.4+: 泛型化 `NodeContext<'a, S>`,S: WorkflowState。
7//! 默认 `S = State`(HashMap)保持向后兼容。
8
9use tokio_util::sync::CancellationToken;
10
11use crate::branch_state::BranchState;
12use crate::event::FlowEvent;
13use crate::state::State;
14use crate::stream_chunk::StreamChunk;
15use crate::stream_emitter::StreamSink;
16use crate::workflow_state::WorkflowState;
17
18// ─── ExecutionSignal ──────────────────────────────────────────
19
20/// 控制信号 — 独立枚举,Barrier 挂起不是路由。
21#[derive(Debug, Clone)]
22pub enum ExecutionSignal {
23    /// Barrier 挂起执行
24    Pause {
25        barrier_id: crate::event::BarrierId,
26        timeout: Option<std::time::Duration>,
27    },
28}
29
30// ─── NextAction ────────────────────────────────────────────────
31
32/// 节点执行后的下一步路由。
33#[derive(Debug, Clone, PartialEq, Eq)]
34pub enum NextAction {
35    /// 按拓扑顺序走下一步(默认值)
36    Next,
37    /// 跳转到指定节点
38    Goto(String),
39    /// 结束执行
40    End,
41}
42
43// ─── ExecutionControl ─────────────────────────────────────────
44
45/// 控制信号容器 — 节点写入,Executor 读取。
46#[derive(Debug, Default)]
47pub struct ExecutionControl {
48    next: Option<NextAction>,
49    signal: Option<ExecutionSignal>,
50}
51
52impl ExecutionControl {
53    pub fn new() -> Self {
54        Self::default()
55    }
56
57    /// 跳转到指定节点。
58    pub fn goto(&mut self, target: impl Into<String>) {
59        self.next = Some(NextAction::Goto(target.into()));
60    }
61
62    /// 结束执行。
63    pub fn end(&mut self) {
64        self.next = Some(NextAction::End);
65    }
66
67    /// Barrier 挂起。
68    pub fn pause(
69        &mut self,
70        barrier_id: crate::event::BarrierId,
71        timeout: Option<std::time::Duration>,
72    ) {
73        self.signal = Some(ExecutionSignal::Pause {
74            barrier_id,
75            timeout,
76        });
77    }
78
79    /// 获取最终的控制信号。
80    pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
81        let next = self.next.take().unwrap_or(NextAction::Next);
82        let signal = self.signal.take();
83        (next, signal)
84    }
85}
86
87// ─── NodeMetadata ─────────────────────────────────────────────
88
89/// 节点元数据 — 提供给 Executor 的额外信息。
90#[derive(Debug, Clone, Default)]
91pub struct NodeMetadata {
92    /// Token 消耗成本(0.0 表示无 LLM 调用)
93    pub token_cost: f64,
94    /// 是否有外部副作用(如部署、发送消息)
95    pub has_side_effects: bool,
96}
97
98// ─── NodeContext ──────────────────────────────────────────────
99
100/// 节点上下文 — Runtime Handle(运行时句柄)。
101///
102/// # 泛型参数
103///
104/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
105///
106/// # 设计原则
107///
108/// NodeContext 是 Runtime Handle,不是 Runtime State。
109/// - 节点只借用,不拥有。零复制透传给子组件
110/// - 禁止放入:RuntimeEventEmitter、TraceId、SpanId、GraphHandle、ExecutorConfig
111///
112/// # Effects 缓冲(v0.4+)
113///
114/// `effects` 字段收集节点产生的 Effect(领域事件),
115/// 供上层(如 Executor)统一 apply 到 Typed State。
116pub struct NodeContext<'a, S: WorkflowState = State> {
117    /// 类型化状态 — 直接读写
118    state: &'a mut S,
119    /// 底层分支状态 — 用于 fork 等操作(backward compat)
120    branch: &'a mut BranchState,
121    /// 数据面发射器 — 可选(阻塞模式 = None)
122    stream: Option<&'a dyn StreamSink>,
123    /// 取消令牌 — 消费者断开时触发
124    cancel: CancellationToken,
125    /// 控制信号 — 节点写入,Executor 读取
126    control: ExecutionControl,
127    /// 节点元数据 — 节点写入
128    metadata: NodeMetadata,
129    /// Effect 缓冲 — 节点产生的强类型领域事件
130    effects: Vec<S::Effect>,
131    /// FlowEvent 缓冲 — 节点产生的控制面事件
132    flow_events: Vec<FlowEvent>,
133}
134
135impl<'a, S: WorkflowState> NodeContext<'a, S> {
136    /// 创建新的 NodeContext。
137    pub fn new(
138        state: &'a mut S,
139        branch: &'a mut BranchState,
140        stream: Option<&'a dyn StreamSink>,
141        cancel: CancellationToken,
142    ) -> Self {
143        Self {
144            state,
145            branch,
146            stream,
147            cancel,
148            control: ExecutionControl::new(),
149            metadata: NodeMetadata::default(),
150            effects: Vec::new(),
151            flow_events: Vec::new(),
152        }
153    }
154
155    /// 获取类型化状态引用。
156    pub fn state(&self) -> &S {
157        self.state
158    }
159
160    /// 获取类型化状态可变引用。
161    pub fn state_mut(&mut self) -> &mut S {
162        self.state
163    }
164
165    /// 获取底层 BranchState 引用(用于 fork 等操作)。
166    pub fn branch(&self) -> &BranchState {
167        self.branch
168    }
169
170    /// 获取底层 BranchState 可变引用。
171    pub fn branch_mut(&mut self) -> &mut BranchState {
172        self.branch
173    }
174
175    // ─── 数据面发射 ─────────────────────────────────────────
176
177    /// 发射数据面事件(无 stream 则静默丢弃)。
178    pub fn emit(&self, chunk: StreamChunk) {
179        if let Some(stream) = &self.stream {
180            stream.emit(chunk);
181        }
182    }
183
184    /// 发射控制面 FlowEvent(缓冲到 NodeContext,供 Executor 收集转发)。
185    pub fn emit_flow_event(&mut self, event: FlowEvent) {
186        self.flow_events.push(event);
187    }
188
189    // ─── 取消检查 ────────────────────────────────────────────
190
191    /// 检查是否已取消。
192    pub fn is_cancelled(&self) -> bool {
193        self.cancel.is_cancelled()
194    }
195
196    /// 获取取消令牌引用。
197    pub fn cancel_token(&self) -> &CancellationToken {
198        &self.cancel
199    }
200
201    // ─── 控制信号 ─────────────────────────────────────────
202
203    /// 跳转到指定节点。
204    pub fn goto(&mut self, target: impl Into<String>) {
205        self.control.goto(target);
206    }
207
208    /// 结束执行。
209    pub fn end(&mut self) {
210        self.control.end();
211    }
212
213    /// Barrier 挂起。
214    pub fn pause(
215        &mut self,
216        barrier_id: crate::event::BarrierId,
217        timeout: Option<std::time::Duration>,
218    ) {
219        self.control.pause(barrier_id, timeout);
220    }
221
222    // ─── 元数据 ─────────────────────────────────────────
223
224    /// 设置 token 成本。
225    pub fn set_token_cost(&mut self, cost: f64) {
226        self.metadata.token_cost = cost;
227    }
228
229    /// 标记有副作用。
230    pub fn set_has_side_effects(&mut self) {
231        self.metadata.has_side_effects = true;
232    }
233
234    // ─── Effects 缓冲(v0.4+ Typed State)───────────────────
235
236    /// 发射一个 Effect(强类型领域事件)到缓冲。
237    ///
238    /// 零序列化开销 — 直接存储 `S::Effect`。
239    pub fn emit_effect(&mut self, effect: S::Effect) {
240        self.effects.push(effect);
241    }
242
243    /// 消费 Effect 缓冲(返回所有收集的 Effect)。
244    pub fn consume_effects(&mut self) -> Vec<S::Effect> {
245        std::mem::take(&mut self.effects)
246    }
247
248    /// 获取已收集的 Effect 数量(不消费)。
249    pub fn effects_len(&self) -> usize {
250        self.effects.len()
251    }
252
253    // ─── 内部方法(供 Executor 使用)─────────────────────────
254
255    /// 消费控制信号(Executor 调用)。
256    pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
257        self.control.take()
258    }
259
260    /// 获取元数据(Executor 调用)。
261    pub fn take_metadata(&mut self) -> NodeMetadata {
262        std::mem::take(&mut self.metadata)
263    }
264
265    /// 获取数据面发射器引用。
266    pub fn stream(&self) -> Option<&'a dyn StreamSink> {
267        self.stream
268    }
269
270    /// 消费 FlowEvent 缓冲(Executor 调用)。
271    pub fn take_flow_events(&mut self) -> Vec<FlowEvent> {
272        std::mem::take(&mut self.flow_events)
273    }
274}