1use std::collections::HashMap;
2
3use chrono::Utc;
4use rust_decimal::Decimal;
5use uuid::Uuid;
6
7use crate::entities::{
8 NewRun, NewStep, NewStepDependency, Page, Run, RunFilter, RunStats, RunStatus, RunUpdate, Step,
9 StepDependency, StepStatus, StepUpdate,
10};
11use crate::error::StoreError;
12use crate::store::{RunStore, StoreFuture};
13
14use super::InMemoryStore;
15
16fn run_matches_filter(run: &Run, filter: &RunFilter, steps: &HashMap<Uuid, Step>) -> bool {
17 if let Some(ref wf) = filter.workflow_name
18 && !run
19 .workflow_name
20 .to_lowercase()
21 .contains(&wf.to_lowercase())
22 {
23 return false;
24 }
25 if let Some(ref status) = filter.status
26 && &run.status.state != status
27 {
28 return false;
29 }
30 if let Some(after) = filter.created_after
31 && run.created_at < after
32 {
33 return false;
34 }
35 if let Some(before) = filter.created_before
36 && run.created_at > before
37 {
38 return false;
39 }
40 if let Some(has_steps) = filter.has_steps
41 && matches!(
42 run.status.state,
43 RunStatus::Completed | RunStatus::Cancelled
44 )
45 {
46 let run_has_steps = steps.values().any(|s| s.run_id == run.id);
47 if has_steps != run_has_steps {
48 return false;
49 }
50 }
51 if let Some(ref labels) = filter.labels {
52 for (key, value) in labels {
53 if run.labels.get(key) != Some(value) {
54 return false;
55 }
56 }
57 }
58 true
59}
60
61impl RunStore for InMemoryStore {
62 fn create_run(&self, req: NewRun) -> StoreFuture<'_, Run> {
63 Box::pin(async move {
64 let now = Utc::now();
65 let run = Run {
66 id: Uuid::now_v7(),
67 workflow_name: req.workflow_name,
68 status: crate::entities::FsmState::new(RunStatus::Pending, Uuid::now_v7()),
69 trigger: req.trigger,
70 payload: req.payload,
71 error: None,
72 retry_count: 0,
73 max_retries: req.max_retries,
74 cost_usd: Decimal::ZERO,
75 duration_ms: 0,
76 created_at: now,
77 updated_at: now,
78 started_at: None,
79 completed_at: None,
80 handler_version: req.handler_version,
81 labels: req.labels,
82 scheduled_at: req.scheduled_at,
83 };
84
85 let mut state = self.state.write().await;
86 state.runs.insert(run.id, run.clone());
87 Ok(run)
88 })
89 }
90
91 fn get_run(&self, id: Uuid) -> StoreFuture<'_, Option<Run>> {
92 Box::pin(async move {
93 let state = self.state.read().await;
94 Ok(state.runs.get(&id).cloned())
95 })
96 }
97
98 fn list_runs(&self, filter: RunFilter, page: u32, per_page: u32) -> StoreFuture<'_, Page<Run>> {
99 Box::pin(async move {
100 let state = self.state.read().await;
101
102 let mut runs: Vec<&Run> = state
103 .runs
104 .values()
105 .filter(|r| run_matches_filter(r, &filter, &state.steps))
106 .collect();
107
108 runs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
110
111 let total = runs.len() as u64;
112 let page = page.max(1);
113 let per_page = per_page.clamp(1, 100);
114 let offset = ((page - 1) * per_page) as usize;
115 let items: Vec<Run> = runs
116 .into_iter()
117 .skip(offset)
118 .take(per_page as usize)
119 .cloned()
120 .collect();
121
122 Ok(Page {
123 items,
124 total,
125 page,
126 per_page,
127 })
128 })
129 }
130
131 fn update_run_status(&self, id: Uuid, new_status: RunStatus) -> StoreFuture<'_, ()> {
132 Box::pin(async move {
133 let mut state = self.state.write().await;
134 let run = state.runs.get_mut(&id).ok_or(StoreError::RunNotFound(id))?;
135
136 if !run.status.state.can_transition_to(&new_status) {
137 return Err(StoreError::InvalidTransition {
138 from: run.status.state,
139 to: new_status,
140 });
141 }
142
143 if run.status.state == new_status && new_status.is_terminal() {
144 return Ok(());
145 }
146
147 let now = Utc::now();
148 run.status.state = new_status;
149 run.updated_at = now;
150
151 if new_status == RunStatus::Running && run.started_at.is_none() {
152 run.started_at = Some(now);
153 }
154 if new_status.is_terminal() {
155 run.completed_at = Some(now);
156 }
157
158 Ok(())
159 })
160 }
161
162 fn update_run(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, ()> {
163 Box::pin(async move {
164 let mut state = self.state.write().await;
165 let run = state.runs.get_mut(&id).ok_or(StoreError::RunNotFound(id))?;
166
167 let now = Utc::now();
168
169 if let Some(status) = update.status {
170 if !run.status.state.can_transition_to(&status) {
171 return Err(StoreError::InvalidTransition {
172 from: run.status.state,
173 to: status,
174 });
175 }
176 if !(run.status.state == status && status.is_terminal()) {
177 run.status.state = status;
178 if status == RunStatus::Running && run.started_at.is_none() {
179 run.started_at = Some(now);
180 }
181 if status.is_terminal() {
182 run.completed_at = Some(now);
183 }
184 }
185 }
186
187 if let Some(error) = update.error {
188 run.error = Some(error);
189 }
190 if update.increment_retry {
191 run.retry_count += 1;
192 }
193 if let Some(cost) = update.cost_usd {
194 run.cost_usd = cost;
195 }
196 if let Some(dur) = update.duration_ms {
197 run.duration_ms = dur;
198 }
199 if let Some(started) = update.started_at {
200 run.started_at = Some(started);
201 }
202 if let Some(completed) = update.completed_at {
203 run.completed_at = Some(completed);
204 }
205
206 run.updated_at = now;
207 Ok(())
208 })
209 }
210
211 fn pick_next_pending(&self) -> StoreFuture<'_, Option<Run>> {
212 Box::pin(async move {
213 let mut state = self.state.write().await;
214 let now = Utc::now();
215
216 let oldest_id = state
218 .runs
219 .values()
220 .filter(|r| {
221 r.status.state == RunStatus::Pending
222 && r.scheduled_at.is_none_or(|at| at <= now)
223 })
224 .min_by_key(|r| r.created_at)
225 .map(|r| r.id);
226
227 let Some(id) = oldest_id else {
228 return Ok(None);
229 };
230
231 let run = state.runs.get_mut(&id).expect("run exists");
233 let now = Utc::now();
234 run.status.state = RunStatus::Running;
235 run.started_at = Some(now);
236 run.updated_at = now;
237
238 Ok(Some(run.clone()))
239 })
240 }
241
242 fn create_step(&self, req: NewStep) -> StoreFuture<'_, Step> {
243 Box::pin(async move {
244 let mut state = self.state.write().await;
245
246 if !state.runs.contains_key(&req.run_id) {
247 return Err(StoreError::RunNotFound(req.run_id));
248 }
249
250 let now = Utc::now();
251 let step = Step {
252 id: Uuid::now_v7(),
253 run_id: req.run_id,
254 name: req.name,
255 kind: req.kind,
256 position: req.position,
257 status: crate::entities::FsmState::new(StepStatus::Pending, Uuid::now_v7()),
258 input: req.input,
259 output: None,
260 error: None,
261 duration_ms: 0,
262 cost_usd: Decimal::ZERO,
263 input_tokens: None,
264 output_tokens: None,
265 created_at: now,
266 updated_at: now,
267 started_at: None,
268 completed_at: None,
269 debug_messages: None,
270 };
271
272 state.steps.insert(step.id, step.clone());
273 Ok(step)
274 })
275 }
276
277 fn update_step(&self, id: Uuid, update: StepUpdate) -> StoreFuture<'_, ()> {
278 Box::pin(async move {
279 let mut state = self.state.write().await;
280 let step = state
281 .steps
282 .get_mut(&id)
283 .ok_or(StoreError::StepNotFound(id))?;
284
285 let now = Utc::now();
286
287 if let Some(status) = update.status {
288 if !matches!(
289 (step.status.state, status),
290 (StepStatus::Pending, StepStatus::Running)
291 | (StepStatus::Pending, StepStatus::Skipped)
292 | (StepStatus::Running, StepStatus::Completed)
293 | (StepStatus::Running, StepStatus::Failed)
294 | (StepStatus::Running, StepStatus::AwaitingApproval)
295 | (StepStatus::AwaitingApproval, StepStatus::Running)
296 | (StepStatus::AwaitingApproval, StepStatus::Completed)
297 | (StepStatus::AwaitingApproval, StepStatus::Failed)
298 | (StepStatus::AwaitingApproval, StepStatus::Rejected)
299 ) {
300 return Err(StoreError::Database(format!(
301 "invalid step status transition: {:?} -> {:?}",
302 step.status.state, status
303 )));
304 }
305 step.status.state = status;
306 }
307 if let Some(output) = update.output {
308 step.output = Some(output);
309 }
310 if let Some(error) = update.error {
311 step.error = Some(error);
312 }
313 if let Some(dur) = update.duration_ms {
314 step.duration_ms = dur;
315 }
316 if let Some(cost) = update.cost_usd {
317 step.cost_usd = cost;
318 }
319 if let Some(tokens) = update.input_tokens {
320 step.input_tokens = Some(tokens);
321 }
322 if let Some(tokens) = update.output_tokens {
323 step.output_tokens = Some(tokens);
324 }
325 if let Some(started) = update.started_at {
326 step.started_at = Some(started);
327 }
328 if let Some(completed) = update.completed_at {
329 step.completed_at = Some(completed);
330 }
331 if let Some(debug_msgs) = update.debug_messages {
332 step.debug_messages = Some(debug_msgs);
333 }
334
335 step.updated_at = now;
336 Ok(())
337 })
338 }
339
340 fn get_step(&self, id: Uuid) -> StoreFuture<'_, Option<Step>> {
341 Box::pin(async move {
342 let state = self.state.read().await;
343 Ok(state.steps.get(&id).cloned())
344 })
345 }
346
347 fn list_steps(&self, run_id: Uuid) -> StoreFuture<'_, Vec<Step>> {
348 Box::pin(async move {
349 let state = self.state.read().await;
350 let mut steps: Vec<Step> = state
351 .steps
352 .values()
353 .filter(|s| s.run_id == run_id)
354 .cloned()
355 .collect();
356 steps.sort_by_key(|s| s.position);
357 Ok(steps)
358 })
359 }
360
361 fn get_stats(&self, filter: RunFilter) -> StoreFuture<'_, RunStats> {
362 Box::pin(async move {
363 let state = self.state.read().await;
364
365 let mut total_cost_usd = Decimal::ZERO;
366 let mut total_duration_ms = 0u64;
367 let mut total_runs = 0u64;
368 let mut completed_runs = 0u64;
369 let mut failed_runs = 0u64;
370 let mut cancelled_runs = 0u64;
371 let mut active_runs = 0u64;
372
373 for run in state.runs.values() {
374 if !run_matches_filter(run, &filter, &state.steps) {
375 continue;
376 }
377
378 total_cost_usd += run.cost_usd;
379 total_duration_ms += run.duration_ms;
380 total_runs += 1;
381
382 match run.status.state {
383 RunStatus::Completed => completed_runs += 1,
384 RunStatus::Failed => failed_runs += 1,
385 RunStatus::Cancelled => cancelled_runs += 1,
386 RunStatus::Pending
387 | RunStatus::Running
388 | RunStatus::Retrying
389 | RunStatus::AwaitingApproval => {
390 active_runs += 1;
391 }
392 }
393 }
394
395 Ok(RunStats {
396 total_runs,
397 completed_runs,
398 failed_runs,
399 cancelled_runs,
400 active_runs,
401 total_cost_usd,
402 total_duration_ms,
403 })
404 })
405 }
406
407 fn create_step_dependencies(&self, deps: Vec<NewStepDependency>) -> StoreFuture<'_, ()> {
408 Box::pin(async move {
409 let mut state = self.state.write().await;
410
411 for dep in deps {
412 if !state.steps.contains_key(&dep.step_id) {
413 return Err(StoreError::StepNotFound(dep.step_id));
414 }
415 if !state.steps.contains_key(&dep.depends_on) {
416 return Err(StoreError::StepNotFound(dep.depends_on));
417 }
418
419 let already_exists = state
420 .step_dependencies
421 .iter()
422 .any(|d| d.step_id == dep.step_id && d.depends_on == dep.depends_on);
423
424 if !already_exists {
425 state.step_dependencies.push(StepDependency {
426 step_id: dep.step_id,
427 depends_on: dep.depends_on,
428 created_at: Utc::now(),
429 });
430 }
431 }
432
433 Ok(())
434 })
435 }
436
437 fn list_step_dependencies(&self, run_id: Uuid) -> StoreFuture<'_, Vec<StepDependency>> {
438 Box::pin(async move {
439 let state = self.state.read().await;
440
441 let run_step_ids: std::collections::HashSet<Uuid> = state
442 .steps
443 .values()
444 .filter(|s| s.run_id == run_id)
445 .map(|s| s.id)
446 .collect();
447
448 let mut deps: Vec<StepDependency> = state
449 .step_dependencies
450 .iter()
451 .filter(|d| run_step_ids.contains(&d.step_id))
452 .cloned()
453 .collect();
454
455 deps.sort_by_key(|d| d.created_at);
456 Ok(deps)
457 })
458 }
459}
460
461#[cfg(test)]
462mod tests {
463 use std::collections::HashMap;
464
465 use serde_json::json;
466 use tokio::spawn;
467
468 use super::*;
469 use crate::entities::TriggerKind;
470
471 use crate::memory::tests::{create_terminal_run, new_run_req};
472 use crate::store::RunStore;
473
474 #[tokio::test]
477 async fn create_run_returns_pending_status() {
478 let store = InMemoryStore::new();
479 let run = store.create_run(new_run_req("test")).await.unwrap();
480 assert_eq!(run.status.state, RunStatus::Pending);
481 assert_eq!(run.workflow_name, "test");
482 assert_eq!(run.retry_count, 0);
483 assert_eq!(run.max_retries, 3);
484 }
485
486 #[tokio::test]
487 async fn create_run_generates_unique_ids() {
488 let store = InMemoryStore::new();
489 let r1 = store.create_run(new_run_req("a")).await.unwrap();
490 let r2 = store.create_run(new_run_req("b")).await.unwrap();
491 assert_ne!(r1.id, r2.id);
492 }
493
494 #[tokio::test]
497 async fn get_run_returns_created_run() {
498 let store = InMemoryStore::new();
499 let run = store.create_run(new_run_req("test")).await.unwrap();
500 let fetched = store.get_run(run.id).await.unwrap();
501 assert!(fetched.is_some());
502 assert_eq!(fetched.unwrap().id, run.id);
503 }
504
505 #[tokio::test]
506 async fn get_run_returns_none_for_missing() {
507 let store = InMemoryStore::new();
508 let fetched = store.get_run(Uuid::nil()).await.unwrap();
509 assert!(fetched.is_none());
510 }
511
512 #[tokio::test]
515 async fn update_run_status_valid_transition() {
516 let store = InMemoryStore::new();
517 let run = store.create_run(new_run_req("test")).await.unwrap();
518
519 store
520 .update_run_status(run.id, RunStatus::Running)
521 .await
522 .unwrap();
523
524 let fetched = store.get_run(run.id).await.unwrap().unwrap();
525 assert_eq!(fetched.status.state, RunStatus::Running);
526 assert!(fetched.started_at.is_some());
527 }
528
529 #[tokio::test]
530 async fn update_run_status_invalid_transition_returns_error() {
531 let store = InMemoryStore::new();
532 let run = store.create_run(new_run_req("test")).await.unwrap();
533
534 let result = store.update_run_status(run.id, RunStatus::Completed).await;
535 assert!(result.is_err());
536
537 let err = result.unwrap_err();
538 assert!(matches!(err, StoreError::InvalidTransition { .. }));
539 }
540
541 #[tokio::test]
542 async fn update_run_status_not_found() {
543 let store = InMemoryStore::new();
544 let result = store
545 .update_run_status(Uuid::nil(), RunStatus::Running)
546 .await;
547 assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
548 }
549
550 #[tokio::test]
551 async fn update_run_status_terminal_sets_completed_at() {
552 let store = InMemoryStore::new();
553 let run = store.create_run(new_run_req("test")).await.unwrap();
554
555 store
556 .update_run_status(run.id, RunStatus::Running)
557 .await
558 .unwrap();
559 store
560 .update_run_status(run.id, RunStatus::Completed)
561 .await
562 .unwrap();
563
564 let fetched = store.get_run(run.id).await.unwrap().unwrap();
565 assert_eq!(fetched.status.state, RunStatus::Completed);
566 assert!(fetched.completed_at.is_some());
567 }
568
569 #[tokio::test]
570 async fn update_run_status_terminal_to_same_is_idempotent() {
571 let store = InMemoryStore::new();
572 let run = store.create_run(new_run_req("test")).await.unwrap();
573
574 store
575 .update_run_status(run.id, RunStatus::Running)
576 .await
577 .unwrap();
578 store
579 .update_run_status(run.id, RunStatus::Failed)
580 .await
581 .unwrap();
582
583 let before = store.get_run(run.id).await.unwrap().unwrap();
584 let completed_at_before = before.completed_at;
585
586 store
587 .update_run_status(run.id, RunStatus::Failed)
588 .await
589 .unwrap();
590
591 let after = store.get_run(run.id).await.unwrap().unwrap();
592 assert_eq!(after.status.state, RunStatus::Failed);
593 assert_eq!(after.completed_at, completed_at_before);
594 }
595
596 #[tokio::test]
597 async fn update_run_terminal_to_same_via_update_run_is_idempotent() {
598 let store = InMemoryStore::new();
599 let run = store.create_run(new_run_req("test")).await.unwrap();
600
601 store
602 .update_run_status(run.id, RunStatus::Running)
603 .await
604 .unwrap();
605 store
606 .update_run(
607 run.id,
608 RunUpdate {
609 status: Some(RunStatus::Failed),
610 error: Some("first failure".to_string()),
611 ..RunUpdate::default()
612 },
613 )
614 .await
615 .unwrap();
616
617 let before = store.get_run(run.id).await.unwrap().unwrap();
618
619 store
620 .update_run(
621 run.id,
622 RunUpdate {
623 status: Some(RunStatus::Failed),
624 ..RunUpdate::default()
625 },
626 )
627 .await
628 .unwrap();
629
630 let after = store.get_run(run.id).await.unwrap().unwrap();
631 assert_eq!(after.status.state, RunStatus::Failed);
632 assert_eq!(after.completed_at, before.completed_at);
633 assert_eq!(after.error, Some("first failure".to_string()));
634 }
635
636 #[tokio::test]
639 async fn list_runs_empty_store() {
640 let store = InMemoryStore::new();
641 let page = store.list_runs(RunFilter::default(), 1, 20).await.unwrap();
642 assert_eq!(page.total, 0);
643 assert!(page.items.is_empty());
644 }
645
646 #[tokio::test]
647 async fn list_runs_with_workflow_filter() {
648 let store = InMemoryStore::new();
649 store.create_run(new_run_req("deploy")).await.unwrap();
650 store.create_run(new_run_req("test")).await.unwrap();
651 store.create_run(new_run_req("deploy")).await.unwrap();
652
653 let filter = RunFilter {
654 workflow_name: Some("deploy".to_string()),
655 ..RunFilter::default()
656 };
657 let page = store.list_runs(filter, 1, 20).await.unwrap();
658 assert_eq!(page.total, 2);
659 assert!(page.items.iter().all(|r| r.workflow_name == "deploy"));
660 }
661
662 #[tokio::test]
663 async fn list_runs_with_status_filter() {
664 let store = InMemoryStore::new();
665 let run = store.create_run(new_run_req("a")).await.unwrap();
666 store.create_run(new_run_req("b")).await.unwrap();
667
668 store
669 .update_run_status(run.id, RunStatus::Running)
670 .await
671 .unwrap();
672
673 let filter = RunFilter {
674 status: Some(RunStatus::Running),
675 ..RunFilter::default()
676 };
677 let page = store.list_runs(filter, 1, 20).await.unwrap();
678 assert_eq!(page.total, 1);
679 assert_eq!(page.items[0].id, run.id);
680 }
681
682 #[tokio::test]
683 async fn list_runs_pagination() {
684 let store = InMemoryStore::new();
685 for i in 0..5 {
686 store
687 .create_run(new_run_req(&format!("wf-{i}")))
688 .await
689 .unwrap();
690 }
691
692 let page1 = store.list_runs(RunFilter::default(), 1, 2).await.unwrap();
693 assert_eq!(page1.total, 5);
694 assert_eq!(page1.items.len(), 2);
695 assert_eq!(page1.page, 1);
696 assert_eq!(page1.per_page, 2);
697
698 let page2 = store.list_runs(RunFilter::default(), 2, 2).await.unwrap();
699 assert_eq!(page2.items.len(), 2);
700
701 let page3 = store.list_runs(RunFilter::default(), 3, 2).await.unwrap();
702 assert_eq!(page3.items.len(), 1);
703 }
704
705 #[tokio::test]
708 async fn pick_next_pending_empty_store() {
709 let store = InMemoryStore::new();
710 let result = store.pick_next_pending().await.unwrap();
711 assert!(result.is_none());
712 }
713
714 #[tokio::test]
715 async fn pick_next_pending_returns_oldest_and_transitions_to_running() {
716 let store = InMemoryStore::new();
717 let r1 = store.create_run(new_run_req("first")).await.unwrap();
718 let _r2 = store.create_run(new_run_req("second")).await.unwrap();
719
720 let picked = store.pick_next_pending().await.unwrap().unwrap();
721 assert_eq!(picked.id, r1.id);
722 assert_eq!(picked.status.state, RunStatus::Running);
723 assert!(picked.started_at.is_some());
724
725 let fetched = store.get_run(r1.id).await.unwrap().unwrap();
727 assert_eq!(fetched.status.state, RunStatus::Running);
728 }
729
730 #[tokio::test]
731 async fn pick_next_pending_skips_non_pending() {
732 let store = InMemoryStore::new();
733 let r1 = store.create_run(new_run_req("a")).await.unwrap();
734 let r2 = store.create_run(new_run_req("b")).await.unwrap();
735
736 store
738 .update_run_status(r1.id, RunStatus::Running)
739 .await
740 .unwrap();
741
742 let picked = store.pick_next_pending().await.unwrap().unwrap();
743 assert_eq!(picked.id, r2.id);
744 }
745
746 #[tokio::test]
749 async fn create_step_returns_pending() {
750 let store = InMemoryStore::new();
751 let run = store.create_run(new_run_req("test")).await.unwrap();
752
753 let step = store
754 .create_step(NewStep {
755 run_id: run.id,
756 name: "build".to_string(),
757 kind: crate::entities::StepKind::Shell,
758 position: 0,
759 input: Some(json!({"command": "cargo build"})),
760 })
761 .await
762 .unwrap();
763
764 assert_eq!(step.status.state, StepStatus::Pending);
765 assert_eq!(step.name, "build");
766 assert_eq!(step.run_id, run.id);
767 assert_eq!(step.position, 0);
768 }
769
770 #[tokio::test]
771 async fn create_step_for_missing_run_returns_error() {
772 let store = InMemoryStore::new();
773 let result = store
774 .create_step(NewStep {
775 run_id: Uuid::nil(),
776 name: "build".to_string(),
777 kind: crate::entities::StepKind::Shell,
778 position: 0,
779 input: None,
780 })
781 .await;
782 assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
783 }
784
785 #[tokio::test]
788 async fn update_step_applies_partial_update() {
789 let store = InMemoryStore::new();
790 let run = store.create_run(new_run_req("test")).await.unwrap();
791
792 let step = store
793 .create_step(NewStep {
794 run_id: run.id,
795 name: "build".to_string(),
796 kind: crate::entities::StepKind::Shell,
797 position: 0,
798 input: None,
799 })
800 .await
801 .unwrap();
802
803 store
805 .update_step(
806 step.id,
807 StepUpdate {
808 status: Some(StepStatus::Running),
809 ..StepUpdate::default()
810 },
811 )
812 .await
813 .unwrap();
814
815 store
817 .update_step(
818 step.id,
819 StepUpdate {
820 status: Some(StepStatus::Completed),
821 output: Some(json!({"stdout": "ok"})),
822 duration_ms: Some(150),
823 ..StepUpdate::default()
824 },
825 )
826 .await
827 .unwrap();
828
829 let steps = store.list_steps(run.id).await.unwrap();
830 assert_eq!(steps.len(), 1);
831 assert_eq!(steps[0].status.state, StepStatus::Completed);
832 assert_eq!(steps[0].duration_ms, 150);
833 assert!(steps[0].output.is_some());
834 }
835
836 #[tokio::test]
837 async fn update_step_not_found() {
838 let store = InMemoryStore::new();
839 let result = store.update_step(Uuid::nil(), StepUpdate::default()).await;
840 assert!(matches!(result.unwrap_err(), StoreError::StepNotFound(_)));
841 }
842
843 #[tokio::test]
846 async fn list_steps_ordered_by_position() {
847 let store = InMemoryStore::new();
848 let run = store.create_run(new_run_req("test")).await.unwrap();
849
850 store
852 .create_step(NewStep {
853 run_id: run.id,
854 name: "deploy".to_string(),
855 kind: crate::entities::StepKind::Shell,
856 position: 2,
857 input: None,
858 })
859 .await
860 .unwrap();
861 store
862 .create_step(NewStep {
863 run_id: run.id,
864 name: "build".to_string(),
865 kind: crate::entities::StepKind::Shell,
866 position: 0,
867 input: None,
868 })
869 .await
870 .unwrap();
871 store
872 .create_step(NewStep {
873 run_id: run.id,
874 name: "test".to_string(),
875 kind: crate::entities::StepKind::Shell,
876 position: 1,
877 input: None,
878 })
879 .await
880 .unwrap();
881
882 let steps = store.list_steps(run.id).await.unwrap();
883 assert_eq!(steps.len(), 3);
884 assert_eq!(steps[0].name, "build");
885 assert_eq!(steps[1].name, "test");
886 assert_eq!(steps[2].name, "deploy");
887 }
888
889 #[tokio::test]
890 async fn list_steps_empty_for_run_without_steps() {
891 let store = InMemoryStore::new();
892 let run = store.create_run(new_run_req("test")).await.unwrap();
893 let steps = store.list_steps(run.id).await.unwrap();
894 assert!(steps.is_empty());
895 }
896
897 #[tokio::test]
900 async fn update_run_applies_cost_and_duration() {
901 let store = InMemoryStore::new();
902 let run = store.create_run(new_run_req("test")).await.unwrap();
903
904 store
905 .update_run(
906 run.id,
907 RunUpdate {
908 cost_usd: Some(Decimal::new(123, 2)),
909 duration_ms: Some(5000),
910 ..RunUpdate::default()
911 },
912 )
913 .await
914 .unwrap();
915
916 let fetched = store.get_run(run.id).await.unwrap().unwrap();
917 assert_eq!(fetched.cost_usd, Decimal::new(123, 2));
918 assert_eq!(fetched.duration_ms, 5000);
919 }
920
921 #[tokio::test]
922 async fn update_run_increment_retry() {
923 let store = InMemoryStore::new();
924 let run = store.create_run(new_run_req("test")).await.unwrap();
925 assert_eq!(run.retry_count, 0);
926
927 store
928 .update_run(
929 run.id,
930 RunUpdate {
931 increment_retry: true,
932 ..RunUpdate::default()
933 },
934 )
935 .await
936 .unwrap();
937
938 let fetched = store.get_run(run.id).await.unwrap().unwrap();
939 assert_eq!(fetched.retry_count, 1);
940 }
941
942 #[tokio::test]
943 async fn update_run_not_found() {
944 let store = InMemoryStore::new();
945 let result = store.update_run(Uuid::nil(), RunUpdate::default()).await;
946 assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
947 }
948
949 #[tokio::test]
952 async fn concurrent_pick_next_pending_no_double_pick() {
953 let store = InMemoryStore::new();
954
955 for i in 0..10 {
957 store
958 .create_run(new_run_req(&format!("wf-{i}")))
959 .await
960 .unwrap();
961 }
962
963 let mut handles = Vec::new();
965 for _ in 0..10 {
966 let s = store.clone();
967 handles.push(spawn(async move { s.pick_next_pending().await }));
968 }
969
970 let mut picked_ids = Vec::new();
971 for h in handles {
972 if let Ok(Ok(Some(run))) = h.await {
973 picked_ids.push(run.id);
974 }
975 }
976
977 let unique: std::collections::HashSet<_> = picked_ids.iter().collect();
979 assert_eq!(unique.len(), picked_ids.len());
980 }
981
982 #[tokio::test]
985 async fn get_stats_empty_store() {
986 let store = InMemoryStore::new();
987 let stats = store.get_stats(RunFilter::default()).await.unwrap();
988 assert_eq!(stats.total_runs, 0);
989 assert_eq!(stats.completed_runs, 0);
990 assert_eq!(stats.failed_runs, 0);
991 assert_eq!(stats.cancelled_runs, 0);
992 assert_eq!(stats.active_runs, 0);
993 assert_eq!(stats.total_cost_usd, Decimal::ZERO);
994 assert_eq!(stats.total_duration_ms, 0);
995 }
996
997 #[tokio::test]
998 async fn get_stats_aggregates_counts_and_totals() {
999 let store = InMemoryStore::new();
1000
1001 let r1 = store.create_run(new_run_req("wf1")).await.unwrap();
1003 let r2 = store.create_run(new_run_req("wf2")).await.unwrap();
1004 let r3 = store.create_run(new_run_req("wf3")).await.unwrap();
1005 let _r4 = store.create_run(new_run_req("wf4")).await.unwrap();
1006
1007 store
1009 .update_run_status(r1.id, RunStatus::Running)
1010 .await
1011 .unwrap();
1012 store
1013 .update_run_status(r1.id, RunStatus::Completed)
1014 .await
1015 .unwrap();
1016
1017 store
1019 .update_run_status(r2.id, RunStatus::Running)
1020 .await
1021 .unwrap();
1022 store
1023 .update_run_status(r2.id, RunStatus::Failed)
1024 .await
1025 .unwrap();
1026
1027 store
1029 .update_run_status(r3.id, RunStatus::Cancelled)
1030 .await
1031 .unwrap();
1032
1033 store
1037 .update_run(
1038 r1.id,
1039 RunUpdate {
1040 cost_usd: Some(Decimal::new(1000, 2)),
1041 duration_ms: Some(1000),
1042 ..RunUpdate::default()
1043 },
1044 )
1045 .await
1046 .unwrap();
1047
1048 store
1049 .update_run(
1050 r2.id,
1051 RunUpdate {
1052 cost_usd: Some(Decimal::new(500, 2)),
1053 duration_ms: Some(500),
1054 ..RunUpdate::default()
1055 },
1056 )
1057 .await
1058 .unwrap();
1059
1060 let stats = store.get_stats(RunFilter::default()).await.unwrap();
1061 assert_eq!(stats.total_runs, 4);
1062 assert_eq!(stats.completed_runs, 1);
1063 assert_eq!(stats.failed_runs, 1);
1064 assert_eq!(stats.cancelled_runs, 1);
1065 assert_eq!(stats.active_runs, 1); assert_eq!(stats.total_cost_usd, Decimal::new(1500, 2));
1067 assert_eq!(stats.total_duration_ms, 1500);
1068 }
1069
1070 #[tokio::test]
1071 async fn update_run_status_running_to_retrying() {
1072 let store = InMemoryStore::new();
1073 let run = store.create_run(new_run_req("test")).await.unwrap();
1074
1075 store
1076 .update_run_status(run.id, RunStatus::Running)
1077 .await
1078 .unwrap();
1079
1080 store
1081 .update_run_status(run.id, RunStatus::Retrying)
1082 .await
1083 .unwrap();
1084
1085 let fetched = store.get_run(run.id).await.unwrap().unwrap();
1086 assert_eq!(fetched.status.state, RunStatus::Retrying);
1087 assert!(!fetched.status.state.is_terminal());
1088 assert!(fetched.completed_at.is_none()); }
1090
1091 #[tokio::test]
1092 async fn update_run_status_retrying_to_running_allowed() {
1093 let store = InMemoryStore::new();
1094 let run = store.create_run(new_run_req("test")).await.unwrap();
1095
1096 store
1097 .update_run_status(run.id, RunStatus::Running)
1098 .await
1099 .unwrap();
1100 store
1101 .update_run_status(run.id, RunStatus::Retrying)
1102 .await
1103 .unwrap();
1104
1105 store
1107 .update_run_status(run.id, RunStatus::Running)
1108 .await
1109 .unwrap();
1110
1111 let fetched = store.get_run(run.id).await.unwrap().unwrap();
1112 assert_eq!(fetched.status.state, RunStatus::Running);
1113 }
1114
1115 #[tokio::test]
1116 async fn update_run_with_invalid_status_transition_errors() {
1117 let store = InMemoryStore::new();
1118 let run = store.create_run(new_run_req("test")).await.unwrap();
1119
1120 let result = store
1122 .update_run(
1123 run.id,
1124 RunUpdate {
1125 status: Some(RunStatus::Completed), ..RunUpdate::default()
1127 },
1128 )
1129 .await;
1130
1131 assert!(result.is_err());
1132 }
1133
1134 #[tokio::test]
1135 async fn create_step_with_complex_input() {
1136 let store = InMemoryStore::new();
1137 let run = store.create_run(new_run_req("test")).await.unwrap();
1138
1139 let complex_input = json!({
1140 "command": "cargo build",
1141 "env": {
1142 "RUST_LOG": "debug",
1143 "CUSTOM": "value"
1144 },
1145 "timeout": 60,
1146 "retry_policy": {
1147 "max_attempts": 3,
1148 "backoff": "exponential"
1149 }
1150 });
1151
1152 let step = store
1153 .create_step(NewStep {
1154 run_id: run.id,
1155 name: "build".to_string(),
1156 kind: crate::entities::StepKind::Agent,
1157 position: 0,
1158 input: Some(complex_input.clone()),
1159 })
1160 .await
1161 .unwrap();
1162
1163 assert_eq!(step.input, Some(complex_input));
1164 }
1165
1166 #[tokio::test]
1167 async fn update_step_with_error_message() {
1168 let store = InMemoryStore::new();
1169 let run = store.create_run(new_run_req("test")).await.unwrap();
1170
1171 let step = store
1172 .create_step(NewStep {
1173 run_id: run.id,
1174 name: "build".to_string(),
1175 kind: crate::entities::StepKind::Shell,
1176 position: 0,
1177 input: None,
1178 })
1179 .await
1180 .unwrap();
1181
1182 store
1183 .update_step(
1184 step.id,
1185 StepUpdate {
1186 status: Some(StepStatus::Running),
1187 ..StepUpdate::default()
1188 },
1189 )
1190 .await
1191 .unwrap();
1192
1193 store
1194 .update_step(
1195 step.id,
1196 StepUpdate {
1197 status: Some(StepStatus::Failed),
1198 error: Some("Connection timeout after 30s".to_string()),
1199 duration_ms: Some(30000),
1200 ..StepUpdate::default()
1201 },
1202 )
1203 .await
1204 .unwrap();
1205
1206 let steps = store.list_steps(run.id).await.unwrap();
1207 assert_eq!(steps[0].status.state, StepStatus::Failed);
1208 assert_eq!(
1209 steps[0].error,
1210 Some("Connection timeout after 30s".to_string())
1211 );
1212 assert_eq!(steps[0].duration_ms, 30000);
1213 }
1214
1215 #[tokio::test]
1216 async fn list_steps_for_nonexistent_run_returns_empty() {
1217 let store = InMemoryStore::new();
1218 let steps = store.list_steps(Uuid::nil()).await.unwrap();
1219 assert!(steps.is_empty());
1220 }
1221
1222 #[tokio::test]
1223 async fn update_step_pending_to_skipped() {
1224 let store = InMemoryStore::new();
1225 let run = store.create_run(new_run_req("test")).await.unwrap();
1226
1227 let step = store
1228 .create_step(NewStep {
1229 run_id: run.id,
1230 name: "build".to_string(),
1231 kind: crate::entities::StepKind::Shell,
1232 position: 0,
1233 input: None,
1234 })
1235 .await
1236 .unwrap();
1237
1238 store
1240 .update_step(
1241 step.id,
1242 StepUpdate {
1243 status: Some(StepStatus::Skipped),
1244 ..StepUpdate::default()
1245 },
1246 )
1247 .await
1248 .unwrap();
1249
1250 let steps = store.list_steps(run.id).await.unwrap();
1251 assert_eq!(steps[0].status.state, StepStatus::Skipped);
1252 }
1253
1254 #[tokio::test]
1255 async fn list_runs_with_combined_filters() {
1256 let store = InMemoryStore::new();
1257
1258 let r1 = store.create_run(new_run_req("deploy")).await.unwrap();
1259 let r2 = store.create_run(new_run_req("deploy")).await.unwrap();
1260 let _r3 = store.create_run(new_run_req("test")).await.unwrap();
1261
1262 store
1264 .update_run_status(r1.id, RunStatus::Running)
1265 .await
1266 .unwrap();
1267 store
1268 .update_run_status(r1.id, RunStatus::Completed)
1269 .await
1270 .unwrap();
1271
1272 store
1274 .update_run_status(r2.id, RunStatus::Running)
1275 .await
1276 .unwrap();
1277
1278 let filter = RunFilter {
1280 workflow_name: Some("deploy".to_string()),
1281 status: Some(RunStatus::Running),
1282 ..RunFilter::default()
1283 };
1284
1285 let page = store.list_runs(filter, 1, 100).await.unwrap();
1286 assert_eq!(page.total, 1);
1287 assert_eq!(page.items[0].id, r2.id);
1288 }
1289
1290 #[tokio::test]
1291 async fn list_runs_workflow_filter_is_case_insensitive_partial_match() {
1292 let store = InMemoryStore::new();
1293 store
1294 .create_run(new_run_req("weather-report"))
1295 .await
1296 .unwrap();
1297 store.create_run(new_run_req("deploy-prod")).await.unwrap();
1298
1299 let filter = RunFilter {
1301 workflow_name: Some("weather".to_string()),
1302 ..RunFilter::default()
1303 };
1304 let page = store.list_runs(filter, 1, 100).await.unwrap();
1305 assert_eq!(page.total, 1);
1306 assert_eq!(page.items[0].workflow_name, "weather-report");
1307
1308 let filter = RunFilter {
1310 workflow_name: Some("Weather-REPORT".to_string()),
1311 ..RunFilter::default()
1312 };
1313 let page = store.list_runs(filter, 1, 100).await.unwrap();
1314 assert_eq!(page.total, 1);
1315 assert_eq!(page.items[0].workflow_name, "weather-report");
1316
1317 let filter = RunFilter {
1319 workflow_name: Some("report".to_string()),
1320 ..RunFilter::default()
1321 };
1322 let page = store.list_runs(filter, 1, 100).await.unwrap();
1323 assert_eq!(page.total, 1);
1324 assert_eq!(page.items[0].workflow_name, "weather-report");
1325
1326 let filter = RunFilter {
1328 workflow_name: Some("build".to_string()),
1329 ..RunFilter::default()
1330 };
1331 let page = store.list_runs(filter, 1, 100).await.unwrap();
1332 assert_eq!(page.total, 0);
1333 }
1334
1335 #[tokio::test]
1336 async fn list_runs_has_steps_true_only_filters_completed_and_cancelled() {
1337 let store = InMemoryStore::new();
1338 let run_with = create_terminal_run(&store, "with-steps", RunStatus::Completed).await;
1339 let _run_without = create_terminal_run(&store, "without-steps", RunStatus::Completed).await;
1340
1341 store
1342 .create_step(NewStep {
1343 run_id: run_with.id,
1344 name: "build".to_string(),
1345 kind: crate::entities::StepKind::Shell,
1346 position: 0,
1347 input: None,
1348 })
1349 .await
1350 .unwrap();
1351
1352 let filter = RunFilter {
1353 has_steps: Some(true),
1354 ..RunFilter::default()
1355 };
1356 let page = store.list_runs(filter, 1, 100).await.unwrap();
1357 assert_eq!(page.total, 1);
1358 assert_eq!(page.items[0].id, run_with.id);
1359 }
1360
1361 #[tokio::test]
1362 async fn list_runs_has_steps_false_only_filters_completed_and_cancelled() {
1363 let store = InMemoryStore::new();
1364 let run_with = create_terminal_run(&store, "with-steps", RunStatus::Cancelled).await;
1365 let run_without = create_terminal_run(&store, "without-steps", RunStatus::Cancelled).await;
1366
1367 store
1368 .create_step(NewStep {
1369 run_id: run_with.id,
1370 name: "build".to_string(),
1371 kind: crate::entities::StepKind::Shell,
1372 position: 0,
1373 input: None,
1374 })
1375 .await
1376 .unwrap();
1377
1378 let filter = RunFilter {
1379 has_steps: Some(false),
1380 ..RunFilter::default()
1381 };
1382 let page = store.list_runs(filter, 1, 100).await.unwrap();
1383 assert_eq!(page.total, 1);
1384 assert_eq!(page.items[0].id, run_without.id);
1385 }
1386
1387 #[tokio::test]
1388 async fn list_runs_has_steps_none_returns_all() {
1389 let store = InMemoryStore::new();
1390 let run_with = store.create_run(new_run_req("with-steps")).await.unwrap();
1391 let _run_without = store
1392 .create_run(new_run_req("without-steps"))
1393 .await
1394 .unwrap();
1395
1396 store
1397 .create_step(NewStep {
1398 run_id: run_with.id,
1399 name: "build".to_string(),
1400 kind: crate::entities::StepKind::Shell,
1401 position: 0,
1402 input: None,
1403 })
1404 .await
1405 .unwrap();
1406
1407 let filter = RunFilter {
1408 has_steps: None,
1409 ..RunFilter::default()
1410 };
1411 let page = store.list_runs(filter, 1, 100).await.unwrap();
1412 assert_eq!(page.total, 2);
1413 }
1414
1415 #[tokio::test]
1416 async fn list_runs_has_steps_true_does_not_filter_non_terminal_runs() {
1417 let store = InMemoryStore::new();
1418 let pending_run = store
1419 .create_run(new_run_req("pending-empty"))
1420 .await
1421 .unwrap();
1422 let running_run = store
1423 .create_run(new_run_req("running-empty"))
1424 .await
1425 .unwrap();
1426 store
1427 .update_run_status(running_run.id, RunStatus::Running)
1428 .await
1429 .unwrap();
1430
1431 let filter = RunFilter {
1432 has_steps: Some(true),
1433 ..RunFilter::default()
1434 };
1435 let page = store.list_runs(filter, 1, 100).await.unwrap();
1436 assert_eq!(page.total, 2);
1437 let ids: Vec<_> = page.items.iter().map(|r| r.id).collect();
1438 assert!(ids.contains(&pending_run.id));
1439 assert!(ids.contains(&running_run.id));
1440 }
1441
1442 #[tokio::test]
1443 async fn get_stats_with_mixed_active_statuses() {
1444 let store = InMemoryStore::new();
1445
1446 let _r1 = store.create_run(new_run_req("wf")).await.unwrap(); let r2 = store.create_run(new_run_req("wf")).await.unwrap();
1448 let r3 = store.create_run(new_run_req("wf")).await.unwrap();
1449
1450 store
1451 .update_run_status(r2.id, RunStatus::Running)
1452 .await
1453 .unwrap();
1454 store
1455 .update_run_status(r3.id, RunStatus::Running)
1456 .await
1457 .unwrap();
1458 store
1459 .update_run_status(r3.id, RunStatus::Retrying)
1460 .await
1461 .unwrap();
1462
1463 let stats = store.get_stats(RunFilter::default()).await.unwrap();
1464 assert_eq!(stats.active_runs, 3); }
1466
1467 #[tokio::test]
1468 async fn run_with_different_trigger_kinds() {
1469 let store = InMemoryStore::new();
1470
1471 let r1 = store
1472 .create_run(NewRun {
1473 workflow_name: "test".to_string(),
1474 trigger: TriggerKind::Manual,
1475 payload: json!({}),
1476 max_retries: 1,
1477 handler_version: None,
1478 labels: HashMap::new(),
1479 scheduled_at: None,
1480 })
1481 .await
1482 .unwrap();
1483
1484 let r2 = store
1485 .create_run(NewRun {
1486 workflow_name: "test".to_string(),
1487 trigger: TriggerKind::Webhook {
1488 path: "/hooks/github".to_string(),
1489 },
1490 payload: json!({}),
1491 max_retries: 1,
1492 handler_version: None,
1493 labels: HashMap::new(),
1494 scheduled_at: None,
1495 })
1496 .await
1497 .unwrap();
1498
1499 let r3 = store
1500 .create_run(NewRun {
1501 workflow_name: "test".to_string(),
1502 trigger: TriggerKind::Cron {
1503 schedule: "0 0 * * *".to_string(),
1504 },
1505 payload: json!({}),
1506 max_retries: 1,
1507 handler_version: None,
1508 labels: HashMap::new(),
1509 scheduled_at: None,
1510 })
1511 .await
1512 .unwrap();
1513
1514 let r4 = store
1515 .create_run(NewRun {
1516 workflow_name: "test".to_string(),
1517 trigger: TriggerKind::Api,
1518 payload: json!({}),
1519 max_retries: 1,
1520 handler_version: None,
1521 labels: HashMap::new(),
1522 scheduled_at: None,
1523 })
1524 .await
1525 .unwrap();
1526
1527 let r5 = store
1528 .create_run(NewRun {
1529 workflow_name: "test".to_string(),
1530 trigger: TriggerKind::Retry {
1531 parent_run_id: Uuid::nil(),
1532 },
1533 payload: json!({}),
1534 max_retries: 1,
1535 handler_version: None,
1536 labels: HashMap::new(),
1537 scheduled_at: None,
1538 })
1539 .await
1540 .unwrap();
1541
1542 assert_eq!(r1.trigger, TriggerKind::Manual);
1543 assert!(matches!(r2.trigger, TriggerKind::Webhook { .. }));
1544 assert!(matches!(r3.trigger, TriggerKind::Cron { .. }));
1545 assert_eq!(r4.trigger, TriggerKind::Api);
1546 assert!(matches!(r5.trigger, TriggerKind::Retry { .. }));
1547 }
1548
1549 #[tokio::test]
1552 async fn create_step_dependencies_stores_dependencies() {
1553 let store = InMemoryStore::new();
1554 let run = store.create_run(new_run_req("test")).await.unwrap();
1555
1556 let step1 = store
1557 .create_step(NewStep {
1558 run_id: run.id,
1559 name: "step1".to_string(),
1560 kind: crate::entities::StepKind::Shell,
1561 position: 0,
1562 input: None,
1563 })
1564 .await
1565 .unwrap();
1566
1567 let step2 = store
1568 .create_step(NewStep {
1569 run_id: run.id,
1570 name: "step2".to_string(),
1571 kind: crate::entities::StepKind::Shell,
1572 position: 1,
1573 input: None,
1574 })
1575 .await
1576 .unwrap();
1577
1578 let result = store
1579 .create_step_dependencies(vec![NewStepDependency {
1580 step_id: step2.id,
1581 depends_on: step1.id,
1582 }])
1583 .await;
1584
1585 assert!(result.is_ok());
1586
1587 let deps = store.list_step_dependencies(run.id).await.unwrap();
1588 assert_eq!(deps.len(), 1);
1589 assert_eq!(deps[0].step_id, step2.id);
1590 assert_eq!(deps[0].depends_on, step1.id);
1591 }
1592
1593 #[tokio::test]
1594 async fn create_step_dependencies_duplicate_dependencies_are_idempotent() {
1595 let store = InMemoryStore::new();
1596 let run = store.create_run(new_run_req("test")).await.unwrap();
1597
1598 let step1 = store
1599 .create_step(NewStep {
1600 run_id: run.id,
1601 name: "step1".to_string(),
1602 kind: crate::entities::StepKind::Shell,
1603 position: 0,
1604 input: None,
1605 })
1606 .await
1607 .unwrap();
1608
1609 let step2 = store
1610 .create_step(NewStep {
1611 run_id: run.id,
1612 name: "step2".to_string(),
1613 kind: crate::entities::StepKind::Shell,
1614 position: 1,
1615 input: None,
1616 })
1617 .await
1618 .unwrap();
1619
1620 let dep = NewStepDependency {
1621 step_id: step2.id,
1622 depends_on: step1.id,
1623 };
1624
1625 store
1626 .create_step_dependencies(vec![dep.clone()])
1627 .await
1628 .unwrap();
1629 store.create_step_dependencies(vec![dep]).await.unwrap();
1630
1631 let deps = store.list_step_dependencies(run.id).await.unwrap();
1632 assert_eq!(deps.len(), 1);
1633 }
1634
1635 #[tokio::test]
1636 async fn create_step_dependencies_missing_step_id_returns_error() {
1637 let store = InMemoryStore::new();
1638 let run = store.create_run(new_run_req("test")).await.unwrap();
1639
1640 let step1 = store
1641 .create_step(NewStep {
1642 run_id: run.id,
1643 name: "step1".to_string(),
1644 kind: crate::entities::StepKind::Shell,
1645 position: 0,
1646 input: None,
1647 })
1648 .await
1649 .unwrap();
1650
1651 let result = store
1652 .create_step_dependencies(vec![NewStepDependency {
1653 step_id: Uuid::nil(),
1654 depends_on: step1.id,
1655 }])
1656 .await;
1657
1658 assert!(matches!(result.unwrap_err(), StoreError::StepNotFound(_)));
1659 }
1660
1661 #[tokio::test]
1662 async fn create_step_dependencies_missing_depends_on_returns_error() {
1663 let store = InMemoryStore::new();
1664 let run = store.create_run(new_run_req("test")).await.unwrap();
1665
1666 let step1 = store
1667 .create_step(NewStep {
1668 run_id: run.id,
1669 name: "step1".to_string(),
1670 kind: crate::entities::StepKind::Shell,
1671 position: 0,
1672 input: None,
1673 })
1674 .await
1675 .unwrap();
1676
1677 let result = store
1678 .create_step_dependencies(vec![NewStepDependency {
1679 step_id: step1.id,
1680 depends_on: Uuid::nil(),
1681 }])
1682 .await;
1683
1684 assert!(matches!(result.unwrap_err(), StoreError::StepNotFound(_)));
1685 }
1686
1687 #[tokio::test]
1688 async fn create_step_dependencies_multiple_dependencies() {
1689 let store = InMemoryStore::new();
1690 let run = store.create_run(new_run_req("test")).await.unwrap();
1691
1692 let step1 = store
1693 .create_step(NewStep {
1694 run_id: run.id,
1695 name: "step1".to_string(),
1696 kind: crate::entities::StepKind::Shell,
1697 position: 0,
1698 input: None,
1699 })
1700 .await
1701 .unwrap();
1702
1703 let step2 = store
1704 .create_step(NewStep {
1705 run_id: run.id,
1706 name: "step2".to_string(),
1707 kind: crate::entities::StepKind::Shell,
1708 position: 1,
1709 input: None,
1710 })
1711 .await
1712 .unwrap();
1713
1714 let step3 = store
1715 .create_step(NewStep {
1716 run_id: run.id,
1717 name: "step3".to_string(),
1718 kind: crate::entities::StepKind::Shell,
1719 position: 2,
1720 input: None,
1721 })
1722 .await
1723 .unwrap();
1724
1725 let result = store
1726 .create_step_dependencies(vec![
1727 NewStepDependency {
1728 step_id: step2.id,
1729 depends_on: step1.id,
1730 },
1731 NewStepDependency {
1732 step_id: step3.id,
1733 depends_on: step2.id,
1734 },
1735 ])
1736 .await;
1737
1738 assert!(result.is_ok());
1739
1740 let deps = store.list_step_dependencies(run.id).await.unwrap();
1741 assert_eq!(deps.len(), 2);
1742 }
1743
1744 #[tokio::test]
1747 async fn list_step_dependencies_empty_for_run_with_no_dependencies() {
1748 let store = InMemoryStore::new();
1749 let run = store.create_run(new_run_req("test")).await.unwrap();
1750
1751 store
1752 .create_step(NewStep {
1753 run_id: run.id,
1754 name: "step1".to_string(),
1755 kind: crate::entities::StepKind::Shell,
1756 position: 0,
1757 input: None,
1758 })
1759 .await
1760 .unwrap();
1761
1762 let deps = store.list_step_dependencies(run.id).await.unwrap();
1763 assert!(deps.is_empty());
1764 }
1765
1766 #[tokio::test]
1767 async fn list_step_dependencies_returns_only_deps_for_given_run() {
1768 let store = InMemoryStore::new();
1769 let run1 = store.create_run(new_run_req("test1")).await.unwrap();
1770 let run2 = store.create_run(new_run_req("test2")).await.unwrap();
1771
1772 let step1_run1 = store
1773 .create_step(NewStep {
1774 run_id: run1.id,
1775 name: "step1".to_string(),
1776 kind: crate::entities::StepKind::Shell,
1777 position: 0,
1778 input: None,
1779 })
1780 .await
1781 .unwrap();
1782
1783 let step2_run1 = store
1784 .create_step(NewStep {
1785 run_id: run1.id,
1786 name: "step2".to_string(),
1787 kind: crate::entities::StepKind::Shell,
1788 position: 1,
1789 input: None,
1790 })
1791 .await
1792 .unwrap();
1793
1794 let step1_run2 = store
1795 .create_step(NewStep {
1796 run_id: run2.id,
1797 name: "step1".to_string(),
1798 kind: crate::entities::StepKind::Shell,
1799 position: 0,
1800 input: None,
1801 })
1802 .await
1803 .unwrap();
1804
1805 let step2_run2 = store
1806 .create_step(NewStep {
1807 run_id: run2.id,
1808 name: "step2".to_string(),
1809 kind: crate::entities::StepKind::Shell,
1810 position: 1,
1811 input: None,
1812 })
1813 .await
1814 .unwrap();
1815
1816 store
1817 .create_step_dependencies(vec![
1818 NewStepDependency {
1819 step_id: step2_run1.id,
1820 depends_on: step1_run1.id,
1821 },
1822 NewStepDependency {
1823 step_id: step2_run2.id,
1824 depends_on: step1_run2.id,
1825 },
1826 ])
1827 .await
1828 .unwrap();
1829
1830 let deps_run1 = store.list_step_dependencies(run1.id).await.unwrap();
1831 let deps_run2 = store.list_step_dependencies(run2.id).await.unwrap();
1832
1833 assert_eq!(deps_run1.len(), 1);
1834 assert_eq!(deps_run1[0].step_id, step2_run1.id);
1835 assert_eq!(deps_run1[0].depends_on, step1_run1.id);
1836
1837 assert_eq!(deps_run2.len(), 1);
1838 assert_eq!(deps_run2[0].step_id, step2_run2.id);
1839 assert_eq!(deps_run2[0].depends_on, step1_run2.id);
1840 }
1841
1842 #[tokio::test]
1843 async fn list_step_dependencies_returns_empty_for_nonexistent_run() {
1844 let store = InMemoryStore::new();
1845 let deps = store.list_step_dependencies(Uuid::nil()).await.unwrap();
1846 assert!(deps.is_empty());
1847 }
1848
1849 #[tokio::test]
1850 async fn list_step_dependencies_sorted_by_created_at() {
1851 let store = InMemoryStore::new();
1852 let run = store.create_run(new_run_req("test")).await.unwrap();
1853
1854 let step1 = store
1855 .create_step(NewStep {
1856 run_id: run.id,
1857 name: "step1".to_string(),
1858 kind: crate::entities::StepKind::Shell,
1859 position: 0,
1860 input: None,
1861 })
1862 .await
1863 .unwrap();
1864
1865 let step2 = store
1866 .create_step(NewStep {
1867 run_id: run.id,
1868 name: "step2".to_string(),
1869 kind: crate::entities::StepKind::Shell,
1870 position: 1,
1871 input: None,
1872 })
1873 .await
1874 .unwrap();
1875
1876 let step3 = store
1877 .create_step(NewStep {
1878 run_id: run.id,
1879 name: "step3".to_string(),
1880 kind: crate::entities::StepKind::Shell,
1881 position: 2,
1882 input: None,
1883 })
1884 .await
1885 .unwrap();
1886
1887 store
1888 .create_step_dependencies(vec![NewStepDependency {
1889 step_id: step2.id,
1890 depends_on: step1.id,
1891 }])
1892 .await
1893 .unwrap();
1894
1895 store
1896 .create_step_dependencies(vec![NewStepDependency {
1897 step_id: step3.id,
1898 depends_on: step1.id,
1899 }])
1900 .await
1901 .unwrap();
1902
1903 let deps = store.list_step_dependencies(run.id).await.unwrap();
1904 assert_eq!(deps.len(), 2);
1905 assert!(deps[0].created_at <= deps[1].created_at);
1906 }
1907
1908 #[tokio::test]
1911 async fn update_run_returning_applies_and_returns() {
1912 let store = InMemoryStore::new();
1913 let run = store.create_run(new_run_req("test")).await.unwrap();
1914
1915 store
1917 .update_run_status(run.id, RunStatus::Running)
1918 .await
1919 .unwrap();
1920
1921 let updated = store
1922 .update_run_returning(
1923 run.id,
1924 RunUpdate {
1925 status: Some(RunStatus::Completed),
1926 cost_usd: Some(Decimal::new(4200, 2)),
1927 duration_ms: Some(1500),
1928 ..RunUpdate::default()
1929 },
1930 )
1931 .await
1932 .unwrap();
1933
1934 assert_eq!(updated.id, run.id);
1935 assert_eq!(updated.status.state, RunStatus::Completed);
1936 assert_eq!(updated.cost_usd, Decimal::new(4200, 2));
1937 assert_eq!(updated.duration_ms, 1500);
1938 assert!(updated.completed_at.is_some());
1939 }
1940
1941 #[tokio::test]
1942 async fn update_run_returning_not_found() {
1943 let store = InMemoryStore::new();
1944 let result = store
1945 .update_run_returning(
1946 Uuid::nil(),
1947 RunUpdate {
1948 status: Some(RunStatus::Running),
1949 ..RunUpdate::default()
1950 },
1951 )
1952 .await;
1953
1954 assert!(matches!(result, Err(StoreError::RunNotFound(_))));
1955 }
1956
1957 #[tokio::test]
1958 async fn update_run_returning_invalid_transition() {
1959 let store = InMemoryStore::new();
1960 let run = store.create_run(new_run_req("test")).await.unwrap();
1961
1962 let result = store
1963 .update_run_returning(
1964 run.id,
1965 RunUpdate {
1966 status: Some(RunStatus::Completed),
1967 ..RunUpdate::default()
1968 },
1969 )
1970 .await;
1971
1972 assert!(matches!(result, Err(StoreError::InvalidTransition { .. })));
1973 }
1974}