1use std::time::Duration;
8
9use crate::checkpoint::CheckpointId;
10use crate::error::{GraphError, ObservedError};
11use crate::ids::{SpanId, TraceId};
12use crate::state::GraphResult;
13
14#[derive(Debug)]
18pub enum FlowEvent {
19 NodeStarted { node_id: String, span_id: SpanId },
21 NodeCompleted {
23 node_id: String,
24 span_id: SpanId,
25 duration: Duration,
26 },
27 NodeFailed { node_id: String, error: String },
29 StateChanged {
31 node_id: String,
32 delta: crate::delta::StateDelta,
33 },
34 ParallelStarted {
36 node_id: String,
37 branch_count: usize,
38 span_id: SpanId,
39 },
40 ParallelCompleted {
42 node_id: String,
43 span_id: SpanId,
44 duration: Duration,
45 },
46 BranchCompleted {
48 branch_name: String,
49 node_id: String,
50 span_id: SpanId,
51 success: bool,
52 duration: Duration,
53 },
54 Custom {
56 node_id: String,
57 payload: Box<dyn std::any::Any + Send + Sync>,
58 },
59}
60
/// Identifies a barrier by a node id plus an occurrence number.
///
/// Derives `Eq` and `Hash`, so values can be used as map or set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BarrierId {
    /// Identifier of the node the barrier belongs to.
    pub node_id: String,
    /// Occurrence number.
    /// NOTE(review): presumably distinguishes repeated barriers on the same node — confirm.
    pub occurrence: u32,
}

impl BarrierId {
    /// Builds a `BarrierId` from anything convertible into a `String` and an
    /// occurrence number.
    pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
        let node_id = node_id.into();
        BarrierId {
            node_id,
            occurrence,
        }
    }
}
78
79#[derive(Debug, Clone)]
81pub enum BarrierDecision {
82 Approve,
84 Reject { reason: String },
86 Modify {
88 key: String,
89 value: serde_json::Value,
90 },
91 Reroute { target: String },
93}
94
95#[derive(Debug)]
99pub enum GraphEvent {
100 GraphStart { trace_id: TraceId },
102 NodeStart {
104 node_name: String,
105 trace_id: TraceId,
106 span_id: SpanId,
107 step: usize,
108 },
109 NodeEnd {
111 node_name: String,
112 trace_id: TraceId,
113 span_id: SpanId,
114 success: bool,
115 duration: Duration,
116 },
117 Node {
119 span_id: SpanId,
120 node_name: String,
121 event: FlowEvent,
122 },
123 BarrierWaiting {
125 barrier_id: BarrierId,
126 node_name: String,
127 span_id: SpanId,
128 },
129 BarrierResolved {
131 barrier_id: BarrierId,
132 decision: BarrierDecision,
133 },
134 ObservedError {
136 error: ObservedError,
137 node_name: String,
138 },
139 CheckpointSaved {
141 checkpoint_id: CheckpointId,
142 node_name: String,
143 step: usize,
144 },
145 GraphComplete { result: GraphResult },
147 GraphError {
149 error: GraphError,
150 state: crate::state::State,
151 },
152}
153
154pub type GraphStream = tokio::sync::mpsc::Receiver<GraphEvent>;
156
157pub struct GraphExecution {
159 pub stream: GraphStream,
160 pub handle: GraphHandle,
161}
162
163pub struct GraphHandle {
167 decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
168 cancel_tx: tokio::sync::mpsc::Sender<()>,
169 checkpoint_tx: tokio::sync::mpsc::Sender<()>,
170}
171
172#[allow(dead_code)]
174pub(crate) enum BarrierDecisionMessage {
175 Exact {
176 barrier_id: BarrierId,
177 decision: BarrierDecision,
178 },
179 Wildcard {
180 node_id: String,
181 decision: BarrierDecision,
182 },
183}
184
185impl GraphHandle {
186 pub(crate) fn new(
187 decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
188 cancel_tx: tokio::sync::mpsc::Sender<()>,
189 checkpoint_tx: tokio::sync::mpsc::Sender<()>,
190 ) -> Self {
191 Self {
192 decision_tx,
193 cancel_tx,
194 checkpoint_tx,
195 }
196 }
197
198 pub async fn decide(
199 &self,
200 barrier_id: BarrierId,
201 decision: BarrierDecision,
202 ) -> Result<(), GraphError> {
203 self.decision_tx
204 .send(BarrierDecisionMessage::Exact {
205 barrier_id,
206 decision,
207 })
208 .await
209 .map_err(|_| {
210 GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
211 node: "decision channel closed".into(),
212 })
213 })
214 }
215
216 pub async fn decide_wildcard(
217 &self,
218 node_id: impl Into<String>,
219 decision: BarrierDecision,
220 ) -> Result<(), GraphError> {
221 self.decision_tx
222 .send(BarrierDecisionMessage::Wildcard {
223 node_id: node_id.into(),
224 decision,
225 })
226 .await
227 .map_err(|_| {
228 GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
229 node: "decision channel closed".into(),
230 })
231 })
232 }
233
234 pub fn cancel(&self) {
235 let _ = self.cancel_tx.try_send(());
236 }
237
238 pub async fn checkpoint(&self) -> Result<(), GraphError> {
239 self.checkpoint_tx.send(()).await.map_err(|_| {
240 GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
241 node: "checkpoint channel closed".into(),
242 })
243 })
244 }
245}