use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use crate::event::FlowEvent;
use crate::stream_chunk::StreamChunk;
use crate::stream_emitter::StreamSink;
use crate::workflow_state::WorkflowState;
#[derive(Debug, Clone)]
pub enum ExecutionSignal {
Pause {
barrier_id: crate::event::BarrierId,
timeout: Option<std::time::Duration>,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NextAction {
Next,
Goto(String),
End,
}
#[derive(Debug, Default)]
pub struct ExecutionControl {
next: Option<NextAction>,
signal: Option<ExecutionSignal>,
}
impl ExecutionControl {
pub fn new() -> Self {
Self::default()
}
pub fn goto(&mut self, target: impl Into<String>) {
self.next = Some(NextAction::Goto(target.into()));
}
pub fn end(&mut self) {
self.next = Some(NextAction::End);
}
pub fn pause(
&mut self,
barrier_id: crate::event::BarrierId,
timeout: Option<std::time::Duration>,
) {
self.signal = Some(ExecutionSignal::Pause {
barrier_id,
timeout,
});
}
pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
let next = self.next.take().unwrap_or(NextAction::Next);
let signal = self.signal.take();
(next, signal)
}
}
#[derive(Debug, Clone, Default)]
pub struct NodeMetadata {
pub token_cost: f64,
pub has_side_effects: bool,
}
pub trait ExecutionView<S: WorkflowState>: Send + Sync {
fn state(&self) -> &S;
fn emit(&self, chunk: StreamChunk);
fn is_cancelled(&self) -> bool;
}
pub trait ExecutorState<S: WorkflowState>: ExecutionView<S> {
fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S>;
fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S>;
fn clone_state(&self) -> S;
fn replace_state(&mut self, state: S);
fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>);
fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>);
fn take_metadata(&mut self) -> NodeMetadata;
fn take_flow_events(&mut self) -> Vec<FlowEvent>;
fn emit_flow_event(&mut self, event: FlowEvent);
}
pub struct ExecutionEngine<S: WorkflowState> {
state: S,
stream: Option<Arc<dyn StreamSink>>,
cancel: CancellationToken,
control: ExecutionControl,
metadata: NodeMetadata,
mutations: Vec<S::Mutation>,
flow_events: Vec<FlowEvent>,
}
impl<S: WorkflowState> ExecutionEngine<S> {
pub fn new(state: S, stream: Option<Arc<dyn StreamSink>>, cancel: CancellationToken) -> Self {
Self {
state,
stream,
cancel,
control: ExecutionControl::new(),
metadata: NodeMetadata::default(),
mutations: Vec::new(),
flow_events: Vec::new(),
}
}
pub fn take_mutations(&mut self) -> Vec<S::Mutation> {
std::mem::take(&mut self.mutations)
}
pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
self.control.take()
}
pub fn take_metadata(&mut self) -> NodeMetadata {
std::mem::take(&mut self.metadata)
}
pub fn take_flow_events(&mut self) -> Vec<FlowEvent> {
std::mem::take(&mut self.flow_events)
}
pub fn state(&self) -> &S {
&self.state
}
pub(crate) fn state_mut(&mut self) -> &mut S {
&mut self.state
}
pub fn stream(&self) -> Option<&dyn StreamSink> {
self.stream.as_deref()
}
pub fn cancel_token(&self) -> &CancellationToken {
&self.cancel
}
pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
self.stream.clone()
}
pub fn into_state(self) -> S {
self.state
}
pub fn take_commit_batch(&mut self) -> Vec<S::Mutation> {
std::mem::take(&mut self.mutations)
}
pub fn apply_batch_to_state(&mut self, mutations: Vec<S::Mutation>) {
if !mutations.is_empty() {
self.state.apply_batch(mutations);
}
}
pub fn commit(&mut self) {
let batch = self.take_commit_batch();
self.apply_batch_to_state(batch);
}
}
impl<S: WorkflowState> ExecutionView<S> for ExecutionEngine<S> {
fn state(&self) -> &S {
&self.state
}
fn emit(&self, chunk: StreamChunk) {
if let Some(ref stream) = self.stream {
stream.emit(chunk);
}
}
fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
}
}
impl<S: WorkflowState> ExecutorState<S> for ExecutionEngine<S> {
fn build_node_context(&mut self) -> crate::node_context::NodeContext<'_, S> {
crate::node_context::NodeContext {
state: &mut self.state,
stream: self.stream.as_deref(),
cancel: &self.cancel,
control: &mut self.control,
metadata: &mut self.metadata,
mutations: &mut self.mutations,
flow_events: &mut self.flow_events,
}
}
fn build_leaf_context(&mut self) -> crate::node_context::LeafContext<'_, S> {
crate::node_context::LeafContext {
state: &self.state,
stream: self.stream.as_deref(),
cancel: &self.cancel,
control: &mut self.control,
metadata: &mut self.metadata,
mutations: &mut self.mutations,
flow_events: &mut self.flow_events,
}
}
fn clone_state(&self) -> S {
self.state.clone()
}
fn replace_state(&mut self, state: S) {
self.state = state;
}
fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
self.state.apply_batch(mutations);
}
fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
self.control.take()
}
fn take_metadata(&mut self) -> NodeMetadata {
std::mem::take(&mut self.metadata)
}
fn take_flow_events(&mut self) -> Vec<FlowEvent> {
std::mem::take(&mut self.flow_events)
}
fn emit_flow_event(&mut self, event: FlowEvent) {
self.flow_events.push(event);
}
}
pub type ExecutionContext<S> = ExecutionEngine<S>;