aimdb_core/router.rs
1//! Generic message router for efficient connector dispatch
2//!
3//! Provides O(M) routing complexity instead of O(N×M) filtered streams.
4//! Routes incoming messages directly to type-specific producers based on topic/key matching.
5//!
6//! This router is protocol-agnostic and can be used by any connector:
7//! - MQTT: Routes topics to producers
8//! - Kafka: Routes topics/partitions to producers
9//! - HTTP: Routes paths to producers
10//! - DDS: Routes topics to producers
11//! - Shared Memory: Routes segment names to producers
12
13#[cfg(not(feature = "std"))]
14extern crate alloc;
15
16#[cfg(not(feature = "std"))]
17use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
18
19#[cfg(feature = "std")]
20use std::sync::Arc;
21
22use crate::connector::{DeserializerKind, ProducerTrait};
23
24/// A single routing entry
25///
26/// Maps one (resource_id, type) pair to a producer and deserializer.
27/// Multiple routes can exist for the same resource_id (different types).
28///
29/// # Resource ID Examples
30///
31/// - MQTT: "sensors/temperature" (topic)
32/// - Kafka: "events:0" (topic:partition)
33/// - HTTP: "/api/v1/sensors" (path)
34/// - DDS: "TelemetryData" (topic name)
35/// - Shmem: "temperature_buffer" (segment name)
36pub struct Route {
37 /// Resource identifier to match (reference-counted for proper memory management)
38 ///
39 /// Examples: MQTT topic, Kafka topic, HTTP path, DDS topic, shmem segment
40 ///
41 /// Uses Arc<str> instead of &'static str to avoid memory leaks from Box::leak().
42 /// This adds ~8 bytes overhead per route (Arc control block) but enables proper cleanup.
43 pub resource_id: Arc<str>,
44
45 /// Type-erased producer for this route
46 pub producer: Box<dyn ProducerTrait>,
47
48 /// Deserializer for converting bytes → typed value (raw or context-aware)
49 pub deserializer: DeserializerKind,
50}
51
52/// Generic message router for connector dispatch
53///
54/// Routes incoming messages to appropriate producers based on resource_id.
55/// Uses linear search which is efficient for <100 routes.
56///
57/// # Performance
58///
59/// - O(M) complexity where M = number of routes
60/// - May check multiple routes if same resource_id maps to multiple types
61/// - Typical routing time: <1μs for <50 routes
62///
63/// # Protocol Support
64///
65/// This router is protocol-agnostic. Each connector uses it with their own resource_id format:
66/// - **MQTT**: `topic` (e.g., "sensors/temperature")
67/// - **Kafka**: `topic` or `topic:partition` (e.g., "events" or "events:0")
68/// - **HTTP**: `path` (e.g., "/api/v1/sensors")
69/// - **DDS**: `topic_name` (e.g., "TelemetryData")
70/// - **Shmem**: `segment_name` (e.g., "temperature_buffer")
71pub struct Router {
72 /// List of all registered routes
73 routes: Vec<Route>,
74}
75
76impl Router {
77 /// Create a new router with the given routes
78 pub fn new(routes: Vec<Route>) -> Self {
79 Self { routes }
80 }
81
82 /// Route a message to appropriate producer(s)
83 ///
84 /// # Arguments
85 /// * `resource_id` - Resource identifier (topic, path, segment name, etc.)
86 /// * `payload` - Raw message payload bytes
87 /// * `ctx` - Optional type-erased runtime context for context-aware deserializers
88 ///
89 /// # Returns
90 /// * `Ok(())` - Always returns Ok, even if no routes matched or processing failed.
91 /// Failures are logged (via tracing/defmt) but do not propagate as errors.
92 ///
93 /// # Behavior
94 /// - Checks all routes that match the resource_id (may be multiple)
95 /// - For `DeserializerKind::Raw`, calls the deserializer with payload only
96 /// - For `DeserializerKind::Context`, calls with context + payload (skips if no context)
97 /// - Logs warnings on deserialization failures but continues
98 /// - Logs debug message if no routes found for resource_id
99 pub async fn route(
100 &self,
101 resource_id: &str,
102 payload: &[u8],
103 ctx: Option<&Arc<dyn core::any::Any + Send + Sync>>,
104 ) -> Result<(), String> {
105 let mut routed = false;
106 let mut matched = false;
107
108 // Linear search through all routes
109 // Note: Multiple routes may match the same resource_id (different types)
110 for route in &self.routes {
111 if route.resource_id.as_ref() == resource_id {
112 matched = true;
113 // Deserialize the payload based on deserializer kind
114 let result = match &route.deserializer {
115 DeserializerKind::Raw(deser) => (deser)(payload),
116 DeserializerKind::Context(deser) => match ctx {
117 Some(ctx) => (deser)(ctx.clone(), payload),
118 None => {
119 #[cfg(feature = "tracing")]
120 tracing::warn!(
121 "Context deserializer on '{}' but no context provided, skipping",
122 resource_id
123 );
124
125 #[cfg(feature = "defmt")]
126 defmt::warn!(
127 "Context deserializer on '{}' but no context provided",
128 resource_id
129 );
130
131 continue;
132 }
133 },
134 };
135
136 match result {
137 Ok(value_any) => {
138 // Produce into the buffer
139 match route.producer.produce_any(value_any).await {
140 Ok(()) => {
141 routed = true;
142
143 #[cfg(feature = "tracing")]
144 tracing::debug!("Routed message on '{}' to producer", resource_id);
145 }
146 Err(_e) => {
147 #[cfg(feature = "tracing")]
148 tracing::error!(
149 "Failed to produce message on '{}': {}",
150 resource_id,
151 _e
152 );
153
154 #[cfg(feature = "defmt")]
155 defmt::error!(
156 "Failed to produce message on '{}': {}",
157 resource_id,
158 _e.as_str()
159 );
160 }
161 }
162 }
163 Err(_e) => {
164 #[cfg(feature = "tracing")]
165 tracing::warn!(
166 "Failed to deserialize message on '{}': {}",
167 resource_id,
168 _e
169 );
170
171 #[cfg(feature = "defmt")]
172 defmt::warn!(
173 "Failed to deserialize message on '{}': {}",
174 resource_id,
175 _e.as_str()
176 );
177 }
178 }
179 }
180 }
181
182 if !routed {
183 if matched {
184 #[cfg(feature = "tracing")]
185 tracing::debug!("Route matched for '{}' but message was not produced (missing context or errors)", resource_id);
186
187 #[cfg(feature = "defmt")]
188 defmt::debug!("Route matched for '{}' but not produced", resource_id);
189 } else {
190 #[cfg(feature = "tracing")]
191 tracing::debug!("No route found for resource: '{}'", resource_id);
192
193 #[cfg(feature = "defmt")]
194 defmt::debug!("No route found for resource: '{}'", resource_id);
195 }
196 }
197
198 Ok(())
199 }
200
201 /// Get list of all resource IDs registered in this router
202 ///
203 /// Useful for subscribing at the protocol level (e.g., MQTT SUBSCRIBE).
204 /// Returns unique resource IDs (deduplicated even if multiple routes per resource).
205 pub fn resource_ids(&self) -> Vec<Arc<str>> {
206 let mut ids: Vec<Arc<str>> = self.routes.iter().map(|r| r.resource_id.clone()).collect();
207
208 // Deduplicate by converting to strings for comparison
209 ids.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
210 ids.dedup_by(|a, b| a.as_ref() == b.as_ref());
211
212 ids
213 }
214
215 /// Get the number of routes in this router
216 pub fn route_count(&self) -> usize {
217 self.routes.len()
218 }
219}
220
221/// Builder for constructing routers
222///
223/// Provides a fluent API for adding routes before creating the router.
224///
225/// # Example
226///
227/// ```rust,ignore
228/// use aimdb_core::router::RouterBuilder;
229///
230/// let router = RouterBuilder::new()
231/// .add_route(
232/// "sensors/temperature",
233/// producer_temp.clone(),
234/// Arc::new(|bytes| {
235/// serde_json::from_slice::<Temperature>(bytes)
236/// .map(|t| Box::new(t) as Box<dyn Any + Send>)
237/// .map_err(|e| e.to_string())
238/// })
239/// )
240/// .add_route(
241/// "sensors/humidity",
242/// producer_humidity.clone(),
243/// Arc::new(|bytes| {
244/// serde_json::from_slice::<Humidity>(bytes)
245/// .map(|h| Box::new(h) as Box<dyn Any + Send>)
246/// .map_err(|e| e.to_string())
247/// })
248/// )
249/// .build();
250/// ```
251pub struct RouterBuilder {
252 routes: Vec<Route>,
253}
254
255impl RouterBuilder {
256 /// Create a new router builder
257 pub fn new() -> Self {
258 Self { routes: Vec::new() }
259 }
260
261 /// Create a router builder from a collection of routes
262 ///
263 /// This is a convenience method for automatic router construction from
264 /// `AimDb::collect_inbound_routes()`. The resource_ids are converted to
265 /// Arc<str> for proper memory management.
266 ///
267 /// # Arguments
268 /// * `routes` - Vector of (resource_id, producer, deserializer) tuples
269 ///
270 /// # Example
271 /// ```rust,ignore
272 /// let routes = db.collect_inbound_routes("mqtt");
273 /// let router = RouterBuilder::from_routes(routes).build();
274 /// connector.set_router(router).await?;
275 /// ```
276 pub fn from_routes(routes: Vec<(String, Box<dyn ProducerTrait>, DeserializerKind)>) -> Self {
277 let mut builder = Self::new();
278 for (resource_id, producer, deserializer) in routes {
279 // Convert String to Arc<str> - no leaking needed!
280 let resource_id_arc: Arc<str> = Arc::from(resource_id.as_str());
281 builder = builder.add_route(resource_id_arc, producer, deserializer);
282 }
283 builder
284 }
285
286 /// Add a route to the router
287 ///
288 /// # Arguments
289 /// * `resource_id` - Resource identifier to match (as Arc<str>)
290 /// * `producer` - Producer that implements ProducerTrait
291 /// * `deserializer` - Deserializer variant (raw or context-aware)
292 ///
293 /// # Resource ID Memory Management
294 /// The resource_id is stored as Arc<str> for proper reference counting and cleanup.
295 /// You can create an Arc<str> from:
296 /// - String literal: `Arc::from("sensors/temperature")`
297 /// - Owned String: `Arc::from(string.as_str())`
298 pub fn add_route(
299 mut self,
300 resource_id: Arc<str>,
301 producer: Box<dyn ProducerTrait>,
302 deserializer: DeserializerKind,
303 ) -> Self {
304 self.routes.push(Route {
305 resource_id,
306 producer,
307 deserializer,
308 });
309 self
310 }
311
312 /// Build the router
313 ///
314 /// Consumes the builder and returns a configured Router.
315 pub fn build(self) -> Router {
316 Router::new(self.routes)
317 }
318
319 /// Get the number of routes that will be created
320 pub fn route_count(&self) -> usize {
321 self.routes.len()
322 }
323}
324
325impl Default for RouterBuilder {
326 fn default() -> Self {
327 Self::new()
328 }
329}
330
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;
    use crate::connector::ProducerTrait;
    use core::future::Future;
    use core::pin::Pin;
    use std::any::Any;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Producer double that discards the value and only counts invocations.
    struct MockProducer {
        call_count: Arc<AtomicUsize>,
    }

    impl ProducerTrait for MockProducer {
        fn produce_any<'a>(
            &'a self,
            _value: Box<dyn Any + Send>,
        ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>> {
            let counter = Arc::clone(&self.call_count);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
    }

    /// Fresh invocation counter starting at zero.
    fn new_counter() -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(0))
    }

    /// Route for `resource_id` whose mock producer increments `counter`.
    fn counted_route(
        resource_id: &str,
        counter: &Arc<AtomicUsize>,
        deserializer: DeserializerKind,
    ) -> Route {
        Route {
            resource_id: Arc::from(resource_id),
            producer: Box::new(MockProducer {
                call_count: Arc::clone(counter),
            }),
            deserializer,
        }
    }

    #[tokio::test]
    async fn test_single_route() {
        let hits = new_counter();

        let router = Router::new(vec![counted_route(
            "test/resource",
            &hits,
            DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(42i32)))),
        )]);

        router.route("test/resource", b"dummy", None).await.unwrap();

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_multiple_routes_same_resource() {
        let first_hits = new_counter();
        let second_hits = new_counter();

        let router = Router::new(vec![
            counted_route(
                "shared/resource",
                &first_hits,
                DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(42i32)))),
            ),
            counted_route(
                "shared/resource",
                &second_hits,
                DeserializerKind::Raw(Arc::new(|_bytes| {
                    Ok(Box::new("test".to_string()))
                })),
            ),
        ]);

        router
            .route("shared/resource", b"dummy", None)
            .await
            .unwrap();

        // Each of the two routes sharing the resource must have fired once.
        assert_eq!(first_hits.load(Ordering::SeqCst), 1);
        assert_eq!(second_hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_unknown_resource() {
        let router = Router::new(vec![counted_route(
            "test/resource",
            &new_counter(),
            DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(42i32)))),
        )]);

        // An unregistered resource must be tolerated: no panic, no error.
        router
            .route("unknown/resource", b"dummy", None)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_resource_ids_deduplication() {
        let router = Router::new(vec![
            counted_route(
                "resource1",
                &new_counter(),
                DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(42i32)))),
            ),
            // Same resource ID again, bound to a different value type.
            counted_route(
                "resource1",
                &new_counter(),
                DeserializerKind::Raw(Arc::new(|_bytes| {
                    Ok(Box::new("test".to_string()))
                })),
            ),
            counted_route(
                "resource2",
                &new_counter(),
                DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(99i32)))),
            ),
        ]);

        let ids = router.resource_ids();

        assert_eq!(ids.len(), 2);
        assert!(ids.iter().any(|id| id.as_ref() == "resource1"));
        assert!(ids.iter().any(|id| id.as_ref() == "resource2"));
    }

    #[tokio::test]
    async fn test_context_deserializer_with_context() {
        let hits = new_counter();

        let router = Router::new(vec![counted_route(
            "ctx/resource",
            &hits,
            DeserializerKind::Context(Arc::new(|_ctx, _bytes| {
                Ok(Box::new(42i32) as Box<dyn Any + Send>)
            })),
        )]);

        // Any value works as a stand-in context; here an i32 inside an Arc.
        let ctx: Arc<dyn Any + Send + Sync> = Arc::new(0i32);
        router
            .route("ctx/resource", b"dummy", Some(&ctx))
            .await
            .unwrap();

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_context_deserializer_without_context_skips() {
        let hits = new_counter();

        let router = Router::new(vec![counted_route(
            "ctx/resource",
            &hits,
            DeserializerKind::Context(Arc::new(|_ctx, _bytes| {
                Ok(Box::new(42i32) as Box<dyn Any + Send>)
            })),
        )]);

        // Without a context the context-aware route is skipped entirely.
        router.route("ctx/resource", b"dummy", None).await.unwrap();

        assert_eq!(hits.load(Ordering::SeqCst), 0);
    }
}