use std::time::Duration;
use crate::checkpoint::CheckpointId;
use crate::error::{GraphError, ObservedError};
use crate::ids::{SpanId, TraceId};
use crate::state::{GraphResult, State};
#[derive(Debug)]
pub enum FlowEvent {
NodeStarted { node_id: String, span_id: SpanId },
NodeCompleted {
node_id: String,
span_id: SpanId,
duration: Duration,
},
NodeFailed { node_id: String, error: String },
StateChanged {
node_id: String,
key: String,
value: serde_json::Value,
},
ParallelStarted {
node_id: String,
branch_count: usize,
span_id: SpanId,
},
ParallelCompleted {
node_id: String,
span_id: SpanId,
duration: Duration,
},
BranchCompleted {
branch_name: String,
node_id: String,
span_id: SpanId,
success: bool,
duration: Duration,
},
Custom {
node_id: String,
payload: Box<dyn std::any::Any + Send + Sync>,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BarrierId {
pub node_id: String,
pub occurrence: u32,
}
impl BarrierId {
pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
Self {
node_id: node_id.into(),
occurrence,
}
}
}
#[derive(Debug, Clone)]
pub enum BarrierDecision {
Approve,
Reject { reason: String },
Modify {
key: String,
value: serde_json::Value,
},
Reroute { target: String },
}
#[derive(Debug)]
pub enum GraphEvent<S: crate::state::workflow_state::WorkflowState = State> {
GraphStart { trace_id: TraceId },
NodeStart {
node_name: String,
trace_id: TraceId,
span_id: SpanId,
step: usize,
},
NodeEnd {
node_name: String,
trace_id: TraceId,
span_id: SpanId,
success: bool,
duration: Duration,
},
Node {
span_id: SpanId,
node_name: String,
event: FlowEvent,
},
BarrierWaiting {
barrier_id: BarrierId,
node_name: String,
span_id: SpanId,
},
BarrierResolved {
barrier_id: BarrierId,
decision: BarrierDecision,
},
ObservedError {
error: ObservedError,
node_name: String,
},
CheckpointSaved {
checkpoint_id: CheckpointId,
node_name: String,
step: usize,
},
GraphComplete { result: GraphResult<S> },
GraphError { error: GraphError, state: S },
}
pub type GraphStream<S = State> = tokio::sync::mpsc::Receiver<GraphEvent<S>>;
pub struct GraphExecution<S: crate::state::workflow_state::WorkflowState = State> {
pub stream: GraphStream<S>,
pub handle: GraphHandle,
}
pub struct GraphHandle {
decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
cancel_tx: tokio::sync::mpsc::Sender<()>,
}
#[allow(dead_code)]
pub(crate) enum BarrierDecisionMessage {
Exact {
barrier_id: BarrierId,
decision: BarrierDecision,
},
Wildcard {
node_id: String,
decision: BarrierDecision,
},
}
impl GraphHandle {
pub(crate) fn new(
decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
cancel_tx: tokio::sync::mpsc::Sender<()>,
) -> Self {
Self {
decision_tx,
cancel_tx,
}
}
pub async fn decide(
&self,
barrier_id: BarrierId,
decision: BarrierDecision,
) -> Result<(), GraphError> {
self.decision_tx
.send(BarrierDecisionMessage::Exact {
barrier_id,
decision,
})
.await
.map_err(|_| {
GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
node: "decision channel closed".into(),
})
})
}
pub async fn decide_wildcard(
&self,
node_id: impl Into<String>,
decision: BarrierDecision,
) -> Result<(), GraphError> {
self.decision_tx
.send(BarrierDecisionMessage::Wildcard {
node_id: node_id.into(),
decision,
})
.await
.map_err(|_| {
GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
node: "decision channel closed".into(),
})
})
}
pub fn cancel(&self) {
let _ = self.cancel_tx.try_send(());
}
}