Skip to main content

lellm_graph/
node_context.rs

1//! NodeContext + ExecutionControl + StreamEmitter — v04 核心类型。
2//!
3//! NodeContext 是 Runtime Handle(运行时句柄),节点只借用,不拥有。
4//! 节点通过 NodeContext 读写 State、发射数据面事件、发出控制信号。
5//!
6//! v0.4+: 泛型化 `NodeContext<'a, S>`,S: WorkflowState。
7//! 默认 `S = State`(HashMap)保持向后兼容。
8
9use crate::branch_state::BranchState;
10use crate::event::FlowEvent;
11use crate::state::State;
12use crate::stream_emitter::StreamEmitter;
13use crate::workflow_state::WorkflowState;
14
15// ─── ExecutionSignal ──────────────────────────────────────────
16
17/// 控制信号 — 独立枚举,Barrier 挂起不是路由。
18#[derive(Debug, Clone)]
19pub enum ExecutionSignal {
20    /// Barrier 挂起执行
21    Pause {
22        barrier_id: crate::event::BarrierId,
23        timeout: Option<std::time::Duration>,
24    },
25}
26
27// ─── NextAction ────────────────────────────────────────────────
28
29/// 节点执行后的下一步路由。
30#[derive(Debug, Clone, PartialEq, Eq)]
31pub enum NextAction {
32    /// 按拓扑顺序走下一步(默认值)
33    Next,
34    /// 跳转到指定节点
35    Goto(String),
36    /// 结束执行
37    End,
38}
39
40// ─── ExecutionControl ─────────────────────────────────────────
41
42/// 控制信号容器 — 节点写入,Executor 读取。
43#[derive(Debug, Default)]
44pub struct ExecutionControl {
45    next: Option<NextAction>,
46    signal: Option<ExecutionSignal>,
47}
48
49impl ExecutionControl {
50    pub fn new() -> Self {
51        Self::default()
52    }
53
54    /// 跳转到指定节点。
55    pub fn goto(&mut self, target: impl Into<String>) {
56        self.next = Some(NextAction::Goto(target.into()));
57    }
58
59    /// 结束执行。
60    pub fn end(&mut self) {
61        self.next = Some(NextAction::End);
62    }
63
64    /// Barrier 挂起。
65    pub fn pause(
66        &mut self,
67        barrier_id: crate::event::BarrierId,
68        timeout: Option<std::time::Duration>,
69    ) {
70        self.signal = Some(ExecutionSignal::Pause {
71            barrier_id,
72            timeout,
73        });
74    }
75
76    /// 获取最终的控制信号。
77    pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
78        let next = self.next.take().unwrap_or(NextAction::Next);
79        let signal = self.signal.take();
80        (next, signal)
81    }
82}
83
84// ─── NodeMetadata ─────────────────────────────────────────────
85
86/// 节点元数据 — 提供给 Executor 的额外信息。
87#[derive(Debug, Clone, Default)]
88pub struct NodeMetadata {
89    /// Token 消耗成本(0.0 表示无 LLM 调用)
90    pub token_cost: f64,
91    /// 是否有外部副作用(如部署、发送消息)
92    pub has_side_effects: bool,
93}
94
95// ─── NodeContext ──────────────────────────────────────────────
96
97/// 节点上下文 — Runtime Handle(运行时句柄)。
98///
99/// # 泛型参数
100///
101/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
102///
103/// # 设计原则
104///
105/// NodeContext 是 Runtime Handle,不是 Runtime State。
106/// - 节点只借用,不拥有。零复制透传给子组件
107/// - 禁止放入:RuntimeEventEmitter、TraceId、SpanId、GraphHandle、ExecutorConfig
108///
109/// # Effects 缓冲(v0.4+)
110///
111/// `effects` 字段收集节点产生的 Effect(领域事件),
112/// 供上层(如 Executor)统一 apply 到 Typed State。
113pub struct NodeContext<'a, S: WorkflowState = State> {
114    /// 类型化状态 — 直接读写
115    state: &'a mut S,
116    /// 底层分支状态 — 用于 fork 等操作(backward compat)
117    branch: &'a mut BranchState,
118    /// 数据面发射器 — 可选(阻塞模式 = None)
119    stream: Option<&'a StreamEmitter>,
120    /// 控制信号 — 节点写入,Executor 读取
121    control: ExecutionControl,
122    /// 节点元数据 — 节点写入
123    metadata: NodeMetadata,
124    /// Effect 缓冲 — 节点产生的强类型领域事件
125    effects: Vec<S::Effect>,
126    /// FlowEvent 缓冲 — 节点产生的控制面事件
127    flow_events: Vec<FlowEvent>,
128}
129
130impl<'a, S: WorkflowState> NodeContext<'a, S> {
131    /// 创建新的 NodeContext。
132    pub fn new(
133        state: &'a mut S,
134        branch: &'a mut BranchState,
135        stream: Option<&'a StreamEmitter>,
136    ) -> Self {
137        Self {
138            state,
139            branch,
140            stream,
141            control: ExecutionControl::new(),
142            metadata: NodeMetadata::default(),
143            effects: Vec::new(),
144            flow_events: Vec::new(),
145        }
146    }
147
148    /// 获取类型化状态引用。
149    pub fn state(&self) -> &S {
150        self.state
151    }
152
153    /// 获取类型化状态可变引用。
154    pub fn state_mut(&mut self) -> &mut S {
155        self.state
156    }
157
158    /// 获取底层 BranchState 引用(用于 fork 等操作)。
159    pub fn branch(&self) -> &BranchState {
160        self.branch
161    }
162
163    /// 获取底层 BranchState 可变引用。
164    pub fn branch_mut(&mut self) -> &mut BranchState {
165        self.branch
166    }
167
168    // ─── 数据面发射 ─────────────────────────────────────────
169
170    /// 发射数据面事件(无 stream 则静默丢弃)。
171    pub fn emit(&self, chunk: crate::stream_chunk::StreamChunk) {
172        if let Some(stream) = &self.stream {
173            stream.emit(chunk);
174        }
175    }
176
177    /// 发射控制面 FlowEvent(缓冲到 NodeContext,供 Executor 收集转发)。
178    pub fn emit_flow_event(&mut self, event: FlowEvent) {
179        self.flow_events.push(event);
180    }
181
182    // ─── 控制信号 ─────────────────────────────────────────
183
184    /// 跳转到指定节点。
185    pub fn goto(&mut self, target: impl Into<String>) {
186        self.control.goto(target);
187    }
188
189    /// 结束执行。
190    pub fn end(&mut self) {
191        self.control.end();
192    }
193
194    /// Barrier 挂起。
195    pub fn pause(
196        &mut self,
197        barrier_id: crate::event::BarrierId,
198        timeout: Option<std::time::Duration>,
199    ) {
200        self.control.pause(barrier_id, timeout);
201    }
202
203    // ─── 元数据 ─────────────────────────────────────────
204
205    /// 设置 token 成本。
206    pub fn set_token_cost(&mut self, cost: f64) {
207        self.metadata.token_cost = cost;
208    }
209
210    /// 标记有副作用。
211    pub fn set_has_side_effects(&mut self) {
212        self.metadata.has_side_effects = true;
213    }
214
215    // ─── Effects 缓冲(v0.4+ Typed State)───────────────────
216
217    /// 发射一个 Effect(强类型领域事件)到缓冲。
218    ///
219    /// 零序列化开销 — 直接存储 `S::Effect`。
220    pub fn emit_effect(&mut self, effect: S::Effect) {
221        self.effects.push(effect);
222    }
223
224    /// 消费 Effect 缓冲(返回所有收集的 Effect)。
225    pub fn consume_effects(&mut self) -> Vec<S::Effect> {
226        std::mem::take(&mut self.effects)
227    }
228
229    /// 获取已收集的 Effect 数量(不消费)。
230    pub fn effects_len(&self) -> usize {
231        self.effects.len()
232    }
233
234    // ─── 内部方法(供 Executor 使用)─────────────────────────
235
236    /// 消费控制信号(Executor 调用)。
237    pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
238        self.control.take()
239    }
240
241    /// 获取元数据(Executor 调用)。
242    pub fn take_metadata(&mut self) -> NodeMetadata {
243        std::mem::take(&mut self.metadata)
244    }
245
246    /// 获取数据面发射器引用。
247    pub fn stream(&self) -> Option<&'a StreamEmitter> {
248        self.stream
249    }
250
251    /// 消费 FlowEvent 缓冲(Executor 调用)。
252    pub fn take_flow_events(&mut self) -> Vec<FlowEvent> {
253        std::mem::take(&mut self.flow_events)
254    }
255}