aimdb_core/
connector.rs

1//! Connector infrastructure for external protocol integration
2//!
3//! Provides the `.link()` builder API for ergonomic connector setup with
4//! automatic client lifecycle management. Connectors bridge AimDB records
5//! to external systems (MQTT, Kafka, HTTP, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **User Extensions**: Connector implementations are provided by users
10//! - **Shared Clients**: Single client instance shared across tasks (Arc or static)
11//! - **No Buffering**: Direct access to protocol clients, no intermediate queues
12//! - **Type Safety**: Compile-time guarantees via typed handlers
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use aimdb_core::{RecordConfig, BufferCfg};
18//!
19//! fn weather_alert_record() -> RecordConfig<WeatherAlert> {
20//!     RecordConfig::builder()
21//!         .buffer(BufferCfg::SingleLatest)
22//!         .link_to("mqtt://broker.example.com:1883")
23//!             .out::<WeatherAlert>(|reader, mqtt| {
24//!                 publish_alerts_to_mqtt(reader, mqtt)
25//!             })
26//!         .build()
27//! }
28//! ```
29
30use core::fmt::{self, Debug};
31use core::future::Future;
32use core::pin::Pin;
33
34extern crate alloc;
35
36use alloc::{
37    boxed::Box,
38    string::{String, ToString},
39    sync::Arc,
40    vec::Vec,
41};
42
43#[cfg(feature = "std")]
44use alloc::format;
45
46use crate::{builder::AimDb, transport::Connector, DbResult};
47
48/// Error that can occur during serialization
49///
50/// Uses an enum instead of String for better performance in `no_std` environments
51/// and to enable defmt logging support in Embassy.
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum SerializeError {
54    /// Output buffer is too small for the serialized data
55    BufferTooSmall,
56
57    /// Type mismatch in serializer (wrong type passed)
58    TypeMismatch,
59
60    /// Invalid data that cannot be serialized
61    InvalidData,
62}
63
64#[cfg(feature = "defmt")]
65impl defmt::Format for SerializeError {
66    fn format(&self, f: defmt::Formatter) {
67        match self {
68            Self::BufferTooSmall => defmt::write!(f, "BufferTooSmall"),
69            Self::TypeMismatch => defmt::write!(f, "TypeMismatch"),
70            Self::InvalidData => defmt::write!(f, "InvalidData"),
71        }
72    }
73}
74
75#[cfg(feature = "std")]
76impl std::fmt::Display for SerializeError {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        match self {
79            Self::BufferTooSmall => write!(f, "Output buffer too small"),
80            Self::TypeMismatch => write!(f, "Type mismatch in serializer"),
81            Self::InvalidData => write!(f, "Invalid data for serialization"),
82        }
83    }
84}
85
86#[cfg(feature = "std")]
87impl std::error::Error for SerializeError {}
88
89/// Type alias for serializer callbacks (reduces type complexity)
90///
91/// Requires the `alloc` feature for `Arc` and `Vec` (available in both std and no_std+alloc).
92/// Serializers convert record values to bytes for publishing to external systems.
93///
94/// # Current Implementation
95///
96/// Returns `Vec<u8>` which requires heap allocation. This works in:
97/// - ✅ `std` environments (full standard library)
98/// - ✅ `no_std + alloc` environments (embedded with allocator, e.g., ESP32, STM32 with heap)
99/// - ❌ `no_std` without `alloc` (bare-metal MCUs without allocator)
100///
101/// # Future Considerations
102///
103/// For zero-allocation embedded environments, future versions may support buffer-based
104/// serialization using `&mut [u8]` output or static lifetime slices.
105pub type SerializerFn =
106    Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
107
108/// Parsed connector URL with protocol, host, port, and credentials
109///
110/// Supports multiple protocol schemes:
111/// - MQTT: `mqtt://host:port`, `mqtts://host:port`
112/// - Kafka: `kafka://broker1:port,broker2:port/topic`
113/// - HTTP: `http://host:port/path`, `https://host:port/path`
114/// - WebSocket: `ws://host:port/path`, `wss://host:port/path`
115#[derive(Clone, Debug, PartialEq)]
116pub struct ConnectorUrl {
117    /// Protocol scheme (mqtt, mqtts, kafka, http, https, ws, wss)
118    pub scheme: String,
119
120    /// Host or comma-separated list of hosts (for Kafka)
121    pub host: String,
122
123    /// Port number (optional, protocol-specific defaults)
124    pub port: Option<u16>,
125
126    /// Path component (for HTTP/WebSocket)
127    pub path: Option<String>,
128
129    /// Username for authentication (optional)
130    pub username: Option<String>,
131
132    /// Password for authentication (optional)
133    pub password: Option<String>,
134
135    /// Query parameters (optional, parsed from URL)
136    pub query_params: Vec<(String, String)>,
137}
138
139impl ConnectorUrl {
140    /// Parses a connector URL string
141    ///
142    /// # Supported Formats
143    ///
144    /// - `mqtt://host:port`
145    /// - `mqtt://user:pass@host:port`
146    /// - `mqtts://host:port` (TLS)
147    /// - `kafka://broker1:9092,broker2:9092/topic`
148    /// - `http://host:port/path`
149    /// - `https://host:port/path?key=value`
150    /// - `ws://host:port/mqtt` (WebSocket)
151    /// - `wss://host:port/mqtt` (WebSocket Secure)
152    ///
153    /// # Example
154    ///
155    /// ```rust
156    /// use aimdb_core::connector::ConnectorUrl;
157    ///
158    /// let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
159    /// assert_eq!(url.scheme, "mqtt");
160    /// assert_eq!(url.host, "broker.example.com");
161    /// assert_eq!(url.port, Some(1883));
162    /// assert_eq!(url.username, Some("user".to_string()));
163    /// ```
164    pub fn parse(url: &str) -> DbResult<Self> {
165        parse_connector_url(url)
166    }
167
168    /// Returns the default port for this protocol scheme
169    pub fn default_port(&self) -> Option<u16> {
170        match self.scheme.as_str() {
171            "mqtt" | "ws" => Some(1883),
172            "mqtts" | "wss" => Some(8883),
173            "kafka" => Some(9092),
174            "http" => Some(80),
175            "https" => Some(443),
176            _ => None,
177        }
178    }
179
180    /// Returns the effective port (explicit or default)
181    pub fn effective_port(&self) -> Option<u16> {
182        self.port.or_else(|| self.default_port())
183    }
184
185    /// Returns true if this is a secure connection (TLS)
186    pub fn is_secure(&self) -> bool {
187        matches!(self.scheme.as_str(), "mqtts" | "https" | "wss")
188    }
189
190    /// Returns the URL scheme (protocol)
191    pub fn scheme(&self) -> &str {
192        &self.scheme
193    }
194
195    /// Returns the path component, or "/" if not specified
196    pub fn path(&self) -> &str {
197        self.path.as_deref().unwrap_or("/")
198    }
199
200    /// Returns the resource identifier for protocols where the URL specifies a topic/key
201    ///
202    /// This is designed for the simplified connector model where each connector manages
203    /// a single broker/server connection, and URLs only specify the resource (topic, key, path).
204    ///
205    /// # Examples
206    ///
207    /// - `mqtt://commands/temperature` → `"commands/temperature"` (topic)
208    /// - `mqtt://sensors/temp` → `"sensors/temp"` (topic)
209    /// - `kafka://events` → `"events"` (topic)
210    ///
211    /// The format is `scheme://resource` where resource = host + path combined.
212    pub fn resource_id(&self) -> String {
213        let path = self.path().trim_start_matches('/');
214
215        // Combine host and path to form the complete resource identifier
216        // For mqtt://commands/temperature: host="commands", path="/temperature"
217        // Result: "commands/temperature"
218        if !self.host.is_empty() && !path.is_empty() {
219            alloc::format!("{}/{}", self.host, path)
220        } else if !self.host.is_empty() {
221            self.host.clone()
222        } else if !path.is_empty() {
223            path.to_string()
224        } else {
225            String::new()
226        }
227    }
228}
229
230impl fmt::Display for ConnectorUrl {
231    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232        write!(f, "{}://", self.scheme)?;
233
234        if let Some(ref username) = self.username {
235            write!(f, "{}", username)?;
236            if self.password.is_some() {
237                write!(f, ":****")?; // Don't expose password in Display
238            }
239            write!(f, "@")?;
240        }
241
242        write!(f, "{}", self.host)?;
243
244        if let Some(port) = self.port {
245            write!(f, ":{}", port)?;
246        }
247
248        if let Some(ref path) = self.path {
249            if !path.starts_with('/') {
250                write!(f, "/")?;
251            }
252            write!(f, "{}", path)?;
253        }
254
255        Ok(())
256    }
257}
258
259/// Connector client types (type-erased for storage)
260///
261/// This enum allows storing different connector client types in a unified way.
262/// Actual protocol implementations will downcast to their concrete types.
263///
264/// # Design Note
265///
266/// This is intentionally minimal - actual client types are defined by
267/// user extensions. The core only provides the infrastructure.
268///
269/// Works in both `std` and `no_std` (with `alloc`) environments.
270#[derive(Clone)]
271pub enum ConnectorClient {
272    /// MQTT client (protocol-specific, user-provided)
273    Mqtt(Arc<dyn core::any::Any + Send + Sync>),
274
275    /// Kafka producer (protocol-specific, user-provided)
276    Kafka(Arc<dyn core::any::Any + Send + Sync>),
277
278    /// HTTP client (protocol-specific, user-provided)
279    Http(Arc<dyn core::any::Any + Send + Sync>),
280
281    /// Generic connector for custom protocols
282    Generic {
283        protocol: String,
284        client: Arc<dyn core::any::Any + Send + Sync>,
285    },
286}
287
288impl Debug for ConnectorClient {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        match self {
291            ConnectorClient::Mqtt(_) => write!(f, "ConnectorClient::Mqtt(..)"),
292            ConnectorClient::Kafka(_) => write!(f, "ConnectorClient::Kafka(..)"),
293            ConnectorClient::Http(_) => write!(f, "ConnectorClient::Http(..)"),
294            ConnectorClient::Generic { protocol, .. } => {
295                write!(f, "ConnectorClient::Generic({})", protocol)
296            }
297        }
298    }
299}
300
301impl ConnectorClient {
302    /// Downcasts to a concrete client type
303    ///
304    /// # Example
305    ///
306    /// ```rust,ignore
307    /// use rumqttc::AsyncClient;
308    ///
309    /// if let Some(mqtt_client) = connector.downcast_ref::<Arc<AsyncClient>>() {
310    ///     // Use the MQTT client
311    /// }
312    /// ```
313    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
314        match self {
315            ConnectorClient::Mqtt(arc) => arc.downcast_ref::<T>(),
316            ConnectorClient::Kafka(arc) => arc.downcast_ref::<T>(),
317            ConnectorClient::Http(arc) => arc.downcast_ref::<T>(),
318            ConnectorClient::Generic { client, .. } => client.downcast_ref::<T>(),
319        }
320    }
321}
322
323/// Configuration for a connector link
324///
325/// Stores the parsed URL and configuration until the record is built.
326/// The actual client creation and handler spawning happens during the build phase.
327#[derive(Clone)]
328pub struct ConnectorLink {
329    /// Parsed connector URL
330    pub url: ConnectorUrl,
331
332    /// Additional configuration options (protocol-specific)
333    pub config: Vec<(String, String)>,
334
335    /// Serialization callback that converts record values to bytes for publishing
336    ///
337    /// This is a type-erased function that takes `&dyn Any` and returns `Result<Vec<u8>, String>`.
338    /// The connector implementation will downcast to the concrete type and call the serializer.
339    ///
340    /// If `None`, the connector must provide a default serialization mechanism or fail.
341    ///
342    /// Available in both `std` and `no_std` (with `alloc` feature) environments.
343    pub serializer: Option<SerializerFn>,
344
345    /// Consumer factory callback (alloc feature)
346    ///
347    /// Creates ConsumerTrait from Arc<AimDb<R>> to enable type-safe subscription.
348    /// The factory captures the record type T at link_to() configuration time,
349    /// allowing the connector to subscribe without knowing T at compile time.
350    ///
351    /// Mirrors the producer_factory pattern used for inbound connectors.
352    ///
353    /// Available in both `std` and `no_std + alloc` environments.
354    #[cfg(feature = "alloc")]
355    pub consumer_factory: Option<ConsumerFactoryFn>,
356}
357
358impl Debug for ConnectorLink {
359    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360        f.debug_struct("ConnectorLink")
361            .field("url", &self.url)
362            .field("config", &self.config)
363            .field(
364                "serializer",
365                &self.serializer.as_ref().map(|_| "<function>"),
366            )
367            .field(
368                "consumer_factory",
369                #[cfg(feature = "alloc")]
370                &self.consumer_factory.as_ref().map(|_| "<function>"),
371                #[cfg(not(feature = "alloc"))]
372                &None::<()>,
373            )
374            .finish()
375    }
376}
377
378impl ConnectorLink {
379    /// Creates a new connector link from a URL
380    pub fn new(url: ConnectorUrl) -> Self {
381        Self {
382            url,
383            config: Vec::new(),
384            serializer: None,
385            #[cfg(feature = "alloc")]
386            consumer_factory: None,
387        }
388    }
389
390    /// Adds a configuration option
391    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
392        self.config.push((key.into(), value.into()));
393        self
394    }
395
396    /// Creates a consumer using the stored factory (alloc feature)
397    ///
398    /// Takes an Arc<dyn Any> (which should contain Arc<AimDb<R>>) and invokes
399    /// the consumer factory to create a ConsumerTrait instance.
400    ///
401    /// Returns None if no factory is configured.
402    ///
403    /// Available in both `std` and `no_std + alloc` environments.
404    #[cfg(feature = "alloc")]
405    pub fn create_consumer(
406        &self,
407        db_any: Arc<dyn core::any::Any + Send + Sync>,
408    ) -> Option<Box<dyn ConsumerTrait>> {
409        self.consumer_factory.as_ref().map(|f| f(db_any))
410    }
411}
412
413/// Type alias for type-erased deserializer callbacks
414///
415/// Converts raw bytes to a boxed Any that can be downcast to the concrete type.
416/// This allows storing deserializers for different types in a unified collection.
417pub type DeserializerFn =
418    Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;
419
420/// Type alias for producer factory callback (alloc feature)
421///
422/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ProducerTrait.
423/// This allows capturing the record type T at link_from() time while storing
424/// the factory in a type-erased InboundConnectorLink.
425///
426/// Available in both `std` and `no_std + alloc` environments.
427#[cfg(feature = "alloc")]
428pub type ProducerFactoryFn =
429    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait> + Send + Sync>;
430
431/// Type-erased producer trait for MQTT router
432///
433/// Allows the router to call produce() on different record types without knowing
434/// the concrete type at compile time. The value is passed as Box<dyn Any> and
435/// downcast to the correct type inside the implementation.
436///
437/// # Implementation Note
438///
439/// This trait uses manual futures instead of `#[async_trait]` to enable `no_std`
440/// compatibility. The `async_trait` macro generates code that depends on `std`,
441/// while manual `Pin<Box<dyn Future>>` works in both `std` and `no_std + alloc`.
442pub trait ProducerTrait: Send + Sync {
443    /// Produce a value into the record's buffer
444    ///
445    /// The value must be passed as Box<dyn Any> and will be downcast to the correct type.
446    /// Returns an error if the downcast fails or if production fails.
447    fn produce_any<'a>(
448        &'a self,
449        value: Box<dyn core::any::Any + Send>,
450    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
451}
452
453/// Type alias for consumer factory callback (alloc feature)
454///
455/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ConsumerTrait.
456/// This allows capturing the record type T at link_to() time while storing
457/// the factory in a type-erased ConnectorLink.
458///
459/// Mirrors the ProducerFactoryFn pattern for symmetry between inbound and outbound.
460///
461/// Available in both `std` and `no_std + alloc` environments.
462#[cfg(feature = "alloc")]
463pub type ConsumerFactoryFn =
464    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ConsumerTrait> + Send + Sync>;
465
466/// Type-erased consumer trait for outbound routing
467///
468/// Mirrors ProducerTrait but for consumption. Allows connectors to subscribe
469/// to typed values without knowing the concrete type T at compile time.
470///
471/// # Implementation Note
472///
473/// Like ProducerTrait, this uses manual futures instead of `#[async_trait]`
474/// to enable `no_std` compatibility.
475pub trait ConsumerTrait: Send + Sync {
476    /// Subscribe to typed values from this record
477    ///
478    /// Returns a type-erased reader that can be polled for Box<dyn Any> values.
479    /// The connector will downcast to the expected type after deserialization.
480    fn subscribe_any<'a>(&'a self) -> SubscribeAnyFuture<'a>;
481}
482
483/// Type alias for the future returned by `ConsumerTrait::subscribe_any`
484type SubscribeAnyFuture<'a> =
485    Pin<Box<dyn Future<Output = DbResult<Box<dyn AnyReader>>> + Send + 'a>>;
486
487/// Type alias for the future returned by `AnyReader::recv_any`
488type RecvAnyFuture<'a> =
489    Pin<Box<dyn Future<Output = DbResult<Box<dyn core::any::Any + Send>>> + Send + 'a>>;
490
491/// Helper trait for type-erased reading
492///
493/// Allows reading values from a buffer without knowing the concrete type at compile time.
494/// The value is returned as Box<dyn Any> and must be downcast by the caller.
495pub trait AnyReader: Send {
496    /// Receive a type-erased value from the buffer
497    ///
498    /// Returns Box<dyn Any> which must be downcast to the concrete type.
499    /// Returns an error if the buffer is closed or an I/O error occurs.
500    fn recv_any<'a>(&'a mut self) -> RecvAnyFuture<'a>;
501}
502
503/// Configuration for an inbound connector link (External → AimDB)
504///
505/// Stores the parsed URL, configuration, deserializer, and a producer creation callback.
506/// The callback captures the type T at creation time, allowing type-safe producer creation
507/// later without needing PhantomData or type parameters.
508pub struct InboundConnectorLink {
509    /// Parsed connector URL
510    pub url: ConnectorUrl,
511
512    /// Additional configuration options (protocol-specific)
513    pub config: Vec<(String, String)>,
514
515    /// Deserialization callback that converts bytes to typed values
516    ///
517    /// This is a type-erased function that takes `&[u8]` and returns
518    /// `Result<Box<dyn Any + Send>, String>`. The spawned task will
519    /// downcast to the concrete type before producing.
520    ///
521    /// Available in both `std` and `no_std` (with `alloc` feature) environments.
522    pub deserializer: DeserializerFn,
523
524    /// Producer creation callback (alloc feature)
525    ///
526    /// Takes Arc<AimDb<R>> and returns Box<dyn ProducerTrait>.
527    /// Captures the record type T at link_from() call time.
528    ///
529    /// Available in both `std` and `no_std + alloc` environments.
530    #[cfg(feature = "alloc")]
531    pub producer_factory: Option<ProducerFactoryFn>,
532}
533
534impl Clone for InboundConnectorLink {
535    fn clone(&self) -> Self {
536        Self {
537            url: self.url.clone(),
538            config: self.config.clone(),
539            deserializer: self.deserializer.clone(),
540            #[cfg(feature = "alloc")]
541            producer_factory: self.producer_factory.clone(),
542        }
543    }
544}
545
546impl Debug for InboundConnectorLink {
547    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548        f.debug_struct("InboundConnectorLink")
549            .field("url", &self.url)
550            .field("config", &self.config)
551            .field("deserializer", &"<function>")
552            .finish()
553    }
554}
555
556impl InboundConnectorLink {
557    /// Creates a new inbound connector link from a URL and deserializer
558    pub fn new(url: ConnectorUrl, deserializer: DeserializerFn) -> Self {
559        Self {
560            url,
561            config: Vec::new(),
562            deserializer,
563            #[cfg(feature = "alloc")]
564            producer_factory: None,
565        }
566    }
567
568    /// Sets the producer factory callback (alloc feature)
569    ///
570    /// Available in both `std` and `no_std + alloc` environments.
571    #[cfg(feature = "alloc")]
572    pub fn with_producer_factory<F>(mut self, factory: F) -> Self
573    where
574        F: Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait>
575            + Send
576            + Sync
577            + 'static,
578    {
579        self.producer_factory = Some(Arc::new(factory));
580        self
581    }
582
583    /// Creates a producer using the stored factory (alloc feature)
584    ///
585    /// Available in both `std` and `no_std + alloc` environments.
586    #[cfg(feature = "alloc")]
587    pub fn create_producer(
588        &self,
589        db_any: Arc<dyn core::any::Any + Send + Sync>,
590    ) -> Option<Box<dyn ProducerTrait>> {
591        self.producer_factory.as_ref().map(|f| f(db_any))
592    }
593}
594
595/// Configuration for an outbound connector link (AimDB → External)
596pub struct OutboundConnectorLink {
597    pub url: ConnectorUrl,
598    pub config: Vec<(String, String)>,
599}
600
601/// Parses a connector URL string into structured components
602///
603/// This is a simple parser that handles the most common URL formats.
604/// For production use, consider using the `url` crate with feature flags.
605fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
606    use crate::DbError;
607
608    // Split scheme from rest
609    let (scheme, rest) = url.split_once("://").ok_or({
610        #[cfg(feature = "std")]
611        {
612            DbError::InvalidOperation {
613                operation: "parse_connector_url".into(),
614                reason: format!("Missing scheme in URL: {}", url),
615            }
616        }
617        #[cfg(not(feature = "std"))]
618        {
619            DbError::InvalidOperation {
620                _operation: (),
621                _reason: (),
622            }
623        }
624    })?;
625
626    // Extract credentials if present (user:pass@host)
627    let (credentials, host_part) = if let Some(at_idx) = rest.find('@') {
628        let creds = &rest[..at_idx];
629        let host = &rest[at_idx + 1..];
630        (Some(creds), host)
631    } else {
632        (None, rest)
633    };
634
635    let (username, password) = if let Some(creds) = credentials {
636        if let Some((user, pass)) = creds.split_once(':') {
637            (Some(user.to_string()), Some(pass.to_string()))
638        } else {
639            (Some(creds.to_string()), None)
640        }
641    } else {
642        (None, None)
643    };
644
645    // Split path and query from host:port
646    let (host_port, path, query_params) = if let Some(slash_idx) = host_part.find('/') {
647        let hp = &host_part[..slash_idx];
648        let path_query = &host_part[slash_idx..];
649
650        // Split query parameters
651        let (path_part, query_part) = if let Some(q_idx) = path_query.find('?') {
652            (&path_query[..q_idx], Some(&path_query[q_idx + 1..]))
653        } else {
654            (path_query, None)
655        };
656
657        // Parse query parameters
658        let params = if let Some(query) = query_part {
659            query
660                .split('&')
661                .filter_map(|pair| {
662                    let (k, v) = pair.split_once('=')?;
663                    Some((k.to_string(), v.to_string()))
664                })
665                .collect()
666        } else {
667            Vec::new()
668        };
669
670        (hp, Some(path_part.to_string()), params)
671    } else {
672        (host_part, None, Vec::new())
673    };
674
675    // Split host and port
676    let (host, port) = if let Some(colon_idx) = host_port.rfind(':') {
677        let h = &host_port[..colon_idx];
678        let p = &host_port[colon_idx + 1..];
679        let port_num = p.parse::<u16>().ok();
680        (h.to_string(), port_num)
681    } else {
682        (host_port.to_string(), None)
683    };
684
685    Ok(ConnectorUrl {
686        scheme: scheme.to_string(),
687        host,
688        port,
689        path,
690        username,
691        password,
692        query_params,
693    })
694}
695
696/// Trait for building connectors after the database is constructed
697///
698/// Connectors that need to collect routes from the database (for inbound routing)
699/// implement this trait. The builder pattern allows connectors to be constructed
700/// in two phases:
701///
702/// 1. Configuration phase: User provides broker URLs and settings
703/// 2. Build phase: Connector collects routes from the database and initializes
704///
705/// # Example
706///
707/// ```rust,ignore
708/// pub struct MqttConnectorBuilder {
709///     broker_url: String,
710/// }
711///
712/// impl<R> ConnectorBuilder<R> for MqttConnectorBuilder
713/// where
714///     R: aimdb_executor::Spawn + 'static,
715/// {
716///     fn build<'a>(
717///         &'a self,
718///         db: &'a AimDb<R>,
719///     ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>> {
720///         Box::pin(async move {
721///             let routes = db.collect_inbound_routes(self.scheme());
722///             let router = RouterBuilder::from_routes(routes).build();
723///             let connector = MqttConnector::new(&self.broker_url, router).await?;
724///             Ok(Arc::new(connector) as Arc<dyn Connector>)
725///         })
726///     }
727///     
728///     fn scheme(&self) -> &str {
729///         "mqtt"
730///     }
731/// }
732/// ```
733pub trait ConnectorBuilder<R>: Send + Sync
734where
735    R: aimdb_executor::Spawn + 'static,
736{
737    /// Build the connector using the database
738    ///
739    /// This method is called during `AimDbBuilder::build()` after the database
740    /// has been constructed. The builder can use the database to:
741    /// - Collect inbound routes via `db.collect_inbound_routes()`
742    /// - Access database configuration
743    /// - Register subscriptions
744    ///
745    /// # Arguments
746    /// * `db` - The constructed database instance
747    ///
748    /// # Returns
749    /// An `Arc<dyn Connector>` that will be registered with the database
750    #[allow(clippy::type_complexity)]
751    fn build<'a>(
752        &'a self,
753        db: &'a AimDb<R>,
754    ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>>;
755
756    /// The URL scheme this connector handles
757    ///
758    /// Returns the scheme (e.g., "mqtt", "kafka", "http") that this connector
759    /// will be registered under. Used for routing `.link_from()` and `.link_to()`
760    /// declarations to the appropriate connector.
761    fn scheme(&self) -> &str;
762}
763
764#[cfg(test)]
765mod tests {
766    use super::*;
767    use alloc::format;
768
769    #[test]
770    fn test_parse_simple_mqtt() {
771        let url = ConnectorUrl::parse("mqtt://broker.example.com:1883").unwrap();
772        assert_eq!(url.scheme, "mqtt");
773        assert_eq!(url.host, "broker.example.com");
774        assert_eq!(url.port, Some(1883));
775        assert_eq!(url.username, None);
776        assert_eq!(url.password, None);
777    }
778
779    #[test]
780    fn test_parse_mqtt_with_credentials() {
781        let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
782        assert_eq!(url.scheme, "mqtt");
783        assert_eq!(url.host, "broker.example.com");
784        assert_eq!(url.port, Some(1883));
785        assert_eq!(url.username, Some("user".to_string()));
786        assert_eq!(url.password, Some("pass".to_string()));
787    }
788
789    #[test]
790    fn test_parse_https_with_path() {
791        let url = ConnectorUrl::parse("https://api.example.com:8443/events").unwrap();
792        assert_eq!(url.scheme, "https");
793        assert_eq!(url.host, "api.example.com");
794        assert_eq!(url.port, Some(8443));
795        assert_eq!(url.path, Some("/events".to_string()));
796    }
797
798    #[test]
799    fn test_parse_with_query_params() {
800        let url = ConnectorUrl::parse("http://api.example.com/data?key=value&foo=bar").unwrap();
801        assert_eq!(url.scheme, "http");
802        assert_eq!(url.host, "api.example.com");
803        assert_eq!(url.path, Some("/data".to_string()));
804        assert_eq!(url.query_params.len(), 2);
805        assert_eq!(
806            url.query_params[0],
807            ("key".to_string(), "value".to_string())
808        );
809        assert_eq!(url.query_params[1], ("foo".to_string(), "bar".to_string()));
810    }
811
812    #[test]
813    fn test_default_ports() {
814        let mqtt = ConnectorUrl::parse("mqtt://broker.local").unwrap();
815        assert_eq!(mqtt.default_port(), Some(1883));
816        assert_eq!(mqtt.effective_port(), Some(1883));
817
818        let https = ConnectorUrl::parse("https://api.example.com").unwrap();
819        assert_eq!(https.default_port(), Some(443));
820        assert_eq!(https.effective_port(), Some(443));
821    }
822
823    #[test]
824    fn test_is_secure() {
825        assert!(ConnectorUrl::parse("mqtts://broker.local")
826            .unwrap()
827            .is_secure());
828        assert!(ConnectorUrl::parse("https://api.example.com")
829            .unwrap()
830            .is_secure());
831        assert!(ConnectorUrl::parse("wss://ws.example.com")
832            .unwrap()
833            .is_secure());
834
835        assert!(!ConnectorUrl::parse("mqtt://broker.local")
836            .unwrap()
837            .is_secure());
838        assert!(!ConnectorUrl::parse("http://api.example.com")
839            .unwrap()
840            .is_secure());
841        assert!(!ConnectorUrl::parse("ws://ws.example.com")
842            .unwrap()
843            .is_secure());
844    }
845
846    #[test]
847    fn test_display_hides_password() {
848        let url = ConnectorUrl::parse("mqtt://user:secret@broker.local:1883").unwrap();
849        let display = format!("{}", url);
850        assert!(display.contains("user:****"));
851        assert!(!display.contains("secret"));
852    }
853
854    #[test]
855    fn test_parse_kafka_style() {
856        let url =
857            ConnectorUrl::parse("kafka://broker1.local:9092,broker2.local:9092/my-topic").unwrap();
858        assert_eq!(url.scheme, "kafka");
859        // Note: Our simple parser doesn't handle the second port in comma-separated hosts perfectly
860        // It parses "broker1.local:9092,broker2.local" as the host and "9092" as the port
861        // This is acceptable for now - production connectors can handle this in their client factories
862        assert!(url.host.contains("broker1.local"));
863        assert!(url.host.contains("broker2.local"));
864        assert_eq!(url.path, Some("/my-topic".to_string()));
865    }
866
867    #[test]
868    fn test_parse_missing_scheme() {
869        let result = ConnectorUrl::parse("broker.example.com:1883");
870        assert!(result.is_err());
871    }
872}