Skip to main content

lellm_graph/
execution_engine.rs

1//! ExecutionEngine — 执行引擎核心类型。
2//!
3//! 职责分离:
4//! - `ExecutionEngine<'a, S>` — Executor 内部使用,**借用** State(`&'a mut S`),
5//!   持有 Mutation 缓冲、流发射器等运行时资源
6//! - `NodeContext<'a, S>` / `LeafContext<'a, S>` — 节点能力视图(在 node_context.rs 中)
7//!
8//! # 状态所有权模型
9//!
10//! ExecutionEngine **借用** State,不拥有它。调用方持有 State 的所有权,
11//! Engine 只在执行期间借用。这使得 Subgraph 组合成为可能:
12//!
13//! ```text
14//! 调用方
15//!   ├── state: S                (拥有所有权)
16//!   ├── engine: Engine<'_, S>   (借用 &mut state)
17//!   │     └── SubgraphSpec::execute()
18//!   │           ├── lens.get(state) → &mut Inner
19//!   │           ├── inner_engine: Engine<'_, Inner>  (借用 &mut inner)
20//!   │           └── graph.run_inline(&mut inner_engine)
21//!   └── state 仍然可用(engine drop 后借用释放)
22//! ```
23//!
24//! 数据流单向:
25//!
26//! ```text
27//! Node
28//!   ↓
29//! ctx.record(Mutation)
30//!   ↓
31//! Mutation Buffer (ExecutionEngine)
32//!   ↓
33//! Engine: take_mutations()
34//!   ↓
35//! state.apply_batch(mutations)
36//!   ↓
37//! State
38//! ```
39//!
40//! 节点只能通过 `record()` 声明变更意图,无法直接修改 State。
41//! 这保证了 Mutation Log 是唯一写入口,使 Replay、Trace、Parallel Merge、Undo 全部成立。
42
43use std::sync::Arc;
44
45use tokio_util::sync::CancellationToken;
46
47use crate::event::FlowEvent;
48use crate::stream_chunk::StreamChunk;
49use crate::stream_emitter::StreamSink;
50use crate::workflow_state::WorkflowState;
51
52// ─── ExecutionSignal ──────────────────────────────────────────
53
/// Control signal raised during execution.
///
/// Kept as its own enum because suspending on a barrier is not a routing
/// decision (routing is expressed by `NextAction`).
#[derive(Debug, Clone)]
pub enum ExecutionSignal {
    /// Suspend execution on a barrier.
    Pause {
        /// Identifier of the barrier to suspend on.
        barrier_id: crate::event::BarrierId,
        /// Optional timeout carried along with the pause request.
        timeout: Option<std::time::Duration>,
    },
}
63
64// ─── NextAction ────────────────────────────────────────────────
65
/// Routing decision for the step that follows a node's execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NextAction {
    /// Continue with the next step in topological order (the default).
    Next,
    /// Jump to the node with the given name.
    Goto(String),
    /// Stop execution.
    End,
}
76
77// ─── ExecutionControl ─────────────────────────────────────────
78
79/// 控制信号容器 — 节点写入,Executor 读取。
80#[derive(Debug, Default)]
81pub struct ExecutionControl {
82    next: Option<NextAction>,
83    signal: Option<ExecutionSignal>,
84}
85
86impl ExecutionControl {
87    pub fn new() -> Self {
88        Self::default()
89    }
90
91    /// 跳转到指定节点。
92    pub fn goto(&mut self, target: impl Into<String>) {
93        self.next = Some(NextAction::Goto(target.into()));
94    }
95
96    /// 结束执行。
97    pub fn end(&mut self) {
98        self.next = Some(NextAction::End);
99    }
100
101    /// Barrier 挂起。
102    pub fn pause(
103        &mut self,
104        barrier_id: crate::event::BarrierId,
105        timeout: Option<std::time::Duration>,
106    ) {
107        self.signal = Some(ExecutionSignal::Pause {
108            barrier_id,
109            timeout,
110        });
111    }
112
113    /// 获取最终的控制信号。
114    pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
115        let next = self.next.take().unwrap_or(NextAction::Next);
116        let signal = self.signal.take();
117        (next, signal)
118    }
119}
120
121// ─── NodeMetadata ─────────────────────────────────────────────
122
/// Node metadata: extra information a node reports to the executor.
#[derive(Debug, Clone, Default)]
pub struct NodeMetadata {
    /// Token cost of the node (0.0 means no LLM call was made).
    pub token_cost: f64,
    /// Whether the node has external side effects (e.g. deploying, sending a message).
    pub has_side_effects: bool,
}
131
132// ─── ExecutionView trait ──────────────────────────────────────
133
/// Restricted view: the minimum capabilities a leaf node needs.
pub trait ExecutionView<S: WorkflowState>: Send + Sync {
    /// Returns a shared reference to the workflow state.
    fn state(&self) -> &S;
    /// Emits a chunk on the data-plane stream.
    fn emit(&self, chunk: StreamChunk);
    /// Reports whether cancellation has been requested.
    fn is_cancelled(&self) -> bool;
}
140
141// ─── ExecutorState trait ──────────────────────────────────────
142
/// Full capability set, used by composite nodes and the LeafAdapter.
///
/// # Note
///
/// This trait is **not dyn compatible**: `apply_batch` takes a generic
/// `impl IntoIterator` argument (the original note also cites
/// `build_node_context` returning a lifetime-carrying `NodeContext`).
/// Use it for static dispatch only (`T: ExecutorState<S>`), never as
/// `dyn ExecutorState<S>`.
pub trait ExecutorState<S: WorkflowState>: ExecutionView<S> {
    /// Builds a `NodeContext` borrowing from this executor state.
    fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S>;
    /// Builds a `LeafContext` borrowing from this executor state.
    fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S>;
    /// Returns a clone of the current state.
    fn clone_state(&self) -> S;
    /// Replaces the current state with `state`.
    fn replace_state(&mut self, state: S);
    /// Applies the given mutations to the state.
    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>);
    /// Drains the recorded routing decision and signal.
    fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>);
    /// Drains the recorded node metadata.
    fn take_metadata(&mut self) -> NodeMetadata;
    /// Drains the buffered flow events.
    fn take_flow_events(&mut self) -> Vec<FlowEvent>;
    /// Emits a control-plane FlowEvent (needed by composite nodes such as ParallelNode).
    fn emit_flow_event(&mut self, event: FlowEvent);
}
161
162// ─── ExecutionEngine ──────────────────────────────────────────
163
/// Execution engine: **borrows** the State and holds runtime resources such as
/// the mutation buffer and the stream emitter.
///
/// Not exposed to node authors. Nodes interact through the capability views
/// [`NodeContext`](crate::node_context::NodeContext) or
/// [`LeafContext`](crate::node_context::LeafContext).
///
/// # State ownership
///
/// The engine borrows `&'a mut S` and does not own the State; the caller keeps
/// ownership outside the engine's lifetime. This is what makes subgraph
/// composition possible: the outer engine borrows Outer, the inner engine
/// borrows Inner.
///
/// # Three API layers
///
/// - **Leaf Execution API**: `build_leaf_context()` builds a read-only + emit view
/// - **Composite Execution API**: `clone_state()`, `replace_state()` for execution control
/// - **Runtime Control Plane**: `stream`, `cancel`, `commit()` for runtime management
pub struct ExecutionEngine<'a, S: WorkflowState> {
    /// Typed state, borrowed by the engine rather than owned.
    state: &'a mut S,
    /// Data-plane emitter; optional (blocking mode = `None`). Held in an `Arc`
    /// so that parallel sub-branches can clone it.
    stream: Option<Arc<dyn StreamSink>>,
    /// Cancellation token, triggered when the consumer disconnects.
    cancel: CancellationToken,
    /// Control signals: written by nodes, read by the executor.
    control: ExecutionControl,
    /// Node metadata, written by nodes.
    metadata: NodeMetadata,
    /// Mutation buffer: strongly typed domain events produced by nodes.
    mutations: Vec<S::Mutation>,
    /// FlowEvent buffer: control-plane events produced by nodes.
    flow_events: Vec<FlowEvent>,
}
195
196impl<'a, S: WorkflowState> ExecutionEngine<'a, S> {
197    /// 创建新的 ExecutionEngine。
198    ///
199    /// Engine 借用 `state`,不拥有它。调用方在 Engine 外保持所有权。
200    pub fn new(
201        state: &'a mut S,
202        stream: Option<Arc<dyn StreamSink>>,
203        cancel: CancellationToken,
204    ) -> Self {
205        Self {
206            state,
207            stream,
208            cancel,
209            control: ExecutionControl::new(),
210            metadata: NodeMetadata::default(),
211            mutations: Vec::new(),
212            flow_events: Vec::new(),
213        }
214    }
215
216    // ─── Executor API ─────────────────────────────────────────
217
218    /// 消费 Mutation 缓冲(Executor 调用)。
219    pub fn take_mutations(&mut self) -> Vec<S::Mutation> {
220        std::mem::take(&mut self.mutations)
221    }
222
223    /// 消费控制信号(Executor 调用)。
224    pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
225        self.control.take()
226    }
227
228    /// 获取元数据(Executor 调用)。
229    pub fn take_metadata(&mut self) -> NodeMetadata {
230        std::mem::take(&mut self.metadata)
231    }
232
233    /// 消费 FlowEvent 缓冲(Executor 调用)。
234    pub fn take_flow_events(&mut self) -> Vec<FlowEvent> {
235        std::mem::take(&mut self.flow_events)
236    }
237
238    /// 获取状态引用。
239    pub fn state(&self) -> &S {
240        &self.state
241    }
242
243    /// 获取状态可变引用(Executor 内部使用)。
244    ///
245    /// ⚠️ 仅限 crate 内部调用。外部代码应通过 `ExecutorState::apply_batch()` 或
246    /// `NodeContext::record()` 间接操作状态。
247    pub(crate) fn state_mut(&mut self) -> &mut S {
248        self.state
249    }
250
251    /// 获取数据面发射器引用。
252    pub fn stream(&self) -> Option<&dyn StreamSink> {
253        self.stream.as_deref()
254    }
255
256    /// 获取取消令牌引用(Composite 节点用于 child_token)。
257    pub fn cancel_token(&self) -> &CancellationToken {
258        &self.cancel
259    }
260
261    /// 获取 stream 的 Arc 引用(Parallel 子分支 clone 用)。
262    pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
263        self.stream.clone()
264    }
265
266    // ─── commit() — Unit of Work 流水线 ───────────────────────
267
268    /// 取出 mutation batch(Executor 调用)。
269    ///
270    /// 这是 commit 流水线的第一段:
271    /// ```text
272    /// take_commit_batch() → TraceSink/MutationLog 消费 → apply_batch_to_state()
273    /// ```
274    ///
275    /// 调用方可以在此处插入 Trace 记录、MutationLog 持久化等扩展点。
276    pub fn take_commit_batch(&mut self) -> Vec<S::Mutation> {
277        std::mem::take(&mut self.mutations)
278    }
279
280    /// 将 mutation batch 应用到状态(Executor 调用)。
281    ///
282    /// commit 流水线的最后一段。与 `take_commit_batch()` 配合使用。
283    pub fn apply_batch_to_state(&mut self, mutations: Vec<S::Mutation>) {
284        if !mutations.is_empty() {
285            self.state.apply_batch(mutations);
286        }
287    }
288
289    /// 完整的 commit 流水线(便捷方法)。
290    ///
291    /// 等价于 `apply_batch_to_state(take_commit_batch())`。
292    /// 内部调用使用此方法即可;需要扩展 Trace/MutationLog 的场景
293    /// 应手动调用 `take_commit_batch()` + 扩展点 + `apply_batch_to_state()`。
294    pub fn commit(&mut self) {
295        let batch = self.take_commit_batch();
296        self.apply_batch_to_state(batch);
297    }
298}
299
300// ─── ExecutionView for ExecutionEngine ───────────────────────
301
302impl<'a, S: WorkflowState> ExecutionView<S> for ExecutionEngine<'a, S> {
303    fn state(&self) -> &S {
304        &self.state
305    }
306
307    fn emit(&self, chunk: StreamChunk) {
308        if let Some(ref stream) = self.stream {
309            stream.emit(chunk);
310        }
311    }
312
313    fn is_cancelled(&self) -> bool {
314        self.cancel.is_cancelled()
315    }
316}
317
318// ─── ExecutorState for ExecutionEngine ────────────────────────
319
320impl<'a, S: WorkflowState> ExecutorState<S> for ExecutionEngine<'a, S> {
321    fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S> {
322        crate::node_context::NodeContext {
323            state: &mut self.state,
324            stream: self.stream.as_deref(),
325            cancel: &self.cancel,
326            control: &mut self.control,
327            metadata: &mut self.metadata,
328            mutations: &mut self.mutations,
329            flow_events: &mut self.flow_events,
330        }
331    }
332
333    fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S> {
334        crate::node_context::LeafContext {
335            state: &self.state,
336            stream: self.stream.as_deref(),
337            cancel: &self.cancel,
338            control: &mut self.control,
339            metadata: &mut self.metadata,
340            mutations: &mut self.mutations,
341            flow_events: &mut self.flow_events,
342        }
343    }
344
345    fn clone_state(&self) -> S {
346        self.state.clone()
347    }
348
349    fn replace_state(&mut self, state: S) {
350        *self.state = state;
351    }
352
353    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
354        self.state.apply_batch(mutations);
355    }
356
357    fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
358        self.control.take()
359    }
360
361    fn take_metadata(&mut self) -> NodeMetadata {
362        std::mem::take(&mut self.metadata)
363    }
364
365    fn take_flow_events(&mut self) -> Vec<FlowEvent> {
366        std::mem::take(&mut self.flow_events)
367    }
368
369    fn emit_flow_event(&mut self, event: FlowEvent) {
370        self.flow_events.push(event);
371    }
372}
373
374// ─── Backward Compat Alias ────────────────────────────────────
375
/// Backward-compatibility alias: `ExecutionContext` → `ExecutionEngine`.
pub type ExecutionContext<'a, S> = ExecutionEngine<'a, S>;
378
379// ─── OwnedExecutionEngine ────────────────────────────────────
380
/// Execution engine that owns its State, for cases that need an independent
/// State such as parallel branches.
///
/// Differences from `ExecutionEngine<'a, S>`:
/// - `ExecutionEngine<'a, S>` borrows `&'a mut S` and is used on the main execution path
/// - `OwnedExecutionEngine<S>` owns `S` and is used where an independent copy
///   of the State is required (e.g. parallel branches)
pub struct OwnedExecutionEngine<S: WorkflowState> {
    /// The owned state.
    inner: S,
    /// Optional data-plane emitter.
    stream: Option<Arc<dyn StreamSink>>,
    /// Cancellation token.
    cancel: CancellationToken,
    /// Control signals recorded by nodes.
    control: ExecutionControl,
    /// Node metadata recorded by nodes.
    metadata: NodeMetadata,
    /// Buffer of mutations not yet applied to `inner`.
    mutations: Vec<S::Mutation>,
    /// Buffer of control-plane flow events.
    flow_events: Vec<FlowEvent>,
}
395
396impl<S: WorkflowState> OwnedExecutionEngine<S> {
397    /// 创建拥有 State 所有权的 Engine(用于 Parallel 分支等场景)。
398    pub fn new(state: S, stream: Option<Arc<dyn StreamSink>>, cancel: CancellationToken) -> Self {
399        Self {
400            inner: state,
401            stream,
402            cancel,
403            control: ExecutionControl::new(),
404            metadata: NodeMetadata::default(),
405            mutations: Vec::new(),
406            flow_events: Vec::new(),
407        }
408    }
409
410    /// 消费并返回最终状态。
411    pub fn into_state(self) -> S {
412        self.inner
413    }
414
415    pub fn state(&self) -> &S {
416        &self.inner
417    }
418
419    pub fn state_mut(&mut self) -> &mut S {
420        &mut self.inner
421    }
422
423    pub fn cancel_token(&self) -> &CancellationToken {
424        &self.cancel
425    }
426
427    pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
428        self.stream.clone()
429    }
430
431    pub fn commit(&mut self) {
432        let batch = std::mem::take(&mut self.mutations);
433        if !batch.is_empty() {
434            self.inner.apply_batch(batch);
435        }
436    }
437}
438
439impl<S: WorkflowState> ExecutionView<S> for OwnedExecutionEngine<S> {
440    fn state(&self) -> &S {
441        &self.inner
442    }
443
444    fn emit(&self, chunk: StreamChunk) {
445        if let Some(ref stream) = self.stream {
446            stream.emit(chunk);
447        }
448    }
449
450    fn is_cancelled(&self) -> bool {
451        self.cancel.is_cancelled()
452    }
453}
454
455impl<S: WorkflowState> ExecutorState<S> for OwnedExecutionEngine<S> {
456    fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S> {
457        crate::node_context::NodeContext {
458            state: &mut self.inner,
459            stream: self.stream.as_deref(),
460            cancel: &self.cancel,
461            control: &mut self.control,
462            metadata: &mut self.metadata,
463            mutations: &mut self.mutations,
464            flow_events: &mut self.flow_events,
465        }
466    }
467
468    fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S> {
469        crate::node_context::LeafContext {
470            state: &self.inner,
471            stream: self.stream.as_deref(),
472            cancel: &self.cancel,
473            control: &mut self.control,
474            metadata: &mut self.metadata,
475            mutations: &mut self.mutations,
476            flow_events: &mut self.flow_events,
477        }
478    }
479
480    fn clone_state(&self) -> S {
481        self.inner.clone()
482    }
483
484    fn replace_state(&mut self, state: S) {
485        self.inner = state;
486    }
487
488    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
489        self.inner.apply_batch(mutations);
490    }
491
492    fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
493        self.control.take()
494    }
495
496    fn take_metadata(&mut self) -> NodeMetadata {
497        std::mem::take(&mut self.metadata)
498    }
499
500    fn take_flow_events(&mut self) -> Vec<FlowEvent> {
501        std::mem::take(&mut self.flow_events)
502    }
503
504    fn emit_flow_event(&mut self, event: FlowEvent) {
505        self.flow_events.push(event);
506    }
507}