use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use crate::checkpoint::CheckpointSink;
use crate::node::barrier_sink::BarrierSink;
use crate::state::workflow_state::WorkflowState;
use crate::stream_chunk::StreamChunk;
use crate::stream_emitter::StreamSink;
#[derive(Debug, Clone)]
pub enum ExecutionSignal {
Pause {
barrier_id: crate::event::BarrierId,
timeout: Option<std::time::Duration>,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NextAction {
Next,
Goto(String),
End,
}
#[derive(Debug, Default)]
pub struct ExecutionControl {
next: Option<NextAction>,
signal: Option<ExecutionSignal>,
}
impl ExecutionControl {
pub fn new() -> Self {
Self::default()
}
pub fn goto(&mut self, target: impl Into<String>) {
self.next = Some(NextAction::Goto(target.into()));
}
pub fn end(&mut self) {
self.next = Some(NextAction::End);
}
pub fn pause(
&mut self,
barrier_id: crate::event::BarrierId,
timeout: Option<std::time::Duration>,
) {
self.signal = Some(ExecutionSignal::Pause {
barrier_id,
timeout,
});
}
pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
let next = self.next.take().unwrap_or(NextAction::Next);
let signal = self.signal.take();
(next, signal)
}
}
#[derive(Debug, Clone, Default)]
pub struct NodeMetadata {
pub token_cost: f64,
pub has_side_effects: bool,
}
pub trait ExecutionView<S: WorkflowState>: Send + Sync {
fn state(&self) -> &S;
fn emit(&self, chunk: StreamChunk);
fn is_cancelled(&self) -> bool;
}
pub trait ExecutorState<S: WorkflowState>: ExecutionView<S> {
fn build_node_context(&mut self) -> crate::node::node_context::NodeContext<'_, S>;
fn build_leaf_context(&mut self) -> crate::node::node_context::LeafContext<'_, S>;
fn clone_state(&self) -> S;
fn replace_state(&mut self, state: S);
fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>);
fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>);
fn take_metadata(&mut self) -> NodeMetadata;
}
pub struct ExecutionEngine<'a, S: WorkflowState> {
state: &'a mut S,
stream: Option<Arc<dyn StreamSink>>,
cancel: CancellationToken,
checkpoint: Option<&'a mut dyn CheckpointSink<S>>,
barrier: Option<&'a mut dyn BarrierSink>,
control: ExecutionControl,
metadata: NodeMetadata,
mutations: Vec<S::Mutation>,
}
impl<'a, S: WorkflowState> ExecutionEngine<'a, S> {
pub fn new(
state: &'a mut S,
stream: Option<Arc<dyn StreamSink>>,
cancel: CancellationToken,
checkpoint: Option<&'a mut dyn CheckpointSink<S>>,
barrier: Option<&'a mut dyn BarrierSink>,
) -> Self {
Self {
state,
stream,
cancel,
checkpoint,
barrier,
control: ExecutionControl::new(),
metadata: NodeMetadata::default(),
mutations: Vec::new(),
}
}
pub(crate) fn emit_checkpoint(&mut self, node_id: impl Into<String>, step: usize) {
if let Some(ref mut sink) = self.checkpoint {
use crate::checkpoint::FrameInfo;
sink.on_checkpoint(self.state, &FrameInfo::new(node_id, step));
}
}
pub(crate) async fn wait_barrier(
&mut self,
barrier_id: &crate::event::BarrierId,
timeout: Option<std::time::Duration>,
) -> crate::node::barrier_sink::BarrierOutcome {
if let Some(ref mut sink) = self.barrier {
sink.wait_decision(barrier_id, timeout).await
} else {
crate::node::barrier_sink::BarrierOutcome::Decision(
crate::event::BarrierDecision::Approve,
)
}
}
pub fn take_mutations(&mut self) -> Vec<S::Mutation> {
std::mem::take(&mut self.mutations)
}
pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
self.control.take()
}
pub fn take_metadata(&mut self) -> NodeMetadata {
std::mem::take(&mut self.metadata)
}
pub fn state(&self) -> &S {
&self.state
}
pub(crate) fn state_mut(&mut self) -> &mut S {
self.state
}
pub fn stream(&self) -> Option<&dyn StreamSink> {
self.stream.as_deref()
}
pub fn cancel_token(&self) -> &CancellationToken {
&self.cancel
}
pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
self.stream.clone()
}
pub fn take_commit_batch(&mut self) -> Vec<S::Mutation> {
std::mem::take(&mut self.mutations)
}
pub fn apply_batch_to_state(&mut self, mutations: Vec<S::Mutation>) {
if !mutations.is_empty() {
self.state.apply_batch(mutations);
}
}
pub fn commit(&mut self) {
let batch = self.take_commit_batch();
self.apply_batch_to_state(batch);
}
}
impl<'a, S: WorkflowState> ExecutionView<S> for ExecutionEngine<'a, S> {
fn state(&self) -> &S {
&self.state
}
fn emit(&self, chunk: StreamChunk) {
if let Some(ref stream) = self.stream {
stream.emit(chunk);
}
}
fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
}
}
impl<'a, S: WorkflowState> ExecutorState<S> for ExecutionEngine<'a, S> {
fn build_node_context(&mut self) -> crate::node::node_context::NodeContext<'_, S> {
crate::node::node_context::NodeContext {
state: &mut self.state,
stream: self.stream.as_deref(),
cancel: &self.cancel,
control: &mut self.control,
metadata: &mut self.metadata,
mutations: &mut self.mutations,
}
}
fn build_leaf_context(&mut self) -> crate::node::node_context::LeafContext<'_, S> {
crate::node::node_context::LeafContext {
state: &self.state,
stream: self.stream.as_deref(),
cancel: &self.cancel,
control: &mut self.control,
metadata: &mut self.metadata,
mutations: &mut self.mutations,
}
}
fn clone_state(&self) -> S {
self.state.clone()
}
fn replace_state(&mut self, state: S) {
*self.state = state;
}
fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
self.state.apply_batch(mutations);
}
fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
self.control.take()
}
fn take_metadata(&mut self) -> NodeMetadata {
std::mem::take(&mut self.metadata)
}
}
pub type ExecutionContext<'a, S> = ExecutionEngine<'a, S>;
pub use crate::exec::owned_execution_engine::OwnedExecutionEngine;