aimdb_core/router.rs
1//! Generic message router for efficient connector dispatch
2//!
3//! Provides O(M) routing complexity instead of O(N×M) filtered streams.
4//! Routes incoming messages directly to type-specific producers based on topic/key matching.
5//!
6//! This router is protocol-agnostic and can be used by any connector:
7//! - MQTT: Routes topics to producers
8//! - Kafka: Routes topics/partitions to producers
9//! - HTTP: Routes paths to producers
10//! - DDS: Routes topics to producers
11//! - Shared Memory: Routes segment names to producers
12
13#[cfg(not(feature = "std"))]
14extern crate alloc;
15
16#[cfg(not(feature = "std"))]
17use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
18
19#[cfg(feature = "std")]
20use std::sync::Arc;
21
22use crate::connector::{DeserializerFn, ProducerTrait};
23
24/// A single routing entry
25///
26/// Maps one (resource_id, type) pair to a producer and deserializer.
27/// Multiple routes can exist for the same resource_id (different types).
28///
29/// # Resource ID Examples
30///
31/// - MQTT: "sensors/temperature" (topic)
32/// - Kafka: "events:0" (topic:partition)
33/// - HTTP: "/api/v1/sensors" (path)
34/// - DDS: "TelemetryData" (topic name)
35/// - Shmem: "temperature_buffer" (segment name)
36pub struct Route {
37 /// Resource identifier to match (reference-counted for proper memory management)
38 ///
39 /// Examples: MQTT topic, Kafka topic, HTTP path, DDS topic, shmem segment
40 ///
41 /// Uses Arc<str> instead of &'static str to avoid memory leaks from Box::leak().
42 /// This adds ~8 bytes overhead per route (Arc control block) but enables proper cleanup.
43 pub resource_id: Arc<str>,
44
45 /// Type-erased producer for this route
46 pub producer: Box<dyn ProducerTrait>,
47
48 /// Deserializer for converting bytes → typed value
49 pub deserializer: DeserializerFn,
50}
51
52/// Generic message router for connector dispatch
53///
54/// Routes incoming messages to appropriate producers based on resource_id.
55/// Uses linear search which is efficient for <100 routes.
56///
57/// # Performance
58///
59/// - O(M) complexity where M = number of routes
60/// - May check multiple routes if same resource_id maps to multiple types
61/// - Typical routing time: <1μs for <50 routes
62///
63/// # Protocol Support
64///
65/// This router is protocol-agnostic. Each connector uses it with their own resource_id format:
66/// - **MQTT**: `topic` (e.g., "sensors/temperature")
67/// - **Kafka**: `topic` or `topic:partition` (e.g., "events" or "events:0")
68/// - **HTTP**: `path` (e.g., "/api/v1/sensors")
69/// - **DDS**: `topic_name` (e.g., "TelemetryData")
70/// - **Shmem**: `segment_name` (e.g., "temperature_buffer")
71pub struct Router {
72 /// List of all registered routes
73 routes: Vec<Route>,
74}
75
76impl Router {
77 /// Create a new router with the given routes
78 pub fn new(routes: Vec<Route>) -> Self {
79 Self { routes }
80 }
81
82 /// Route a message to appropriate producer(s)
83 ///
84 /// # Arguments
85 /// * `resource_id` - Resource identifier (topic, path, segment name, etc.)
86 /// * `payload` - Raw message payload bytes
87 ///
88 /// # Returns
89 /// * `Ok(())` - At least one route successfully processed the message
90 /// * `Err(_)` - All routes failed (or no routes found)
91 ///
92 /// # Behavior
93 /// - Checks all routes that match the resource_id (may be multiple)
94 /// - Logs warnings on deserialization failures but continues
95 /// - Logs debug message if no routes found for resource_id
96 pub async fn route(&self, resource_id: &str, payload: &[u8]) -> Result<(), String> {
97 let mut routed = false;
98
99 // Linear search through all routes
100 // Note: Multiple routes may match the same resource_id (different types)
101 for route in &self.routes {
102 if route.resource_id.as_ref() == resource_id {
103 // Deserialize the payload
104 match (route.deserializer)(payload) {
105 Ok(value_any) => {
106 // Produce into the buffer
107 match route.producer.produce_any(value_any).await {
108 Ok(()) => {
109 routed = true;
110
111 #[cfg(feature = "tracing")]
112 tracing::debug!("Routed message on '{}' to producer", resource_id);
113 }
114 Err(_e) => {
115 #[cfg(feature = "tracing")]
116 tracing::error!(
117 "Failed to produce message on '{}': {}",
118 resource_id,
119 _e
120 );
121
122 #[cfg(feature = "defmt")]
123 defmt::error!(
124 "Failed to produce message on '{}': {}",
125 resource_id,
126 _e.as_str()
127 );
128 }
129 }
130 }
131 Err(_e) => {
132 #[cfg(feature = "tracing")]
133 tracing::warn!(
134 "Failed to deserialize message on '{}': {}",
135 resource_id,
136 _e
137 );
138
139 #[cfg(feature = "defmt")]
140 defmt::warn!(
141 "Failed to deserialize message on '{}': {}",
142 resource_id,
143 _e.as_str()
144 );
145 }
146 }
147 }
148 }
149
150 if !routed {
151 #[cfg(feature = "tracing")]
152 tracing::debug!("No route found for resource: '{}'", resource_id);
153
154 #[cfg(feature = "defmt")]
155 defmt::debug!("No route found for resource: '{}'", resource_id);
156 }
157
158 Ok(())
159 }
160
161 /// Get list of all resource IDs registered in this router
162 ///
163 /// Useful for subscribing at the protocol level (e.g., MQTT SUBSCRIBE).
164 /// Returns unique resource IDs (deduplicated even if multiple routes per resource).
165 pub fn resource_ids(&self) -> Vec<Arc<str>> {
166 let mut ids: Vec<Arc<str>> = self.routes.iter().map(|r| r.resource_id.clone()).collect();
167
168 // Deduplicate by converting to strings for comparison
169 ids.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
170 ids.dedup_by(|a, b| a.as_ref() == b.as_ref());
171
172 ids
173 }
174
175 /// Get the number of routes in this router
176 pub fn route_count(&self) -> usize {
177 self.routes.len()
178 }
179}
180
181/// Builder for constructing routers
182///
183/// Provides a fluent API for adding routes before creating the router.
184///
185/// # Example
186///
187/// ```rust,ignore
188/// use aimdb_core::router::RouterBuilder;
189///
190/// let router = RouterBuilder::new()
191/// .add_route(
192/// "sensors/temperature",
193/// producer_temp.clone(),
194/// Arc::new(|bytes| {
195/// serde_json::from_slice::<Temperature>(bytes)
196/// .map(|t| Box::new(t) as Box<dyn Any + Send>)
197/// .map_err(|e| e.to_string())
198/// })
199/// )
200/// .add_route(
201/// "sensors/humidity",
202/// producer_humidity.clone(),
203/// Arc::new(|bytes| {
204/// serde_json::from_slice::<Humidity>(bytes)
205/// .map(|h| Box::new(h) as Box<dyn Any + Send>)
206/// .map_err(|e| e.to_string())
207/// })
208/// )
209/// .build();
210/// ```
211pub struct RouterBuilder {
212 routes: Vec<Route>,
213}
214
215impl RouterBuilder {
216 /// Create a new router builder
217 pub fn new() -> Self {
218 Self { routes: Vec::new() }
219 }
220
221 /// Create a router builder from a collection of routes
222 ///
223 /// This is a convenience method for automatic router construction from
224 /// `AimDb::collect_inbound_routes()`. The resource_ids are converted to
225 /// Arc<str> for proper memory management.
226 ///
227 /// # Arguments
228 /// * `routes` - Vector of (resource_id, producer, deserializer) tuples
229 ///
230 /// # Example
231 /// ```rust,ignore
232 /// let routes = db.collect_inbound_routes("mqtt");
233 /// let router = RouterBuilder::from_routes(routes).build();
234 /// connector.set_router(router).await?;
235 /// ```
236 pub fn from_routes(routes: Vec<(String, Box<dyn ProducerTrait>, DeserializerFn)>) -> Self {
237 let mut builder = Self::new();
238 for (resource_id, producer, deserializer) in routes {
239 // Convert String to Arc<str> - no leaking needed!
240 let resource_id_arc: Arc<str> = Arc::from(resource_id.as_str());
241 builder = builder.add_route(resource_id_arc, producer, deserializer);
242 }
243 builder
244 }
245
246 /// Add a route to the router
247 ///
248 /// # Arguments
249 /// * `resource_id` - Resource identifier to match (as Arc<str>)
250 /// * `producer` - Producer that implements ProducerTrait
251 /// * `deserializer` - Function to deserialize bytes to the target type
252 ///
253 /// # Resource ID Memory Management
254 /// The resource_id is stored as Arc<str> for proper reference counting and cleanup.
255 /// You can create an Arc<str> from:
256 /// - String literal: `Arc::from("sensors/temperature")`
257 /// - Owned String: `Arc::from(string.as_str())`
258 pub fn add_route(
259 mut self,
260 resource_id: Arc<str>,
261 producer: Box<dyn ProducerTrait>,
262 deserializer: DeserializerFn,
263 ) -> Self {
264 self.routes.push(Route {
265 resource_id,
266 producer,
267 deserializer,
268 });
269 self
270 }
271
272 /// Build the router
273 ///
274 /// Consumes the builder and returns a configured Router.
275 pub fn build(self) -> Router {
276 Router::new(self.routes)
277 }
278
279 /// Get the number of routes that will be created
280 pub fn route_count(&self) -> usize {
281 self.routes.len()
282 }
283}
284
285impl Default for RouterBuilder {
286 fn default() -> Self {
287 Self::new()
288 }
289}
290
291#[cfg(all(test, feature = "std"))]
292mod tests {
293 use super::*;
294 use crate::connector::ProducerTrait;
295 use core::future::Future;
296 use core::pin::Pin;
297 use std::any::Any;
298 use std::sync::atomic::{AtomicUsize, Ordering};
299 use std::sync::Arc;
300
301 // Mock producer for testing
302 struct MockProducer {
303 call_count: Arc<AtomicUsize>,
304 }
305
306 impl ProducerTrait for MockProducer {
307 fn produce_any<'a>(
308 &'a self,
309 _value: Box<dyn Any + Send>,
310 ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>> {
311 let call_count = self.call_count.clone();
312 Box::pin(async move {
313 call_count.fetch_add(1, Ordering::SeqCst);
314 Ok(())
315 })
316 }
317 }
318
319 #[tokio::test]
320 async fn test_single_route() {
321 let call_count = Arc::new(AtomicUsize::new(0));
322
323 let routes = vec![Route {
324 resource_id: Arc::from("test/resource"),
325 producer: Box::new(MockProducer {
326 call_count: call_count.clone(),
327 }),
328 deserializer: Arc::new(|_bytes| Ok(Box::new(42i32))),
329 }];
330
331 let router = Router::new(routes);
332
333 router.route("test/resource", b"dummy").await.unwrap();
334
335 assert_eq!(call_count.load(Ordering::SeqCst), 1);
336 }
337
338 #[tokio::test]
339 async fn test_multiple_routes_same_resource() {
340 let call_count1 = Arc::new(AtomicUsize::new(0));
341 let call_count2 = Arc::new(AtomicUsize::new(0));
342
343 let routes = vec![
344 Route {
345 resource_id: Arc::from("shared/resource"),
346 producer: Box::new(MockProducer {
347 call_count: call_count1.clone(),
348 }),
349 deserializer: Arc::new(|_bytes| Ok(Box::new(42i32))),
350 },
351 Route {
352 resource_id: Arc::from("shared/resource"),
353 producer: Box::new(MockProducer {
354 call_count: call_count2.clone(),
355 }),
356 deserializer: Arc::new(|_bytes| Ok(Box::new("test".to_string()))),
357 },
358 ];
359
360 let router = Router::new(routes);
361
362 router.route("shared/resource", b"dummy").await.unwrap();
363
364 // Both producers should be called
365 assert_eq!(call_count1.load(Ordering::SeqCst), 1);
366 assert_eq!(call_count2.load(Ordering::SeqCst), 1);
367 }
368
369 #[tokio::test]
370 async fn test_unknown_resource() {
371 let routes = vec![Route {
372 resource_id: Arc::from("test/resource"),
373 producer: Box::new(MockProducer {
374 call_count: Arc::new(AtomicUsize::new(0)),
375 }),
376 deserializer: Arc::new(|_bytes| Ok(Box::new(42i32))),
377 }];
378
379 let router = Router::new(routes);
380
381 // Should not panic on unknown resource
382 router.route("unknown/resource", b"dummy").await.unwrap();
383 }
384
385 #[tokio::test]
386 async fn test_resource_ids_deduplication() {
387 let routes = vec![
388 Route {
389 resource_id: Arc::from("resource1"),
390 producer: Box::new(MockProducer {
391 call_count: Arc::new(AtomicUsize::new(0)),
392 }),
393 deserializer: Arc::new(|_bytes| Ok(Box::new(42i32))),
394 },
395 Route {
396 resource_id: Arc::from("resource1"), // Duplicate
397 producer: Box::new(MockProducer {
398 call_count: Arc::new(AtomicUsize::new(0)),
399 }),
400 deserializer: Arc::new(|_bytes| Ok(Box::new("test".to_string()))),
401 },
402 Route {
403 resource_id: Arc::from("resource2"),
404 producer: Box::new(MockProducer {
405 call_count: Arc::new(AtomicUsize::new(0)),
406 }),
407 deserializer: Arc::new(|_bytes| Ok(Box::new(99i32))),
408 },
409 ];
410
411 let router = Router::new(routes);
412 let ids = router.resource_ids();
413
414 assert_eq!(ids.len(), 2);
415 assert!(ids.iter().any(|id| id.as_ref() == "resource1"));
416 assert!(ids.iter().any(|id| id.as_ref() == "resource2"));
417 }
418}