Skip to main content

camel_core/lifecycle/adapters/
in_memory.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use tokio::sync::{Mutex, RwLock};
6
7use camel_api::CamelError;
8
9use crate::lifecycle::domain::{RouteRuntimeAggregate, RouteRuntimeState, RuntimeEvent};
10use crate::lifecycle::ports::{
11    CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
12    RouteStatusProjection, RuntimeEventJournalPort, RuntimeUnitOfWorkPort,
13};
14
15#[derive(Default, Clone)]
16pub struct InMemoryRouteRepository {
17    routes: Arc<RwLock<HashMap<String, RouteRuntimeAggregate>>>,
18}
19
20#[async_trait]
21impl RouteRepositoryPort for InMemoryRouteRepository {
22    async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError> {
23        let routes = self.routes.read().await;
24        Ok(routes.get(route_id).cloned())
25    }
26
27    async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError> {
28        let mut routes = self.routes.write().await;
29        routes.insert(aggregate.route_id().to_string(), aggregate);
30        Ok(())
31    }
32
33    async fn save_if_version(
34        &self,
35        aggregate: RouteRuntimeAggregate,
36        expected_version: u64,
37    ) -> Result<(), CamelError> {
38        let mut routes = self.routes.write().await;
39        let route_id = aggregate.route_id().to_string();
40        let current = routes.get(&route_id).ok_or_else(|| {
41            CamelError::RouteError(format!(
42                "optimistic lock conflict for route '{route_id}': route not found"
43            ))
44        })?;
45
46        if current.version() != expected_version {
47            return Err(CamelError::RouteError(format!(
48                "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
49                current.version()
50            )));
51        }
52
53        routes.insert(route_id, aggregate);
54        Ok(())
55    }
56
57    async fn delete(&self, route_id: &str) -> Result<(), CamelError> {
58        let mut routes = self.routes.write().await;
59        routes.remove(route_id);
60        Ok(())
61    }
62}
63
64#[derive(Default, Clone)]
65pub struct InMemoryProjectionStore {
66    statuses: Arc<RwLock<HashMap<String, RouteStatusProjection>>>,
67}
68
69#[async_trait]
70impl ProjectionStorePort for InMemoryProjectionStore {
71    async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError> {
72        let mut statuses = self.statuses.write().await;
73        statuses.insert(status.route_id.clone(), status);
74        Ok(())
75    }
76
77    async fn get_status(
78        &self,
79        route_id: &str,
80    ) -> Result<Option<RouteStatusProjection>, CamelError> {
81        let statuses = self.statuses.read().await;
82        Ok(statuses.get(route_id).cloned())
83    }
84
85    async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError> {
86        let statuses = self.statuses.read().await;
87        Ok(statuses.values().cloned().collect())
88    }
89
90    async fn remove_status(&self, route_id: &str) -> Result<(), CamelError> {
91        let mut statuses = self.statuses.write().await;
92        statuses.remove(route_id);
93        Ok(())
94    }
95}
96
97#[derive(Default, Clone)]
98pub struct InMemoryEventPublisher {
99    events: Arc<RwLock<Vec<RuntimeEvent>>>,
100}
101
102impl InMemoryEventPublisher {
103    pub async fn snapshot(&self) -> Vec<RuntimeEvent> {
104        self.events.read().await.clone()
105    }
106}
107
108#[async_trait]
109impl EventPublisherPort for InMemoryEventPublisher {
110    async fn publish(&self, events: &[RuntimeEvent]) -> Result<(), CamelError> {
111        let mut stored = self.events.write().await;
112        stored.extend(events.iter().cloned());
113        Ok(())
114    }
115}
116
117#[derive(Default, Clone)]
118pub struct InMemoryCommandDedup {
119    seen: Arc<RwLock<HashSet<String>>>,
120}
121
122#[async_trait]
123impl CommandDedupPort for InMemoryCommandDedup {
124    async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
125        let mut seen = self.seen.write().await;
126        Ok(seen.insert(command_id.to_string()))
127    }
128
129    async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
130        let mut seen = self.seen.write().await;
131        seen.remove(command_id);
132        Ok(())
133    }
134}
135
136#[derive(Clone)]
137pub struct InMemoryRuntimeStore {
138    inner: Arc<Mutex<RuntimeStoreState>>,
139    journal: Option<Arc<dyn RuntimeEventJournalPort>>,
140}
141
142#[derive(Default)]
143struct RuntimeStoreState {
144    routes: HashMap<String, RouteRuntimeAggregate>,
145    statuses: HashMap<String, RouteStatusProjection>,
146    events: Vec<RuntimeEvent>,
147    seen: HashSet<String>,
148}
149
150impl InMemoryRuntimeStore {
151    pub fn with_journal(mut self, journal: Arc<dyn RuntimeEventJournalPort>) -> Self {
152        self.journal = Some(journal);
153        self
154    }
155
156    pub async fn snapshot_events(&self) -> Vec<RuntimeEvent> {
157        self.inner.lock().await.events.clone()
158    }
159}
160
161fn upsert_replayed_route(
162    state: &mut RuntimeStoreState,
163    route_id: &str,
164    next_state: RouteRuntimeState,
165    status: &str,
166    increment_version: bool,
167) {
168    let current_version = state
169        .routes
170        .get(route_id)
171        .map(|agg| agg.version())
172        .unwrap_or(0);
173    let next_version = if increment_version {
174        current_version.saturating_add(1)
175    } else {
176        current_version
177    };
178    state.routes.insert(
179        route_id.to_string(),
180        RouteRuntimeAggregate::from_snapshot(route_id, next_state, next_version),
181    );
182    state.statuses.insert(
183        route_id.to_string(),
184        RouteStatusProjection {
185            route_id: route_id.to_string(),
186            status: status.to_string(),
187        },
188    );
189}
190
191fn apply_replayed_event(state: &mut RuntimeStoreState, event: &RuntimeEvent) {
192    match event {
193        RuntimeEvent::RouteRegistered { route_id } => {
194            state.routes.insert(
195                route_id.clone(),
196                RouteRuntimeAggregate::new(route_id.clone()),
197            );
198            state.statuses.insert(
199                route_id.clone(),
200                RouteStatusProjection {
201                    route_id: route_id.clone(),
202                    status: "Registered".to_string(),
203                },
204            );
205        }
206        RuntimeEvent::RouteStartRequested { route_id } => {
207            upsert_replayed_route(
208                state,
209                route_id,
210                RouteRuntimeState::Starting,
211                "Starting",
212                true,
213            );
214        }
215        RuntimeEvent::RouteStarted { route_id } => {
216            let increment_version = !matches!(
217                state.routes.get(route_id).map(|agg| agg.state()),
218                Some(RouteRuntimeState::Starting)
219            );
220            upsert_replayed_route(
221                state,
222                route_id,
223                RouteRuntimeState::Started,
224                "Started",
225                increment_version,
226            );
227        }
228        RuntimeEvent::RouteFailed { route_id, error } => {
229            upsert_replayed_route(
230                state,
231                route_id,
232                RouteRuntimeState::Failed(error.clone()),
233                "Failed",
234                true,
235            );
236        }
237        RuntimeEvent::RouteStopped { route_id } => {
238            upsert_replayed_route(state, route_id, RouteRuntimeState::Stopped, "Stopped", true);
239        }
240        RuntimeEvent::RouteSuspended { route_id } => {
241            upsert_replayed_route(
242                state,
243                route_id,
244                RouteRuntimeState::Suspended,
245                "Suspended",
246                true,
247            );
248        }
249        RuntimeEvent::RouteResumed { route_id } => {
250            upsert_replayed_route(state, route_id, RouteRuntimeState::Started, "Started", true);
251        }
252        RuntimeEvent::RouteReloaded { route_id } => {
253            upsert_replayed_route(state, route_id, RouteRuntimeState::Started, "Started", true);
254        }
255        RuntimeEvent::RouteRemoved { route_id } => {
256            state.routes.remove(route_id);
257            state.statuses.remove(route_id);
258        }
259    }
260}
261
262impl Default for InMemoryRuntimeStore {
263    fn default() -> Self {
264        Self {
265            inner: Arc::new(Mutex::new(RuntimeStoreState::default())),
266            journal: None,
267        }
268    }
269}
270
271#[async_trait]
272impl RouteRepositoryPort for InMemoryRuntimeStore {
273    async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError> {
274        let guard = self.inner.lock().await;
275        Ok(guard.routes.get(route_id).cloned())
276    }
277
278    async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError> {
279        let mut guard = self.inner.lock().await;
280        guard
281            .routes
282            .insert(aggregate.route_id().to_string(), aggregate);
283        Ok(())
284    }
285
286    async fn save_if_version(
287        &self,
288        aggregate: RouteRuntimeAggregate,
289        expected_version: u64,
290    ) -> Result<(), CamelError> {
291        let mut guard = self.inner.lock().await;
292        let route_id = aggregate.route_id().to_string();
293        let current = guard.routes.get(&route_id).ok_or_else(|| {
294            CamelError::RouteError(format!(
295                "optimistic lock conflict for route '{route_id}': route not found"
296            ))
297        })?;
298
299        if current.version() != expected_version {
300            return Err(CamelError::RouteError(format!(
301                "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
302                current.version()
303            )));
304        }
305
306        guard.routes.insert(route_id, aggregate);
307        Ok(())
308    }
309
310    async fn delete(&self, route_id: &str) -> Result<(), CamelError> {
311        let mut guard = self.inner.lock().await;
312        guard.routes.remove(route_id);
313        Ok(())
314    }
315}
316
317#[async_trait]
318impl ProjectionStorePort for InMemoryRuntimeStore {
319    async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError> {
320        let mut guard = self.inner.lock().await;
321        guard.statuses.insert(status.route_id.clone(), status);
322        Ok(())
323    }
324
325    async fn get_status(
326        &self,
327        route_id: &str,
328    ) -> Result<Option<RouteStatusProjection>, CamelError> {
329        let guard = self.inner.lock().await;
330        Ok(guard.statuses.get(route_id).cloned())
331    }
332
333    async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError> {
334        let guard = self.inner.lock().await;
335        Ok(guard.statuses.values().cloned().collect())
336    }
337
338    async fn remove_status(&self, route_id: &str) -> Result<(), CamelError> {
339        let mut guard = self.inner.lock().await;
340        guard.statuses.remove(route_id);
341        Ok(())
342    }
343}
344
345#[async_trait]
346impl EventPublisherPort for InMemoryRuntimeStore {
347    async fn publish(&self, events: &[RuntimeEvent]) -> Result<(), CamelError> {
348        let mut guard = self.inner.lock().await;
349        if let Some(journal) = &self.journal {
350            journal.append_batch(events).await?;
351        }
352        guard.events.extend(events.iter().cloned());
353        Ok(())
354    }
355}
356
357#[async_trait]
358impl CommandDedupPort for InMemoryRuntimeStore {
359    async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
360        let mut guard = self.inner.lock().await;
361        if !guard.seen.insert(command_id.to_string()) {
362            return Ok(false);
363        }
364
365        if let Some(journal) = &self.journal
366            && let Err(err) = journal.append_command_id(command_id).await
367        {
368            guard.seen.remove(command_id);
369            return Err(err);
370        }
371
372        Ok(true)
373    }
374
375    async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
376        let mut guard = self.inner.lock().await;
377        let removed = guard.seen.remove(command_id);
378        if removed && let Some(journal) = &self.journal {
379            journal.remove_command_id(command_id).await?;
380        }
381        Ok(())
382    }
383}
384
385#[async_trait]
386impl RuntimeUnitOfWorkPort for InMemoryRuntimeStore {
387    async fn persist_upsert(
388        &self,
389        aggregate: RouteRuntimeAggregate,
390        expected_version: Option<u64>,
391        projection: RouteStatusProjection,
392        events: &[RuntimeEvent],
393    ) -> Result<(), CamelError> {
394        let mut guard = self.inner.lock().await;
395        if let Some(expected) = expected_version {
396            let route_id = aggregate.route_id().to_string();
397            let current = guard.routes.get(&route_id).ok_or_else(|| {
398                CamelError::RouteError(format!(
399                    "optimistic lock conflict for route '{route_id}': route not found"
400                ))
401            })?;
402            if current.version() != expected {
403                return Err(CamelError::RouteError(format!(
404                    "optimistic lock conflict for route '{route_id}': expected version {expected}, actual {}",
405                    current.version()
406                )));
407            }
408        }
409
410        if let Some(journal) = &self.journal {
411            journal.append_batch(events).await?;
412        }
413
414        guard
415            .routes
416            .insert(aggregate.route_id().to_string(), aggregate);
417        guard
418            .statuses
419            .insert(projection.route_id.clone(), projection);
420        guard.events.extend(events.iter().cloned());
421        Ok(())
422    }
423
424    async fn persist_delete(
425        &self,
426        route_id: &str,
427        events: &[RuntimeEvent],
428    ) -> Result<(), CamelError> {
429        let mut guard = self.inner.lock().await;
430        if let Some(journal) = &self.journal {
431            journal.append_batch(events).await?;
432        }
433        guard.routes.remove(route_id);
434        guard.statuses.remove(route_id);
435        guard.events.extend(events.iter().cloned());
436        Ok(())
437    }
438
439    async fn recover_from_journal(&self) -> Result<(), CamelError> {
440        let Some(journal) = &self.journal else {
441            return Ok(());
442        };
443
444        let replayed_events = journal.load_all().await?;
445        let replayed_command_ids = journal.load_command_ids().await?;
446
447        let mut guard = self.inner.lock().await;
448        guard.routes.clear();
449        guard.statuses.clear();
450        guard.events.clear();
451        guard.seen.clear();
452
453        for event in &replayed_events {
454            apply_replayed_event(&mut guard, event);
455        }
456        guard.events = replayed_events;
457        for command_id in replayed_command_ids {
458            guard.seen.insert(command_id);
459        }
460        Ok(())
461    }
462}
463
464#[cfg(test)]
465mod tests {
466    use super::*;
467    use std::sync::Arc;
468
469    #[derive(Clone)]
470    struct ReplayJournal {
471        events: Vec<RuntimeEvent>,
472    }
473
474    #[async_trait]
475    impl RuntimeEventJournalPort for ReplayJournal {
476        async fn append_batch(&self, _events: &[RuntimeEvent]) -> Result<(), CamelError> {
477            Ok(())
478        }
479
480        async fn load_all(&self) -> Result<Vec<RuntimeEvent>, CamelError> {
481            Ok(self.events.clone())
482        }
483    }
484
485    #[tokio::test]
486    async fn repo_roundtrip_works() {
487        let repo = InMemoryRouteRepository::default();
488        repo.save(RouteRuntimeAggregate::new("r1")).await.unwrap();
489        assert!(repo.load("r1").await.unwrap().is_some());
490
491        let updated = RouteRuntimeAggregate::from_snapshot(
492            "r1",
493            crate::lifecycle::domain::RouteRuntimeState::Started,
494            1,
495        );
496        repo.save_if_version(updated.clone(), 0).await.unwrap();
497        let loaded = repo.load("r1").await.unwrap().unwrap();
498        assert_eq!(loaded.version(), 1);
499
500        let conflict = repo.save_if_version(updated, 0).await.unwrap_err();
501        assert!(
502            conflict.to_string().contains("optimistic lock conflict"),
503            "unexpected conflict error: {conflict}"
504        );
505
506        repo.delete("r1").await.unwrap();
507        assert!(repo.load("r1").await.unwrap().is_none());
508    }
509
510    #[tokio::test]
511    async fn projection_roundtrip_works() {
512        let store = InMemoryProjectionStore::default();
513        store
514            .upsert_status(RouteStatusProjection {
515                route_id: "r1".into(),
516                status: "Started".into(),
517            })
518            .await
519            .unwrap();
520
521        let status = store.get_status("r1").await.unwrap();
522        assert!(status.is_some());
523        assert_eq!(status.unwrap().status, "Started");
524        store.remove_status("r1").await.unwrap();
525        assert!(store.get_status("r1").await.unwrap().is_none());
526    }
527
528    #[tokio::test]
529    async fn event_publisher_stores_events() {
530        let publisher = InMemoryEventPublisher::default();
531        publisher
532            .publish(&[RuntimeEvent::RouteStarted {
533                route_id: "r1".into(),
534            }])
535            .await
536            .unwrap();
537
538        let events = publisher.snapshot().await;
539        assert_eq!(events.len(), 1);
540    }
541
542    #[tokio::test]
543    async fn command_dedup_detects_duplicates() {
544        let dedup = InMemoryCommandDedup::default();
545        assert!(dedup.first_seen("c1").await.unwrap());
546        assert!(!dedup.first_seen("c1").await.unwrap());
547        dedup.forget_seen("c1").await.unwrap();
548        assert!(dedup.first_seen("c1").await.unwrap());
549        assert!(dedup.first_seen("c2").await.unwrap());
550    }
551
552    #[tokio::test]
553    async fn runtime_store_uow_persists_all_three_writes() {
554        let store = InMemoryRuntimeStore::default();
555        let aggregate = RouteRuntimeAggregate::new("uow-r1");
556        let projection = RouteStatusProjection {
557            route_id: "uow-r1".to_string(),
558            status: "Registered".to_string(),
559        };
560        let events = vec![RuntimeEvent::RouteRegistered {
561            route_id: "uow-r1".to_string(),
562        }];
563
564        store
565            .persist_upsert(aggregate, None, projection.clone(), &events)
566            .await
567            .unwrap();
568
569        assert!(store.load("uow-r1").await.unwrap().is_some());
570        assert_eq!(
571            store.get_status("uow-r1").await.unwrap().unwrap(),
572            projection
573        );
574        assert_eq!(store.snapshot_events().await, events);
575    }
576
577    #[tokio::test]
578    async fn runtime_store_uow_enforces_expected_version() {
579        let store = InMemoryRuntimeStore::default();
580        let initial = RouteRuntimeAggregate::new("uow-r2");
581        let initial_projection = RouteStatusProjection {
582            route_id: "uow-r2".to_string(),
583            status: "Registered".to_string(),
584        };
585        store
586            .persist_upsert(
587                initial,
588                None,
589                initial_projection,
590                &[RuntimeEvent::RouteRegistered {
591                    route_id: "uow-r2".to_string(),
592                }],
593            )
594            .await
595            .unwrap();
596
597        let started = RouteRuntimeAggregate::from_snapshot(
598            "uow-r2",
599            crate::lifecycle::domain::RouteRuntimeState::Started,
600            1,
601        );
602        let err = store
603            .persist_upsert(
604                started,
605                Some(99),
606                RouteStatusProjection {
607                    route_id: "uow-r2".to_string(),
608                    status: "Started".to_string(),
609                },
610                &[RuntimeEvent::RouteStarted {
611                    route_id: "uow-r2".to_string(),
612                }],
613            )
614            .await
615            .unwrap_err()
616            .to_string();
617        assert!(
618            err.contains("optimistic lock conflict"),
619            "unexpected error: {err}"
620        );
621    }
622
623    #[tokio::test]
624    async fn replay_start_requested_only_advances_version_once() {
625        let store = InMemoryRuntimeStore::default().with_journal(Arc::new(ReplayJournal {
626            events: vec![
627                RuntimeEvent::RouteRegistered {
628                    route_id: "replay-r1".to_string(),
629                },
630                RuntimeEvent::RouteStartRequested {
631                    route_id: "replay-r1".to_string(),
632                },
633            ],
634        }));
635
636        store.recover_from_journal().await.unwrap();
637        let aggregate = store.load("replay-r1").await.unwrap().unwrap();
638
639        assert_eq!(aggregate.state(), &RouteRuntimeState::Starting);
640        assert_eq!(aggregate.version(), 1);
641    }
642
643    #[tokio::test]
644    async fn replay_start_requested_then_started_keeps_single_command_version() {
645        let store = InMemoryRuntimeStore::default().with_journal(Arc::new(ReplayJournal {
646            events: vec![
647                RuntimeEvent::RouteRegistered {
648                    route_id: "replay-r2".to_string(),
649                },
650                RuntimeEvent::RouteStartRequested {
651                    route_id: "replay-r2".to_string(),
652                },
653                RuntimeEvent::RouteStarted {
654                    route_id: "replay-r2".to_string(),
655                },
656            ],
657        }));
658
659        store.recover_from_journal().await.unwrap();
660        let aggregate = store.load("replay-r2").await.unwrap().unwrap();
661
662        assert_eq!(aggregate.state(), &RouteRuntimeState::Started);
663        assert_eq!(aggregate.version(), 1);
664    }
665}