distri_workflow/
step_executions.rs1use chrono::{DateTime, Utc};
17use distri_types::TaskStatus;
18use serde::{Deserialize, Serialize};
19
20#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28pub struct WorkflowStepExecution {
29 pub task_id: String,
30 pub run_task_id: String,
31 pub step_id: String,
32 pub status: TaskStatus,
33 #[serde(default, skip_serializing_if = "Option::is_none")]
34 pub started_at: Option<DateTime<Utc>>,
35 #[serde(default, skip_serializing_if = "Option::is_none")]
36 pub completed_at: Option<DateTime<Utc>>,
37 #[serde(default, skip_serializing_if = "Option::is_none")]
38 pub result: Option<serde_json::Value>,
39 #[serde(default, skip_serializing_if = "Option::is_none")]
40 pub error: Option<String>,
41}
42
43#[derive(Debug, Default, Clone, Serialize, Deserialize)]
48pub struct WorkflowStepExecutionUpdate {
49 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub status: Option<TaskStatus>,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub started_at: Option<DateTime<Utc>>,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub completed_at: Option<DateTime<Utc>>,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
56 pub result: Option<serde_json::Value>,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
58 pub error: Option<String>,
59}
60
61#[async_trait::async_trait]
66pub trait WorkflowStepExecutionStore: Send + Sync {
67 async fn insert(
69 &self,
70 execution: WorkflowStepExecution,
71 ) -> anyhow::Result<WorkflowStepExecution>;
72
73 async fn update(
76 &self,
77 run_task_id: &str,
78 step_id: &str,
79 update: WorkflowStepExecutionUpdate,
80 ) -> anyhow::Result<WorkflowStepExecution>;
81
82 async fn get(
84 &self,
85 run_task_id: &str,
86 step_id: &str,
87 ) -> anyhow::Result<Option<WorkflowStepExecution>>;
88
89 async fn list(&self, run_task_id: &str) -> anyhow::Result<Vec<WorkflowStepExecution>>;
91}
92
93pub struct InMemoryWorkflowStepExecutionStore {
96 rows: std::sync::Mutex<Vec<WorkflowStepExecution>>,
97}
98
99impl Default for InMemoryWorkflowStepExecutionStore {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105impl InMemoryWorkflowStepExecutionStore {
106 pub fn new() -> Self {
107 Self {
108 rows: std::sync::Mutex::new(Vec::new()),
109 }
110 }
111}
112
113#[async_trait::async_trait]
114impl WorkflowStepExecutionStore for InMemoryWorkflowStepExecutionStore {
115 async fn insert(
116 &self,
117 execution: WorkflowStepExecution,
118 ) -> anyhow::Result<WorkflowStepExecution> {
119 let mut rows = self
120 .rows
121 .lock()
122 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
123 rows.push(execution.clone());
124 Ok(execution)
125 }
126
127 async fn update(
128 &self,
129 run_task_id: &str,
130 step_id: &str,
131 update: WorkflowStepExecutionUpdate,
132 ) -> anyhow::Result<WorkflowStepExecution> {
133 let mut rows = self
134 .rows
135 .lock()
136 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
137 let row = rows
138 .iter_mut()
139 .find(|r| r.run_task_id == run_task_id && r.step_id == step_id)
140 .ok_or_else(|| {
141 anyhow::anyhow!("step execution not found: run={run_task_id}, step={step_id}")
142 })?;
143 if let Some(s) = update.status {
144 row.status = s;
145 }
146 if let Some(t) = update.started_at {
147 row.started_at = Some(t);
148 }
149 if let Some(t) = update.completed_at {
150 row.completed_at = Some(t);
151 }
152 if let Some(r) = update.result {
153 row.result = Some(r);
154 }
155 if let Some(e) = update.error {
156 row.error = Some(e);
157 }
158 Ok(row.clone())
159 }
160
161 async fn get(
162 &self,
163 run_task_id: &str,
164 step_id: &str,
165 ) -> anyhow::Result<Option<WorkflowStepExecution>> {
166 let rows = self
167 .rows
168 .lock()
169 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
170 Ok(rows
171 .iter()
172 .find(|r| r.run_task_id == run_task_id && r.step_id == step_id)
173 .cloned())
174 }
175
176 async fn list(&self, run_task_id: &str) -> anyhow::Result<Vec<WorkflowStepExecution>> {
177 let rows = self
178 .rows
179 .lock()
180 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
181 Ok(rows
182 .iter()
183 .filter(|r| r.run_task_id == run_task_id)
184 .cloned()
185 .collect())
186 }
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192
193 fn sample(run: &str, step: &str) -> WorkflowStepExecution {
194 WorkflowStepExecution {
195 task_id: format!("task-{step}"),
196 run_task_id: run.to_string(),
197 step_id: step.to_string(),
198 status: TaskStatus::Pending,
199 started_at: None,
200 completed_at: None,
201 result: None,
202 error: None,
203 }
204 }
205
206 #[tokio::test]
207 async fn insert_and_get_roundtrip() {
208 let store = InMemoryWorkflowStepExecutionStore::new();
209 store.insert(sample("run1", "fetch")).await.unwrap();
210 let got = store.get("run1", "fetch").await.unwrap().unwrap();
211 assert_eq!(got.step_id, "fetch");
212 assert_eq!(got.status, TaskStatus::Pending);
213 }
214
215 #[tokio::test]
216 async fn update_changes_only_set_fields() {
217 let store = InMemoryWorkflowStepExecutionStore::new();
218 store.insert(sample("run1", "s1")).await.unwrap();
219
220 store
221 .update(
222 "run1",
223 "s1",
224 WorkflowStepExecutionUpdate {
225 status: Some(TaskStatus::Running),
226 started_at: Some(Utc::now()),
227 ..Default::default()
228 },
229 )
230 .await
231 .unwrap();
232
233 let got = store.get("run1", "s1").await.unwrap().unwrap();
234 assert_eq!(got.status, TaskStatus::Running);
235 assert!(got.started_at.is_some());
236 assert!(got.completed_at.is_none());
237 assert!(got.error.is_none());
238 }
239
240 #[tokio::test]
241 async fn list_is_per_run() {
242 let store = InMemoryWorkflowStepExecutionStore::new();
243 store.insert(sample("run1", "a")).await.unwrap();
244 store.insert(sample("run1", "b")).await.unwrap();
245 store.insert(sample("run2", "a")).await.unwrap();
246
247 let run1 = store.list("run1").await.unwrap();
248 assert_eq!(run1.len(), 2);
249 let run2 = store.list("run2").await.unwrap();
250 assert_eq!(run2.len(), 1);
251 }
252}