Skip to main content

lellm_graph/
execution_engine.rs

1//! ExecutionEngine — 执行引擎核心类型。
2//!
3//! 职责分离:
4//! - `ExecutionEngine<S>` — Executor 内部拥有,持有 State、Mutation 缓冲、流发射器等
5//! - `NodeContext<'a, S>` / `LeafContext<'a, S>` — 节点能力视图(在 node_context.rs 中)
6//!
7//! 数据流单向:
8//!
9//! ```text
10//! Node
11//!   ↓
12//! ctx.record(Mutation)
13//!   ↓
14//! Mutation Buffer (ExecutionEngine)
15//!   ↓
16//! Engine: take_mutations()
17//!   ↓
18//! state.apply_batch(mutations)
19//!   ↓
20//! State
21//! ```
22//!
23//! 节点只能通过 `record()` 声明变更意图,无法直接修改 State。
24//! 这保证了 Mutation Log 是唯一写入口,使 Replay、Trace、Parallel Merge、Undo 全部成立。
25
26use std::sync::Arc;
27
28use tokio_util::sync::CancellationToken;
29
30use crate::event::FlowEvent;
31use crate::stream_chunk::StreamChunk;
32use crate::stream_emitter::StreamSink;
33use crate::workflow_state::WorkflowState;
34
35// ─── ExecutionSignal ──────────────────────────────────────────
36
37/// 控制信号 — 独立枚举,Barrier 挂起不是路由。
38#[derive(Debug, Clone)]
39pub enum ExecutionSignal {
40    /// Barrier 挂起执行
41    Pause {
42        barrier_id: crate::event::BarrierId,
43        timeout: Option<std::time::Duration>,
44    },
45}
46
47// ─── NextAction ────────────────────────────────────────────────
48
49/// 节点执行后的下一步路由。
50#[derive(Debug, Clone, PartialEq, Eq)]
51pub enum NextAction {
52    /// 按拓扑顺序走下一步(默认值)
53    Next,
54    /// 跳转到指定节点
55    Goto(String),
56    /// 结束执行
57    End,
58}
59
60// ─── ExecutionControl ─────────────────────────────────────────
61
62/// 控制信号容器 — 节点写入,Executor 读取。
63#[derive(Debug, Default)]
64pub struct ExecutionControl {
65    next: Option<NextAction>,
66    signal: Option<ExecutionSignal>,
67}
68
69impl ExecutionControl {
70    pub fn new() -> Self {
71        Self::default()
72    }
73
74    /// 跳转到指定节点。
75    pub fn goto(&mut self, target: impl Into<String>) {
76        self.next = Some(NextAction::Goto(target.into()));
77    }
78
79    /// 结束执行。
80    pub fn end(&mut self) {
81        self.next = Some(NextAction::End);
82    }
83
84    /// Barrier 挂起。
85    pub fn pause(
86        &mut self,
87        barrier_id: crate::event::BarrierId,
88        timeout: Option<std::time::Duration>,
89    ) {
90        self.signal = Some(ExecutionSignal::Pause {
91            barrier_id,
92            timeout,
93        });
94    }
95
96    /// 获取最终的控制信号。
97    pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
98        let next = self.next.take().unwrap_or(NextAction::Next);
99        let signal = self.signal.take();
100        (next, signal)
101    }
102}
103
104// ─── NodeMetadata ─────────────────────────────────────────────
105
106/// 节点元数据 — 提供给 Executor 的额外信息。
107#[derive(Debug, Clone, Default)]
108pub struct NodeMetadata {
109    /// Token 消耗成本(0.0 表示无 LLM 调用)
110    pub token_cost: f64,
111    /// 是否有外部副作用(如部署、发送消息)
112    pub has_side_effects: bool,
113}
114
115// ─── ExecutionView trait ──────────────────────────────────────
116
117/// 受限视图 — Leaf 节点需要的最小能力。
118pub trait ExecutionView<S: WorkflowState>: Send + Sync {
119    fn state(&self) -> &S;
120    fn emit(&self, chunk: StreamChunk);
121    fn is_cancelled(&self) -> bool;
122}
123
124// ─── ExecutorState trait ──────────────────────────────────────
125
126/// 完整能力 — Composite 节点 + LeafAdapter 使用。
127///
128/// # 注意
129///
130/// 此 trait **不是 dyn compatible**(`build_node_context` 返回带生命周期的 `NodeContext`,
131/// `apply_batch` 使用泛型)。仅用于静态分发(`T: ExecutorState<S>`),不用于 `dyn ExecutorState<S>`。
132pub trait ExecutorState<S: WorkflowState>: ExecutionView<S> {
133    fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S>;
134    fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S>;
135    fn clone_state(&self) -> S;
136    fn replace_state(&mut self, state: S);
137    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>);
138    fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>);
139    fn take_metadata(&mut self) -> NodeMetadata;
140    fn take_flow_events(&mut self) -> Vec<FlowEvent>;
141    /// 发射控制面 FlowEvent(Composite 节点如 ParallelNode 需要)。
142    fn emit_flow_event(&mut self, event: FlowEvent);
143}
144
145// ─── ExecutionEngine ──────────────────────────────────────────
146
147/// 执行引擎 — 拥有所有可变状态,替代 ExecutionContext。
148///
149/// 不对节点开发者公开。节点通过 [`NodeContext`](crate::node_context::NodeContext)
150/// 或 [`LeafContext`](crate::node_context::LeafContext) 能力视图交互。
151///
152/// # 三层 API
153///
154/// - **Leaf Execution API**: `build_leaf_context()` — 构建只读 + emit 视图
155/// - **Composite Execution API**: `clone_state()`, `replace_state()` — 执行控制
156/// - **Runtime Control Plane**: `stream`, `cancel`, `commit()` — 运行时管理
157pub struct ExecutionEngine<S: WorkflowState> {
158    /// 类型化状态 — Engine 独占写权限
159    state: S,
160    /// 数据面发射器 — 可选(阻塞模式 = None)。使用 Arc 以便 Parallel 子分支 clone。
161    stream: Option<Arc<dyn StreamSink>>,
162    /// 取消令牌 — 消费者断开时触发
163    cancel: CancellationToken,
164    /// 控制信号 — 节点写入,Executor 读取
165    control: ExecutionControl,
166    /// 节点元数据 — 节点写入
167    metadata: NodeMetadata,
168    /// Mutation 缓冲 — 节点产生的强类型领域事件
169    mutations: Vec<S::Mutation>,
170    /// FlowEvent 缓冲 — 节点产生的控制面事件
171    flow_events: Vec<FlowEvent>,
172}
173
174impl<S: WorkflowState> ExecutionEngine<S> {
175    /// 创建新的 ExecutionEngine。
176    pub fn new(state: S, stream: Option<Arc<dyn StreamSink>>, cancel: CancellationToken) -> Self {
177        Self {
178            state,
179            stream,
180            cancel,
181            control: ExecutionControl::new(),
182            metadata: NodeMetadata::default(),
183            mutations: Vec::new(),
184            flow_events: Vec::new(),
185        }
186    }
187
188    // ─── Executor API ─────────────────────────────────────────
189
190    /// 消费 Mutation 缓冲(Executor 调用)。
191    pub fn take_mutations(&mut self) -> Vec<S::Mutation> {
192        std::mem::take(&mut self.mutations)
193    }
194
195    /// 消费控制信号(Executor 调用)。
196    pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
197        self.control.take()
198    }
199
200    /// 获取元数据(Executor 调用)。
201    pub fn take_metadata(&mut self) -> NodeMetadata {
202        std::mem::take(&mut self.metadata)
203    }
204
205    /// 消费 FlowEvent 缓冲(Executor 调用)。
206    pub fn take_flow_events(&mut self) -> Vec<FlowEvent> {
207        std::mem::take(&mut self.flow_events)
208    }
209
210    /// 获取状态引用。
211    pub fn state(&self) -> &S {
212        &self.state
213    }
214
215    /// 获取状态可变引用(Executor 内部使用)。
216    ///
217    /// ⚠️ 仅限 crate 内部调用。外部代码应通过 `ExecutorState::apply_batch()` 或
218    /// `NodeContext::record()` 间接操作状态。
219    pub(crate) fn state_mut(&mut self) -> &mut S {
220        &mut self.state
221    }
222
223    /// 获取数据面发射器引用。
224    pub fn stream(&self) -> Option<&dyn StreamSink> {
225        self.stream.as_deref()
226    }
227
228    /// 获取取消令牌引用(Composite 节点用于 child_token)。
229    pub fn cancel_token(&self) -> &CancellationToken {
230        &self.cancel
231    }
232
233    /// 获取 stream 的 Arc 引用(Parallel 子分支 clone 用)。
234    pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
235        self.stream.clone()
236    }
237
238    /// 取出最终状态。
239    pub fn into_state(self) -> S {
240        self.state
241    }
242
243    // ─── commit() — Unit of Work 流水线 ───────────────────────
244
245    /// 取出 mutation batch(Executor 调用)。
246    ///
247    /// 这是 commit 流水线的第一段:
248    /// ```text
249    /// take_commit_batch() → TraceSink/MutationLog 消费 → apply_batch_to_state()
250    /// ```
251    ///
252    /// 调用方可以在此处插入 Trace 记录、MutationLog 持久化等扩展点。
253    pub fn take_commit_batch(&mut self) -> Vec<S::Mutation> {
254        std::mem::take(&mut self.mutations)
255    }
256
257    /// 将 mutation batch 应用到状态(Executor 调用)。
258    ///
259    /// commit 流水线的最后一段。与 `take_commit_batch()` 配合使用。
260    pub fn apply_batch_to_state(&mut self, mutations: Vec<S::Mutation>) {
261        if !mutations.is_empty() {
262            self.state.apply_batch(mutations);
263        }
264    }
265
266    /// 完整的 commit 流水线(便捷方法)。
267    ///
268    /// 等价于 `apply_batch_to_state(take_commit_batch())`。
269    /// 内部调用使用此方法即可;需要扩展 Trace/MutationLog 的场景
270    /// 应手动调用 `take_commit_batch()` + 扩展点 + `apply_batch_to_state()`。
271    pub fn commit(&mut self) {
272        let batch = self.take_commit_batch();
273        self.apply_batch_to_state(batch);
274    }
275}
276
277// ─── ExecutionView for ExecutionEngine ───────────────────────
278
279impl<S: WorkflowState> ExecutionView<S> for ExecutionEngine<S> {
280    fn state(&self) -> &S {
281        &self.state
282    }
283
284    fn emit(&self, chunk: StreamChunk) {
285        if let Some(ref stream) = self.stream {
286            stream.emit(chunk);
287        }
288    }
289
290    fn is_cancelled(&self) -> bool {
291        self.cancel.is_cancelled()
292    }
293}
294
295// ─── ExecutorState for ExecutionEngine ────────────────────────
296
297impl<S: WorkflowState> ExecutorState<S> for ExecutionEngine<S> {
298    fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S> {
299        crate::node_context::NodeContext {
300            state: &mut self.state,
301            stream: self.stream.as_deref(),
302            cancel: &self.cancel,
303            control: &mut self.control,
304            metadata: &mut self.metadata,
305            mutations: &mut self.mutations,
306            flow_events: &mut self.flow_events,
307        }
308    }
309
310    fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S> {
311        crate::node_context::LeafContext {
312            state: &self.state,
313            stream: self.stream.as_deref(),
314            cancel: &self.cancel,
315            control: &mut self.control,
316            metadata: &mut self.metadata,
317            mutations: &mut self.mutations,
318            flow_events: &mut self.flow_events,
319        }
320    }
321
322    fn clone_state(&self) -> S {
323        self.state.clone()
324    }
325
326    fn replace_state(&mut self, state: S) {
327        self.state = state;
328    }
329
330    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
331        self.state.apply_batch(mutations);
332    }
333
334    fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
335        self.control.take()
336    }
337
338    fn take_metadata(&mut self) -> NodeMetadata {
339        std::mem::take(&mut self.metadata)
340    }
341
342    fn take_flow_events(&mut self) -> Vec<FlowEvent> {
343        std::mem::take(&mut self.flow_events)
344    }
345
346    fn emit_flow_event(&mut self, event: FlowEvent) {
347        self.flow_events.push(event);
348    }
349}
350
351// ─── Backward Compat Alias ────────────────────────────────────
352
353/// 向后兼容别名 — `ExecutionContext` → `ExecutionEngine`。
354pub type ExecutionContext<S> = ExecutionEngine<S>;