use tokio_util::sync::CancellationToken;
use crate::event::FlowEvent;
use crate::execution_engine::{ExecutionControl, NodeMetadata};
use crate::state::State;
use crate::stream_chunk::StreamChunk;
use crate::stream_emitter::StreamSink;
use crate::workflow_state::WorkflowState;
pub use crate::execution_engine::ExecutionContext;
pub struct LeafContext<'a, S: WorkflowState = State> {
pub(crate) state: &'a S,
pub(crate) stream: Option<&'a dyn StreamSink>,
pub(crate) cancel: &'a CancellationToken,
pub(crate) control: &'a mut ExecutionControl,
pub(crate) metadata: &'a mut NodeMetadata,
pub(crate) mutations: &'a mut Vec<S::Mutation>,
pub(crate) flow_events: &'a mut Vec<FlowEvent>,
}
impl<S: WorkflowState> LeafContext<'_, S> {
pub fn state(&self) -> &S {
self.state
}
pub fn record(&mut self, mutation: S::Mutation) {
self.mutations.push(mutation);
}
pub fn emit(&self, chunk: StreamChunk) {
if let Some(stream) = self.stream {
stream.emit(chunk);
}
}
pub fn emit_flow_event(&mut self, event: FlowEvent) {
self.flow_events.push(event);
}
pub fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
}
pub fn cancel_token(&self) -> &CancellationToken {
self.cancel
}
pub fn goto(&mut self, target: impl Into<String>) {
self.control.goto(target);
}
pub fn end(&mut self) {
self.control.end();
}
pub fn pause(
&mut self,
barrier_id: crate::event::BarrierId,
timeout: Option<std::time::Duration>,
) {
self.control.pause(barrier_id, timeout);
}
pub fn set_token_cost(&mut self, cost: f64) {
self.metadata.token_cost = cost;
}
pub fn set_has_side_effects(&mut self) {
self.metadata.has_side_effects = true;
}
}
pub struct NodeContext<'a, S: WorkflowState = State> {
pub(crate) state: &'a mut S,
pub(crate) stream: Option<&'a dyn StreamSink>,
pub(crate) cancel: &'a CancellationToken,
pub(crate) control: &'a mut ExecutionControl,
pub(crate) metadata: &'a mut NodeMetadata,
pub(crate) mutations: &'a mut Vec<S::Mutation>,
pub(crate) flow_events: &'a mut Vec<FlowEvent>,
}
impl<S: WorkflowState> NodeContext<'_, S> {
pub fn state(&self) -> &S {
&self.state
}
pub fn replace_state(&mut self, new_state: S) {
*self.state = new_state;
}
pub fn record(&mut self, mutation: S::Mutation) {
self.mutations.push(mutation);
}
pub fn emit(&self, chunk: StreamChunk) {
if let Some(stream) = self.stream {
stream.emit(chunk);
}
}
pub fn emit_flow_event(&mut self, event: FlowEvent) {
self.flow_events.push(event);
}
pub fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
}
pub fn cancel_token(&self) -> &CancellationToken {
self.cancel
}
pub fn goto(&mut self, target: impl Into<String>) {
self.control.goto(target);
}
pub fn end(&mut self) {
self.control.end();
}
pub fn pause(
&mut self,
barrier_id: crate::event::BarrierId,
timeout: Option<std::time::Duration>,
) {
self.control.pause(barrier_id, timeout);
}
pub fn set_token_cost(&mut self, cost: f64) {
self.metadata.token_cost = cost;
}
pub fn set_has_side_effects(&mut self) {
self.metadata.has_side_effects = true;
}
}