Skip to main content

ironflow_store/
memory.rs

1//! In-memory [`RunStore`] implementation for development and testing.
2//!
3//! [`InMemoryStore`] uses `Arc<RwLock<..>>` internally, making it safe to share
4//! across tasks. Data is lost when the process exits.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use ironflow_store::prelude::*;
10//! use serde_json::json;
11//!
12//! # async fn example() -> Result<(), ironflow_store::error::StoreError> {
13//! let store = InMemoryStore::new();
14//!
15//! let run = store.create_run(NewRun {
16//!     workflow_name: "test".to_string(),
17//!     trigger: TriggerKind::Manual,
18//!     payload: json!({}),
19//!     max_retries: 3,
20//! }).await?;
21//!
22//! assert_eq!(run.status.state, RunStatus::Pending);
23//! # Ok(())
24//! # }
25//! ```
26
27use std::collections::HashMap;
28use std::sync::Arc;
29
30use chrono::Utc;
31use tokio::sync::RwLock;
32use uuid::Uuid;
33
34use rust_decimal::Decimal;
35
36use crate::api_key_store::ApiKeyStore;
37use crate::entities::{
38    ApiKey, ApiKeyUpdate, FsmState, NewApiKey, NewRun, NewStep, NewStepDependency, NewUser, Page,
39    Run, RunFilter, RunStats, RunStatus, RunUpdate, Step, StepDependency, StepStatus, StepUpdate,
40    User,
41};
42use crate::error::StoreError;
43use crate::store::{RunStore, StoreFuture};
44use crate::user_store::UserStore;
45
/// Everything the in-memory store holds; [`InMemoryStore`] keeps one instance
/// behind a single `RwLock`.
#[derive(Debug, Default)]
struct State {
    /// Runs, keyed by run id.
    runs: HashMap<Uuid, Run>,
    /// Steps, keyed by step id.
    steps: HashMap<Uuid, Step>,
    /// Dependency edges between steps, in insertion order.
    step_dependencies: Vec<StepDependency>,
    /// Users, keyed by user id.
    users: HashMap<Uuid, User>,
    /// API keys, keyed by key id.
    api_keys: HashMap<Uuid, ApiKey>,
}
54
/// In-memory store backed by `Arc<RwLock<..>>`.
///
/// Cloning is cheap: every clone shares the same underlying state through the
/// `Arc`. All data is held in memory and is gone once the last clone is dropped.
///
/// # Examples
///
/// ```
/// use ironflow_store::memory::InMemoryStore;
///
/// let store = InMemoryStore::new();
/// let store2 = store.clone(); // cheap Arc clone
/// ```
#[derive(Debug, Clone)]
pub struct InMemoryStore {
    /// Shared state, guarded by an async read/write lock.
    state: Arc<RwLock<State>>,
}
71
72impl InMemoryStore {
73    /// Create a new empty in-memory store.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use ironflow_store::memory::InMemoryStore;
79    ///
80    /// let store = InMemoryStore::new();
81    /// ```
82    pub fn new() -> Self {
83        Self {
84            state: Arc::new(RwLock::new(State::default())),
85        }
86    }
87}
88
89impl Default for InMemoryStore {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl RunStore for InMemoryStore {
96    fn create_run(&self, req: NewRun) -> StoreFuture<'_, Run> {
97        Box::pin(async move {
98            let now = Utc::now();
99            let run = Run {
100                id: Uuid::now_v7(),
101                workflow_name: req.workflow_name,
102                status: FsmState::new(RunStatus::Pending, Uuid::now_v7()),
103                trigger: req.trigger,
104                payload: req.payload,
105                error: None,
106                retry_count: 0,
107                max_retries: req.max_retries,
108                cost_usd: Decimal::ZERO,
109                duration_ms: 0,
110                created_at: now,
111                updated_at: now,
112                started_at: None,
113                completed_at: None,
114            };
115
116            let mut state = self.state.write().await;
117            state.runs.insert(run.id, run.clone());
118            Ok(run)
119        })
120    }
121
122    fn get_run(&self, id: Uuid) -> StoreFuture<'_, Option<Run>> {
123        Box::pin(async move {
124            let state = self.state.read().await;
125            Ok(state.runs.get(&id).cloned())
126        })
127    }
128
129    fn list_runs(&self, filter: RunFilter, page: u32, per_page: u32) -> StoreFuture<'_, Page<Run>> {
130        Box::pin(async move {
131            let state = self.state.read().await;
132
133            let mut runs: Vec<&Run> = state
134                .runs
135                .values()
136                .filter(|r| {
137                    if let Some(ref wf) = filter.workflow_name
138                        && !r.workflow_name.to_lowercase().contains(&wf.to_lowercase())
139                    {
140                        return false;
141                    }
142                    if let Some(ref status) = filter.status
143                        && &r.status.state != status
144                    {
145                        return false;
146                    }
147                    if let Some(after) = filter.created_after
148                        && r.created_at < after
149                    {
150                        return false;
151                    }
152                    if let Some(before) = filter.created_before
153                        && r.created_at > before
154                    {
155                        return false;
156                    }
157                    true
158                })
159                .collect();
160
161            // Sort newest first.
162            runs.sort_by(|a, b| b.created_at.cmp(&a.created_at));
163
164            let total = runs.len() as u64;
165            let page = page.max(1);
166            let per_page = per_page.clamp(1, 100);
167            let offset = ((page - 1) * per_page) as usize;
168            let items: Vec<Run> = runs
169                .into_iter()
170                .skip(offset)
171                .take(per_page as usize)
172                .cloned()
173                .collect();
174
175            Ok(Page {
176                items,
177                total,
178                page,
179                per_page,
180            })
181        })
182    }
183
184    fn update_run_status(&self, id: Uuid, new_status: RunStatus) -> StoreFuture<'_, ()> {
185        Box::pin(async move {
186            let mut state = self.state.write().await;
187            let run = state.runs.get_mut(&id).ok_or(StoreError::RunNotFound(id))?;
188
189            if !run.status.state.can_transition_to(&new_status) {
190                return Err(StoreError::InvalidTransition {
191                    from: run.status.state,
192                    to: new_status,
193                });
194            }
195
196            let now = Utc::now();
197            run.status.state = new_status;
198            run.updated_at = now;
199
200            if new_status == RunStatus::Running && run.started_at.is_none() {
201                run.started_at = Some(now);
202            }
203            if new_status.is_terminal() {
204                run.completed_at = Some(now);
205            }
206
207            Ok(())
208        })
209    }
210
211    fn update_run(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, ()> {
212        Box::pin(async move {
213            let mut state = self.state.write().await;
214            let run = state.runs.get_mut(&id).ok_or(StoreError::RunNotFound(id))?;
215
216            let now = Utc::now();
217
218            if let Some(status) = update.status {
219                if !run.status.state.can_transition_to(&status) {
220                    return Err(StoreError::InvalidTransition {
221                        from: run.status.state,
222                        to: status,
223                    });
224                }
225                run.status.state = status;
226                if status == RunStatus::Running && run.started_at.is_none() {
227                    run.started_at = Some(now);
228                }
229                if status.is_terminal() {
230                    run.completed_at = Some(now);
231                }
232            }
233
234            if let Some(error) = update.error {
235                run.error = Some(error);
236            }
237            if update.increment_retry {
238                run.retry_count += 1;
239            }
240            if let Some(cost) = update.cost_usd {
241                run.cost_usd = cost;
242            }
243            if let Some(dur) = update.duration_ms {
244                run.duration_ms = dur;
245            }
246            if let Some(started) = update.started_at {
247                run.started_at = Some(started);
248            }
249            if let Some(completed) = update.completed_at {
250                run.completed_at = Some(completed);
251            }
252
253            run.updated_at = now;
254            Ok(())
255        })
256    }
257
258    fn pick_next_pending(&self) -> StoreFuture<'_, Option<Run>> {
259        Box::pin(async move {
260            let mut state = self.state.write().await;
261
262            // Find the oldest pending run.
263            let oldest_id = state
264                .runs
265                .values()
266                .filter(|r| r.status.state == RunStatus::Pending)
267                .min_by_key(|r| r.created_at)
268                .map(|r| r.id);
269
270            let Some(id) = oldest_id else {
271                return Ok(None);
272            };
273
274            // Transition to Running atomically.
275            let run = state.runs.get_mut(&id).expect("run exists");
276            let now = Utc::now();
277            run.status.state = RunStatus::Running;
278            run.started_at = Some(now);
279            run.updated_at = now;
280
281            Ok(Some(run.clone()))
282        })
283    }
284
285    fn create_step(&self, req: NewStep) -> StoreFuture<'_, Step> {
286        Box::pin(async move {
287            let mut state = self.state.write().await;
288
289            if !state.runs.contains_key(&req.run_id) {
290                return Err(StoreError::RunNotFound(req.run_id));
291            }
292
293            let now = Utc::now();
294            let step = Step {
295                id: Uuid::now_v7(),
296                run_id: req.run_id,
297                name: req.name,
298                kind: req.kind,
299                position: req.position,
300                status: FsmState::new(StepStatus::Pending, Uuid::now_v7()),
301                input: req.input,
302                output: None,
303                error: None,
304                duration_ms: 0,
305                cost_usd: Decimal::ZERO,
306                input_tokens: None,
307                output_tokens: None,
308                created_at: now,
309                updated_at: now,
310                started_at: None,
311                completed_at: None,
312                debug_messages: None,
313            };
314
315            state.steps.insert(step.id, step.clone());
316            Ok(step)
317        })
318    }
319
320    fn update_step(&self, id: Uuid, update: StepUpdate) -> StoreFuture<'_, ()> {
321        Box::pin(async move {
322            let mut state = self.state.write().await;
323            let step = state
324                .steps
325                .get_mut(&id)
326                .ok_or(StoreError::StepNotFound(id))?;
327
328            let now = Utc::now();
329
330            if let Some(status) = update.status {
331                if !matches!(
332                    (step.status.state, status),
333                    (StepStatus::Pending, StepStatus::Running)
334                        | (StepStatus::Pending, StepStatus::Skipped)
335                        | (StepStatus::Running, StepStatus::Completed)
336                        | (StepStatus::Running, StepStatus::Failed)
337                        | (StepStatus::Running, StepStatus::AwaitingApproval)
338                        | (StepStatus::AwaitingApproval, StepStatus::Running)
339                        | (StepStatus::AwaitingApproval, StepStatus::Completed)
340                        | (StepStatus::AwaitingApproval, StepStatus::Failed)
341                        | (StepStatus::AwaitingApproval, StepStatus::Rejected)
342                ) {
343                    return Err(StoreError::Database(format!(
344                        "invalid step status transition: {:?} -> {:?}",
345                        step.status.state, status
346                    )));
347                }
348                step.status.state = status;
349            }
350            if let Some(output) = update.output {
351                step.output = Some(output);
352            }
353            if let Some(error) = update.error {
354                step.error = Some(error);
355            }
356            if let Some(dur) = update.duration_ms {
357                step.duration_ms = dur;
358            }
359            if let Some(cost) = update.cost_usd {
360                step.cost_usd = cost;
361            }
362            if let Some(tokens) = update.input_tokens {
363                step.input_tokens = Some(tokens);
364            }
365            if let Some(tokens) = update.output_tokens {
366                step.output_tokens = Some(tokens);
367            }
368            if let Some(started) = update.started_at {
369                step.started_at = Some(started);
370            }
371            if let Some(completed) = update.completed_at {
372                step.completed_at = Some(completed);
373            }
374            if let Some(debug_msgs) = update.debug_messages {
375                step.debug_messages = Some(debug_msgs);
376            }
377
378            step.updated_at = now;
379            Ok(())
380        })
381    }
382
383    fn list_steps(&self, run_id: Uuid) -> StoreFuture<'_, Vec<Step>> {
384        Box::pin(async move {
385            let state = self.state.read().await;
386            let mut steps: Vec<Step> = state
387                .steps
388                .values()
389                .filter(|s| s.run_id == run_id)
390                .cloned()
391                .collect();
392            steps.sort_by_key(|s| s.position);
393            Ok(steps)
394        })
395    }
396
397    fn get_stats(&self) -> StoreFuture<'_, RunStats> {
398        Box::pin(async move {
399            let state = self.state.read().await;
400
401            let mut total_cost_usd = Decimal::ZERO;
402            let mut total_duration_ms = 0u64;
403            let mut completed_runs = 0u64;
404            let mut failed_runs = 0u64;
405            let mut cancelled_runs = 0u64;
406            let mut active_runs = 0u64;
407
408            for run in state.runs.values() {
409                total_cost_usd += run.cost_usd;
410                total_duration_ms += run.duration_ms;
411
412                match run.status.state {
413                    RunStatus::Completed => completed_runs += 1,
414                    RunStatus::Failed => failed_runs += 1,
415                    RunStatus::Cancelled => cancelled_runs += 1,
416                    RunStatus::Pending
417                    | RunStatus::Running
418                    | RunStatus::Retrying
419                    | RunStatus::AwaitingApproval => {
420                        active_runs += 1;
421                    }
422                }
423            }
424
425            let total_runs = state.runs.len() as u64;
426
427            Ok(RunStats {
428                total_runs,
429                completed_runs,
430                failed_runs,
431                cancelled_runs,
432                active_runs,
433                total_cost_usd,
434                total_duration_ms,
435            })
436        })
437    }
438
439    fn create_step_dependencies(&self, deps: Vec<NewStepDependency>) -> StoreFuture<'_, ()> {
440        Box::pin(async move {
441            let mut state = self.state.write().await;
442
443            for dep in deps {
444                if !state.steps.contains_key(&dep.step_id) {
445                    return Err(StoreError::StepNotFound(dep.step_id));
446                }
447                if !state.steps.contains_key(&dep.depends_on) {
448                    return Err(StoreError::StepNotFound(dep.depends_on));
449                }
450
451                let already_exists = state
452                    .step_dependencies
453                    .iter()
454                    .any(|d| d.step_id == dep.step_id && d.depends_on == dep.depends_on);
455
456                if !already_exists {
457                    state.step_dependencies.push(StepDependency {
458                        step_id: dep.step_id,
459                        depends_on: dep.depends_on,
460                        created_at: Utc::now(),
461                    });
462                }
463            }
464
465            Ok(())
466        })
467    }
468
469    fn list_step_dependencies(&self, run_id: Uuid) -> StoreFuture<'_, Vec<StepDependency>> {
470        Box::pin(async move {
471            let state = self.state.read().await;
472
473            let run_step_ids: std::collections::HashSet<Uuid> = state
474                .steps
475                .values()
476                .filter(|s| s.run_id == run_id)
477                .map(|s| s.id)
478                .collect();
479
480            let mut deps: Vec<StepDependency> = state
481                .step_dependencies
482                .iter()
483                .filter(|d| run_step_ids.contains(&d.step_id))
484                .cloned()
485                .collect();
486
487            deps.sort_by_key(|d| d.created_at);
488            Ok(deps)
489        })
490    }
491}
492
493impl UserStore for InMemoryStore {
494    fn create_user(&self, req: NewUser) -> StoreFuture<'_, User> {
495        Box::pin(async move {
496            let mut state = self.state.write().await;
497
498            let email_exists = state.users.values().any(|u| u.email == req.email);
499            if email_exists {
500                return Err(StoreError::DuplicateEmail(req.email));
501            }
502
503            let username_exists = state.users.values().any(|u| u.username == req.username);
504            if username_exists {
505                return Err(StoreError::DuplicateUsername(req.username));
506            }
507
508            let now = Utc::now();
509            let user = User {
510                id: Uuid::now_v7(),
511                email: req.email,
512                username: req.username,
513                password_hash: req.password_hash,
514                is_admin: false,
515                created_at: now,
516                updated_at: now,
517            };
518
519            state.users.insert(user.id, user.clone());
520            Ok(user)
521        })
522    }
523
524    fn find_user_by_email(&self, email: &str) -> StoreFuture<'_, Option<User>> {
525        let email = email.to_string();
526        Box::pin(async move {
527            let state = self.state.read().await;
528            Ok(state.users.values().find(|u| u.email == email).cloned())
529        })
530    }
531
532    fn find_user_by_id(&self, id: Uuid) -> StoreFuture<'_, Option<User>> {
533        Box::pin(async move {
534            let state = self.state.read().await;
535            Ok(state.users.get(&id).cloned())
536        })
537    }
538}
539
540impl ApiKeyStore for InMemoryStore {
541    fn create_api_key(&self, req: NewApiKey) -> StoreFuture<'_, ApiKey> {
542        Box::pin(async move {
543            let mut state = self.state.write().await;
544            let now = Utc::now();
545            let id = Uuid::now_v7();
546            let key = ApiKey {
547                id,
548                user_id: req.user_id,
549                name: req.name,
550                key_hash: req.key_hash,
551                key_prefix: req.key_prefix,
552                scopes: req.scopes,
553                is_active: true,
554                expires_at: req.expires_at,
555                last_used_at: None,
556                created_at: now,
557                updated_at: now,
558            };
559            state.api_keys.insert(id, key.clone());
560            Ok(key)
561        })
562    }
563
564    fn find_api_key_by_prefix(&self, prefix: &str) -> StoreFuture<'_, Option<ApiKey>> {
565        let prefix = prefix.to_string();
566        Box::pin(async move {
567            let state = self.state.read().await;
568            Ok(state
569                .api_keys
570                .values()
571                .find(|k| k.key_prefix == prefix && k.is_active)
572                .cloned())
573        })
574    }
575
576    fn find_api_key_by_id(&self, id: Uuid) -> StoreFuture<'_, Option<ApiKey>> {
577        Box::pin(async move {
578            let state = self.state.read().await;
579            Ok(state.api_keys.get(&id).cloned())
580        })
581    }
582
583    fn list_api_keys_by_user(&self, user_id: Uuid) -> StoreFuture<'_, Vec<ApiKey>> {
584        Box::pin(async move {
585            let state = self.state.read().await;
586            let keys: Vec<ApiKey> = state
587                .api_keys
588                .values()
589                .filter(|k| k.user_id == user_id)
590                .cloned()
591                .collect();
592            Ok(keys)
593        })
594    }
595
596    fn update_api_key(&self, id: Uuid, update: ApiKeyUpdate) -> StoreFuture<'_, ()> {
597        Box::pin(async move {
598            let mut state = self.state.write().await;
599            let key = state
600                .api_keys
601                .get_mut(&id)
602                .ok_or(StoreError::Database(format!("API key {id} not found")))?;
603            if let Some(name) = update.name {
604                key.name = name;
605            }
606            if let Some(scopes) = update.scopes {
607                key.scopes = scopes;
608            }
609            if let Some(is_active) = update.is_active {
610                key.is_active = is_active;
611            }
612            if let Some(expires_at) = update.expires_at {
613                key.expires_at = expires_at;
614            }
615            key.updated_at = Utc::now();
616            Ok(())
617        })
618    }
619
620    fn touch_api_key(&self, id: Uuid) -> StoreFuture<'_, ()> {
621        Box::pin(async move {
622            let mut state = self.state.write().await;
623            if let Some(key) = state.api_keys.get_mut(&id) {
624                key.last_used_at = Some(Utc::now());
625            }
626            Ok(())
627        })
628    }
629
630    fn delete_api_key(&self, id: Uuid) -> StoreFuture<'_, ()> {
631        Box::pin(async move {
632            let mut state = self.state.write().await;
633            state.api_keys.remove(&id);
634            Ok(())
635        })
636    }
637}
638
639#[cfg(test)]
640mod tests {
641    use serde_json::json;
642    use tokio::spawn;
643
644    use super::*;
645    use crate::entities::TriggerKind;
646
647    fn new_run_req(name: &str) -> NewRun {
648        NewRun {
649            workflow_name: name.to_string(),
650            trigger: TriggerKind::Manual,
651            payload: json!({}),
652            max_retries: 3,
653        }
654    }
655
656    // ---- create_run ----
657
658    #[tokio::test]
659    async fn create_run_returns_pending_status() {
660        let store = InMemoryStore::new();
661        let run = store.create_run(new_run_req("test")).await.unwrap();
662        assert_eq!(run.status.state, RunStatus::Pending);
663        assert_eq!(run.workflow_name, "test");
664        assert_eq!(run.retry_count, 0);
665        assert_eq!(run.max_retries, 3);
666    }
667
668    #[tokio::test]
669    async fn create_run_generates_unique_ids() {
670        let store = InMemoryStore::new();
671        let r1 = store.create_run(new_run_req("a")).await.unwrap();
672        let r2 = store.create_run(new_run_req("b")).await.unwrap();
673        assert_ne!(r1.id, r2.id);
674    }
675
676    // ---- get_run ----
677
678    #[tokio::test]
679    async fn get_run_returns_created_run() {
680        let store = InMemoryStore::new();
681        let run = store.create_run(new_run_req("test")).await.unwrap();
682        let fetched = store.get_run(run.id).await.unwrap();
683        assert!(fetched.is_some());
684        assert_eq!(fetched.unwrap().id, run.id);
685    }
686
687    #[tokio::test]
688    async fn get_run_returns_none_for_missing() {
689        let store = InMemoryStore::new();
690        let fetched = store.get_run(Uuid::nil()).await.unwrap();
691        assert!(fetched.is_none());
692    }
693
694    // ---- update_run_status ----
695
696    #[tokio::test]
697    async fn update_run_status_valid_transition() {
698        let store = InMemoryStore::new();
699        let run = store.create_run(new_run_req("test")).await.unwrap();
700
701        store
702            .update_run_status(run.id, RunStatus::Running)
703            .await
704            .unwrap();
705
706        let fetched = store.get_run(run.id).await.unwrap().unwrap();
707        assert_eq!(fetched.status.state, RunStatus::Running);
708        assert!(fetched.started_at.is_some());
709    }
710
711    #[tokio::test]
712    async fn update_run_status_invalid_transition_returns_error() {
713        let store = InMemoryStore::new();
714        let run = store.create_run(new_run_req("test")).await.unwrap();
715
716        let result = store.update_run_status(run.id, RunStatus::Completed).await;
717        assert!(result.is_err());
718
719        let err = result.unwrap_err();
720        assert!(matches!(err, StoreError::InvalidTransition { .. }));
721    }
722
723    #[tokio::test]
724    async fn update_run_status_not_found() {
725        let store = InMemoryStore::new();
726        let result = store
727            .update_run_status(Uuid::nil(), RunStatus::Running)
728            .await;
729        assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
730    }
731
732    #[tokio::test]
733    async fn update_run_status_terminal_sets_completed_at() {
734        let store = InMemoryStore::new();
735        let run = store.create_run(new_run_req("test")).await.unwrap();
736
737        store
738            .update_run_status(run.id, RunStatus::Running)
739            .await
740            .unwrap();
741        store
742            .update_run_status(run.id, RunStatus::Completed)
743            .await
744            .unwrap();
745
746        let fetched = store.get_run(run.id).await.unwrap().unwrap();
747        assert_eq!(fetched.status.state, RunStatus::Completed);
748        assert!(fetched.completed_at.is_some());
749    }
750
751    // ---- list_runs ----
752
753    #[tokio::test]
754    async fn list_runs_empty_store() {
755        let store = InMemoryStore::new();
756        let page = store.list_runs(RunFilter::default(), 1, 20).await.unwrap();
757        assert_eq!(page.total, 0);
758        assert!(page.items.is_empty());
759    }
760
761    #[tokio::test]
762    async fn list_runs_with_workflow_filter() {
763        let store = InMemoryStore::new();
764        store.create_run(new_run_req("deploy")).await.unwrap();
765        store.create_run(new_run_req("test")).await.unwrap();
766        store.create_run(new_run_req("deploy")).await.unwrap();
767
768        let filter = RunFilter {
769            workflow_name: Some("deploy".to_string()),
770            ..RunFilter::default()
771        };
772        let page = store.list_runs(filter, 1, 20).await.unwrap();
773        assert_eq!(page.total, 2);
774        assert!(page.items.iter().all(|r| r.workflow_name == "deploy"));
775    }
776
777    #[tokio::test]
778    async fn list_runs_with_status_filter() {
779        let store = InMemoryStore::new();
780        let run = store.create_run(new_run_req("a")).await.unwrap();
781        store.create_run(new_run_req("b")).await.unwrap();
782
783        store
784            .update_run_status(run.id, RunStatus::Running)
785            .await
786            .unwrap();
787
788        let filter = RunFilter {
789            status: Some(RunStatus::Running),
790            ..RunFilter::default()
791        };
792        let page = store.list_runs(filter, 1, 20).await.unwrap();
793        assert_eq!(page.total, 1);
794        assert_eq!(page.items[0].id, run.id);
795    }
796
797    #[tokio::test]
798    async fn list_runs_pagination() {
799        let store = InMemoryStore::new();
800        for i in 0..5 {
801            store
802                .create_run(new_run_req(&format!("wf-{i}")))
803                .await
804                .unwrap();
805        }
806
807        let page1 = store.list_runs(RunFilter::default(), 1, 2).await.unwrap();
808        assert_eq!(page1.total, 5);
809        assert_eq!(page1.items.len(), 2);
810        assert_eq!(page1.page, 1);
811        assert_eq!(page1.per_page, 2);
812
813        let page2 = store.list_runs(RunFilter::default(), 2, 2).await.unwrap();
814        assert_eq!(page2.items.len(), 2);
815
816        let page3 = store.list_runs(RunFilter::default(), 3, 2).await.unwrap();
817        assert_eq!(page3.items.len(), 1);
818    }
819
820    // ---- pick_next_pending ----
821
822    #[tokio::test]
823    async fn pick_next_pending_empty_store() {
824        let store = InMemoryStore::new();
825        let result = store.pick_next_pending().await.unwrap();
826        assert!(result.is_none());
827    }
828
829    #[tokio::test]
830    async fn pick_next_pending_returns_oldest_and_transitions_to_running() {
831        let store = InMemoryStore::new();
832        let r1 = store.create_run(new_run_req("first")).await.unwrap();
833        let _r2 = store.create_run(new_run_req("second")).await.unwrap();
834
835        let picked = store.pick_next_pending().await.unwrap().unwrap();
836        assert_eq!(picked.id, r1.id);
837        assert_eq!(picked.status.state, RunStatus::Running);
838        assert!(picked.started_at.is_some());
839
840        // Verify it's Running in the store too.
841        let fetched = store.get_run(r1.id).await.unwrap().unwrap();
842        assert_eq!(fetched.status.state, RunStatus::Running);
843    }
844
845    #[tokio::test]
846    async fn pick_next_pending_skips_non_pending() {
847        let store = InMemoryStore::new();
848        let r1 = store.create_run(new_run_req("a")).await.unwrap();
849        let r2 = store.create_run(new_run_req("b")).await.unwrap();
850
851        // Transition r1 to Running.
852        store
853            .update_run_status(r1.id, RunStatus::Running)
854            .await
855            .unwrap();
856
857        let picked = store.pick_next_pending().await.unwrap().unwrap();
858        assert_eq!(picked.id, r2.id);
859    }
860
861    // ---- create_step ----
862
863    #[tokio::test]
864    async fn create_step_returns_pending() {
865        let store = InMemoryStore::new();
866        let run = store.create_run(new_run_req("test")).await.unwrap();
867
868        let step = store
869            .create_step(NewStep {
870                run_id: run.id,
871                name: "build".to_string(),
872                kind: crate::entities::StepKind::Shell,
873                position: 0,
874                input: Some(json!({"command": "cargo build"})),
875            })
876            .await
877            .unwrap();
878
879        assert_eq!(step.status.state, StepStatus::Pending);
880        assert_eq!(step.name, "build");
881        assert_eq!(step.run_id, run.id);
882        assert_eq!(step.position, 0);
883    }
884
885    #[tokio::test]
886    async fn create_step_for_missing_run_returns_error() {
887        let store = InMemoryStore::new();
888        let result = store
889            .create_step(NewStep {
890                run_id: Uuid::nil(),
891                name: "build".to_string(),
892                kind: crate::entities::StepKind::Shell,
893                position: 0,
894                input: None,
895            })
896            .await;
897        assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
898    }
899
900    // ---- update_step ----
901
902    #[tokio::test]
903    async fn update_step_applies_partial_update() {
904        let store = InMemoryStore::new();
905        let run = store.create_run(new_run_req("test")).await.unwrap();
906
907        let step = store
908            .create_step(NewStep {
909                run_id: run.id,
910                name: "build".to_string(),
911                kind: crate::entities::StepKind::Shell,
912                position: 0,
913                input: None,
914            })
915            .await
916            .unwrap();
917
918        // Transition Pending → Running first
919        store
920            .update_step(
921                step.id,
922                StepUpdate {
923                    status: Some(StepStatus::Running),
924                    ..StepUpdate::default()
925                },
926            )
927            .await
928            .unwrap();
929
930        // Then Running → Completed
931        store
932            .update_step(
933                step.id,
934                StepUpdate {
935                    status: Some(StepStatus::Completed),
936                    output: Some(json!({"stdout": "ok"})),
937                    duration_ms: Some(150),
938                    ..StepUpdate::default()
939                },
940            )
941            .await
942            .unwrap();
943
944        let steps = store.list_steps(run.id).await.unwrap();
945        assert_eq!(steps.len(), 1);
946        assert_eq!(steps[0].status.state, StepStatus::Completed);
947        assert_eq!(steps[0].duration_ms, 150);
948        assert!(steps[0].output.is_some());
949    }
950
951    #[tokio::test]
952    async fn update_step_not_found() {
953        let store = InMemoryStore::new();
954        let result = store.update_step(Uuid::nil(), StepUpdate::default()).await;
955        assert!(matches!(result.unwrap_err(), StoreError::StepNotFound(_)));
956    }
957
958    // ---- list_steps ----
959
960    #[tokio::test]
961    async fn list_steps_ordered_by_position() {
962        let store = InMemoryStore::new();
963        let run = store.create_run(new_run_req("test")).await.unwrap();
964
965        // Insert out of order.
966        store
967            .create_step(NewStep {
968                run_id: run.id,
969                name: "deploy".to_string(),
970                kind: crate::entities::StepKind::Shell,
971                position: 2,
972                input: None,
973            })
974            .await
975            .unwrap();
976        store
977            .create_step(NewStep {
978                run_id: run.id,
979                name: "build".to_string(),
980                kind: crate::entities::StepKind::Shell,
981                position: 0,
982                input: None,
983            })
984            .await
985            .unwrap();
986        store
987            .create_step(NewStep {
988                run_id: run.id,
989                name: "test".to_string(),
990                kind: crate::entities::StepKind::Shell,
991                position: 1,
992                input: None,
993            })
994            .await
995            .unwrap();
996
997        let steps = store.list_steps(run.id).await.unwrap();
998        assert_eq!(steps.len(), 3);
999        assert_eq!(steps[0].name, "build");
1000        assert_eq!(steps[1].name, "test");
1001        assert_eq!(steps[2].name, "deploy");
1002    }
1003
1004    #[tokio::test]
1005    async fn list_steps_empty_for_run_without_steps() {
1006        let store = InMemoryStore::new();
1007        let run = store.create_run(new_run_req("test")).await.unwrap();
1008        let steps = store.list_steps(run.id).await.unwrap();
1009        assert!(steps.is_empty());
1010    }
1011
1012    // ---- update_run ----
1013
1014    #[tokio::test]
1015    async fn update_run_applies_cost_and_duration() {
1016        let store = InMemoryStore::new();
1017        let run = store.create_run(new_run_req("test")).await.unwrap();
1018
1019        store
1020            .update_run(
1021                run.id,
1022                RunUpdate {
1023                    cost_usd: Some(Decimal::new(123, 2)),
1024                    duration_ms: Some(5000),
1025                    ..RunUpdate::default()
1026                },
1027            )
1028            .await
1029            .unwrap();
1030
1031        let fetched = store.get_run(run.id).await.unwrap().unwrap();
1032        assert_eq!(fetched.cost_usd, Decimal::new(123, 2));
1033        assert_eq!(fetched.duration_ms, 5000);
1034    }
1035
1036    #[tokio::test]
1037    async fn update_run_increment_retry() {
1038        let store = InMemoryStore::new();
1039        let run = store.create_run(new_run_req("test")).await.unwrap();
1040        assert_eq!(run.retry_count, 0);
1041
1042        store
1043            .update_run(
1044                run.id,
1045                RunUpdate {
1046                    increment_retry: true,
1047                    ..RunUpdate::default()
1048                },
1049            )
1050            .await
1051            .unwrap();
1052
1053        let fetched = store.get_run(run.id).await.unwrap().unwrap();
1054        assert_eq!(fetched.retry_count, 1);
1055    }
1056
1057    #[tokio::test]
1058    async fn update_run_not_found() {
1059        let store = InMemoryStore::new();
1060        let result = store.update_run(Uuid::nil(), RunUpdate::default()).await;
1061        assert!(matches!(result.unwrap_err(), StoreError::RunNotFound(_)));
1062    }
1063
1064    // ---- concurrent access ----
1065
1066    #[tokio::test]
1067    async fn concurrent_pick_next_pending_no_double_pick() {
1068        let store = InMemoryStore::new();
1069
1070        // Create 10 pending runs.
1071        for i in 0..10 {
1072            store
1073                .create_run(new_run_req(&format!("wf-{i}")))
1074                .await
1075                .unwrap();
1076        }
1077
1078        // Concurrently pick from multiple tasks.
1079        let mut handles = Vec::new();
1080        for _ in 0..10 {
1081            let s = store.clone();
1082            handles.push(spawn(async move { s.pick_next_pending().await }));
1083        }
1084
1085        let mut picked_ids = Vec::new();
1086        for h in handles {
1087            if let Ok(Ok(Some(run))) = h.await {
1088                picked_ids.push(run.id);
1089            }
1090        }
1091
1092        // Each run should be picked at most once.
1093        let unique: std::collections::HashSet<_> = picked_ids.iter().collect();
1094        assert_eq!(unique.len(), picked_ids.len());
1095    }
1096
1097    // ---- get_stats ----
1098
1099    #[tokio::test]
1100    async fn get_stats_empty_store() {
1101        let store = InMemoryStore::new();
1102        let stats = store.get_stats().await.unwrap();
1103        assert_eq!(stats.total_runs, 0);
1104        assert_eq!(stats.completed_runs, 0);
1105        assert_eq!(stats.failed_runs, 0);
1106        assert_eq!(stats.cancelled_runs, 0);
1107        assert_eq!(stats.active_runs, 0);
1108        assert_eq!(stats.total_cost_usd, Decimal::ZERO);
1109        assert_eq!(stats.total_duration_ms, 0);
1110    }
1111
1112    #[tokio::test]
1113    async fn get_stats_aggregates_counts_and_totals() {
1114        let store = InMemoryStore::new();
1115
1116        // Create runs in various states.
1117        let r1 = store.create_run(new_run_req("wf1")).await.unwrap();
1118        let r2 = store.create_run(new_run_req("wf2")).await.unwrap();
1119        let r3 = store.create_run(new_run_req("wf3")).await.unwrap();
1120        let _r4 = store.create_run(new_run_req("wf4")).await.unwrap();
1121
1122        // Transition r1 to Running, then Completed.
1123        store
1124            .update_run_status(r1.id, RunStatus::Running)
1125            .await
1126            .unwrap();
1127        store
1128            .update_run_status(r1.id, RunStatus::Completed)
1129            .await
1130            .unwrap();
1131
1132        // Transition r2 to Running, then Failed.
1133        store
1134            .update_run_status(r2.id, RunStatus::Running)
1135            .await
1136            .unwrap();
1137        store
1138            .update_run_status(r2.id, RunStatus::Failed)
1139            .await
1140            .unwrap();
1141
1142        // Transition r3 to Cancelled.
1143        store
1144            .update_run_status(r3.id, RunStatus::Cancelled)
1145            .await
1146            .unwrap();
1147
1148        // r4 remains Pending (active).
1149
1150        // Update cost and duration.
1151        store
1152            .update_run(
1153                r1.id,
1154                RunUpdate {
1155                    cost_usd: Some(Decimal::new(1000, 2)),
1156                    duration_ms: Some(1000),
1157                    ..RunUpdate::default()
1158                },
1159            )
1160            .await
1161            .unwrap();
1162
1163        store
1164            .update_run(
1165                r2.id,
1166                RunUpdate {
1167                    cost_usd: Some(Decimal::new(500, 2)),
1168                    duration_ms: Some(500),
1169                    ..RunUpdate::default()
1170                },
1171            )
1172            .await
1173            .unwrap();
1174
1175        let stats = store.get_stats().await.unwrap();
1176        assert_eq!(stats.total_runs, 4);
1177        assert_eq!(stats.completed_runs, 1);
1178        assert_eq!(stats.failed_runs, 1);
1179        assert_eq!(stats.cancelled_runs, 1);
1180        assert_eq!(stats.active_runs, 1); // r4 is Pending
1181        assert_eq!(stats.total_cost_usd, Decimal::new(1500, 2));
1182        assert_eq!(stats.total_duration_ms, 1500);
1183    }
1184
1185    // ── UserStore ───────────────────────────────────────────────────
1186
1187    fn new_user(email: &str, username: &str) -> NewUser {
1188        NewUser {
1189            email: email.to_string(),
1190            username: username.to_string(),
1191            password_hash: "argon2hash".to_string(),
1192        }
1193    }
1194
1195    #[tokio::test]
1196    async fn create_user_returns_user() {
1197        let store = InMemoryStore::new();
1198        let user = store
1199            .create_user(new_user("alice@example.com", "alice"))
1200            .await
1201            .unwrap();
1202
1203        assert_eq!(user.email, "alice@example.com");
1204        assert_eq!(user.username, "alice");
1205        assert_eq!(user.password_hash, "argon2hash");
1206        assert!(!user.is_admin);
1207    }
1208
1209    #[tokio::test]
1210    async fn create_user_duplicate_email_returns_error() {
1211        let store = InMemoryStore::new();
1212        store
1213            .create_user(new_user("alice@example.com", "alice"))
1214            .await
1215            .unwrap();
1216
1217        let err = store
1218            .create_user(new_user("alice@example.com", "bob"))
1219            .await
1220            .unwrap_err();
1221
1222        assert!(
1223            matches!(err, StoreError::DuplicateEmail(ref e) if e == "alice@example.com"),
1224            "expected DuplicateEmail, got: {err}"
1225        );
1226    }
1227
1228    #[tokio::test]
1229    async fn create_user_duplicate_username_returns_error() {
1230        let store = InMemoryStore::new();
1231        store
1232            .create_user(new_user("alice@example.com", "alice"))
1233            .await
1234            .unwrap();
1235
1236        let err = store
1237            .create_user(new_user("bob@example.com", "alice"))
1238            .await
1239            .unwrap_err();
1240
1241        assert!(
1242            matches!(err, StoreError::DuplicateUsername(ref u) if u == "alice"),
1243            "expected DuplicateUsername, got: {err}"
1244        );
1245    }
1246
1247    #[tokio::test]
1248    async fn find_user_by_email_existing() {
1249        let store = InMemoryStore::new();
1250        let created = store
1251            .create_user(new_user("alice@example.com", "alice"))
1252            .await
1253            .unwrap();
1254
1255        let found = store
1256            .find_user_by_email("alice@example.com")
1257            .await
1258            .unwrap()
1259            .expect("user should exist");
1260
1261        assert_eq!(found.id, created.id);
1262        assert_eq!(found.email, "alice@example.com");
1263    }
1264
1265    #[tokio::test]
1266    async fn find_user_by_email_missing_returns_none() {
1267        let store = InMemoryStore::new();
1268        let found = store
1269            .find_user_by_email("nobody@example.com")
1270            .await
1271            .unwrap();
1272
1273        assert!(found.is_none());
1274    }
1275
1276    #[tokio::test]
1277    async fn find_user_by_id_existing() {
1278        let store = InMemoryStore::new();
1279        let created = store
1280            .create_user(new_user("alice@example.com", "alice"))
1281            .await
1282            .unwrap();
1283
1284        let found = store
1285            .find_user_by_id(created.id)
1286            .await
1287            .unwrap()
1288            .expect("user should exist");
1289
1290        assert_eq!(found.email, "alice@example.com");
1291        assert_eq!(found.username, "alice");
1292    }
1293
1294    #[tokio::test]
1295    async fn find_user_by_id_missing_returns_none() {
1296        let store = InMemoryStore::new();
1297        let found = store.find_user_by_id(Uuid::now_v7()).await.unwrap();
1298        assert!(found.is_none());
1299    }
1300
1301    // ─── Additional Edge Cases ──────────────────────────────────
1302
1303    #[tokio::test]
1304    async fn update_run_status_running_to_retrying() {
1305        let store = InMemoryStore::new();
1306        let run = store.create_run(new_run_req("test")).await.unwrap();
1307
1308        store
1309            .update_run_status(run.id, RunStatus::Running)
1310            .await
1311            .unwrap();
1312
1313        store
1314            .update_run_status(run.id, RunStatus::Retrying)
1315            .await
1316            .unwrap();
1317
1318        let fetched = store.get_run(run.id).await.unwrap().unwrap();
1319        assert_eq!(fetched.status.state, RunStatus::Retrying);
1320        assert!(!fetched.status.state.is_terminal());
1321        assert!(fetched.completed_at.is_none()); // Not a terminal state
1322    }
1323
1324    #[tokio::test]
1325    async fn update_run_status_retrying_to_running_allowed() {
1326        let store = InMemoryStore::new();
1327        let run = store.create_run(new_run_req("test")).await.unwrap();
1328
1329        store
1330            .update_run_status(run.id, RunStatus::Running)
1331            .await
1332            .unwrap();
1333        store
1334            .update_run_status(run.id, RunStatus::Retrying)
1335            .await
1336            .unwrap();
1337
1338        // Retrying → Running should be allowed
1339        store
1340            .update_run_status(run.id, RunStatus::Running)
1341            .await
1342            .unwrap();
1343
1344        let fetched = store.get_run(run.id).await.unwrap().unwrap();
1345        assert_eq!(fetched.status.state, RunStatus::Running);
1346    }
1347
1348    #[tokio::test]
1349    async fn update_run_with_invalid_status_transition_errors() {
1350        let store = InMemoryStore::new();
1351        let run = store.create_run(new_run_req("test")).await.unwrap();
1352
1353        // Try to apply invalid status transition via update_run
1354        let result = store
1355            .update_run(
1356                run.id,
1357                RunUpdate {
1358                    status: Some(RunStatus::Completed), // invalid from Pending
1359                    ..RunUpdate::default()
1360                },
1361            )
1362            .await;
1363
1364        assert!(result.is_err());
1365    }
1366
1367    #[tokio::test]
1368    async fn create_step_with_complex_input() {
1369        let store = InMemoryStore::new();
1370        let run = store.create_run(new_run_req("test")).await.unwrap();
1371
1372        let complex_input = json!({
1373            "command": "cargo build",
1374            "env": {
1375                "RUST_LOG": "debug",
1376                "CUSTOM": "value"
1377            },
1378            "timeout": 60,
1379            "retry_policy": {
1380                "max_attempts": 3,
1381                "backoff": "exponential"
1382            }
1383        });
1384
1385        let step = store
1386            .create_step(NewStep {
1387                run_id: run.id,
1388                name: "build".to_string(),
1389                kind: crate::entities::StepKind::Agent,
1390                position: 0,
1391                input: Some(complex_input.clone()),
1392            })
1393            .await
1394            .unwrap();
1395
1396        assert_eq!(step.input, Some(complex_input));
1397    }
1398
1399    #[tokio::test]
1400    async fn update_step_with_error_message() {
1401        let store = InMemoryStore::new();
1402        let run = store.create_run(new_run_req("test")).await.unwrap();
1403
1404        let step = store
1405            .create_step(NewStep {
1406                run_id: run.id,
1407                name: "build".to_string(),
1408                kind: crate::entities::StepKind::Shell,
1409                position: 0,
1410                input: None,
1411            })
1412            .await
1413            .unwrap();
1414
1415        store
1416            .update_step(
1417                step.id,
1418                StepUpdate {
1419                    status: Some(StepStatus::Running),
1420                    ..StepUpdate::default()
1421                },
1422            )
1423            .await
1424            .unwrap();
1425
1426        store
1427            .update_step(
1428                step.id,
1429                StepUpdate {
1430                    status: Some(StepStatus::Failed),
1431                    error: Some("Connection timeout after 30s".to_string()),
1432                    duration_ms: Some(30000),
1433                    ..StepUpdate::default()
1434                },
1435            )
1436            .await
1437            .unwrap();
1438
1439        let steps = store.list_steps(run.id).await.unwrap();
1440        assert_eq!(steps[0].status.state, StepStatus::Failed);
1441        assert_eq!(
1442            steps[0].error,
1443            Some("Connection timeout after 30s".to_string())
1444        );
1445        assert_eq!(steps[0].duration_ms, 30000);
1446    }
1447
1448    #[tokio::test]
1449    async fn list_steps_for_nonexistent_run_returns_empty() {
1450        let store = InMemoryStore::new();
1451        let steps = store.list_steps(Uuid::nil()).await.unwrap();
1452        assert!(steps.is_empty());
1453    }
1454
1455    #[tokio::test]
1456    async fn update_step_pending_to_skipped() {
1457        let store = InMemoryStore::new();
1458        let run = store.create_run(new_run_req("test")).await.unwrap();
1459
1460        let step = store
1461            .create_step(NewStep {
1462                run_id: run.id,
1463                name: "build".to_string(),
1464                kind: crate::entities::StepKind::Shell,
1465                position: 0,
1466                input: None,
1467            })
1468            .await
1469            .unwrap();
1470
1471        // Pending → Skipped is allowed (when prior step failed)
1472        store
1473            .update_step(
1474                step.id,
1475                StepUpdate {
1476                    status: Some(StepStatus::Skipped),
1477                    ..StepUpdate::default()
1478                },
1479            )
1480            .await
1481            .unwrap();
1482
1483        let steps = store.list_steps(run.id).await.unwrap();
1484        assert_eq!(steps[0].status.state, StepStatus::Skipped);
1485    }
1486
1487    #[tokio::test]
1488    async fn list_runs_with_combined_filters() {
1489        let store = InMemoryStore::new();
1490
1491        let r1 = store.create_run(new_run_req("deploy")).await.unwrap();
1492        let r2 = store.create_run(new_run_req("deploy")).await.unwrap();
1493        let _r3 = store.create_run(new_run_req("test")).await.unwrap();
1494
1495        // r1: Pending → Running → Completed
1496        store
1497            .update_run_status(r1.id, RunStatus::Running)
1498            .await
1499            .unwrap();
1500        store
1501            .update_run_status(r1.id, RunStatus::Completed)
1502            .await
1503            .unwrap();
1504
1505        // r2: Pending → Running
1506        store
1507            .update_run_status(r2.id, RunStatus::Running)
1508            .await
1509            .unwrap();
1510
1511        // Filter by workflow AND status
1512        let filter = RunFilter {
1513            workflow_name: Some("deploy".to_string()),
1514            status: Some(RunStatus::Running),
1515            ..RunFilter::default()
1516        };
1517
1518        let page = store.list_runs(filter, 1, 100).await.unwrap();
1519        assert_eq!(page.total, 1);
1520        assert_eq!(page.items[0].id, r2.id);
1521    }
1522
1523    #[tokio::test]
1524    async fn list_runs_workflow_filter_is_case_insensitive_partial_match() {
1525        let store = InMemoryStore::new();
1526        store
1527            .create_run(new_run_req("weather-report"))
1528            .await
1529            .unwrap();
1530        store.create_run(new_run_req("deploy-prod")).await.unwrap();
1531
1532        // Partial match
1533        let filter = RunFilter {
1534            workflow_name: Some("weather".to_string()),
1535            ..RunFilter::default()
1536        };
1537        let page = store.list_runs(filter, 1, 100).await.unwrap();
1538        assert_eq!(page.total, 1);
1539        assert_eq!(page.items[0].workflow_name, "weather-report");
1540
1541        // Case-insensitive
1542        let filter = RunFilter {
1543            workflow_name: Some("Weather-REPORT".to_string()),
1544            ..RunFilter::default()
1545        };
1546        let page = store.list_runs(filter, 1, 100).await.unwrap();
1547        assert_eq!(page.total, 1);
1548        assert_eq!(page.items[0].workflow_name, "weather-report");
1549
1550        // Suffix match
1551        let filter = RunFilter {
1552            workflow_name: Some("report".to_string()),
1553            ..RunFilter::default()
1554        };
1555        let page = store.list_runs(filter, 1, 100).await.unwrap();
1556        assert_eq!(page.total, 1);
1557        assert_eq!(page.items[0].workflow_name, "weather-report");
1558
1559        // No match
1560        let filter = RunFilter {
1561            workflow_name: Some("build".to_string()),
1562            ..RunFilter::default()
1563        };
1564        let page = store.list_runs(filter, 1, 100).await.unwrap();
1565        assert_eq!(page.total, 0);
1566    }
1567
1568    #[tokio::test]
1569    async fn get_stats_with_mixed_active_statuses() {
1570        let store = InMemoryStore::new();
1571
1572        let _r1 = store.create_run(new_run_req("wf")).await.unwrap(); // Pending
1573        let r2 = store.create_run(new_run_req("wf")).await.unwrap();
1574        let r3 = store.create_run(new_run_req("wf")).await.unwrap();
1575
1576        store
1577            .update_run_status(r2.id, RunStatus::Running)
1578            .await
1579            .unwrap();
1580        store
1581            .update_run_status(r3.id, RunStatus::Running)
1582            .await
1583            .unwrap();
1584        store
1585            .update_run_status(r3.id, RunStatus::Retrying)
1586            .await
1587            .unwrap();
1588
1589        let stats = store.get_stats().await.unwrap();
1590        assert_eq!(stats.active_runs, 3); // _r1 (Pending), r2 (Running), r3 (Retrying)
1591    }
1592
1593    #[tokio::test]
1594    async fn run_with_different_trigger_kinds() {
1595        let store = InMemoryStore::new();
1596
1597        let r1 = store
1598            .create_run(NewRun {
1599                workflow_name: "test".to_string(),
1600                trigger: TriggerKind::Manual,
1601                payload: json!({}),
1602                max_retries: 1,
1603            })
1604            .await
1605            .unwrap();
1606
1607        let r2 = store
1608            .create_run(NewRun {
1609                workflow_name: "test".to_string(),
1610                trigger: TriggerKind::Webhook {
1611                    path: "/hooks/github".to_string(),
1612                },
1613                payload: json!({}),
1614                max_retries: 1,
1615            })
1616            .await
1617            .unwrap();
1618
1619        let r3 = store
1620            .create_run(NewRun {
1621                workflow_name: "test".to_string(),
1622                trigger: TriggerKind::Cron {
1623                    schedule: "0 0 * * *".to_string(),
1624                },
1625                payload: json!({}),
1626                max_retries: 1,
1627            })
1628            .await
1629            .unwrap();
1630
1631        let r4 = store
1632            .create_run(NewRun {
1633                workflow_name: "test".to_string(),
1634                trigger: TriggerKind::Api,
1635                payload: json!({}),
1636                max_retries: 1,
1637            })
1638            .await
1639            .unwrap();
1640
1641        let r5 = store
1642            .create_run(NewRun {
1643                workflow_name: "test".to_string(),
1644                trigger: TriggerKind::Retry {
1645                    parent_run_id: Uuid::nil(),
1646                },
1647                payload: json!({}),
1648                max_retries: 1,
1649            })
1650            .await
1651            .unwrap();
1652
1653        assert_eq!(r1.trigger, TriggerKind::Manual);
1654        assert!(matches!(r2.trigger, TriggerKind::Webhook { .. }));
1655        assert!(matches!(r3.trigger, TriggerKind::Cron { .. }));
1656        assert_eq!(r4.trigger, TriggerKind::Api);
1657        assert!(matches!(r5.trigger, TriggerKind::Retry { .. }));
1658    }
1659
1660    // ---- update_run_returning ----
1661
1662    #[tokio::test]
1663    async fn update_run_returning_applies_and_returns() {
1664        let store = InMemoryStore::new();
1665        let run = store.create_run(new_run_req("test")).await.unwrap();
1666
1667        // Transition Pending -> Running first
1668        store
1669            .update_run_status(run.id, RunStatus::Running)
1670            .await
1671            .unwrap();
1672
1673        let updated = store
1674            .update_run_returning(
1675                run.id,
1676                RunUpdate {
1677                    status: Some(RunStatus::Completed),
1678                    cost_usd: Some(Decimal::new(4200, 2)),
1679                    duration_ms: Some(1500),
1680                    ..RunUpdate::default()
1681                },
1682            )
1683            .await
1684            .unwrap();
1685
1686        assert_eq!(updated.id, run.id);
1687        assert_eq!(updated.status.state, RunStatus::Completed);
1688        assert_eq!(updated.cost_usd, Decimal::new(4200, 2));
1689        assert_eq!(updated.duration_ms, 1500);
1690        assert!(updated.completed_at.is_some());
1691    }
1692
1693    #[tokio::test]
1694    async fn update_run_returning_not_found() {
1695        let store = InMemoryStore::new();
1696        let result = store
1697            .update_run_returning(
1698                Uuid::nil(),
1699                RunUpdate {
1700                    status: Some(RunStatus::Running),
1701                    ..RunUpdate::default()
1702                },
1703            )
1704            .await;
1705
1706        assert!(matches!(result, Err(StoreError::RunNotFound(_))));
1707    }
1708
1709    #[tokio::test]
1710    async fn update_run_returning_invalid_transition() {
1711        let store = InMemoryStore::new();
1712        let run = store.create_run(new_run_req("test")).await.unwrap();
1713
1714        let result = store
1715            .update_run_returning(
1716                run.id,
1717                RunUpdate {
1718                    status: Some(RunStatus::Completed),
1719                    ..RunUpdate::default()
1720                },
1721            )
1722            .await;
1723
1724        assert!(matches!(result, Err(StoreError::InvalidTransition { .. })));
1725    }
1726}