#[cfg(not(feature = "std"))]
extern crate alloc;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
#[cfg(feature = "std")]
use std::sync::Arc;
use crate::connector::{DeserializerFn, ProducerTrait};
/// A single routing entry tying a resource identifier to the deserializer and
/// producer that handle payloads addressed to that resource.
///
/// `Router::route` compares `resource_id` against the incoming resource,
/// runs `deserializer` on the raw payload and, on success, passes the result
/// to `producer`.
pub struct Route {
/// Identifier this route is registered for; matched by string equality.
pub resource_id: Arc<str>,
/// Receives the deserialized value via `produce_any`.
pub producer: Box<dyn ProducerTrait>,
/// Called with the raw payload bytes to obtain the value for `producer`.
pub deserializer: DeserializerFn,
}
/// Dispatches incoming payloads to the routes registered for a resource.
///
/// Several routes may share the same resource identifier; the router keeps
/// them all in the order they were supplied.
pub struct Router {
// All registered routes, in insertion order (duplicates per resource allowed).
routes: Vec<Route>,
}
impl Router {
    /// Creates a router that dispatches over `routes`.
    ///
    /// The routes are stored exactly as given; several routes may share one
    /// resource identifier.
    pub fn new(routes: Vec<Route>) -> Self {
        Self { routes }
    }

    /// Delivers `payload` to every route registered for `resource_id`.
    ///
    /// For each route whose identifier equals `resource_id`, the payload is
    /// passed through that route's deserializer and the resulting value is
    /// handed to the route's producer. A failure in one route does not stop
    /// the remaining routes from being tried.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok(())`. Deserialization and
    /// producer failures are only reported through the `tracing` / `defmt`
    /// log macros when the corresponding feature is enabled.
    pub async fn route(&self, resource_id: &str, payload: &[u8]) -> Result<(), String> {
        // `matched`: at least one route is registered for this resource.
        // `delivered`: at least one producer accepted the message.
        // They are tracked separately so the final diagnostic does not claim
        // "no route found" when routes existed but all of them failed.
        let mut matched = false;
        let mut delivered = false;

        for route in &self.routes {
            if route.resource_id.as_ref() != resource_id {
                continue;
            }
            matched = true;

            let value_any = match (route.deserializer)(payload) {
                Ok(value_any) => value_any,
                Err(_e) => {
                    #[cfg(feature = "tracing")]
                    tracing::warn!(
                        "Failed to deserialize message on '{}': {}",
                        resource_id,
                        _e
                    );
                    #[cfg(feature = "defmt")]
                    defmt::warn!(
                        "Failed to deserialize message on '{}': {}",
                        resource_id,
                        _e.as_str()
                    );
                    // Other routes on the same resource may still succeed.
                    continue;
                }
            };

            match route.producer.produce_any(value_any).await {
                Ok(()) => {
                    delivered = true;
                    #[cfg(feature = "tracing")]
                    tracing::debug!("Routed message on '{}' to producer", resource_id);
                }
                Err(_e) => {
                    #[cfg(feature = "tracing")]
                    tracing::error!(
                        "Failed to produce message on '{}': {}",
                        resource_id,
                        _e
                    );
                    #[cfg(feature = "defmt")]
                    defmt::error!(
                        "Failed to produce message on '{}': {}",
                        resource_id,
                        _e.as_str()
                    );
                }
            }
        }

        if !matched {
            #[cfg(feature = "tracing")]
            tracing::debug!("No route found for resource: '{}'", resource_id);
            #[cfg(feature = "defmt")]
            defmt::debug!("No route found for resource: '{}'", resource_id);
        } else if !delivered {
            // Routes exist, but every one failed; the individual failures
            // were already logged above at warn/error level.
            #[cfg(feature = "tracing")]
            tracing::debug!("No route delivered message for resource: '{}'", resource_id);
            #[cfg(feature = "defmt")]
            defmt::debug!("No route delivered message for resource: '{}'", resource_id);
        }

        Ok(())
    }

    /// Returns the distinct resource identifiers that have at least one route,
    /// sorted in ascending string order.
    pub fn resource_ids(&self) -> Vec<Arc<str>> {
        let mut ids: Vec<Arc<str>> = self
            .routes
            .iter()
            .map(|route| Arc::clone(&route.resource_id))
            .collect();
        // `Arc<str>` orders and compares by string content, so sorting first
        // makes equal identifiers adjacent for `dedup`.
        ids.sort_unstable();
        ids.dedup();
        ids
    }

    /// Returns the total number of routes, counting every route that shares a
    /// resource identifier separately.
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }
}
/// Builder that accumulates routes and turns them into a `Router`.
pub struct RouterBuilder {
// Routes collected so far, in the order they were added.
routes: Vec<Route>,
}
impl RouterBuilder {
    /// Creates a builder with no routes.
    pub fn new() -> Self {
        Self { routes: Vec::new() }
    }

    /// Creates a builder pre-populated from `(resource_id, producer, deserializer)`
    /// tuples, preserving their order.
    pub fn from_routes(routes: Vec<(String, Box<dyn ProducerTrait>, DeserializerFn)>) -> Self {
        routes
            .into_iter()
            .fold(Self::new(), |builder, (id, producer, deserializer)| {
                let id: Arc<str> = Arc::from(id.as_str());
                builder.add_route(id, producer, deserializer)
            })
    }

    /// Appends one route and returns the builder for further chaining.
    pub fn add_route(
        mut self,
        resource_id: Arc<str>,
        producer: Box<dyn ProducerTrait>,
        deserializer: DeserializerFn,
    ) -> Self {
        let route = Route {
            resource_id,
            producer,
            deserializer,
        };
        self.routes.push(route);
        self
    }

    /// Consumes the builder and produces a `Router` over the collected routes.
    pub fn build(self) -> Router {
        let Self { routes } = self;
        Router::new(routes)
    }

    /// Returns how many routes have been added so far.
    pub fn route_count(&self) -> usize {
        self.routes.len()
    }
}
impl Default for RouterBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;
    use crate::connector::ProducerTrait;
    use core::future::Future;
    use core::pin::Pin;
    use std::any::Any;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Producer double that ignores the value and counts how often it is called.
    struct MockProducer {
        call_count: Arc<AtomicUsize>,
    }

    impl ProducerTrait for MockProducer {
        fn produce_any<'a>(
            &'a self,
            _value: Box<dyn Any + Send>,
        ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>> {
            let call_count = self.call_count.clone();
            Box::pin(async move {
                call_count.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
    }

    /// Boxes a `MockProducer` that increments `call_count` on every produce call.
    fn counting_producer(call_count: &Arc<AtomicUsize>) -> Box<dyn ProducerTrait> {
        Box::new(MockProducer {
            call_count: Arc::clone(call_count),
        })
    }

    /// A message on a registered resource reaches its producer exactly once.
    #[tokio::test]
    async fn test_single_route() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let routes = vec![Route {
            resource_id: Arc::from("test/resource"),
            producer: counting_producer(&call_count),
            deserializer: Arc::new(|_bytes| Ok(Box::new(42i32))),
        }];
        let router = Router::new(routes);

        router.route("test/resource", b"dummy").await.unwrap();

        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    /// Every route sharing a resource identifier receives the message.
    #[tokio::test]
    async fn test_multiple_routes_same_resource() {
        let call_count1 = Arc::new(AtomicUsize::new(0));
        let call_count2 = Arc::new(AtomicUsize::new(0));
        let routes = vec![
            Route {
                resource_id: Arc::from("shared/resource"),
                producer: counting_producer(&call_count1),
                deserializer: Arc::new(|_bytes| Ok(Box::new(42i32))),
            },
            Route {
                resource_id: Arc::from("shared/resource"),
                producer: counting_producer(&call_count2),
                deserializer: Arc::new(|_bytes| Ok(Box::new("test".to_string()))),
            },
        ];
        let router = Router::new(routes);

        router.route("shared/resource", b"dummy").await.unwrap();

        assert_eq!(call_count1.load(Ordering::SeqCst), 1);
        assert_eq!(call_count2.load(Ordering::SeqCst), 1);
    }

    /// Routing to an unregistered resource succeeds and touches no producer.
    #[tokio::test]
    async fn test_unknown_resource() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let routes = vec![Route {
            resource_id: Arc::from("test/resource"),
            producer: counting_producer(&call_count),
            deserializer: Arc::new(|_bytes| Ok(Box::new(42i32))),
        }];
        let router = Router::new(routes);

        router.route("unknown/resource", b"dummy").await.unwrap();

        // `route` always returns Ok, so the meaningful check is that the
        // producer of the unrelated route was never invoked.
        assert_eq!(call_count.load(Ordering::SeqCst), 0);
    }

    /// `resource_ids` reports each identifier once even when routes share it.
    #[tokio::test]
    async fn test_resource_ids_deduplication() {
        let routes = vec![
            Route {
                resource_id: Arc::from("resource1"),
                producer: counting_producer(&Arc::new(AtomicUsize::new(0))),
                deserializer: Arc::new(|_bytes| Ok(Box::new(42i32))),
            },
            Route {
                resource_id: Arc::from("resource1"),
                producer: counting_producer(&Arc::new(AtomicUsize::new(0))),
                deserializer: Arc::new(|_bytes| Ok(Box::new("test".to_string()))),
            },
            Route {
                resource_id: Arc::from("resource2"),
                producer: counting_producer(&Arc::new(AtomicUsize::new(0))),
                deserializer: Arc::new(|_bytes| Ok(Box::new(99i32))),
            },
        ];
        let router = Router::new(routes);

        let ids = router.resource_ids();

        assert_eq!(ids.len(), 2);
        assert!(ids.iter().any(|id| id.as_ref() == "resource1"));
        assert!(ids.iter().any(|id| id.as_ref() == "resource2"));
    }
}