Skip to main content

aimdb_core/
connector.rs

1//! Connector infrastructure for external protocol integration
2//!
3//! Provides the `.link()` builder API for ergonomic connector setup with
4//! automatic client lifecycle management. Connectors bridge AimDB records
5//! to external systems (MQTT, Kafka, HTTP, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **User Extensions**: Connector implementations are provided by users
10//! - **Shared Clients**: Single client instance shared across tasks (Arc or static)
11//! - **No Buffering**: Direct access to protocol clients, no intermediate queues
12//! - **Type Safety**: Compile-time guarantees via typed handlers
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use aimdb_core::{RecordConfig, BufferCfg};
18//!
19//! fn weather_alert_record() -> RecordConfig<WeatherAlert> {
20//!     RecordConfig::builder()
21//!         .buffer(BufferCfg::SingleLatest)
22//!         .link_to("mqtt://broker.example.com:1883")
23//!             .out::<WeatherAlert>(|reader, mqtt| {
24//!                 publish_alerts_to_mqtt(reader, mqtt)
25//!             })
26//!         .build()
27//! }
28//! ```
29
30use core::fmt::{self, Debug};
31use core::future::Future;
32use core::pin::Pin;
33
34extern crate alloc;
35
36use alloc::{
37    boxed::Box,
38    string::{String, ToString},
39    sync::Arc,
40    vec::Vec,
41};
42
43#[cfg(feature = "std")]
44use alloc::format;
45
46use crate::{builder::AimDb, transport::Connector, DbResult};
47
/// Errors a serializer callback can report
///
/// A fieldless `Copy` enum is used instead of a `String` message so the error
/// is cheap in `no_std` builds and can be logged through `defmt` (see the
/// `defmt::Format` impl gated on the `defmt` feature).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializeError {
    /// Output buffer is too small for the serialized data
    BufferTooSmall,

    /// Type mismatch in serializer (wrong type passed)
    TypeMismatch,

    /// Invalid data that cannot be serialized
    InvalidData,
}
63
/// `defmt` logging support: every variant is written as its bare variant name.
#[cfg(feature = "defmt")]
impl defmt::Format for SerializeError {
    fn format(&self, out: defmt::Formatter) {
        match *self {
            SerializeError::BufferTooSmall => defmt::write!(out, "BufferTooSmall"),
            SerializeError::TypeMismatch => defmt::write!(out, "TypeMismatch"),
            SerializeError::InvalidData => defmt::write!(out, "InvalidData"),
        }
    }
}
74
/// Human-readable messages for `SerializeError` (built only with the `std` feature).
#[cfg(feature = "std")]
impl std::fmt::Display for SerializeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the fixed message first, then emit it with a single write.
        let message = match *self {
            SerializeError::BufferTooSmall => "Output buffer too small",
            SerializeError::TypeMismatch => "Type mismatch in serializer",
            SerializeError::InvalidData => "Invalid data for serialization",
        };
        f.write_str(message)
    }
}
85
// Marker impl so `SerializeError` can be used wherever a `std::error::Error`
// is expected. It has no methods of its own; it relies on the `Debug` derive
// and the `Display` impl, and is only built with the `std` feature.
#[cfg(feature = "std")]
impl std::error::Error for SerializeError {}
88
/// Shared, type-erased serializer callback
///
/// The callback receives the record value as `&dyn Any` and returns the bytes
/// to publish, or a [`SerializeError`]. The alias exists to keep signatures
/// that store or pass serializers readable.
///
/// # Allocation requirements
///
/// The callback lives in an `Arc` and returns a `Vec<u8>`, so a heap is required:
/// - supported: `std` environments
/// - supported: `no_std + alloc` environments (embedded targets with an allocator)
/// - not supported: `no_std` without `alloc` (bare-metal targets without a heap)
///
/// # Future Considerations
///
/// For zero-allocation embedded environments, future versions may support
/// buffer-based serialization using `&mut [u8]` output or static lifetime slices.
pub type SerializerFn =
    Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
107
108// ============================================================================
109// TopicProvider - Dynamic topic/destination routing
110// ============================================================================
111
/// Trait for dynamic topic providers (outbound only)
///
/// Implement this trait to choose the destination (for example an MQTT topic
/// or a KNX group address) from the value being published. One provider type
/// can implement the trait for several record types, so routing logic can be
/// shared.
///
/// # Type Safety
///
/// The trait is generic over `T`, so implementations work with the concrete
/// value type. Type erasure only happens when the provider is stored (see
/// `TopicProviderAny`).
///
/// # no_std Compatibility
///
/// Works in both `std` and `no_std + alloc` environments.
///
/// # Example
///
/// ```rust,ignore
/// use aimdb_core::connector::TopicProvider;
///
/// struct SensorTopicProvider;
///
/// impl TopicProvider<Temperature> for SensorTopicProvider {
///     fn topic(&self, value: &Temperature) -> Option<String> {
///         Some(format!("sensors/temp/{}", value.sensor_id))
///     }
/// }
/// ```
pub trait TopicProvider<T>: Send + Sync {
    /// Determine the topic/destination for a given value
    ///
    /// Returns `Some(topic)` to use a dynamic topic, or `None` to fall back
    /// to the static topic from the `link_to()` URL.
    fn topic(&self, value: &T) -> Option<String>;
}
147
/// Type-erased topic provider trait (internal)
///
/// Object-safe counterpart of `TopicProvider<T>`: because the value arrives as
/// `&dyn Any`, providers for different record types can be stored behind the
/// same trait object. Implementations recover the concrete type at runtime
/// via `Any::downcast_ref()`.
pub trait TopicProviderAny: Send + Sync {
    /// Get the topic for a type-erased value
    ///
    /// Returns `None` if the value type doesn't match or if the provider
    /// returns `None` for this value.
    fn topic_any(&self, value: &dyn core::any::Any) -> Option<String>;
}
159
/// Wrapper struct for type-erasing a `TopicProvider<T>`
///
/// Holds a concrete `TopicProvider<T>` and exposes it through the
/// `TopicProviderAny` interface so it can be stored without its type parameter.
///
/// The marker is `PhantomData<fn(T) -> T>` rather than `PhantomData<T>`: no `T`
/// is stored, and a function-pointer marker keeps the wrapper's `Send`/`Sync`
/// independent of whether `T` itself is `Send`/`Sync`.
pub struct TopicProviderWrapper<T, P>
where
    T: 'static,
    P: TopicProvider<T>,
{
    // The wrapped provider that does the actual topic selection.
    provider: P,
    // Zero-sized marker tying the wrapper to `T` without owning a `T`.
    _phantom: core::marker::PhantomData<fn(T) -> T>,
}
176
177impl<T, P> TopicProviderWrapper<T, P>
178where
179    T: 'static,
180    P: TopicProvider<T>,
181{
182    /// Create a new wrapper for a topic provider
183    pub fn new(provider: P) -> Self {
184        Self {
185            provider,
186            _phantom: core::marker::PhantomData,
187        }
188    }
189}
190
191impl<T, P> TopicProviderAny for TopicProviderWrapper<T, P>
192where
193    T: 'static,
194    P: TopicProvider<T> + Send + Sync,
195{
196    fn topic_any(&self, value: &dyn core::any::Any) -> Option<String> {
197        value
198            .downcast_ref::<T>()
199            .and_then(|v| self.provider.topic(v))
200    }
201}
202
/// Type alias for a stored topic provider (no_std compatible)
///
/// An `Arc<dyn TopicProviderAny>`: the `Arc` makes the provider cheap to clone
/// and share, and the trait object hides the record type it was built for.
pub type TopicProviderFn = Arc<dyn TopicProviderAny>;
207
/// Parsed connector URL with protocol, host, port, and credentials
///
/// Supports multiple protocol schemes:
/// - MQTT: `mqtt://host:port`, `mqtts://host:port`
/// - Kafka: `kafka://broker1:port,broker2:port/topic`
/// - HTTP: `http://host:port/path`, `https://host:port/path`
/// - WebSocket: `ws://host:port/path`, `wss://host:port/path`
///
/// All fields are public; values are normally produced by `ConnectorUrl::parse`.
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorUrl {
    /// Protocol scheme (mqtt, mqtts, kafka, http, https, ws, wss)
    pub scheme: String,

    /// Host or comma-separated list of hosts (for Kafka)
    ///
    /// The parser splits the port off at the last `:` only, so for a
    /// multi-broker list the earlier `:port` parts remain in this string.
    pub host: String,

    /// Port number (optional, protocol-specific defaults)
    pub port: Option<u16>,

    /// Path component, including its leading `/` when produced by the parser
    pub path: Option<String>,

    /// Username for authentication (optional)
    pub username: Option<String>,

    /// Password for authentication (optional)
    pub password: Option<String>,

    /// Query parameters as `(key, value)` pairs in URL order (may be empty)
    pub query_params: Vec<(String, String)>,
}
238
239impl ConnectorUrl {
240    /// Parses a connector URL string
241    ///
242    /// # Supported Formats
243    ///
244    /// - `mqtt://host:port`
245    /// - `mqtt://user:pass@host:port`
246    /// - `mqtts://host:port` (TLS)
247    /// - `kafka://broker1:9092,broker2:9092/topic`
248    /// - `http://host:port/path`
249    /// - `https://host:port/path?key=value`
250    /// - `ws://host:port/mqtt` (WebSocket)
251    /// - `wss://host:port/mqtt` (WebSocket Secure)
252    ///
253    /// # Example
254    ///
255    /// ```rust
256    /// use aimdb_core::connector::ConnectorUrl;
257    ///
258    /// let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
259    /// assert_eq!(url.scheme, "mqtt");
260    /// assert_eq!(url.host, "broker.example.com");
261    /// assert_eq!(url.port, Some(1883));
262    /// assert_eq!(url.username, Some("user".to_string()));
263    /// ```
264    pub fn parse(url: &str) -> DbResult<Self> {
265        parse_connector_url(url)
266    }
267
268    /// Returns the default port for this protocol scheme
269    pub fn default_port(&self) -> Option<u16> {
270        match self.scheme.as_str() {
271            "mqtt" | "ws" => Some(1883),
272            "mqtts" | "wss" => Some(8883),
273            "kafka" => Some(9092),
274            "http" => Some(80),
275            "https" => Some(443),
276            _ => None,
277        }
278    }
279
280    /// Returns the effective port (explicit or default)
281    pub fn effective_port(&self) -> Option<u16> {
282        self.port.or_else(|| self.default_port())
283    }
284
285    /// Returns true if this is a secure connection (TLS)
286    pub fn is_secure(&self) -> bool {
287        matches!(self.scheme.as_str(), "mqtts" | "https" | "wss")
288    }
289
290    /// Returns the URL scheme (protocol)
291    pub fn scheme(&self) -> &str {
292        &self.scheme
293    }
294
295    /// Returns the path component, or "/" if not specified
296    pub fn path(&self) -> &str {
297        self.path.as_deref().unwrap_or("/")
298    }
299
300    /// Returns the resource identifier for protocols where the URL specifies a topic/key
301    ///
302    /// This is designed for the simplified connector model where each connector manages
303    /// a single broker/server connection, and URLs only specify the resource (topic, key, path).
304    ///
305    /// # Examples
306    ///
307    /// - `mqtt://commands/temperature` → `"commands/temperature"` (topic)
308    /// - `mqtt://sensors/temp` → `"sensors/temp"` (topic)
309    /// - `kafka://events` → `"events"` (topic)
310    ///
311    /// The format is `scheme://resource` where resource = host + path combined.
312    pub fn resource_id(&self) -> String {
313        let path = self.path().trim_start_matches('/');
314
315        // Combine host and path to form the complete resource identifier
316        // For mqtt://commands/temperature: host="commands", path="/temperature"
317        // Result: "commands/temperature"
318        if !self.host.is_empty() && !path.is_empty() {
319            alloc::format!("{}/{}", self.host, path)
320        } else if !self.host.is_empty() {
321            self.host.clone()
322        } else if !path.is_empty() {
323            path.to_string()
324        } else {
325            String::new()
326        }
327    }
328}
329
330impl fmt::Display for ConnectorUrl {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        write!(f, "{}://", self.scheme)?;
333
334        if let Some(ref username) = self.username {
335            write!(f, "{}", username)?;
336            if self.password.is_some() {
337                write!(f, ":****")?; // Don't expose password in Display
338            }
339            write!(f, "@")?;
340        }
341
342        write!(f, "{}", self.host)?;
343
344        if let Some(port) = self.port {
345            write!(f, ":{}", port)?;
346        }
347
348        if let Some(ref path) = self.path {
349            if !path.starts_with('/') {
350                write!(f, "/")?;
351            }
352            write!(f, "{}", path)?;
353        }
354
355        Ok(())
356    }
357}
358
/// Connector client handle (type-erased for storage)
///
/// Each variant wraps a user-provided client as `Arc<dyn Any + Send + Sync>`,
/// so clients for different protocols can be kept in one collection and
/// cloned cheaply. Protocol implementations get their concrete client back
/// with [`ConnectorClient::downcast_ref`].
///
/// # Design Note
///
/// This is intentionally minimal - actual client types are defined by
/// user extensions. The core only provides the infrastructure.
///
/// Works in both `std` and `no_std` (with `alloc`) environments.
#[derive(Clone)]
pub enum ConnectorClient {
    /// MQTT client (protocol-specific, user-provided)
    Mqtt(Arc<dyn core::any::Any + Send + Sync>),

    /// Kafka producer (protocol-specific, user-provided)
    Kafka(Arc<dyn core::any::Any + Send + Sync>),

    /// HTTP client (protocol-specific, user-provided)
    Http(Arc<dyn core::any::Any + Send + Sync>),

    /// Generic connector for custom protocols
    Generic {
        /// Name of the custom protocol (shown in `Debug` output)
        protocol: String,
        /// The user-provided client
        client: Arc<dyn core::any::Any + Send + Sync>,
    },
}

/// Prints only the variant (and the protocol name for `Generic`); the erased
/// client itself cannot be formatted.
impl Debug for ConnectorClient {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Mqtt(_) => "ConnectorClient::Mqtt(..)",
            Self::Kafka(_) => "ConnectorClient::Kafka(..)",
            Self::Http(_) => "ConnectorClient::Http(..)",
            Self::Generic { protocol, .. } => {
                return write!(f, "ConnectorClient::Generic({})", protocol);
            }
        };
        f.write_str(label)
    }
}

impl ConnectorClient {
    /// Downcasts to a concrete client type
    ///
    /// Returns `Some(&T)` when the wrapped client is a `T`, whichever variant
    /// holds it, and `None` otherwise.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use rumqttc::AsyncClient;
    ///
    /// if let Some(mqtt_client) = connector.downcast_ref::<Arc<AsyncClient>>() {
    ///     // Use the MQTT client
    /// }
    /// ```
    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
        // Every variant stores the same erased handle type, so one arm serves all.
        match self {
            Self::Mqtt(client)
            | Self::Kafka(client)
            | Self::Http(client)
            | Self::Generic { client, .. } => client.downcast_ref::<T>(),
        }
    }
}
422
/// Configuration for a connector link
///
/// Stores the parsed URL and configuration until the record is built.
/// The actual client creation and handler spawning happens during the build phase.
#[derive(Clone)]
pub struct ConnectorLink {
    /// Parsed connector URL
    pub url: ConnectorUrl,

    /// Additional configuration options (protocol-specific)
    pub config: Vec<(String, String)>,

    /// Serialization callback that converts record values to bytes for publishing
    ///
    /// This is a type-erased function that takes `&dyn Any` and returns
    /// `Result<Vec<u8>, SerializeError>` (see `SerializerFn`).
    ///
    /// If `None`, the connector must provide a default serialization mechanism or fail.
    ///
    /// Available in both `std` and `no_std` (with `alloc` feature) environments.
    pub serializer: Option<SerializerFn>,

    /// Consumer factory callback (alloc feature)
    ///
    /// Creates ConsumerTrait from Arc<AimDb<R>> to enable type-safe subscription.
    /// The factory captures the record type T at link_to() configuration time,
    /// allowing the connector to subscribe without knowing T at compile time.
    ///
    /// Mirrors the producer_factory pattern used for inbound connectors.
    ///
    /// Available in both `std` and `no_std + alloc` environments.
    #[cfg(feature = "alloc")]
    pub consumer_factory: Option<ConsumerFactoryFn>,

    /// Optional dynamic topic provider
    ///
    /// When set, the provider is called with each value to determine the
    /// topic/destination dynamically. If the provider returns `None`,
    /// the static topic from the URL is used as fallback.
    ///
    /// Available in both `std` and `no_std + alloc` environments.
    pub topic_provider: Option<TopicProviderFn>,
}
466
467impl Debug for ConnectorLink {
468    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
469        f.debug_struct("ConnectorLink")
470            .field("url", &self.url)
471            .field("config", &self.config)
472            .field(
473                "serializer",
474                &self.serializer.as_ref().map(|_| "<function>"),
475            )
476            .field(
477                "consumer_factory",
478                #[cfg(feature = "alloc")]
479                &self.consumer_factory.as_ref().map(|_| "<function>"),
480                #[cfg(not(feature = "alloc"))]
481                &None::<()>,
482            )
483            .field(
484                "topic_provider",
485                &self.topic_provider.as_ref().map(|_| "<function>"),
486            )
487            .finish()
488    }
489}
490
491impl ConnectorLink {
492    /// Creates a new connector link from a URL
493    pub fn new(url: ConnectorUrl) -> Self {
494        Self {
495            url,
496            config: Vec::new(),
497            serializer: None,
498            #[cfg(feature = "alloc")]
499            consumer_factory: None,
500            topic_provider: None,
501        }
502    }
503
504    /// Adds a configuration option
505    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
506        self.config.push((key.into(), value.into()));
507        self
508    }
509
510    /// Creates a consumer using the stored factory (alloc feature)
511    ///
512    /// Takes an Arc<dyn Any> (which should contain Arc<AimDb<R>>) and invokes
513    /// the consumer factory to create a ConsumerTrait instance.
514    ///
515    /// Returns None if no factory is configured.
516    ///
517    /// Available in both `std` and `no_std + alloc` environments.
518    #[cfg(feature = "alloc")]
519    pub fn create_consumer(
520        &self,
521        db_any: Arc<dyn core::any::Any + Send + Sync>,
522    ) -> Option<Box<dyn ConsumerTrait>> {
523        self.consumer_factory.as_ref().map(|f| f(db_any))
524    }
525}
526
/// Type alias for type-erased deserializer callbacks
///
/// The callback takes the raw payload bytes and returns the decoded value as
/// `Box<dyn Any + Send>`, or a `String` describing why decoding failed. The
/// boxed value is downcast to the concrete record type by the consumer of the
/// callback, which lets deserializers for different types share one collection.
pub type DeserializerFn =
    Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;
533
/// Type alias for producer factory callback (alloc feature)
///
/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ProducerTrait.
/// This allows capturing the record type T at link_from() time while storing
/// the factory in a type-erased InboundConnectorLink.
///
/// Only compiled when the `alloc` feature is enabled.
/// Available in both `std` and `no_std + alloc` environments.
#[cfg(feature = "alloc")]
pub type ProducerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait> + Send + Sync>;
544
/// Topic resolver function for inbound connections (late-binding)
///
/// A shared, argument-less callback. It is meant to be called once at
/// connector startup to resolve the subscription topic: `Some(topic)` selects
/// a dynamic topic, `None` falls back to the static topic from the
/// `link_from()` URL.
///
/// # Use Cases
///
/// - Topics determined from smart contracts at runtime
/// - Service discovery integration
/// - Environment-specific topic configuration
/// - Topics read from configuration files or databases
///
/// # no_std Compatibility
///
/// Works in both `std` and `no_std + alloc` environments.
pub type TopicResolverFn = Arc<dyn Fn() -> Option<String> + Send + Sync>;
562
/// Type-erased producer trait for MQTT router
///
/// Allows the router to call produce() on different record types without knowing
/// the concrete type at compile time. The value is passed as Box<dyn Any> and
/// downcast to the correct type inside the implementation.
///
/// # Implementation Note
///
/// This trait uses manual futures instead of `#[async_trait]` to enable `no_std`
/// compatibility. The `async_trait` macro generates code that depends on `std`,
/// while manual `Pin<Box<dyn Future>>` works in both `std` and `no_std + alloc`.
pub trait ProducerTrait: Send + Sync {
    /// Produce a value into the record's buffer
    ///
    /// The value must be passed as Box<dyn Any> and will be downcast to the correct type.
    /// The returned future borrows `self` for `'a` and resolves to `Ok(())`,
    /// or to an error message if the downcast fails or if production fails.
    fn produce_any<'a>(
        &'a self,
        value: Box<dyn core::any::Any + Send>,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
}
584
/// Type alias for consumer factory callback (alloc feature)
///
/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ConsumerTrait.
/// This allows capturing the record type T at link_to() time while storing
/// the factory in a type-erased ConnectorLink.
///
/// Mirrors the ProducerFactoryFn pattern for symmetry between inbound and outbound.
///
/// Only compiled when the `alloc` feature is enabled.
/// Available in both `std` and `no_std + alloc` environments.
#[cfg(feature = "alloc")]
pub type ConsumerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ConsumerTrait> + Send + Sync>;
597
/// Type-erased consumer trait for outbound routing
///
/// Mirrors ProducerTrait but for consumption. Allows connectors to subscribe
/// to typed values without knowing the concrete type T at compile time.
///
/// # Implementation Note
///
/// Like ProducerTrait, this uses manual futures instead of `#[async_trait]`
/// to enable `no_std` compatibility.
pub trait ConsumerTrait: Send + Sync {
    /// Subscribe to typed values from this record
    ///
    /// The returned future borrows `self` for `'a` and resolves to a
    /// type-erased reader (`Box<dyn AnyReader>`) that yields `Box<dyn Any>`
    /// values; whoever reads them must downcast to the expected type.
    /// NOTE(review): the earlier wording said the downcast happens "after
    /// deserialization"; on this outbound path it is presumably before
    /// serialization. Confirm.
    fn subscribe_any<'a>(&'a self) -> SubscribeAnyFuture<'a>;
}
614
/// Type alias for the future returned by `ConsumerTrait::subscribe_any`
///
/// A boxed, pinned, `Send` future that may borrow for `'a` and resolves to
/// `DbResult<Box<dyn AnyReader>>`. Private to this module.
type SubscribeAnyFuture<'a> =
    Pin<Box<dyn Future<Output = DbResult<Box<dyn AnyReader>>> + Send + 'a>>;
618
/// Type alias for the future returned by `AnyReader::recv_any`
///
/// A boxed, pinned, `Send` future that may borrow for `'a` and resolves to
/// `DbResult<Box<dyn Any + Send>>`. Private to this module.
type RecvAnyFuture<'a> =
    Pin<Box<dyn Future<Output = DbResult<Box<dyn core::any::Any + Send>>> + Send + 'a>>;
622
/// Helper trait for type-erased reading
///
/// Allows reading values from a buffer without knowing the concrete type at compile time.
/// The value is returned as Box<dyn Any> and must be downcast by the caller.
pub trait AnyReader: Send {
    /// Receive a type-erased value from the buffer
    ///
    /// Takes `&mut self`, so one reader serves a single pending receive at a time.
    /// Returns Box<dyn Any> which must be downcast to the concrete type.
    /// Returns an error if the buffer is closed or an I/O error occurs.
    fn recv_any<'a>(&'a mut self) -> RecvAnyFuture<'a>;
}
634
/// Configuration for an inbound connector link (External → AimDB)
///
/// Stores the parsed URL, configuration, deserializer, and a producer creation callback.
/// The callback captures the type T at creation time, allowing type-safe producer creation
/// later without needing PhantomData or type parameters.
///
/// `Clone` and `Debug` are implemented by hand for this type.
pub struct InboundConnectorLink {
    /// Parsed connector URL
    pub url: ConnectorUrl,

    /// Additional configuration options (protocol-specific)
    pub config: Vec<(String, String)>,

    /// Deserialization callback that converts bytes to typed values
    ///
    /// This is a type-erased function that takes `&[u8]` and returns
    /// `Result<Box<dyn Any + Send>, String>`. The spawned task will
    /// downcast to the concrete type before producing.
    ///
    /// Unlike the other callbacks this one is mandatory (not an `Option`).
    ///
    /// Available in both `std` and `no_std` (with `alloc` feature) environments.
    pub deserializer: DeserializerFn,

    /// Producer creation callback (alloc feature)
    ///
    /// Takes Arc<AimDb<R>> and returns Box<dyn ProducerTrait>.
    /// Captures the record type T at link_from() call time.
    ///
    /// Available in both `std` and `no_std + alloc` environments.
    #[cfg(feature = "alloc")]
    pub producer_factory: Option<ProducerFactoryFn>,

    /// Optional dynamic topic resolver (late-binding)
    ///
    /// Called once at connector startup to determine the subscription topic.
    /// If the resolver returns `None`, the static topic from the URL is used.
    ///
    /// Available in both `std` and `no_std + alloc` environments.
    pub topic_resolver: Option<TopicResolverFn>,
}
673
674impl Clone for InboundConnectorLink {
675    fn clone(&self) -> Self {
676        Self {
677            url: self.url.clone(),
678            config: self.config.clone(),
679            deserializer: self.deserializer.clone(),
680            #[cfg(feature = "alloc")]
681            producer_factory: self.producer_factory.clone(),
682            topic_resolver: self.topic_resolver.clone(),
683        }
684    }
685}
686
687impl Debug for InboundConnectorLink {
688    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
689        f.debug_struct("InboundConnectorLink")
690            .field("url", &self.url)
691            .field("config", &self.config)
692            .field("deserializer", &"<function>")
693            .field(
694                "topic_resolver",
695                &self.topic_resolver.as_ref().map(|_| "<function>"),
696            )
697            .finish()
698    }
699}
700
701impl InboundConnectorLink {
702    /// Creates a new inbound connector link from a URL and deserializer
703    pub fn new(url: ConnectorUrl, deserializer: DeserializerFn) -> Self {
704        Self {
705            url,
706            config: Vec::new(),
707            deserializer,
708            #[cfg(feature = "alloc")]
709            producer_factory: None,
710            topic_resolver: None,
711        }
712    }
713
714    /// Sets the producer factory callback (alloc feature)
715    ///
716    /// Available in both `std` and `no_std + alloc` environments.
717    #[cfg(feature = "alloc")]
718    pub fn with_producer_factory<F>(mut self, factory: F) -> Self
719    where
720        F: Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait>
721            + Send
722            + Sync
723            + 'static,
724    {
725        self.producer_factory = Some(Arc::new(factory));
726        self
727    }
728
729    /// Creates a producer using the stored factory (alloc feature)
730    ///
731    /// Available in both `std` and `no_std + alloc` environments.
732    #[cfg(feature = "alloc")]
733    pub fn create_producer(
734        &self,
735        db_any: Arc<dyn core::any::Any + Send + Sync>,
736    ) -> Option<Box<dyn ProducerTrait>> {
737        self.producer_factory.as_ref().map(|f| f(db_any))
738    }
739
740    /// Resolves the subscription topic for this link
741    ///
742    /// If a topic resolver is configured, calls it to determine the topic.
743    /// Otherwise, returns the static topic from the URL.
744    ///
745    /// This is called once at connector startup.
746    pub fn resolve_topic(&self) -> String {
747        self.topic_resolver
748            .as_ref()
749            .and_then(|resolver| resolver())
750            .unwrap_or_else(|| self.url.resource_id())
751    }
752}
753
754/// Configuration for an outbound connector link (AimDB → External)
755pub struct OutboundConnectorLink {
756    pub url: ConnectorUrl,
757    pub config: Vec<(String, String)>,
758}
759
760/// Parses a connector URL string into structured components
761///
762/// This is a simple parser that handles the most common URL formats.
763/// For production use, consider using the `url` crate with feature flags.
764fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
765    use crate::DbError;
766
767    // Split scheme from rest
768    let (scheme, rest) = url.split_once("://").ok_or({
769        #[cfg(feature = "std")]
770        {
771            DbError::InvalidOperation {
772                operation: "parse_connector_url".into(),
773                reason: format!("Missing scheme in URL: {}", url),
774            }
775        }
776        #[cfg(not(feature = "std"))]
777        {
778            DbError::InvalidOperation {
779                _operation: (),
780                _reason: (),
781            }
782        }
783    })?;
784
785    // Extract credentials if present (user:pass@host)
786    let (credentials, host_part) = if let Some(at_idx) = rest.find('@') {
787        let creds = &rest[..at_idx];
788        let host = &rest[at_idx + 1..];
789        (Some(creds), host)
790    } else {
791        (None, rest)
792    };
793
794    let (username, password) = if let Some(creds) = credentials {
795        if let Some((user, pass)) = creds.split_once(':') {
796            (Some(user.to_string()), Some(pass.to_string()))
797        } else {
798            (Some(creds.to_string()), None)
799        }
800    } else {
801        (None, None)
802    };
803
804    // Split path and query from host:port
805    let (host_port, path, query_params) = if let Some(slash_idx) = host_part.find('/') {
806        let hp = &host_part[..slash_idx];
807        let path_query = &host_part[slash_idx..];
808
809        // Split query parameters
810        let (path_part, query_part) = if let Some(q_idx) = path_query.find('?') {
811            (&path_query[..q_idx], Some(&path_query[q_idx + 1..]))
812        } else {
813            (path_query, None)
814        };
815
816        // Parse query parameters
817        let params = if let Some(query) = query_part {
818            query
819                .split('&')
820                .filter_map(|pair| {
821                    let (k, v) = pair.split_once('=')?;
822                    Some((k.to_string(), v.to_string()))
823                })
824                .collect()
825        } else {
826            Vec::new()
827        };
828
829        (hp, Some(path_part.to_string()), params)
830    } else {
831        (host_part, None, Vec::new())
832    };
833
834    // Split host and port
835    let (host, port) = if let Some(colon_idx) = host_port.rfind(':') {
836        let h = &host_port[..colon_idx];
837        let p = &host_port[colon_idx + 1..];
838        let port_num = p.parse::<u16>().ok();
839        (h.to_string(), port_num)
840    } else {
841        (host_port.to_string(), None)
842    };
843
844    Ok(ConnectorUrl {
845        scheme: scheme.to_string(),
846        host,
847        port,
848        path,
849        username,
850        password,
851        query_params,
852    })
853}
854
855/// Trait for building connectors after the database is constructed
856///
857/// Connectors that need to collect routes from the database (for inbound routing)
858/// implement this trait. The builder pattern allows connectors to be constructed
859/// in two phases:
860///
861/// 1. Configuration phase: User provides broker URLs and settings
862/// 2. Build phase: Connector collects routes from the database and initializes
863///
864/// # Example
865///
866/// ```rust,ignore
867/// pub struct MqttConnectorBuilder {
868///     broker_url: String,
869/// }
870///
871/// impl<R> ConnectorBuilder<R> for MqttConnectorBuilder
872/// where
873///     R: aimdb_executor::Spawn + 'static,
874/// {
875///     fn build<'a>(
876///         &'a self,
877///         db: &'a AimDb<R>,
878///     ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>> {
879///         Box::pin(async move {
880///             let routes = db.collect_inbound_routes(self.scheme());
881///             let router = RouterBuilder::from_routes(routes).build();
882///             let connector = MqttConnector::new(&self.broker_url, router).await?;
883///             Ok(Arc::new(connector) as Arc<dyn Connector>)
884///         })
885///     }
886///     
887///     fn scheme(&self) -> &str {
888///         "mqtt"
889///     }
890/// }
891/// ```
892pub trait ConnectorBuilder<R>: Send + Sync
893where
894    R: aimdb_executor::Spawn + 'static,
895{
896    /// Build the connector using the database
897    ///
898    /// This method is called during `AimDbBuilder::build()` after the database
899    /// has been constructed. The builder can use the database to:
900    /// - Collect inbound routes via `db.collect_inbound_routes()`
901    /// - Access database configuration
902    /// - Register subscriptions
903    ///
904    /// # Arguments
905    /// * `db` - The constructed database instance
906    ///
907    /// # Returns
908    /// An `Arc<dyn Connector>` that will be registered with the database
909    #[allow(clippy::type_complexity)]
910    fn build<'a>(
911        &'a self,
912        db: &'a AimDb<R>,
913    ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>>;
914
915    /// The URL scheme this connector handles
916    ///
917    /// Returns the scheme (e.g., "mqtt", "kafka", "http") that this connector
918    /// will be registered under. Used for routing `.link_from()` and `.link_to()`
919    /// declarations to the appropriate connector.
920    fn scheme(&self) -> &str;
921}
922
#[cfg(test)]
mod tests {
    //! Unit tests for URL parsing, topic providers, topic resolvers and
    //! inbound connector link topic resolution.

    use super::*;
    // `format!` is only imported by the parent module under the `std` feature,
    // so bring it in explicitly for the tests.
    use alloc::format;

    // ========================================================================
    // ConnectorUrl Tests
    // ========================================================================

    /// Scheme, host and port are extracted; credentials are absent.
    #[test]
    fn test_parse_simple_mqtt() {
        let url = ConnectorUrl::parse("mqtt://broker.example.com:1883").unwrap();
        assert_eq!(url.scheme, "mqtt");
        assert_eq!(url.host, "broker.example.com");
        assert_eq!(url.port, Some(1883));
        assert_eq!(url.username, None);
        assert_eq!(url.password, None);
    }

    /// `user:pass@` in front of the host populates username and password.
    #[test]
    fn test_parse_mqtt_with_credentials() {
        let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
        assert_eq!(url.scheme, "mqtt");
        assert_eq!(url.host, "broker.example.com");
        assert_eq!(url.port, Some(1883));
        assert_eq!(url.username, Some("user".to_string()));
        assert_eq!(url.password, Some("pass".to_string()));
    }

    /// The path keeps its leading slash.
    #[test]
    fn test_parse_https_with_path() {
        let url = ConnectorUrl::parse("https://api.example.com:8443/events").unwrap();
        assert_eq!(url.scheme, "https");
        assert_eq!(url.host, "api.example.com");
        assert_eq!(url.port, Some(8443));
        assert_eq!(url.path, Some("/events".to_string()));
    }

    /// Query parameters are split on `&` and `=` and kept in declaration order.
    #[test]
    fn test_parse_with_query_params() {
        let url = ConnectorUrl::parse("http://api.example.com/data?key=value&foo=bar").unwrap();
        assert_eq!(url.scheme, "http");
        assert_eq!(url.host, "api.example.com");
        assert_eq!(url.path, Some("/data".to_string()));
        assert_eq!(url.query_params.len(), 2);
        assert_eq!(
            url.query_params[0],
            ("key".to_string(), "value".to_string())
        );
        assert_eq!(url.query_params[1], ("foo".to_string(), "bar".to_string()));
    }

    /// Without an explicit port, the effective port is the scheme default.
    #[test]
    fn test_default_ports() {
        let mqtt = ConnectorUrl::parse("mqtt://broker.local").unwrap();
        assert_eq!(mqtt.default_port(), Some(1883));
        assert_eq!(mqtt.effective_port(), Some(1883));

        let https = ConnectorUrl::parse("https://api.example.com").unwrap();
        assert_eq!(https.default_port(), Some(443));
        assert_eq!(https.effective_port(), Some(443));
    }

    /// TLS schemes report secure; their plaintext counterparts do not.
    #[test]
    fn test_is_secure() {
        assert!(ConnectorUrl::parse("mqtts://broker.local")
            .unwrap()
            .is_secure());
        assert!(ConnectorUrl::parse("https://api.example.com")
            .unwrap()
            .is_secure());
        assert!(ConnectorUrl::parse("wss://ws.example.com")
            .unwrap()
            .is_secure());

        assert!(!ConnectorUrl::parse("mqtt://broker.local")
            .unwrap()
            .is_secure());
        assert!(!ConnectorUrl::parse("http://api.example.com")
            .unwrap()
            .is_secure());
        assert!(!ConnectorUrl::parse("ws://ws.example.com")
            .unwrap()
            .is_secure());
    }

    /// The `Display` output masks the password.
    #[test]
    fn test_display_hides_password() {
        let url = ConnectorUrl::parse("mqtt://user:secret@broker.local:1883").unwrap();
        let display = url.to_string();
        assert!(display.contains("user:****"));
        assert!(!display.contains("secret"));
    }

    #[test]
    fn test_parse_kafka_style() {
        let url =
            ConnectorUrl::parse("kafka://broker1.local:9092,broker2.local:9092/my-topic").unwrap();
        assert_eq!(url.scheme, "kafka");
        // Note: Our simple parser doesn't handle the second port in comma-separated hosts perfectly
        // It parses "broker1.local:9092,broker2.local" as the host and "9092" as the port
        // This is acceptable for now - production connectors can handle this in their client factories
        assert!(url.host.contains("broker1.local"));
        assert!(url.host.contains("broker2.local"));
        // The host/port split happens at the last ':' of the authority.
        assert_eq!(url.port, Some(9092));
        assert_eq!(url.path, Some("/my-topic".to_string()));
    }

    /// A URL without a scheme is rejected.
    #[test]
    fn test_parse_missing_scheme() {
        let result = ConnectorUrl::parse("broker.example.com:1883");
        assert!(result.is_err());
    }

    // ========================================================================
    // TopicProvider Tests
    // ========================================================================

    // `celsius` is never read by the tests; it only makes the fixture realistic.
    #[allow(dead_code)]
    #[derive(Debug, Clone)]
    struct TestTemperature {
        sensor_id: String,
        celsius: f32,
    }

    struct TestTopicProvider;

    impl super::TopicProvider<TestTemperature> for TestTopicProvider {
        fn topic(&self, value: &TestTemperature) -> Option<String> {
            Some(format!("sensors/temp/{}", value.sensor_id))
        }
    }

    /// A typed provider keeps working behind the type-erased trait object.
    #[test]
    fn test_topic_provider_type_erasure() {
        use super::{TopicProviderAny, TopicProviderWrapper};

        let provider: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(TestTopicProvider));
        let temp = TestTemperature {
            sensor_id: "kitchen-001".into(),
            celsius: 22.5,
        };

        assert_eq!(
            provider.topic_any(&temp),
            Some("sensors/temp/kitchen-001".into())
        );
    }

    #[test]
    fn test_topic_provider_type_mismatch() {
        use super::{TopicProviderAny, TopicProviderWrapper};

        let provider: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(TestTopicProvider));
        let wrong_type = "not a temperature";

        // Type mismatch returns None (falls back to default topic)
        assert_eq!(provider.topic_any(&wrong_type), None);
    }

    #[test]
    fn test_topic_provider_returns_none() {
        struct OptionalTopicProvider;

        impl super::TopicProvider<TestTemperature> for OptionalTopicProvider {
            fn topic(&self, temp: &TestTemperature) -> Option<String> {
                if temp.sensor_id.is_empty() {
                    None // Fall back to default topic
                } else {
                    Some(format!("sensors/{}", temp.sensor_id))
                }
            }
        }

        use super::{TopicProviderAny, TopicProviderWrapper};

        let provider: Arc<dyn TopicProviderAny> =
            Arc::new(TopicProviderWrapper::new(OptionalTopicProvider));

        // Non-empty sensor_id returns dynamic topic
        let temp_with_id = TestTemperature {
            sensor_id: "abc".into(),
            celsius: 20.0,
        };
        assert_eq!(
            provider.topic_any(&temp_with_id),
            Some("sensors/abc".into())
        );

        // Empty sensor_id returns None (fallback)
        let temp_without_id = TestTemperature {
            sensor_id: String::new(),
            celsius: 20.0,
        };
        assert_eq!(provider.topic_any(&temp_without_id), None);
    }

    // ========================================================================
    // TopicResolverFn Tests
    // ========================================================================

    #[test]
    fn test_topic_resolver_returns_some() {
        let resolver: super::TopicResolverFn = Arc::new(|| Some("resolved/topic".into()));

        assert_eq!(resolver(), Some("resolved/topic".into()));
    }

    #[test]
    fn test_topic_resolver_returns_none() {
        let resolver: super::TopicResolverFn = Arc::new(|| None);

        // Returns None, should fall back to default topic
        assert_eq!(resolver(), None);
    }

    /// The resolver observes changes made to state it captured.
    #[cfg(feature = "std")]
    #[test]
    fn test_topic_resolver_with_captured_state() {
        use std::sync::Mutex;

        let config = Arc::new(Mutex::new(Some("dynamic/topic".to_string())));
        let config_clone = config.clone();

        let resolver: super::TopicResolverFn =
            Arc::new(move || config_clone.lock().unwrap().clone());

        assert_eq!(resolver(), Some("dynamic/topic".into()));

        // Clear config
        *config.lock().unwrap() = None;
        assert_eq!(resolver(), None);
    }

    // ========================================================================
    // InboundConnectorLink Tests
    // ========================================================================

    #[test]
    fn test_inbound_connector_link_resolve_topic_default() {
        use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};

        let url = ConnectorUrl::parse("mqtt://sensors/temperature").unwrap();
        let deserializer: DeserializerFn =
            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
        let link = InboundConnectorLink::new(url, deserializer);

        // No resolver configured, should return static topic from URL
        assert_eq!(link.resolve_topic(), "sensors/temperature");
    }

    #[test]
    fn test_inbound_connector_link_resolve_topic_dynamic() {
        use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};

        let url = ConnectorUrl::parse("mqtt://sensors/default").unwrap();
        let deserializer: DeserializerFn =
            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
        let mut link = InboundConnectorLink::new(url, deserializer);

        // Configure dynamic resolver
        link.topic_resolver = Some(Arc::new(|| Some("sensors/dynamic/kitchen".into())));

        // Should return resolved topic, not URL topic
        assert_eq!(link.resolve_topic(), "sensors/dynamic/kitchen");
    }

    #[test]
    fn test_inbound_connector_link_resolve_topic_fallback() {
        use super::{ConnectorUrl, DeserializerFn, InboundConnectorLink};

        let url = ConnectorUrl::parse("mqtt://sensors/fallback").unwrap();
        let deserializer: DeserializerFn =
            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
        let mut link = InboundConnectorLink::new(url, deserializer);

        // Configure resolver that returns None
        link.topic_resolver = Some(Arc::new(|| None));

        // Should fall back to static topic from URL
        assert_eq!(link.resolve_topic(), "sensors/fallback");
    }
}