Skip to main content

aimdb_core/
router.rs

//! Generic message router for efficient connector dispatch
//!
//! Provides O(M) routing complexity (M = number of routes) instead of O(N×M) filtered streams.
//! Routes incoming messages directly to type-specific producers based on topic/key matching.
//!
//! This router is protocol-agnostic and can be used by any connector:
//! - MQTT: Routes topics to producers
//! - Kafka: Routes topics/partitions to producers
//! - HTTP: Routes paths to producers
//! - DDS: Routes topics to producers
//! - Shared Memory: Routes segment names to producers
12
13#[cfg(not(feature = "std"))]
14extern crate alloc;
15
16#[cfg(not(feature = "std"))]
17use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
18
19#[cfg(feature = "std")]
20use std::sync::Arc;
21
22use crate::connector::{DeserializerKind, ProducerTrait};
23
24/// A single routing entry
25///
26/// Maps one (resource_id, type) pair to a producer and deserializer.
27/// Multiple routes can exist for the same resource_id (different types).
28///
29/// # Resource ID Examples
30///
31/// - MQTT: "sensors/temperature" (topic)
32/// - Kafka: "events:0" (topic:partition)
33/// - HTTP: "/api/v1/sensors" (path)
34/// - DDS: "TelemetryData" (topic name)
35/// - Shmem: "temperature_buffer" (segment name)
36pub struct Route {
37    /// Resource identifier to match (reference-counted for proper memory management)
38    ///
39    /// Examples: MQTT topic, Kafka topic, HTTP path, DDS topic, shmem segment
40    ///
41    /// Uses Arc<str> instead of &'static str to avoid memory leaks from Box::leak().
42    /// This adds ~8 bytes overhead per route (Arc control block) but enables proper cleanup.
43    pub resource_id: Arc<str>,
44
45    /// Type-erased producer for this route
46    pub producer: Box<dyn ProducerTrait>,
47
48    /// Deserializer for converting bytes → typed value (raw or context-aware)
49    pub deserializer: DeserializerKind,
50}
51
52/// Generic message router for connector dispatch
53///
54/// Routes incoming messages to appropriate producers based on resource_id.
55/// Uses linear search which is efficient for <100 routes.
56///
57/// # Performance
58///
59/// - O(M) complexity where M = number of routes
60/// - May check multiple routes if same resource_id maps to multiple types
61/// - Typical routing time: <1μs for <50 routes
62///
63/// # Protocol Support
64///
65/// This router is protocol-agnostic. Each connector uses it with their own resource_id format:
66/// - **MQTT**: `topic` (e.g., "sensors/temperature")
67/// - **Kafka**: `topic` or `topic:partition` (e.g., "events" or "events:0")
68/// - **HTTP**: `path` (e.g., "/api/v1/sensors")
69/// - **DDS**: `topic_name` (e.g., "TelemetryData")
70/// - **Shmem**: `segment_name` (e.g., "temperature_buffer")
71pub struct Router {
72    /// List of all registered routes
73    routes: Vec<Route>,
74}
75
76impl Router {
77    /// Create a new router with the given routes
78    pub fn new(routes: Vec<Route>) -> Self {
79        Self { routes }
80    }
81
82    /// Route a message to appropriate producer(s)
83    ///
84    /// # Arguments
85    /// * `resource_id` - Resource identifier (topic, path, segment name, etc.)
86    /// * `payload` - Raw message payload bytes
87    /// * `ctx` - Optional type-erased runtime context for context-aware deserializers
88    ///
89    /// # Returns
90    /// * `Ok(())` - Always returns Ok, even if no routes matched or processing failed.
91    ///   Failures are logged (via tracing/defmt) but do not propagate as errors.
92    ///
93    /// # Behavior
94    /// - Checks all routes that match the resource_id (may be multiple)
95    /// - For `DeserializerKind::Raw`, calls the deserializer with payload only
96    /// - For `DeserializerKind::Context`, calls with context + payload (skips if no context)
97    /// - Logs warnings on deserialization failures but continues
98    /// - Logs debug message if no routes found for resource_id
99    pub async fn route(
100        &self,
101        resource_id: &str,
102        payload: &[u8],
103        ctx: Option<&Arc<dyn core::any::Any + Send + Sync>>,
104    ) -> Result<(), String> {
105        let mut routed = false;
106        let mut matched = false;
107
108        // Linear search through all routes
109        // Note: Multiple routes may match the same resource_id (different types)
110        for route in &self.routes {
111            if route.resource_id.as_ref() == resource_id {
112                matched = true;
113                // Deserialize the payload based on deserializer kind
114                let result = match &route.deserializer {
115                    DeserializerKind::Raw(deser) => (deser)(payload),
116                    DeserializerKind::Context(deser) => match ctx {
117                        Some(ctx) => (deser)(ctx.clone(), payload),
118                        None => {
119                            #[cfg(feature = "tracing")]
120                            tracing::warn!(
121                                "Context deserializer on '{}' but no context provided, skipping",
122                                resource_id
123                            );
124
125                            #[cfg(feature = "defmt")]
126                            defmt::warn!(
127                                "Context deserializer on '{}' but no context provided",
128                                resource_id
129                            );
130
131                            continue;
132                        }
133                    },
134                };
135
136                match result {
137                    Ok(value_any) => {
138                        // Produce into the buffer
139                        match route.producer.produce_any(value_any).await {
140                            Ok(()) => {
141                                routed = true;
142
143                                #[cfg(feature = "tracing")]
144                                tracing::debug!("Routed message on '{}' to producer", resource_id);
145                            }
146                            Err(_e) => {
147                                #[cfg(feature = "tracing")]
148                                tracing::error!(
149                                    "Failed to produce message on '{}': {}",
150                                    resource_id,
151                                    _e
152                                );
153
154                                #[cfg(feature = "defmt")]
155                                defmt::error!(
156                                    "Failed to produce message on '{}': {}",
157                                    resource_id,
158                                    _e.as_str()
159                                );
160                            }
161                        }
162                    }
163                    Err(_e) => {
164                        #[cfg(feature = "tracing")]
165                        tracing::warn!(
166                            "Failed to deserialize message on '{}': {}",
167                            resource_id,
168                            _e
169                        );
170
171                        #[cfg(feature = "defmt")]
172                        defmt::warn!(
173                            "Failed to deserialize message on '{}': {}",
174                            resource_id,
175                            _e.as_str()
176                        );
177                    }
178                }
179            }
180        }
181
182        if !routed {
183            if matched {
184                #[cfg(feature = "tracing")]
185                tracing::debug!("Route matched for '{}' but message was not produced (missing context or errors)", resource_id);
186
187                #[cfg(feature = "defmt")]
188                defmt::debug!("Route matched for '{}' but not produced", resource_id);
189            } else {
190                #[cfg(feature = "tracing")]
191                tracing::debug!("No route found for resource: '{}'", resource_id);
192
193                #[cfg(feature = "defmt")]
194                defmt::debug!("No route found for resource: '{}'", resource_id);
195            }
196        }
197
198        Ok(())
199    }
200
201    /// Get list of all resource IDs registered in this router
202    ///
203    /// Useful for subscribing at the protocol level (e.g., MQTT SUBSCRIBE).
204    /// Returns unique resource IDs (deduplicated even if multiple routes per resource).
205    pub fn resource_ids(&self) -> Vec<Arc<str>> {
206        let mut ids: Vec<Arc<str>> = self.routes.iter().map(|r| r.resource_id.clone()).collect();
207
208        // Deduplicate by converting to strings for comparison
209        ids.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
210        ids.dedup_by(|a, b| a.as_ref() == b.as_ref());
211
212        ids
213    }
214
215    /// Get the number of routes in this router
216    pub fn route_count(&self) -> usize {
217        self.routes.len()
218    }
219}
220
221/// Builder for constructing routers
222///
223/// Provides a fluent API for adding routes before creating the router.
224///
225/// # Example
226///
227/// ```rust,ignore
228/// use aimdb_core::router::RouterBuilder;
229///
230/// let router = RouterBuilder::new()
231///     .add_route(
232///         "sensors/temperature",
233///         producer_temp.clone(),
234///         Arc::new(|bytes| {
235///             serde_json::from_slice::<Temperature>(bytes)
236///                 .map(|t| Box::new(t) as Box<dyn Any + Send>)
237///                 .map_err(|e| e.to_string())
238///         })
239///     )
240///     .add_route(
241///         "sensors/humidity",
242///         producer_humidity.clone(),
243///         Arc::new(|bytes| {
244///             serde_json::from_slice::<Humidity>(bytes)
245///                 .map(|h| Box::new(h) as Box<dyn Any + Send>)
246///                 .map_err(|e| e.to_string())
247///         })
248///     )
249///     .build();
250/// ```
251pub struct RouterBuilder {
252    routes: Vec<Route>,
253}
254
255impl RouterBuilder {
256    /// Create a new router builder
257    pub fn new() -> Self {
258        Self { routes: Vec::new() }
259    }
260
261    /// Create a router builder from a collection of routes
262    ///
263    /// This is a convenience method for automatic router construction from
264    /// `AimDb::collect_inbound_routes()`. The resource_ids are converted to
265    /// Arc<str> for proper memory management.
266    ///
267    /// # Arguments
268    /// * `routes` - Vector of (resource_id, producer, deserializer) tuples
269    ///
270    /// # Example
271    /// ```rust,ignore
272    /// let routes = db.collect_inbound_routes("mqtt");
273    /// let router = RouterBuilder::from_routes(routes).build();
274    /// connector.set_router(router).await?;
275    /// ```
276    pub fn from_routes(routes: Vec<(String, Box<dyn ProducerTrait>, DeserializerKind)>) -> Self {
277        let mut builder = Self::new();
278        for (resource_id, producer, deserializer) in routes {
279            // Convert String to Arc<str> - no leaking needed!
280            let resource_id_arc: Arc<str> = Arc::from(resource_id.as_str());
281            builder = builder.add_route(resource_id_arc, producer, deserializer);
282        }
283        builder
284    }
285
286    /// Add a route to the router
287    ///
288    /// # Arguments
289    /// * `resource_id` - Resource identifier to match (as Arc<str>)
290    /// * `producer` - Producer that implements ProducerTrait
291    /// * `deserializer` - Deserializer variant (raw or context-aware)
292    ///
293    /// # Resource ID Memory Management
294    /// The resource_id is stored as Arc<str> for proper reference counting and cleanup.
295    /// You can create an Arc<str> from:
296    /// - String literal: `Arc::from("sensors/temperature")`
297    /// - Owned String: `Arc::from(string.as_str())`
298    pub fn add_route(
299        mut self,
300        resource_id: Arc<str>,
301        producer: Box<dyn ProducerTrait>,
302        deserializer: DeserializerKind,
303    ) -> Self {
304        self.routes.push(Route {
305            resource_id,
306            producer,
307            deserializer,
308        });
309        self
310    }
311
312    /// Build the router
313    ///
314    /// Consumes the builder and returns a configured Router.
315    pub fn build(self) -> Router {
316        Router::new(self.routes)
317    }
318
319    /// Get the number of routes that will be created
320    pub fn route_count(&self) -> usize {
321        self.routes.len()
322    }
323}
324
325impl Default for RouterBuilder {
326    fn default() -> Self {
327        Self::new()
328    }
329}
330
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;
    use crate::connector::ProducerTrait;
    use core::future::Future;
    use core::pin::Pin;
    use std::any::Any;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Producer double that ignores the value and only counts how many times
    /// it was asked to produce.
    struct MockProducer {
        call_count: Arc<AtomicUsize>,
    }

    impl ProducerTrait for MockProducer {
        fn produce_any<'a>(
            &'a self,
            _value: Box<dyn Any + Send>,
        ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>> {
            let call_count = self.call_count.clone();
            Box::pin(async move {
                call_count.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
    }

    /// Fresh shared call counter starting at zero.
    fn counter() -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(0))
    }

    /// Boxed mock producer that reports its calls to `call_count`.
    fn mock_producer(call_count: &Arc<AtomicUsize>) -> Box<dyn ProducerTrait> {
        Box::new(MockProducer {
            call_count: Arc::clone(call_count),
        })
    }

    /// Raw deserializer that ignores the payload and yields a fixed `i32`.
    fn raw_i32(value: i32) -> DeserializerKind {
        DeserializerKind::Raw(Arc::new(move |_bytes| Ok(Box::new(value))))
    }

    /// Raw deserializer that ignores the payload and yields a `String`.
    fn raw_string() -> DeserializerKind {
        DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new("test".to_string()))))
    }

    /// Context-aware deserializer that ignores both inputs and yields an `i32`.
    fn context_i32() -> DeserializerKind {
        DeserializerKind::Context(Arc::new(|_ctx, _bytes| {
            Ok(Box::new(42i32) as Box<dyn Any + Send>)
        }))
    }

    /// Route for `resource_id` backed by a mock producer counting into `call_count`.
    fn route_for(
        resource_id: &str,
        call_count: &Arc<AtomicUsize>,
        deserializer: DeserializerKind,
    ) -> Route {
        Route {
            resource_id: Arc::from(resource_id),
            producer: mock_producer(call_count),
            deserializer,
        }
    }

    #[tokio::test]
    async fn test_single_route() {
        let call_count = counter();
        let router = Router::new(vec![route_for("test/resource", &call_count, raw_i32(42))]);

        router.route("test/resource", b"dummy", None).await.unwrap();

        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_multiple_routes_same_resource() {
        let call_count1 = counter();
        let call_count2 = counter();

        let router = Router::new(vec![
            route_for("shared/resource", &call_count1, raw_i32(42)),
            route_for("shared/resource", &call_count2, raw_string()),
        ]);

        router
            .route("shared/resource", b"dummy", None)
            .await
            .unwrap();

        // Both producers should be called
        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_unknown_resource() {
        let call_count = counter();
        let router = Router::new(vec![route_for("test/resource", &call_count, raw_i32(42))]);

        // Should neither panic nor fail on an unknown resource...
        router
            .route("unknown/resource", b"dummy", None)
            .await
            .unwrap();

        // ...and must not touch producers registered for other resources.
        assert_eq!(call_count.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn test_resource_ids_deduplication() {
        let unused = counter();
        let router = Router::new(vec![
            route_for("resource1", &unused, raw_i32(42)),
            route_for("resource1", &unused, raw_string()), // Duplicate
            route_for("resource2", &unused, raw_i32(99)),
        ]);

        let ids = router.resource_ids();

        assert_eq!(ids.len(), 2);
        assert!(ids.iter().any(|id| id.as_ref() == "resource1"));
        assert!(ids.iter().any(|id| id.as_ref() == "resource2"));
        // Duplicates still count as individual routes.
        assert_eq!(router.route_count(), 3);
    }

    #[tokio::test]
    async fn test_context_deserializer_with_context() {
        let call_count = counter();
        let router = Router::new(vec![route_for("ctx/resource", &call_count, context_i32())]);

        // Provide a dummy context (just an i32 wrapped in Arc)
        let ctx: Arc<dyn Any + Send + Sync> = Arc::new(0i32);
        router
            .route("ctx/resource", b"dummy", Some(&ctx))
            .await
            .unwrap();

        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_context_deserializer_without_context_skips() {
        let call_count = counter();
        let router = Router::new(vec![route_for("ctx/resource", &call_count, context_i32())]);

        // No context provided — context deserializer should be skipped
        router.route("ctx/resource", b"dummy", None).await.unwrap();

        assert_eq!(call_count.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn test_deserialization_failure_is_not_an_error() {
        let failing = counter();
        let healthy = counter();

        let router = Router::new(vec![
            route_for(
                "test/resource",
                &failing,
                DeserializerKind::Raw(Arc::new(|_bytes| Err("bad payload".to_string()))),
            ),
            route_for("test/resource", &healthy, raw_i32(42)),
        ]);

        // A failing deserializer is logged, not propagated...
        router.route("test/resource", b"dummy", None).await.unwrap();

        // ...its producer is never invoked, and later matching routes still run.
        assert_eq!(failing.load(Ordering::SeqCst), 0);
        assert_eq!(healthy.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_builder_add_route() {
        let call_count = counter();

        let builder = RouterBuilder::new().add_route(
            Arc::from("built/resource"),
            mock_producer(&call_count),
            raw_i32(42),
        );
        assert_eq!(builder.route_count(), 1);

        let router = builder.build();
        assert_eq!(router.route_count(), 1);

        router
            .route("built/resource", b"dummy", None)
            .await
            .unwrap();

        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_builder_from_routes() {
        let unused = counter();
        let routes = vec![
            ("resource1".to_string(), mock_producer(&unused), raw_i32(1)),
            ("resource2".to_string(), mock_producer(&unused), raw_i32(2)),
        ];

        let router = RouterBuilder::from_routes(routes).build();

        assert_eq!(router.route_count(), 2);
        let ids = router.resource_ids();
        assert!(ids.iter().any(|id| id.as_ref() == "resource1"));
        assert!(ids.iter().any(|id| id.as_ref() == "resource2"));

        // An empty input and the Default impl both give an empty builder.
        assert_eq!(RouterBuilder::from_routes(Vec::new()).route_count(), 0);
        assert_eq!(RouterBuilder::default().route_count(), 0);
    }
}