aimdb_core/connector.rs
1//! Connector infrastructure for external protocol integration
2//!
//! Provides the `.link_to()` / `.link_from()` builder API for ergonomic connector setup with
4//! automatic client lifecycle management. Connectors bridge AimDB records
5//! to external systems (MQTT, Kafka, HTTP, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **User Extensions**: Connector implementations are provided by users
10//! - **Shared Clients**: Single client instance shared across tasks (Arc or static)
11//! - **No Buffering**: Direct access to protocol clients, no intermediate queues
12//! - **Type Safety**: Compile-time guarantees via typed handlers
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use aimdb_core::{RecordConfig, BufferCfg};
18//!
19//! fn weather_alert_record() -> RecordConfig<WeatherAlert> {
20//! RecordConfig::builder()
21//! .buffer(BufferCfg::SingleLatest)
22//! .link_to("mqtt://broker.example.com:1883")
23//! .out::<WeatherAlert>(|reader, mqtt| {
24//! publish_alerts_to_mqtt(reader, mqtt)
25//! })
26//! .build()
27//! }
28//! ```
29
30use core::fmt::{self, Debug};
31use core::future::Future;
32use core::pin::Pin;
33
34extern crate alloc;
35
36use alloc::{
37 boxed::Box,
38 string::{String, ToString},
39 sync::Arc,
40 vec::Vec,
41};
42
43#[cfg(feature = "std")]
44use alloc::format;
45
46use crate::{builder::AimDb, transport::Connector, DbResult};
47
/// Error that can occur during serialization
///
/// A fieldless `Copy` enum is used instead of a `String` so the error is cheap to
/// return in `no_std` environments and can be logged through defmt in Embassy.
///
/// `Display` is implemented through `core::fmt`, so the human-readable message is
/// available with or without the `std` feature. `std::error::Error` is only
/// implemented when `std` is enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializeError {
    /// Output buffer is too small for the serialized data
    BufferTooSmall,

    /// Type mismatch in serializer (wrong type passed)
    TypeMismatch,

    /// Invalid data that cannot be serialized
    InvalidData,
}

#[cfg(feature = "defmt")]
impl defmt::Format for SerializeError {
    /// Writes the bare variant name to the defmt formatter.
    fn format(&self, f: defmt::Formatter) {
        match self {
            Self::BufferTooSmall => defmt::write!(f, "BufferTooSmall"),
            Self::TypeMismatch => defmt::write!(f, "TypeMismatch"),
            Self::InvalidData => defmt::write!(f, "InvalidData"),
        }
    }
}

// `core::fmt::Display` exists in every configuration, so this impl is not gated on
// `std`; `std::fmt::Display` is the same trait, which keeps existing callers working.
impl fmt::Display for SerializeError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::BufferTooSmall => "Output buffer too small",
            Self::TypeMismatch => "Type mismatch in serializer",
            Self::InvalidData => "Invalid data for serialization",
        })
    }
}

#[cfg(feature = "std")]
impl std::error::Error for SerializeError {}
88
/// Type alias for serializer callbacks (reduces type complexity)
///
/// Requires the `alloc` feature for `Arc` and `Vec` (available in both std and no_std+alloc).
/// Serializers convert record values to bytes for publishing to external systems.
///
/// The callback receives the value type-erased as `&dyn Any` and returns either the
/// encoded bytes or a [`SerializeError`]. `SerializeError::TypeMismatch` is the variant
/// documented for the "wrong type passed" case.
///
/// # Current Implementation
///
/// Returns `Vec<u8>` which requires heap allocation. This works in:
/// - ✅ `std` environments (full standard library)
/// - ✅ `no_std + alloc` environments (embedded with allocator, e.g., ESP32, STM32 with heap)
/// - ❌ `no_std` without `alloc` (bare-metal MCUs without allocator)
///
/// # Future Considerations
///
/// For zero-allocation embedded environments, future versions may support buffer-based
/// serialization using `&mut [u8]` output or static lifetime slices.
pub type SerializerFn =
    Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
107
108// ============================================================================
109// TopicProvider - Dynamic topic/destination routing
110// ============================================================================
111
/// Trait for dynamic topic providers (outbound only)
///
/// Implement this trait to dynamically determine MQTT topics (or KNX group addresses)
/// based on the data being published. This enables reusable routing logic that
/// can be shared across multiple record types.
///
/// Implementors must be `Send + Sync` (supertrait bounds).
///
/// # Type Safety
///
/// The trait is generic over `T`, providing compile-time type safety
/// at the implementation site. Type erasure occurs only at storage time.
///
/// # no_std Compatibility
///
/// Works in both `std` and `no_std + alloc` environments.
///
/// # Example
///
/// ```rust,ignore
/// use aimdb_core::connector::TopicProvider;
///
/// struct SensorTopicProvider;
///
/// impl TopicProvider<Temperature> for SensorTopicProvider {
///     fn topic(&self, value: &Temperature) -> Option<String> {
///         Some(format!("sensors/temp/{}", value.sensor_id))
///     }
/// }
/// ```
pub trait TopicProvider<T>: Send + Sync {
    /// Determine the topic/destination for a given value
    ///
    /// Returns `Some(topic)` to use a dynamic topic, or `None` to fall back
    /// to the static topic from the `link_to()` URL.
    fn topic(&self, value: &T) -> Option<String>;
}
147
148/// Type-erased topic provider trait (internal)
149///
150/// Allows storing providers for different types in a unified collection.
151/// The concrete type is recovered via `Any::downcast_ref()` at runtime.
152pub trait TopicProviderAny: Send + Sync {
153 /// Get the topic for a type-erased value
154 ///
155 /// Returns `None` if the value type doesn't match or if the provider
156 /// returns `None` for this value.
157 fn topic_any(&self, value: &dyn core::any::Any) -> Option<String>;
158}
159
160/// Wrapper struct for type-erasing a `TopicProvider<T>`
161///
162/// This wraps a concrete `TopicProvider<T>` implementation and provides
163/// the `TopicProviderAny` interface for type-erased storage.
164///
165/// Uses `PhantomData<fn(T) -> T>` instead of `PhantomData<T>` to avoid
166/// inheriting Send/Sync bounds from T (the data type isn't stored).
167pub struct TopicProviderWrapper<T, P>
168where
169 T: 'static,
170 P: TopicProvider<T>,
171{
172 provider: P,
173 // Use fn(T) -> T to avoid Send/Sync variance issues with T
174 _phantom: core::marker::PhantomData<fn(T) -> T>,
175}
176
177impl<T, P> TopicProviderWrapper<T, P>
178where
179 T: 'static,
180 P: TopicProvider<T>,
181{
182 /// Create a new wrapper for a topic provider
183 pub fn new(provider: P) -> Self {
184 Self {
185 provider,
186 _phantom: core::marker::PhantomData,
187 }
188 }
189}
190
191impl<T, P> TopicProviderAny for TopicProviderWrapper<T, P>
192where
193 T: 'static,
194 P: TopicProvider<T> + Send + Sync,
195{
196 fn topic_any(&self, value: &dyn core::any::Any) -> Option<String> {
197 value
198 .downcast_ref::<T>()
199 .and_then(|v| self.provider.topic(v))
200 }
201}
202
203/// Type alias for stored topic provider (no_std compatible)
204///
205/// Uses `Arc<dyn TopicProviderAny>` for shared ownership across async tasks.
206pub type TopicProviderFn = Arc<dyn TopicProviderAny>;
207
/// Parsed connector URL with protocol, host, port, and credentials
///
/// Supports multiple protocol schemes:
/// - MQTT: `mqtt://host:port`, `mqtts://host:port`
/// - Kafka: `kafka://broker1:port,broker2:port/topic`
/// - HTTP: `http://host:port/path`, `https://host:port/path`
/// - WebSocket: `ws://host:port/path`, `wss://host:port/path`
///
/// # Debug output
///
/// `Debug` is implemented by hand so that the password is never printed: a present
/// password is rendered as `Some("****")`, the same mask `Display` uses. This matters
/// because link types that embed a `ConnectorUrl` include it in their own `Debug`
/// output.
#[derive(Clone, PartialEq, Eq)]
pub struct ConnectorUrl {
    /// Protocol scheme (mqtt, mqtts, kafka, http, https, ws, wss)
    pub scheme: String,

    /// Host or comma-separated list of hosts (for Kafka)
    pub host: String,

    /// Port number (optional, protocol-specific defaults)
    pub port: Option<u16>,

    /// Path component (for HTTP/WebSocket)
    pub path: Option<String>,

    /// Username for authentication (optional)
    pub username: Option<String>,

    /// Password for authentication (optional)
    pub password: Option<String>,

    /// Query parameters (optional, parsed from URL)
    pub query_params: Vec<(String, String)>,
}

impl fmt::Debug for ConnectorUrl {
    /// Formats every field like the derived impl would, except that the password
    /// value is replaced by a fixed mask.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConnectorUrl")
            .field("scheme", &self.scheme)
            .field("host", &self.host)
            .field("port", &self.port)
            .field("path", &self.path)
            .field("username", &self.username)
            // Only reveal whether a password is set, never its value.
            .field("password", &self.password.as_ref().map(|_| "****"))
            .field("query_params", &self.query_params)
            .finish()
    }
}
238
239impl ConnectorUrl {
240 /// Parses a connector URL string
241 ///
242 /// # Supported Formats
243 ///
244 /// - `mqtt://host:port`
245 /// - `mqtt://user:pass@host:port`
246 /// - `mqtts://host:port` (TLS)
247 /// - `kafka://broker1:9092,broker2:9092/topic`
248 /// - `http://host:port/path`
249 /// - `https://host:port/path?key=value`
250 /// - `ws://host:port/mqtt` (WebSocket)
251 /// - `wss://host:port/mqtt` (WebSocket Secure)
252 ///
253 /// # Example
254 ///
255 /// ```rust
256 /// use aimdb_core::connector::ConnectorUrl;
257 ///
258 /// let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
259 /// assert_eq!(url.scheme, "mqtt");
260 /// assert_eq!(url.host, "broker.example.com");
261 /// assert_eq!(url.port, Some(1883));
262 /// assert_eq!(url.username, Some("user".to_string()));
263 /// ```
264 pub fn parse(url: &str) -> DbResult<Self> {
265 parse_connector_url(url)
266 }
267
268 /// Returns the default port for this protocol scheme
269 pub fn default_port(&self) -> Option<u16> {
270 match self.scheme.as_str() {
271 "mqtt" | "ws" => Some(1883),
272 "mqtts" | "wss" => Some(8883),
273 "kafka" => Some(9092),
274 "http" => Some(80),
275 "https" => Some(443),
276 _ => None,
277 }
278 }
279
280 /// Returns the effective port (explicit or default)
281 pub fn effective_port(&self) -> Option<u16> {
282 self.port.or_else(|| self.default_port())
283 }
284
285 /// Returns true if this is a secure connection (TLS)
286 pub fn is_secure(&self) -> bool {
287 matches!(self.scheme.as_str(), "mqtts" | "https" | "wss")
288 }
289
290 /// Returns the URL scheme (protocol)
291 pub fn scheme(&self) -> &str {
292 &self.scheme
293 }
294
295 /// Returns the path component, or "/" if not specified
296 pub fn path(&self) -> &str {
297 self.path.as_deref().unwrap_or("/")
298 }
299
300 /// Returns the resource identifier for protocols where the URL specifies a topic/key
301 ///
302 /// This is designed for the simplified connector model where each connector manages
303 /// a single broker/server connection, and URLs only specify the resource (topic, key, path).
304 ///
305 /// # Examples
306 ///
307 /// - `mqtt://commands/temperature` → `"commands/temperature"` (topic)
308 /// - `mqtt://sensors/temp` → `"sensors/temp"` (topic)
309 /// - `kafka://events` → `"events"` (topic)
310 ///
311 /// The format is `scheme://resource` where resource = host + path combined.
312 pub fn resource_id(&self) -> String {
313 let path = self.path().trim_start_matches('/');
314
315 // Combine host and path to form the complete resource identifier
316 // For mqtt://commands/temperature: host="commands", path="/temperature"
317 // Result: "commands/temperature"
318 if !self.host.is_empty() && !path.is_empty() {
319 alloc::format!("{}/{}", self.host, path)
320 } else if !self.host.is_empty() {
321 self.host.clone()
322 } else if !path.is_empty() {
323 path.to_string()
324 } else {
325 String::new()
326 }
327 }
328}
329
330impl fmt::Display for ConnectorUrl {
331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332 write!(f, "{}://", self.scheme)?;
333
334 if let Some(ref username) = self.username {
335 write!(f, "{}", username)?;
336 if self.password.is_some() {
337 write!(f, ":****")?; // Don't expose password in Display
338 }
339 write!(f, "@")?;
340 }
341
342 write!(f, "{}", self.host)?;
343
344 if let Some(port) = self.port {
345 write!(f, ":{}", port)?;
346 }
347
348 if let Some(ref path) = self.path {
349 if !path.starts_with('/') {
350 write!(f, "/")?;
351 }
352 write!(f, "{}", path)?;
353 }
354
355 Ok(())
356 }
357}
358
/// Connector client types (type-erased for storage)
///
/// A single enum that can hold any protocol client behind `Arc<dyn Any + Send + Sync>`.
/// Protocol implementations get their concrete client back via
/// [`ConnectorClient::downcast_ref`].
///
/// # Design Note
///
/// Deliberately minimal: the concrete client types come from user extensions,
/// the core only supplies the storage.
///
/// Works in both `std` and `no_std` (with `alloc`) environments.
#[derive(Clone)]
pub enum ConnectorClient {
    /// MQTT client (protocol-specific, user-provided)
    Mqtt(Arc<dyn core::any::Any + Send + Sync>),

    /// Kafka producer (protocol-specific, user-provided)
    Kafka(Arc<dyn core::any::Any + Send + Sync>),

    /// HTTP client (protocol-specific, user-provided)
    Http(Arc<dyn core::any::Any + Send + Sync>),

    /// Generic connector for custom protocols
    Generic {
        protocol: String,
        client: Arc<dyn core::any::Any + Send + Sync>,
    },
}

impl Debug for ConnectorClient {
    /// Prints the variant name only; the opaque client itself is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            // The generic variant is the only one with something printable.
            ConnectorClient::Generic { protocol, .. } => {
                return write!(f, "ConnectorClient::Generic({})", protocol);
            }
            ConnectorClient::Mqtt(_) => "ConnectorClient::Mqtt(..)",
            ConnectorClient::Kafka(_) => "ConnectorClient::Kafka(..)",
            ConnectorClient::Http(_) => "ConnectorClient::Http(..)",
        };
        f.write_str(label)
    }
}

impl ConnectorClient {
    /// Downcasts to a concrete client type
    ///
    /// Returns `None` when the stored client is not a `T`, whichever variant holds it.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use rumqttc::AsyncClient;
    ///
    /// if let Some(mqtt_client) = connector.downcast_ref::<Arc<AsyncClient>>() {
    ///     // Use the MQTT client
    /// }
    /// ```
    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
        // Every variant carries the same kind of payload, so one arm serves them all.
        match self {
            ConnectorClient::Mqtt(inner)
            | ConnectorClient::Kafka(inner)
            | ConnectorClient::Http(inner)
            | ConnectorClient::Generic { client: inner, .. } => inner.downcast_ref::<T>(),
        }
    }
}
422
/// Configuration for a connector link
///
/// Stores the parsed URL and configuration until the record is built.
/// The actual client creation and handler spawning happens during the build phase.
#[derive(Clone)]
pub struct ConnectorLink {
    /// Parsed connector URL
    pub url: ConnectorUrl,

    /// Additional configuration options (protocol-specific)
    pub config: Vec<(String, String)>,

    /// Serialization callback that converts record values to bytes for publishing
    ///
    /// This is a type-erased function that takes `&dyn Any` and returns
    /// `Result<Vec<u8>, SerializeError>` (see [`SerializerFn`]).
    /// The connector implementation will downcast to the concrete type and call the serializer.
    ///
    /// If `None`, the connector must provide a default serialization mechanism or fail.
    ///
    /// Available in both `std` and `no_std` (with `alloc` feature) environments.
    pub serializer: Option<SerializerFn>,

    /// Consumer factory callback (alloc feature)
    ///
    /// Creates ConsumerTrait from Arc<AimDb<R>> to enable type-safe subscription.
    /// The factory captures the record type T at link_to() configuration time,
    /// allowing the connector to subscribe without knowing T at compile time.
    ///
    /// Mirrors the producer_factory pattern used for inbound connectors.
    ///
    /// Available in both `std` and `no_std + alloc` environments.
    #[cfg(feature = "alloc")]
    pub consumer_factory: Option<ConsumerFactoryFn>,

    /// Optional dynamic topic provider
    ///
    /// When set, the provider is called with each value to determine the
    /// topic/destination dynamically. If the provider returns `None`,
    /// the static topic from the URL is used as fallback.
    ///
    /// Available in both `std` and `no_std + alloc` environments.
    pub topic_provider: Option<TopicProviderFn>,
}
466
467impl Debug for ConnectorLink {
468 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
469 f.debug_struct("ConnectorLink")
470 .field("url", &self.url)
471 .field("config", &self.config)
472 .field(
473 "serializer",
474 &self.serializer.as_ref().map(|_| "<function>"),
475 )
476 .field(
477 "consumer_factory",
478 #[cfg(feature = "alloc")]
479 &self.consumer_factory.as_ref().map(|_| "<function>"),
480 #[cfg(not(feature = "alloc"))]
481 &None::<()>,
482 )
483 .field(
484 "topic_provider",
485 &self.topic_provider.as_ref().map(|_| "<function>"),
486 )
487 .finish()
488 }
489}
490
491impl ConnectorLink {
492 /// Creates a new connector link from a URL
493 pub fn new(url: ConnectorUrl) -> Self {
494 Self {
495 url,
496 config: Vec::new(),
497 serializer: None,
498 #[cfg(feature = "alloc")]
499 consumer_factory: None,
500 topic_provider: None,
501 }
502 }
503
504 /// Adds a configuration option
505 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
506 self.config.push((key.into(), value.into()));
507 self
508 }
509
510 /// Creates a consumer using the stored factory (alloc feature)
511 ///
512 /// Takes an Arc<dyn Any> (which should contain Arc<AimDb<R>>) and invokes
513 /// the consumer factory to create a ConsumerTrait instance.
514 ///
515 /// Returns None if no factory is configured.
516 ///
517 /// Available in both `std` and `no_std + alloc` environments.
518 #[cfg(feature = "alloc")]
519 pub fn create_consumer(
520 &self,
521 db_any: Arc<dyn core::any::Any + Send + Sync>,
522 ) -> Option<Box<dyn ConsumerTrait>> {
523 self.consumer_factory.as_ref().map(|f| f(db_any))
524 }
525}
526
/// Type alias for type-erased deserializer callbacks
///
/// Converts raw bytes to a boxed Any that can be downcast to the concrete type.
/// This allows storing deserializers for different types in a unified collection.
///
/// On failure the callback returns a `String` describing the error.
pub type DeserializerFn =
    Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;
533
/// Type alias for producer factory callback (alloc feature)
///
/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ProducerTrait.
/// This allows capturing the record type T at link_from() time while storing
/// the factory in a type-erased InboundConnectorLink.
///
/// The factory is shared (`Arc`) and must be `Send + Sync`.
///
/// Available in both `std` and `no_std + alloc` environments.
#[cfg(feature = "alloc")]
pub type ProducerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait> + Send + Sync>;
544
/// Topic resolver function for inbound connections (late-binding)
///
/// Called once at connector startup to resolve the subscription topic.
/// Returns `Some(topic)` to use a dynamic topic, or `None` to fall back
/// to the static topic from the `link_from()` URL.
///
/// The resolver takes no arguments, so anything it needs must be captured by the closure.
///
/// # Use Cases
///
/// - Topics determined from smart contracts at runtime
/// - Service discovery integration
/// - Environment-specific topic configuration
/// - Topics read from configuration files or databases
///
/// # no_std Compatibility
///
/// Works in both `std` and `no_std + alloc` environments.
pub type TopicResolverFn = Arc<dyn Fn() -> Option<String> + Send + Sync>;
562
/// Type-erased producer trait for MQTT router
///
/// Allows the router to call produce() on different record types without knowing
/// the concrete type at compile time. The value is passed as Box<dyn Any> and
/// downcast to the correct type inside the implementation.
///
/// # Implementation Note
///
/// This trait uses manual futures instead of `#[async_trait]` to enable `no_std`
/// compatibility. The `async_trait` macro generates code that depends on `std`,
/// while manual `Pin<Box<dyn Future>>` works in both `std` and `no_std + alloc`.
pub trait ProducerTrait: Send + Sync {
    /// Produce a value into the record's buffer
    ///
    /// The value must be passed as Box<dyn Any> and will be downcast to the correct type.
    /// Returns an error if the downcast fails or if production fails.
    ///
    /// The returned future is `Send` and borrows `self` for `'a`; errors are
    /// reported as a `String`.
    fn produce_any<'a>(
        &'a self,
        value: Box<dyn core::any::Any + Send>,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
}
584
/// Type alias for consumer factory callback (alloc feature)
///
/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ConsumerTrait.
/// This allows capturing the record type T at link_to() time while storing
/// the factory in a type-erased ConnectorLink.
///
/// Mirrors the ProducerFactoryFn pattern for symmetry between inbound and outbound.
///
/// The factory is shared (`Arc`) and must be `Send + Sync`.
///
/// Available in both `std` and `no_std + alloc` environments.
#[cfg(feature = "alloc")]
pub type ConsumerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ConsumerTrait> + Send + Sync>;
597
/// Type-erased consumer trait for outbound routing
///
/// Mirrors ProducerTrait but for consumption. Allows connectors to subscribe
/// to typed values without knowing the concrete type T at compile time.
///
/// # Implementation Note
///
/// Like ProducerTrait, this uses manual futures instead of `#[async_trait]`
/// to enable `no_std` compatibility.
pub trait ConsumerTrait: Send + Sync {
    /// Subscribe to typed values from this record
    ///
    /// Returns a type-erased reader that can be polled for Box<dyn Any> values.
    /// Whoever reads from it is responsible for downcasting each value to the
    /// expected concrete type.
    ///
    /// The returned future borrows `self` for `'a` and resolves to a
    /// `DbResult<Box<dyn AnyReader>>`.
    fn subscribe_any<'a>(&'a self) -> SubscribeAnyFuture<'a>;
}
614
615/// Type alias for the future returned by `ConsumerTrait::subscribe_any`
616type SubscribeAnyFuture<'a> =
617 Pin<Box<dyn Future<Output = DbResult<Box<dyn AnyReader>>> + Send + 'a>>;
618
619/// Type alias for the future returned by `AnyReader::recv_any`
620type RecvAnyFuture<'a> =
621 Pin<Box<dyn Future<Output = DbResult<Box<dyn core::any::Any + Send>>> + Send + 'a>>;
622
/// Helper trait for type-erased reading
///
/// Allows reading values from a buffer without knowing the concrete type at compile time.
/// The value is returned as Box<dyn Any> and must be downcast by the caller.
///
/// Readers must be `Send` (supertrait bound).
pub trait AnyReader: Send {
    /// Receive a type-erased value from the buffer
    ///
    /// Returns Box<dyn Any> which must be downcast to the concrete type.
    /// Returns an error if the buffer is closed or an I/O error occurs.
    ///
    /// Takes `&mut self`, so the returned future holds the reader exclusively
    /// for `'a`.
    fn recv_any<'a>(&'a mut self) -> RecvAnyFuture<'a>;
}
634
635/// Configuration for an inbound connector link (External → AimDB)
636///
637/// Stores the parsed URL, configuration, deserializer, and a producer creation callback.
638/// The callback captures the type T at creation time, allowing type-safe producer creation
639/// later without needing PhantomData or type parameters.
640pub struct InboundConnectorLink {
641 /// Parsed connector URL
642 pub url: ConnectorUrl,
643
644 /// Additional configuration options (protocol-specific)
645 pub config: Vec<(String, String)>,
646
647 /// Deserialization callback that converts bytes to typed values
648 ///
649 /// This is a type-erased function that takes `&[u8]` and returns
650 /// `Result<Box<dyn Any + Send>, String>`. The spawned task will
651 /// downcast to the concrete type before producing.
652 ///
653 /// Available in both `std` and `no_std` (with `alloc` feature) environments.
654 pub deserializer: DeserializerFn,
655
656 /// Producer creation callback (alloc feature)
657 ///
658 /// Takes Arc<AimDb<R>> and returns Box<dyn ProducerTrait>.
659 /// Captures the record type T at link_from() call time.
660 ///
661 /// Available in both `std` and `no_std + alloc` environments.
662 #[cfg(feature = "alloc")]
663 pub producer_factory: Option<ProducerFactoryFn>,
664
665 /// Optional dynamic topic resolver (late-binding)
666 ///
667 /// Called once at connector startup to determine the subscription topic.
668 /// If the resolver returns `None`, the static topic from the URL is used.
669 ///
670 /// Available in both `std` and `no_std + alloc` environments.
671 pub topic_resolver: Option<TopicResolverFn>,
672}
673
674impl Clone for InboundConnectorLink {
675 fn clone(&self) -> Self {
676 Self {
677 url: self.url.clone(),
678 config: self.config.clone(),
679 deserializer: self.deserializer.clone(),
680 #[cfg(feature = "alloc")]
681 producer_factory: self.producer_factory.clone(),
682 topic_resolver: self.topic_resolver.clone(),
683 }
684 }
685}
686
687impl Debug for InboundConnectorLink {
688 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
689 f.debug_struct("InboundConnectorLink")
690 .field("url", &self.url)
691 .field("config", &self.config)
692 .field("deserializer", &"<function>")
693 .field(
694 "topic_resolver",
695 &self.topic_resolver.as_ref().map(|_| "<function>"),
696 )
697 .finish()
698 }
699}
700
701impl InboundConnectorLink {
702 /// Creates a new inbound connector link from a URL and deserializer
703 pub fn new(url: ConnectorUrl, deserializer: DeserializerFn) -> Self {
704 Self {
705 url,
706 config: Vec::new(),
707 deserializer,
708 #[cfg(feature = "alloc")]
709 producer_factory: None,
710 topic_resolver: None,
711 }
712 }
713
714 /// Sets the producer factory callback (alloc feature)
715 ///
716 /// Available in both `std` and `no_std + alloc` environments.
717 #[cfg(feature = "alloc")]
718 pub fn with_producer_factory<F>(mut self, factory: F) -> Self
719 where
720 F: Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait>
721 + Send
722 + Sync
723 + 'static,
724 {
725 self.producer_factory = Some(Arc::new(factory));
726 self
727 }
728
729 /// Creates a producer using the stored factory (alloc feature)
730 ///
731 /// Available in both `std` and `no_std + alloc` environments.
732 #[cfg(feature = "alloc")]
733 pub fn create_producer(
734 &self,
735 db_any: Arc<dyn core::any::Any + Send + Sync>,
736 ) -> Option<Box<dyn ProducerTrait>> {
737 self.producer_factory.as_ref().map(|f| f(db_any))
738 }
739
740 /// Resolves the subscription topic for this link
741 ///
742 /// If a topic resolver is configured, calls it to determine the topic.
743 /// Otherwise, returns the static topic from the URL.
744 ///
745 /// This is called once at connector startup.
746 pub fn resolve_topic(&self) -> String {
747 self.topic_resolver
748 .as_ref()
749 .and_then(|resolver| resolver())
750 .unwrap_or_else(|| self.url.resource_id())
751 }
752}
753
754/// Configuration for an outbound connector link (AimDB → External)
755pub struct OutboundConnectorLink {
756 pub url: ConnectorUrl,
757 pub config: Vec<(String, String)>,
758}
759
760/// Parses a connector URL string into structured components
761///
762/// This is a simple parser that handles the most common URL formats.
763/// For production use, consider using the `url` crate with feature flags.
764fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
765 use crate::DbError;
766
767 // Split scheme from rest
768 let (scheme, rest) = url.split_once("://").ok_or({
769 #[cfg(feature = "std")]
770 {
771 DbError::InvalidOperation {
772 operation: "parse_connector_url".into(),
773 reason: format!("Missing scheme in URL: {}", url),
774 }
775 }
776 #[cfg(not(feature = "std"))]
777 {
778 DbError::InvalidOperation {
779 _operation: (),
780 _reason: (),
781 }
782 }
783 })?;
784
785 // Extract credentials if present (user:pass@host)
786 let (credentials, host_part) = if let Some(at_idx) = rest.find('@') {
787 let creds = &rest[..at_idx];
788 let host = &rest[at_idx + 1..];
789 (Some(creds), host)
790 } else {
791 (None, rest)
792 };
793
794 let (username, password) = if let Some(creds) = credentials {
795 if let Some((user, pass)) = creds.split_once(':') {
796 (Some(user.to_string()), Some(pass.to_string()))
797 } else {
798 (Some(creds.to_string()), None)
799 }
800 } else {
801 (None, None)
802 };
803
804 // Split path and query from host:port
805 let (host_port, path, query_params) = if let Some(slash_idx) = host_part.find('/') {
806 let hp = &host_part[..slash_idx];
807 let path_query = &host_part[slash_idx..];
808
809 // Split query parameters
810 let (path_part, query_part) = if let Some(q_idx) = path_query.find('?') {
811 (&path_query[..q_idx], Some(&path_query[q_idx + 1..]))
812 } else {
813 (path_query, None)
814 };
815
816 // Parse query parameters
817 let params = if let Some(query) = query_part {
818 query
819 .split('&')
820 .filter_map(|pair| {
821 let (k, v) = pair.split_once('=')?;
822 Some((k.to_string(), v.to_string()))
823 })
824 .collect()
825 } else {
826 Vec::new()
827 };
828
829 (hp, Some(path_part.to_string()), params)
830 } else {
831 (host_part, None, Vec::new())
832 };
833
834 // Split host and port
835 let (host, port) = if let Some(colon_idx) = host_port.rfind(':') {
836 let h = &host_port[..colon_idx];
837 let p = &host_port[colon_idx + 1..];
838 let port_num = p.parse::<u16>().ok();
839 (h.to_string(), port_num)
840 } else {
841 (host_port.to_string(), None)
842 };
843
844 Ok(ConnectorUrl {
845 scheme: scheme.to_string(),
846 host,
847 port,
848 path,
849 username,
850 password,
851 query_params,
852 })
853}
854
/// Trait for building connectors after the database is constructed
///
/// Connectors that need to collect routes from the database (for inbound routing)
/// implement this trait. The builder pattern allows connectors to be constructed
/// in two phases:
///
/// 1. Configuration phase: User provides broker URLs and settings
/// 2. Build phase: Connector collects routes from the database and initializes
///
/// Builders must be `Send + Sync`; the runtime type `R` must implement
/// `aimdb_executor::Spawn` and be `'static`.
///
/// # Example
///
/// ```rust,ignore
/// pub struct MqttConnectorBuilder {
///     broker_url: String,
/// }
///
/// impl<R> ConnectorBuilder<R> for MqttConnectorBuilder
/// where
///     R: aimdb_executor::Spawn + 'static,
/// {
///     fn build<'a>(
///         &'a self,
///         db: &'a AimDb<R>,
///     ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>> {
///         Box::pin(async move {
///             let routes = db.collect_inbound_routes(self.scheme());
///             let router = RouterBuilder::from_routes(routes).build();
///             let connector = MqttConnector::new(&self.broker_url, router).await?;
///             Ok(Arc::new(connector) as Arc<dyn Connector>)
///         })
///     }
///
///     fn scheme(&self) -> &str {
///         "mqtt"
///     }
/// }
/// ```
pub trait ConnectorBuilder<R>: Send + Sync
where
    R: aimdb_executor::Spawn + 'static,
{
    /// Build the connector using the database
    ///
    /// This method is called during `AimDbBuilder::build()` after the database
    /// has been constructed. The builder can use the database to:
    /// - Collect inbound routes via `db.collect_inbound_routes()`
    /// - Access database configuration
    /// - Register subscriptions
    ///
    /// # Arguments
    /// * `db` - The constructed database instance
    ///
    /// # Returns
    /// An `Arc<dyn Connector>` that will be registered with the database
    ///
    /// The returned future is `Send` and borrows both `self` and `db` for `'a`.
    // The boxed-future return type is long by necessity (no `async fn` in the trait),
    // hence the lint allowance.
    #[allow(clippy::type_complexity)]
    fn build<'a>(
        &'a self,
        db: &'a AimDb<R>,
    ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>>;

    /// The URL scheme this connector handles
    ///
    /// Returns the scheme (e.g., "mqtt", "kafka", "http") that this connector
    /// will be registered under. Used for routing `.link_from()` and `.link_to()`
    /// declarations to the appropriate connector.
    fn scheme(&self) -> &str;
}
922
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::format;

    // ========================================================================
    // Shared helpers
    // ========================================================================

    /// Parses `raw` into a [`ConnectorUrl`], panicking if the parser rejects it.
    fn parse_url(raw: &str) -> ConnectorUrl {
        ConnectorUrl::parse(raw).unwrap()
    }

    /// Builds a deserializer that ignores its input and always yields a boxed `()`.
    fn unit_deserializer() -> DeserializerFn {
        Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>))
    }

    // ========================================================================
    // ConnectorUrl Tests
    // ========================================================================

    /// A bare `scheme://host:port` URL yields no credentials.
    #[test]
    fn test_parse_simple_mqtt() {
        let parsed = parse_url("mqtt://broker.example.com:1883");

        assert_eq!(parsed.scheme, "mqtt");
        assert_eq!(parsed.host, "broker.example.com");
        assert_eq!(parsed.port, Some(1883));
        assert!(parsed.username.is_none());
        assert!(parsed.password.is_none());
    }

    /// `user:pass@` in front of the host is split into username and password.
    #[test]
    fn test_parse_mqtt_with_credentials() {
        let parsed = parse_url("mqtt://user:pass@broker.example.com:1883");

        assert_eq!(parsed.scheme, "mqtt");
        assert_eq!(parsed.host, "broker.example.com");
        assert_eq!(parsed.port, Some(1883));
        assert_eq!(parsed.username.as_deref(), Some("user"));
        assert_eq!(parsed.password.as_deref(), Some("pass"));
    }

    /// The path component is kept, including its leading slash.
    #[test]
    fn test_parse_https_with_path() {
        let parsed = parse_url("https://api.example.com:8443/events");

        assert_eq!(parsed.scheme, "https");
        assert_eq!(parsed.host, "api.example.com");
        assert_eq!(parsed.port, Some(8443));
        assert_eq!(parsed.path.as_deref(), Some("/events"));
    }

    /// Query parameters are returned as key/value pairs in URL order.
    #[test]
    fn test_parse_with_query_params() {
        let parsed = parse_url("http://api.example.com/data?key=value&foo=bar");
        let first_pair = ("key".to_string(), "value".to_string());
        let second_pair = ("foo".to_string(), "bar".to_string());

        assert_eq!(parsed.scheme, "http");
        assert_eq!(parsed.host, "api.example.com");
        assert_eq!(parsed.path.as_deref(), Some("/data"));
        assert_eq!(parsed.query_params.len(), 2);
        assert_eq!(parsed.query_params[0], first_pair);
        assert_eq!(parsed.query_params[1], second_pair);
    }

    /// Without an explicit port, the scheme's default port is also the effective one.
    #[test]
    fn test_default_ports() {
        let cases = [
            ("mqtt://broker.local", 1883),
            ("https://api.example.com", 443),
        ];

        for &(raw, expected_port) in &cases {
            let parsed = parse_url(raw);
            assert_eq!(parsed.default_port(), Some(expected_port));
            assert_eq!(parsed.effective_port(), Some(expected_port));
        }
    }

    /// TLS-flavoured schemes report as secure; their plaintext twins do not.
    #[test]
    fn test_is_secure() {
        assert!(parse_url("mqtts://broker.local").is_secure());
        assert!(parse_url("https://api.example.com").is_secure());
        assert!(parse_url("wss://ws.example.com").is_secure());

        assert!(!parse_url("mqtt://broker.local").is_secure());
        assert!(!parse_url("http://api.example.com").is_secure());
        assert!(!parse_url("ws://ws.example.com").is_secure());
    }

    /// The `Display` output masks the password but keeps the username.
    #[test]
    fn test_display_hides_password() {
        let rendered = format!("{}", parse_url("mqtt://user:secret@broker.local:1883"));

        assert!(rendered.contains("user:****"));
        assert!(!rendered.contains("secret"));
    }

    /// A comma-separated broker list still parses, with both brokers in `host`.
    #[test]
    fn test_parse_kafka_style() {
        let parsed = parse_url("kafka://broker1.local:9092,broker2.local:9092/my-topic");

        assert_eq!(parsed.scheme, "kafka");
        // The parser is deliberately simple and does not split broker lists:
        // "broker1.local:9092,broker2.local" ends up as the host and the
        // trailing "9092" as the port. That is acceptable for now, since
        // production connectors can handle this in their client factories.
        assert!(parsed.host.contains("broker1.local"));
        assert!(parsed.host.contains("broker2.local"));
        assert_eq!(parsed.path.as_deref(), Some("/my-topic"));
    }

    /// Input without a `scheme://` prefix is rejected.
    #[test]
    fn test_parse_missing_scheme() {
        assert!(ConnectorUrl::parse("broker.example.com:1883").is_err());
    }

    // ========================================================================
    // TopicProvider Tests
    // ========================================================================

    /// Minimal record type used to exercise topic providers.
    // `celsius` is only ever written by these tests, never read back, so the
    // dead-code lint has to be silenced.
    #[allow(dead_code)]
    #[derive(Debug, Clone)]
    struct TestTemperature {
        sensor_id: String,
        celsius: f32,
    }

    impl TestTemperature {
        /// Creates a reading for `sensor_id` with the given temperature.
        fn new(sensor_id: &str, celsius: f32) -> Self {
            Self {
                sensor_id: sensor_id.into(),
                celsius,
            }
        }
    }

    /// Provider that always derives a topic from the reading's sensor id.
    struct TestTopicProvider;

    impl TopicProvider<TestTemperature> for TestTopicProvider {
        fn topic(&self, reading: &TestTemperature) -> Option<String> {
            Some(format!("sensors/temp/{}", reading.sensor_id))
        }
    }

    /// A wrapped provider still produces its topic through the type-erased trait.
    #[test]
    fn test_topic_provider_type_erasure() {
        let erased: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(TestTopicProvider));
        let kitchen = TestTemperature::new("kitchen-001", 22.5);

        assert_eq!(
            erased.topic_any(&kitchen),
            Some("sensors/temp/kitchen-001".into())
        );
    }

    /// Handing the erased provider a value of the wrong type yields `None`.
    #[test]
    fn test_topic_provider_type_mismatch() {
        let erased: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(TestTopicProvider));
        let not_a_reading = "not a temperature";

        // Type mismatch returns None (falls back to default topic)
        assert!(erased.topic_any(&not_a_reading).is_none());
    }

    /// A provider may decline to name a topic; the `None` passes through erasure.
    #[test]
    fn test_topic_provider_returns_none() {
        struct OptionalTopicProvider;

        impl TopicProvider<TestTemperature> for OptionalTopicProvider {
            fn topic(&self, reading: &TestTemperature) -> Option<String> {
                if reading.sensor_id.is_empty() {
                    // Fall back to default topic
                    return None;
                }
                Some(format!("sensors/{}", reading.sensor_id))
            }
        }

        let erased: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(OptionalTopicProvider));

        // Non-empty sensor_id returns dynamic topic
        let identified = TestTemperature::new("abc", 20.0);
        assert_eq!(erased.topic_any(&identified), Some("sensors/abc".into()));

        // Empty sensor_id returns None (fallback)
        let anonymous = TestTemperature::new("", 20.0);
        assert!(erased.topic_any(&anonymous).is_none());
    }

    // ========================================================================
    // TopicResolverFn Tests
    // ========================================================================

    /// A resolver closure can hand back a concrete topic.
    #[test]
    fn test_topic_resolver_returns_some() {
        let resolve: TopicResolverFn = Arc::new(|| Some("resolved/topic".into()));

        assert_eq!(resolve(), Some("resolved/topic".into()));
    }

    /// A resolver closure can decline by returning `None`.
    #[test]
    fn test_topic_resolver_returns_none() {
        let resolve: TopicResolverFn = Arc::new(|| None);

        // Returns None, should fall back to default topic
        assert!(resolve().is_none());
    }

    /// A resolver observes later changes to the state it captured.
    #[cfg(feature = "std")]
    #[test]
    fn test_topic_resolver_with_captured_state() {
        use std::sync::Mutex;

        let shared_topic = Arc::new(Mutex::new(Some("dynamic/topic".to_string())));
        let captured = Arc::clone(&shared_topic);

        let resolve: TopicResolverFn = Arc::new(move || captured.lock().unwrap().clone());

        assert_eq!(resolve(), Some("dynamic/topic".into()));

        // Clear config
        *shared_topic.lock().unwrap() = None;
        assert!(resolve().is_none());
    }

    // ========================================================================
    // InboundConnectorLink Tests
    // ========================================================================

    /// With no resolver installed, the topic comes straight from the URL.
    #[test]
    fn test_inbound_connector_link_resolve_topic_default() {
        let link = InboundConnectorLink::new(
            parse_url("mqtt://sensors/temperature"),
            unit_deserializer(),
        );

        // No resolver configured, should return static topic from URL
        assert_eq!(link.resolve_topic(), "sensors/temperature");
    }

    /// A resolver that returns a topic takes precedence over the URL.
    #[test]
    fn test_inbound_connector_link_resolve_topic_dynamic() {
        let mut link =
            InboundConnectorLink::new(parse_url("mqtt://sensors/default"), unit_deserializer());

        // Configure dynamic resolver
        link.topic_resolver = Some(Arc::new(|| Some("sensors/dynamic/kitchen".into())));

        // Should return resolved topic, not URL topic
        assert_eq!(link.resolve_topic(), "sensors/dynamic/kitchen");
    }

    /// A resolver that returns `None` falls back to the URL's topic.
    #[test]
    fn test_inbound_connector_link_resolve_topic_fallback() {
        let mut link =
            InboundConnectorLink::new(parse_url("mqtt://sensors/fallback"), unit_deserializer());

        // Configure resolver that returns None
        link.topic_resolver = Some(Arc::new(|| None));

        // Should fall back to static topic from URL
        assert_eq!(link.resolve_topic(), "sensors/fallback");
    }
}