Skip to main content

lellm_graph/exec/
execution_engine.rs

1//! ExecutionEngine — 执行引擎核心类型。
2//!
3//! 职责分离:
4//! - `ExecutionEngine<'a, S>` — Executor 内部使用,**借用** State(`&'a mut S`),
5//!   持有 Mutation 缓冲、流发射器等运行时资源
6//! - `NodeContext<'a, S>` / `LeafContext<'a, S>` — 节点能力视图(在 node_context.rs 中)
7//!
8//! # 状态所有权模型
9//!
10//! ExecutionEngine **借用** State,不拥有它。调用方持有 State 的所有权,
11//! Engine 只在执行期间借用。这使得 Subgraph 组合成为可能:
12//!
13//! ```text
14//! 调用方
15//!   ├── state: S                (拥有所有权)
16//!   ├── engine: Engine<'_, S>   (借用 &mut state)
17//!   │     └── SubgraphSpec::execute()
18//!   │           ├── lens.get(state) → &mut Inner
19//!   │           ├── inner_engine: Engine<'_, Inner>  (借用 &mut inner)
20//!   │           └── graph.run_inline(&mut inner_engine)
21//!   └── state 仍然可用(engine drop 后借用释放)
22//! ```
23//!
24//! 数据流单向:
25//!
26//! ```text
27//! Node
28//!   ↓
29//! ctx.record(Mutation)
30//!   ↓
31//! Mutation Buffer (ExecutionEngine)
32//!   ↓
33//! Engine: take_mutations()
34//!   ↓
35//! state.apply_batch(mutations)
36//!   ↓
37//! State
38//! ```
39//!
40//! 节点只能通过 `record()` 声明变更意图,无法直接修改 State。
41//! 这保证了 Mutation Log 是唯一写入口,使 Replay、Trace、Parallel Merge、Undo 全部成立。
42//!
43//! # Sink 注入模型
44//!
45//! 所有高级能力通过 Sink 注入,Engine 不维护任何"事件缓冲":
46//!
47//! ```text
48//! Graph::run_inline()
49//!         │
50//!         ▼
51//!    ExecutionEngine
52//!         │
53//!         ├── StreamSink        — 数据面流式输出
54//!         ├── CheckpointSink    — 恢复边界通知
55//!         └── BarrierSink       — Barrier 等待 + 决策注入
56//! ```
57
58use std::sync::Arc;
59
60use tokio_util::sync::CancellationToken;
61
62use crate::checkpoint::CheckpointSink;
63use crate::node::barrier_sink::BarrierSink;
64use crate::state::workflow_state::WorkflowState;
65use crate::stream_chunk::StreamChunk;
66use crate::stream_emitter::StreamSink;
67
68// ─── ExecutionSignal ──────────────────────────────────────────
69
70/// 控制信号 — 独立枚举,Barrier 挂起不是路由。
71#[derive(Debug, Clone)]
72pub enum ExecutionSignal {
73    /// Barrier 挂起执行
74    Pause {
75        barrier_id: crate::event::BarrierId,
76        timeout: Option<std::time::Duration>,
77    },
78}
79
80// ─── NextAction ────────────────────────────────────────────────
81
82/// 节点执行后的下一步路由。
83#[derive(Debug, Clone, PartialEq, Eq)]
84pub enum NextAction {
85    /// 按拓扑顺序走下一步(默认值)
86    Next,
87    /// 跳转到指定节点
88    Goto(String),
89    /// 结束执行
90    End,
91}
92
93// ─── ExecutionControl ─────────────────────────────────────────
94
95/// 控制信号容器 — 节点写入,Executor 读取。
96#[derive(Debug, Default)]
97pub struct ExecutionControl {
98    next: Option<NextAction>,
99    signal: Option<ExecutionSignal>,
100}
101
102impl ExecutionControl {
103    pub fn new() -> Self {
104        Self::default()
105    }
106
107    /// 跳转到指定节点。
108    pub fn goto(&mut self, target: impl Into<String>) {
109        self.next = Some(NextAction::Goto(target.into()));
110    }
111
112    /// 结束执行。
113    pub fn end(&mut self) {
114        self.next = Some(NextAction::End);
115    }
116
117    /// Barrier 挂起。
118    pub fn pause(
119        &mut self,
120        barrier_id: crate::event::BarrierId,
121        timeout: Option<std::time::Duration>,
122    ) {
123        self.signal = Some(ExecutionSignal::Pause {
124            barrier_id,
125            timeout,
126        });
127    }
128
129    /// 获取最终的控制信号。
130    pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
131        let next = self.next.take().unwrap_or(NextAction::Next);
132        let signal = self.signal.take();
133        (next, signal)
134    }
135}
136
137// ─── NodeMetadata ─────────────────────────────────────────────
138
139/// 节点元数据 — 提供给 Executor 的额外信息。
140#[derive(Debug, Clone, Default)]
141pub struct NodeMetadata {
142    /// Token 消耗成本(0.0 表示无 LLM 调用)
143    pub token_cost: f64,
144    /// 是否有外部副作用(如部署、发送消息)
145    pub has_side_effects: bool,
146}
147
148// ─── ExecutionView trait ──────────────────────────────────────
149
150/// 受限视图 — Leaf 节点需要的最小能力。
151pub trait ExecutionView<S: WorkflowState>: Send + Sync {
152    fn state(&self) -> &S;
153    fn emit(&self, chunk: StreamChunk);
154    fn is_cancelled(&self) -> bool;
155}
156
157// ─── ExecutorState trait ──────────────────────────────────────
158
159/// 完整能力 — Composite 节点 + LeafAdapter 使用。
160///
161/// # 注意
162///
163/// 此 trait **不是 dyn compatible**(`build_node_context` 返回带生命周期的 `NodeContext`,
164/// `apply_batch` 使用泛型)。仅用于静态分发(`T: ExecutorState<S>`),不用于 `dyn ExecutorState<S>`。
165pub trait ExecutorState<S: WorkflowState>: ExecutionView<S> {
166    fn build_node_context(&mut self) -> crate::node::node_context::NodeContext<'_, S>;
167    fn build_leaf_context(&mut self) -> crate::node::node_context::LeafContext<'_, S>;
168    fn clone_state(&self) -> S;
169    fn replace_state(&mut self, state: S);
170    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>);
171    fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>);
172    fn take_metadata(&mut self) -> NodeMetadata;
173}
174
175// ─── ExecutionEngine ──────────────────────────────────────────
176
177/// 执行引擎 — **借用** State,持有 Mutation 缓冲、流发射器等运行时资源。
178///
179/// 不对节点开发者公开。节点通过 [`NodeContext`](crate::node::node_context::NodeContext)
180/// 或 [`LeafContext`](crate::node::node_context::LeafContext) 能力视图交互。
181///
182/// # 状态所有权
183///
184/// Engine 借用 `&'a mut S`,不拥有 State。调用方在 Engine 生命周期外持有所有权。
185/// 这使得 Subgraph 组合成为可能 — 外层 Engine 借用 Outer,内层 Engine 借用 Inner。
186///
187/// # Sink 注入
188///
189/// 所有高级能力通过 Sink 注入:
190/// - `stream` — 数据面流式输出
191/// - `checkpoint` — 恢复边界通知
192/// - `barrier` — Barrier 等待 + 决策注入
193///
194/// # 三层 API
195///
196/// - **Leaf Execution API**: `build_leaf_context()` — 构建只读 + emit 视图
197/// - **Composite Execution API**: `clone_state()`, `replace_state()` — 执行控制
198/// - **Runtime Control Plane**: `stream`, `cancel`, `commit()` — 运行时管理
199pub struct ExecutionEngine<'a, S: WorkflowState> {
200    /// 类型化状态 — Engine 借用,不拥有
201    state: &'a mut S,
202    /// 数据面发射器 — 可选(阻塞模式 = None)。使用 Arc 以便 Parallel 子分支 clone。
203    stream: Option<Arc<dyn StreamSink>>,
204    /// 取消令牌 — 消费者断开时触发
205    cancel: CancellationToken,
206    /// Checkpoint Sink — 可选。Engine 借用,不拥有 Checkpoint 生命周期。
207    /// 在 commit() 之后通知 Sink 到达了合法的恢复边界。
208    checkpoint: Option<&'a mut dyn CheckpointSink<S>>,
209    /// Barrier Sink — 可选。Engine 借用,不拥有 Barrier 生命周期。
210    /// 在检测到 Pause 信号时,通过 BarrierSink 等待外部决策。
211    barrier: Option<&'a mut dyn BarrierSink>,
212    /// 控制信号 — 节点写入,Executor 读取
213    control: ExecutionControl,
214    /// 节点元数据 — 节点写入
215    metadata: NodeMetadata,
216    /// Mutation 缓冲 — 节点产生的强类型领域事件
217    mutations: Vec<S::Mutation>,
218}
219
220impl<'a, S: WorkflowState> ExecutionEngine<'a, S> {
221    /// 创建新的 ExecutionEngine。
222    ///
223    /// Engine 借用 `state`,不拥有它。调用方在 Engine 外保持所有权。
224    ///
225    /// - `checkpoint` — 可选的 Checkpoint Sink(`None` = 不需要自动 checkpoint)
226    /// - `barrier` — 可选的 Barrier Sink(`None` = 遇到 Barrier 直接 Approve)
227    pub fn new(
228        state: &'a mut S,
229        stream: Option<Arc<dyn StreamSink>>,
230        cancel: CancellationToken,
231        checkpoint: Option<&'a mut dyn CheckpointSink<S>>,
232        barrier: Option<&'a mut dyn BarrierSink>,
233    ) -> Self {
234        Self {
235            state,
236            stream,
237            cancel,
238            checkpoint,
239            barrier,
240            control: ExecutionControl::new(),
241            metadata: NodeMetadata::default(),
242            mutations: Vec::new(),
243        }
244    }
245
246    /// 通知 Checkpoint Sink 到达了合法的恢复边界(crate 内部使用)。
247    ///
248    /// 由 Graph::run_inline() 在 commit() 之后调用。
249    /// 这个方法在 Engine 内部同时访问 state 和 sink,避免借用冲突。
250    pub(crate) fn emit_checkpoint(&mut self, node_id: impl Into<String>, step: usize) {
251        if let Some(ref mut sink) = self.checkpoint {
252            use crate::checkpoint::FrameInfo;
253            sink.on_checkpoint(self.state, &FrameInfo::new(node_id, step));
254        }
255    }
256
257    /// 等待 Barrier 决策(crate 内部使用)。
258    ///
259    /// 由 Graph::run_inline() 在检测到 Pause 信号时调用。
260    /// 无 BarrierSink 时,直接返回 Approve。
261    pub(crate) async fn wait_barrier(
262        &mut self,
263        barrier_id: &crate::event::BarrierId,
264        timeout: Option<std::time::Duration>,
265    ) -> crate::node::barrier_sink::BarrierOutcome {
266        if let Some(ref mut sink) = self.barrier {
267            sink.wait_decision(barrier_id, timeout).await
268        } else {
269            crate::node::barrier_sink::BarrierOutcome::Decision(
270                crate::event::BarrierDecision::Approve,
271            )
272        }
273    }
274
275    // ─── Executor API ─────────────────────────────────────────
276
277    /// 消费 Mutation 缓冲(Executor 调用)。
278    pub fn take_mutations(&mut self) -> Vec<S::Mutation> {
279        std::mem::take(&mut self.mutations)
280    }
281
282    /// 消费控制信号(Executor 调用)。
283    pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
284        self.control.take()
285    }
286
287    /// 获取元数据(Executor 调用)。
288    pub fn take_metadata(&mut self) -> NodeMetadata {
289        std::mem::take(&mut self.metadata)
290    }
291
292    /// 获取状态引用。
293    pub fn state(&self) -> &S {
294        &self.state
295    }
296
297    /// 获取状态可变引用(Executor 内部使用)。
298    ///
299    /// ⚠️ 仅限 crate 内部调用。外部代码应通过 `ExecutorState::apply_batch()` 或
300    /// `NodeContext::record()` 间接操作状态。
301    pub(crate) fn state_mut(&mut self) -> &mut S {
302        self.state
303    }
304
305    /// 获取数据面发射器引用。
306    pub fn stream(&self) -> Option<&dyn StreamSink> {
307        self.stream.as_deref()
308    }
309
310    /// 获取取消令牌引用(Composite 节点用于 child_token)。
311    pub fn cancel_token(&self) -> &CancellationToken {
312        &self.cancel
313    }
314
315    /// 获取 stream 的 Arc 引用(Parallel 子分支 clone 用)。
316    pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
317        self.stream.clone()
318    }
319
320    // ─── commit() — Unit of Work 流水线 ───────────────────────
321
322    /// 取出 mutation batch(Executor 调用)。
323    ///
324    /// 这是 commit 流水线的第一段:
325    /// ```text
326    /// take_commit_batch() → TraceSink/MutationLog 消费 → apply_batch_to_state()
327    /// ```
328    ///
329    /// 调用方可以在此处插入 Trace 记录、MutationLog 持久化等扩展点。
330    pub fn take_commit_batch(&mut self) -> Vec<S::Mutation> {
331        std::mem::take(&mut self.mutations)
332    }
333
334    /// 将 mutation batch 应用到状态(Executor 调用)。
335    ///
336    /// commit 流水线的最后一段。与 `take_commit_batch()` 配合使用。
337    pub fn apply_batch_to_state(&mut self, mutations: Vec<S::Mutation>) {
338        if !mutations.is_empty() {
339            self.state.apply_batch(mutations);
340        }
341    }
342
343    /// 完整的 commit 流水线(便捷方法)。
344    ///
345    /// 等价于 `apply_batch_to_state(take_commit_batch())`。
346    /// 内部调用使用此方法即可;需要扩展 Trace/MutationLog 的场景
347    /// 应手动调用 `take_commit_batch()` + 扩展点 + `apply_batch_to_state()`。
348    pub fn commit(&mut self) {
349        let batch = self.take_commit_batch();
350        self.apply_batch_to_state(batch);
351    }
352}
353
354// ─── ExecutionView for ExecutionEngine ───────────────────────
355
356impl<'a, S: WorkflowState> ExecutionView<S> for ExecutionEngine<'a, S> {
357    fn state(&self) -> &S {
358        &self.state
359    }
360
361    fn emit(&self, chunk: StreamChunk) {
362        if let Some(ref stream) = self.stream {
363            stream.emit(chunk);
364        }
365    }
366
367    fn is_cancelled(&self) -> bool {
368        self.cancel.is_cancelled()
369    }
370}
371
372// ─── ExecutorState for ExecutionEngine ────────────────────────
373
374impl<'a, S: WorkflowState> ExecutorState<S> for ExecutionEngine<'a, S> {
375    fn build_node_context(&mut self) -> crate::node::node_context::NodeContext<'_, S> {
376        crate::node::node_context::NodeContext {
377            state: &mut self.state,
378            stream: self.stream.as_deref(),
379            cancel: &self.cancel,
380            control: &mut self.control,
381            metadata: &mut self.metadata,
382            mutations: &mut self.mutations,
383        }
384    }
385
386    fn build_leaf_context(&mut self) -> crate::node::node_context::LeafContext<'_, S> {
387        crate::node::node_context::LeafContext {
388            state: &self.state,
389            stream: self.stream.as_deref(),
390            cancel: &self.cancel,
391            control: &mut self.control,
392            metadata: &mut self.metadata,
393            mutations: &mut self.mutations,
394        }
395    }
396
397    fn clone_state(&self) -> S {
398        self.state.clone()
399    }
400
401    fn replace_state(&mut self, state: S) {
402        *self.state = state;
403    }
404
405    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
406        self.state.apply_batch(mutations);
407    }
408
409    fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
410        self.control.take()
411    }
412
413    fn take_metadata(&mut self) -> NodeMetadata {
414        std::mem::take(&mut self.metadata)
415    }
416}
417
418// ─── Backward Compat Alias ────────────────────────────────────
419
420/// 向后兼容别名 — `ExecutionContext` → `ExecutionEngine`。
421pub type ExecutionContext<'a, S> = ExecutionEngine<'a, S>;
422
423// ─── OwnedExecutionEngine (re-export) ─────────────────────────
424
425/// 拥有 State 所有权的执行引擎 — 用于 Parallel 分支等需要独立 State 的场景。
426///
427/// 与 `ExecutionEngine<'a, S>` 的区别:
428/// - `ExecutionEngine<'a, S>` 借用 `&'a mut S`,用于主执行路径
429/// - `OwnedExecutionEngine<S>` 拥有 `S`,用于需要独立 State 副本的场景(如 Parallel 分支)
430pub use crate::exec::owned_execution_engine::OwnedExecutionEngine;