1use std::sync::Arc;
44
45use tokio_util::sync::CancellationToken;
46
47use crate::event::FlowEvent;
48use crate::stream_chunk::StreamChunk;
49use crate::stream_emitter::StreamSink;
50use crate::workflow_state::WorkflowState;
51
52#[derive(Debug, Clone)]
56pub enum ExecutionSignal {
57 Pause {
59 barrier_id: crate::event::BarrierId,
60 timeout: Option<std::time::Duration>,
61 },
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
68pub enum NextAction {
69 Next,
71 Goto(String),
73 End,
75}
76
77#[derive(Debug, Default)]
81pub struct ExecutionControl {
82 next: Option<NextAction>,
83 signal: Option<ExecutionSignal>,
84}
85
86impl ExecutionControl {
87 pub fn new() -> Self {
88 Self::default()
89 }
90
91 pub fn goto(&mut self, target: impl Into<String>) {
93 self.next = Some(NextAction::Goto(target.into()));
94 }
95
96 pub fn end(&mut self) {
98 self.next = Some(NextAction::End);
99 }
100
101 pub fn pause(
103 &mut self,
104 barrier_id: crate::event::BarrierId,
105 timeout: Option<std::time::Duration>,
106 ) {
107 self.signal = Some(ExecutionSignal::Pause {
108 barrier_id,
109 timeout,
110 });
111 }
112
113 pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
115 let next = self.next.take().unwrap_or(NextAction::Next);
116 let signal = self.signal.take();
117 (next, signal)
118 }
119}
120
121#[derive(Debug, Clone, Default)]
125pub struct NodeMetadata {
126 pub token_cost: f64,
128 pub has_side_effects: bool,
130}
131
132pub trait ExecutionView<S: WorkflowState>: Send + Sync {
136 fn state(&self) -> &S;
137 fn emit(&self, chunk: StreamChunk);
138 fn is_cancelled(&self) -> bool;
139}
140
141pub trait ExecutorState<S: WorkflowState>: ExecutionView<S> {
150 fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S>;
151 fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S>;
152 fn clone_state(&self) -> S;
153 fn replace_state(&mut self, state: S);
154 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>);
155 fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>);
156 fn take_metadata(&mut self) -> NodeMetadata;
157 fn take_flow_events(&mut self) -> Vec<FlowEvent>;
158 fn emit_flow_event(&mut self, event: FlowEvent);
160}
161
162pub struct ExecutionEngine<'a, S: WorkflowState> {
180 state: &'a mut S,
182 stream: Option<Arc<dyn StreamSink>>,
184 cancel: CancellationToken,
186 control: ExecutionControl,
188 metadata: NodeMetadata,
190 mutations: Vec<S::Mutation>,
192 flow_events: Vec<FlowEvent>,
194}
195
196impl<'a, S: WorkflowState> ExecutionEngine<'a, S> {
197 pub fn new(
201 state: &'a mut S,
202 stream: Option<Arc<dyn StreamSink>>,
203 cancel: CancellationToken,
204 ) -> Self {
205 Self {
206 state,
207 stream,
208 cancel,
209 control: ExecutionControl::new(),
210 metadata: NodeMetadata::default(),
211 mutations: Vec::new(),
212 flow_events: Vec::new(),
213 }
214 }
215
216 pub fn take_mutations(&mut self) -> Vec<S::Mutation> {
220 std::mem::take(&mut self.mutations)
221 }
222
223 pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
225 self.control.take()
226 }
227
228 pub fn take_metadata(&mut self) -> NodeMetadata {
230 std::mem::take(&mut self.metadata)
231 }
232
233 pub fn take_flow_events(&mut self) -> Vec<FlowEvent> {
235 std::mem::take(&mut self.flow_events)
236 }
237
238 pub fn state(&self) -> &S {
240 &self.state
241 }
242
243 pub(crate) fn state_mut(&mut self) -> &mut S {
248 self.state
249 }
250
251 pub fn stream(&self) -> Option<&dyn StreamSink> {
253 self.stream.as_deref()
254 }
255
256 pub fn cancel_token(&self) -> &CancellationToken {
258 &self.cancel
259 }
260
261 pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
263 self.stream.clone()
264 }
265
266 pub fn take_commit_batch(&mut self) -> Vec<S::Mutation> {
277 std::mem::take(&mut self.mutations)
278 }
279
280 pub fn apply_batch_to_state(&mut self, mutations: Vec<S::Mutation>) {
284 if !mutations.is_empty() {
285 self.state.apply_batch(mutations);
286 }
287 }
288
289 pub fn commit(&mut self) {
295 let batch = self.take_commit_batch();
296 self.apply_batch_to_state(batch);
297 }
298}
299
300impl<'a, S: WorkflowState> ExecutionView<S> for ExecutionEngine<'a, S> {
303 fn state(&self) -> &S {
304 &self.state
305 }
306
307 fn emit(&self, chunk: StreamChunk) {
308 if let Some(ref stream) = self.stream {
309 stream.emit(chunk);
310 }
311 }
312
313 fn is_cancelled(&self) -> bool {
314 self.cancel.is_cancelled()
315 }
316}
317
318impl<'a, S: WorkflowState> ExecutorState<S> for ExecutionEngine<'a, S> {
321 fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S> {
322 crate::node_context::NodeContext {
323 state: &mut self.state,
324 stream: self.stream.as_deref(),
325 cancel: &self.cancel,
326 control: &mut self.control,
327 metadata: &mut self.metadata,
328 mutations: &mut self.mutations,
329 flow_events: &mut self.flow_events,
330 }
331 }
332
333 fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S> {
334 crate::node_context::LeafContext {
335 state: &self.state,
336 stream: self.stream.as_deref(),
337 cancel: &self.cancel,
338 control: &mut self.control,
339 metadata: &mut self.metadata,
340 mutations: &mut self.mutations,
341 flow_events: &mut self.flow_events,
342 }
343 }
344
345 fn clone_state(&self) -> S {
346 self.state.clone()
347 }
348
349 fn replace_state(&mut self, state: S) {
350 *self.state = state;
351 }
352
353 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
354 self.state.apply_batch(mutations);
355 }
356
357 fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
358 self.control.take()
359 }
360
361 fn take_metadata(&mut self) -> NodeMetadata {
362 std::mem::take(&mut self.metadata)
363 }
364
365 fn take_flow_events(&mut self) -> Vec<FlowEvent> {
366 std::mem::take(&mut self.flow_events)
367 }
368
369 fn emit_flow_event(&mut self, event: FlowEvent) {
370 self.flow_events.push(event);
371 }
372}
373
374pub type ExecutionContext<'a, S> = ExecutionEngine<'a, S>;
378
379pub struct OwnedExecutionEngine<S: WorkflowState> {
387 inner: S,
388 stream: Option<Arc<dyn StreamSink>>,
389 cancel: CancellationToken,
390 control: ExecutionControl,
391 metadata: NodeMetadata,
392 mutations: Vec<S::Mutation>,
393 flow_events: Vec<FlowEvent>,
394}
395
396impl<S: WorkflowState> OwnedExecutionEngine<S> {
397 pub fn new(state: S, stream: Option<Arc<dyn StreamSink>>, cancel: CancellationToken) -> Self {
399 Self {
400 inner: state,
401 stream,
402 cancel,
403 control: ExecutionControl::new(),
404 metadata: NodeMetadata::default(),
405 mutations: Vec::new(),
406 flow_events: Vec::new(),
407 }
408 }
409
410 pub fn into_state(self) -> S {
412 self.inner
413 }
414
415 pub fn state(&self) -> &S {
416 &self.inner
417 }
418
419 pub fn state_mut(&mut self) -> &mut S {
420 &mut self.inner
421 }
422
423 pub fn cancel_token(&self) -> &CancellationToken {
424 &self.cancel
425 }
426
427 pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
428 self.stream.clone()
429 }
430
431 pub fn commit(&mut self) {
432 let batch = std::mem::take(&mut self.mutations);
433 if !batch.is_empty() {
434 self.inner.apply_batch(batch);
435 }
436 }
437}
438
439impl<S: WorkflowState> ExecutionView<S> for OwnedExecutionEngine<S> {
440 fn state(&self) -> &S {
441 &self.inner
442 }
443
444 fn emit(&self, chunk: StreamChunk) {
445 if let Some(ref stream) = self.stream {
446 stream.emit(chunk);
447 }
448 }
449
450 fn is_cancelled(&self) -> bool {
451 self.cancel.is_cancelled()
452 }
453}
454
455impl<S: WorkflowState> ExecutorState<S> for OwnedExecutionEngine<S> {
456 fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S> {
457 crate::node_context::NodeContext {
458 state: &mut self.inner,
459 stream: self.stream.as_deref(),
460 cancel: &self.cancel,
461 control: &mut self.control,
462 metadata: &mut self.metadata,
463 mutations: &mut self.mutations,
464 flow_events: &mut self.flow_events,
465 }
466 }
467
468 fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S> {
469 crate::node_context::LeafContext {
470 state: &self.inner,
471 stream: self.stream.as_deref(),
472 cancel: &self.cancel,
473 control: &mut self.control,
474 metadata: &mut self.metadata,
475 mutations: &mut self.mutations,
476 flow_events: &mut self.flow_events,
477 }
478 }
479
480 fn clone_state(&self) -> S {
481 self.inner.clone()
482 }
483
484 fn replace_state(&mut self, state: S) {
485 self.inner = state;
486 }
487
488 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
489 self.inner.apply_batch(mutations);
490 }
491
492 fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
493 self.control.take()
494 }
495
496 fn take_metadata(&mut self) -> NodeMetadata {
497 std::mem::take(&mut self.metadata)
498 }
499
500 fn take_flow_events(&mut self) -> Vec<FlowEvent> {
501 std::mem::take(&mut self.flow_events)
502 }
503
504 fn emit_flow_event(&mut self, event: FlowEvent) {
505 self.flow_events.push(event);
506 }
507}