1use std::time::Duration;
8
9use crate::checkpoint::CheckpointId;
10use crate::error::{GraphError, ObservedError};
11use crate::ids::{SpanId, TraceId};
12use crate::state::{GraphResult, State};
13
14#[derive(Debug)]
18pub enum FlowEvent {
19 NodeStarted { node_id: String, span_id: SpanId },
21 NodeCompleted {
23 node_id: String,
24 span_id: SpanId,
25 duration: Duration,
26 },
27 NodeFailed { node_id: String, error: String },
29 StateChanged {
31 node_id: String,
32 key: String,
33 value: serde_json::Value,
34 },
35 ParallelStarted {
37 node_id: String,
38 branch_count: usize,
39 span_id: SpanId,
40 },
41 ParallelCompleted {
43 node_id: String,
44 span_id: SpanId,
45 duration: Duration,
46 },
47 BranchCompleted {
49 branch_name: String,
50 node_id: String,
51 span_id: SpanId,
52 success: bool,
53 duration: Duration,
54 },
55 Custom {
57 node_id: String,
58 payload: Box<dyn std::any::Any + Send + Sync>,
59 },
60}
61
/// Identifier for a barrier: a node id paired with an occurrence counter.
///
/// Derives `Eq` and `Hash`, so it can be used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BarrierId {
    /// Identifier of the node the barrier belongs to.
    pub node_id: String,
    /// Occurrence number that distinguishes barriers sharing a `node_id`.
    // NOTE(review): whether counting starts at 0 or 1 is not visible here.
    pub occurrence: u32,
}

impl BarrierId {
    /// Creates a `BarrierId` from anything convertible into a `String` and an
    /// occurrence number.
    pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
        let node_id: String = node_id.into();
        Self { node_id, occurrence }
    }
}
79
80#[derive(Debug, Clone)]
82pub enum BarrierDecision {
83 Approve,
85 Reject { reason: String },
87 Modify {
89 key: String,
90 value: serde_json::Value,
91 },
92 Reroute { target: String },
94}
95
96#[derive(Debug)]
104pub enum GraphEvent<S: crate::workflow_state::WorkflowState = State> {
105 GraphStart { trace_id: TraceId },
107 NodeStart {
109 node_name: String,
110 trace_id: TraceId,
111 span_id: SpanId,
112 step: usize,
113 },
114 NodeEnd {
116 node_name: String,
117 trace_id: TraceId,
118 span_id: SpanId,
119 success: bool,
120 duration: Duration,
121 },
122 Node {
124 span_id: SpanId,
125 node_name: String,
126 event: FlowEvent,
127 },
128 BarrierWaiting {
130 barrier_id: BarrierId,
131 node_name: String,
132 span_id: SpanId,
133 },
134 BarrierResolved {
136 barrier_id: BarrierId,
137 decision: BarrierDecision,
138 },
139 ObservedError {
141 error: ObservedError,
142 node_name: String,
143 },
144 CheckpointSaved {
146 checkpoint_id: CheckpointId,
147 node_name: String,
148 step: usize,
149 },
150 GraphComplete { result: GraphResult<S> },
152 GraphError { error: GraphError, state: S },
154}
155
156pub type GraphStream<S = State> = tokio::sync::mpsc::Receiver<GraphEvent<S>>;
158
159pub struct GraphExecution<S: crate::workflow_state::WorkflowState = State> {
161 pub stream: GraphStream<S>,
162 pub handle: GraphHandle,
163}
164
165pub struct GraphHandle {
169 decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
170 cancel_tx: tokio::sync::mpsc::Sender<()>,
171}
172
173#[allow(dead_code)]
175pub(crate) enum BarrierDecisionMessage {
176 Exact {
177 barrier_id: BarrierId,
178 decision: BarrierDecision,
179 },
180 Wildcard {
181 node_id: String,
182 decision: BarrierDecision,
183 },
184}
185
186impl GraphHandle {
187 pub(crate) fn new(
188 decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
189 cancel_tx: tokio::sync::mpsc::Sender<()>,
190 ) -> Self {
191 Self {
192 decision_tx,
193 cancel_tx,
194 }
195 }
196
197 pub async fn decide(
198 &self,
199 barrier_id: BarrierId,
200 decision: BarrierDecision,
201 ) -> Result<(), GraphError> {
202 self.decision_tx
203 .send(BarrierDecisionMessage::Exact {
204 barrier_id,
205 decision,
206 })
207 .await
208 .map_err(|_| {
209 GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
210 node: "decision channel closed".into(),
211 })
212 })
213 }
214
215 pub async fn decide_wildcard(
216 &self,
217 node_id: impl Into<String>,
218 decision: BarrierDecision,
219 ) -> Result<(), GraphError> {
220 self.decision_tx
221 .send(BarrierDecisionMessage::Wildcard {
222 node_id: node_id.into(),
223 decision,
224 })
225 .await
226 .map_err(|_| {
227 GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
228 node: "decision channel closed".into(),
229 })
230 })
231 }
232
233 pub fn cancel(&self) {
234 let _ = self.cancel_tx.try_send(());
235 }
236}