lellm_graph/
execution_engine.rs1use std::sync::Arc;
27
28use tokio_util::sync::CancellationToken;
29
30use crate::event::FlowEvent;
31use crate::stream_chunk::StreamChunk;
32use crate::stream_emitter::StreamSink;
33use crate::workflow_state::WorkflowState;
34
35#[derive(Debug, Clone)]
39pub enum ExecutionSignal {
40 Pause {
42 barrier_id: crate::event::BarrierId,
43 timeout: Option<std::time::Duration>,
44 },
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
51pub enum NextAction {
52 Next,
54 Goto(String),
56 End,
58}
59
60#[derive(Debug, Default)]
64pub struct ExecutionControl {
65 next: Option<NextAction>,
66 signal: Option<ExecutionSignal>,
67}
68
69impl ExecutionControl {
70 pub fn new() -> Self {
71 Self::default()
72 }
73
74 pub fn goto(&mut self, target: impl Into<String>) {
76 self.next = Some(NextAction::Goto(target.into()));
77 }
78
79 pub fn end(&mut self) {
81 self.next = Some(NextAction::End);
82 }
83
84 pub fn pause(
86 &mut self,
87 barrier_id: crate::event::BarrierId,
88 timeout: Option<std::time::Duration>,
89 ) {
90 self.signal = Some(ExecutionSignal::Pause {
91 barrier_id,
92 timeout,
93 });
94 }
95
96 pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
98 let next = self.next.take().unwrap_or(NextAction::Next);
99 let signal = self.signal.take();
100 (next, signal)
101 }
102}
103
104#[derive(Debug, Clone, Default)]
108pub struct NodeMetadata {
109 pub token_cost: f64,
111 pub has_side_effects: bool,
113}
114
115pub trait ExecutionView<S: WorkflowState>: Send + Sync {
119 fn state(&self) -> &S;
120 fn emit(&self, chunk: StreamChunk);
121 fn is_cancelled(&self) -> bool;
122}
123
124pub trait ExecutorState<S: WorkflowState>: ExecutionView<S> {
133 fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S>;
134 fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S>;
135 fn clone_state(&self) -> S;
136 fn replace_state(&mut self, state: S);
137 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>);
138 fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>);
139 fn take_metadata(&mut self) -> NodeMetadata;
140 fn take_flow_events(&mut self) -> Vec<FlowEvent>;
141 fn emit_flow_event(&mut self, event: FlowEvent);
143}
144
145pub struct ExecutionEngine<S: WorkflowState> {
158 state: S,
160 stream: Option<Arc<dyn StreamSink>>,
162 cancel: CancellationToken,
164 control: ExecutionControl,
166 metadata: NodeMetadata,
168 mutations: Vec<S::Mutation>,
170 flow_events: Vec<FlowEvent>,
172}
173
174impl<S: WorkflowState> ExecutionEngine<S> {
175 pub fn new(state: S, stream: Option<Arc<dyn StreamSink>>, cancel: CancellationToken) -> Self {
177 Self {
178 state,
179 stream,
180 cancel,
181 control: ExecutionControl::new(),
182 metadata: NodeMetadata::default(),
183 mutations: Vec::new(),
184 flow_events: Vec::new(),
185 }
186 }
187
188 pub fn take_mutations(&mut self) -> Vec<S::Mutation> {
192 std::mem::take(&mut self.mutations)
193 }
194
195 pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
197 self.control.take()
198 }
199
200 pub fn take_metadata(&mut self) -> NodeMetadata {
202 std::mem::take(&mut self.metadata)
203 }
204
205 pub fn take_flow_events(&mut self) -> Vec<FlowEvent> {
207 std::mem::take(&mut self.flow_events)
208 }
209
210 pub fn state(&self) -> &S {
212 &self.state
213 }
214
215 pub(crate) fn state_mut(&mut self) -> &mut S {
220 &mut self.state
221 }
222
223 pub fn stream(&self) -> Option<&dyn StreamSink> {
225 self.stream.as_deref()
226 }
227
228 pub fn cancel_token(&self) -> &CancellationToken {
230 &self.cancel
231 }
232
233 pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
235 self.stream.clone()
236 }
237
238 pub fn into_state(self) -> S {
240 self.state
241 }
242
243 pub fn take_commit_batch(&mut self) -> Vec<S::Mutation> {
254 std::mem::take(&mut self.mutations)
255 }
256
257 pub fn apply_batch_to_state(&mut self, mutations: Vec<S::Mutation>) {
261 if !mutations.is_empty() {
262 self.state.apply_batch(mutations);
263 }
264 }
265
266 pub fn commit(&mut self) {
272 let batch = self.take_commit_batch();
273 self.apply_batch_to_state(batch);
274 }
275}
276
277impl<S: WorkflowState> ExecutionView<S> for ExecutionEngine<S> {
280 fn state(&self) -> &S {
281 &self.state
282 }
283
284 fn emit(&self, chunk: StreamChunk) {
285 if let Some(ref stream) = self.stream {
286 stream.emit(chunk);
287 }
288 }
289
290 fn is_cancelled(&self) -> bool {
291 self.cancel.is_cancelled()
292 }
293}
294
295impl<S: WorkflowState> ExecutorState<S> for ExecutionEngine<S> {
298 fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S> {
299 crate::node_context::NodeContext {
300 state: &mut self.state,
301 stream: self.stream.as_deref(),
302 cancel: &self.cancel,
303 control: &mut self.control,
304 metadata: &mut self.metadata,
305 mutations: &mut self.mutations,
306 flow_events: &mut self.flow_events,
307 }
308 }
309
310 fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S> {
311 crate::node_context::LeafContext {
312 state: &self.state,
313 stream: self.stream.as_deref(),
314 cancel: &self.cancel,
315 control: &mut self.control,
316 metadata: &mut self.metadata,
317 mutations: &mut self.mutations,
318 flow_events: &mut self.flow_events,
319 }
320 }
321
322 fn clone_state(&self) -> S {
323 self.state.clone()
324 }
325
326 fn replace_state(&mut self, state: S) {
327 self.state = state;
328 }
329
330 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
331 self.state.apply_batch(mutations);
332 }
333
334 fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
335 self.control.take()
336 }
337
338 fn take_metadata(&mut self) -> NodeMetadata {
339 std::mem::take(&mut self.metadata)
340 }
341
342 fn take_flow_events(&mut self) -> Vec<FlowEvent> {
343 std::mem::take(&mut self.flow_events)
344 }
345
346 fn emit_flow_event(&mut self, event: FlowEvent) {
347 self.flow_events.push(event);
348 }
349}
350
351pub type ExecutionContext<S> = ExecutionEngine<S>;