use core::fmt::{self, Debug};
use core::future::Future;
use core::pin::Pin;
extern crate alloc;
use alloc::{
boxed::Box,
string::{String, ToString},
sync::Arc,
vec::Vec,
};
#[cfg(feature = "std")]
use alloc::format;
use crate::{builder::AimDb, transport::Connector, DbResult};
/// Failure reasons a serializer can report.
///
/// This is the error type returned by [`SerializerFn`]. It carries no payload,
/// so it is `Copy` and cheap to compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializeError {
    /// The output buffer was too small to hold the serialized value.
    BufferTooSmall,
    /// The value handed to the serializer was not of the type it expects.
    TypeMismatch,
    /// The value contained data that cannot be serialized.
    InvalidData,
}
// `defmt` logging support; compiled only when the `defmt` feature is enabled.
#[cfg(feature = "defmt")]
impl defmt::Format for SerializeError {
    /// Writes the bare variant name to the `defmt` formatter.
    fn format(&self, f: defmt::Formatter) {
        match self {
            Self::BufferTooSmall => defmt::write!(f, "BufferTooSmall"),
            Self::TypeMismatch => defmt::write!(f, "TypeMismatch"),
            Self::InvalidData => defmt::write!(f, "InvalidData"),
        }
    }
}
// Human-readable messages; compiled only when the `std` feature is enabled.
#[cfg(feature = "std")]
impl std::fmt::Display for SerializeError {
    /// Writes a short, fixed description of the error variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant maps to a constant message, so pick the text first
        // and emit it with a single write.
        let message = match *self {
            Self::BufferTooSmall => "Output buffer too small",
            Self::TypeMismatch => "Type mismatch in serializer",
            Self::InvalidData => "Invalid data for serialization",
        };
        f.write_str(message)
    }
}
// Marker impl so `SerializeError` can be used as a `std::error::Error`;
// no methods are overridden. Only available with the `std` feature.
#[cfg(feature = "std")]
impl std::error::Error for SerializeError {}
/// Shared, thread-safe serializer callback.
///
/// Takes a type-erased value (`&dyn Any`) and returns the serialized bytes,
/// or a [`SerializeError`] when the value cannot be serialized.
pub type SerializerFn =
    Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
/// Derives a topic name from a typed value.
pub trait TopicProvider<T>: Send + Sync {
    /// Returns the topic for `value`, or `None` when the provider has no
    /// topic to offer for it.
    fn topic(&self, value: &T) -> Option<String>;
}

/// Type-erased counterpart of [`TopicProvider`], usable as a trait object.
pub trait TopicProviderAny: Send + Sync {
    /// Returns the topic for a type-erased `value`, or `None`.
    fn topic_any(&self, value: &dyn core::any::Any) -> Option<String>;
}

/// Adapter that exposes a typed [`TopicProvider<T>`] through the type-erased
/// [`TopicProviderAny`] interface.
pub struct TopicProviderWrapper<T, P>
where
    T: 'static,
    P: TopicProvider<T>,
{
    /// The wrapped typed provider.
    provider: P,
    /// Records `T` without storing a value of it. A function-pointer marker
    /// is `Send + Sync` for any `T`, so the wrapper's auto traits depend only
    /// on `P`.
    _phantom: core::marker::PhantomData<fn(T) -> T>,
}

impl<T, P> TopicProviderWrapper<T, P>
where
    T: 'static,
    P: TopicProvider<T>,
{
    /// Wraps `provider` so it can be called with type-erased values.
    pub fn new(provider: P) -> Self {
        let _phantom = core::marker::PhantomData;
        Self { provider, _phantom }
    }
}

impl<T, P> TopicProviderAny for TopicProviderWrapper<T, P>
where
    T: 'static,
    P: TopicProvider<T> + Send + Sync,
{
    /// Downcasts `value` to `T` and forwards to the wrapped provider.
    ///
    /// Returns `None` when `value` is not a `T`, or when the provider itself
    /// returns `None`.
    fn topic_any(&self, value: &dyn core::any::Any) -> Option<String> {
        // A failed downcast short-circuits to `None`.
        let typed = value.downcast_ref::<T>()?;
        self.provider.topic(typed)
    }
}

/// Shared handle to a type-erased topic provider.
pub type TopicProviderFn = Arc<dyn TopicProviderAny>;
/// Components of a connector URL as produced by [`ConnectorUrl::parse`].
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorUrl {
    /// Text before `://` (for example `mqtt`, `https`, `kafka`).
    pub scheme: String,
    /// Host portion of the URL.
    pub host: String,
    /// Explicit port, when the URL carries one that fits in a `u16`.
    pub port: Option<u16>,
    /// Path including its leading `/`, or `None` when the URL has no path.
    pub path: Option<String>,
    /// User name from the credentials section, if any.
    pub username: Option<String>,
    /// Password from the credentials section, if any (masked by `Display`).
    pub password: Option<String>,
    /// `key=value` pairs from the query string, in the order they appear.
    pub query_params: Vec<(String, String)>,
}
impl ConnectorUrl {
    /// Parses `url` into its components.
    ///
    /// # Errors
    /// Propagates the error from `parse_connector_url` (for example when the
    /// `://` scheme separator is missing).
    pub fn parse(url: &str) -> DbResult<Self> {
        parse_connector_url(url)
    }

    /// Returns the conventional port for the URL's scheme, or `None` for
    /// schemes this table does not know.
    pub fn default_port(&self) -> Option<u16> {
        // NOTE(review): `ws`/`wss` share the MQTT ports here — presumably for
        // MQTT-over-WebSocket; confirm this is intended.
        let port = match self.scheme.as_str() {
            "mqtt" | "ws" => 1883,
            "mqtts" | "wss" => 8883,
            "kafka" => 9092,
            "http" => 80,
            "https" => 443,
            _ => return None,
        };
        Some(port)
    }

    /// Returns the explicit port if the URL has one, otherwise the scheme's
    /// default port.
    pub fn effective_port(&self) -> Option<u16> {
        match self.port {
            Some(explicit) => Some(explicit),
            None => self.default_port(),
        }
    }

    /// Returns `true` for the TLS-style schemes `mqtts`, `https` and `wss`.
    pub fn is_secure(&self) -> bool {
        const SECURE_SCHEMES: [&str; 3] = ["mqtts", "https", "wss"];
        SECURE_SCHEMES.contains(&self.scheme.as_str())
    }

    /// Returns the URL scheme.
    pub fn scheme(&self) -> &str {
        self.scheme.as_str()
    }

    /// Returns the URL path, or `"/"` when the URL has none.
    pub fn path(&self) -> &str {
        match &self.path {
            Some(explicit) => explicit.as_str(),
            None => "/",
        }
    }

    /// Builds a resource identifier of the form `host/path`.
    ///
    /// Leading slashes are stripped from the path. When only one of host or
    /// path is non-empty that part is returned alone; when both are empty the
    /// result is an empty string.
    pub fn resource_id(&self) -> String {
        let trimmed = self.path().trim_start_matches('/');
        match (self.host.is_empty(), trimmed.is_empty()) {
            (false, false) => alloc::format!("{}/{}", self.host, trimmed),
            (false, true) => self.host.clone(),
            (true, false) => trimmed.to_string(),
            (true, true) => String::new(),
        }
    }
}
impl fmt::Display for ConnectorUrl {
    /// Renders `scheme://[user[:****]@]host[:port][/path]`.
    ///
    /// The password is never printed; its presence is shown as `:****`.
    /// Query parameters are not part of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}://", self.scheme)?;

        // Credentials: user name in clear text, password always masked.
        match (self.username.as_deref(), self.password.is_some()) {
            (Some(user), true) => write!(f, "{}:****@", user)?,
            (Some(user), false) => write!(f, "{}@", user)?,
            (None, _) => {}
        }

        f.write_str(&self.host)?;

        // Only an explicit port is shown, never the scheme default.
        if let Some(port) = self.port {
            write!(f, ":{}", port)?;
        }

        if let Some(path) = self.path.as_deref() {
            // Guarantee exactly one separator between authority and path.
            if !path.starts_with('/') {
                f.write_str("/")?;
            }
            f.write_str(path)?;
        }

        Ok(())
    }
}
/// Type-erased handle to a protocol client.
///
/// Each variant stores the concrete client behind `Arc<dyn Any + Send + Sync>`;
/// use [`ConnectorClient::downcast_ref`] to get the concrete type back.
#[derive(Clone)]
pub enum ConnectorClient {
    /// Client tagged as MQTT.
    Mqtt(Arc<dyn core::any::Any + Send + Sync>),
    /// Client tagged as Kafka.
    Kafka(Arc<dyn core::any::Any + Send + Sync>),
    /// Client tagged as HTTP.
    Http(Arc<dyn core::any::Any + Send + Sync>),
    /// Client for any other protocol, identified by name.
    Generic {
        /// Protocol name (shown in the `Debug` output).
        protocol: String,
        /// The type-erased client.
        client: Arc<dyn core::any::Any + Send + Sync>,
    },
}

impl Debug for ConnectorClient {
    /// Prints the variant name only; the erased client is opaque, so it is
    /// shown as `..` (or as the protocol name for `Generic`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fixed = match self {
            Self::Mqtt(_) => "ConnectorClient::Mqtt(..)",
            Self::Kafka(_) => "ConnectorClient::Kafka(..)",
            Self::Http(_) => "ConnectorClient::Http(..)",
            Self::Generic { protocol, .. } => {
                return write!(f, "ConnectorClient::Generic({})", protocol);
            }
        };
        f.write_str(fixed)
    }
}

impl ConnectorClient {
    /// Returns a reference to the wrapped client as `T`, or `None` when the
    /// stored client is of a different type. The variant tag is not checked.
    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
        // Every variant carries the same kind of erased pointer, so extract
        // it once and downcast in a single place.
        let erased: &(dyn core::any::Any + Send + Sync) = match self {
            Self::Mqtt(inner)
            | Self::Kafka(inner)
            | Self::Http(inner)
            | Self::Generic { client: inner, .. } => &**inner,
        };
        erased.downcast_ref::<T>()
    }
}
/// Link description holding a target URL, configuration and optional
/// type-erased callbacks.
#[derive(Clone)]
pub struct ConnectorLink {
    /// Parsed connector URL.
    pub url: ConnectorUrl,
    /// Free-form key/value configuration, in insertion order.
    pub config: Vec<(String, String)>,
    /// Optional callback turning a type-erased value into bytes.
    pub serializer: Option<SerializerFn>,
    /// Optional factory that builds a `ConsumerTrait` object
    /// (only present with the `alloc` feature).
    #[cfg(feature = "alloc")]
    pub consumer_factory: Option<ConsumerFactoryFn>,
    /// Optional type-erased provider that derives a topic from a value.
    pub topic_provider: Option<TopicProviderFn>,
}
impl Debug for ConnectorLink {
    /// Prints the URL and config verbatim; callback slots are rendered as
    /// `Some("<function>")` or `None` because closures are not `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        /// Maps an optional callback to a printable placeholder.
        fn placeholder<T>(slot: &Option<T>) -> Option<&'static str> {
            slot.as_ref().map(|_| "<function>")
        }

        let serializer = placeholder(&self.serializer);
        // Without the `alloc` feature the field does not exist; print `None`.
        #[cfg(feature = "alloc")]
        let consumer_factory = placeholder(&self.consumer_factory);
        #[cfg(not(feature = "alloc"))]
        let consumer_factory = None::<()>;
        let topic_provider = placeholder(&self.topic_provider);

        let mut out = f.debug_struct("ConnectorLink");
        out.field("url", &self.url);
        out.field("config", &self.config);
        out.field("serializer", &serializer);
        out.field("consumer_factory", &consumer_factory);
        out.field("topic_provider", &topic_provider);
        out.finish()
    }
}
impl ConnectorLink {
    /// Creates a link for `url` with empty configuration and no callbacks.
    pub fn new(url: ConnectorUrl) -> Self {
        Self {
            url,
            config: Vec::new(),
            serializer: None,
            #[cfg(feature = "alloc")]
            consumer_factory: None,
            topic_provider: None,
        }
    }

    /// Appends a `key`/`value` configuration entry and returns the link
    /// (builder style). Existing entries with the same key are kept.
    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let entry = (key.into(), value.into());
        self.config.push(entry);
        self
    }

    /// Invokes the consumer factory with `db_any`, if a factory is set.
    ///
    /// Returns `None` when no factory has been configured.
    #[cfg(feature = "alloc")]
    pub fn create_consumer(
        &self,
        db_any: Arc<dyn core::any::Any + Send + Sync>,
    ) -> Option<Box<dyn ConsumerTrait>> {
        let factory = self.consumer_factory.as_ref()?;
        Some(factory(db_any))
    }
}
/// Shared, thread-safe callback that decodes a byte slice into a boxed,
/// type-erased value, or returns an error message.
pub type DeserializerFn =
    Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;
/// Shared, thread-safe factory that builds a [`ProducerTrait`] object from a
/// type-erased, shared argument (only with the `alloc` feature).
#[cfg(feature = "alloc")]
pub type ProducerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait> + Send + Sync>;
/// Shared, thread-safe callback that optionally yields a topic name; takes no
/// arguments.
pub type TopicResolverFn = Arc<dyn Fn() -> Option<String> + Send + Sync>;
/// Object-safe producer that accepts type-erased values.
pub trait ProducerTrait: Send + Sync {
    /// Hands a boxed, type-erased `value` to the producer.
    ///
    /// Returns a boxed future (borrowing `self` for `'a`) that resolves to
    /// `Ok(())` or to an error message.
    fn produce_any<'a>(
        &'a self,
        value: Box<dyn core::any::Any + Send>,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
}
/// Shared, thread-safe factory that builds a [`ConsumerTrait`] object from a
/// type-erased, shared argument (only with the `alloc` feature).
#[cfg(feature = "alloc")]
pub type ConsumerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ConsumerTrait> + Send + Sync>;
/// Object-safe consumer that hands out type-erased readers.
pub trait ConsumerTrait: Send + Sync {
    /// Returns a boxed future resolving to a boxed [`AnyReader`], or to a
    /// database error.
    fn subscribe_any<'a>(&'a self) -> SubscribeAnyFuture<'a>;
}
// Boxed, `Send` future returned by `ConsumerTrait::subscribe_any`; resolves to
// a boxed `AnyReader` or a database error.
type SubscribeAnyFuture<'a> =
    Pin<Box<dyn Future<Output = DbResult<Box<dyn AnyReader>>> + Send + 'a>>;
// Boxed, `Send` future returned by `AnyReader::recv_any`; resolves to a boxed,
// type-erased value or a database error.
type RecvAnyFuture<'a> =
    Pin<Box<dyn Future<Output = DbResult<Box<dyn core::any::Any + Send>>> + Send + 'a>>;
/// Object-safe reader that yields type-erased values.
pub trait AnyReader: Send {
    /// Returns a boxed future resolving to the next boxed, type-erased value,
    /// or to a database error. Requires exclusive access to the reader.
    fn recv_any<'a>(&'a mut self) -> RecvAnyFuture<'a>;
}
/// Link description for data arriving through a connector: a source URL,
/// configuration, a mandatory deserializer and optional callbacks.
pub struct InboundConnectorLink {
    /// Parsed connector URL.
    pub url: ConnectorUrl,
    /// Free-form key/value configuration.
    pub config: Vec<(String, String)>,
    /// Callback decoding raw bytes into a type-erased value.
    pub deserializer: DeserializerFn,
    /// Optional factory that builds a `ProducerTrait` object
    /// (only present with the `alloc` feature).
    #[cfg(feature = "alloc")]
    pub producer_factory: Option<ProducerFactoryFn>,
    /// Optional callback supplying the topic; see `resolve_topic` for the
    /// fallback used when it is absent or returns `None`.
    pub topic_resolver: Option<TopicResolverFn>,
}
impl Clone for InboundConnectorLink {
fn clone(&self) -> Self {
Self {
url: self.url.clone(),
config: self.config.clone(),
deserializer: self.deserializer.clone(),
#[cfg(feature = "alloc")]
producer_factory: self.producer_factory.clone(),
topic_resolver: self.topic_resolver.clone(),
}
}
}
impl Debug for InboundConnectorLink {
    /// Prints the URL and config verbatim and renders callback slots as
    /// placeholders, because closures are not `Debug`.
    ///
    /// `producer_factory` is reported as well (as `Some("<function>")` or
    /// `None`) so the output reflects every field of the struct, mirroring
    /// how `ConnectorLink` reports its `consumer_factory`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("InboundConnectorLink");
        out.field("url", &self.url)
            .field("config", &self.config)
            // The deserializer is mandatory, so it is always present.
            .field("deserializer", &"<function>");

        // The field only exists with the `alloc` feature; print `None`
        // otherwise so the output shape stays the same in both builds.
        #[cfg(feature = "alloc")]
        out.field(
            "producer_factory",
            &self.producer_factory.as_ref().map(|_| "<function>"),
        );
        #[cfg(not(feature = "alloc"))]
        out.field("producer_factory", &None::<()>);

        out.field(
            "topic_resolver",
            &self.topic_resolver.as_ref().map(|_| "<function>"),
        );
        out.finish()
    }
}
impl InboundConnectorLink {
    /// Creates an inbound link for `url` using `deserializer`, with empty
    /// configuration and no optional callbacks.
    pub fn new(url: ConnectorUrl, deserializer: DeserializerFn) -> Self {
        Self {
            url,
            deserializer,
            config: Vec::new(),
            #[cfg(feature = "alloc")]
            producer_factory: None,
            topic_resolver: None,
        }
    }

    /// Stores `factory` as the producer factory (replacing any previous one)
    /// and returns the link, builder style.
    #[cfg(feature = "alloc")]
    pub fn with_producer_factory<F>(mut self, factory: F) -> Self
    where
        F: Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait>
            + Send
            + Sync
            + 'static,
    {
        let shared: ProducerFactoryFn = Arc::new(factory);
        self.producer_factory = Some(shared);
        self
    }

    /// Invokes the producer factory with `db_any`, if a factory is set.
    ///
    /// Returns `None` when no factory has been configured.
    #[cfg(feature = "alloc")]
    pub fn create_producer(
        &self,
        db_any: Arc<dyn core::any::Any + Send + Sync>,
    ) -> Option<Box<dyn ProducerTrait>> {
        let factory = self.producer_factory.as_ref()?;
        Some(factory(db_any))
    }

    /// Returns the topic for this link.
    ///
    /// The topic resolver wins when it is set and yields `Some`; otherwise
    /// the URL's `resource_id()` is used.
    pub fn resolve_topic(&self) -> String {
        if let Some(resolver) = self.topic_resolver.as_ref() {
            if let Some(topic) = resolver() {
                return topic;
            }
        }
        // No resolver, or it declined to provide a topic.
        self.url.resource_id()
    }
}
/// Minimal link description: a connector URL plus key/value configuration.
// NOTE(review): presumably the outbound counterpart of `InboundConnectorLink`;
// no derives or methods are defined for it here — confirm intended usage.
pub struct OutboundConnectorLink {
    /// Parsed connector URL.
    pub url: ConnectorUrl,
    /// Free-form key/value configuration.
    pub config: Vec<(String, String)>,
}
/// Parses a connector URL of the form
/// `scheme://[user[:password]@]host[:port][/path][?key=value&...]`.
///
/// Parsing rules:
/// - The scheme is everything before the first `://`.
/// - The authority (credentials, host, port) ends at the first `/` or `?`.
///   Credentials are only looked for inside the authority, so an `@` in the
///   path or query is never mistaken for a credentials separator.
/// - Credentials are split from the host at the *last* `@` of the authority
///   and into user/password at the first `:`.
/// - The port is the text after the last `:` of the host section. If it does
///   not parse as a `u16` the port is `None` and that text is dropped
///   (best-effort, unchanged behavior). A bracketed IPv6 literal without a
///   port (e.g. `[::1]`) is kept whole as the host.
/// - The path keeps its leading `/`; it is `None` when the URL has no path.
/// - Query pairs without `=` are skipped; the query is parsed even when the
///   URL has no path (`scheme://host?k=v`).
///
/// # Errors
/// Returns `DbError::InvalidOperation` when `url` contains no `://`.
fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
    use crate::DbError;

    // Build the error lazily: the message is only formatted on failure.
    let (scheme, rest) = url.split_once("://").ok_or_else(|| {
        #[cfg(feature = "std")]
        {
            DbError::InvalidOperation {
                operation: "parse_connector_url".into(),
                reason: format!("Missing scheme in URL: {}", url),
            }
        }
        #[cfg(not(feature = "std"))]
        {
            DbError::InvalidOperation {
                _operation: (),
                _reason: (),
            }
        }
    })?;

    // The authority runs up to the first path or query delimiter.
    let authority_end = rest.find(&['/', '?'][..]).unwrap_or(rest.len());
    let (authority, tail) = rest.split_at(authority_end);

    // Split at the last '@' so a stray '@' inside the password stays with
    // the credentials instead of leaking into the host.
    let (credentials, host_port) = match authority.rsplit_once('@') {
        Some((creds, host)) => (Some(creds), host),
        None => (None, authority),
    };

    let (username, password) = match credentials {
        Some(creds) => match creds.split_once(':') {
            Some((user, pass)) => (Some(user.to_string()), Some(pass.to_string())),
            None => (Some(creds.to_string()), None),
        },
        None => (None, None),
    };

    // `tail` is empty, or starts with '/' (path) or '?' (query only).
    let (path_part, query_part) = match tail.split_once('?') {
        Some((before, after)) => (before, Some(after)),
        None => (tail, None),
    };
    let path = if path_part.is_empty() {
        None
    } else {
        Some(path_part.to_string())
    };

    let query_params: Vec<(String, String)> = query_part
        .map(|query| {
            query
                .split('&')
                .filter_map(|pair| {
                    let (key, value) = pair.split_once('=')?;
                    Some((key.to_string(), value.to_string()))
                })
                .collect()
        })
        .unwrap_or_default();

    let (host, port) = match host_port.rsplit_once(':') {
        // A ']' after the last colon means that colon sits inside a bracketed
        // IPv6 literal, i.e. there is no port section at all.
        Some((_, after)) if after.contains(']') => (host_port.to_string(), None),
        // NOTE: a non-numeric or out-of-range port is silently ignored.
        Some((name, port_text)) => (name.to_string(), port_text.parse::<u16>().ok()),
        None => (host_port.to_string(), None),
    };

    Ok(ConnectorUrl {
        scheme: scheme.to_string(),
        host,
        port,
        path,
        username,
        password,
        query_params,
    })
}
/// Builder that creates a shared [`Connector`] for a database instance
/// running on runtime `R`.
pub trait ConnectorBuilder<R>: Send + Sync
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Builds the connector for `db`.
    ///
    /// Returns a boxed future (borrowing `self` and `db` for `'a`) that
    /// resolves to the shared connector or to a database error.
    #[allow(clippy::type_complexity)]
    fn build<'a>(
        &'a self,
        db: &'a AimDb<R>,
    ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>>;
    /// Returns a scheme string for this builder.
    // NOTE(review): presumably the URL scheme this builder handles
    // (e.g. "mqtt") — confirm against the call sites.
    fn scheme(&self) -> &str;
}
// Unit tests for URL parsing, topic providers/resolvers and inbound links.
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::format;
    // --- ConnectorUrl parsing ---

    // Host and explicit port, no credentials.
    #[test]
    fn test_parse_simple_mqtt() {
        let url = ConnectorUrl::parse("mqtt://broker.example.com:1883").unwrap();
        assert_eq!(url.scheme, "mqtt");
        assert_eq!(url.host, "broker.example.com");
        assert_eq!(url.port, Some(1883));
        assert_eq!(url.username, None);
        assert_eq!(url.password, None);
    }
    // `user:pass@` is split into username and password.
    #[test]
    fn test_parse_mqtt_with_credentials() {
        let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
        assert_eq!(url.scheme, "mqtt");
        assert_eq!(url.host, "broker.example.com");
        assert_eq!(url.port, Some(1883));
        assert_eq!(url.username, Some("user".to_string()));
        assert_eq!(url.password, Some("pass".to_string()));
    }
    // The path keeps its leading slash.
    #[test]
    fn test_parse_https_with_path() {
        let url = ConnectorUrl::parse("https://api.example.com:8443/events").unwrap();
        assert_eq!(url.scheme, "https");
        assert_eq!(url.host, "api.example.com");
        assert_eq!(url.port, Some(8443));
        assert_eq!(url.path, Some("/events".to_string()));
    }
    // Query pairs are kept in order and excluded from the path.
    #[test]
    fn test_parse_with_query_params() {
        let url = ConnectorUrl::parse("http://api.example.com/data?key=value&foo=bar").unwrap();
        assert_eq!(url.scheme, "http");
        assert_eq!(url.host, "api.example.com");
        assert_eq!(url.path, Some("/data".to_string()));
        assert_eq!(url.query_params.len(), 2);
        assert_eq!(
            url.query_params[0],
            ("key".to_string(), "value".to_string())
        );
        assert_eq!(url.query_params[1], ("foo".to_string(), "bar".to_string()));
    }
    // Without an explicit port the scheme default is the effective port.
    #[test]
    fn test_default_ports() {
        let mqtt = ConnectorUrl::parse("mqtt://broker.local").unwrap();
        assert_eq!(mqtt.default_port(), Some(1883));
        assert_eq!(mqtt.effective_port(), Some(1883));
        let https = ConnectorUrl::parse("https://api.example.com").unwrap();
        assert_eq!(https.default_port(), Some(443));
        assert_eq!(https.effective_port(), Some(443));
    }
    // Only `mqtts`, `https` and `wss` count as secure.
    #[test]
    fn test_is_secure() {
        assert!(ConnectorUrl::parse("mqtts://broker.local")
            .unwrap()
            .is_secure());
        assert!(ConnectorUrl::parse("https://api.example.com")
            .unwrap()
            .is_secure());
        assert!(ConnectorUrl::parse("wss://ws.example.com")
            .unwrap()
            .is_secure());
        assert!(!ConnectorUrl::parse("mqtt://broker.local")
            .unwrap()
            .is_secure());
        assert!(!ConnectorUrl::parse("http://api.example.com")
            .unwrap()
            .is_secure());
        assert!(!ConnectorUrl::parse("ws://ws.example.com")
            .unwrap()
            .is_secure());
    }
    // `Display` must mask the password.
    #[test]
    fn test_display_hides_password() {
        let url = ConnectorUrl::parse("mqtt://user:secret@broker.local:1883").unwrap();
        let display = format!("{}", url);
        assert!(display.contains("user:****"));
        assert!(!display.contains("secret"));
    }
    // A comma-separated broker list stays inside the host field.
    #[test]
    fn test_parse_kafka_style() {
        let url =
            ConnectorUrl::parse("kafka://broker1.local:9092,broker2.local:9092/my-topic").unwrap();
        assert_eq!(url.scheme, "kafka");
        assert!(url.host.contains("broker1.local"));
        assert!(url.host.contains("broker2.local"));
        assert_eq!(url.path, Some("/my-topic".to_string()));
    }
    // A URL without `://` is rejected.
    #[test]
    fn test_parse_missing_scheme() {
        let result = ConnectorUrl::parse("broker.example.com:1883");
        assert!(result.is_err());
    }
    // --- Topic provider fixtures ---

    // Sample record type used by the topic-provider tests.
    #[allow(dead_code)]
    #[derive(Debug, Clone)]
    struct TestTemperature {
        sensor_id: String,
        celsius: f32,
    }
    // Provider that always yields `sensors/temp/<sensor_id>`.
    struct TestTopicProvider;
    impl super::TopicProvider<TestTemperature> for TestTopicProvider {
        fn topic(&self, value: &TestTemperature) -> Option<String> {
            Some(format!("sensors/temp/{}", value.sensor_id))
        }
    }
    // The wrapper forwards a correctly typed value to the typed provider.
    #[test]
    fn test_topic_provider_type_erasure() {
        use super::{TopicProviderAny, TopicProviderWrapper};
        let provider: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(TestTopicProvider));
        let temp = TestTemperature {
            sensor_id: "kitchen-001".into(),
            celsius: 22.5,
        };
        assert_eq!(
            provider.topic_any(&temp),
            Some("sensors/temp/kitchen-001".into())
        );
    }
    // A value of the wrong type yields `None` instead of panicking.
    #[test]
    fn test_topic_provider_type_mismatch() {
        use super::{TopicProviderAny, TopicProviderWrapper};
        let provider: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(TestTopicProvider));
        let wrong_type = "not a temperature";
        assert_eq!(provider.topic_any(&wrong_type), None);
    }
    // A provider's own `None` is passed through the wrapper unchanged.
    #[test]
    fn test_topic_provider_returns_none() {
        struct OptionalTopicProvider;
        impl super::TopicProvider<TestTemperature> for OptionalTopicProvider {
            fn topic(&self, temp: &TestTemperature) -> Option<String> {
                if temp.sensor_id.is_empty() {
                    None } else {
                    Some(format!("sensors/{}", temp.sensor_id))
                }
            }
        }
        use super::{TopicProviderAny, TopicProviderWrapper};
        let provider: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(OptionalTopicProvider));
        let temp_with_id = TestTemperature {
            sensor_id: "abc".into(),
            celsius: 20.0,
        };
        assert_eq!(
            provider.topic_any(&temp_with_id),
            Some("sensors/abc".into())
        );
        let temp_without_id = TestTemperature {
            sensor_id: String::new(),
            celsius: 20.0,
        };
        assert_eq!(provider.topic_any(&temp_without_id), None);
    }
    // --- Topic resolver closures ---

    #[test]
    fn test_topic_resolver_returns_some() {
        let resolver: super::TopicResolverFn = Arc::new(|| Some("resolved/topic".into()));
        assert_eq!(resolver(), Some("resolved/topic".into()));
    }
    #[test]
    fn test_topic_resolver_returns_none() {
        let resolver: super::TopicResolverFn = Arc::new(|| None);
        assert_eq!(resolver(), None);
    }
    // The resolver reads shared state on every call, so later updates are seen.
    #[cfg(feature = "std")]
    #[test]
    fn test_topic_resolver_with_captured_state() {
        use std::sync::Mutex;
        let config = Arc::new(Mutex::new(Some("dynamic/topic".to_string())));
        let config_clone = config.clone();
        let resolver: super::TopicResolverFn =
            Arc::new(move || config_clone.lock().unwrap().clone());
        assert_eq!(resolver(), Some("dynamic/topic".into()));
        *config.lock().unwrap() = None;
        assert_eq!(resolver(), None);
    }
    // --- InboundConnectorLink::resolve_topic ---

    // Without a resolver the topic is the URL's `host/path` resource id.
    #[test]
    fn test_inbound_connector_link_resolve_topic_default() {
        use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
        let url = ConnectorUrl::parse("mqtt://sensors/temperature").unwrap();
        let deserializer: DeserializerFn =
            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
        let link = InboundConnectorLink::new(url, deserializer);
        assert_eq!(link.resolve_topic(), "sensors/temperature");
    }
    // A resolver returning `Some` overrides the URL-derived topic.
    #[test]
    fn test_inbound_connector_link_resolve_topic_dynamic() {
        use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
        let url = ConnectorUrl::parse("mqtt://sensors/default").unwrap();
        let deserializer: DeserializerFn =
            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
        let mut link = InboundConnectorLink::new(url, deserializer);
        link.topic_resolver = Some(Arc::new(|| Some("sensors/dynamic/kitchen".into())));
        assert_eq!(link.resolve_topic(), "sensors/dynamic/kitchen");
    }
    // A resolver returning `None` falls back to the URL-derived topic.
    #[test]
    fn test_inbound_connector_link_resolve_topic_fallback() {
        use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};
        let url = ConnectorUrl::parse("mqtt://sensors/fallback").unwrap();
        let deserializer: DeserializerFn =
            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
        let mut link = InboundConnectorLink::new(url, deserializer);
        link.topic_resolver = Some(Arc::new(|| None));
        assert_eq!(link.resolve_topic(), "sensors/fallback");
    }
}