#[cfg(not(feature = "std"))]
extern crate alloc;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
#[cfg(feature = "std")]
use std::sync::Arc;
use crate::connector::{DeserializerKind, ProducerTrait};
/// A single routing entry: one resource identifier bound to a deserializer
/// and the producer that receives the deserialized value.
pub struct Route {
    /// Identifier compared for exact string equality against the id passed
    /// to `Router::route`.
    pub resource_id: Arc<str>,
    /// Sink that is handed the deserialized value through `produce_any`.
    pub producer: Box<dyn ProducerTrait>,
    /// Turns the raw payload bytes (optionally together with a shared
    /// context) into the value forwarded to `producer`.
    pub deserializer: DeserializerKind,
}
/// Dispatches incoming payloads to every [`Route`] registered for a
/// resource identifier.
pub struct Router {
    /// Registered routes; scanned in order on every `route` call.
    routes: Vec<Route>,
}
impl Router {
pub fn new(routes: Vec<Route>) -> Self {
Self { routes }
}
pub async fn route(
&self,
resource_id: &str,
payload: &[u8],
ctx: Option<&Arc<dyn core::any::Any + Send + Sync>>,
) -> Result<(), String> {
let mut routed = false;
let mut matched = false;
for route in &self.routes {
if route.resource_id.as_ref() == resource_id {
matched = true;
let result = match &route.deserializer {
DeserializerKind::Raw(deser) => (deser)(payload),
DeserializerKind::Context(deser) => match ctx {
Some(ctx) => (deser)(ctx.clone(), payload),
None => {
#[cfg(feature = "tracing")]
tracing::warn!(
"Context deserializer on '{}' but no context provided, skipping",
resource_id
);
#[cfg(feature = "defmt")]
defmt::warn!(
"Context deserializer on '{}' but no context provided",
resource_id
);
continue;
}
},
};
match result {
Ok(value_any) => {
match route.producer.produce_any(value_any).await {
Ok(()) => {
routed = true;
#[cfg(feature = "tracing")]
tracing::debug!("Routed message on '{}' to producer", resource_id);
}
Err(_e) => {
#[cfg(feature = "tracing")]
tracing::error!(
"Failed to produce message on '{}': {}",
resource_id,
_e
);
#[cfg(feature = "defmt")]
defmt::error!(
"Failed to produce message on '{}': {}",
resource_id,
_e.as_str()
);
}
}
}
Err(_e) => {
#[cfg(feature = "tracing")]
tracing::warn!(
"Failed to deserialize message on '{}': {}",
resource_id,
_e
);
#[cfg(feature = "defmt")]
defmt::warn!(
"Failed to deserialize message on '{}': {}",
resource_id,
_e.as_str()
);
}
}
}
}
if !routed {
if matched {
#[cfg(feature = "tracing")]
tracing::debug!("Route matched for '{}' but message was not produced (missing context or errors)", resource_id);
#[cfg(feature = "defmt")]
defmt::debug!("Route matched for '{}' but not produced", resource_id);
} else {
#[cfg(feature = "tracing")]
tracing::debug!("No route found for resource: '{}'", resource_id);
#[cfg(feature = "defmt")]
defmt::debug!("No route found for resource: '{}'", resource_id);
}
}
Ok(())
}
pub fn resource_ids(&self) -> Vec<Arc<str>> {
let mut ids: Vec<Arc<str>> = self.routes.iter().map(|r| r.resource_id.clone()).collect();
ids.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
ids.dedup_by(|a, b| a.as_ref() == b.as_ref());
ids
}
pub fn route_count(&self) -> usize {
self.routes.len()
}
}
/// Accumulates [`Route`] entries and turns them into a [`Router`] via
/// `build`.
pub struct RouterBuilder {
    /// Routes collected so far, in insertion order.
    routes: Vec<Route>,
}
impl RouterBuilder {
pub fn new() -> Self {
Self { routes: Vec::new() }
}
pub fn from_routes(routes: Vec<(String, Box<dyn ProducerTrait>, DeserializerKind)>) -> Self {
let mut builder = Self::new();
for (resource_id, producer, deserializer) in routes {
let resource_id_arc: Arc<str> = Arc::from(resource_id.as_str());
builder = builder.add_route(resource_id_arc, producer, deserializer);
}
builder
}
pub fn add_route(
mut self,
resource_id: Arc<str>,
producer: Box<dyn ProducerTrait>,
deserializer: DeserializerKind,
) -> Self {
self.routes.push(Route {
resource_id,
producer,
deserializer,
});
self
}
pub fn build(self) -> Router {
Router::new(self.routes)
}
pub fn route_count(&self) -> usize {
self.routes.len()
}
}
impl Default for RouterBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;
    use crate::connector::ProducerTrait;
    use core::future::Future;
    use core::pin::Pin;
    use std::any::Any;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Producer double that discards the value and only counts invocations.
    struct MockProducer {
        call_count: Arc<AtomicUsize>,
    }

    impl ProducerTrait for MockProducer {
        fn produce_any<'a>(
            &'a self,
            _value: Box<dyn Any + Send>,
        ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>> {
            let counter = Arc::clone(&self.call_count);
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(())
            })
        }
    }

    /// Fresh shared counter starting at zero.
    fn new_counter() -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(0))
    }

    /// Boxed `MockProducer` that increments `counter` on every produce call.
    fn counting_producer(counter: &Arc<AtomicUsize>) -> Box<dyn ProducerTrait> {
        Box::new(MockProducer {
            call_count: Arc::clone(counter),
        })
    }

    /// Number of produce calls recorded by `counter`.
    fn calls(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    #[tokio::test]
    async fn test_single_route() {
        let hits = new_counter();
        let router = Router::new(vec![Route {
            resource_id: Arc::from("test/resource"),
            producer: counting_producer(&hits),
            deserializer: DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(42i32)))),
        }]);

        router.route("test/resource", b"dummy", None).await.unwrap();

        assert_eq!(calls(&hits), 1);
    }

    #[tokio::test]
    async fn test_multiple_routes_same_resource() {
        let first_hits = new_counter();
        let second_hits = new_counter();
        let router = Router::new(vec![
            Route {
                resource_id: Arc::from("shared/resource"),
                producer: counting_producer(&first_hits),
                deserializer: DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(42i32)))),
            },
            Route {
                resource_id: Arc::from("shared/resource"),
                producer: counting_producer(&second_hits),
                deserializer: DeserializerKind::Raw(Arc::new(|_bytes| {
                    Ok(Box::new("test".to_string()))
                })),
            },
        ]);

        router
            .route("shared/resource", b"dummy", None)
            .await
            .unwrap();

        // Both routes share the id, so each producer sees the message once.
        assert_eq!(calls(&first_hits), 1);
        assert_eq!(calls(&second_hits), 1);
    }

    #[tokio::test]
    async fn test_unknown_resource() {
        let router = Router::new(vec![Route {
            resource_id: Arc::from("test/resource"),
            producer: counting_producer(&new_counter()),
            deserializer: DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(42i32)))),
        }]);

        // An unmatched resource id is not an error.
        router
            .route("unknown/resource", b"dummy", None)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_resource_ids_deduplication() {
        let router = Router::new(vec![
            Route {
                resource_id: Arc::from("resource1"),
                producer: counting_producer(&new_counter()),
                deserializer: DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(42i32)))),
            },
            Route {
                resource_id: Arc::from("resource1"),
                producer: counting_producer(&new_counter()),
                deserializer: DeserializerKind::Raw(Arc::new(|_bytes| {
                    Ok(Box::new("test".to_string()))
                })),
            },
            Route {
                resource_id: Arc::from("resource2"),
                producer: counting_producer(&new_counter()),
                deserializer: DeserializerKind::Raw(Arc::new(|_bytes| Ok(Box::new(99i32)))),
            },
        ]);

        let ids = router.resource_ids();
        let names: Vec<&str> = ids.iter().map(|id| &**id).collect();

        assert_eq!(names.len(), 2);
        assert!(names.contains(&"resource1"));
        assert!(names.contains(&"resource2"));
    }

    #[tokio::test]
    async fn test_context_deserializer_with_context() {
        let hits = new_counter();
        let router = Router::new(vec![Route {
            resource_id: Arc::from("ctx/resource"),
            producer: counting_producer(&hits),
            deserializer: DeserializerKind::Context(Arc::new(move |_ctx, _bytes| {
                Ok(Box::new(42i32) as Box<dyn Any + Send>)
            })),
        }]);
        let ctx: Arc<dyn Any + Send + Sync> = Arc::new(0i32);

        router
            .route("ctx/resource", b"dummy", Some(&ctx))
            .await
            .unwrap();

        assert_eq!(calls(&hits), 1);
    }

    #[tokio::test]
    async fn test_context_deserializer_without_context_skips() {
        let hits = new_counter();
        let router = Router::new(vec![Route {
            resource_id: Arc::from("ctx/resource"),
            producer: counting_producer(&hits),
            deserializer: DeserializerKind::Context(Arc::new(|_ctx, _bytes| {
                Ok(Box::new(42i32) as Box<dyn Any + Send>)
            })),
        }]);

        router.route("ctx/resource", b"dummy", None).await.unwrap();

        // Without a context the route is skipped, so nothing is produced.
        assert_eq!(calls(&hits), 0);
    }
}