use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use crate::exec::execution_engine::{
ExecutionControl, ExecutionView, ExecutorState, NextAction, NodeMetadata,
};
use crate::node::node_context::{LeafContext, NodeContext};
use crate::state::workflow_state::WorkflowState;
use crate::stream_chunk::StreamChunk;
use crate::stream_emitter::StreamSink;
pub struct OwnedExecutionEngine<S: WorkflowState> {
inner: S,
stream: Option<Arc<dyn StreamSink>>,
cancel: CancellationToken,
control: ExecutionControl,
metadata: NodeMetadata,
mutations: Vec<S::Mutation>,
}
impl<S: WorkflowState> OwnedExecutionEngine<S> {
pub fn new(state: S, stream: Option<Arc<dyn StreamSink>>, cancel: CancellationToken) -> Self {
Self {
inner: state,
stream,
cancel,
control: ExecutionControl::new(),
metadata: NodeMetadata::default(),
mutations: Vec::new(),
}
}
pub fn into_state(self) -> S {
self.inner
}
pub fn state(&self) -> &S {
&self.inner
}
pub fn state_mut(&mut self) -> &mut S {
&mut self.inner
}
pub fn cancel_token(&self) -> &CancellationToken {
&self.cancel
}
pub fn stream_sink(&self) -> Option<Arc<dyn StreamSink>> {
self.stream.clone()
}
pub fn commit(&mut self) {
let batch = std::mem::take(&mut self.mutations);
if !batch.is_empty() {
self.inner.apply_batch(batch);
}
}
}
impl<S: WorkflowState> ExecutionView<S> for OwnedExecutionEngine<S> {
fn state(&self) -> &S {
&self.inner
}
fn emit(&self, chunk: StreamChunk) {
if let Some(ref stream) = self.stream {
stream.emit(chunk);
}
}
fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
}
}
impl<S: WorkflowState> ExecutorState<S> for OwnedExecutionEngine<S> {
fn build_node_context(&mut self) -> NodeContext<'_, S> {
NodeContext {
state: &mut self.inner,
stream: self.stream.as_deref(),
cancel: &self.cancel,
control: &mut self.control,
metadata: &mut self.metadata,
mutations: &mut self.mutations,
}
}
fn build_leaf_context(&mut self) -> LeafContext<'_, S> {
LeafContext {
state: &self.inner,
stream: self.stream.as_deref(),
cancel: &self.cancel,
control: &mut self.control,
metadata: &mut self.metadata,
mutations: &mut self.mutations,
}
}
fn clone_state(&self) -> S {
self.inner.clone()
}
fn replace_state(&mut self, state: S) {
self.inner = state;
}
fn apply_batch(&mut self, mutations: impl IntoIterator<Item = S::Mutation>) {
self.inner.apply_batch(mutations);
}
fn take_control(&mut self) -> (NextAction, Option<crate::ExecutionSignal>) {
self.control.take()
}
fn take_metadata(&mut self) -> NodeMetadata {
std::mem::take(&mut self.metadata)
}
}