aimdb_core/
router.rs

//! Generic message router for efficient connector dispatch
//!
//! Provides O(M) routing per message (M = number of registered routes) instead of
//! O(N×M) filtered streams.
//! Routes incoming messages directly to type-specific producers based on topic/key matching.
//!
//! This router is protocol-agnostic and can be used by any connector:
//! - MQTT: Routes topics to producers
//! - Kafka: Routes topics/partitions to producers
//! - HTTP: Routes paths to producers
//! - DDS: Routes topics to producers
//! - Shared Memory: Routes segment names to producers
12
13#[cfg(not(feature = "std"))]
14extern crate alloc;
15
16#[cfg(not(feature = "std"))]
17use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
18
19#[cfg(feature = "std")]
20use std::sync::Arc;
21
22use crate::connector::{DeserializerFn, ProducerTrait};
23
24/// A single routing entry
25///
26/// Maps one (resource_id, type) pair to a producer and deserializer.
27/// Multiple routes can exist for the same resource_id (different types).
28///
29/// # Resource ID Examples
30///
31/// - MQTT: "sensors/temperature" (topic)
32/// - Kafka: "events:0" (topic:partition)
33/// - HTTP: "/api/v1/sensors" (path)
34/// - DDS: "TelemetryData" (topic name)
35/// - Shmem: "temperature_buffer" (segment name)
36pub struct Route {
37    /// Resource identifier to match (reference-counted for proper memory management)
38    ///
39    /// Examples: MQTT topic, Kafka topic, HTTP path, DDS topic, shmem segment
40    ///
41    /// Uses Arc<str> instead of &'static str to avoid memory leaks from Box::leak().
42    /// This adds ~8 bytes overhead per route (Arc control block) but enables proper cleanup.
43    pub resource_id: Arc<str>,
44
45    /// Type-erased producer for this route
46    pub producer: Box<dyn ProducerTrait>,
47
48    /// Deserializer for converting bytes → typed value
49    pub deserializer: DeserializerFn,
50}
51
52/// Generic message router for connector dispatch
53///
54/// Routes incoming messages to appropriate producers based on resource_id.
55/// Uses linear search which is efficient for <100 routes.
56///
57/// # Performance
58///
59/// - O(M) complexity where M = number of routes
60/// - May check multiple routes if same resource_id maps to multiple types
61/// - Typical routing time: <1μs for <50 routes
62///
63/// # Protocol Support
64///
65/// This router is protocol-agnostic. Each connector uses it with their own resource_id format:
66/// - **MQTT**: `topic` (e.g., "sensors/temperature")
67/// - **Kafka**: `topic` or `topic:partition` (e.g., "events" or "events:0")
68/// - **HTTP**: `path` (e.g., "/api/v1/sensors")
69/// - **DDS**: `topic_name` (e.g., "TelemetryData")
70/// - **Shmem**: `segment_name` (e.g., "temperature_buffer")
71pub struct Router {
72    /// List of all registered routes
73    routes: Vec<Route>,
74}
75
76impl Router {
77    /// Create a new router with the given routes
78    pub fn new(routes: Vec<Route>) -> Self {
79        Self { routes }
80    }
81
82    /// Route a message to appropriate producer(s)
83    ///
84    /// # Arguments
85    /// * `resource_id` - Resource identifier (topic, path, segment name, etc.)
86    /// * `payload` - Raw message payload bytes
87    ///
88    /// # Returns
89    /// * `Ok(())` - At least one route successfully processed the message
90    /// * `Err(_)` - All routes failed (or no routes found)
91    ///
92    /// # Behavior
93    /// - Checks all routes that match the resource_id (may be multiple)
94    /// - Logs warnings on deserialization failures but continues
95    /// - Logs debug message if no routes found for resource_id
96    pub async fn route(&self, resource_id: &str, payload: &[u8]) -> Result<(), String> {
97        let mut routed = false;
98
99        // Linear search through all routes
100        // Note: Multiple routes may match the same resource_id (different types)
101        for route in &self.routes {
102            if route.resource_id.as_ref() == resource_id {
103                // Deserialize the payload
104                match (route.deserializer)(payload) {
105                    Ok(value_any) => {
106                        // Produce into the buffer
107                        match route.producer.produce_any(value_any).await {
108                            Ok(()) => {
109                                routed = true;
110
111                                #[cfg(feature = "tracing")]
112                                tracing::debug!("Routed message on '{}' to producer", resource_id);
113                            }
114                            Err(_e) => {
115                                #[cfg(feature = "tracing")]
116                                tracing::error!(
117                                    "Failed to produce message on '{}': {}",
118                                    resource_id,
119                                    _e
120                                );
121
122                                #[cfg(feature = "defmt")]
123                                defmt::error!(
124                                    "Failed to produce message on '{}': {}",
125                                    resource_id,
126                                    _e.as_str()
127                                );
128                            }
129                        }
130                    }
131                    Err(_e) => {
132                        #[cfg(feature = "tracing")]
133                        tracing::warn!(
134                            "Failed to deserialize message on '{}': {}",
135                            resource_id,
136                            _e
137                        );
138
139                        #[cfg(feature = "defmt")]
140                        defmt::warn!(
141                            "Failed to deserialize message on '{}': {}",
142                            resource_id,
143                            _e.as_str()
144                        );
145                    }
146                }
147            }
148        }
149
150        if !routed {
151            #[cfg(feature = "tracing")]
152            tracing::debug!("No route found for resource: '{}'", resource_id);
153
154            #[cfg(feature = "defmt")]
155            defmt::debug!("No route found for resource: '{}'", resource_id);
156        }
157
158        Ok(())
159    }
160
161    /// Get list of all resource IDs registered in this router
162    ///
163    /// Useful for subscribing at the protocol level (e.g., MQTT SUBSCRIBE).
164    /// Returns unique resource IDs (deduplicated even if multiple routes per resource).
165    pub fn resource_ids(&self) -> Vec<Arc<str>> {
166        let mut ids: Vec<Arc<str>> = self.routes.iter().map(|r| r.resource_id.clone()).collect();
167
168        // Deduplicate by converting to strings for comparison
169        ids.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
170        ids.dedup_by(|a, b| a.as_ref() == b.as_ref());
171
172        ids
173    }
174
175    /// Get the number of routes in this router
176    pub fn route_count(&self) -> usize {
177        self.routes.len()
178    }
179}
180
181/// Builder for constructing routers
182///
183/// Provides a fluent API for adding routes before creating the router.
184///
185/// # Example
186///
187/// ```rust,ignore
188/// use aimdb_core::router::RouterBuilder;
189///
190/// let router = RouterBuilder::new()
191///     .add_route(
192///         "sensors/temperature",
193///         producer_temp.clone(),
194///         Arc::new(|bytes| {
195///             serde_json::from_slice::<Temperature>(bytes)
196///                 .map(|t| Box::new(t) as Box<dyn Any + Send>)
197///                 .map_err(|e| e.to_string())
198///         })
199///     )
200///     .add_route(
201///         "sensors/humidity",
202///         producer_humidity.clone(),
203///         Arc::new(|bytes| {
204///             serde_json::from_slice::<Humidity>(bytes)
205///                 .map(|h| Box::new(h) as Box<dyn Any + Send>)
206///                 .map_err(|e| e.to_string())
207///         })
208///     )
209///     .build();
210/// ```
211pub struct RouterBuilder {
212    routes: Vec<Route>,
213}
214
215impl RouterBuilder {
216    /// Create a new router builder
217    pub fn new() -> Self {
218        Self { routes: Vec::new() }
219    }
220
221    /// Create a router builder from a collection of routes
222    ///
223    /// This is a convenience method for automatic router construction from
224    /// `AimDb::collect_inbound_routes()`. The resource_ids are converted to
225    /// Arc<str> for proper memory management.
226    ///
227    /// # Arguments
228    /// * `routes` - Vector of (resource_id, producer, deserializer) tuples
229    ///
230    /// # Example
231    /// ```rust,ignore
232    /// let routes = db.collect_inbound_routes("mqtt");
233    /// let router = RouterBuilder::from_routes(routes).build();
234    /// connector.set_router(router).await?;
235    /// ```
236    pub fn from_routes(routes: Vec<(String, Box<dyn ProducerTrait>, DeserializerFn)>) -> Self {
237        let mut builder = Self::new();
238        for (resource_id, producer, deserializer) in routes {
239            // Convert String to Arc<str> - no leaking needed!
240            let resource_id_arc: Arc<str> = Arc::from(resource_id.as_str());
241            builder = builder.add_route(resource_id_arc, producer, deserializer);
242        }
243        builder
244    }
245
246    /// Add a route to the router
247    ///
248    /// # Arguments
249    /// * `resource_id` - Resource identifier to match (as Arc<str>)
250    /// * `producer` - Producer that implements ProducerTrait
251    /// * `deserializer` - Function to deserialize bytes to the target type
252    ///
253    /// # Resource ID Memory Management
254    /// The resource_id is stored as Arc<str> for proper reference counting and cleanup.
255    /// You can create an Arc<str> from:
256    /// - String literal: `Arc::from("sensors/temperature")`
257    /// - Owned String: `Arc::from(string.as_str())`
258    pub fn add_route(
259        mut self,
260        resource_id: Arc<str>,
261        producer: Box<dyn ProducerTrait>,
262        deserializer: DeserializerFn,
263    ) -> Self {
264        self.routes.push(Route {
265            resource_id,
266            producer,
267            deserializer,
268        });
269        self
270    }
271
272    /// Build the router
273    ///
274    /// Consumes the builder and returns a configured Router.
275    pub fn build(self) -> Router {
276        Router::new(self.routes)
277    }
278
279    /// Get the number of routes that will be created
280    pub fn route_count(&self) -> usize {
281        self.routes.len()
282    }
283}
284
285impl Default for RouterBuilder {
286    fn default() -> Self {
287        Self::new()
288    }
289}
290
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;
    use crate::connector::ProducerTrait;
    use core::future::Future;
    use core::pin::Pin;
    use std::any::Any;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Test double that ignores the value and counts how often it was asked to produce.
    struct MockProducer {
        call_count: Arc<AtomicUsize>,
    }

    impl ProducerTrait for MockProducer {
        fn produce_any<'a>(
            &'a self,
            _value: Box<dyn Any + Send>,
        ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>> {
            let hits = Arc::clone(&self.call_count);
            Box::pin(async move {
                hits.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
    }

    /// Fresh shared counter starting at zero.
    fn new_counter() -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(0))
    }

    /// Build a route for `resource_id` whose mock producer increments `call_count`.
    fn mock_route(
        resource_id: &str,
        call_count: &Arc<AtomicUsize>,
        deserializer: DeserializerFn,
    ) -> Route {
        Route {
            resource_id: Arc::from(resource_id),
            producer: Box::new(MockProducer {
                call_count: Arc::clone(call_count),
            }),
            deserializer,
        }
    }

    #[tokio::test]
    async fn test_single_route() {
        let hits = new_counter();

        let router = Router::new(vec![mock_route(
            "test/resource",
            &hits,
            Arc::new(|_bytes| Ok(Box::new(42i32))),
        )]);

        router.route("test/resource", b"dummy").await.unwrap();

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_multiple_routes_same_resource() {
        let first_hits = new_counter();
        let second_hits = new_counter();

        let router = Router::new(vec![
            mock_route(
                "shared/resource",
                &first_hits,
                Arc::new(|_bytes| Ok(Box::new(42i32))),
            ),
            mock_route(
                "shared/resource",
                &second_hits,
                Arc::new(|_bytes| Ok(Box::new("test".to_string()))),
            ),
        ]);

        router.route("shared/resource", b"dummy").await.unwrap();

        // Both producers registered for the resource must have been invoked once.
        assert_eq!(first_hits.load(Ordering::SeqCst), 1);
        assert_eq!(second_hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_unknown_resource() {
        let router = Router::new(vec![mock_route(
            "test/resource",
            &new_counter(),
            Arc::new(|_bytes| Ok(Box::new(42i32))),
        )]);

        // An unregistered resource must neither panic nor produce an error.
        router.route("unknown/resource", b"dummy").await.unwrap();
    }

    #[tokio::test]
    async fn test_resource_ids_deduplication() {
        let router = Router::new(vec![
            mock_route(
                "resource1",
                &new_counter(),
                Arc::new(|_bytes| Ok(Box::new(42i32))),
            ),
            // Same resource ID again, bound to a different type.
            mock_route(
                "resource1",
                &new_counter(),
                Arc::new(|_bytes| Ok(Box::new("test".to_string()))),
            ),
            mock_route(
                "resource2",
                &new_counter(),
                Arc::new(|_bytes| Ok(Box::new(99i32))),
            ),
        ]);

        let ids = router.resource_ids();

        assert_eq!(ids.len(), 2);
        assert!(ids.iter().any(|id| id.as_ref() == "resource1"));
        assert!(ids.iter().any(|id| id.as_ref() == "resource2"));
    }
}