lellm_graph/exec/execution_engine.rs
1//! ExecutionEngine — 执行引擎核心类型。
2//!
3//! 职责分离:
4//! - `ExecutionEngine<'a, S>` — Executor 内部使用,**借用** State(`&'a mut S`),
5//! 持有 Mutation 缓冲、流发射器等运行时资源
6//! - `NodeContext<'a, S>` / `LeafContext<'a, S>` — 节点能力视图(在 node_context.rs 中)
7//!
8//! # 状态所有权模型
9//!
10//! ExecutionEngine **借用** State,不拥有它。调用方持有 State 的所有权,
11//! Engine 只在执行期间借用。这使得 Subgraph 组合成为可能:
12//!
13//! ```text
14//! 调用方
15//! ├── state: S (拥有所有权)
16//! ├── engine: Engine<'_, S> (借用 &mut state)
17//! │ └── SubgraphSpec::execute()
18//! │ ├── lens.get(state) → &mut Inner
19//! │ ├── inner_engine: Engine<'_, Inner> (借用 &mut inner)
20//! │ └── graph.run_inline(&mut inner_engine)
21//! └── state 仍然可用(engine drop 后借用释放)
22//! ```
23//!
24//! 数据流单向:
25//!
26//! ```text
27//! Node
28//! ↓
29//! ctx.record(Mutation)
30//! ↓
31//! Mutation Buffer (ExecutionEngine)
32//! ↓
33//! Engine: take_mutations()
34//! ↓
35//! state.apply_batch(mutations)
36//! ↓
37//! State
38//! ```
39//!
40//! 节点只能通过 `record()` 声明变更意图,无法直接修改 State。
41//! 这保证了 Mutation Log 是唯一写入口,使 Replay、Trace、Parallel Merge、Undo 全部成立。
42//!
43//! # Sink 注入模型
44//!
45//! 所有高级能力通过 Sink 注入,Engine 不维护任何"事件缓冲":
46//!
47//! ```text
48//! Graph::run_inline()
49//! │
50//! ▼
51//! ExecutionEngine
52//! │
53//! ├── StreamSink — 数据面流式输出
54//! ├── CheckpointSink — 恢复边界通知
55//! └── BarrierSink — Barrier 等待 + 决策注入
56//! ```
57
58use std::sync::Arc;
59
60use tokio_util::sync::CancellationToken;
61
62use crate::checkpoint::CheckpointSink;
63use crate::node::barrier_sink::BarrierSink;
64use crate::state::workflow_state::WorkflowState;
65use crate::stream_chunk::StreamChunk;
66use crate::stream_emitter::StreamSink;
67
68// ─── ExecutionSignal ──────────────────────────────────────────
69
70/// 控制信号 — 独立枚举,Barrier 挂起不是路由。
71#[derive(Debug, Clone)]
72pub enum ExecutionSignal {
73 /// Barrier 挂起执行
74 Pause {
75 barrier_id: crate::event::BarrierId,
76 timeout: Option<std::time::Duration>,
77 },
78}
79
80// ─── NextAction ────────────────────────────────────────────────
81
82/// 节点执行后的下一步路由。
83#[derive(Debug, Clone, PartialEq, Eq)]
84pub enum NextAction {
85 /// 按拓扑顺序走下一步(默认值)
86 Next,
87 /// 跳转到指定节点
88 Goto(String),
89 /// 结束执行
90 End,
91}
92
93// ─── ExecutionControl ─────────────────────────────────────────
94
95/// 控制信号容器 — 节点写入,Executor 读取。
96#[derive(Debug, Default)]
97pub struct ExecutionControl {
98 next: Option<NextAction>,
99 signal: Option<ExecutionSignal>,
100}
101
102impl ExecutionControl {
103 pub fn new() -> Self {
104 Self::default()
105 }
106
107 /// 跳转到指定节点。
108 pub fn goto(&mut self, target: impl Into<String>) {
109 self.next = Some(NextAction::Goto(target.into()));
110 }
111
112 /// 结束执行。
113 pub fn end(&mut self) {
114 self.next = Some(NextAction::End);
115 }
116
117 /// Barrier 挂起。
118 pub fn pause(
119 &mut self,
120 barrier_id: crate::event::BarrierId,
121 timeout: Option<std::time::Duration>,
122 ) {
123 self.signal = Some(ExecutionSignal::Pause {
124 barrier_id,
125 timeout,
126 });
127 }
128
129 /// 获取最终的控制信号。
130 pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
131 let next = self.next.take().unwrap_or(NextAction::Next);
132 let signal = self.signal.take();
133 (next, signal)
134 }
135}
136
137// ─── NodeMetadata ─────────────────────────────────────────────
138
139/// 节点元数据 — 提供给 Executor 的额外信息。
140#[derive(Debug, Clone, Default)]
141pub struct NodeMetadata {
142 /// Token 消耗成本(0.0 表示无 LLM 调用)
143 pub token_cost: f64,
144 /// 是否有外部副作用(如部署、发送消息)
145 pub has_side_effects: bool,
146}
147
148// ─── ExecutionView trait ──────────────────────────────────────
149
150/// 受限视图 — Leaf 节点需要的最小能力。
151pub trait ExecutionView<S: WorkflowState>: Send + Sync {
152 fn state(&self) -> &S;
153 fn emit(&self, chunk: StreamChunk);
154 fn is_cancelled(&self) -> bool;
155}
156
157// ─── ExecutorState trait ──────────────────────────────────────
158
159/// 完整能力 — Composite 节点 + LeafAdapter 使用。
160///
161/// # 注意
162///
163/// 此 trait **不是 dyn compatible**(`build_node_context` 返回带生命周期的 `NodeContext`,
164/// `apply_batch` 使用泛型)。仅用于静态分发(`T: ExecutorState<S>`),不用于 `dyn ExecutorState<S>`。
165pub trait ExecutorState<S: WorkflowState>: ExecutionView<S> {
166 fn build_node_context(&mut self) -> crate::node::node_context::NodeContext<'_, S>;
167 fn build_leaf_context(&mut self) -> crate::node::node_context::LeafContext<'_, S>;
168 fn clone_state(&self) -> S;
169 fn replace_state(&mut self, state: S);
170 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>);
171 fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>);
172 fn take_metadata(&mut self) -> NodeMetadata;
173}
174
175// ─── ExecutionEngine ──────────────────────────────────────────
176
177/// 执行引擎 — **借用** State,持有 Mutation 缓冲、流发射器等运行时资源。
178///
179/// 不对节点开发者公开。节点通过 [`NodeContext`](crate::node::node_context::NodeContext)
180/// 或 [`LeafContext`](crate::node::node_context::LeafContext) 能力视图交互。
181///
182/// # 状态所有权
183///
184/// Engine 借用 `&'a mut S`,不拥有 State。调用方在 Engine 生命周期外持有所有权。
185/// 这使得 Subgraph 组合成为可能 — 外层 Engine 借用 Outer,内层 Engine 借用 Inner。
186///
187/// # Sink 注入
188///
189/// 所有高级能力通过 Sink 注入:
190/// - `stream` — 数据面流式输出
191/// - `checkpoint` — 恢复边界通知
192/// - `barrier` — Barrier 等待 + 决策注入
193///
194/// # 三层 API
195///
196/// - **Leaf Execution API**: `build_leaf_context()` — 构建只读 + emit 视图
197/// - **Composite Execution API**: `clone_state()`, `replace_state()` — 执行控制
198/// - **Runtime Control Plane**: `stream`, `cancel`, `commit()` — 运行时管理
199pub struct ExecutionEngine<'a, S: WorkflowState> {
200 /// 类型化状态 — Engine 借用,不拥有
201 state: &'a mut S,
202 /// 数据面发射器 — 可选(阻塞模式 = None)。使用 Arc 以便 Parallel 子分支 clone。
203 stream: Option<Arc<dyn StreamSink>>,
204 /// 取消令牌 — 消费者断开时触发
205 cancel: CancellationToken,
206 /// Checkpoint Sink — 可选。Engine 借用,不拥有 Checkpoint 生命周期。
207 /// 在 commit() 之后通知 Sink 到达了合法的恢复边界。
208 checkpoint: Option<&'a mut dyn CheckpointSink<S>>,
209 /// Barrier Sink — 可选。Engine 借用,不拥有 Barrier 生命周期。
210 /// 在检测到 Pause 信号时,通过 BarrierSink 等待外部决策。
211 barrier: Option<&'a mut dyn BarrierSink>,
212 /// 控制信号 — 节点写入,Executor 读取
213 control: ExecutionControl,
214 /// 节点元数据 — 节点写入
215 metadata: NodeMetadata,
216 /// Mutation 缓冲 — 节点产生的强类型领域事件
217 mutations: Vec<S::Mutation>,
218}
219
220impl<'a, S: WorkflowState> ExecutionEngine<'a, S> {
221 /// 创建新的 ExecutionEngine。
222 ///
223 /// Engine 借用 `state`,不拥有它。调用方在 Engine 外保持所有权。
224 ///
225 /// - `checkpoint` — 可选的 Checkpoint Sink(`None` = 不需要自动 checkpoint)
226 /// - `barrier` — 可选的 Barrier Sink(`None` = 遇到 Barrier 直接 Approve)
227 pub fn new(
228 state: &'a mut S,
229 stream: Option<Arc<dyn StreamSink>>,
230 cancel: CancellationToken,
231 checkpoint: Option<&'a mut dyn CheckpointSink<S>>,
232 barrier: Option<&'a mut dyn BarrierSink>,
233 ) -> Self {
234 Self {
235 state,
236 stream,
237 cancel,
238 checkpoint,
239 barrier,
240 control: ExecutionControl::new(),
241 metadata: NodeMetadata::default(),
242 mutations: Vec::new(),
243 }
244 }
245
246 /// 通知 Checkpoint Sink 到达了合法的恢复边界(crate 内部使用)。
247 ///
248 /// 由 Graph::run_inline() 在 commit() 之后调用。
249 /// 这个方法在 Engine 内部同时访问 state 和 sink,避免借用冲突。
250 pub(crate) fn emit_checkpoint(&mut self, node_id: impl Into<String>, step: usize) {
251 if let Some(ref mut sink) = self.checkpoint {
252 use crate::checkpoint::FrameInfo;
253 sink.on_checkpoint(self.state, &FrameInfo::new(node_id, step));
254 }
255 }
256
257 /// 等待 Barrier 决策(crate 内部使用)。
258 ///
259 /// 由 Graph::run_inline() 在检测到 Pause 信号时调用。
260 /// 无 BarrierSink 时,直接返回 Approve。
261 pub(crate) async fn wait_barrier(
262 &mut self,
263 barrier_id: &crate::event::BarrierId,
264 timeout: Option<std::time::Duration>,
265 ) -> crate::node::barrier_sink::BarrierOutcome {
266 if let Some(ref mut sink) = self.barrier {
267 sink.wait_decision(barrier_id, timeout).await
268 } else {
269 crate::node::barrier_sink::BarrierOutcome::Decision(
270 crate::event::BarrierDecision::Approve,
271 )
272 }
273 }
274
275 // ─── Executor API ─────────────────────────────────────────
276
277 /// 消费 Mutation 缓冲(Executor 调用)。
278 pub fn take_mutations(&mut self) -> Vec<S::Mutation> {
279 std::mem::take(&mut self.mutations)
280 }
281
282 /// 消费控制信号(Executor 调用)。
283 pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
284 self.control.take()
285 }
286
287 /// 获取元数据(Executor 调用)。
288 pub fn take_metadata(&mut self) -> NodeMetadata {
289 std::mem::take(&mut self.metadata)
290 }
291
292 /// 获取状态引用。
293 pub fn state(&self) -> &S {
294 &self.state
295 }
296
297 /// 获取状态可变引用(Executor 内部使用)。
298 ///
299 /// ⚠️ 仅限 crate 内部调用。外部代码应通过 `ExecutorState::apply_batch()` 或
300 /// `NodeContext::record()` 间接操作状态。
301 pub(crate) fn state_mut(&mut self) -> &mut S {
302 self.state
303 }
304
305 /// 获取数据面发射器引用。
306 pub fn stream(&self) -> Option<&dyn StreamSink> {
307 self.stream.as_deref()
308 }
309
310 /// 获取取消令牌引用(Composite 节点用于 child_token)。
311 pub fn cancel_token(&self) -> &CancellationToken {
312 &self.cancel
313 }
314
315 /// 获取 stream 的 Arc 引用(Parallel 子分支 clone 用)。
316 pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
317 self.stream.clone()
318 }
319
320 // ─── commit() — Unit of Work 流水线 ───────────────────────
321
322 /// 取出 mutation batch(Executor 调用)。
323 ///
324 /// 这是 commit 流水线的第一段:
325 /// ```text
326 /// take_commit_batch() → TraceSink/MutationLog 消费 → apply_batch_to_state()
327 /// ```
328 ///
329 /// 调用方可以在此处插入 Trace 记录、MutationLog 持久化等扩展点。
330 pub fn take_commit_batch(&mut self) -> Vec<S::Mutation> {
331 std::mem::take(&mut self.mutations)
332 }
333
334 /// 将 mutation batch 应用到状态(Executor 调用)。
335 ///
336 /// commit 流水线的最后一段。与 `take_commit_batch()` 配合使用。
337 pub fn apply_batch_to_state(&mut self, mutations: Vec<S::Mutation>) {
338 if !mutations.is_empty() {
339 self.state.apply_batch(mutations);
340 }
341 }
342
343 /// 完整的 commit 流水线(便捷方法)。
344 ///
345 /// 等价于 `apply_batch_to_state(take_commit_batch())`。
346 /// 内部调用使用此方法即可;需要扩展 Trace/MutationLog 的场景
347 /// 应手动调用 `take_commit_batch()` + 扩展点 + `apply_batch_to_state()`。
348 pub fn commit(&mut self) {
349 let batch = self.take_commit_batch();
350 self.apply_batch_to_state(batch);
351 }
352}
353
354// ─── ExecutionView for ExecutionEngine ───────────────────────
355
356impl<'a, S: WorkflowState> ExecutionView<S> for ExecutionEngine<'a, S> {
357 fn state(&self) -> &S {
358 &self.state
359 }
360
361 fn emit(&self, chunk: StreamChunk) {
362 if let Some(ref stream) = self.stream {
363 stream.emit(chunk);
364 }
365 }
366
367 fn is_cancelled(&self) -> bool {
368 self.cancel.is_cancelled()
369 }
370}
371
372// ─── ExecutorState for ExecutionEngine ────────────────────────
373
374impl<'a, S: WorkflowState> ExecutorState<S> for ExecutionEngine<'a, S> {
375 fn build_node_context(&mut self) -> crate::node::node_context::NodeContext<'_, S> {
376 crate::node::node_context::NodeContext {
377 state: &mut self.state,
378 stream: self.stream.as_deref(),
379 cancel: &self.cancel,
380 control: &mut self.control,
381 metadata: &mut self.metadata,
382 mutations: &mut self.mutations,
383 }
384 }
385
386 fn build_leaf_context(&mut self) -> crate::node::node_context::LeafContext<'_, S> {
387 crate::node::node_context::LeafContext {
388 state: &self.state,
389 stream: self.stream.as_deref(),
390 cancel: &self.cancel,
391 control: &mut self.control,
392 metadata: &mut self.metadata,
393 mutations: &mut self.mutations,
394 }
395 }
396
397 fn clone_state(&self) -> S {
398 self.state.clone()
399 }
400
401 fn replace_state(&mut self, state: S) {
402 *self.state = state;
403 }
404
405 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
406 self.state.apply_batch(mutations);
407 }
408
409 fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
410 self.control.take()
411 }
412
413 fn take_metadata(&mut self) -> NodeMetadata {
414 std::mem::take(&mut self.metadata)
415 }
416}
417
418// ─── Backward Compat Alias ────────────────────────────────────
419
420/// 向后兼容别名 — `ExecutionContext` → `ExecutionEngine`。
421pub type ExecutionContext<'a, S> = ExecutionEngine<'a, S>;
422
423// ─── OwnedExecutionEngine (re-export) ─────────────────────────
424
425/// 拥有 State 所有权的执行引擎 — 用于 Parallel 分支等需要独立 State 的场景。
426///
427/// 与 `ExecutionEngine<'a, S>` 的区别:
428/// - `ExecutionEngine<'a, S>` 借用 `&'a mut S`,用于主执行路径
429/// - `OwnedExecutionEngine<S>` 拥有 `S`,用于需要独立 State 副本的场景(如 Parallel 分支)
430pub use crate::exec::owned_execution_engine::OwnedExecutionEngine;