1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use tokio::sync::{Mutex, RwLock};
6
7use camel_api::CamelError;
8
9use crate::lifecycle::domain::{RouteRuntimeAggregate, RouteRuntimeState, RuntimeEvent};
10use crate::lifecycle::ports::{
11 CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
12 RouteStatusProjection, RuntimeEventJournalPort, RuntimeUnitOfWorkPort,
13};
14
/// In-memory, clonable implementation of [`RouteRepositoryPort`].
///
/// Clones share the same underlying map via `Arc`, so one instance can be
/// handed to multiple components; `RwLock` allows concurrent readers.
#[derive(Default, Clone)]
pub struct InMemoryRouteRepository {
    /// route id -> current aggregate snapshot.
    routes: Arc<RwLock<HashMap<String, RouteRuntimeAggregate>>>,
}
19
20#[async_trait]
21impl RouteRepositoryPort for InMemoryRouteRepository {
22 async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError> {
23 let routes = self.routes.read().await;
24 Ok(routes.get(route_id).cloned())
25 }
26
27 async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError> {
28 let mut routes = self.routes.write().await;
29 routes.insert(aggregate.route_id().to_string(), aggregate);
30 Ok(())
31 }
32
33 async fn save_if_version(
34 &self,
35 aggregate: RouteRuntimeAggregate,
36 expected_version: u64,
37 ) -> Result<(), CamelError> {
38 let mut routes = self.routes.write().await;
39 let route_id = aggregate.route_id().to_string();
40 let current = routes.get(&route_id).ok_or_else(|| {
41 CamelError::RouteError(format!(
42 "optimistic lock conflict for route '{route_id}': route not found"
43 ))
44 })?;
45
46 if current.version() != expected_version {
47 return Err(CamelError::RouteError(format!(
48 "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
49 current.version()
50 )));
51 }
52
53 routes.insert(route_id, aggregate);
54 Ok(())
55 }
56
57 async fn delete(&self, route_id: &str) -> Result<(), CamelError> {
58 let mut routes = self.routes.write().await;
59 routes.remove(route_id);
60 Ok(())
61 }
62}
63
/// In-memory implementation of [`ProjectionStorePort`] holding route status
/// projections. Clones share the same underlying map via `Arc`.
#[derive(Default, Clone)]
pub struct InMemoryProjectionStore {
    /// route id -> latest status projection.
    statuses: Arc<RwLock<HashMap<String, RouteStatusProjection>>>,
}
68
69#[async_trait]
70impl ProjectionStorePort for InMemoryProjectionStore {
71 async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError> {
72 let mut statuses = self.statuses.write().await;
73 statuses.insert(status.route_id.clone(), status);
74 Ok(())
75 }
76
77 async fn get_status(
78 &self,
79 route_id: &str,
80 ) -> Result<Option<RouteStatusProjection>, CamelError> {
81 let statuses = self.statuses.read().await;
82 Ok(statuses.get(route_id).cloned())
83 }
84
85 async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError> {
86 let statuses = self.statuses.read().await;
87 Ok(statuses.values().cloned().collect())
88 }
89
90 async fn remove_status(&self, route_id: &str) -> Result<(), CamelError> {
91 let mut statuses = self.statuses.write().await;
92 statuses.remove(route_id);
93 Ok(())
94 }
95}
96
/// In-memory [`EventPublisherPort`] that records every published event in an
/// append-only vector; `snapshot` exposes the log for assertions.
#[derive(Default, Clone)]
pub struct InMemoryEventPublisher {
    /// Published events, in publish order; shared across clones via `Arc`.
    events: Arc<RwLock<Vec<RuntimeEvent>>>,
}
101
102impl InMemoryEventPublisher {
103 pub async fn snapshot(&self) -> Vec<RuntimeEvent> {
104 self.events.read().await.clone()
105 }
106}
107
108#[async_trait]
109impl EventPublisherPort for InMemoryEventPublisher {
110 async fn publish(&self, events: &[RuntimeEvent]) -> Result<(), CamelError> {
111 let mut stored = self.events.write().await;
112 stored.extend(events.iter().cloned());
113 Ok(())
114 }
115}
116
/// In-memory [`CommandDedupPort`] tracking which command ids have been seen.
#[derive(Default, Clone)]
pub struct InMemoryCommandDedup {
    /// Command ids observed so far; shared across clones via `Arc`.
    seen: Arc<RwLock<HashSet<String>>>,
}
121
122#[async_trait]
123impl CommandDedupPort for InMemoryCommandDedup {
124 async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
125 let mut seen = self.seen.write().await;
126 Ok(seen.insert(command_id.to_string()))
127 }
128
129 async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
130 let mut seen = self.seen.write().await;
131 seen.remove(command_id);
132 Ok(())
133 }
134}
135
/// Combined in-memory store implementing the repository, projection, event,
/// dedup, and unit-of-work ports behind a single mutex, so multi-step writes
/// appear atomic to other tasks using the same store.
#[derive(Clone)]
pub struct InMemoryRuntimeStore {
    /// All mutable state behind one tokio `Mutex`; clones share it via `Arc`.
    inner: Arc<Mutex<RuntimeStoreState>>,
    /// Optional durable journal. When set, writes are journaled before being
    /// applied in memory, and state can be rebuilt via `recover_from_journal`.
    journal: Option<Arc<dyn RuntimeEventJournalPort>>,
}
141
/// Mutable state shared by all ports of [`InMemoryRuntimeStore`].
#[derive(Default)]
struct RuntimeStoreState {
    /// route id -> current aggregate snapshot.
    routes: HashMap<String, RouteRuntimeAggregate>,
    /// route id -> latest status projection.
    statuses: HashMap<String, RouteStatusProjection>,
    /// Append-only log of published events, in publish order.
    events: Vec<RuntimeEvent>,
    /// Command ids already processed (for deduplication).
    seen: HashSet<String>,
}
149
150impl InMemoryRuntimeStore {
151 pub fn with_journal(mut self, journal: Arc<dyn RuntimeEventJournalPort>) -> Self {
152 self.journal = Some(journal);
153 self
154 }
155
156 pub async fn snapshot_events(&self) -> Vec<RuntimeEvent> {
157 self.inner.lock().await.events.clone()
158 }
159}
160
161fn upsert_replayed_route(
162 state: &mut RuntimeStoreState,
163 route_id: &str,
164 next_state: RouteRuntimeState,
165 status: &str,
166 increment_version: bool,
167) {
168 let current_version = state
169 .routes
170 .get(route_id)
171 .map(|agg| agg.version())
172 .unwrap_or(0);
173 let next_version = if increment_version {
174 current_version.saturating_add(1)
175 } else {
176 current_version
177 };
178 state.routes.insert(
179 route_id.to_string(),
180 RouteRuntimeAggregate::from_snapshot(route_id, next_state, next_version),
181 );
182 state.statuses.insert(
183 route_id.to_string(),
184 RouteStatusProjection {
185 route_id: route_id.to_string(),
186 status: status.to_string(),
187 },
188 );
189}
190
191fn apply_replayed_event(state: &mut RuntimeStoreState, event: &RuntimeEvent) {
192 match event {
193 RuntimeEvent::RouteRegistered { route_id } => {
194 state.routes.insert(
195 route_id.clone(),
196 RouteRuntimeAggregate::new(route_id.clone()),
197 );
198 state.statuses.insert(
199 route_id.clone(),
200 RouteStatusProjection {
201 route_id: route_id.clone(),
202 status: "Registered".to_string(),
203 },
204 );
205 }
206 RuntimeEvent::RouteStartRequested { route_id } => {
207 upsert_replayed_route(
208 state,
209 route_id,
210 RouteRuntimeState::Starting,
211 "Starting",
212 true,
213 );
214 }
215 RuntimeEvent::RouteStarted { route_id } => {
216 let increment_version = !matches!(
217 state.routes.get(route_id).map(|agg| agg.state()),
218 Some(RouteRuntimeState::Starting)
219 );
220 upsert_replayed_route(
221 state,
222 route_id,
223 RouteRuntimeState::Started,
224 "Started",
225 increment_version,
226 );
227 }
228 RuntimeEvent::RouteFailed { route_id, error } => {
229 upsert_replayed_route(
230 state,
231 route_id,
232 RouteRuntimeState::Failed(error.clone()),
233 "Failed",
234 true,
235 );
236 }
237 RuntimeEvent::RouteStopped { route_id } => {
238 upsert_replayed_route(state, route_id, RouteRuntimeState::Stopped, "Stopped", true);
239 }
240 RuntimeEvent::RouteSuspended { route_id } => {
241 upsert_replayed_route(
242 state,
243 route_id,
244 RouteRuntimeState::Suspended,
245 "Suspended",
246 true,
247 );
248 }
249 RuntimeEvent::RouteResumed { route_id } => {
250 upsert_replayed_route(state, route_id, RouteRuntimeState::Started, "Started", true);
251 }
252 RuntimeEvent::RouteReloaded { route_id } => {
253 upsert_replayed_route(state, route_id, RouteRuntimeState::Started, "Started", true);
254 }
255 RuntimeEvent::RouteRemoved { route_id } => {
256 state.routes.remove(route_id);
257 state.statuses.remove(route_id);
258 }
259 }
260}
261
262impl Default for InMemoryRuntimeStore {
263 fn default() -> Self {
264 Self {
265 inner: Arc::new(Mutex::new(RuntimeStoreState::default())),
266 journal: None,
267 }
268 }
269}
270
271#[async_trait]
272impl RouteRepositoryPort for InMemoryRuntimeStore {
273 async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError> {
274 let guard = self.inner.lock().await;
275 Ok(guard.routes.get(route_id).cloned())
276 }
277
278 async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError> {
279 let mut guard = self.inner.lock().await;
280 guard
281 .routes
282 .insert(aggregate.route_id().to_string(), aggregate);
283 Ok(())
284 }
285
286 async fn save_if_version(
287 &self,
288 aggregate: RouteRuntimeAggregate,
289 expected_version: u64,
290 ) -> Result<(), CamelError> {
291 let mut guard = self.inner.lock().await;
292 let route_id = aggregate.route_id().to_string();
293 let current = guard.routes.get(&route_id).ok_or_else(|| {
294 CamelError::RouteError(format!(
295 "optimistic lock conflict for route '{route_id}': route not found"
296 ))
297 })?;
298
299 if current.version() != expected_version {
300 return Err(CamelError::RouteError(format!(
301 "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
302 current.version()
303 )));
304 }
305
306 guard.routes.insert(route_id, aggregate);
307 Ok(())
308 }
309
310 async fn delete(&self, route_id: &str) -> Result<(), CamelError> {
311 let mut guard = self.inner.lock().await;
312 guard.routes.remove(route_id);
313 Ok(())
314 }
315}
316
317#[async_trait]
318impl ProjectionStorePort for InMemoryRuntimeStore {
319 async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError> {
320 let mut guard = self.inner.lock().await;
321 guard.statuses.insert(status.route_id.clone(), status);
322 Ok(())
323 }
324
325 async fn get_status(
326 &self,
327 route_id: &str,
328 ) -> Result<Option<RouteStatusProjection>, CamelError> {
329 let guard = self.inner.lock().await;
330 Ok(guard.statuses.get(route_id).cloned())
331 }
332
333 async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError> {
334 let guard = self.inner.lock().await;
335 Ok(guard.statuses.values().cloned().collect())
336 }
337
338 async fn remove_status(&self, route_id: &str) -> Result<(), CamelError> {
339 let mut guard = self.inner.lock().await;
340 guard.statuses.remove(route_id);
341 Ok(())
342 }
343}
344
345#[async_trait]
346impl EventPublisherPort for InMemoryRuntimeStore {
347 async fn publish(&self, events: &[RuntimeEvent]) -> Result<(), CamelError> {
348 let mut guard = self.inner.lock().await;
349 if let Some(journal) = &self.journal {
350 journal.append_batch(events).await?;
351 }
352 guard.events.extend(events.iter().cloned());
353 Ok(())
354 }
355}
356
357#[async_trait]
358impl CommandDedupPort for InMemoryRuntimeStore {
359 async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
360 let mut guard = self.inner.lock().await;
361 if !guard.seen.insert(command_id.to_string()) {
362 return Ok(false);
363 }
364
365 if let Some(journal) = &self.journal
366 && let Err(err) = journal.append_command_id(command_id).await
367 {
368 guard.seen.remove(command_id);
369 return Err(err);
370 }
371
372 Ok(true)
373 }
374
375 async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
376 let mut guard = self.inner.lock().await;
377 let removed = guard.seen.remove(command_id);
378 if removed && let Some(journal) = &self.journal {
379 journal.remove_command_id(command_id).await?;
380 }
381 Ok(())
382 }
383}
384
#[async_trait]
impl RuntimeUnitOfWorkPort for InMemoryRuntimeStore {
    /// Persists aggregate, projection, and events as one unit under a single
    /// lock acquisition.
    ///
    /// Order matters: the optimistic version check runs first, the journal
    /// append (if any) second, and the in-memory mutation last — so a version
    /// conflict or journal failure leaves the in-memory state untouched.
    async fn persist_upsert(
        &self,
        aggregate: RouteRuntimeAggregate,
        expected_version: Option<u64>,
        projection: RouteStatusProjection,
        events: &[RuntimeEvent],
    ) -> Result<(), CamelError> {
        let mut guard = self.inner.lock().await;
        // Optimistic concurrency is only enforced when the caller supplies an
        // expected version; `None` means an unconditional upsert.
        if let Some(expected) = expected_version {
            let route_id = aggregate.route_id().to_string();
            let current = guard.routes.get(&route_id).ok_or_else(|| {
                CamelError::RouteError(format!(
                    "optimistic lock conflict for route '{route_id}': route not found"
                ))
            })?;
            if current.version() != expected {
                return Err(CamelError::RouteError(format!(
                    "optimistic lock conflict for route '{route_id}': expected version {expected}, actual {}",
                    current.version()
                )));
            }
        }

        // Journal before mutating memory; the lock is held across the await
        // (tokio Mutex) so concurrent units of work cannot interleave.
        if let Some(journal) = &self.journal {
            journal.append_batch(events).await?;
        }

        guard
            .routes
            .insert(aggregate.route_id().to_string(), aggregate);
        guard
            .statuses
            .insert(projection.route_id.clone(), projection);
        guard.events.extend(events.iter().cloned());
        Ok(())
    }

    /// Removes a route's aggregate and projection and records the emitted
    /// events, journaling the events first (same ordering rationale as
    /// `persist_upsert`).
    async fn persist_delete(
        &self,
        route_id: &str,
        events: &[RuntimeEvent],
    ) -> Result<(), CamelError> {
        let mut guard = self.inner.lock().await;
        if let Some(journal) = &self.journal {
            journal.append_batch(events).await?;
        }
        guard.routes.remove(route_id);
        guard.statuses.remove(route_id);
        guard.events.extend(events.iter().cloned());
        Ok(())
    }

    /// Rebuilds all in-memory state by replaying the journal.
    ///
    /// A no-op when no journal is attached. Existing state (routes,
    /// projections, events, dedup set) is cleared first, then events are
    /// re-applied in order and the journaled command ids are restored.
    async fn recover_from_journal(&self) -> Result<(), CamelError> {
        let Some(journal) = &self.journal else {
            return Ok(());
        };

        // Load everything before taking the lock to keep the critical
        // section free of journal I/O.
        let replayed_events = journal.load_all().await?;
        let replayed_command_ids = journal.load_command_ids().await?;

        let mut guard = self.inner.lock().await;
        guard.routes.clear();
        guard.statuses.clear();
        guard.events.clear();
        guard.seen.clear();

        for event in &replayed_events {
            apply_replayed_event(&mut guard, event);
        }
        // Keep the full replayed log so snapshot_events reflects history.
        guard.events = replayed_events;
        for command_id in replayed_command_ids {
            guard.seen.insert(command_id);
        }
        Ok(())
    }
}
463
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Journal stub that replays a fixed event list and accepts appends.
    /// NOTE(review): only `append_batch` and `load_all` are overridden, so
    /// the remaining `RuntimeEventJournalPort` methods (`load_command_ids`,
    /// `append_command_id`, ...) presumably have default implementations —
    /// confirm against the trait definition.
    #[derive(Clone)]
    struct ReplayJournal {
        events: Vec<RuntimeEvent>,
    }

    #[async_trait]
    impl RuntimeEventJournalPort for ReplayJournal {
        // Appends are accepted and discarded; this stub is read-only replay.
        async fn append_batch(&self, _events: &[RuntimeEvent]) -> Result<(), CamelError> {
            Ok(())
        }

        async fn load_all(&self) -> Result<Vec<RuntimeEvent>, CamelError> {
            Ok(self.events.clone())
        }
    }

    // Save/load/delete roundtrip plus optimistic-lock success and conflict.
    #[tokio::test]
    async fn repo_roundtrip_works() {
        let repo = InMemoryRouteRepository::default();
        repo.save(RouteRuntimeAggregate::new("r1")).await.unwrap();
        assert!(repo.load("r1").await.unwrap().is_some());

        let updated = RouteRuntimeAggregate::from_snapshot(
            "r1",
            crate::lifecycle::domain::RouteRuntimeState::Started,
            1,
        );
        // Stored aggregate is at version 0, so expected_version 0 succeeds.
        repo.save_if_version(updated.clone(), 0).await.unwrap();
        let loaded = repo.load("r1").await.unwrap().unwrap();
        assert_eq!(loaded.version(), 1);

        // Now at version 1; expecting 0 again must conflict.
        let conflict = repo.save_if_version(updated, 0).await.unwrap_err();
        assert!(
            conflict.to_string().contains("optimistic lock conflict"),
            "unexpected conflict error: {conflict}"
        );

        repo.delete("r1").await.unwrap();
        assert!(repo.load("r1").await.unwrap().is_none());
    }

    // Upsert/get/remove roundtrip for the standalone projection store.
    #[tokio::test]
    async fn projection_roundtrip_works() {
        let store = InMemoryProjectionStore::default();
        store
            .upsert_status(RouteStatusProjection {
                route_id: "r1".into(),
                status: "Started".into(),
            })
            .await
            .unwrap();

        let status = store.get_status("r1").await.unwrap();
        assert!(status.is_some());
        assert_eq!(status.unwrap().status, "Started");
        store.remove_status("r1").await.unwrap();
        assert!(store.get_status("r1").await.unwrap().is_none());
    }

    // Published events are retrievable via snapshot().
    #[tokio::test]
    async fn event_publisher_stores_events() {
        let publisher = InMemoryEventPublisher::default();
        publisher
            .publish(&[RuntimeEvent::RouteStarted {
                route_id: "r1".into(),
            }])
            .await
            .unwrap();

        let events = publisher.snapshot().await;
        assert_eq!(events.len(), 1);
    }

    // first_seen is true once per id until forgotten, then true again.
    #[tokio::test]
    async fn command_dedup_detects_duplicates() {
        let dedup = InMemoryCommandDedup::default();
        assert!(dedup.first_seen("c1").await.unwrap());
        assert!(!dedup.first_seen("c1").await.unwrap());
        dedup.forget_seen("c1").await.unwrap();
        assert!(dedup.first_seen("c1").await.unwrap());
        assert!(dedup.first_seen("c2").await.unwrap());
    }

    // persist_upsert writes aggregate, projection, and event log together.
    #[tokio::test]
    async fn runtime_store_uow_persists_all_three_writes() {
        let store = InMemoryRuntimeStore::default();
        let aggregate = RouteRuntimeAggregate::new("uow-r1");
        let projection = RouteStatusProjection {
            route_id: "uow-r1".to_string(),
            status: "Registered".to_string(),
        };
        let events = vec![RuntimeEvent::RouteRegistered {
            route_id: "uow-r1".to_string(),
        }];

        store
            .persist_upsert(aggregate, None, projection.clone(), &events)
            .await
            .unwrap();

        assert!(store.load("uow-r1").await.unwrap().is_some());
        assert_eq!(
            store.get_status("uow-r1").await.unwrap().unwrap(),
            projection
        );
        assert_eq!(store.snapshot_events().await, events);
    }

    // persist_upsert with a wrong Some(expected_version) must conflict.
    #[tokio::test]
    async fn runtime_store_uow_enforces_expected_version() {
        let store = InMemoryRuntimeStore::default();
        let initial = RouteRuntimeAggregate::new("uow-r2");
        let initial_projection = RouteStatusProjection {
            route_id: "uow-r2".to_string(),
            status: "Registered".to_string(),
        };
        store
            .persist_upsert(
                initial,
                None,
                initial_projection,
                &[RuntimeEvent::RouteRegistered {
                    route_id: "uow-r2".to_string(),
                }],
            )
            .await
            .unwrap();

        let started = RouteRuntimeAggregate::from_snapshot(
            "uow-r2",
            crate::lifecycle::domain::RouteRuntimeState::Started,
            1,
        );
        let err = store
            .persist_upsert(
                started,
                Some(99),
                RouteStatusProjection {
                    route_id: "uow-r2".to_string(),
                    status: "Started".to_string(),
                },
                &[RuntimeEvent::RouteStarted {
                    route_id: "uow-r2".to_string(),
                }],
            )
            .await
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("optimistic lock conflict"),
            "unexpected error: {err}"
        );
    }

    // Replay: Registered then StartRequested yields version 1 in Starting.
    #[tokio::test]
    async fn replay_start_requested_only_advances_version_once() {
        let store = InMemoryRuntimeStore::default().with_journal(Arc::new(ReplayJournal {
            events: vec![
                RuntimeEvent::RouteRegistered {
                    route_id: "replay-r1".to_string(),
                },
                RuntimeEvent::RouteStartRequested {
                    route_id: "replay-r1".to_string(),
                },
            ],
        }));

        store.recover_from_journal().await.unwrap();
        let aggregate = store.load("replay-r1").await.unwrap().unwrap();

        assert_eq!(aggregate.state(), &RouteRuntimeState::Starting);
        assert_eq!(aggregate.version(), 1);
    }

    // Replay: Started completing a StartRequested does not bump again —
    // the whole start command counts as a single version increment.
    #[tokio::test]
    async fn replay_start_requested_then_started_keeps_single_command_version() {
        let store = InMemoryRuntimeStore::default().with_journal(Arc::new(ReplayJournal {
            events: vec![
                RuntimeEvent::RouteRegistered {
                    route_id: "replay-r2".to_string(),
                },
                RuntimeEvent::RouteStartRequested {
                    route_id: "replay-r2".to_string(),
                },
                RuntimeEvent::RouteStarted {
                    route_id: "replay-r2".to_string(),
                },
            ],
        }));

        store.recover_from_journal().await.unwrap();
        let aggregate = store.load("replay-r2").await.unwrap().unwrap();

        assert_eq!(aggregate.state(), &RouteRuntimeState::Started);
        assert_eq!(aggregate.version(), 1);
    }
}