1use super::realm_profile::{RealmProfileStore, StoredRealmProfile};
4use super::{MobEventStore, MobRunStore, MobSpecStore, MobStoreError};
5use crate::definition::MobDefinition;
6use crate::event::{MobEvent, NewMobEvent};
7use crate::ids::{FlowId, FrameId, LoopId, LoopInstanceId, MobId, RunId, StepId};
8use crate::profile::Profile;
9use crate::run::{
10 FailureLedgerEntry, FrameSnapshot, LoopIterationLedgerEntry, LoopSnapshot, MobRun,
11 MobRunStatus, StepLedgerEntry,
12};
13#[cfg(target_arch = "wasm32")]
14use crate::tokio;
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use indexmap::IndexMap;
18use meerkat_machine_kernels::KernelState;
19use std::collections::BTreeMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22
23#[derive(Debug, Default)]
25pub struct InMemoryMobEventStore {
26 events: Arc<RwLock<Vec<MobEvent>>>,
27}
28
29impl InMemoryMobEventStore {
30 pub fn new() -> Self {
31 Self::default()
32 }
33}
34
35#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
36#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
37impl MobEventStore for InMemoryMobEventStore {
38 async fn append(&self, event: NewMobEvent) -> Result<MobEvent, MobStoreError> {
39 let mut events = self.events.write().await;
40 let cursor = events.last().map_or(1, |existing| existing.cursor + 1);
41 let stored = MobEvent {
42 cursor,
43 timestamp: event.timestamp.unwrap_or_else(Utc::now),
44 mob_id: event.mob_id,
45 kind: event.kind,
46 };
47 events.push(stored.clone());
48 Ok(stored)
49 }
50
51 async fn append_batch(&self, batch: Vec<NewMobEvent>) -> Result<Vec<MobEvent>, MobStoreError> {
52 let mut events = self.events.write().await;
53 let mut results = Vec::with_capacity(batch.len());
54 for event in batch {
55 let cursor = events.last().map_or(1, |existing| existing.cursor + 1);
56 let stored = MobEvent {
57 cursor,
58 timestamp: event.timestamp.unwrap_or_else(Utc::now),
59 mob_id: event.mob_id,
60 kind: event.kind,
61 };
62 events.push(stored.clone());
63 results.push(stored);
64 }
65 Ok(results)
66 }
67
68 async fn poll(&self, after_cursor: u64, limit: usize) -> Result<Vec<MobEvent>, MobStoreError> {
69 let events = self.events.read().await;
70 Ok(events
71 .iter()
72 .filter(|event| event.cursor > after_cursor)
73 .take(limit)
74 .cloned()
75 .collect())
76 }
77
78 async fn replay_all(&self) -> Result<Vec<MobEvent>, MobStoreError> {
79 Ok(self.events.read().await.clone())
80 }
81
82 async fn clear(&self) -> Result<(), MobStoreError> {
83 self.events.write().await.clear();
84 Ok(())
85 }
86
87 async fn prune(&self, older_than: DateTime<Utc>) -> Result<u64, MobStoreError> {
88 let mut events = self.events.write().await;
89 let before = events.len();
90 events.retain(|event| event.timestamp >= older_than);
91 Ok((before - events.len()) as u64)
92 }
93}
94
95#[derive(Debug, Default)]
97pub struct InMemoryMobRunStore {
98 runs: Arc<RwLock<IndexMap<RunId, MobRun>>>,
99}
100
101impl InMemoryMobRunStore {
102 pub fn new() -> Self {
103 Self::default()
104 }
105}
106
107#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
108#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
109impl MobRunStore for InMemoryMobRunStore {
110 async fn create_run(&self, run: MobRun) -> Result<(), MobStoreError> {
111 let mut runs = self.runs.write().await;
112 if runs.contains_key(&run.run_id) {
113 return Err(MobStoreError::Internal(format!(
114 "run already exists: {}",
115 run.run_id
116 )));
117 }
118 runs.insert(run.run_id.clone(), run);
119 Ok(())
120 }
121
122 async fn get_run(&self, run_id: &RunId) -> Result<Option<MobRun>, MobStoreError> {
123 Ok(self.runs.read().await.get(run_id).cloned())
124 }
125
126 async fn list_runs(
127 &self,
128 mob_id: &MobId,
129 flow_id: Option<&FlowId>,
130 ) -> Result<Vec<MobRun>, MobStoreError> {
131 Ok(self
132 .runs
133 .read()
134 .await
135 .values()
136 .filter(|run| {
137 run.mob_id == *mob_id
138 && flow_id.is_none_or(|expected_flow_id| run.flow_id == *expected_flow_id)
139 })
140 .cloned()
141 .collect())
142 }
143
144 async fn cas_run_status(
145 &self,
146 run_id: &RunId,
147 expected: MobRunStatus,
148 next: MobRunStatus,
149 ) -> Result<bool, MobStoreError> {
150 let mut runs = self.runs.write().await;
151 let Some(run) = runs.get_mut(run_id) else {
152 return Ok(false);
153 };
154 if run.status != expected || run.status.is_terminal() {
155 return Ok(false);
156 }
157 let terminal = next.is_terminal();
158 run.status = next;
159 if terminal && run.completed_at.is_none() {
160 run.completed_at = Some(Utc::now());
161 }
162 Ok(true)
163 }
164
165 async fn cas_flow_state(
166 &self,
167 run_id: &RunId,
168 expected: &KernelState,
169 next: &KernelState,
170 ) -> Result<bool, MobStoreError> {
171 let mut runs = self.runs.write().await;
172 let Some(run) = runs.get_mut(run_id) else {
173 return Ok(false);
174 };
175 if &run.flow_state != expected {
176 return Ok(false);
177 }
178 run.flow_state = next.clone();
179 Ok(true)
180 }
181
182 async fn cas_run_snapshot(
183 &self,
184 run_id: &RunId,
185 expected_status: MobRunStatus,
186 expected_flow_state: &KernelState,
187 next_status: MobRunStatus,
188 next_flow_state: &KernelState,
189 ) -> Result<bool, MobStoreError> {
190 let mut runs = self.runs.write().await;
191 let Some(run) = runs.get_mut(run_id) else {
192 return Ok(false);
193 };
194 if run.status != expected_status
195 || run.status.is_terminal()
196 || &run.flow_state != expected_flow_state
197 {
198 return Ok(false);
199 }
200 let terminal = next_status.is_terminal();
201 run.status = next_status;
202 run.flow_state = next_flow_state.clone();
203 if terminal && run.completed_at.is_none() {
204 run.completed_at = Some(Utc::now());
205 }
206 Ok(true)
207 }
208
209 async fn append_step_entry(
210 &self,
211 run_id: &RunId,
212 entry: StepLedgerEntry,
213 ) -> Result<(), MobStoreError> {
214 let mut runs = self.runs.write().await;
215 let run = runs
216 .get_mut(run_id)
217 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
218 run.step_ledger.push(entry);
219 Ok(())
220 }
221
222 async fn append_step_entry_if_absent(
223 &self,
224 run_id: &RunId,
225 entry: StepLedgerEntry,
226 ) -> Result<bool, MobStoreError> {
227 let mut runs = self.runs.write().await;
228 let run = runs
229 .get_mut(run_id)
230 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
231 let is_duplicate = run.step_ledger.iter().any(|existing| {
232 existing.step_id == entry.step_id
233 && existing.meerkat_id == entry.meerkat_id
234 && existing.status == entry.status
235 });
236 if is_duplicate {
237 return Ok(false);
238 }
239 run.step_ledger.push(entry);
240 Ok(true)
241 }
242
243 async fn put_step_output(
244 &self,
245 run_id: &RunId,
246 step_id: &StepId,
247 output: serde_json::Value,
248 ) -> Result<(), MobStoreError> {
249 let mut runs = self.runs.write().await;
250 let run = runs
251 .get_mut(run_id)
252 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
253 if let Some(entry) = run
254 .step_ledger
255 .iter_mut()
256 .rev()
257 .find(|entry| &entry.step_id == step_id)
258 {
259 entry.output = Some(output);
260 return Ok(());
261 }
262 Err(MobStoreError::Internal(format!(
263 "cannot set output for unknown step '{step_id}' in run '{run_id}'"
264 )))
265 }
266
267 async fn append_failure_entry(
268 &self,
269 run_id: &RunId,
270 entry: FailureLedgerEntry,
271 ) -> Result<(), MobStoreError> {
272 let mut runs = self.runs.write().await;
273 let run = runs
274 .get_mut(run_id)
275 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
276 run.failure_ledger.push(entry);
277 Ok(())
278 }
279
280 async fn upsert_loop_snapshot(
281 &self,
282 run_id: &RunId,
283 loop_instance_id: &LoopInstanceId,
284 snapshot: LoopSnapshot,
285 ledger_entry: Option<LoopIterationLedgerEntry>,
286 ) -> Result<(), MobStoreError> {
287 let mut runs = self.runs.write().await;
288 let run = runs
289 .get_mut(run_id)
290 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
291 run.loops.insert(loop_instance_id.clone(), snapshot);
292 if let Some(entry) = ledger_entry
293 && !run.loop_iteration_ledger.iter().any(|existing| {
294 existing.loop_instance_id == entry.loop_instance_id
295 && existing.iteration == entry.iteration
296 && existing.frame_id == entry.frame_id
297 })
298 {
299 run.loop_iteration_ledger.push(entry);
300 }
301 Ok(())
302 }
303
304 async fn cas_frame_state(
305 &self,
306 run_id: &RunId,
307 frame_id: &FrameId,
308 expected: Option<&FrameSnapshot>,
309 next: FrameSnapshot,
310 ) -> Result<bool, MobStoreError> {
311 let mut runs = self.runs.write().await;
312 let run = runs
313 .get_mut(run_id)
314 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
315 let current = run.frames.get(frame_id);
316 match (expected, current) {
317 (None, None) => {
318 run.frames.insert(frame_id.clone(), next);
320 Ok(true)
321 }
322 (Some(exp), Some(cur)) if exp == cur => {
323 run.frames.insert(frame_id.clone(), next);
324 Ok(true)
325 }
326 _ => Ok(false),
327 }
328 }
329
330 async fn cas_grant_node_slot(
331 &self,
332 run_id: &RunId,
333 expected_run_state: &KernelState,
334 next_run_state: KernelState,
335 frame_id: &FrameId,
336 expected_frame: &FrameSnapshot,
337 next_frame: FrameSnapshot,
338 ) -> Result<bool, MobStoreError> {
339 let mut runs = self.runs.write().await;
340 let run = runs
341 .get_mut(run_id)
342 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
343 if &run.flow_state != expected_run_state {
345 return Ok(false);
346 }
347 if run.frames.get(frame_id) != Some(expected_frame) {
348 return Ok(false);
349 }
350 run.flow_state = next_run_state;
352 run.frames.insert(frame_id.clone(), next_frame);
353 Ok(true)
354 }
355
356 async fn cas_complete_step_and_record_output(
357 &self,
358 run_id: &RunId,
359 frame_id: &FrameId,
360 expected_frame: &FrameSnapshot,
361 next_frame: FrameSnapshot,
362 step_output_key: String,
363 step_output: serde_json::Value,
364 loop_context: Option<(&LoopId, u64)>,
365 ) -> Result<bool, MobStoreError> {
366 let mut runs = self.runs.write().await;
367 let run = runs
368 .get_mut(run_id)
369 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
370 if run.frames.get(frame_id) != Some(expected_frame) {
371 return Ok(false);
372 }
373 run.frames.insert(frame_id.clone(), next_frame);
374 match loop_context {
375 None => {
376 run.root_step_outputs.insert(
377 crate::ids::StepId::from(step_output_key.as_str()),
378 step_output,
379 );
380 }
381 Some((loop_id, iteration)) => {
382 let iteration_index = usize::try_from(iteration).map_err(|_| {
383 MobStoreError::Internal(format!(
384 "loop iteration index {iteration} exceeds usize::MAX on this target"
385 ))
386 })?;
387 let vec = run
388 .loop_iteration_outputs
389 .entry(loop_id.clone())
390 .or_default();
391 while vec.len() <= iteration_index {
392 vec.push(IndexMap::new());
393 }
394 vec[iteration_index].insert(
395 crate::ids::StepId::from(step_output_key.as_str()),
396 step_output,
397 );
398 }
399 }
400 Ok(true)
401 }
402
403 #[allow(clippy::too_many_arguments)]
404 async fn cas_start_loop(
405 &self,
406 run_id: &RunId,
407 loop_instance_id: &LoopInstanceId,
408 expected_run_state: &KernelState,
409 next_run_state: KernelState,
410 frame_id: &FrameId,
411 expected_frame: &FrameSnapshot,
412 next_frame: FrameSnapshot,
413 initial_loop: LoopSnapshot,
414 ) -> Result<bool, MobStoreError> {
415 let mut runs = self.runs.write().await;
416 let run = runs
417 .get_mut(run_id)
418 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
419 if &run.flow_state != expected_run_state {
420 return Ok(false);
421 }
422 if run.frames.get(frame_id) != Some(expected_frame) {
423 return Ok(false);
424 }
425 if run.loops.contains_key(loop_instance_id) {
427 return Ok(false);
428 }
429 run.flow_state = next_run_state;
430 run.frames.insert(frame_id.clone(), next_frame);
431 run.loops.insert(loop_instance_id.clone(), initial_loop);
432 Ok(true)
433 }
434
435 async fn cas_loop_request_body_frame(
436 &self,
437 run_id: &RunId,
438 loop_instance_id: &LoopInstanceId,
439 expected_loop: &LoopSnapshot,
440 next_loop: LoopSnapshot,
441 expected_run_state: &KernelState,
442 next_run_state: KernelState,
443 ) -> Result<bool, MobStoreError> {
444 let mut runs = self.runs.write().await;
445 let run = runs
446 .get_mut(run_id)
447 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
448 if &run.flow_state != expected_run_state {
449 return Ok(false);
450 }
451 if run.loops.get(loop_instance_id) != Some(expected_loop) {
452 return Ok(false);
453 }
454 run.flow_state = next_run_state;
455 run.loops.insert(loop_instance_id.clone(), next_loop);
456 Ok(true)
457 }
458
459 #[allow(clippy::too_many_arguments)]
460 async fn cas_grant_body_frame_start(
461 &self,
462 run_id: &RunId,
463 loop_instance_id: &LoopInstanceId,
464 expected_loop: &LoopSnapshot,
465 next_loop: LoopSnapshot,
466 frame_id: &FrameId,
467 initial_frame: FrameSnapshot,
468 ledger_entry: LoopIterationLedgerEntry,
469 expected_run_state: &KernelState,
470 next_run_state: KernelState,
471 ) -> Result<bool, MobStoreError> {
472 let mut runs = self.runs.write().await;
473 let run = runs
474 .get_mut(run_id)
475 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
476 if &run.flow_state != expected_run_state {
477 return Ok(false);
478 }
479 if run.loops.get(loop_instance_id) != Some(expected_loop) {
480 return Ok(false);
481 }
482 if run.frames.contains_key(frame_id) {
484 return Ok(false);
485 }
486 run.flow_state = next_run_state;
487 run.loops.insert(loop_instance_id.clone(), next_loop);
488 run.frames.insert(frame_id.clone(), initial_frame);
489 if !run.loop_iteration_ledger.iter().any(|existing| {
490 existing.loop_instance_id == ledger_entry.loop_instance_id
491 && existing.iteration == ledger_entry.iteration
492 && existing.frame_id == ledger_entry.frame_id
493 }) {
494 run.loop_iteration_ledger.push(ledger_entry);
495 }
496 Ok(true)
497 }
498
499 #[allow(clippy::too_many_arguments)]
500 async fn cas_complete_body_frame(
501 &self,
502 run_id: &RunId,
503 loop_instance_id: &LoopInstanceId,
504 expected_loop: &LoopSnapshot,
505 next_loop: LoopSnapshot,
506 frame_id: &FrameId,
507 expected_frame: &FrameSnapshot,
508 next_frame: FrameSnapshot,
509 expected_run_state: &KernelState,
510 next_run_state: KernelState,
511 ) -> Result<bool, MobStoreError> {
512 let mut runs = self.runs.write().await;
513 let run = runs
514 .get_mut(run_id)
515 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
516 if &run.flow_state != expected_run_state {
517 return Ok(false);
518 }
519 if run.loops.get(loop_instance_id) != Some(expected_loop) {
520 return Ok(false);
521 }
522 if run.frames.get(frame_id) != Some(expected_frame) {
523 return Ok(false);
524 }
525 run.flow_state = next_run_state;
526 run.loops.insert(loop_instance_id.clone(), next_loop);
527 run.frames.insert(frame_id.clone(), next_frame);
528 Ok(true)
529 }
530
531 #[allow(clippy::too_many_arguments)]
532 async fn cas_complete_loop(
533 &self,
534 run_id: &RunId,
535 loop_instance_id: &LoopInstanceId,
536 expected_loop: &LoopSnapshot,
537 next_loop: LoopSnapshot,
538 frame_id: &FrameId,
539 expected_frame: &FrameSnapshot,
540 next_frame: FrameSnapshot,
541 expected_run_state: &KernelState,
542 next_run_state: KernelState,
543 ) -> Result<bool, MobStoreError> {
544 let mut runs = self.runs.write().await;
545 let run = runs
546 .get_mut(run_id)
547 .ok_or_else(|| MobStoreError::NotFound(format!("run not found: {run_id}")))?;
548 if &run.flow_state != expected_run_state {
549 return Ok(false);
550 }
551 if run.loops.get(loop_instance_id) != Some(expected_loop) {
552 return Ok(false);
553 }
554 if run.frames.get(frame_id) != Some(expected_frame) {
555 return Ok(false);
556 }
557 run.flow_state = next_run_state;
558 run.loops.insert(loop_instance_id.clone(), next_loop);
559 run.frames.insert(frame_id.clone(), next_frame);
560 Ok(true)
561 }
562}
563
564#[derive(Debug, Default)]
566pub struct InMemoryMobSpecStore {
567 specs: Arc<RwLock<BTreeMap<MobId, (MobDefinition, u64)>>>,
568}
569
570impl InMemoryMobSpecStore {
571 pub fn new() -> Self {
572 Self::default()
573 }
574}
575
576#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
577#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
578impl MobSpecStore for InMemoryMobSpecStore {
579 async fn put_spec(
580 &self,
581 mob_id: &MobId,
582 definition: &MobDefinition,
583 revision: Option<u64>,
584 ) -> Result<u64, MobStoreError> {
585 let mut specs = self.specs.write().await;
586 let current_revision = specs.get(mob_id).map_or(0, |(_, rev)| *rev);
587 if let Some(expected) = revision
588 && expected != current_revision
589 {
590 return Err(MobStoreError::SpecRevisionConflict {
591 mob_id: mob_id.clone(),
592 expected: revision,
593 actual: current_revision,
594 });
595 }
596
597 let next_revision = current_revision + 1;
598 specs.insert(mob_id.clone(), (definition.clone(), next_revision));
599 Ok(next_revision)
600 }
601
602 async fn get_spec(
603 &self,
604 mob_id: &MobId,
605 ) -> Result<Option<(MobDefinition, u64)>, MobStoreError> {
606 Ok(self.specs.read().await.get(mob_id).cloned())
607 }
608
609 async fn list_specs(&self) -> Result<Vec<MobId>, MobStoreError> {
610 Ok(self.specs.read().await.keys().cloned().collect())
611 }
612
613 async fn delete_spec(
614 &self,
615 mob_id: &MobId,
616 revision: Option<u64>,
617 ) -> Result<bool, MobStoreError> {
618 let mut specs = self.specs.write().await;
619 let Some((_, current_revision)) = specs.get(mob_id) else {
620 return Ok(false);
621 };
622
623 if let Some(expected) = revision
624 && expected != *current_revision
625 {
626 return Ok(false);
627 }
628
629 specs.remove(mob_id);
630 Ok(true)
631 }
632}
633
634#[derive(Debug, Default)]
636pub struct InMemoryRealmProfileStore {
637 profiles: Arc<RwLock<BTreeMap<String, StoredRealmProfile>>>,
638}
639
640impl InMemoryRealmProfileStore {
641 pub fn new() -> Self {
642 Self::default()
643 }
644}
645
646#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
647#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
648impl RealmProfileStore for InMemoryRealmProfileStore {
649 async fn create(
650 &self,
651 name: &str,
652 profile: &Profile,
653 ) -> Result<StoredRealmProfile, MobStoreError> {
654 let mut profiles = self.profiles.write().await;
655 if profiles.contains_key(name) {
656 return Err(MobStoreError::CasConflict(format!(
657 "realm profile already exists: {name}"
658 )));
659 }
660 let now = Utc::now();
661 let stored = StoredRealmProfile {
662 name: name.to_string(),
663 profile: profile.clone(),
664 revision: 1,
665 created_at: now,
666 updated_at: now,
667 };
668 profiles.insert(name.to_string(), stored.clone());
669 Ok(stored)
670 }
671
672 async fn get(&self, name: &str) -> Result<Option<StoredRealmProfile>, MobStoreError> {
673 Ok(self.profiles.read().await.get(name).cloned())
674 }
675
676 async fn list(&self) -> Result<Vec<StoredRealmProfile>, MobStoreError> {
677 Ok(self.profiles.read().await.values().cloned().collect())
678 }
679
680 async fn update(
681 &self,
682 name: &str,
683 profile: &Profile,
684 expected_revision: u64,
685 ) -> Result<StoredRealmProfile, MobStoreError> {
686 let mut profiles = self.profiles.write().await;
687 let existing = profiles
688 .get(name)
689 .ok_or_else(|| MobStoreError::NotFound(format!("realm profile not found: {name}")))?;
690 if existing.revision != expected_revision {
691 return Err(MobStoreError::CasConflict(format!(
692 "realm profile '{name}' revision conflict: expected {expected_revision}, actual {}",
693 existing.revision
694 )));
695 }
696 let updated = StoredRealmProfile {
697 name: name.to_string(),
698 profile: profile.clone(),
699 revision: expected_revision + 1,
700 created_at: existing.created_at,
701 updated_at: Utc::now(),
702 };
703 profiles.insert(name.to_string(), updated.clone());
704 Ok(updated)
705 }
706
707 async fn delete(
708 &self,
709 name: &str,
710 expected_revision: u64,
711 ) -> Result<StoredRealmProfile, MobStoreError> {
712 let mut profiles = self.profiles.write().await;
713 let existing = profiles
714 .get(name)
715 .ok_or_else(|| MobStoreError::NotFound(format!("realm profile not found: {name}")))?;
716 if existing.revision != expected_revision {
717 return Err(MobStoreError::CasConflict(format!(
718 "realm profile '{name}' revision conflict: expected {expected_revision}, actual {}",
719 existing.revision
720 )));
721 }
722 let removed = existing.clone();
723 profiles.remove(name);
724 Ok(removed)
725 }
726}
727
728#[cfg(test)]
729mod tests {
730 use super::*;
731 use crate::definition::{BackendConfig, MobDefinition, WiringRules};
732 use crate::event::MobEventKind;
733 use crate::ids::{MeerkatId, ProfileName};
734 use crate::profile::{Profile, ProfileBinding, ToolConfig};
735 use crate::run::StepRunStatus;
736 use futures::future::join_all;
737 use std::collections::BTreeMap;
738
739 fn sample_definition() -> MobDefinition {
740 let mut profiles = BTreeMap::new();
741 profiles.insert(
742 ProfileName::from("worker"),
743 ProfileBinding::Inline(Profile {
744 model: "model".to_string(),
745 skills: Vec::new(),
746 tools: ToolConfig::default(),
747 peer_description: "worker".to_string(),
748 external_addressable: false,
749 backend: None,
750 runtime_mode: crate::MobRuntimeMode::AutonomousHost,
751 max_inline_peer_notifications: None,
752 output_schema: None,
753 provider_params: None,
754 }),
755 );
756 MobDefinition {
757 id: MobId::from("mob"),
758 orchestrator: None,
759 profiles,
760 mcp_servers: BTreeMap::new(),
761 wiring: WiringRules::default(),
762 skills: BTreeMap::new(),
763 backend: BackendConfig::default(),
764 flows: BTreeMap::new(),
765 topology: None,
766 supervisor: None,
767 limits: None,
768 spawn_policy: None,
769 event_router: None,
770 owner_session_id: None,
771 session_cleanup_policy: crate::definition::SessionCleanupPolicy::Manual,
772 is_implicit: false,
773 }
774 }
775
776 fn sample_run(status: MobRunStatus) -> MobRun {
777 MobRun {
778 run_id: RunId::new(),
779 mob_id: MobId::from("mob"),
780 flow_id: FlowId::from("flow-a"),
781 status,
782 flow_state: MobRun::flow_state_for_steps([StepId::from("step-1")]).unwrap(),
783 activation_params: serde_json::json!({"a":1}),
784 created_at: Utc::now(),
785 completed_at: None,
786 step_ledger: Vec::new(),
787 failure_ledger: Vec::new(),
788 frames: std::collections::BTreeMap::new(),
789 loops: std::collections::BTreeMap::new(),
790 loop_iteration_ledger: Vec::new(),
791 schema_version: 4,
792 root_step_outputs: IndexMap::new(),
793 loop_iteration_outputs: BTreeMap::new(),
794 }
795 }
796
797 #[tokio::test]
798 async fn test_event_store_prune() {
799 let store = InMemoryMobEventStore::new();
800 let now = Utc::now();
801
802 store
803 .append(NewMobEvent {
804 mob_id: MobId::from("mob"),
805 timestamp: Some(now - chrono::Duration::minutes(10)),
806 kind: MobEventKind::MobCompleted,
807 })
808 .await
809 .unwrap();
810 store
811 .append(NewMobEvent {
812 mob_id: MobId::from("mob"),
813 timestamp: Some(now),
814 kind: MobEventKind::MobCompleted,
815 })
816 .await
817 .unwrap();
818
819 let removed = store
820 .prune(now - chrono::Duration::minutes(1))
821 .await
822 .unwrap();
823 assert_eq!(removed, 1);
824 assert_eq!(store.replay_all().await.unwrap().len(), 1);
825 }
826
827 #[tokio::test]
828 async fn test_run_store_status_cas_single_winner() {
829 let store = Arc::new(InMemoryMobRunStore::new());
830 let run = sample_run(MobRunStatus::Running);
831 let run_id = run.run_id.clone();
832 store.create_run(run).await.unwrap();
833
834 let tasks = (0..10).map(|_| {
835 let store = Arc::clone(&store);
836 let run_id = run_id.clone();
837 tokio::spawn(async move {
838 store
839 .cas_run_status(&run_id, MobRunStatus::Running, MobRunStatus::Completed)
840 .await
841 .unwrap()
842 })
843 });
844 let outcomes = join_all(tasks).await;
845 let wins = outcomes
846 .into_iter()
847 .filter_map(std::result::Result::ok)
848 .filter(|cas_result| *cas_result)
849 .count();
850 assert_eq!(wins, 1);
851
852 let stored = store.get_run(&run_id).await.unwrap().unwrap();
853 assert_eq!(stored.status, MobRunStatus::Completed);
854 assert!(stored.completed_at.is_some());
855 }
856
857 #[tokio::test]
858 async fn test_run_store_step_dedup_and_ledgers() {
859 let store = InMemoryMobRunStore::new();
860 let run = sample_run(MobRunStatus::Running);
861 let run_id = run.run_id.clone();
862 store.create_run(run).await.unwrap();
863
864 let step_entry = StepLedgerEntry {
865 step_id: StepId::from("s1"),
866 meerkat_id: MeerkatId::from("worker-1"),
867 status: StepRunStatus::Dispatched,
868 output: None,
869 timestamp: Utc::now(),
870 };
871
872 assert!(
873 store
874 .append_step_entry_if_absent(&run_id, step_entry.clone())
875 .await
876 .unwrap()
877 );
878 assert!(
879 !store
880 .append_step_entry_if_absent(&run_id, step_entry)
881 .await
882 .unwrap()
883 );
884
885 store
886 .put_step_output(&run_id, &StepId::from("s1"), serde_json::json!({"ok":true}))
887 .await
888 .unwrap();
889 store
890 .append_failure_entry(
891 &run_id,
892 FailureLedgerEntry {
893 step_id: StepId::from("s1"),
894 reason: "failed".to_string(),
895 timestamp: Utc::now(),
896 },
897 )
898 .await
899 .unwrap();
900
901 let stored = store.get_run(&run_id).await.unwrap().unwrap();
902 assert_eq!(stored.step_ledger.len(), 1);
903 assert_eq!(stored.failure_ledger.len(), 1);
904 assert_eq!(
905 stored.step_ledger[0].output,
906 Some(serde_json::json!({"ok":true}))
907 );
908 }
909
910 #[tokio::test]
911 async fn test_spec_store_revision_conflict_behavior() {
912 let store = InMemoryMobSpecStore::new();
913 let definition = sample_definition();
914 let mob_id = MobId::from("mob");
915
916 let rev1 = store.put_spec(&mob_id, &definition, None).await.unwrap();
917 assert_eq!(rev1, 1);
918
919 let rev2 = store
920 .put_spec(&mob_id, &definition, Some(rev1))
921 .await
922 .unwrap();
923 assert_eq!(rev2, 2);
924
925 let conflict = store
926 .put_spec(&mob_id, &definition, Some(1))
927 .await
928 .unwrap_err();
929 assert!(matches!(
930 conflict,
931 MobStoreError::SpecRevisionConflict {
932 expected: Some(1),
933 actual: 2,
934 ..
935 }
936 ));
937
938 assert!(!store.delete_spec(&mob_id, Some(1)).await.unwrap());
939 assert!(store.delete_spec(&mob_id, Some(2)).await.unwrap());
940 }
941
942 use crate::store::realm_profile::contract_tests;
947
948 #[tokio::test]
949 async fn realm_profile_create_and_get() {
950 let store = InMemoryRealmProfileStore::new();
951 contract_tests::test_create_and_get(&store).await;
952 }
953
954 #[tokio::test]
955 async fn realm_profile_get_nonexistent() {
956 let store = InMemoryRealmProfileStore::new();
957 contract_tests::test_get_nonexistent(&store).await;
958 }
959
960 #[tokio::test]
961 async fn realm_profile_create_duplicate_fails() {
962 let store = InMemoryRealmProfileStore::new();
963 contract_tests::test_create_duplicate_fails(&store).await;
964 }
965
966 #[tokio::test]
967 async fn realm_profile_update_correct_revision() {
968 let store = InMemoryRealmProfileStore::new();
969 contract_tests::test_update_with_correct_revision(&store).await;
970 }
971
972 #[tokio::test]
973 async fn realm_profile_update_wrong_revision() {
974 let store = InMemoryRealmProfileStore::new();
975 contract_tests::test_update_with_wrong_revision(&store).await;
976 }
977
978 #[tokio::test]
979 async fn realm_profile_update_nonexistent() {
980 let store = InMemoryRealmProfileStore::new();
981 contract_tests::test_update_nonexistent(&store).await;
982 }
983
984 #[tokio::test]
985 async fn realm_profile_delete_correct_revision() {
986 let store = InMemoryRealmProfileStore::new();
987 contract_tests::test_delete_with_correct_revision(&store).await;
988 }
989
990 #[tokio::test]
991 async fn realm_profile_delete_wrong_revision() {
992 let store = InMemoryRealmProfileStore::new();
993 contract_tests::test_delete_with_wrong_revision(&store).await;
994 }
995
996 #[tokio::test]
997 async fn realm_profile_delete_nonexistent() {
998 let store = InMemoryRealmProfileStore::new();
999 contract_tests::test_delete_nonexistent(&store).await;
1000 }
1001
1002 #[tokio::test]
1003 async fn realm_profile_list() {
1004 let store = InMemoryRealmProfileStore::new();
1005 contract_tests::test_list(&store).await;
1006 }
1007
1008 #[tokio::test]
1009 async fn realm_profile_list_empty() {
1010 let store = InMemoryRealmProfileStore::new();
1011 contract_tests::test_list_empty(&store).await;
1012 }
1013}