Skip to main content

meerkat_mob/store/
in_memory.rs

1//! In-memory store implementations.
2
3use super::realm_profile::{RealmProfileStore, StoredRealmProfile};
4use super::{MobEventStore, MobRunStore, MobSpecStore, MobStoreError};
5use crate::definition::MobDefinition;
6use crate::event::{MobEvent, NewMobEvent};
7use crate::ids::{FlowId, FrameId, LoopId, LoopInstanceId, MobId, RunId, StepId};
8use crate::profile::Profile;
9use crate::run::{
10    FailureLedgerEntry, FrameSnapshot, LoopIterationLedgerEntry, LoopSnapshot, MobRun,
11    MobRunStatus, StepLedgerEntry,
12};
13#[cfg(target_arch = "wasm32")]
14use crate::tokio;
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use indexmap::IndexMap;
18use meerkat_machine_kernels::KernelState;
19use std::collections::BTreeMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22
23/// In-memory event store for tests and ephemeral mobs.
24#[derive(Debug, Default)]
25pub struct InMemoryMobEventStore {
26    events: Arc<RwLock<Vec<MobEvent>>>,
27}
28
29impl InMemoryMobEventStore {
30    pub fn new() -> Self {
31        Self::default()
32    }
33}
34
35#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
36#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
37impl MobEventStore for InMemoryMobEventStore {
38    async fn append(&self, event: NewMobEvent) -> Result<MobEvent, MobStoreError> {
39        let mut events = self.events.write().await;
40        let cursor = events.last().map_or(1, |existing| existing.cursor + 1);
41        let stored = MobEvent {
42            cursor,
43            timestamp: event.timestamp.unwrap_or_else(Utc::now),
44            mob_id: event.mob_id,
45            kind: event.kind,
46        };
47        events.push(stored.clone());
48        Ok(stored)
49    }
50
51    async fn append_batch(&self, batch: Vec<NewMobEvent>) -> Result<Vec<MobEvent>, MobStoreError> {
52        let mut events = self.events.write().await;
53        let mut results = Vec::with_capacity(batch.len());
54        for event in batch {
55            let cursor = events.last().map_or(1, |existing| existing.cursor + 1);
56            let stored = MobEvent {
57                cursor,
58                timestamp: event.timestamp.unwrap_or_else(Utc::now),
59                mob_id: event.mob_id,
60                kind: event.kind,
61            };
62            events.push(stored.clone());
63            results.push(stored);
64        }
65        Ok(results)
66    }
67
68    async fn poll(&self, after_cursor: u64, limit: usize) -> Result<Vec<MobEvent>, MobStoreError> {
69        let events = self.events.read().await;
70        Ok(events
71            .iter()
72            .filter(|event| event.cursor > after_cursor)
73            .take(limit)
74            .cloned()
75            .collect())
76    }
77
78    async fn replay_all(&self) -> Result<Vec<MobEvent>, MobStoreError> {
79        Ok(self.events.read().await.clone())
80    }
81
82    async fn clear(&self) -> Result<(), MobStoreError> {
83        self.events.write().await.clear();
84        Ok(())
85    }
86
87    async fn prune(&self, older_than: DateTime<Utc>) -> Result<u64, MobStoreError> {
88        let mut events = self.events.write().await;
89        let before = events.len();
90        events.retain(|event| event.timestamp >= older_than);
91        Ok((before - events.len()) as u64)
92    }
93}
94
95/// In-memory run store.
96#[derive(Debug, Default)]
97pub struct InMemoryMobRunStore {
98    runs: Arc<RwLock<IndexMap<RunId, MobRun>>>,
99}
100
101impl InMemoryMobRunStore {
102    pub fn new() -> Self {
103        Self::default()
104    }
105}
106
107#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
108#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
109impl MobRunStore for InMemoryMobRunStore {
110    async fn create_run(&self, run: MobRun) -> Result<(), MobStoreError> {
111        let mut runs = self.runs.write().await;
112        if runs.contains_key(&run.run_id) {
113            return Err(MobStoreError::Internal(format!(
114                "run already exists: {}",
115                run.run_id
116            )));
117        }
118        runs.insert(run.run_id.clone(), run);
119        Ok(())
120    }
121
122    async fn get_run(&self, run_id: &RunId) -> Result<Option<MobRun>, MobStoreError> {
123        Ok(self.runs.read().await.get(run_id).cloned())
124    }
125
126    async fn list_runs(
127        &self,
128        mob_id: &MobId,
129        flow_id: Option<&FlowId>,
130    ) -> Result<Vec<MobRun>, MobStoreError> {
131        Ok(self
132            .runs
133            .read()
134            .await
135            .values()
136            .filter(|run| {
137                run.mob_id == *mob_id
138                    && flow_id.is_none_or(|expected_flow_id| run.flow_id == *expected_flow_id)
139            })
140            .cloned()
141            .collect())
142    }
143
144    async fn cas_run_status(
145        &self,
146        run_id: &RunId,
147        expected: MobRunStatus,
148        next: MobRunStatus,
149    ) -> Result<bool, MobStoreError> {
150        let mut runs = self.runs.write().await;
151        let Some(run) = runs.get_mut(run_id) else {
152            return Ok(false);
153        };
154        if run.status != expected || run.status.is_terminal() {
155            return Ok(false);
156        }
157        let terminal = next.is_terminal();
158        run.status = next;
159        if terminal && run.completed_at.is_none() {
160            run.completed_at = Some(Utc::now());
161        }
162        Ok(true)
163    }
164
165    async fn cas_flow_state(
166        &self,
167        run_id: &RunId,
168        expected: &KernelState,
169        next: &KernelState,
170    ) -> Result<bool, MobStoreError> {
171        let mut runs = self.runs.write().await;
172        let Some(run) = runs.get_mut(run_id) else {
173            return Ok(false);
174        };
175        if &run.flow_state != expected {
176            return Ok(false);
177        }
178        run.flow_state = next.clone();
179        Ok(true)
180    }
181
182    async fn cas_run_snapshot(
183        &self,
184        run_id: &RunId,
185        expected_status: MobRunStatus,
186        expected_flow_state: &KernelState,
187        next_status: MobRunStatus,
188        next_flow_state: &KernelState,
189    ) -> Result<bool, MobStoreError> {
190        let mut runs = self.runs.write().await;
191        let Some(run) = runs.get_mut(run_id) else {
192            return Ok(false);
193        };
194        if run.status != expected_status
195            || run.status.is_terminal()
196            || &run.flow_state != expected_flow_state
197        {
198            return Ok(false);
199        }
200        let terminal = next_status.is_terminal();
201        run.status = next_status;
202        run.flow_state = next_flow_state.clone();
203        if terminal && run.completed_at.is_none() {
204            run.completed_at = Some(Utc::now());
205        }
206        Ok(true)
207    }
208
209    async fn append_step_entry(
210        &self,
211        run_id: &RunId,
212        entry: StepLedgerEntry,
213    ) -> Result<(), MobStoreError> {
214        let mut runs = self.runs.write().await;
215        let run = runs
216            .get_mut(run_id)
217            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
218        run.step_ledger.push(entry);
219        Ok(())
220    }
221
222    async fn append_step_entry_if_absent(
223        &self,
224        run_id: &RunId,
225        entry: StepLedgerEntry,
226    ) -> Result<bool, MobStoreError> {
227        let mut runs = self.runs.write().await;
228        let run = runs
229            .get_mut(run_id)
230            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
231        let is_duplicate = run.step_ledger.iter().any(|existing| {
232            existing.step_id == entry.step_id
233                && existing.meerkat_id == entry.meerkat_id
234                && existing.status == entry.status
235        });
236        if is_duplicate {
237            return Ok(false);
238        }
239        run.step_ledger.push(entry);
240        Ok(true)
241    }
242
243    async fn put_step_output(
244        &self,
245        run_id: &RunId,
246        step_id: &StepId,
247        output: serde_json::Value,
248    ) -> Result<(), MobStoreError> {
249        let mut runs = self.runs.write().await;
250        let run = runs
251            .get_mut(run_id)
252            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
253        if let Some(entry) = run
254            .step_ledger
255            .iter_mut()
256            .rev()
257            .find(|entry| &entry.step_id == step_id)
258        {
259            entry.output = Some(output);
260            return Ok(());
261        }
262        Err(MobStoreError::Internal(format!(
263            "cannot set output for unknown step '{step_id}' in run '{run_id}'"
264        )))
265    }
266
267    async fn append_failure_entry(
268        &self,
269        run_id: &RunId,
270        entry: FailureLedgerEntry,
271    ) -> Result<(), MobStoreError> {
272        let mut runs = self.runs.write().await;
273        let run = runs
274            .get_mut(run_id)
275            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
276        run.failure_ledger.push(entry);
277        Ok(())
278    }
279
280    async fn upsert_loop_snapshot(
281        &self,
282        run_id: &RunId,
283        loop_instance_id: &LoopInstanceId,
284        snapshot: LoopSnapshot,
285        ledger_entry: Option<LoopIterationLedgerEntry>,
286    ) -> Result<(), MobStoreError> {
287        let mut runs = self.runs.write().await;
288        let run = runs
289            .get_mut(run_id)
290            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
291        run.loops.insert(loop_instance_id.clone(), snapshot);
292        if let Some(entry) = ledger_entry
293            && !run.loop_iteration_ledger.iter().any(|existing| {
294                existing.loop_instance_id == entry.loop_instance_id
295                    && existing.iteration == entry.iteration
296                    && existing.frame_id == entry.frame_id
297            })
298        {
299            run.loop_iteration_ledger.push(entry);
300        }
301        Ok(())
302    }
303
304    async fn cas_frame_state(
305        &self,
306        run_id: &RunId,
307        frame_id: &FrameId,
308        expected: Option<&FrameSnapshot>,
309        next: FrameSnapshot,
310    ) -> Result<bool, MobStoreError> {
311        let mut runs = self.runs.write().await;
312        let run = runs
313            .get_mut(run_id)
314            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
315        let current = run.frames.get(frame_id);
316        match (expected, current) {
317            (None, None) => {
318                // Insert new frame
319                run.frames.insert(frame_id.clone(), next);
320                Ok(true)
321            }
322            (Some(exp), Some(cur)) if exp == cur => {
323                run.frames.insert(frame_id.clone(), next);
324                Ok(true)
325            }
326            _ => Ok(false),
327        }
328    }
329
330    async fn cas_grant_node_slot(
331        &self,
332        run_id: &RunId,
333        expected_run_state: &KernelState,
334        next_run_state: KernelState,
335        frame_id: &FrameId,
336        expected_frame: &FrameSnapshot,
337        next_frame: FrameSnapshot,
338    ) -> Result<bool, MobStoreError> {
339        let mut runs = self.runs.write().await;
340        let run = runs
341            .get_mut(run_id)
342            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
343        // Check all expectations atomically
344        if &run.flow_state != expected_run_state {
345            return Ok(false);
346        }
347        if run.frames.get(frame_id) != Some(expected_frame) {
348            return Ok(false);
349        }
350        // Apply all updates
351        run.flow_state = next_run_state;
352        run.frames.insert(frame_id.clone(), next_frame);
353        Ok(true)
354    }
355
356    async fn cas_complete_step_and_record_output(
357        &self,
358        run_id: &RunId,
359        frame_id: &FrameId,
360        expected_frame: &FrameSnapshot,
361        next_frame: FrameSnapshot,
362        step_output_key: String,
363        step_output: serde_json::Value,
364        loop_context: Option<(&LoopId, u64)>,
365    ) -> Result<bool, MobStoreError> {
366        let mut runs = self.runs.write().await;
367        let run = runs
368            .get_mut(run_id)
369            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
370        if run.frames.get(frame_id) != Some(expected_frame) {
371            return Ok(false);
372        }
373        run.frames.insert(frame_id.clone(), next_frame);
374        match loop_context {
375            None => {
376                run.root_step_outputs.insert(
377                    crate::ids::StepId::from(step_output_key.as_str()),
378                    step_output,
379                );
380            }
381            Some((loop_id, iteration)) => {
382                let iteration_index = usize::try_from(iteration).map_err(|_| {
383                    MobStoreError::Internal(format!(
384                        "loop iteration index {iteration} exceeds usize::MAX on this target"
385                    ))
386                })?;
387                let vec = run
388                    .loop_iteration_outputs
389                    .entry(loop_id.clone())
390                    .or_default();
391                while vec.len() <= iteration_index {
392                    vec.push(IndexMap::new());
393                }
394                vec[iteration_index].insert(
395                    crate::ids::StepId::from(step_output_key.as_str()),
396                    step_output,
397                );
398            }
399        }
400        Ok(true)
401    }
402
403    #[allow(clippy::too_many_arguments)]
404    async fn cas_start_loop(
405        &self,
406        run_id: &RunId,
407        loop_instance_id: &LoopInstanceId,
408        expected_run_state: &KernelState,
409        next_run_state: KernelState,
410        frame_id: &FrameId,
411        expected_frame: &FrameSnapshot,
412        next_frame: FrameSnapshot,
413        initial_loop: LoopSnapshot,
414    ) -> Result<bool, MobStoreError> {
415        let mut runs = self.runs.write().await;
416        let run = runs
417            .get_mut(run_id)
418            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
419        if &run.flow_state != expected_run_state {
420            return Ok(false);
421        }
422        if run.frames.get(frame_id) != Some(expected_frame) {
423            return Ok(false);
424        }
425        // Loop must not already exist
426        if run.loops.contains_key(loop_instance_id) {
427            return Ok(false);
428        }
429        run.flow_state = next_run_state;
430        run.frames.insert(frame_id.clone(), next_frame);
431        run.loops.insert(loop_instance_id.clone(), initial_loop);
432        Ok(true)
433    }
434
435    async fn cas_loop_request_body_frame(
436        &self,
437        run_id: &RunId,
438        loop_instance_id: &LoopInstanceId,
439        expected_loop: &LoopSnapshot,
440        next_loop: LoopSnapshot,
441        expected_run_state: &KernelState,
442        next_run_state: KernelState,
443    ) -> Result<bool, MobStoreError> {
444        let mut runs = self.runs.write().await;
445        let run = runs
446            .get_mut(run_id)
447            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
448        if &run.flow_state != expected_run_state {
449            return Ok(false);
450        }
451        if run.loops.get(loop_instance_id) != Some(expected_loop) {
452            return Ok(false);
453        }
454        run.flow_state = next_run_state;
455        run.loops.insert(loop_instance_id.clone(), next_loop);
456        Ok(true)
457    }
458
459    #[allow(clippy::too_many_arguments)]
460    async fn cas_grant_body_frame_start(
461        &self,
462        run_id: &RunId,
463        loop_instance_id: &LoopInstanceId,
464        expected_loop: &LoopSnapshot,
465        next_loop: LoopSnapshot,
466        frame_id: &FrameId,
467        initial_frame: FrameSnapshot,
468        ledger_entry: LoopIterationLedgerEntry,
469        expected_run_state: &KernelState,
470        next_run_state: KernelState,
471    ) -> Result<bool, MobStoreError> {
472        let mut runs = self.runs.write().await;
473        let run = runs
474            .get_mut(run_id)
475            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
476        if &run.flow_state != expected_run_state {
477            return Ok(false);
478        }
479        if run.loops.get(loop_instance_id) != Some(expected_loop) {
480            return Ok(false);
481        }
482        // Frame must not already exist
483        if run.frames.contains_key(frame_id) {
484            return Ok(false);
485        }
486        run.flow_state = next_run_state;
487        run.loops.insert(loop_instance_id.clone(), next_loop);
488        run.frames.insert(frame_id.clone(), initial_frame);
489        if !run.loop_iteration_ledger.iter().any(|existing| {
490            existing.loop_instance_id == ledger_entry.loop_instance_id
491                && existing.iteration == ledger_entry.iteration
492                && existing.frame_id == ledger_entry.frame_id
493        }) {
494            run.loop_iteration_ledger.push(ledger_entry);
495        }
496        Ok(true)
497    }
498
499    #[allow(clippy::too_many_arguments)]
500    async fn cas_complete_body_frame(
501        &self,
502        run_id: &RunId,
503        loop_instance_id: &LoopInstanceId,
504        expected_loop: &LoopSnapshot,
505        next_loop: LoopSnapshot,
506        frame_id: &FrameId,
507        expected_frame: &FrameSnapshot,
508        next_frame: FrameSnapshot,
509        expected_run_state: &KernelState,
510        next_run_state: KernelState,
511    ) -> Result<bool, MobStoreError> {
512        let mut runs = self.runs.write().await;
513        let run = runs
514            .get_mut(run_id)
515            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
516        if &run.flow_state != expected_run_state {
517            return Ok(false);
518        }
519        if run.loops.get(loop_instance_id) != Some(expected_loop) {
520            return Ok(false);
521        }
522        if run.frames.get(frame_id) != Some(expected_frame) {
523            return Ok(false);
524        }
525        run.flow_state = next_run_state;
526        run.loops.insert(loop_instance_id.clone(), next_loop);
527        run.frames.insert(frame_id.clone(), next_frame);
528        Ok(true)
529    }
530
531    #[allow(clippy::too_many_arguments)]
532    async fn cas_complete_loop(
533        &self,
534        run_id: &RunId,
535        loop_instance_id: &LoopInstanceId,
536        expected_loop: &LoopSnapshot,
537        next_loop: LoopSnapshot,
538        frame_id: &FrameId,
539        expected_frame: &FrameSnapshot,
540        next_frame: FrameSnapshot,
541        expected_run_state: &KernelState,
542        next_run_state: KernelState,
543    ) -> Result<bool, MobStoreError> {
544        let mut runs = self.runs.write().await;
545        let run = runs
546            .get_mut(run_id)
547            .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
548        if &run.flow_state != expected_run_state {
549            return Ok(false);
550        }
551        if run.loops.get(loop_instance_id) != Some(expected_loop) {
552            return Ok(false);
553        }
554        if run.frames.get(frame_id) != Some(expected_frame) {
555            return Ok(false);
556        }
557        run.flow_state = next_run_state;
558        run.loops.insert(loop_instance_id.clone(), next_loop);
559        run.frames.insert(frame_id.clone(), next_frame);
560        Ok(true)
561    }
562}
563
564/// In-memory spec store with revision CAS semantics.
565#[derive(Debug, Default)]
566pub struct InMemoryMobSpecStore {
567    specs: Arc<RwLock<BTreeMap<MobId, (MobDefinition, u64)>>>,
568}
569
570impl InMemoryMobSpecStore {
571    pub fn new() -> Self {
572        Self::default()
573    }
574}
575
576#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
577#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
578impl MobSpecStore for InMemoryMobSpecStore {
579    async fn put_spec(
580        &self,
581        mob_id: &MobId,
582        definition: &MobDefinition,
583        revision: Option<u64>,
584    ) -> Result<u64, MobStoreError> {
585        let mut specs = self.specs.write().await;
586        let current_revision = specs.get(mob_id).map_or(0, |(_, rev)| *rev);
587        if let Some(expected) = revision
588            && expected != current_revision
589        {
590            return Err(MobStoreError::SpecRevisionConflict {
591                mob_id: mob_id.clone(),
592                expected: revision,
593                actual: current_revision,
594            });
595        }
596
597        let next_revision = current_revision + 1;
598        specs.insert(mob_id.clone(), (definition.clone(), next_revision));
599        Ok(next_revision)
600    }
601
602    async fn get_spec(
603        &self,
604        mob_id: &MobId,
605    ) -> Result<Option<(MobDefinition, u64)>, MobStoreError> {
606        Ok(self.specs.read().await.get(mob_id).cloned())
607    }
608
609    async fn list_specs(&self) -> Result<Vec<MobId>, MobStoreError> {
610        Ok(self.specs.read().await.keys().cloned().collect())
611    }
612
613    async fn delete_spec(
614        &self,
615        mob_id: &MobId,
616        revision: Option<u64>,
617    ) -> Result<bool, MobStoreError> {
618        let mut specs = self.specs.write().await;
619        let Some((_, current_revision)) = specs.get(mob_id) else {
620            return Ok(false);
621        };
622
623        if let Some(expected) = revision
624            && expected != *current_revision
625        {
626            return Ok(false);
627        }
628
629        specs.remove(mob_id);
630        Ok(true)
631    }
632}
633
634/// In-memory realm profile store with CAS semantics.
635#[derive(Debug, Default)]
636pub struct InMemoryRealmProfileStore {
637    profiles: Arc<RwLock<BTreeMap<String, StoredRealmProfile>>>,
638}
639
640impl InMemoryRealmProfileStore {
641    pub fn new() -> Self {
642        Self::default()
643    }
644}
645
646#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
647#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
648impl RealmProfileStore for InMemoryRealmProfileStore {
649    async fn create(
650        &self,
651        name: &str,
652        profile: &Profile,
653    ) -> Result<StoredRealmProfile, MobStoreError> {
654        let mut profiles = self.profiles.write().await;
655        if profiles.contains_key(name) {
656            return Err(MobStoreError::CasConflict(format!(
657                "realm profile already exists: {name}"
658            )));
659        }
660        let now = Utc::now();
661        let stored = StoredRealmProfile {
662            name: name.to_string(),
663            profile: profile.clone(),
664            revision: 1,
665            created_at: now,
666            updated_at: now,
667        };
668        profiles.insert(name.to_string(), stored.clone());
669        Ok(stored)
670    }
671
672    async fn get(&self, name: &str) -> Result<Option<StoredRealmProfile>, MobStoreError> {
673        Ok(self.profiles.read().await.get(name).cloned())
674    }
675
676    async fn list(&self) -> Result<Vec<StoredRealmProfile>, MobStoreError> {
677        Ok(self.profiles.read().await.values().cloned().collect())
678    }
679
680    async fn update(
681        &self,
682        name: &str,
683        profile: &Profile,
684        expected_revision: u64,
685    ) -> Result<StoredRealmProfile, MobStoreError> {
686        let mut profiles = self.profiles.write().await;
687        let existing = profiles
688            .get(name)
689            .ok_or_else(|| MobStoreError::NotFound(format!("realm profile not found: {name}")))?;
690        if existing.revision != expected_revision {
691            return Err(MobStoreError::CasConflict(format!(
692                "realm profile '{name}' revision conflict: expected {expected_revision}, actual {}",
693                existing.revision
694            )));
695        }
696        let updated = StoredRealmProfile {
697            name: name.to_string(),
698            profile: profile.clone(),
699            revision: expected_revision + 1,
700            created_at: existing.created_at,
701            updated_at: Utc::now(),
702        };
703        profiles.insert(name.to_string(), updated.clone());
704        Ok(updated)
705    }
706
707    async fn delete(
708        &self,
709        name: &str,
710        expected_revision: u64,
711    ) -> Result<StoredRealmProfile, MobStoreError> {
712        let mut profiles = self.profiles.write().await;
713        let existing = profiles
714            .get(name)
715            .ok_or_else(|| MobStoreError::NotFound(format!("realm profile not found: {name}")))?;
716        if existing.revision != expected_revision {
717            return Err(MobStoreError::CasConflict(format!(
718                "realm profile '{name}' revision conflict: expected {expected_revision}, actual {}",
719                existing.revision
720            )));
721        }
722        let removed = existing.clone();
723        profiles.remove(name);
724        Ok(removed)
725    }
726}
727
728#[cfg(test)]
729mod tests {
730    use super::*;
731    use crate::definition::{BackendConfig, MobDefinition, WiringRules};
732    use crate::event::MobEventKind;
733    use crate::ids::{MeerkatId, ProfileName};
734    use crate::profile::{Profile, ProfileBinding, ToolConfig};
735    use crate::run::StepRunStatus;
736    use futures::future::join_all;
737    use std::collections::BTreeMap;
738
739    fn sample_definition() -> MobDefinition {
740        let mut profiles = BTreeMap::new();
741        profiles.insert(
742            ProfileName::from("worker"),
743            ProfileBinding::Inline(Profile {
744                model: "model".to_string(),
745                skills: Vec::new(),
746                tools: ToolConfig::default(),
747                peer_description: "worker".to_string(),
748                external_addressable: false,
749                backend: None,
750                runtime_mode: crate::MobRuntimeMode::AutonomousHost,
751                max_inline_peer_notifications: None,
752                output_schema: None,
753                provider_params: None,
754            }),
755        );
756        MobDefinition {
757            id: MobId::from("mob"),
758            orchestrator: None,
759            profiles,
760            mcp_servers: BTreeMap::new(),
761            wiring: WiringRules::default(),
762            skills: BTreeMap::new(),
763            backend: BackendConfig::default(),
764            flows: BTreeMap::new(),
765            topology: None,
766            supervisor: None,
767            limits: None,
768            spawn_policy: None,
769            event_router: None,
770            owner_session_id: None,
771            session_cleanup_policy: crate::definition::SessionCleanupPolicy::Manual,
772            is_implicit: false,
773        }
774    }
775
776    fn sample_run(status: MobRunStatus) -> MobRun {
777        MobRun {
778            run_id: RunId::new(),
779            mob_id: MobId::from("mob"),
780            flow_id: FlowId::from("flow-a"),
781            status,
782            flow_state: MobRun::flow_state_for_steps([StepId::from("step-1")]).unwrap(),
783            activation_params: serde_json::json!({"a":1}),
784            created_at: Utc::now(),
785            completed_at: None,
786            step_ledger: Vec::new(),
787            failure_ledger: Vec::new(),
788            frames: std::collections::BTreeMap::new(),
789            loops: std::collections::BTreeMap::new(),
790            loop_iteration_ledger: Vec::new(),
791            schema_version: 4,
792            root_step_outputs: IndexMap::new(),
793            loop_iteration_outputs: BTreeMap::new(),
794        }
795    }
796
797    #[tokio::test]
798    async fn test_event_store_prune() {
799        let store = InMemoryMobEventStore::new();
800        let now = Utc::now();
801
802        store
803            .append(NewMobEvent {
804                mob_id: MobId::from("mob"),
805                timestamp: Some(now - chrono::Duration::minutes(10)),
806                kind: MobEventKind::MobCompleted,
807            })
808            .await
809            .unwrap();
810        store
811            .append(NewMobEvent {
812                mob_id: MobId::from("mob"),
813                timestamp: Some(now),
814                kind: MobEventKind::MobCompleted,
815            })
816            .await
817            .unwrap();
818
819        let removed = store
820            .prune(now - chrono::Duration::minutes(1))
821            .await
822            .unwrap();
823        assert_eq!(removed, 1);
824        assert_eq!(store.replay_all().await.unwrap().len(), 1);
825    }
826
827    #[tokio::test]
828    async fn test_run_store_status_cas_single_winner() {
829        let store = Arc::new(InMemoryMobRunStore::new());
830        let run = sample_run(MobRunStatus::Running);
831        let run_id = run.run_id.clone();
832        store.create_run(run).await.unwrap();
833
834        let tasks = (0..10).map(|_| {
835            let store = Arc::clone(&store);
836            let run_id = run_id.clone();
837            tokio::spawn(async move {
838                store
839                    .cas_run_status(&run_id, MobRunStatus::Running, MobRunStatus::Completed)
840                    .await
841                    .unwrap()
842            })
843        });
844        let outcomes = join_all(tasks).await;
845        let wins = outcomes
846            .into_iter()
847            .filter_map(std::result::Result::ok)
848            .filter(|cas_result| *cas_result)
849            .count();
850        assert_eq!(wins, 1);
851
852        let stored = store.get_run(&run_id).await.unwrap().unwrap();
853        assert_eq!(stored.status, MobRunStatus::Completed);
854        assert!(stored.completed_at.is_some());
855    }
856
857    #[tokio::test]
858    async fn test_run_store_step_dedup_and_ledgers() {
859        let store = InMemoryMobRunStore::new();
860        let run = sample_run(MobRunStatus::Running);
861        let run_id = run.run_id.clone();
862        store.create_run(run).await.unwrap();
863
864        let step_entry = StepLedgerEntry {
865            step_id: StepId::from("s1"),
866            meerkat_id: MeerkatId::from("worker-1"),
867            status: StepRunStatus::Dispatched,
868            output: None,
869            timestamp: Utc::now(),
870        };
871
872        assert!(
873            store
874                .append_step_entry_if_absent(&run_id, step_entry.clone())
875                .await
876                .unwrap()
877        );
878        assert!(
879            !store
880                .append_step_entry_if_absent(&run_id, step_entry)
881                .await
882                .unwrap()
883        );
884
885        store
886            .put_step_output(&run_id, &StepId::from("s1"), serde_json::json!({"ok":true}))
887            .await
888            .unwrap();
889        store
890            .append_failure_entry(
891                &run_id,
892                FailureLedgerEntry {
893                    step_id: StepId::from("s1"),
894                    reason: "failed".to_string(),
895                    timestamp: Utc::now(),
896                },
897            )
898            .await
899            .unwrap();
900
901        let stored = store.get_run(&run_id).await.unwrap().unwrap();
902        assert_eq!(stored.step_ledger.len(), 1);
903        assert_eq!(stored.failure_ledger.len(), 1);
904        assert_eq!(
905            stored.step_ledger[0].output,
906            Some(serde_json::json!({"ok":true}))
907        );
908    }
909
910    #[tokio::test]
911    async fn test_spec_store_revision_conflict_behavior() {
912        let store = InMemoryMobSpecStore::new();
913        let definition = sample_definition();
914        let mob_id = MobId::from("mob");
915
916        let rev1 = store.put_spec(&mob_id, &definition, None).await.unwrap();
917        assert_eq!(rev1, 1);
918
919        let rev2 = store
920            .put_spec(&mob_id, &definition, Some(rev1))
921            .await
922            .unwrap();
923        assert_eq!(rev2, 2);
924
925        let conflict = store
926            .put_spec(&mob_id, &definition, Some(1))
927            .await
928            .unwrap_err();
929        assert!(matches!(
930            conflict,
931            MobStoreError::SpecRevisionConflict {
932                expected: Some(1),
933                actual: 2,
934                ..
935            }
936        ));
937
938        assert!(!store.delete_spec(&mob_id, Some(1)).await.unwrap());
939        assert!(store.delete_spec(&mob_id, Some(2)).await.unwrap());
940    }
941
942    // -----------------------------------------------------------------------
943    // RealmProfileStore contract tests
944    // -----------------------------------------------------------------------
945
946    use crate::store::realm_profile::contract_tests;
947
948    #[tokio::test]
949    async fn realm_profile_create_and_get() {
950        let store = InMemoryRealmProfileStore::new();
951        contract_tests::test_create_and_get(&store).await;
952    }
953
954    #[tokio::test]
955    async fn realm_profile_get_nonexistent() {
956        let store = InMemoryRealmProfileStore::new();
957        contract_tests::test_get_nonexistent(&store).await;
958    }
959
960    #[tokio::test]
961    async fn realm_profile_create_duplicate_fails() {
962        let store = InMemoryRealmProfileStore::new();
963        contract_tests::test_create_duplicate_fails(&store).await;
964    }
965
966    #[tokio::test]
967    async fn realm_profile_update_correct_revision() {
968        let store = InMemoryRealmProfileStore::new();
969        contract_tests::test_update_with_correct_revision(&store).await;
970    }
971
972    #[tokio::test]
973    async fn realm_profile_update_wrong_revision() {
974        let store = InMemoryRealmProfileStore::new();
975        contract_tests::test_update_with_wrong_revision(&store).await;
976    }
977
978    #[tokio::test]
979    async fn realm_profile_update_nonexistent() {
980        let store = InMemoryRealmProfileStore::new();
981        contract_tests::test_update_nonexistent(&store).await;
982    }
983
984    #[tokio::test]
985    async fn realm_profile_delete_correct_revision() {
986        let store = InMemoryRealmProfileStore::new();
987        contract_tests::test_delete_with_correct_revision(&store).await;
988    }
989
990    #[tokio::test]
991    async fn realm_profile_delete_wrong_revision() {
992        let store = InMemoryRealmProfileStore::new();
993        contract_tests::test_delete_with_wrong_revision(&store).await;
994    }
995
996    #[tokio::test]
997    async fn realm_profile_delete_nonexistent() {
998        let store = InMemoryRealmProfileStore::new();
999        contract_tests::test_delete_nonexistent(&store).await;
1000    }
1001
1002    #[tokio::test]
1003    async fn realm_profile_list() {
1004        let store = InMemoryRealmProfileStore::new();
1005        contract_tests::test_list(&store).await;
1006    }
1007
1008    #[tokio::test]
1009    async fn realm_profile_list_empty() {
1010        let store = InMemoryRealmProfileStore::new();
1011        contract_tests::test_list_empty(&store).await;
1012    }
1013}