1use std::collections::HashMap;
4
5use actionqueue_core::ids::TaskId;
6use actionqueue_core::run::RunState;
7use actionqueue_storage::recovery::reducer::ReplayReducer;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use worldinterface_core::id::{FlowRunId, NodeId};
11use worldinterface_flowspec::payload::CoordinatorPayload;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct FlowRunStatus {
16 pub flow_run_id: FlowRunId,
17 pub phase: FlowPhase,
18 pub steps: Vec<StepStatus>,
19 pub outputs: Option<HashMap<NodeId, Value>>,
20 pub error: Option<String>,
21 pub submitted_at: u64,
23 pub last_updated_at: u64,
25 #[serde(default, skip_serializing_if = "Option::is_none")]
27 pub trigger_receipt: Option<Value>,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
32pub enum FlowPhase {
33 Pending,
34 Running,
35 Completed,
36 Failed,
37 Canceled,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct StepStatus {
43 pub node_id: NodeId,
44 pub phase: StepPhase,
45 pub output: Option<Value>,
46 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub receipt: Option<Value>,
49 #[serde(default, skip_serializing_if = "Option::is_none")]
51 pub error: Option<String>,
52 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub label: Option<String>,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub connector: Option<String>,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct FlowRunSummary {
63 pub flow_run_id: FlowRunId,
64 pub phase: FlowPhase,
65 pub submitted_at: u64,
66 pub last_updated_at: u64,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
71pub enum StepPhase {
72 Pending,
73 Running,
74 Completed,
75 Failed,
76 Excluded,
77}
78
79pub(crate) fn derive_flow_run_status(
81 projection: &ReplayReducer,
82 store: &dyn worldinterface_contextstore::ContextStore,
83 flow_run_id: FlowRunId,
84 coordinator_task_id: TaskId,
85) -> FlowRunStatus {
86 let coordinator_run = find_latest_run(projection, coordinator_task_id);
88
89 let flow_phase = match coordinator_run {
90 None => {
91 if projection.get_task(&coordinator_task_id).is_some() {
93 FlowPhase::Pending
94 } else {
95 FlowPhase::Pending
97 }
98 }
99 Some(ref run) => match run.state() {
100 RunState::Completed => FlowPhase::Completed,
101 RunState::Failed => FlowPhase::Failed,
102 RunState::Canceled => FlowPhase::Canceled,
103 _ => FlowPhase::Running,
104 },
105 };
106
107 let (steps, outputs) = collect_step_info(projection, store, flow_run_id, coordinator_task_id);
109
110 let error = if flow_phase == FlowPhase::Failed {
112 coordinator_run.as_ref().and_then(|run| extract_last_error(projection, run.id()))
113 } else {
114 None
115 };
116
117 let submitted_at =
119 projection.get_task_record(&coordinator_task_id).map(|r| r.created_at()).unwrap_or(0);
120 let last_updated_at =
121 coordinator_run.as_ref().map(|r| r.last_state_change_at()).unwrap_or(submitted_at);
122
123 let trigger_receipt_key = format!("receipt:trigger:{}", flow_run_id);
125 let trigger_receipt = store.get_global(&trigger_receipt_key).ok().flatten();
126
127 FlowRunStatus {
128 flow_run_id,
129 phase: flow_phase,
130 steps,
131 outputs: if flow_phase == FlowPhase::Completed { Some(outputs) } else { None },
132 error,
133 submitted_at,
134 last_updated_at,
135 trigger_receipt,
136 }
137}
138
139pub(crate) fn find_latest_run(
141 projection: &ReplayReducer,
142 task_id: TaskId,
143) -> Option<actionqueue_core::run::RunInstance> {
144 projection.runs_for_task(task_id).max_by_key(|r| r.last_state_change_at()).cloned()
145}
146
147fn collect_step_info(
149 projection: &ReplayReducer,
150 store: &dyn worldinterface_contextstore::ContextStore,
151 flow_run_id: FlowRunId,
152 coordinator_task_id: TaskId,
153) -> (Vec<StepStatus>, HashMap<NodeId, Value>) {
154 let mut steps = Vec::new();
155 let mut outputs = HashMap::new();
156
157 let coordinator_spec = match projection.get_task(&coordinator_task_id) {
159 Some(spec) => spec,
160 None => return (steps, outputs),
161 };
162
163 let coordinator_payload: CoordinatorPayload =
164 match serde_json::from_slice(coordinator_spec.payload()) {
165 Ok(p) => p,
166 Err(_) => return (steps, outputs),
167 };
168
169 for node in &coordinator_payload.flow_spec.nodes {
171 let task_id = match coordinator_payload.node_task_map.get(&node.id) {
172 Some(&tid) => tid,
173 None => continue,
174 };
175
176 let output = store.get(flow_run_id, node.id).ok().flatten();
178
179 let step_run = find_latest_run(projection, task_id);
181
182 let step_phase = match step_run {
183 None => {
184 if projection.get_task(&task_id).is_some() {
186 StepPhase::Pending
187 } else {
188 if output.is_some() {
191 StepPhase::Completed
192 } else {
193 StepPhase::Excluded
194 }
195 }
196 }
197 Some(ref run) => match run.state() {
198 RunState::Completed => StepPhase::Completed,
199 RunState::Failed => StepPhase::Failed,
200 RunState::Canceled => {
201 if output.is_some() {
204 StepPhase::Completed
205 } else {
206 match extract_step_exclusion(projection, run.id()) {
209 true => StepPhase::Excluded,
210 false => StepPhase::Failed,
211 }
212 }
213 }
214 _ => StepPhase::Running,
215 },
216 };
217
218 if let Some(ref val) = output {
219 outputs.insert(node.id, val.clone());
220 }
221
222 let receipt_key = format!("receipt:step:{}:{}", flow_run_id, node.id);
224 let receipt = store.get_global(&receipt_key).ok().flatten();
225
226 let step_error = if step_phase == StepPhase::Failed {
228 step_run.as_ref().and_then(|run| extract_last_error(projection, run.id()))
229 } else {
230 None
231 };
232
233 let label = node.label.clone();
235 let connector = match &node.node_type {
236 worldinterface_core::flowspec::NodeType::Connector(c) => Some(c.connector.clone()),
237 _ => None,
238 };
239
240 steps.push(StepStatus {
241 node_id: node.id,
242 phase: step_phase,
243 output,
244 receipt,
245 error: step_error,
246 label,
247 connector,
248 });
249 }
250
251 (steps, outputs)
252}
253
254fn extract_step_exclusion(
256 projection: &ReplayReducer,
257 run_id: actionqueue_core::ids::RunId,
258) -> bool {
259 if let Some(history) = projection.get_attempt_history(&run_id) {
260 for entry in history.iter().rev() {
261 if let Some(output) = entry.output() {
262 if let Ok(payload) = serde_json::from_slice::<Value>(output) {
264 if payload.get("excluded").and_then(|v| v.as_bool()) == Some(true) {
265 return true;
266 }
267 }
268 }
269 }
270 }
271 false
272}
273
274fn extract_last_error(
276 projection: &ReplayReducer,
277 run_id: actionqueue_core::ids::RunId,
278) -> Option<String> {
279 let history = projection.get_attempt_history(&run_id)?;
280 history.iter().rev().find_map(|a| a.error().map(|s| s.to_string()))
281}
282
283pub(crate) fn extract_flow_outputs(
285 store: &dyn worldinterface_contextstore::ContextStore,
286 flow_run_id: FlowRunId,
287) -> Result<HashMap<NodeId, Value>, worldinterface_contextstore::error::ContextStoreError> {
288 let keys = store.list_keys(flow_run_id)?;
289 let mut outputs = HashMap::new();
290 for node_id in keys {
291 if let Some(val) = store.get(flow_run_id, node_id)? {
292 outputs.insert(node_id, val);
293 }
294 }
295 Ok(outputs)
296}