lellm_graph/exec/
owned_execution_engine.rs1use std::sync::Arc;
9
10use tokio_util::sync::CancellationToken;
11
12use crate::exec::execution_engine::{
13 ExecutionControl, ExecutionView, ExecutorState, NextAction, NodeMetadata,
14};
15use crate::node::node_context::{LeafContext, NodeContext};
16use crate::state::workflow_state::WorkflowState;
17use crate::stream_chunk::StreamChunk;
18use crate::stream_emitter::StreamSink;
19
20pub struct OwnedExecutionEngine<S: WorkflowState> {
22 inner: S,
23 stream: Option<Arc<dyn StreamSink>>,
24 cancel: CancellationToken,
25 control: ExecutionControl,
26 metadata: NodeMetadata,
27 mutations: Vec<S::Mutation>,
28}
29
30impl<S: WorkflowState> OwnedExecutionEngine<S> {
31 pub fn new(state: S, stream: Option<Arc<dyn StreamSink>>, cancel: CancellationToken) -> Self {
33 Self {
34 inner: state,
35 stream,
36 cancel,
37 control: ExecutionControl::new(),
38 metadata: NodeMetadata::default(),
39 mutations: Vec::new(),
40 }
41 }
42
43 pub fn into_state(self) -> S {
45 self.inner
46 }
47
48 pub fn state(&self) -> &S {
49 &self.inner
50 }
51
52 pub fn state_mut(&mut self) -> &mut S {
53 &mut self.inner
54 }
55
56 pub fn cancel_token(&self) -> &CancellationToken {
57 &self.cancel
58 }
59
60 pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
61 self.stream.clone()
62 }
63
64 pub fn commit(&mut self) {
65 let batch = std::mem::take(&mut self.mutations);
66 if !batch.is_empty() {
67 self.inner.apply_batch(batch);
68 }
69 }
70}
71
72impl<S: WorkflowState> ExecutionView<S> for OwnedExecutionEngine<S> {
73 fn state(&self) -> &S {
74 &self.inner
75 }
76
77 fn emit(&self, chunk: StreamChunk) {
78 if let Some(ref stream) = self.stream {
79 stream.emit(chunk);
80 }
81 }
82
83 fn is_cancelled(&self) -> bool {
84 self.cancel.is_cancelled()
85 }
86}
87
88impl<S: WorkflowState> ExecutorState<S> for OwnedExecutionEngine<S> {
89 fn build_node_context(&mut self) -> NodeContext<'_, S> {
90 NodeContext {
91 state: &mut self.inner,
92 stream: self.stream.as_deref(),
93 cancel: &self.cancel,
94 control: &mut self.control,
95 metadata: &mut self.metadata,
96 mutations: &mut self.mutations,
97 }
98 }
99
100 fn build_leaf_context(&mut self) -> LeafContext<'_, S> {
101 LeafContext {
102 state: &self.inner,
103 stream: self.stream.as_deref(),
104 cancel: &self.cancel,
105 control: &mut self.control,
106 metadata: &mut self.metadata,
107 mutations: &mut self.mutations,
108 }
109 }
110
111 fn clone_state(&self) -> S {
112 self.inner.clone()
113 }
114
115 fn replace_state(&mut self, state: S) {
116 self.inner = state;
117 }
118
119 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
120 self.inner.apply_batch(mutations);
121 }
122
123 fn take_control(&mut self) -> (NextAction, Option<crate::ExecutionSignal>) {
124 self.control.take()
125 }
126
127 fn take_metadata(&mut self) -> NodeMetadata {
128 std::mem::take(&mut self.metadata)
129 }
130}