Skip to main content

worldinterface_host/
status.rs

1//! Flow run status types and derivation from AQ projection state.
2
3use std::collections::HashMap;
4
5use actionqueue_core::ids::TaskId;
6use actionqueue_core::run::RunState;
7use actionqueue_storage::recovery::reducer::ReplayReducer;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use worldinterface_core::id::{FlowRunId, NodeId};
11use worldinterface_flowspec::payload::CoordinatorPayload;
12
13/// Overall status of a flow run.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct FlowRunStatus {
16    pub flow_run_id: FlowRunId,
17    pub phase: FlowPhase,
18    pub steps: Vec<StepStatus>,
19    pub outputs: Option<HashMap<NodeId, Value>>,
20    pub error: Option<String>,
21    /// Timestamp when the flow was submitted (unix epoch seconds).
22    pub submitted_at: u64,
23    /// Timestamp of the last state change (unix epoch seconds).
24    pub last_updated_at: u64,
25    /// Trigger receipt (present when the flow was started via webhook).
26    #[serde(default, skip_serializing_if = "Option::is_none")]
27    pub trigger_receipt: Option<Value>,
28}
29
30/// High-level phase of a flow run's lifecycle.
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
32pub enum FlowPhase {
33    Pending,
34    Running,
35    Completed,
36    Failed,
37    Canceled,
38}
39
40/// Status of an individual step within a flow.
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct StepStatus {
43    pub node_id: NodeId,
44    pub phase: StepPhase,
45    pub output: Option<Value>,
46    /// Receipt for this step's boundary crossing (if available).
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub receipt: Option<Value>,
49    /// Error message for failed steps.
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub error: Option<String>,
52    /// Label from the FlowSpec node (if set).
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub label: Option<String>,
55    /// Connector name (for connector nodes).
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub connector: Option<String>,
58}
59
60/// Lightweight summary of a flow run for list display.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct FlowRunSummary {
63    pub flow_run_id: FlowRunId,
64    pub phase: FlowPhase,
65    pub submitted_at: u64,
66    pub last_updated_at: u64,
67}
68
69/// Phase of an individual step.
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
71pub enum StepPhase {
72    Pending,
73    Running,
74    Completed,
75    Failed,
76    Excluded,
77}
78
79/// Derive the full FlowRunStatus from AQ projection + ContextStore.
80pub(crate) fn derive_flow_run_status(
81    projection: &ReplayReducer,
82    store: &dyn worldinterface_contextstore::ContextStore,
83    flow_run_id: FlowRunId,
84    coordinator_task_id: TaskId,
85) -> FlowRunStatus {
86    // Find the Coordinator's latest run
87    let coordinator_run = find_latest_run(projection, coordinator_task_id);
88
89    let flow_phase = match coordinator_run {
90        None => {
91            // Task exists but no run yet (pre-first-tick)
92            if projection.get_task(&coordinator_task_id).is_some() {
93                FlowPhase::Pending
94            } else {
95                // Should not happen if coordinator_task_id is valid
96                FlowPhase::Pending
97            }
98        }
99        Some(ref run) => match run.state() {
100            RunState::Completed => FlowPhase::Completed,
101            RunState::Failed => FlowPhase::Failed,
102            RunState::Canceled => FlowPhase::Canceled,
103            _ => FlowPhase::Running,
104        },
105    };
106
107    // Collect step statuses by reading the coordinator payload for node→task mapping
108    let (steps, outputs) = collect_step_info(projection, store, flow_run_id, coordinator_task_id);
109
110    // Extract error if failed
111    let error = if flow_phase == FlowPhase::Failed {
112        coordinator_run.as_ref().and_then(|run| extract_last_error(projection, run.id()))
113    } else {
114        None
115    };
116
117    // Timestamps: submitted_at from TaskRecord, last_updated_at from run state
118    let submitted_at =
119        projection.get_task_record(&coordinator_task_id).map(|r| r.created_at()).unwrap_or(0);
120    let last_updated_at =
121        coordinator_run.as_ref().map(|r| r.last_state_change_at()).unwrap_or(submitted_at);
122
123    // Look up trigger receipt (present for webhook-triggered flows)
124    let trigger_receipt_key = format!("receipt:trigger:{}", flow_run_id);
125    let trigger_receipt = store.get_global(&trigger_receipt_key).ok().flatten();
126
127    FlowRunStatus {
128        flow_run_id,
129        phase: flow_phase,
130        steps,
131        outputs: if flow_phase == FlowPhase::Completed { Some(outputs) } else { None },
132        error,
133        submitted_at,
134        last_updated_at,
135        trigger_receipt,
136    }
137}
138
139/// Find the latest RunInstance for a task.
140pub(crate) fn find_latest_run(
141    projection: &ReplayReducer,
142    task_id: TaskId,
143) -> Option<actionqueue_core::run::RunInstance> {
144    projection.runs_for_task(task_id).max_by_key(|r| r.last_state_change_at()).cloned()
145}
146
147/// Collect step statuses and outputs from the AQ projection and ContextStore.
148fn collect_step_info(
149    projection: &ReplayReducer,
150    store: &dyn worldinterface_contextstore::ContextStore,
151    flow_run_id: FlowRunId,
152    coordinator_task_id: TaskId,
153) -> (Vec<StepStatus>, HashMap<NodeId, Value>) {
154    let mut steps = Vec::new();
155    let mut outputs = HashMap::new();
156
157    // Get the coordinator's payload to find node→task mapping
158    let coordinator_spec = match projection.get_task(&coordinator_task_id) {
159        Some(spec) => spec,
160        None => return (steps, outputs),
161    };
162
163    let coordinator_payload: CoordinatorPayload =
164        match serde_json::from_slice(coordinator_spec.payload()) {
165            Ok(p) => p,
166            Err(_) => return (steps, outputs),
167        };
168
169    // For each node in the flow, derive its step status
170    for node in &coordinator_payload.flow_spec.nodes {
171        let task_id = match coordinator_payload.node_task_map.get(&node.id) {
172            Some(&tid) => tid,
173            None => continue,
174        };
175
176        // Try to get the step's output from ContextStore
177        let output = store.get(flow_run_id, node.id).ok().flatten();
178
179        // Find the step's latest run
180        let step_run = find_latest_run(projection, task_id);
181
182        let step_phase = match step_run {
183            None => {
184                // Check if task exists — if not, it may have been excluded (branch not taken)
185                if projection.get_task(&task_id).is_some() {
186                    StepPhase::Pending
187                } else {
188                    // Not submitted → excluded (branch not taken)
189                    // But also check if output exists (completed in prior run before crash)
190                    if output.is_some() {
191                        StepPhase::Completed
192                    } else {
193                        StepPhase::Excluded
194                    }
195                }
196            }
197            Some(ref run) => match run.state() {
198                RunState::Completed => StepPhase::Completed,
199                RunState::Failed => StepPhase::Failed,
200                RunState::Canceled => {
201                    // Check if this step was excluded via branch
202                    // A canceled step task means the coordinator marked it as excluded
203                    if output.is_some() {
204                        StepPhase::Completed
205                    } else {
206                        // Check the step payload — if branch evaluated to exclude, this is Excluded
207                        // For now, check via attempt output
208                        match extract_step_exclusion(projection, run.id()) {
209                            true => StepPhase::Excluded,
210                            false => StepPhase::Failed,
211                        }
212                    }
213                }
214                _ => StepPhase::Running,
215            },
216        };
217
218        if let Some(ref val) = output {
219            outputs.insert(node.id, val.clone());
220        }
221
222        // Look up step receipt from ContextStore globals
223        let receipt_key = format!("receipt:step:{}:{}", flow_run_id, node.id);
224        let receipt = store.get_global(&receipt_key).ok().flatten();
225
226        // Extract error from AQ attempt history for failed steps
227        let step_error = if step_phase == StepPhase::Failed {
228            step_run.as_ref().and_then(|run| extract_last_error(projection, run.id()))
229        } else {
230            None
231        };
232
233        // Extract label and connector from FlowSpec node
234        let label = node.label.clone();
235        let connector = match &node.node_type {
236            worldinterface_core::flowspec::NodeType::Connector(c) => Some(c.connector.clone()),
237            _ => None,
238        };
239
240        steps.push(StepStatus {
241            node_id: node.id,
242            phase: step_phase,
243            output,
244            receipt,
245            error: step_error,
246            label,
247            connector,
248        });
249    }
250
251    (steps, outputs)
252}
253
254/// Check if a step run was excluded (branch not taken) by looking at its attempt output.
255fn extract_step_exclusion(
256    projection: &ReplayReducer,
257    run_id: actionqueue_core::ids::RunId,
258) -> bool {
259    if let Some(history) = projection.get_attempt_history(&run_id) {
260        for entry in history.iter().rev() {
261            if let Some(output) = entry.output() {
262                // The step handler writes "excluded" markers for branch-excluded steps
263                if let Ok(payload) = serde_json::from_slice::<Value>(output) {
264                    if payload.get("excluded").and_then(|v| v.as_bool()) == Some(true) {
265                        return true;
266                    }
267                }
268            }
269        }
270    }
271    false
272}
273
274/// Extract the last error message from a run's attempt history.
275fn extract_last_error(
276    projection: &ReplayReducer,
277    run_id: actionqueue_core::ids::RunId,
278) -> Option<String> {
279    let history = projection.get_attempt_history(&run_id)?;
280    history.iter().rev().find_map(|a| a.error().map(|s| s.to_string()))
281}
282
283/// Extract all flow outputs from ContextStore for a completed flow.
284pub(crate) fn extract_flow_outputs(
285    store: &dyn worldinterface_contextstore::ContextStore,
286    flow_run_id: FlowRunId,
287) -> Result<HashMap<NodeId, Value>, worldinterface_contextstore::error::ContextStoreError> {
288    let keys = store.list_keys(flow_run_id)?;
289    let mut outputs = HashMap::new();
290    for node_id in keys {
291        if let Some(val) = store.get(flow_run_id, node_id)? {
292            outputs.insert(node_id, val);
293        }
294    }
295    Ok(outputs)
296}