1use std::time::Duration;
8
9use crate::checkpoint::CheckpointId;
10use crate::error::{GraphError, ObservedError};
11use crate::ids::{SpanId, TraceId};
12use crate::state::GraphResult;
13
14#[derive(Debug)]
18pub enum FlowEvent {
19 NodeStarted { node_id: String, span_id: SpanId },
21 NodeCompleted {
23 node_id: String,
24 span_id: SpanId,
25 duration: Duration,
26 },
27 NodeFailed { node_id: String, error: String },
29 StateChanged {
31 node_id: String,
32 delta: crate::delta::StateDelta,
33 },
34 ParallelStarted {
36 node_id: String,
37 branch_count: usize,
38 span_id: SpanId,
39 },
40 ParallelCompleted {
42 node_id: String,
43 span_id: SpanId,
44 duration: Duration,
45 },
46 BranchCompleted {
48 branch_name: String,
49 node_id: String,
50 span_id: SpanId,
51 success: bool,
52 duration: Duration,
53 },
54 Custom {
56 node_id: String,
57 payload: Box<dyn std::any::Any + Send + Sync>,
58 },
59}
60
61#[derive(Debug, Clone, PartialEq, Eq, Hash)]
65pub struct BarrierId {
66 pub node_id: String,
67 pub occurrence: u32,
68}
69
70impl BarrierId {
71 pub fn new(node_id: impl Into<String>, occurrence: u32) -> Self {
72 Self {
73 node_id: node_id.into(),
74 occurrence,
75 }
76 }
77}
78
79#[derive(Debug, Clone)]
81pub enum BarrierDecision {
82 Approve,
84 Reject { reason: String },
86 Modify {
88 key: String,
89 value: serde_json::Value,
90 },
91 Reroute { target: String },
93}
94
95#[derive(Debug)]
99pub enum GraphEvent {
100 GraphStart { trace_id: TraceId },
102 NodeStart {
104 node_name: String,
105 trace_id: TraceId,
106 span_id: SpanId,
107 step: usize,
108 },
109 NodeEnd {
111 node_name: String,
112 trace_id: TraceId,
113 span_id: SpanId,
114 success: bool,
115 duration: Duration,
116 },
117 Node {
119 span_id: SpanId,
120 node_name: String,
121 event: FlowEvent,
122 },
123 BarrierWaiting {
125 barrier_id: BarrierId,
126 node_name: String,
127 span_id: SpanId,
128 },
129 BarrierResolved {
131 barrier_id: BarrierId,
132 decision: BarrierDecision,
133 },
134 ObservedError {
136 error: ObservedError,
137 node_name: String,
138 },
139 CheckpointSaved {
141 checkpoint_id: CheckpointId,
142 node_name: String,
143 step: usize,
144 },
145 GraphComplete { result: GraphResult },
147 GraphError {
149 error: GraphError,
150 state: crate::state::State,
151 },
152}
153
154pub type GraphStream = tokio::sync::mpsc::Receiver<GraphEvent>;
156
157pub struct GraphExecution {
159 pub stream: GraphStream,
160 pub handle: GraphHandle,
161}
162
163pub struct GraphHandle {
167 decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
168 cancel_tx: tokio::sync::mpsc::Sender<()>,
169}
170
171#[allow(dead_code)]
173pub(crate) enum BarrierDecisionMessage {
174 Exact {
175 barrier_id: BarrierId,
176 decision: BarrierDecision,
177 },
178 Wildcard {
179 node_id: String,
180 decision: BarrierDecision,
181 },
182}
183
184impl GraphHandle {
185 pub(crate) fn new(
186 decision_tx: tokio::sync::mpsc::Sender<BarrierDecisionMessage>,
187 cancel_tx: tokio::sync::mpsc::Sender<()>,
188 ) -> Self {
189 Self {
190 decision_tx,
191 cancel_tx,
192 }
193 }
194
195 pub async fn decide(
196 &self,
197 barrier_id: BarrierId,
198 decision: BarrierDecision,
199 ) -> Result<(), GraphError> {
200 self.decision_tx
201 .send(BarrierDecisionMessage::Exact {
202 barrier_id,
203 decision,
204 })
205 .await
206 .map_err(|_| {
207 GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
208 node: "decision channel closed".into(),
209 })
210 })
211 }
212
213 pub async fn decide_wildcard(
214 &self,
215 node_id: impl Into<String>,
216 decision: BarrierDecision,
217 ) -> Result<(), GraphError> {
218 self.decision_tx
219 .send(BarrierDecisionMessage::Wildcard {
220 node_id: node_id.into(),
221 decision,
222 })
223 .await
224 .map_err(|_| {
225 GraphError::Terminal(crate::error::TerminalError::BarrierCancelled {
226 node: "decision channel closed".into(),
227 })
228 })
229 }
230
231 pub fn cancel(&self) {
232 let _ = self.cancel_tx.try_send(());
233 }
234}