1use std::collections::HashMap;
28use std::sync::Arc;
29
30use chrono::Utc;
31use tokio::sync::RwLock;
32use uuid::Uuid;
33
34use rust_decimal::Decimal;
35
36use crate::api_key_store::ApiKeyStore;
37use crate::entities::{
38 ApiKey, ApiKeyUpdate, FsmState, NewApiKey, NewRun, NewStep, NewStepDependency, NewUser, Page,
39 Run, RunFilter, RunStats, RunStatus, RunUpdate, Step, StepDependency, StepStatus, StepUpdate,
40 User,
41};
42use crate::error::StoreError;
43use crate::store::{RunStore, StoreFuture};
44use crate::user_store::UserStore;
45
46#[derive(Debug, Default)]
47struct State {
48 runs: HashMap<Uuid, Run>,
49 steps: HashMap<Uuid, Step>,
50 step_dependencies: Vec<StepDependency>,
51 users: HashMap<Uuid, User>,
52 api_keys: HashMap<Uuid, ApiKey>,
53}
54
55#[derive(Debug, Clone)]
68pub struct InMemoryStore {
69 state: Arc<RwLock<State>>,
70}
71
72impl InMemoryStore {
73 pub fn new() -> Self {
83 Self {
84 state: Arc::new(RwLock::new(State::default())),
85 }
86 }
87}
88
89impl Default for InMemoryStore {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl RunStore for InMemoryStore {
96 fn create_run(&self, req: NewRun) -> StoreFuture<'_, Run> {
97 Box::pin(async move {
98 let now = Utc::now();
99 let run = Run {
100 id: Uuid::now_v7(),
101 workflow_name: req.workflow_name,
102 status: FsmState::new(RunStatus::Pending, Uuid::now_v7()),
103 trigger: req.trigger,
104 payload: req.payload,
105 error: None,
106 retry_count: 0,
107 max_retries: req.max_retries,
108 cost_usd: Decimal::ZERO,
109 duration_ms: 0,
110 created_at: now,
111 updated_at: now,
112 started_at: None,
113 completed_at: None,
114 };
115
116 let mut state = self.state.write().await;
117 state.runs.insert(run.id, run.clone());
118 Ok(run)
119 })
120 }
121
122 fn get_run(&self, id: Uuid) -> StoreFuture<'_, Option<Run>> {
123 Box::pin(async move {
124 let state = self.state.read().await;
125 Ok(state.runs.get(&id).cloned())
126 })
127 }
128
129 fn list_runs(&self, filter: RunFilter, page: u32, per_page: u32) -> StoreFuture<'_, Page<Run>> {
130 Box::pin(async move {
131 let state = self.state.read().await;
132
133 let mut runs: Vec<&Run> = state
134 .runs
135 .values()
136 .filter(|r| {
137 if let Some(ref wf) = filter.workflow_name
138 && !r.workflow_name.to_lowercase().contains(&wf.to_lowercase())
139 {
140 return false;
141 }
142 if let Some(ref status) = filter.status
143 && &r.status.state != status
144 {
145 return false;
146 }
147 if let Some(after) = filter.created_after
148 && r.created_at < after
149 {
150 return false;
151 }
152 if let Some(before) = filter.created_before
153 && r.created_at > before
154 {
155 return false;
156 }
157 true
158 })
159 .collect();
160
161 runs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
163
164 let total = runs.len() as u64;
165 let page = page.max(1);
166 let per_page = per_page.clamp(1, 100);
167 let offset = ((page - 1) * per_page) as usize;
168 let items: Vec<Run> = runs
169 .into_iter()
170 .skip(offset)
171 .take(per_page as usize)
172 .cloned()
173 .collect();
174
175 Ok(Page {
176 items,
177 total,
178 page,
179 per_page,
180 })
181 })
182 }
183
184 fn update_run_status(&self, id: Uuid, new_status: RunStatus) -> StoreFuture<'_, ()> {
185 Box::pin(async move {
186 let mut state = self.state.write().await;
187 let run = state.runs.get_mut(&id).ok_or(StoreError::RunNotFound(id))?;
188
189 if !run.status.state.can_transition_to(&new_status) {
190 return Err(StoreError::InvalidTransition {
191 from: run.status.state,
192 to: new_status,
193 });
194 }
195
196 let now = Utc::now();
197 run.status.state = new_status;
198 run.updated_at = now;
199
200 if new_status == RunStatus::Running && run.started_at.is_none() {
201 run.started_at = Some(now);
202 }
203 if new_status.is_terminal() {
204 run.completed_at = Some(now);
205 }
206
207 Ok(())
208 })
209 }
210
211 fn update_run(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, ()> {
212 Box::pin(async move {
213 let mut state = self.state.write().await;
214 let run = state.runs.get_mut(&id).ok_or(StoreError::RunNotFound(id))?;
215
216 let now = Utc::now();
217
218 if let Some(status) = update.status {
219 if !run.status.state.can_transition_to(&status) {
220 return Err(StoreError::InvalidTransition {
221 from: run.status.state,
222 to: status,
223 });
224 }
225 run.status.state = status;
226 if status == RunStatus::Running && run.started_at.is_none() {
227 run.started_at = Some(now);
228 }
229 if status.is_terminal() {
230 run.completed_at = Some(now);
231 }
232 }
233
234 if let Some(error) = update.error {
235 run.error = Some(error);
236 }
237 if update.increment_retry {
238 run.retry_count += 1;
239 }
240 if let Some(cost) = update.cost_usd {
241 run.cost_usd = cost;
242 }
243 if let Some(dur) = update.duration_ms {
244 run.duration_ms = dur;
245 }
246 if let Some(started) = update.started_at {
247 run.started_at = Some(started);
248 }
249 if let Some(completed) = update.completed_at {
250 run.completed_at = Some(completed);
251 }
252
253 run.updated_at = now;
254 Ok(())
255 })
256 }
257
258 fn pick_next_pending(&self) -> StoreFuture<'_, Option<Run>> {
259 Box::pin(async move {
260 let mut state = self.state.write().await;
261
262 let oldest_id = state
264 .runs
265 .values()
266 .filter(|r| r.status.state == RunStatus::Pending)
267 .min_by_key(|r| r.created_at)
268 .map(|r| r.id);
269
270 let Some(id) = oldest_id else {
271 return Ok(None);
272 };
273
274 let run = state.runs.get_mut(&id).expect("run exists");
276 let now = Utc::now();
277 run.status.state = RunStatus::Running;
278 run.started_at = Some(now);
279 run.updated_at = now;
280
281 Ok(Some(run.clone()))
282 })
283 }
284
285 fn create_step(&self, req: NewStep) -> StoreFuture<'_, Step> {
286 Box::pin(async move {
287 let mut state = self.state.write().await;
288
289 if !state.runs.contains_key(&req.run_id) {
290 return Err(StoreError::RunNotFound(req.run_id));
291 }
292
293 let now = Utc::now();
294 let step = Step {
295 id: Uuid::now_v7(),
296 run_id: req.run_id,
297 name: req.name,
298 kind: req.kind,
299 position: req.position,
300 status: FsmState::new(StepStatus::Pending, Uuid::now_v7()),
301 input: req.input,
302 output: None,
303 error: None,
304 duration_ms: 0,
305 cost_usd: Decimal::ZERO,
306 input_tokens: None,
307 output_tokens: None,
308 created_at: now,
309 updated_at: now,
310 started_at: None,
311 completed_at: None,
312 debug_messages: None,
313 };
314
315 state.steps.insert(step.id, step.clone());
316 Ok(step)
317 })
318 }
319
320 fn update_step(&self, id: Uuid, update: StepUpdate) -> StoreFuture<'_, ()> {
321 Box::pin(async move {
322 let mut state = self.state.write().await;
323 let step = state
324 .steps
325 .get_mut(&id)
326 .ok_or(StoreError::StepNotFound(id))?;
327
328 let now = Utc::now();
329
330 if let Some(status) = update.status {
331 if !matches!(
332 (step.status.state, status),
333 (StepStatus::Pending, StepStatus::Running)
334 | (StepStatus::Pending, StepStatus::Skipped)
335 | (StepStatus::Running, StepStatus::Completed)
336 | (StepStatus::Running, StepStatus::Failed)
337 | (StepStatus::Running, StepStatus::AwaitingApproval)
338 | (StepStatus::AwaitingApproval, StepStatus::Running)
339 | (StepStatus::AwaitingApproval, StepStatus::Completed)
340 | (StepStatus::AwaitingApproval, StepStatus::Failed)
341 | (StepStatus::AwaitingApproval, StepStatus::Rejected)
342 ) {
343 return Err(StoreError::Database(format!(
344 "invalid step status transition: {:?} -> {:?}",
345 step.status.state, status
346 )));
347 }
348 step.status.state = status;
349 }
350 if let Some(output) = update.output {
351 step.output = Some(output);
352 }
353 if let Some(error) = update.error {
354 step.error = Some(error);
355 }
356 if let Some(dur) = update.duration_ms {
357 step.duration_ms = dur;
358 }
359 if let Some(cost) = update.cost_usd {
360 step.cost_usd = cost;
361 }
362 if let Some(tokens) = update.input_tokens {
363 step.input_tokens = Some(tokens);
364 }
365 if let Some(tokens) = update.output_tokens {
366 step.output_tokens = Some(tokens);
367 }
368 if let Some(started) = update.started_at {
369 step.started_at = Some(started);
370 }
371 if let Some(completed) = update.completed_at {
372 step.completed_at = Some(completed);
373 }
374 if let Some(debug_msgs) = update.debug_messages {
375 step.debug_messages = Some(debug_msgs);
376 }
377
378 step.updated_at = now;
379 Ok(())
380 })
381 }
382
383 fn list_steps(&self, run_id: Uuid) -> StoreFuture<'_, Vec<Step>> {
384 Box::pin(async move {
385 let state = self.state.read().await;
386 let mut steps: Vec<Step> = state
387 .steps
388 .values()
389 .filter(|s| s.run_id == run_id)
390 .cloned()
391 .collect();
392 steps.sort_by_key(|s| s.position);
393 Ok(steps)
394 })
395 }
396
397 fn get_stats(&self) -> StoreFuture<'_, RunStats> {
398 Box::pin(async move {
399 let state = self.state.read().await;
400
401 let mut total_cost_usd = Decimal::ZERO;
402 let mut total_duration_ms = 0u64;
403 let mut completed_runs = 0u64;
404 let mut failed_runs = 0u64;
405 let mut cancelled_runs = 0u64;
406 let mut active_runs = 0u64;
407
408 for run in state.runs.values() {
409 total_cost_usd += run.cost_usd;
410 total_duration_ms += run.duration_ms;
411
412 match run.status.state {
413 RunStatus::Completed => completed_runs += 1,
414 RunStatus::Failed => failed_runs += 1,
415 RunStatus::Cancelled => cancelled_runs += 1,
416 RunStatus::Pending
417 | RunStatus::Running
418 | RunStatus::Retrying
419 | RunStatus::AwaitingApproval => {
420 active_runs += 1;
421 }
422 }
423 }
424
425 let total_runs = state.runs.len() as u64;
426
427 Ok(RunStats {
428 total_runs,
429 completed_runs,
430 failed_runs,
431 cancelled_runs,
432 active_runs,
433 total_cost_usd,
434 total_duration_ms,
435 })
436 })
437 }
438
439 fn create_step_dependencies(&self, deps: Vec<NewStepDependency>) -> StoreFuture<'_, ()> {
440 Box::pin(async move {
441 let mut state = self.state.write().await;
442
443 for dep in deps {
444 if !state.steps.contains_key(&dep.step_id) {
445 return Err(StoreError::StepNotFound(dep.step_id));
446 }
447 if !state.steps.contains_key(&dep.depends_on) {
448 return Err(StoreError::StepNotFound(dep.depends_on));
449 }
450
451 let already_exists = state
452 .step_dependencies
453 .iter()
454 .any(|d| d.step_id == dep.step_id && d.depends_on == dep.depends_on);
455
456 if !already_exists {
457 state.step_dependencies.push(StepDependency {
458 step_id: dep.step_id,
459 depends_on: dep.depends_on,
460 created_at: Utc::now(),
461 });
462 }
463 }
464
465 Ok(())
466 })
467 }
468
469 fn list_step_dependencies(&self, run_id: Uuid) -> StoreFuture<'_, Vec<StepDependency>> {
470 Box::pin(async move {
471 let state = self.state.read().await;
472
473 let run_step_ids: std::collections::HashSet<Uuid> = state
474 .steps
475 .values()
476 .filter(|s| s.run_id == run_id)
477 .map(|s| s.id)
478 .collect();
479
480 let mut deps: Vec<StepDependency> = state
481 .step_dependencies
482 .iter()
483 .filter(|d| run_step_ids.contains(&d.step_id))
484 .cloned()
485 .collect();
486
487 deps.sort_by_key(|d| d.created_at);
488 Ok(deps)
489 })
490 }
491}
492
493impl UserStore for InMemoryStore {
494 fn create_user(&self, req: NewUser) -> StoreFuture<'_, User> {
495 Box::pin(async move {
496 let mut state = self.state.write().await;
497
498 let email_exists = state.users.values().any(|u| u.email == req.email);
499 if email_exists {
500 return Err(StoreError::DuplicateEmail(req.email));
501 }
502
503 let username_exists = state.users.values().any(|u| u.username == req.username);
504 if username_exists {
505 return Err(StoreError::DuplicateUsername(req.username));
506 }
507
508 let now = Utc::now();
509 let user = User {
510 id: Uuid::now_v7(),
511 email: req.email,
512 username: req.username,
513 password_hash: req.password_hash,
514 is_admin: false,
515 created_at: now,
516 updated_at: now,
517 };
518
519 state.users.insert(user.id, user.clone());
520 Ok(user)
521 })
522 }
523
524 fn find_user_by_email(&self, email: &str) -> StoreFuture<'_, Option<User>> {
525 let email = email.to_string();
526 Box::pin(async move {
527 let state = self.state.read().await;
528 Ok(state.users.values().find(|u| u.email == email).cloned())
529 })
530 }
531
532 fn find_user_by_id(&self, id: Uuid) -> StoreFuture<'_, Option<User>> {
533 Box::pin(async move {
534 let state = self.state.read().await;
535 Ok(state.users.get(&id).cloned())
536 })
537 }
538}
539
540impl ApiKeyStore for InMemoryStore {
541 fn create_api_key(&self, req: NewApiKey) -> StoreFuture<'_, ApiKey> {
542 Box::pin(async move {
543 let mut state = self.state.write().await;
544 let now = Utc::now();
545 let id = Uuid::now_v7();
546 let key = ApiKey {
547 id,
548 user_id: req.user_id,
549 name: req.name,
550 key_hash: req.key_hash,
551 key_prefix: req.key_prefix,
552 scopes: req.scopes,
553 is_active: true,
554 expires_at: req.expires_at,
555 last_used_at: None,
556 created_at: now,
557 updated_at: now,
558 };
559 state.api_keys.insert(id, key.clone());
560 Ok(key)
561 })
562 }
563
564 fn find_api_key_by_prefix(&self, prefix: &str) -> StoreFuture<'_, Option<ApiKey>> {
565 let prefix = prefix.to_string();
566 Box::pin(async move {
567 let state = self.state.read().await;
568 Ok(state
569 .api_keys
570 .values()
571 .find(|k| k.key_prefix == prefix && k.is_active)
572 .cloned())
573 })
574 }
575
576 fn find_api_key_by_id(&self, id: Uuid) -> StoreFuture<'_, Option<ApiKey>> {
577 Box::pin(async move {
578 let state = self.state.read().await;
579 Ok(state.api_keys.get(&id).cloned())
580 })
581 }
582
583 fn list_api_keys_by_user(&self, user_id: Uuid) -> StoreFuture<'_, Vec<ApiKey>> {
584 Box::pin(async move {
585 let state = self.state.read().await;
586 let keys: Vec<ApiKey> = state
587 .api_keys
588 .values()
589 .filter(|k| k.user_id == user_id)
590 .cloned()
591 .collect();
592 Ok(keys)
593 })
594 }
595
596 fn update_api_key(&self, id: Uuid, update: ApiKeyUpdate) -> StoreFuture<'_, ()> {
597 Box::pin(async move {
598 let mut state = self.state.write().await;
599 let key = state
600 .api_keys
601 .get_mut(&id)
602 .ok_or(StoreError::Database(format!("API key {id} not found")))?;
603 if let Some(name) = update.name {
604 key.name = name;
605 }
606 if let Some(scopes) = update.scopes {
607 key.scopes = scopes;
608 }
609 if let Some(is_active) = update.is_active {
610 key.is_active = is_active;
611 }
612 if let Some(expires_at) = update.expires_at {
613 key.expires_at = expires_at;
614 }
615 key.updated_at = Utc::now();
616 Ok(())
617 })
618 }
619
620 fn touch_api_key(&self, id: Uuid) -> StoreFuture<'_, ()> {
621 Box::pin(async move {
622 let mut state = self.state.write().await;
623 if let Some(key) = state.api_keys.get_mut(&id) {
624 key.last_used_at = Some(Utc::now());
625 }
626 Ok(())
627 })
628 }
629
630 fn delete_api_key(&self, id: Uuid) -> StoreFuture<'_, ()> {
631 Box::pin(async move {
632 let mut state = self.state.write().await;
633 state.api_keys.remove(&id);
634 Ok(())
635 })
636 }
637}
638
639#[cfg(test)]
640mod tests {
641 use serde_json::json;
642 use tokio::spawn;
643
644 use super::*;
645 use crate::entities::TriggerKind;
646
647 fn new_run_req(name: &str) -> NewRun {
648 NewRun {
649 workflow_name: name.to_string(),
650 trigger: TriggerKind::Manual,
651 payload: json!({}),
652 max_retries: 3,
653 }
654 }
655
656 #[tokio::test]
659 async fn create_run_returns_pending_status() {
660 let store = InMemoryStore::new();
661 let run = store.create_run(new_run_req("test")).await.unwrap();
662 assert_eq!(run.status.state, RunStatus::Pending);
663 assert_eq!(run.workflow_name, "test");
664 assert_eq!(run.retry_count, 0);
665 assert_eq!(run.max_retries, 3);
666 }
667
668 #[tokio::test]
669 async fn create_run_generates_unique_ids() {
670 let store = InMemoryStore::new();
671 let r1 = store.create_run(new_run_req("a")).await.unwrap();
672 let r2 = store.create_run(new_run_req("b")).await.unwrap();
673 assert_ne!(r1.id, r2.id);
674 }
675
676 #[tokio::test]
679 async fn get_run_returns_created_run() {
680 let store = InMemoryStore::new();
681 let run = store.create_run(new_run_req("test")).await.unwrap();
682 let fetched = store.get_run(run.id).await.unwrap();
683 assert!(fetched.is_some());
684 assert_eq!(fetched.unwrap().id, run.id);
685 }
686
687 #[tokio::test]
688 async fn get_run_returns_none_for_missing() {
689 let store = InMemoryStore::new();
690 let fetched = store.get_run(Uuid::nil()).await.unwrap();
691 assert!(fetched.is_none());
692 }
693
694 #[tokio::test]
697 async fn update_run_status_valid_transition() {
698 let store = InMemoryStore::new();
699 let run = store.create_run(new_run_req("test")).await.unwrap();
700
701 store
702 .update_run_status(run.id, RunStatus::Running)
703 .await
704 .unwrap();
705
706 let fetched = store.get_run(run.id).await.unwrap().unwrap();
707 assert_eq!(fetched.status.state, RunStatus::Running);
708 assert!(fetched.started_at.is_some());
709 }
710
711 #[tokio::test]
712 async fn update_run_status_invalid_transition_returns_error() {
713 let store = InMemoryStore::new();
714 let run = store.create_run(new_run_req("test")).await.unwrap();
715
716 let result = store.update_run_status(run.id, RunStatus::Completed).await;
717 assert!(result.is_err());
718
719 let err = result.unwrap_err();
720 assert!(matches!(err, StoreError::InvalidTransition { .. }));
721 }
722
723 #[tokio::test]
724 async fn update_run_status_not_found() {
725 let store = InMemoryStore::new();
726 let result = store
727 .update_run_status(Uuid::nil(), RunStatus::Running)
728 .await;
729 assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
730 }
731
732 #[tokio::test]
733 async fn update_run_status_terminal_sets_completed_at() {
734 let store = InMemoryStore::new();
735 let run = store.create_run(new_run_req("test")).await.unwrap();
736
737 store
738 .update_run_status(run.id, RunStatus::Running)
739 .await
740 .unwrap();
741 store
742 .update_run_status(run.id, RunStatus::Completed)
743 .await
744 .unwrap();
745
746 let fetched = store.get_run(run.id).await.unwrap().unwrap();
747 assert_eq!(fetched.status.state, RunStatus::Completed);
748 assert!(fetched.completed_at.is_some());
749 }
750
751 #[tokio::test]
754 async fn list_runs_empty_store() {
755 let store = InMemoryStore::new();
756 let page = store.list_runs(RunFilter::default(), 1, 20).await.unwrap();
757 assert_eq!(page.total, 0);
758 assert!(page.items.is_empty());
759 }
760
761 #[tokio::test]
762 async fn list_runs_with_workflow_filter() {
763 let store = InMemoryStore::new();
764 store.create_run(new_run_req("deploy")).await.unwrap();
765 store.create_run(new_run_req("test")).await.unwrap();
766 store.create_run(new_run_req("deploy")).await.unwrap();
767
768 let filter = RunFilter {
769 workflow_name: Some("deploy".to_string()),
770 ..RunFilter::default()
771 };
772 let page = store.list_runs(filter, 1, 20).await.unwrap();
773 assert_eq!(page.total, 2);
774 assert!(page.items.iter().all(|r| r.workflow_name == "deploy"));
775 }
776
777 #[tokio::test]
778 async fn list_runs_with_status_filter() {
779 let store = InMemoryStore::new();
780 let run = store.create_run(new_run_req("a")).await.unwrap();
781 store.create_run(new_run_req("b")).await.unwrap();
782
783 store
784 .update_run_status(run.id, RunStatus::Running)
785 .await
786 .unwrap();
787
788 let filter = RunFilter {
789 status: Some(RunStatus::Running),
790 ..RunFilter::default()
791 };
792 let page = store.list_runs(filter, 1, 20).await.unwrap();
793 assert_eq!(page.total, 1);
794 assert_eq!(page.items[0].id, run.id);
795 }
796
797 #[tokio::test]
798 async fn list_runs_pagination() {
799 let store = InMemoryStore::new();
800 for i in 0..5 {
801 store
802 .create_run(new_run_req(&format!("wf-{i}")))
803 .await
804 .unwrap();
805 }
806
807 let page1 = store.list_runs(RunFilter::default(), 1, 2).await.unwrap();
808 assert_eq!(page1.total, 5);
809 assert_eq!(page1.items.len(), 2);
810 assert_eq!(page1.page, 1);
811 assert_eq!(page1.per_page, 2);
812
813 let page2 = store.list_runs(RunFilter::default(), 2, 2).await.unwrap();
814 assert_eq!(page2.items.len(), 2);
815
816 let page3 = store.list_runs(RunFilter::default(), 3, 2).await.unwrap();
817 assert_eq!(page3.items.len(), 1);
818 }
819
820 #[tokio::test]
823 async fn pick_next_pending_empty_store() {
824 let store = InMemoryStore::new();
825 let result = store.pick_next_pending().await.unwrap();
826 assert!(result.is_none());
827 }
828
829 #[tokio::test]
830 async fn pick_next_pending_returns_oldest_and_transitions_to_running() {
831 let store = InMemoryStore::new();
832 let r1 = store.create_run(new_run_req("first")).await.unwrap();
833 let _r2 = store.create_run(new_run_req("second")).await.unwrap();
834
835 let picked = store.pick_next_pending().await.unwrap().unwrap();
836 assert_eq!(picked.id, r1.id);
837 assert_eq!(picked.status.state, RunStatus::Running);
838 assert!(picked.started_at.is_some());
839
840 let fetched = store.get_run(r1.id).await.unwrap().unwrap();
842 assert_eq!(fetched.status.state, RunStatus::Running);
843 }
844
845 #[tokio::test]
846 async fn pick_next_pending_skips_non_pending() {
847 let store = InMemoryStore::new();
848 let r1 = store.create_run(new_run_req("a")).await.unwrap();
849 let r2 = store.create_run(new_run_req("b")).await.unwrap();
850
851 store
853 .update_run_status(r1.id, RunStatus::Running)
854 .await
855 .unwrap();
856
857 let picked = store.pick_next_pending().await.unwrap().unwrap();
858 assert_eq!(picked.id, r2.id);
859 }
860
861 #[tokio::test]
864 async fn create_step_returns_pending() {
865 let store = InMemoryStore::new();
866 let run = store.create_run(new_run_req("test")).await.unwrap();
867
868 let step = store
869 .create_step(NewStep {
870 run_id: run.id,
871 name: "build".to_string(),
872 kind: crate::entities::StepKind::Shell,
873 position: 0,
874 input: Some(json!({"command": "cargo build"})),
875 })
876 .await
877 .unwrap();
878
879 assert_eq!(step.status.state, StepStatus::Pending);
880 assert_eq!(step.name, "build");
881 assert_eq!(step.run_id, run.id);
882 assert_eq!(step.position, 0);
883 }
884
885 #[tokio::test]
886 async fn create_step_for_missing_run_returns_error() {
887 let store = InMemoryStore::new();
888 let result = store
889 .create_step(NewStep {
890 run_id: Uuid::nil(),
891 name: "build".to_string(),
892 kind: crate::entities::StepKind::Shell,
893 position: 0,
894 input: None,
895 })
896 .await;
897 assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
898 }
899
900 #[tokio::test]
903 async fn update_step_applies_partial_update() {
904 let store = InMemoryStore::new();
905 let run = store.create_run(new_run_req("test")).await.unwrap();
906
907 let step = store
908 .create_step(NewStep {
909 run_id: run.id,
910 name: "build".to_string(),
911 kind: crate::entities::StepKind::Shell,
912 position: 0,
913 input: None,
914 })
915 .await
916 .unwrap();
917
918 store
920 .update_step(
921 step.id,
922 StepUpdate {
923 status: Some(StepStatus::Running),
924 ..StepUpdate::default()
925 },
926 )
927 .await
928 .unwrap();
929
930 store
932 .update_step(
933 step.id,
934 StepUpdate {
935 status: Some(StepStatus::Completed),
936 output: Some(json!({"stdout": "ok"})),
937 duration_ms: Some(150),
938 ..StepUpdate::default()
939 },
940 )
941 .await
942 .unwrap();
943
944 let steps = store.list_steps(run.id).await.unwrap();
945 assert_eq!(steps.len(), 1);
946 assert_eq!(steps[0].status.state, StepStatus::Completed);
947 assert_eq!(steps[0].duration_ms, 150);
948 assert!(steps[0].output.is_some());
949 }
950
951 #[tokio::test]
952 async fn update_step_not_found() {
953 let store = InMemoryStore::new();
954 let result = store.update_step(Uuid::nil(), StepUpdate::default()).await;
955 assert!(matches!(result.unwrap_err(), StoreError::StepNotFound(_)));
956 }
957
958 #[tokio::test]
961 async fn list_steps_ordered_by_position() {
962 let store = InMemoryStore::new();
963 let run = store.create_run(new_run_req("test")).await.unwrap();
964
965 store
967 .create_step(NewStep {
968 run_id: run.id,
969 name: "deploy".to_string(),
970 kind: crate::entities::StepKind::Shell,
971 position: 2,
972 input: None,
973 })
974 .await
975 .unwrap();
976 store
977 .create_step(NewStep {
978 run_id: run.id,
979 name: "build".to_string(),
980 kind: crate::entities::StepKind::Shell,
981 position: 0,
982 input: None,
983 })
984 .await
985 .unwrap();
986 store
987 .create_step(NewStep {
988 run_id: run.id,
989 name: "test".to_string(),
990 kind: crate::entities::StepKind::Shell,
991 position: 1,
992 input: None,
993 })
994 .await
995 .unwrap();
996
997 let steps = store.list_steps(run.id).await.unwrap();
998 assert_eq!(steps.len(), 3);
999 assert_eq!(steps[0].name, "build");
1000 assert_eq!(steps[1].name, "test");
1001 assert_eq!(steps[2].name, "deploy");
1002 }
1003
1004 #[tokio::test]
1005 async fn list_steps_empty_for_run_without_steps() {
1006 let store = InMemoryStore::new();
1007 let run = store.create_run(new_run_req("test")).await.unwrap();
1008 let steps = store.list_steps(run.id).await.unwrap();
1009 assert!(steps.is_empty());
1010 }
1011
1012 #[tokio::test]
1015 async fn update_run_applies_cost_and_duration() {
1016 let store = InMemoryStore::new();
1017 let run = store.create_run(new_run_req("test")).await.unwrap();
1018
1019 store
1020 .update_run(
1021 run.id,
1022 RunUpdate {
1023 cost_usd: Some(Decimal::new(123, 2)),
1024 duration_ms: Some(5000),
1025 ..RunUpdate::default()
1026 },
1027 )
1028 .await
1029 .unwrap();
1030
1031 let fetched = store.get_run(run.id).await.unwrap().unwrap();
1032 assert_eq!(fetched.cost_usd, Decimal::new(123, 2));
1033 assert_eq!(fetched.duration_ms, 5000);
1034 }
1035
1036 #[tokio::test]
1037 async fn update_run_increment_retry() {
1038 let store = InMemoryStore::new();
1039 let run = store.create_run(new_run_req("test")).await.unwrap();
1040 assert_eq!(run.retry_count, 0);
1041
1042 store
1043 .update_run(
1044 run.id,
1045 RunUpdate {
1046 increment_retry: true,
1047 ..RunUpdate::default()
1048 },
1049 )
1050 .await
1051 .unwrap();
1052
1053 let fetched = store.get_run(run.id).await.unwrap().unwrap();
1054 assert_eq!(fetched.retry_count, 1);
1055 }
1056
1057 #[tokio::test]
1058 async fn update_run_not_found() {
1059 let store = InMemoryStore::new();
1060 let result = store.update_run(Uuid::nil(), RunUpdate::default()).await;
1061 assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
1062 }
1063
1064 #[tokio::test]
1067 async fn concurrent_pick_next_pending_no_double_pick() {
1068 let store = InMemoryStore::new();
1069
1070 for i in 0..10 {
1072 store
1073 .create_run(new_run_req(&format!("wf-{i}")))
1074 .await
1075 .unwrap();
1076 }
1077
1078 let mut handles = Vec::new();
1080 for _ in 0..10 {
1081 let s = store.clone();
1082 handles.push(spawn(async move { s.pick_next_pending().await }));
1083 }
1084
1085 let mut picked_ids = Vec::new();
1086 for h in handles {
1087 if let Ok(Ok(Some(run))) = h.await {
1088 picked_ids.push(run.id);
1089 }
1090 }
1091
1092 let unique: std::collections::HashSet<_> = picked_ids.iter().collect();
1094 assert_eq!(unique.len(), picked_ids.len());
1095 }
1096
1097 #[tokio::test]
1100 async fn get_stats_empty_store() {
1101 let store = InMemoryStore::new();
1102 let stats = store.get_stats().await.unwrap();
1103 assert_eq!(stats.total_runs, 0);
1104 assert_eq!(stats.completed_runs, 0);
1105 assert_eq!(stats.failed_runs, 0);
1106 assert_eq!(stats.cancelled_runs, 0);
1107 assert_eq!(stats.active_runs, 0);
1108 assert_eq!(stats.total_cost_usd, Decimal::ZERO);
1109 assert_eq!(stats.total_duration_ms, 0);
1110 }
1111
1112 #[tokio::test]
1113 async fn get_stats_aggregates_counts_and_totals() {
1114 let store = InMemoryStore::new();
1115
1116 let r1 = store.create_run(new_run_req("wf1")).await.unwrap();
1118 let r2 = store.create_run(new_run_req("wf2")).await.unwrap();
1119 let r3 = store.create_run(new_run_req("wf3")).await.unwrap();
1120 let _r4 = store.create_run(new_run_req("wf4")).await.unwrap();
1121
1122 store
1124 .update_run_status(r1.id, RunStatus::Running)
1125 .await
1126 .unwrap();
1127 store
1128 .update_run_status(r1.id, RunStatus::Completed)
1129 .await
1130 .unwrap();
1131
1132 store
1134 .update_run_status(r2.id, RunStatus::Running)
1135 .await
1136 .unwrap();
1137 store
1138 .update_run_status(r2.id, RunStatus::Failed)
1139 .await
1140 .unwrap();
1141
1142 store
1144 .update_run_status(r3.id, RunStatus::Cancelled)
1145 .await
1146 .unwrap();
1147
1148 store
1152 .update_run(
1153 r1.id,
1154 RunUpdate {
1155 cost_usd: Some(Decimal::new(1000, 2)),
1156 duration_ms: Some(1000),
1157 ..RunUpdate::default()
1158 },
1159 )
1160 .await
1161 .unwrap();
1162
1163 store
1164 .update_run(
1165 r2.id,
1166 RunUpdate {
1167 cost_usd: Some(Decimal::new(500, 2)),
1168 duration_ms: Some(500),
1169 ..RunUpdate::default()
1170 },
1171 )
1172 .await
1173 .unwrap();
1174
1175 let stats = store.get_stats().await.unwrap();
1176 assert_eq!(stats.total_runs, 4);
1177 assert_eq!(stats.completed_runs, 1);
1178 assert_eq!(stats.failed_runs, 1);
1179 assert_eq!(stats.cancelled_runs, 1);
1180 assert_eq!(stats.active_runs, 1); assert_eq!(stats.total_cost_usd, Decimal::new(1500, 2));
1182 assert_eq!(stats.total_duration_ms, 1500);
1183 }
1184
1185 fn new_user(email: &str, username: &str) -> NewUser {
1188 NewUser {
1189 email: email.to_string(),
1190 username: username.to_string(),
1191 password_hash: "argon2hash".to_string(),
1192 }
1193 }
1194
1195 #[tokio::test]
1196 async fn create_user_returns_user() {
1197 let store = InMemoryStore::new();
1198 let user = store
1199 .create_user(new_user("alice@example.com", "alice"))
1200 .await
1201 .unwrap();
1202
1203 assert_eq!(user.email, "alice@example.com");
1204 assert_eq!(user.username, "alice");
1205 assert_eq!(user.password_hash, "argon2hash");
1206 assert!(!user.is_admin);
1207 }
1208
1209 #[tokio::test]
1210 async fn create_user_duplicate_email_returns_error() {
1211 let store = InMemoryStore::new();
1212 store
1213 .create_user(new_user("alice@example.com", "alice"))
1214 .await
1215 .unwrap();
1216
1217 let err = store
1218 .create_user(new_user("alice@example.com", "bob"))
1219 .await
1220 .unwrap_err();
1221
1222 assert!(
1223 matches!(err, StoreError::DuplicateEmail(ref e) if e == "alice@example.com"),
1224 "expected DuplicateEmail, got: {err}"
1225 );
1226 }
1227
1228 #[tokio::test]
1229 async fn create_user_duplicate_username_returns_error() {
1230 let store = InMemoryStore::new();
1231 store
1232 .create_user(new_user("alice@example.com", "alice"))
1233 .await
1234 .unwrap();
1235
1236 let err = store
1237 .create_user(new_user("bob@example.com", "alice"))
1238 .await
1239 .unwrap_err();
1240
1241 assert!(
1242 matches!(err, StoreError::DuplicateUsername(ref u) if u == "alice"),
1243 "expected DuplicateUsername, got: {err}"
1244 );
1245 }
1246
1247 #[tokio::test]
1248 async fn find_user_by_email_existing() {
1249 let store = InMemoryStore::new();
1250 let created = store
1251 .create_user(new_user("alice@example.com", "alice"))
1252 .await
1253 .unwrap();
1254
1255 let found = store
1256 .find_user_by_email("alice@example.com")
1257 .await
1258 .unwrap()
1259 .expect("user should exist");
1260
1261 assert_eq!(found.id, created.id);
1262 assert_eq!(found.email, "alice@example.com");
1263 }
1264
1265 #[tokio::test]
1266 async fn find_user_by_email_missing_returns_none() {
1267 let store = InMemoryStore::new();
1268 let found = store
1269 .find_user_by_email("nobody@example.com")
1270 .await
1271 .unwrap();
1272
1273 assert!(found.is_none());
1274 }
1275
1276 #[tokio::test]
1277 async fn find_user_by_id_existing() {
1278 let store = InMemoryStore::new();
1279 let created = store
1280 .create_user(new_user("alice@example.com", "alice"))
1281 .await
1282 .unwrap();
1283
1284 let found = store
1285 .find_user_by_id(created.id)
1286 .await
1287 .unwrap()
1288 .expect("user should exist");
1289
1290 assert_eq!(found.email, "alice@example.com");
1291 assert_eq!(found.username, "alice");
1292 }
1293
1294 #[tokio::test]
1295 async fn find_user_by_id_missing_returns_none() {
1296 let store = InMemoryStore::new();
1297 let found = store.find_user_by_id(Uuid::now_v7()).await.unwrap();
1298 assert!(found.is_none());
1299 }
1300
1301 #[tokio::test]
1304 async fn update_run_status_running_to_retrying() {
1305 let store = InMemoryStore::new();
1306 let run = store.create_run(new_run_req("test")).await.unwrap();
1307
1308 store
1309 .update_run_status(run.id, RunStatus::Running)
1310 .await
1311 .unwrap();
1312
1313 store
1314 .update_run_status(run.id, RunStatus::Retrying)
1315 .await
1316 .unwrap();
1317
1318 let fetched = store.get_run(run.id).await.unwrap().unwrap();
1319 assert_eq!(fetched.status.state, RunStatus::Retrying);
1320 assert!(!fetched.status.state.is_terminal());
1321 assert!(fetched.completed_at.is_none()); }
1323
1324 #[tokio::test]
1325 async fn update_run_status_retrying_to_running_allowed() {
1326 let store = InMemoryStore::new();
1327 let run = store.create_run(new_run_req("test")).await.unwrap();
1328
1329 store
1330 .update_run_status(run.id, RunStatus::Running)
1331 .await
1332 .unwrap();
1333 store
1334 .update_run_status(run.id, RunStatus::Retrying)
1335 .await
1336 .unwrap();
1337
1338 store
1340 .update_run_status(run.id, RunStatus::Running)
1341 .await
1342 .unwrap();
1343
1344 let fetched = store.get_run(run.id).await.unwrap().unwrap();
1345 assert_eq!(fetched.status.state, RunStatus::Running);
1346 }
1347
1348 #[tokio::test]
1349 async fn update_run_with_invalid_status_transition_errors() {
1350 let store = InMemoryStore::new();
1351 let run = store.create_run(new_run_req("test")).await.unwrap();
1352
1353 let result = store
1355 .update_run(
1356 run.id,
1357 RunUpdate {
1358 status: Some(RunStatus::Completed), ..RunUpdate::default()
1360 },
1361 )
1362 .await;
1363
1364 assert!(result.is_err());
1365 }
1366
1367 #[tokio::test]
1368 async fn create_step_with_complex_input() {
1369 let store = InMemoryStore::new();
1370 let run = store.create_run(new_run_req("test")).await.unwrap();
1371
1372 let complex_input = json!({
1373 "command": "cargo build",
1374 "env": {
1375 "RUST_LOG": "debug",
1376 "CUSTOM": "value"
1377 },
1378 "timeout": 60,
1379 "retry_policy": {
1380 "max_attempts": 3,
1381 "backoff": "exponential"
1382 }
1383 });
1384
1385 let step = store
1386 .create_step(NewStep {
1387 run_id: run.id,
1388 name: "build".to_string(),
1389 kind: crate::entities::StepKind::Agent,
1390 position: 0,
1391 input: Some(complex_input.clone()),
1392 })
1393 .await
1394 .unwrap();
1395
1396 assert_eq!(step.input, Some(complex_input));
1397 }
1398
1399 #[tokio::test]
1400 async fn update_step_with_error_message() {
1401 let store = InMemoryStore::new();
1402 let run = store.create_run(new_run_req("test")).await.unwrap();
1403
1404 let step = store
1405 .create_step(NewStep {
1406 run_id: run.id,
1407 name: "build".to_string(),
1408 kind: crate::entities::StepKind::Shell,
1409 position: 0,
1410 input: None,
1411 })
1412 .await
1413 .unwrap();
1414
1415 store
1416 .update_step(
1417 step.id,
1418 StepUpdate {
1419 status: Some(StepStatus::Running),
1420 ..StepUpdate::default()
1421 },
1422 )
1423 .await
1424 .unwrap();
1425
1426 store
1427 .update_step(
1428 step.id,
1429 StepUpdate {
1430 status: Some(StepStatus::Failed),
1431 error: Some("Connection timeout after 30s".to_string()),
1432 duration_ms: Some(30000),
1433 ..StepUpdate::default()
1434 },
1435 )
1436 .await
1437 .unwrap();
1438
1439 let steps = store.list_steps(run.id).await.unwrap();
1440 assert_eq!(steps[0].status.state, StepStatus::Failed);
1441 assert_eq!(
1442 steps[0].error,
1443 Some("Connection timeout after 30s".to_string())
1444 );
1445 assert_eq!(steps[0].duration_ms, 30000);
1446 }
1447
1448 #[tokio::test]
1449 async fn list_steps_for_nonexistent_run_returns_empty() {
1450 let store = InMemoryStore::new();
1451 let steps = store.list_steps(Uuid::nil()).await.unwrap();
1452 assert!(steps.is_empty());
1453 }
1454
1455 #[tokio::test]
1456 async fn update_step_pending_to_skipped() {
1457 let store = InMemoryStore::new();
1458 let run = store.create_run(new_run_req("test")).await.unwrap();
1459
1460 let step = store
1461 .create_step(NewStep {
1462 run_id: run.id,
1463 name: "build".to_string(),
1464 kind: crate::entities::StepKind::Shell,
1465 position: 0,
1466 input: None,
1467 })
1468 .await
1469 .unwrap();
1470
1471 store
1473 .update_step(
1474 step.id,
1475 StepUpdate {
1476 status: Some(StepStatus::Skipped),
1477 ..StepUpdate::default()
1478 },
1479 )
1480 .await
1481 .unwrap();
1482
1483 let steps = store.list_steps(run.id).await.unwrap();
1484 assert_eq!(steps[0].status.state, StepStatus::Skipped);
1485 }
1486
1487 #[tokio::test]
1488 async fn list_runs_with_combined_filters() {
1489 let store = InMemoryStore::new();
1490
1491 let r1 = store.create_run(new_run_req("deploy")).await.unwrap();
1492 let r2 = store.create_run(new_run_req("deploy")).await.unwrap();
1493 let _r3 = store.create_run(new_run_req("test")).await.unwrap();
1494
1495 store
1497 .update_run_status(r1.id, RunStatus::Running)
1498 .await
1499 .unwrap();
1500 store
1501 .update_run_status(r1.id, RunStatus::Completed)
1502 .await
1503 .unwrap();
1504
1505 store
1507 .update_run_status(r2.id, RunStatus::Running)
1508 .await
1509 .unwrap();
1510
1511 let filter = RunFilter {
1513 workflow_name: Some("deploy".to_string()),
1514 status: Some(RunStatus::Running),
1515 ..RunFilter::default()
1516 };
1517
1518 let page = store.list_runs(filter, 1, 100).await.unwrap();
1519 assert_eq!(page.total, 1);
1520 assert_eq!(page.items[0].id, r2.id);
1521 }
1522
1523 #[tokio::test]
1524 async fn list_runs_workflow_filter_is_case_insensitive_partial_match() {
1525 let store = InMemoryStore::new();
1526 store
1527 .create_run(new_run_req("weather-report"))
1528 .await
1529 .unwrap();
1530 store.create_run(new_run_req("deploy-prod")).await.unwrap();
1531
1532 let filter = RunFilter {
1534 workflow_name: Some("weather".to_string()),
1535 ..RunFilter::default()
1536 };
1537 let page = store.list_runs(filter, 1, 100).await.unwrap();
1538 assert_eq!(page.total, 1);
1539 assert_eq!(page.items[0].workflow_name, "weather-report");
1540
1541 let filter = RunFilter {
1543 workflow_name: Some("Weather-REPORT".to_string()),
1544 ..RunFilter::default()
1545 };
1546 let page = store.list_runs(filter, 1, 100).await.unwrap();
1547 assert_eq!(page.total, 1);
1548 assert_eq!(page.items[0].workflow_name, "weather-report");
1549
1550 let filter = RunFilter {
1552 workflow_name: Some("report".to_string()),
1553 ..RunFilter::default()
1554 };
1555 let page = store.list_runs(filter, 1, 100).await.unwrap();
1556 assert_eq!(page.total, 1);
1557 assert_eq!(page.items[0].workflow_name, "weather-report");
1558
1559 let filter = RunFilter {
1561 workflow_name: Some("build".to_string()),
1562 ..RunFilter::default()
1563 };
1564 let page = store.list_runs(filter, 1, 100).await.unwrap();
1565 assert_eq!(page.total, 0);
1566 }
1567
1568 #[tokio::test]
1569 async fn get_stats_with_mixed_active_statuses() {
1570 let store = InMemoryStore::new();
1571
1572 let _r1 = store.create_run(new_run_req("wf")).await.unwrap(); let r2 = store.create_run(new_run_req("wf")).await.unwrap();
1574 let r3 = store.create_run(new_run_req("wf")).await.unwrap();
1575
1576 store
1577 .update_run_status(r2.id, RunStatus::Running)
1578 .await
1579 .unwrap();
1580 store
1581 .update_run_status(r3.id, RunStatus::Running)
1582 .await
1583 .unwrap();
1584 store
1585 .update_run_status(r3.id, RunStatus::Retrying)
1586 .await
1587 .unwrap();
1588
1589 let stats = store.get_stats().await.unwrap();
1590 assert_eq!(stats.active_runs, 3); }
1592
1593 #[tokio::test]
1594 async fn run_with_different_trigger_kinds() {
1595 let store = InMemoryStore::new();
1596
1597 let r1 = store
1598 .create_run(NewRun {
1599 workflow_name: "test".to_string(),
1600 trigger: TriggerKind::Manual,
1601 payload: json!({}),
1602 max_retries: 1,
1603 })
1604 .await
1605 .unwrap();
1606
1607 let r2 = store
1608 .create_run(NewRun {
1609 workflow_name: "test".to_string(),
1610 trigger: TriggerKind::Webhook {
1611 path: "/hooks/github".to_string(),
1612 },
1613 payload: json!({}),
1614 max_retries: 1,
1615 })
1616 .await
1617 .unwrap();
1618
1619 let r3 = store
1620 .create_run(NewRun {
1621 workflow_name: "test".to_string(),
1622 trigger: TriggerKind::Cron {
1623 schedule: "0 0 * * *".to_string(),
1624 },
1625 payload: json!({}),
1626 max_retries: 1,
1627 })
1628 .await
1629 .unwrap();
1630
1631 let r4 = store
1632 .create_run(NewRun {
1633 workflow_name: "test".to_string(),
1634 trigger: TriggerKind::Api,
1635 payload: json!({}),
1636 max_retries: 1,
1637 })
1638 .await
1639 .unwrap();
1640
1641 let r5 = store
1642 .create_run(NewRun {
1643 workflow_name: "test".to_string(),
1644 trigger: TriggerKind::Retry {
1645 parent_run_id: Uuid::nil(),
1646 },
1647 payload: json!({}),
1648 max_retries: 1,
1649 })
1650 .await
1651 .unwrap();
1652
1653 assert_eq!(r1.trigger, TriggerKind::Manual);
1654 assert!(matches!(r2.trigger, TriggerKind::Webhook { .. }));
1655 assert!(matches!(r3.trigger, TriggerKind::Cron { .. }));
1656 assert_eq!(r4.trigger, TriggerKind::Api);
1657 assert!(matches!(r5.trigger, TriggerKind::Retry { .. }));
1658 }
1659
1660 #[tokio::test]
1663 async fn update_run_returning_applies_and_returns() {
1664 let store = InMemoryStore::new();
1665 let run = store.create_run(new_run_req("test")).await.unwrap();
1666
1667 store
1669 .update_run_status(run.id, RunStatus::Running)
1670 .await
1671 .unwrap();
1672
1673 let updated = store
1674 .update_run_returning(
1675 run.id,
1676 RunUpdate {
1677 status: Some(RunStatus::Completed),
1678 cost_usd: Some(Decimal::new(4200, 2)),
1679 duration_ms: Some(1500),
1680 ..RunUpdate::default()
1681 },
1682 )
1683 .await
1684 .unwrap();
1685
1686 assert_eq!(updated.id, run.id);
1687 assert_eq!(updated.status.state, RunStatus::Completed);
1688 assert_eq!(updated.cost_usd, Decimal::new(4200, 2));
1689 assert_eq!(updated.duration_ms, 1500);
1690 assert!(updated.completed_at.is_some());
1691 }
1692
1693 #[tokio::test]
1694 async fn update_run_returning_not_found() {
1695 let store = InMemoryStore::new();
1696 let result = store
1697 .update_run_returning(
1698 Uuid::nil(),
1699 RunUpdate {
1700 status: Some(RunStatus::Running),
1701 ..RunUpdate::default()
1702 },
1703 )
1704 .await;
1705
1706 assert!(matches!(result, Err(StoreError::RunNotFound(_))));
1707 }
1708
1709 #[tokio::test]
1710 async fn update_run_returning_invalid_transition() {
1711 let store = InMemoryStore::new();
1712 let run = store.create_run(new_run_req("test")).await.unwrap();
1713
1714 let result = store
1715 .update_run_returning(
1716 run.id,
1717 RunUpdate {
1718 status: Some(RunStatus::Completed),
1719 ..RunUpdate::default()
1720 },
1721 )
1722 .await;
1723
1724 assert!(matches!(result, Err(StoreError::InvalidTransition { .. })));
1725 }
1726}