Skip to main content

ironflow_store/memory/
run_store.rs

1use std::collections::HashMap;
2
3use chrono::Utc;
4use rust_decimal::Decimal;
5use uuid::Uuid;
6
7use crate::entities::{
8    NewRun, NewStep, NewStepDependency, Page, Run, RunFilter, RunStats, RunStatus, RunUpdate, Step,
9    StepDependency, StepStatus, StepUpdate,
10};
11use crate::error::StoreError;
12use crate::store::{RunStore, StoreFuture};
13
14use super::InMemoryStore;
15
16fn run_matches_filter(run: &Run, filter: &RunFilter, steps: &HashMap<Uuid, Step>) -> bool {
17    if let Some(ref wf) = filter.workflow_name
18        && !run
19            .workflow_name
20            .to_lowercase()
21            .contains(&wf.to_lowercase())
22    {
23        return false;
24    }
25    if let Some(ref status) = filter.status
26        && &run.status.state != status
27    {
28        return false;
29    }
30    if let Some(after) = filter.created_after
31        && run.created_at < after
32    {
33        return false;
34    }
35    if let Some(before) = filter.created_before
36        && run.created_at > before
37    {
38        return false;
39    }
40    if let Some(has_steps) = filter.has_steps
41        && matches!(
42            run.status.state,
43            RunStatus::Completed | RunStatus::Cancelled
44        )
45    {
46        let run_has_steps = steps.values().any(|s| s.run_id == run.id);
47        if has_steps != run_has_steps {
48            return false;
49        }
50    }
51    if let Some(ref labels) = filter.labels {
52        for (key, value) in labels {
53            if run.labels.get(key) != Some(value) {
54                return false;
55            }
56        }
57    }
58    true
59}
60
61impl RunStore for InMemoryStore {
62    fn create_run(&self, req: NewRun) -> StoreFuture<'_, Run> {
63        Box::pin(async move {
64            let now = Utc::now();
65            let run = Run {
66                id: Uuid::now_v7(),
67                workflow_name: req.workflow_name,
68                status: crate::entities::FsmState::new(RunStatus::Pending, Uuid::now_v7()),
69                trigger: req.trigger,
70                payload: req.payload,
71                error: None,
72                retry_count: 0,
73                max_retries: req.max_retries,
74                cost_usd: Decimal::ZERO,
75                duration_ms: 0,
76                created_at: now,
77                updated_at: now,
78                started_at: None,
79                completed_at: None,
80                handler_version: req.handler_version,
81                labels: req.labels,
82                scheduled_at: req.scheduled_at,
83            };
84
85            let mut state = self.state.write().await;
86            state.runs.insert(run.id, run.clone());
87            Ok(run)
88        })
89    }
90
91    fn get_run(&self, id: Uuid) -> StoreFuture<'_, Option<Run>> {
92        Box::pin(async move {
93            let state = self.state.read().await;
94            Ok(state.runs.get(&id).cloned())
95        })
96    }
97
98    fn list_runs(&self, filter: RunFilter, page: u32, per_page: u32) -> StoreFuture<'_, Page<Run>> {
99        Box::pin(async move {
100            let state = self.state.read().await;
101
102            let mut runs: Vec<&Run> = state
103                .runs
104                .values()
105                .filter(|r| run_matches_filter(r, &filter, &state.steps))
106                .collect();
107
108            // Sort newest first.
109            runs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
110
111            let total = runs.len() as u64;
112            let page = page.max(1);
113            let per_page = per_page.clamp(1, 100);
114            let offset = ((page - 1) * per_page) as usize;
115            let items: Vec<Run> = runs
116                .into_iter()
117                .skip(offset)
118                .take(per_page as usize)
119                .cloned()
120                .collect();
121
122            Ok(Page {
123                items,
124                total,
125                page,
126                per_page,
127            })
128        })
129    }
130
131    fn update_run_status(&self, id: Uuid, new_status: RunStatus) -> StoreFuture<'_, ()> {
132        Box::pin(async move {
133            let mut state = self.state.write().await;
134            let run = state.runs.get_mut(&id).ok_or(StoreError::RunNotFound(id))?;
135
136            if !run.status.state.can_transition_to(&new_status) {
137                return Err(StoreError::InvalidTransition {
138                    from: run.status.state,
139                    to: new_status,
140                });
141            }
142
143            if run.status.state == new_status && new_status.is_terminal() {
144                return Ok(());
145            }
146
147            let now = Utc::now();
148            run.status.state = new_status;
149            run.updated_at = now;
150
151            if new_status == RunStatus::Running && run.started_at.is_none() {
152                run.started_at = Some(now);
153            }
154            if new_status.is_terminal() {
155                run.completed_at = Some(now);
156            }
157
158            Ok(())
159        })
160    }
161
162    fn update_run(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, ()> {
163        Box::pin(async move {
164            let mut state = self.state.write().await;
165            let run = state.runs.get_mut(&id).ok_or(StoreError::RunNotFound(id))?;
166
167            let now = Utc::now();
168
169            if let Some(status) = update.status {
170                if !run.status.state.can_transition_to(&status) {
171                    return Err(StoreError::InvalidTransition {
172                        from: run.status.state,
173                        to: status,
174                    });
175                }
176                if !(run.status.state == status && status.is_terminal()) {
177                    run.status.state = status;
178                    if status == RunStatus::Running && run.started_at.is_none() {
179                        run.started_at = Some(now);
180                    }
181                    if status.is_terminal() {
182                        run.completed_at = Some(now);
183                    }
184                }
185            }
186
187            if let Some(error) = update.error {
188                run.error = Some(error);
189            }
190            if update.increment_retry {
191                run.retry_count += 1;
192            }
193            if let Some(cost) = update.cost_usd {
194                run.cost_usd = cost;
195            }
196            if let Some(dur) = update.duration_ms {
197                run.duration_ms = dur;
198            }
199            if let Some(started) = update.started_at {
200                run.started_at = Some(started);
201            }
202            if let Some(completed) = update.completed_at {
203                run.completed_at = Some(completed);
204            }
205
206            run.updated_at = now;
207            Ok(())
208        })
209    }
210
211    fn pick_next_pending(&self) -> StoreFuture<'_, Option<Run>> {
212        Box::pin(async move {
213            let mut state = self.state.write().await;
214            let now = Utc::now();
215
216            // Find the oldest pending run whose scheduled_at has passed (or is None).
217            let oldest_id = state
218                .runs
219                .values()
220                .filter(|r| {
221                    r.status.state == RunStatus::Pending
222                        && r.scheduled_at.is_none_or(|at| at <= now)
223                })
224                .min_by_key(|r| r.created_at)
225                .map(|r| r.id);
226
227            let Some(id) = oldest_id else {
228                return Ok(None);
229            };
230
231            // Transition to Running atomically.
232            let run = state.runs.get_mut(&id).expect("run exists");
233            let now = Utc::now();
234            run.status.state = RunStatus::Running;
235            run.started_at = Some(now);
236            run.updated_at = now;
237
238            Ok(Some(run.clone()))
239        })
240    }
241
242    fn create_step(&self, req: NewStep) -> StoreFuture<'_, Step> {
243        Box::pin(async move {
244            let mut state = self.state.write().await;
245
246            if !state.runs.contains_key(&req.run_id) {
247                return Err(StoreError::RunNotFound(req.run_id));
248            }
249
250            let now = Utc::now();
251            let step = Step {
252                id: Uuid::now_v7(),
253                run_id: req.run_id,
254                name: req.name,
255                kind: req.kind,
256                position: req.position,
257                status: crate::entities::FsmState::new(StepStatus::Pending, Uuid::now_v7()),
258                input: req.input,
259                output: None,
260                error: None,
261                duration_ms: 0,
262                cost_usd: Decimal::ZERO,
263                input_tokens: None,
264                output_tokens: None,
265                created_at: now,
266                updated_at: now,
267                started_at: None,
268                completed_at: None,
269                debug_messages: None,
270            };
271
272            state.steps.insert(step.id, step.clone());
273            Ok(step)
274        })
275    }
276
277    fn update_step(&self, id: Uuid, update: StepUpdate) -> StoreFuture<'_, ()> {
278        Box::pin(async move {
279            let mut state = self.state.write().await;
280            let step = state
281                .steps
282                .get_mut(&id)
283                .ok_or(StoreError::StepNotFound(id))?;
284
285            let now = Utc::now();
286
287            if let Some(status) = update.status {
288                if !matches!(
289                    (step.status.state, status),
290                    (StepStatus::Pending, StepStatus::Running)
291                        | (StepStatus::Pending, StepStatus::Skipped)
292                        | (StepStatus::Running, StepStatus::Completed)
293                        | (StepStatus::Running, StepStatus::Failed)
294                        | (StepStatus::Running, StepStatus::AwaitingApproval)
295                        | (StepStatus::AwaitingApproval, StepStatus::Running)
296                        | (StepStatus::AwaitingApproval, StepStatus::Completed)
297                        | (StepStatus::AwaitingApproval, StepStatus::Failed)
298                        | (StepStatus::AwaitingApproval, StepStatus::Rejected)
299                ) {
300                    return Err(StoreError::Database(format!(
301                        "invalid step status transition: {:?} -> {:?}",
302                        step.status.state, status
303                    )));
304                }
305                step.status.state = status;
306            }
307            if let Some(output) = update.output {
308                step.output = Some(output);
309            }
310            if let Some(error) = update.error {
311                step.error = Some(error);
312            }
313            if let Some(dur) = update.duration_ms {
314                step.duration_ms = dur;
315            }
316            if let Some(cost) = update.cost_usd {
317                step.cost_usd = cost;
318            }
319            if let Some(tokens) = update.input_tokens {
320                step.input_tokens = Some(tokens);
321            }
322            if let Some(tokens) = update.output_tokens {
323                step.output_tokens = Some(tokens);
324            }
325            if let Some(started) = update.started_at {
326                step.started_at = Some(started);
327            }
328            if let Some(completed) = update.completed_at {
329                step.completed_at = Some(completed);
330            }
331            if let Some(debug_msgs) = update.debug_messages {
332                step.debug_messages = Some(debug_msgs);
333            }
334
335            step.updated_at = now;
336            Ok(())
337        })
338    }
339
340    fn get_step(&self, id: Uuid) -> StoreFuture<'_, Option<Step>> {
341        Box::pin(async move {
342            let state = self.state.read().await;
343            Ok(state.steps.get(&id).cloned())
344        })
345    }
346
347    fn list_steps(&self, run_id: Uuid) -> StoreFuture<'_, Vec<Step>> {
348        Box::pin(async move {
349            let state = self.state.read().await;
350            let mut steps: Vec<Step> = state
351                .steps
352                .values()
353                .filter(|s| s.run_id == run_id)
354                .cloned()
355                .collect();
356            steps.sort_by_key(|s| s.position);
357            Ok(steps)
358        })
359    }
360
361    fn get_stats(&self, filter: RunFilter) -> StoreFuture<'_, RunStats> {
362        Box::pin(async move {
363            let state = self.state.read().await;
364
365            let mut total_cost_usd = Decimal::ZERO;
366            let mut total_duration_ms = 0u64;
367            let mut total_runs = 0u64;
368            let mut completed_runs = 0u64;
369            let mut failed_runs = 0u64;
370            let mut cancelled_runs = 0u64;
371            let mut active_runs = 0u64;
372
373            for run in state.runs.values() {
374                if !run_matches_filter(run, &filter, &state.steps) {
375                    continue;
376                }
377
378                total_cost_usd += run.cost_usd;
379                total_duration_ms += run.duration_ms;
380                total_runs += 1;
381
382                match run.status.state {
383                    RunStatus::Completed => completed_runs += 1,
384                    RunStatus::Failed => failed_runs += 1,
385                    RunStatus::Cancelled => cancelled_runs += 1,
386                    RunStatus::Pending
387                    | RunStatus::Running
388                    | RunStatus::Retrying
389                    | RunStatus::AwaitingApproval => {
390                        active_runs += 1;
391                    }
392                }
393            }
394
395            Ok(RunStats {
396                total_runs,
397                completed_runs,
398                failed_runs,
399                cancelled_runs,
400                active_runs,
401                total_cost_usd,
402                total_duration_ms,
403            })
404        })
405    }
406
407    fn create_step_dependencies(&self, deps: Vec<NewStepDependency>) -> StoreFuture<'_, ()> {
408        Box::pin(async move {
409            let mut state = self.state.write().await;
410
411            for dep in deps {
412                if !state.steps.contains_key(&dep.step_id) {
413                    return Err(StoreError::StepNotFound(dep.step_id));
414                }
415                if !state.steps.contains_key(&dep.depends_on) {
416                    return Err(StoreError::StepNotFound(dep.depends_on));
417                }
418
419                let already_exists = state
420                    .step_dependencies
421                    .iter()
422                    .any(|d| d.step_id == dep.step_id && d.depends_on == dep.depends_on);
423
424                if !already_exists {
425                    state.step_dependencies.push(StepDependency {
426                        step_id: dep.step_id,
427                        depends_on: dep.depends_on,
428                        created_at: Utc::now(),
429                    });
430                }
431            }
432
433            Ok(())
434        })
435    }
436
437    fn list_step_dependencies(&self, run_id: Uuid) -> StoreFuture<'_, Vec<StepDependency>> {
438        Box::pin(async move {
439            let state = self.state.read().await;
440
441            let run_step_ids: std::collections::HashSet<Uuid> = state
442                .steps
443                .values()
444                .filter(|s| s.run_id == run_id)
445                .map(|s| s.id)
446                .collect();
447
448            let mut deps: Vec<StepDependency> = state
449                .step_dependencies
450                .iter()
451                .filter(|d| run_step_ids.contains(&d.step_id))
452                .cloned()
453                .collect();
454
455            deps.sort_by_key(|d| d.created_at);
456            Ok(deps)
457        })
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use std::collections::HashMap;
464
465    use serde_json::json;
466    use tokio::spawn;
467
468    use super::*;
469    use crate::entities::TriggerKind;
470
471    use crate::memory::tests::{create_terminal_run, new_run_req};
472    use crate::store::RunStore;
473
474    // ---- create_run ----
475
476    #[tokio::test]
477    async fn create_run_returns_pending_status() {
478        let store = InMemoryStore::new();
479        let run = store.create_run(new_run_req("test")).await.unwrap();
480        assert_eq!(run.status.state, RunStatus::Pending);
481        assert_eq!(run.workflow_name, "test");
482        assert_eq!(run.retry_count, 0);
483        assert_eq!(run.max_retries, 3);
484    }
485
486    #[tokio::test]
487    async fn create_run_generates_unique_ids() {
488        let store = InMemoryStore::new();
489        let r1 = store.create_run(new_run_req("a")).await.unwrap();
490        let r2 = store.create_run(new_run_req("b")).await.unwrap();
491        assert_ne!(r1.id, r2.id);
492    }
493
494    // ---- get_run ----
495
496    #[tokio::test]
497    async fn get_run_returns_created_run() {
498        let store = InMemoryStore::new();
499        let run = store.create_run(new_run_req("test")).await.unwrap();
500        let fetched = store.get_run(run.id).await.unwrap();
501        assert!(fetched.is_some());
502        assert_eq!(fetched.unwrap().id, run.id);
503    }
504
505    #[tokio::test]
506    async fn get_run_returns_none_for_missing() {
507        let store = InMemoryStore::new();
508        let fetched = store.get_run(Uuid::nil()).await.unwrap();
509        assert!(fetched.is_none());
510    }
511
512    // ---- update_run_status ----
513
514    #[tokio::test]
515    async fn update_run_status_valid_transition() {
516        let store = InMemoryStore::new();
517        let run = store.create_run(new_run_req("test")).await.unwrap();
518
519        store
520            .update_run_status(run.id, RunStatus::Running)
521            .await
522            .unwrap();
523
524        let fetched = store.get_run(run.id).await.unwrap().unwrap();
525        assert_eq!(fetched.status.state, RunStatus::Running);
526        assert!(fetched.started_at.is_some());
527    }
528
529    #[tokio::test]
530    async fn update_run_status_invalid_transition_returns_error() {
531        let store = InMemoryStore::new();
532        let run = store.create_run(new_run_req("test")).await.unwrap();
533
534        let result = store.update_run_status(run.id, RunStatus::Completed).await;
535        assert!(result.is_err());
536
537        let err = result.unwrap_err();
538        assert!(matches!(err, StoreError::InvalidTransition { .. }));
539    }
540
541    #[tokio::test]
542    async fn update_run_status_not_found() {
543        let store = InMemoryStore::new();
544        let result = store
545            .update_run_status(Uuid::nil(), RunStatus::Running)
546            .await;
547        assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
548    }
549
550    #[tokio::test]
551    async fn update_run_status_terminal_sets_completed_at() {
552        let store = InMemoryStore::new();
553        let run = store.create_run(new_run_req("test")).await.unwrap();
554
555        store
556            .update_run_status(run.id, RunStatus::Running)
557            .await
558            .unwrap();
559        store
560            .update_run_status(run.id, RunStatus::Completed)
561            .await
562            .unwrap();
563
564        let fetched = store.get_run(run.id).await.unwrap().unwrap();
565        assert_eq!(fetched.status.state, RunStatus::Completed);
566        assert!(fetched.completed_at.is_some());
567    }
568
569    #[tokio::test]
570    async fn update_run_status_terminal_to_same_is_idempotent() {
571        let store = InMemoryStore::new();
572        let run = store.create_run(new_run_req("test")).await.unwrap();
573
574        store
575            .update_run_status(run.id, RunStatus::Running)
576            .await
577            .unwrap();
578        store
579            .update_run_status(run.id, RunStatus::Failed)
580            .await
581            .unwrap();
582
583        let before = store.get_run(run.id).await.unwrap().unwrap();
584        let completed_at_before = before.completed_at;
585
586        store
587            .update_run_status(run.id, RunStatus::Failed)
588            .await
589            .unwrap();
590
591        let after = store.get_run(run.id).await.unwrap().unwrap();
592        assert_eq!(after.status.state, RunStatus::Failed);
593        assert_eq!(after.completed_at, completed_at_before);
594    }
595
596    #[tokio::test]
597    async fn update_run_terminal_to_same_via_update_run_is_idempotent() {
598        let store = InMemoryStore::new();
599        let run = store.create_run(new_run_req("test")).await.unwrap();
600
601        store
602            .update_run_status(run.id, RunStatus::Running)
603            .await
604            .unwrap();
605        store
606            .update_run(
607                run.id,
608                RunUpdate {
609                    status: Some(RunStatus::Failed),
610                    error: Some("first failure".to_string()),
611                    ..RunUpdate::default()
612                },
613            )
614            .await
615            .unwrap();
616
617        let before = store.get_run(run.id).await.unwrap().unwrap();
618
619        store
620            .update_run(
621                run.id,
622                RunUpdate {
623                    status: Some(RunStatus::Failed),
624                    ..RunUpdate::default()
625                },
626            )
627            .await
628            .unwrap();
629
630        let after = store.get_run(run.id).await.unwrap().unwrap();
631        assert_eq!(after.status.state, RunStatus::Failed);
632        assert_eq!(after.completed_at, before.completed_at);
633        assert_eq!(after.error, Some("first failure".to_string()));
634    }
635
636    // ---- list_runs ----
637
638    #[tokio::test]
639    async fn list_runs_empty_store() {
640        let store = InMemoryStore::new();
641        let page = store.list_runs(RunFilter::default(), 1, 20).await.unwrap();
642        assert_eq!(page.total, 0);
643        assert!(page.items.is_empty());
644    }
645
646    #[tokio::test]
647    async fn list_runs_with_workflow_filter() {
648        let store = InMemoryStore::new();
649        store.create_run(new_run_req("deploy")).await.unwrap();
650        store.create_run(new_run_req("test")).await.unwrap();
651        store.create_run(new_run_req("deploy")).await.unwrap();
652
653        let filter = RunFilter {
654            workflow_name: Some("deploy".to_string()),
655            ..RunFilter::default()
656        };
657        let page = store.list_runs(filter, 1, 20).await.unwrap();
658        assert_eq!(page.total, 2);
659        assert!(page.items.iter().all(|r| r.workflow_name == "deploy"));
660    }
661
662    #[tokio::test]
663    async fn list_runs_with_status_filter() {
664        let store = InMemoryStore::new();
665        let run = store.create_run(new_run_req("a")).await.unwrap();
666        store.create_run(new_run_req("b")).await.unwrap();
667
668        store
669            .update_run_status(run.id, RunStatus::Running)
670            .await
671            .unwrap();
672
673        let filter = RunFilter {
674            status: Some(RunStatus::Running),
675            ..RunFilter::default()
676        };
677        let page = store.list_runs(filter, 1, 20).await.unwrap();
678        assert_eq!(page.total, 1);
679        assert_eq!(page.items[0].id, run.id);
680    }
681
682    #[tokio::test]
683    async fn list_runs_pagination() {
684        let store = InMemoryStore::new();
685        for i in 0..5 {
686            store
687                .create_run(new_run_req(&format!("wf-{i}")))
688                .await
689                .unwrap();
690        }
691
692        let page1 = store.list_runs(RunFilter::default(), 1, 2).await.unwrap();
693        assert_eq!(page1.total, 5);
694        assert_eq!(page1.items.len(), 2);
695        assert_eq!(page1.page, 1);
696        assert_eq!(page1.per_page, 2);
697
698        let page2 = store.list_runs(RunFilter::default(), 2, 2).await.unwrap();
699        assert_eq!(page2.items.len(), 2);
700
701        let page3 = store.list_runs(RunFilter::default(), 3, 2).await.unwrap();
702        assert_eq!(page3.items.len(), 1);
703    }
704
705    // ---- pick_next_pending ----
706
707    #[tokio::test]
708    async fn pick_next_pending_empty_store() {
709        let store = InMemoryStore::new();
710        let result = store.pick_next_pending().await.unwrap();
711        assert!(result.is_none());
712    }
713
714    #[tokio::test]
715    async fn pick_next_pending_returns_oldest_and_transitions_to_running() {
716        let store = InMemoryStore::new();
717        let r1 = store.create_run(new_run_req("first")).await.unwrap();
718        let _r2 = store.create_run(new_run_req("second")).await.unwrap();
719
720        let picked = store.pick_next_pending().await.unwrap().unwrap();
721        assert_eq!(picked.id, r1.id);
722        assert_eq!(picked.status.state, RunStatus::Running);
723        assert!(picked.started_at.is_some());
724
725        // Verify it's Running in the store too.
726        let fetched = store.get_run(r1.id).await.unwrap().unwrap();
727        assert_eq!(fetched.status.state, RunStatus::Running);
728    }
729
730    #[tokio::test]
731    async fn pick_next_pending_skips_non_pending() {
732        let store = InMemoryStore::new();
733        let r1 = store.create_run(new_run_req("a")).await.unwrap();
734        let r2 = store.create_run(new_run_req("b")).await.unwrap();
735
736        // Transition r1 to Running.
737        store
738            .update_run_status(r1.id, RunStatus::Running)
739            .await
740            .unwrap();
741
742        let picked = store.pick_next_pending().await.unwrap().unwrap();
743        assert_eq!(picked.id, r2.id);
744    }
745
746    // ---- create_step ----
747
748    #[tokio::test]
749    async fn create_step_returns_pending() {
750        let store = InMemoryStore::new();
751        let run = store.create_run(new_run_req("test")).await.unwrap();
752
753        let step = store
754            .create_step(NewStep {
755                run_id: run.id,
756                name: "build".to_string(),
757                kind: crate::entities::StepKind::Shell,
758                position: 0,
759                input: Some(json!({"command": "cargo build"})),
760            })
761            .await
762            .unwrap();
763
764        assert_eq!(step.status.state, StepStatus::Pending);
765        assert_eq!(step.name, "build");
766        assert_eq!(step.run_id, run.id);
767        assert_eq!(step.position, 0);
768    }
769
770    #[tokio::test]
771    async fn create_step_for_missing_run_returns_error() {
772        let store = InMemoryStore::new();
773        let result = store
774            .create_step(NewStep {
775                run_id: Uuid::nil(),
776                name: "build".to_string(),
777                kind: crate::entities::StepKind::Shell,
778                position: 0,
779                input: None,
780            })
781            .await;
782        assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
783    }
784
785    // ---- update_step ----
786
787    #[tokio::test]
788    async fn update_step_applies_partial_update() {
789        let store = InMemoryStore::new();
790        let run = store.create_run(new_run_req("test")).await.unwrap();
791
792        let step = store
793            .create_step(NewStep {
794                run_id: run.id,
795                name: "build".to_string(),
796                kind: crate::entities::StepKind::Shell,
797                position: 0,
798                input: None,
799            })
800            .await
801            .unwrap();
802
803        // Transition Pending → Running first
804        store
805            .update_step(
806                step.id,
807                StepUpdate {
808                    status: Some(StepStatus::Running),
809                    ..StepUpdate::default()
810                },
811            )
812            .await
813            .unwrap();
814
815        // Then Running → Completed
816        store
817            .update_step(
818                step.id,
819                StepUpdate {
820                    status: Some(StepStatus::Completed),
821                    output: Some(json!({"stdout": "ok"})),
822                    duration_ms: Some(150),
823                    ..StepUpdate::default()
824                },
825            )
826            .await
827            .unwrap();
828
829        let steps = store.list_steps(run.id).await.unwrap();
830        assert_eq!(steps.len(), 1);
831        assert_eq!(steps[0].status.state, StepStatus::Completed);
832        assert_eq!(steps[0].duration_ms, 150);
833        assert!(steps[0].output.is_some());
834    }
835
836    #[tokio::test]
837    async fn update_step_not_found() {
838        let store = InMemoryStore::new();
839        let result = store.update_step(Uuid::nil(), StepUpdate::default()).await;
840        assert!(matches!(result.unwrap_err(), StoreError::StepNotFound(_)));
841    }
842
843    // ---- list_steps ----
844
845    #[tokio::test]
846    async fn list_steps_ordered_by_position() {
847        let store = InMemoryStore::new();
848        let run = store.create_run(new_run_req("test")).await.unwrap();
849
850        // Insert out of order.
851        store
852            .create_step(NewStep {
853                run_id: run.id,
854                name: "deploy".to_string(),
855                kind: crate::entities::StepKind::Shell,
856                position: 2,
857                input: None,
858            })
859            .await
860            .unwrap();
861        store
862            .create_step(NewStep {
863                run_id: run.id,
864                name: "build".to_string(),
865                kind: crate::entities::StepKind::Shell,
866                position: 0,
867                input: None,
868            })
869            .await
870            .unwrap();
871        store
872            .create_step(NewStep {
873                run_id: run.id,
874                name: "test".to_string(),
875                kind: crate::entities::StepKind::Shell,
876                position: 1,
877                input: None,
878            })
879            .await
880            .unwrap();
881
882        let steps = store.list_steps(run.id).await.unwrap();
883        assert_eq!(steps.len(), 3);
884        assert_eq!(steps[0].name, "build");
885        assert_eq!(steps[1].name, "test");
886        assert_eq!(steps[2].name, "deploy");
887    }
888
889    #[tokio::test]
890    async fn list_steps_empty_for_run_without_steps() {
891        let store = InMemoryStore::new();
892        let run = store.create_run(new_run_req("test")).await.unwrap();
893        let steps = store.list_steps(run.id).await.unwrap();
894        assert!(steps.is_empty());
895    }
896
897    // ---- update_run ----
898
899    #[tokio::test]
900    async fn update_run_applies_cost_and_duration() {
901        let store = InMemoryStore::new();
902        let run = store.create_run(new_run_req("test")).await.unwrap();
903
904        store
905            .update_run(
906                run.id,
907                RunUpdate {
908                    cost_usd: Some(Decimal::new(123, 2)),
909                    duration_ms: Some(5000),
910                    ..RunUpdate::default()
911                },
912            )
913            .await
914            .unwrap();
915
916        let fetched = store.get_run(run.id).await.unwrap().unwrap();
917        assert_eq!(fetched.cost_usd, Decimal::new(123, 2));
918        assert_eq!(fetched.duration_ms, 5000);
919    }
920
921    #[tokio::test]
922    async fn update_run_increment_retry() {
923        let store = InMemoryStore::new();
924        let run = store.create_run(new_run_req("test")).await.unwrap();
925        assert_eq!(run.retry_count, 0);
926
927        store
928            .update_run(
929                run.id,
930                RunUpdate {
931                    increment_retry: true,
932                    ..RunUpdate::default()
933                },
934            )
935            .await
936            .unwrap();
937
938        let fetched = store.get_run(run.id).await.unwrap().unwrap();
939        assert_eq!(fetched.retry_count, 1);
940    }
941
942    #[tokio::test]
943    async fn update_run_not_found() {
944        let store = InMemoryStore::new();
945        let result = store.update_run(Uuid::nil(), RunUpdate::default()).await;
946        assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
947    }
948
949    // ---- concurrent access ----
950
951    #[tokio::test]
952    async fn concurrent_pick_next_pending_no_double_pick() {
953        let store = InMemoryStore::new();
954
955        // Create 10 pending runs.
956        for i in 0..10 {
957            store
958                .create_run(new_run_req(&format!("wf-{i}")))
959                .await
960                .unwrap();
961        }
962
963        // Concurrently pick from multiple tasks.
964        let mut handles = Vec::new();
965        for _ in 0..10 {
966            let s = store.clone();
967            handles.push(spawn(async move { s.pick_next_pending().await }));
968        }
969
970        let mut picked_ids = Vec::new();
971        for h in handles {
972            if let Ok(Ok(Some(run))) = h.await {
973                picked_ids.push(run.id);
974            }
975        }
976
977        // Each run should be picked at most once.
978        let unique: std::collections::HashSet<_> = picked_ids.iter().collect();
979        assert_eq!(unique.len(), picked_ids.len());
980    }
981
982    // ---- get_stats ----
983
984    #[tokio::test]
985    async fn get_stats_empty_store() {
986        let store = InMemoryStore::new();
987        let stats = store.get_stats(RunFilter::default()).await.unwrap();
988        assert_eq!(stats.total_runs, 0);
989        assert_eq!(stats.completed_runs, 0);
990        assert_eq!(stats.failed_runs, 0);
991        assert_eq!(stats.cancelled_runs, 0);
992        assert_eq!(stats.active_runs, 0);
993        assert_eq!(stats.total_cost_usd, Decimal::ZERO);
994        assert_eq!(stats.total_duration_ms, 0);
995    }
996
997    #[tokio::test]
998    async fn get_stats_aggregates_counts_and_totals() {
999        let store = InMemoryStore::new();
1000
1001        // Create runs in various states.
1002        let r1 = store.create_run(new_run_req("wf1")).await.unwrap();
1003        let r2 = store.create_run(new_run_req("wf2")).await.unwrap();
1004        let r3 = store.create_run(new_run_req("wf3")).await.unwrap();
1005        let _r4 = store.create_run(new_run_req("wf4")).await.unwrap();
1006
1007        // Transition r1 to Running, then Completed.
1008        store
1009            .update_run_status(r1.id, RunStatus::Running)
1010            .await
1011            .unwrap();
1012        store
1013            .update_run_status(r1.id, RunStatus::Completed)
1014            .await
1015            .unwrap();
1016
1017        // Transition r2 to Running, then Failed.
1018        store
1019            .update_run_status(r2.id, RunStatus::Running)
1020            .await
1021            .unwrap();
1022        store
1023            .update_run_status(r2.id, RunStatus::Failed)
1024            .await
1025            .unwrap();
1026
1027        // Transition r3 to Cancelled.
1028        store
1029            .update_run_status(r3.id, RunStatus::Cancelled)
1030            .await
1031            .unwrap();
1032
1033        // r4 remains Pending (active).
1034
1035        // Update cost and duration.
1036        store
1037            .update_run(
1038                r1.id,
1039                RunUpdate {
1040                    cost_usd: Some(Decimal::new(1000, 2)),
1041                    duration_ms: Some(1000),
1042                    ..RunUpdate::default()
1043                },
1044            )
1045            .await
1046            .unwrap();
1047
1048        store
1049            .update_run(
1050                r2.id,
1051                RunUpdate {
1052                    cost_usd: Some(Decimal::new(500, 2)),
1053                    duration_ms: Some(500),
1054                    ..RunUpdate::default()
1055                },
1056            )
1057            .await
1058            .unwrap();
1059
1060        let stats = store.get_stats(RunFilter::default()).await.unwrap();
1061        assert_eq!(stats.total_runs, 4);
1062        assert_eq!(stats.completed_runs, 1);
1063        assert_eq!(stats.failed_runs, 1);
1064        assert_eq!(stats.cancelled_runs, 1);
1065        assert_eq!(stats.active_runs, 1); // r4 is Pending
1066        assert_eq!(stats.total_cost_usd, Decimal::new(1500, 2));
1067        assert_eq!(stats.total_duration_ms, 1500);
1068    }
1069
1070    #[tokio::test]
1071    async fn update_run_status_running_to_retrying() {
1072        let store = InMemoryStore::new();
1073        let run = store.create_run(new_run_req("test")).await.unwrap();
1074
1075        store
1076            .update_run_status(run.id, RunStatus::Running)
1077            .await
1078            .unwrap();
1079
1080        store
1081            .update_run_status(run.id, RunStatus::Retrying)
1082            .await
1083            .unwrap();
1084
1085        let fetched = store.get_run(run.id).await.unwrap().unwrap();
1086        assert_eq!(fetched.status.state, RunStatus::Retrying);
1087        assert!(!fetched.status.state.is_terminal());
1088        assert!(fetched.completed_at.is_none()); // Not a terminal state
1089    }
1090
1091    #[tokio::test]
1092    async fn update_run_status_retrying_to_running_allowed() {
1093        let store = InMemoryStore::new();
1094        let run = store.create_run(new_run_req("test")).await.unwrap();
1095
1096        store
1097            .update_run_status(run.id, RunStatus::Running)
1098            .await
1099            .unwrap();
1100        store
1101            .update_run_status(run.id, RunStatus::Retrying)
1102            .await
1103            .unwrap();
1104
1105        // Retrying → Running should be allowed
1106        store
1107            .update_run_status(run.id, RunStatus::Running)
1108            .await
1109            .unwrap();
1110
1111        let fetched = store.get_run(run.id).await.unwrap().unwrap();
1112        assert_eq!(fetched.status.state, RunStatus::Running);
1113    }
1114
1115    #[tokio::test]
1116    async fn update_run_with_invalid_status_transition_errors() {
1117        let store = InMemoryStore::new();
1118        let run = store.create_run(new_run_req("test")).await.unwrap();
1119
1120        // Try to apply invalid status transition via update_run
1121        let result = store
1122            .update_run(
1123                run.id,
1124                RunUpdate {
1125                    status: Some(RunStatus::Completed), // invalid from Pending
1126                    ..RunUpdate::default()
1127                },
1128            )
1129            .await;
1130
1131        assert!(result.is_err());
1132    }
1133
1134    #[tokio::test]
1135    async fn create_step_with_complex_input() {
1136        let store = InMemoryStore::new();
1137        let run = store.create_run(new_run_req("test")).await.unwrap();
1138
1139        let complex_input = json!({
1140            "command": "cargo build",
1141            "env": {
1142                "RUST_LOG": "debug",
1143                "CUSTOM": "value"
1144            },
1145            "timeout": 60,
1146            "retry_policy": {
1147                "max_attempts": 3,
1148                "backoff": "exponential"
1149            }
1150        });
1151
1152        let step = store
1153            .create_step(NewStep {
1154                run_id: run.id,
1155                name: "build".to_string(),
1156                kind: crate::entities::StepKind::Agent,
1157                position: 0,
1158                input: Some(complex_input.clone()),
1159            })
1160            .await
1161            .unwrap();
1162
1163        assert_eq!(step.input, Some(complex_input));
1164    }
1165
1166    #[tokio::test]
1167    async fn update_step_with_error_message() {
1168        let store = InMemoryStore::new();
1169        let run = store.create_run(new_run_req("test")).await.unwrap();
1170
1171        let step = store
1172            .create_step(NewStep {
1173                run_id: run.id,
1174                name: "build".to_string(),
1175                kind: crate::entities::StepKind::Shell,
1176                position: 0,
1177                input: None,
1178            })
1179            .await
1180            .unwrap();
1181
1182        store
1183            .update_step(
1184                step.id,
1185                StepUpdate {
1186                    status: Some(StepStatus::Running),
1187                    ..StepUpdate::default()
1188                },
1189            )
1190            .await
1191            .unwrap();
1192
1193        store
1194            .update_step(
1195                step.id,
1196                StepUpdate {
1197                    status: Some(StepStatus::Failed),
1198                    error: Some("Connection timeout after 30s".to_string()),
1199                    duration_ms: Some(30000),
1200                    ..StepUpdate::default()
1201                },
1202            )
1203            .await
1204            .unwrap();
1205
1206        let steps = store.list_steps(run.id).await.unwrap();
1207        assert_eq!(steps[0].status.state, StepStatus::Failed);
1208        assert_eq!(
1209            steps[0].error,
1210            Some("Connection timeout after 30s".to_string())
1211        );
1212        assert_eq!(steps[0].duration_ms, 30000);
1213    }
1214
1215    #[tokio::test]
1216    async fn list_steps_for_nonexistent_run_returns_empty() {
1217        let store = InMemoryStore::new();
1218        let steps = store.list_steps(Uuid::nil()).await.unwrap();
1219        assert!(steps.is_empty());
1220    }
1221
1222    #[tokio::test]
1223    async fn update_step_pending_to_skipped() {
1224        let store = InMemoryStore::new();
1225        let run = store.create_run(new_run_req("test")).await.unwrap();
1226
1227        let step = store
1228            .create_step(NewStep {
1229                run_id: run.id,
1230                name: "build".to_string(),
1231                kind: crate::entities::StepKind::Shell,
1232                position: 0,
1233                input: None,
1234            })
1235            .await
1236            .unwrap();
1237
1238        // Pending → Skipped is allowed (when prior step failed)
1239        store
1240            .update_step(
1241                step.id,
1242                StepUpdate {
1243                    status: Some(StepStatus::Skipped),
1244                    ..StepUpdate::default()
1245                },
1246            )
1247            .await
1248            .unwrap();
1249
1250        let steps = store.list_steps(run.id).await.unwrap();
1251        assert_eq!(steps[0].status.state, StepStatus::Skipped);
1252    }
1253
1254    #[tokio::test]
1255    async fn list_runs_with_combined_filters() {
1256        let store = InMemoryStore::new();
1257
1258        let r1 = store.create_run(new_run_req("deploy")).await.unwrap();
1259        let r2 = store.create_run(new_run_req("deploy")).await.unwrap();
1260        let _r3 = store.create_run(new_run_req("test")).await.unwrap();
1261
1262        // r1: Pending → Running → Completed
1263        store
1264            .update_run_status(r1.id, RunStatus::Running)
1265            .await
1266            .unwrap();
1267        store
1268            .update_run_status(r1.id, RunStatus::Completed)
1269            .await
1270            .unwrap();
1271
1272        // r2: Pending → Running
1273        store
1274            .update_run_status(r2.id, RunStatus::Running)
1275            .await
1276            .unwrap();
1277
1278        // Filter by workflow AND status
1279        let filter = RunFilter {
1280            workflow_name: Some("deploy".to_string()),
1281            status: Some(RunStatus::Running),
1282            ..RunFilter::default()
1283        };
1284
1285        let page = store.list_runs(filter, 1, 100).await.unwrap();
1286        assert_eq!(page.total, 1);
1287        assert_eq!(page.items[0].id, r2.id);
1288    }
1289
1290    #[tokio::test]
1291    async fn list_runs_workflow_filter_is_case_insensitive_partial_match() {
1292        let store = InMemoryStore::new();
1293        store
1294            .create_run(new_run_req("weather-report"))
1295            .await
1296            .unwrap();
1297        store.create_run(new_run_req("deploy-prod")).await.unwrap();
1298
1299        // Partial match
1300        let filter = RunFilter {
1301            workflow_name: Some("weather".to_string()),
1302            ..RunFilter::default()
1303        };
1304        let page = store.list_runs(filter, 1, 100).await.unwrap();
1305        assert_eq!(page.total, 1);
1306        assert_eq!(page.items[0].workflow_name, "weather-report");
1307
1308        // Case-insensitive
1309        let filter = RunFilter {
1310            workflow_name: Some("Weather-REPORT".to_string()),
1311            ..RunFilter::default()
1312        };
1313        let page = store.list_runs(filter, 1, 100).await.unwrap();
1314        assert_eq!(page.total, 1);
1315        assert_eq!(page.items[0].workflow_name, "weather-report");
1316
1317        // Suffix match
1318        let filter = RunFilter {
1319            workflow_name: Some("report".to_string()),
1320            ..RunFilter::default()
1321        };
1322        let page = store.list_runs(filter, 1, 100).await.unwrap();
1323        assert_eq!(page.total, 1);
1324        assert_eq!(page.items[0].workflow_name, "weather-report");
1325
1326        // No match
1327        let filter = RunFilter {
1328            workflow_name: Some("build".to_string()),
1329            ..RunFilter::default()
1330        };
1331        let page = store.list_runs(filter, 1, 100).await.unwrap();
1332        assert_eq!(page.total, 0);
1333    }
1334
1335    #[tokio::test]
1336    async fn list_runs_has_steps_true_only_filters_completed_and_cancelled() {
1337        let store = InMemoryStore::new();
1338        let run_with = create_terminal_run(&store, "with-steps", RunStatus::Completed).await;
1339        let _run_without = create_terminal_run(&store, "without-steps", RunStatus::Completed).await;
1340
1341        store
1342            .create_step(NewStep {
1343                run_id: run_with.id,
1344                name: "build".to_string(),
1345                kind: crate::entities::StepKind::Shell,
1346                position: 0,
1347                input: None,
1348            })
1349            .await
1350            .unwrap();
1351
1352        let filter = RunFilter {
1353            has_steps: Some(true),
1354            ..RunFilter::default()
1355        };
1356        let page = store.list_runs(filter, 1, 100).await.unwrap();
1357        assert_eq!(page.total, 1);
1358        assert_eq!(page.items[0].id, run_with.id);
1359    }
1360
1361    #[tokio::test]
1362    async fn list_runs_has_steps_false_only_filters_completed_and_cancelled() {
1363        let store = InMemoryStore::new();
1364        let run_with = create_terminal_run(&store, "with-steps", RunStatus::Cancelled).await;
1365        let run_without = create_terminal_run(&store, "without-steps", RunStatus::Cancelled).await;
1366
1367        store
1368            .create_step(NewStep {
1369                run_id: run_with.id,
1370                name: "build".to_string(),
1371                kind: crate::entities::StepKind::Shell,
1372                position: 0,
1373                input: None,
1374            })
1375            .await
1376            .unwrap();
1377
1378        let filter = RunFilter {
1379            has_steps: Some(false),
1380            ..RunFilter::default()
1381        };
1382        let page = store.list_runs(filter, 1, 100).await.unwrap();
1383        assert_eq!(page.total, 1);
1384        assert_eq!(page.items[0].id, run_without.id);
1385    }
1386
1387    #[tokio::test]
1388    async fn list_runs_has_steps_none_returns_all() {
1389        let store = InMemoryStore::new();
1390        let run_with = store.create_run(new_run_req("with-steps")).await.unwrap();
1391        let _run_without = store
1392            .create_run(new_run_req("without-steps"))
1393            .await
1394            .unwrap();
1395
1396        store
1397            .create_step(NewStep {
1398                run_id: run_with.id,
1399                name: "build".to_string(),
1400                kind: crate::entities::StepKind::Shell,
1401                position: 0,
1402                input: None,
1403            })
1404            .await
1405            .unwrap();
1406
1407        let filter = RunFilter {
1408            has_steps: None,
1409            ..RunFilter::default()
1410        };
1411        let page = store.list_runs(filter, 1, 100).await.unwrap();
1412        assert_eq!(page.total, 2);
1413    }
1414
1415    #[tokio::test]
1416    async fn list_runs_has_steps_true_does_not_filter_non_terminal_runs() {
1417        let store = InMemoryStore::new();
1418        let pending_run = store
1419            .create_run(new_run_req("pending-empty"))
1420            .await
1421            .unwrap();
1422        let running_run = store
1423            .create_run(new_run_req("running-empty"))
1424            .await
1425            .unwrap();
1426        store
1427            .update_run_status(running_run.id, RunStatus::Running)
1428            .await
1429            .unwrap();
1430
1431        let filter = RunFilter {
1432            has_steps: Some(true),
1433            ..RunFilter::default()
1434        };
1435        let page = store.list_runs(filter, 1, 100).await.unwrap();
1436        assert_eq!(page.total, 2);
1437        let ids: Vec<_> = page.items.iter().map(|r| r.id).collect();
1438        assert!(ids.contains(&pending_run.id));
1439        assert!(ids.contains(&running_run.id));
1440    }
1441
1442    #[tokio::test]
1443    async fn get_stats_with_mixed_active_statuses() {
1444        let store = InMemoryStore::new();
1445
1446        let _r1 = store.create_run(new_run_req("wf")).await.unwrap(); // Pending
1447        let r2 = store.create_run(new_run_req("wf")).await.unwrap();
1448        let r3 = store.create_run(new_run_req("wf")).await.unwrap();
1449
1450        store
1451            .update_run_status(r2.id, RunStatus::Running)
1452            .await
1453            .unwrap();
1454        store
1455            .update_run_status(r3.id, RunStatus::Running)
1456            .await
1457            .unwrap();
1458        store
1459            .update_run_status(r3.id, RunStatus::Retrying)
1460            .await
1461            .unwrap();
1462
1463        let stats = store.get_stats(RunFilter::default()).await.unwrap();
1464        assert_eq!(stats.active_runs, 3); // _r1 (Pending), r2 (Running), r3 (Retrying)
1465    }
1466
1467    #[tokio::test]
1468    async fn run_with_different_trigger_kinds() {
1469        let store = InMemoryStore::new();
1470
1471        let r1 = store
1472            .create_run(NewRun {
1473                workflow_name: "test".to_string(),
1474                trigger: TriggerKind::Manual,
1475                payload: json!({}),
1476                max_retries: 1,
1477                handler_version: None,
1478                labels: HashMap::new(),
1479                scheduled_at: None,
1480            })
1481            .await
1482            .unwrap();
1483
1484        let r2 = store
1485            .create_run(NewRun {
1486                workflow_name: "test".to_string(),
1487                trigger: TriggerKind::Webhook {
1488                    path: "/hooks/github".to_string(),
1489                },
1490                payload: json!({}),
1491                max_retries: 1,
1492                handler_version: None,
1493                labels: HashMap::new(),
1494                scheduled_at: None,
1495            })
1496            .await
1497            .unwrap();
1498
1499        let r3 = store
1500            .create_run(NewRun {
1501                workflow_name: "test".to_string(),
1502                trigger: TriggerKind::Cron {
1503                    schedule: "0 0 * * *".to_string(),
1504                },
1505                payload: json!({}),
1506                max_retries: 1,
1507                handler_version: None,
1508                labels: HashMap::new(),
1509                scheduled_at: None,
1510            })
1511            .await
1512            .unwrap();
1513
1514        let r4 = store
1515            .create_run(NewRun {
1516                workflow_name: "test".to_string(),
1517                trigger: TriggerKind::Api,
1518                payload: json!({}),
1519                max_retries: 1,
1520                handler_version: None,
1521                labels: HashMap::new(),
1522                scheduled_at: None,
1523            })
1524            .await
1525            .unwrap();
1526
1527        let r5 = store
1528            .create_run(NewRun {
1529                workflow_name: "test".to_string(),
1530                trigger: TriggerKind::Retry {
1531                    parent_run_id: Uuid::nil(),
1532                },
1533                payload: json!({}),
1534                max_retries: 1,
1535                handler_version: None,
1536                labels: HashMap::new(),
1537                scheduled_at: None,
1538            })
1539            .await
1540            .unwrap();
1541
1542        assert_eq!(r1.trigger, TriggerKind::Manual);
1543        assert!(matches!(r2.trigger, TriggerKind::Webhook { .. }));
1544        assert!(matches!(r3.trigger, TriggerKind::Cron { .. }));
1545        assert_eq!(r4.trigger, TriggerKind::Api);
1546        assert!(matches!(r5.trigger, TriggerKind::Retry { .. }));
1547    }
1548
1549    // ---- create_step_dependencies ----
1550
1551    #[tokio::test]
1552    async fn create_step_dependencies_stores_dependencies() {
1553        let store = InMemoryStore::new();
1554        let run = store.create_run(new_run_req("test")).await.unwrap();
1555
1556        let step1 = store
1557            .create_step(NewStep {
1558                run_id: run.id,
1559                name: "step1".to_string(),
1560                kind: crate::entities::StepKind::Shell,
1561                position: 0,
1562                input: None,
1563            })
1564            .await
1565            .unwrap();
1566
1567        let step2 = store
1568            .create_step(NewStep {
1569                run_id: run.id,
1570                name: "step2".to_string(),
1571                kind: crate::entities::StepKind::Shell,
1572                position: 1,
1573                input: None,
1574            })
1575            .await
1576            .unwrap();
1577
1578        let result = store
1579            .create_step_dependencies(vec![NewStepDependency {
1580                step_id: step2.id,
1581                depends_on: step1.id,
1582            }])
1583            .await;
1584
1585        assert!(result.is_ok());
1586
1587        let deps = store.list_step_dependencies(run.id).await.unwrap();
1588        assert_eq!(deps.len(), 1);
1589        assert_eq!(deps[0].step_id, step2.id);
1590        assert_eq!(deps[0].depends_on, step1.id);
1591    }
1592
1593    #[tokio::test]
1594    async fn create_step_dependencies_duplicate_dependencies_are_idempotent() {
1595        let store = InMemoryStore::new();
1596        let run = store.create_run(new_run_req("test")).await.unwrap();
1597
1598        let step1 = store
1599            .create_step(NewStep {
1600                run_id: run.id,
1601                name: "step1".to_string(),
1602                kind: crate::entities::StepKind::Shell,
1603                position: 0,
1604                input: None,
1605            })
1606            .await
1607            .unwrap();
1608
1609        let step2 = store
1610            .create_step(NewStep {
1611                run_id: run.id,
1612                name: "step2".to_string(),
1613                kind: crate::entities::StepKind::Shell,
1614                position: 1,
1615                input: None,
1616            })
1617            .await
1618            .unwrap();
1619
1620        let dep = NewStepDependency {
1621            step_id: step2.id,
1622            depends_on: step1.id,
1623        };
1624
1625        store
1626            .create_step_dependencies(vec![dep.clone()])
1627            .await
1628            .unwrap();
1629        store.create_step_dependencies(vec![dep]).await.unwrap();
1630
1631        let deps = store.list_step_dependencies(run.id).await.unwrap();
1632        assert_eq!(deps.len(), 1);
1633    }
1634
1635    #[tokio::test]
1636    async fn create_step_dependencies_missing_step_id_returns_error() {
1637        let store = InMemoryStore::new();
1638        let run = store.create_run(new_run_req("test")).await.unwrap();
1639
1640        let step1 = store
1641            .create_step(NewStep {
1642                run_id: run.id,
1643                name: "step1".to_string(),
1644                kind: crate::entities::StepKind::Shell,
1645                position: 0,
1646                input: None,
1647            })
1648            .await
1649            .unwrap();
1650
1651        let result = store
1652            .create_step_dependencies(vec![NewStepDependency {
1653                step_id: Uuid::nil(),
1654                depends_on: step1.id,
1655            }])
1656            .await;
1657
1658        assert!(matches!(result.unwrap_err(), StoreError::StepNotFound(_)));
1659    }
1660
1661    #[tokio::test]
1662    async fn create_step_dependencies_missing_depends_on_returns_error() {
1663        let store = InMemoryStore::new();
1664        let run = store.create_run(new_run_req("test")).await.unwrap();
1665
1666        let step1 = store
1667            .create_step(NewStep {
1668                run_id: run.id,
1669                name: "step1".to_string(),
1670                kind: crate::entities::StepKind::Shell,
1671                position: 0,
1672                input: None,
1673            })
1674            .await
1675            .unwrap();
1676
1677        let result = store
1678            .create_step_dependencies(vec![NewStepDependency {
1679                step_id: step1.id,
1680                depends_on: Uuid::nil(),
1681            }])
1682            .await;
1683
1684        assert!(matches!(result.unwrap_err(), StoreError::StepNotFound(_)));
1685    }
1686
1687    #[tokio::test]
1688    async fn create_step_dependencies_multiple_dependencies() {
1689        let store = InMemoryStore::new();
1690        let run = store.create_run(new_run_req("test")).await.unwrap();
1691
1692        let step1 = store
1693            .create_step(NewStep {
1694                run_id: run.id,
1695                name: "step1".to_string(),
1696                kind: crate::entities::StepKind::Shell,
1697                position: 0,
1698                input: None,
1699            })
1700            .await
1701            .unwrap();
1702
1703        let step2 = store
1704            .create_step(NewStep {
1705                run_id: run.id,
1706                name: "step2".to_string(),
1707                kind: crate::entities::StepKind::Shell,
1708                position: 1,
1709                input: None,
1710            })
1711            .await
1712            .unwrap();
1713
1714        let step3 = store
1715            .create_step(NewStep {
1716                run_id: run.id,
1717                name: "step3".to_string(),
1718                kind: crate::entities::StepKind::Shell,
1719                position: 2,
1720                input: None,
1721            })
1722            .await
1723            .unwrap();
1724
1725        let result = store
1726            .create_step_dependencies(vec![
1727                NewStepDependency {
1728                    step_id: step2.id,
1729                    depends_on: step1.id,
1730                },
1731                NewStepDependency {
1732                    step_id: step3.id,
1733                    depends_on: step2.id,
1734                },
1735            ])
1736            .await;
1737
1738        assert!(result.is_ok());
1739
1740        let deps = store.list_step_dependencies(run.id).await.unwrap();
1741        assert_eq!(deps.len(), 2);
1742    }
1743
1744    // ---- list_step_dependencies ----
1745
1746    #[tokio::test]
1747    async fn list_step_dependencies_empty_for_run_with_no_dependencies() {
1748        let store = InMemoryStore::new();
1749        let run = store.create_run(new_run_req("test")).await.unwrap();
1750
1751        store
1752            .create_step(NewStep {
1753                run_id: run.id,
1754                name: "step1".to_string(),
1755                kind: crate::entities::StepKind::Shell,
1756                position: 0,
1757                input: None,
1758            })
1759            .await
1760            .unwrap();
1761
1762        let deps = store.list_step_dependencies(run.id).await.unwrap();
1763        assert!(deps.is_empty());
1764    }
1765
1766    #[tokio::test]
1767    async fn list_step_dependencies_returns_only_deps_for_given_run() {
1768        let store = InMemoryStore::new();
1769        let run1 = store.create_run(new_run_req("test1")).await.unwrap();
1770        let run2 = store.create_run(new_run_req("test2")).await.unwrap();
1771
1772        let step1_run1 = store
1773            .create_step(NewStep {
1774                run_id: run1.id,
1775                name: "step1".to_string(),
1776                kind: crate::entities::StepKind::Shell,
1777                position: 0,
1778                input: None,
1779            })
1780            .await
1781            .unwrap();
1782
1783        let step2_run1 = store
1784            .create_step(NewStep {
1785                run_id: run1.id,
1786                name: "step2".to_string(),
1787                kind: crate::entities::StepKind::Shell,
1788                position: 1,
1789                input: None,
1790            })
1791            .await
1792            .unwrap();
1793
1794        let step1_run2 = store
1795            .create_step(NewStep {
1796                run_id: run2.id,
1797                name: "step1".to_string(),
1798                kind: crate::entities::StepKind::Shell,
1799                position: 0,
1800                input: None,
1801            })
1802            .await
1803            .unwrap();
1804
1805        let step2_run2 = store
1806            .create_step(NewStep {
1807                run_id: run2.id,
1808                name: "step2".to_string(),
1809                kind: crate::entities::StepKind::Shell,
1810                position: 1,
1811                input: None,
1812            })
1813            .await
1814            .unwrap();
1815
1816        store
1817            .create_step_dependencies(vec![
1818                NewStepDependency {
1819                    step_id: step2_run1.id,
1820                    depends_on: step1_run1.id,
1821                },
1822                NewStepDependency {
1823                    step_id: step2_run2.id,
1824                    depends_on: step1_run2.id,
1825                },
1826            ])
1827            .await
1828            .unwrap();
1829
1830        let deps_run1 = store.list_step_dependencies(run1.id).await.unwrap();
1831        let deps_run2 = store.list_step_dependencies(run2.id).await.unwrap();
1832
1833        assert_eq!(deps_run1.len(), 1);
1834        assert_eq!(deps_run1[0].step_id, step2_run1.id);
1835        assert_eq!(deps_run1[0].depends_on, step1_run1.id);
1836
1837        assert_eq!(deps_run2.len(), 1);
1838        assert_eq!(deps_run2[0].step_id, step2_run2.id);
1839        assert_eq!(deps_run2[0].depends_on, step1_run2.id);
1840    }
1841
1842    #[tokio::test]
1843    async fn list_step_dependencies_returns_empty_for_nonexistent_run() {
1844        let store = InMemoryStore::new();
1845        let deps = store.list_step_dependencies(Uuid::nil()).await.unwrap();
1846        assert!(deps.is_empty());
1847    }
1848
1849    #[tokio::test]
1850    async fn list_step_dependencies_sorted_by_created_at() {
1851        let store = InMemoryStore::new();
1852        let run = store.create_run(new_run_req("test")).await.unwrap();
1853
1854        let step1 = store
1855            .create_step(NewStep {
1856                run_id: run.id,
1857                name: "step1".to_string(),
1858                kind: crate::entities::StepKind::Shell,
1859                position: 0,
1860                input: None,
1861            })
1862            .await
1863            .unwrap();
1864
1865        let step2 = store
1866            .create_step(NewStep {
1867                run_id: run.id,
1868                name: "step2".to_string(),
1869                kind: crate::entities::StepKind::Shell,
1870                position: 1,
1871                input: None,
1872            })
1873            .await
1874            .unwrap();
1875
1876        let step3 = store
1877            .create_step(NewStep {
1878                run_id: run.id,
1879                name: "step3".to_string(),
1880                kind: crate::entities::StepKind::Shell,
1881                position: 2,
1882                input: None,
1883            })
1884            .await
1885            .unwrap();
1886
1887        store
1888            .create_step_dependencies(vec![NewStepDependency {
1889                step_id: step2.id,
1890                depends_on: step1.id,
1891            }])
1892            .await
1893            .unwrap();
1894
1895        store
1896            .create_step_dependencies(vec![NewStepDependency {
1897                step_id: step3.id,
1898                depends_on: step1.id,
1899            }])
1900            .await
1901            .unwrap();
1902
1903        let deps = store.list_step_dependencies(run.id).await.unwrap();
1904        assert_eq!(deps.len(), 2);
1905        assert!(deps[0].created_at <= deps[1].created_at);
1906    }
1907
1908    // ---- update_run_returning ----
1909
1910    #[tokio::test]
1911    async fn update_run_returning_applies_and_returns() {
1912        let store = InMemoryStore::new();
1913        let run = store.create_run(new_run_req("test")).await.unwrap();
1914
1915        // Transition Pending -> Running first
1916        store
1917            .update_run_status(run.id, RunStatus::Running)
1918            .await
1919            .unwrap();
1920
1921        let updated = store
1922            .update_run_returning(
1923                run.id,
1924                RunUpdate {
1925                    status: Some(RunStatus::Completed),
1926                    cost_usd: Some(Decimal::new(4200, 2)),
1927                    duration_ms: Some(1500),
1928                    ..RunUpdate::default()
1929                },
1930            )
1931            .await
1932            .unwrap();
1933
1934        assert_eq!(updated.id, run.id);
1935        assert_eq!(updated.status.state, RunStatus::Completed);
1936        assert_eq!(updated.cost_usd, Decimal::new(4200, 2));
1937        assert_eq!(updated.duration_ms, 1500);
1938        assert!(updated.completed_at.is_some());
1939    }
1940
1941    #[tokio::test]
1942    async fn update_run_returning_not_found() {
1943        let store = InMemoryStore::new();
1944        let result = store
1945            .update_run_returning(
1946                Uuid::nil(),
1947                RunUpdate {
1948                    status: Some(RunStatus::Running),
1949                    ..RunUpdate::default()
1950                },
1951            )
1952            .await;
1953
1954        assert!(matches!(result, Err(StoreError::RunNotFound(_))));
1955    }
1956
1957    #[tokio::test]
1958    async fn update_run_returning_invalid_transition() {
1959        let store = InMemoryStore::new();
1960        let run = store.create_run(new_run_req("test")).await.unwrap();
1961
1962        let result = store
1963            .update_run_returning(
1964                run.id,
1965                RunUpdate {
1966                    status: Some(RunStatus::Completed),
1967                    ..RunUpdate::default()
1968                },
1969            )
1970            .await;
1971
1972        assert!(matches!(result, Err(StoreError::InvalidTransition { .. })));
1973    }
1974}