Skip to main content

distri_workflow/
step_executions.rs

//! Per-step execution records — the storage sidecar to the cloud's
//! canonical `Task` system.
//!
//! Each step that runs gets:
//!   - one `Task` row in the cloud's task store (status, `parent_task_id`
//!     pointing at the run task, timestamps), and
//!   - one `WorkflowStepExecution` row holding the workflow-specific
//!     extras: the `step_id` (link back to the definition template),
//!     the resolved `result` / `error`, and start / completion times.
//!
//! Phase 2c will rewire the engine to drive this store and `TaskStore`
//! directly. For now, this trait and its types are introduced so that the
//! cloud migration and the Postgres implementation can land without
//! disrupting the existing `WorkflowStateStore`-based engine.
15
16use chrono::{DateTime, Utc};
17use distri_types::TaskStatus;
18use serde::{Deserialize, Serialize};
19
20/// One step's execution row. `task_id` is the cloud `Task` row's id;
21/// `run_task_id` is the parent run-level Task. `step_id` is the
22/// definition-level identifier (e.g. `"fetch"`, `"summarize"`).
23///
24/// `status` mirrors the corresponding `Task` row's status — duplicated
25/// here so listing one run's steps + their statuses needs only one
26/// query against `workflow_step_executions`.
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28pub struct WorkflowStepExecution {
29    pub task_id: String,
30    pub run_task_id: String,
31    pub step_id: String,
32    pub status: TaskStatus,
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub started_at: Option<DateTime<Utc>>,
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub completed_at: Option<DateTime<Utc>>,
37    #[serde(default, skip_serializing_if = "Option::is_none")]
38    pub result: Option<serde_json::Value>,
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub error: Option<String>,
41}
42
43/// Update for the runtime fields of a step execution. Fields left
44/// `None` are not touched; pass `Some(...)` to set, including
45/// `Some(None)` if you need to clear (callers can compose by reading
46/// then writing — this struct sticks to write-set semantics).
47#[derive(Debug, Default, Clone, Serialize, Deserialize)]
48pub struct WorkflowStepExecutionUpdate {
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub status: Option<TaskStatus>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub started_at: Option<DateTime<Utc>>,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub completed_at: Option<DateTime<Utc>>,
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub result: Option<serde_json::Value>,
57    #[serde(default, skip_serializing_if = "Option::is_none")]
58    pub error: Option<String>,
59}
60
61/// Persist and load `WorkflowStepExecution` rows.
62///
63/// Implementations: cloud Postgres (production), in-memory (tests +
64/// the standalone client-side runner).
65#[async_trait::async_trait]
66pub trait WorkflowStepExecutionStore: Send + Sync {
67    /// Insert a step execution row. Returns the inserted record.
68    async fn insert(
69        &self,
70        execution: WorkflowStepExecution,
71    ) -> anyhow::Result<WorkflowStepExecution>;
72
73    /// Apply an update to the step identified by `(run_task_id,
74    /// step_id)`. Returns the updated record.
75    async fn update(
76        &self,
77        run_task_id: &str,
78        step_id: &str,
79        update: WorkflowStepExecutionUpdate,
80    ) -> anyhow::Result<WorkflowStepExecution>;
81
82    /// Fetch one step execution by `(run_task_id, step_id)`.
83    async fn get(
84        &self,
85        run_task_id: &str,
86        step_id: &str,
87    ) -> anyhow::Result<Option<WorkflowStepExecution>>;
88
89    /// List all step executions for a run, in insertion order.
90    async fn list(&self, run_task_id: &str) -> anyhow::Result<Vec<WorkflowStepExecution>>;
91}
92
93/// In-memory `WorkflowStepExecutionStore` for tests and the standalone
94/// client-side runner.
95pub struct InMemoryWorkflowStepExecutionStore {
96    rows: std::sync::Mutex<Vec<WorkflowStepExecution>>,
97}
98
99impl Default for InMemoryWorkflowStepExecutionStore {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105impl InMemoryWorkflowStepExecutionStore {
106    pub fn new() -> Self {
107        Self {
108            rows: std::sync::Mutex::new(Vec::new()),
109        }
110    }
111}
112
113#[async_trait::async_trait]
114impl WorkflowStepExecutionStore for InMemoryWorkflowStepExecutionStore {
115    async fn insert(
116        &self,
117        execution: WorkflowStepExecution,
118    ) -> anyhow::Result<WorkflowStepExecution> {
119        let mut rows = self
120            .rows
121            .lock()
122            .map_err(|e| anyhow::anyhow!(e.to_string()))?;
123        rows.push(execution.clone());
124        Ok(execution)
125    }
126
127    async fn update(
128        &self,
129        run_task_id: &str,
130        step_id: &str,
131        update: WorkflowStepExecutionUpdate,
132    ) -> anyhow::Result<WorkflowStepExecution> {
133        let mut rows = self
134            .rows
135            .lock()
136            .map_err(|e| anyhow::anyhow!(e.to_string()))?;
137        let row = rows
138            .iter_mut()
139            .find(|r| r.run_task_id == run_task_id && r.step_id == step_id)
140            .ok_or_else(|| {
141                anyhow::anyhow!("step execution not found: run={run_task_id}, step={step_id}")
142            })?;
143        if let Some(s) = update.status {
144            row.status = s;
145        }
146        if let Some(t) = update.started_at {
147            row.started_at = Some(t);
148        }
149        if let Some(t) = update.completed_at {
150            row.completed_at = Some(t);
151        }
152        if let Some(r) = update.result {
153            row.result = Some(r);
154        }
155        if let Some(e) = update.error {
156            row.error = Some(e);
157        }
158        Ok(row.clone())
159    }
160
161    async fn get(
162        &self,
163        run_task_id: &str,
164        step_id: &str,
165    ) -> anyhow::Result<Option<WorkflowStepExecution>> {
166        let rows = self
167            .rows
168            .lock()
169            .map_err(|e| anyhow::anyhow!(e.to_string()))?;
170        Ok(rows
171            .iter()
172            .find(|r| r.run_task_id == run_task_id && r.step_id == step_id)
173            .cloned())
174    }
175
176    async fn list(&self, run_task_id: &str) -> anyhow::Result<Vec<WorkflowStepExecution>> {
177        let rows = self
178            .rows
179            .lock()
180            .map_err(|e| anyhow::anyhow!(e.to_string()))?;
181        Ok(rows
182            .iter()
183            .filter(|r| r.run_task_id == run_task_id)
184            .cloned()
185            .collect())
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pending, not-yet-started execution row for `step` in `run`.
    fn sample(run: &str, step: &str) -> WorkflowStepExecution {
        WorkflowStepExecution {
            task_id: format!("task-{step}"),
            run_task_id: run.to_string(),
            step_id: step.to_string(),
            status: TaskStatus::Pending,
            started_at: None,
            completed_at: None,
            result: None,
            error: None,
        }
    }

    #[tokio::test]
    async fn insert_and_get_roundtrip() {
        let store = InMemoryWorkflowStepExecutionStore::new();
        let inserted = store.insert(sample("run1", "fetch")).await.unwrap();
        assert_eq!(inserted, sample("run1", "fetch"));

        let got = store.get("run1", "fetch").await.unwrap().unwrap();
        assert_eq!(got, inserted);
        assert_eq!(got.status, TaskStatus::Pending);
    }

    #[tokio::test]
    async fn get_missing_returns_none() {
        let store = InMemoryWorkflowStepExecutionStore::new();
        store.insert(sample("run1", "fetch")).await.unwrap();

        // Both halves of the (run_task_id, step_id) key must match.
        assert!(store.get("run1", "other").await.unwrap().is_none());
        assert!(store.get("run2", "fetch").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn update_changes_only_set_fields() {
        let store = InMemoryWorkflowStepExecutionStore::new();
        store.insert(sample("run1", "s1")).await.unwrap();

        let started = Utc::now();
        let returned = store
            .update(
                "run1",
                "s1",
                WorkflowStepExecutionUpdate {
                    status: Some(TaskStatus::Running),
                    started_at: Some(started),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        let got = store.get("run1", "s1").await.unwrap().unwrap();
        assert_eq!(got, returned);
        assert_eq!(got.task_id, "task-s1");
        assert_eq!(got.status, TaskStatus::Running);
        assert_eq!(got.started_at, Some(started));
        assert!(got.completed_at.is_none());
        assert!(got.result.is_none());
        assert!(got.error.is_none());

        // A later update that sets only `error` must leave the fields
        // written by the first update untouched.
        let got = store
            .update(
                "run1",
                "s1",
                WorkflowStepExecutionUpdate {
                    error: Some("boom".to_string()),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert_eq!(got.status, TaskStatus::Running);
        assert_eq!(got.started_at, Some(started));
        assert_eq!(got.error.as_deref(), Some("boom"));
    }

    #[tokio::test]
    async fn update_missing_step_errors() {
        let store = InMemoryWorkflowStepExecutionStore::new();
        store.insert(sample("run1", "s1")).await.unwrap();

        let err = store
            .update("run1", "s2", WorkflowStepExecutionUpdate::default())
            .await
            .unwrap_err();
        assert!(err.to_string().contains("step execution not found"));

        // The failed update must not create or alter any row.
        assert_eq!(store.list("run1").await.unwrap(), vec![sample("run1", "s1")]);
    }

    #[tokio::test]
    async fn list_is_per_run() {
        let store = InMemoryWorkflowStepExecutionStore::new();
        // Interleave runs so the test covers both filtering and ordering.
        store.insert(sample("run1", "b")).await.unwrap();
        store.insert(sample("run2", "a")).await.unwrap();
        store.insert(sample("run1", "a")).await.unwrap();

        let run1: Vec<String> = store
            .list("run1")
            .await
            .unwrap()
            .into_iter()
            .map(|e| e.step_id)
            .collect();
        assert_eq!(run1, ["b", "a"]);

        let run2 = store.list("run2").await.unwrap();
        assert_eq!(run2, vec![sample("run2", "a")]);

        assert!(store.list("run3").await.unwrap().is_empty());
    }
}