Skip to main content

camel_core/lifecycle/application/
runtime_bus.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7    CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8    RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12    CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::ports::RouteRegistrationPort;
17use crate::lifecycle::ports::{
18    CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
19    RuntimeExecutionPort, RuntimeUnitOfWorkPort,
20};
21
22pub struct RuntimeBus {
23    repo: Arc<dyn RouteRepositoryPort>,
24    projections: Arc<dyn ProjectionStorePort>,
25    events: Arc<dyn EventPublisherPort>,
26    dedup: Arc<dyn CommandDedupPort>,
27    uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
28    execution: Option<Arc<dyn RuntimeExecutionPort>>,
29    journal_recovered_once: OnceCell<()>,
30}
31
32impl RuntimeBus {
33    pub fn new(
34        repo: Arc<dyn RouteRepositoryPort>,
35        projections: Arc<dyn ProjectionStorePort>,
36        events: Arc<dyn EventPublisherPort>,
37        dedup: Arc<dyn CommandDedupPort>,
38    ) -> Self {
39        Self {
40            repo,
41            projections,
42            events,
43            dedup,
44            uow: None,
45            execution: None,
46            journal_recovered_once: OnceCell::new(),
47        }
48    }
49
50    pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
51        self.uow = Some(uow);
52        self
53    }
54
55    pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
56        self.execution = Some(execution);
57        self
58    }
59
60    pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
61        &self.repo
62    }
63
64    fn deps(&self) -> CommandDeps {
65        CommandDeps {
66            repo: Arc::clone(&self.repo),
67            projections: Arc::clone(&self.projections),
68            events: Arc::clone(&self.events),
69            uow: self.uow.clone(),
70            execution: self.execution.clone(),
71        }
72    }
73
74    fn query_deps(&self) -> QueryDeps {
75        QueryDeps {
76            projections: Arc::clone(&self.projections),
77        }
78    }
79
80    async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
81        let Some(uow) = &self.uow else {
82            return Ok(());
83        };
84
85        self.journal_recovered_once
86            .get_or_try_init(|| async {
87                uow.recover_from_journal().await?;
88                Ok::<(), CamelError>(())
89            })
90            .await?;
91        Ok(())
92    }
93}
94
95#[async_trait]
96impl RuntimeCommandBus for RuntimeBus {
97    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
98        self.ensure_journal_recovered().await?;
99        let command_id = cmd.command_id().to_string();
100        if !self.dedup.first_seen(&command_id).await? {
101            return Ok(RuntimeCommandResult::Duplicate { command_id });
102        }
103        let deps = self.deps();
104        match execute_command(&deps, cmd).await {
105            Ok(result) => Ok(result),
106            Err(err) => {
107                let _ = self.dedup.forget_seen(&command_id).await;
108                Err(err)
109            }
110        }
111    }
112}
113
114#[async_trait]
115impl RuntimeQueryBus for RuntimeBus {
116    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
117        self.ensure_journal_recovered().await?;
118        let deps = self.query_deps();
119        execute_query(&deps, query).await
120    }
121}
122
123#[async_trait]
124impl RouteRegistrationPort for RuntimeBus {
125    async fn register_route(&self, def: RouteDefinition) -> Result<(), CamelError> {
126        self.ensure_journal_recovered().await?;
127        let deps = self.deps();
128        handle_register_internal(&deps, def).await.map(|_| ())
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::{HashMap, HashSet};
    use std::sync::Mutex;

    use crate::lifecycle::application::route_definition::RouteDefinition;
    use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
    use crate::lifecycle::ports::RouteStatusProjection;

    /// Route repository backed by a mutex-guarded map keyed by route id.
    #[derive(Clone, Default)]
    struct InMemoryTestRepo {
        routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
    }

    #[async_trait]
    impl RouteRepositoryPort for InMemoryTestRepo {
        /// Returns a clone of the stored aggregate, if any.
        async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError> {
            let routes = self.routes.lock().expect("lock test routes");
            Ok(routes.get(route_id).cloned())
        }

        /// Stores the aggregate unconditionally under its own route id.
        async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError> {
            let key = aggregate.route_id().to_string();
            let mut routes = self.routes.lock().expect("lock test routes");
            routes.insert(key, aggregate);
            Ok(())
        }

        /// Stores the aggregate only when the currently stored version equals
        /// `expected_version`; a missing route or a version mismatch is
        /// reported as a `RouteError`.
        async fn save_if_version(
            &self,
            aggregate: RouteRuntimeAggregate,
            expected_version: u64,
        ) -> Result<(), CamelError> {
            let route_id = aggregate.route_id().to_string();
            let mut routes = self.routes.lock().expect("lock test routes");

            let Some(current) = routes.get(&route_id) else {
                return Err(CamelError::RouteError(format!(
                    "optimistic lock conflict for route '{route_id}': route not found"
                )));
            };

            let actual = current.version();
            if actual != expected_version {
                return Err(CamelError::RouteError(format!(
                    "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
                    actual
                )));
            }

            routes.insert(route_id, aggregate);
            Ok(())
        }

        /// Removes the route if present; a missing route is not an error.
        async fn delete(&self, route_id: &str) -> Result<(), CamelError> {
            let mut routes = self.routes.lock().expect("lock test routes");
            routes.remove(route_id);
            Ok(())
        }
    }

    /// Projection store backed by a mutex-guarded map keyed by route id.
    #[derive(Clone, Default)]
    struct InMemoryTestProjectionStore {
        statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
    }

    #[async_trait]
    impl ProjectionStorePort for InMemoryTestProjectionStore {
        /// Inserts or replaces the projection for `status.route_id`.
        async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError> {
            let key = status.route_id.clone();
            let mut statuses = self.statuses.lock().expect("lock test statuses");
            statuses.insert(key, status);
            Ok(())
        }

        /// Returns a clone of the projection for `route_id`, if any.
        async fn get_status(
            &self,
            route_id: &str,
        ) -> Result<Option<RouteStatusProjection>, CamelError> {
            let statuses = self.statuses.lock().expect("lock test statuses");
            Ok(statuses.get(route_id).cloned())
        }

        /// Returns clones of all stored projections, in map iteration order.
        async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError> {
            let statuses = self.statuses.lock().expect("lock test statuses");
            Ok(statuses.values().cloned().collect())
        }

        /// Removes the projection for `route_id` if present.
        async fn remove_status(&self, route_id: &str) -> Result<(), CamelError> {
            let mut statuses = self.statuses.lock().expect("lock test statuses");
            statuses.remove(route_id);
            Ok(())
        }
    }

    /// Event publisher that accepts and discards every event.
    #[derive(Clone, Default)]
    struct InMemoryTestEventPublisher;

    #[async_trait]
    impl EventPublisherPort for InMemoryTestEventPublisher {
        async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Dedup store backed by a mutex-guarded set of command ids.
    #[derive(Clone, Default)]
    struct InMemoryTestDedup {
        seen: Arc<Mutex<HashSet<String>>>,
    }

    #[async_trait]
    impl CommandDedupPort for InMemoryTestDedup {
        /// Records the id and returns `true` only when it was not yet present.
        async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
            let newly_inserted = self
                .seen
                .lock()
                .expect("lock dedup set")
                .insert(command_id.to_string());
            Ok(newly_inserted)
        }

        /// Drops the id from the set if present.
        async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
            let mut seen = self.seen.lock().expect("lock dedup set");
            seen.remove(command_id);
            Ok(())
        }
    }

    /// Builds a bus wired to the in-memory ports above, with no unit of work
    /// and no execution port.
    fn build_test_runtime_bus() -> RuntimeBus {
        RuntimeBus::new(
            Arc::new(InMemoryTestRepo::default()),
            Arc::new(InMemoryTestProjectionStore::default()),
            Arc::new(InMemoryTestEventPublisher),
            Arc::new(InMemoryTestDedup::default()),
        )
    }

    /// Registering through the internal port must succeed and leave the route
    /// queryable with status "Registered".
    #[tokio::test]
    async fn runtime_bus_implements_internal_command_bus() {
        let bus = build_test_runtime_bus();
        let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
        let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
        assert!(
            result.is_ok(),
            "internal bus registration failed: {:?}",
            result
        );

        let query = RuntimeQuery::GetRouteStatus {
            route_id: "internal-route".to_string(),
        };
        let reply = bus.ask(query).await.unwrap();
        let RuntimeQueryResult::RouteStatus { status, .. } = reply else {
            panic!("unexpected query result");
        };
        assert_eq!(status, "Registered");
    }
}