Skip to main content

lellm_graph/exec/
owned_execution_engine.rs

//! OwnedExecutionEngine — 拥有 State 所有权的执行引擎。
//!
//! 用于 Parallel 分支等需要独立 State 副本的场景。
//! 与 [`ExecutionEngine`](crate::exec::execution_engine::ExecutionEngine) 的区别:
//! - `ExecutionEngine<'a, S>` 借用 `&'a mut S`,用于主执行路径
//! - `OwnedExecutionEngine<S>` 拥有 `S`,用于需要独立 State 副本的场景
7
8use std::sync::Arc;
9
10use tokio_util::sync::CancellationToken;
11
12use crate::exec::execution_engine::{
13    ExecutionControl, ExecutionView, ExecutorState, NextAction, NodeMetadata,
14};
15use crate::node::node_context::{LeafContext, NodeContext};
16use crate::state::workflow_state::WorkflowState;
17use crate::stream_chunk::StreamChunk;
18use crate::stream_emitter::StreamSink;
19
20/// 拥有 State 所有权的执行引擎 — 用于 Parallel 分支等需要独立 State 的场景。
21pub struct OwnedExecutionEngine<S: WorkflowState> {
22    inner: S,
23    stream: Option<Arc<dyn StreamSink>>,
24    cancel: CancellationToken,
25    control: ExecutionControl,
26    metadata: NodeMetadata,
27    mutations: Vec<S::Mutation>,
28}
29
30impl<S: WorkflowState> OwnedExecutionEngine<S> {
31    /// 创建拥有 State 所有权的 Engine(用于 Parallel 分支等场景)。
32    pub fn new(state: S, stream: Option<Arc<dyn StreamSink>>, cancel: CancellationToken) -> Self {
33        Self {
34            inner: state,
35            stream,
36            cancel,
37            control: ExecutionControl::new(),
38            metadata: NodeMetadata::default(),
39            mutations: Vec::new(),
40        }
41    }
42
43    /// 消费并返回最终状态。
44    pub fn into_state(self) -> S {
45        self.inner
46    }
47
48    pub fn state(&self) -> &S {
49        &self.inner
50    }
51
52    pub fn state_mut(&mut self) -> &mut S {
53        &mut self.inner
54    }
55
56    pub fn cancel_token(&self) -> &CancellationToken {
57        &self.cancel
58    }
59
60    pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
61        self.stream.clone()
62    }
63
64    pub fn commit(&mut self) {
65        let batch = std::mem::take(&mut self.mutations);
66        if !batch.is_empty() {
67            self.inner.apply_batch(batch);
68        }
69    }
70}
71
72impl<S: WorkflowState> ExecutionView<S> for OwnedExecutionEngine<S> {
73    fn state(&self) -> &S {
74        &self.inner
75    }
76
77    fn emit(&self, chunk: StreamChunk) {
78        if let Some(ref stream) = self.stream {
79            stream.emit(chunk);
80        }
81    }
82
83    fn is_cancelled(&self) -> bool {
84        self.cancel.is_cancelled()
85    }
86}
87
88impl<S: WorkflowState> ExecutorState<S> for OwnedExecutionEngine<S> {
89    fn build_node_context(&mut self) -> NodeContext<'_, S> {
90        NodeContext {
91            state: &mut self.inner,
92            stream: self.stream.as_deref(),
93            cancel: &self.cancel,
94            control: &mut self.control,
95            metadata: &mut self.metadata,
96            mutations: &mut self.mutations,
97        }
98    }
99
100    fn build_leaf_context(&mut self) -> LeafContext<'_, S> {
101        LeafContext {
102            state: &self.inner,
103            stream: self.stream.as_deref(),
104            cancel: &self.cancel,
105            control: &mut self.control,
106            metadata: &mut self.metadata,
107            mutations: &mut self.mutations,
108        }
109    }
110
111    fn clone_state(&self) -> S {
112        self.inner.clone()
113    }
114
115    fn replace_state(&mut self, state: S) {
116        self.inner = state;
117    }
118
119    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
120        self.inner.apply_batch(mutations);
121    }
122
123    fn take_control(&mut self) -> (NextAction, Option<crate::ExecutionSignal>) {
124        self.control.take()
125    }
126
127    fn take_metadata(&mut self) -> NodeMetadata {
128        std::mem::take(&mut self.metadata)
129    }
130}