aimdb_core/connector.rs
1//! Connector infrastructure for external protocol integration
2//!
//! Provides the `.link_to()` / `.link_from()` builder API for ergonomic connector
//! setup with automatic client lifecycle management. Connectors bridge AimDB
//! records to external systems (MQTT, Kafka, HTTP, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **User Extensions**: Connector implementations are provided by users
10//! - **Shared Clients**: Single client instance shared across tasks (Arc or static)
11//! - **No Buffering**: Direct access to protocol clients, no intermediate queues
12//! - **Type Safety**: Compile-time guarantees via typed handlers
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use aimdb_core::{RecordConfig, BufferCfg};
18//!
//! fn weather_alert_record() -> RecordConfig<WeatherAlert> {
//!     RecordConfig::builder()
//!         .buffer(BufferCfg::SingleLatest)
//!         .link_to("mqtt://broker.example.com:1883")
//!         .out::<WeatherAlert>(|reader, mqtt| {
//!             publish_alerts_to_mqtt(reader, mqtt)
//!         })
//!         .build()
//! }
28//! ```
29
30use core::fmt::{self, Debug};
31use core::future::Future;
32use core::pin::Pin;
33
34extern crate alloc;
35
36use alloc::{
37 boxed::Box,
38 string::{String, ToString},
39 sync::Arc,
40 vec::Vec,
41};
42
43#[cfg(feature = "std")]
44use alloc::format;
45
46use crate::{builder::AimDb, transport::Connector, DbResult};
47
/// Error that can occur during serialization
///
/// A fieldless `Copy` enum is used instead of a `String` so that the error
/// needs no heap allocation in `no_std` environments and can be logged through
/// `defmt` (see the `defmt::Format` impl, enabled by the `defmt` feature).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializeError {
    /// Output buffer is too small for the serialized data
    BufferTooSmall,

    /// Type mismatch in serializer (wrong type passed)
    TypeMismatch,

    /// Invalid data that cannot be serialized
    InvalidData,
}

#[cfg(feature = "defmt")]
impl defmt::Format for SerializeError {
    /// Writes the bare variant name to the `defmt` formatter.
    fn format(&self, f: defmt::Formatter) {
        match self {
            Self::BufferTooSmall => defmt::write!(f, "BufferTooSmall"),
            Self::TypeMismatch => defmt::write!(f, "TypeMismatch"),
            Self::InvalidData => defmt::write!(f, "InvalidData"),
        }
    }
}

// `Display` only needs `core::fmt`, so it is implemented without a feature
// gate: the human-readable messages are available in `no_std` builds as well.
impl fmt::Display for SerializeError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::BufferTooSmall => "Output buffer too small",
            Self::TypeMismatch => "Type mismatch in serializer",
            Self::InvalidData => "Invalid data for serialization",
        })
    }
}

#[cfg(feature = "std")]
impl std::error::Error for SerializeError {}
88
89/// Type alias for serializer callbacks (reduces type complexity)
90///
91/// Requires the `alloc` feature for `Arc` and `Vec` (available in both std and no_std+alloc).
92/// Serializers convert record values to bytes for publishing to external systems.
93///
94/// # Current Implementation
95///
96/// Returns `Vec<u8>` which requires heap allocation. This works in:
97/// - ✅ `std` environments (full standard library)
98/// - ✅ `no_std + alloc` environments (embedded with allocator, e.g., ESP32, STM32 with heap)
99/// - ❌ `no_std` without `alloc` (bare-metal MCUs without allocator)
100///
101/// # Future Considerations
102///
103/// For zero-allocation embedded environments, future versions may support buffer-based
104/// serialization using `&mut [u8]` output or static lifetime slices.
105pub type SerializerFn =
106 Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
107
/// Parsed connector URL with protocol, host, port, and credentials
///
/// Supports multiple protocol schemes:
/// - MQTT: `mqtt://host:port`, `mqtts://host:port`
/// - Kafka: `kafka://broker1:port,broker2:port/topic`
/// - HTTP: `http://host:port/path`, `https://host:port/path`
/// - WebSocket: `ws://host:port/path`, `wss://host:port/path`
///
/// `Debug` is implemented by hand (not derived) so that the password is
/// masked in debug output, matching what `Display` already does.
#[derive(Clone, PartialEq)]
pub struct ConnectorUrl {
    /// Protocol scheme (mqtt, mqtts, kafka, http, https, ws, wss)
    pub scheme: String,

    /// Host or comma-separated list of hosts (for Kafka)
    pub host: String,

    /// Port number (optional, protocol-specific defaults)
    pub port: Option<u16>,

    /// Path component (for HTTP/WebSocket)
    pub path: Option<String>,

    /// Username for authentication (optional)
    pub username: Option<String>,

    /// Password for authentication (optional)
    pub password: Option<String>,

    /// Query parameters (optional, parsed from URL)
    pub query_params: Vec<(String, String)>,
}

impl fmt::Debug for ConnectorUrl {
    /// Prints every field like a derived `Debug` would, except that a present
    /// password is shown as `Some("****")` so it cannot leak into logs.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the presence of a password is revealed, never its value.
        let masked_password = self.password.as_ref().map(|_| "****");

        f.debug_struct("ConnectorUrl")
            .field("scheme", &self.scheme)
            .field("host", &self.host)
            .field("port", &self.port)
            .field("path", &self.path)
            .field("username", &self.username)
            .field("password", &masked_password)
            .field("query_params", &self.query_params)
            .finish()
    }
}
138
139impl ConnectorUrl {
140 /// Parses a connector URL string
141 ///
142 /// # Supported Formats
143 ///
144 /// - `mqtt://host:port`
145 /// - `mqtt://user:pass@host:port`
146 /// - `mqtts://host:port` (TLS)
147 /// - `kafka://broker1:9092,broker2:9092/topic`
148 /// - `http://host:port/path`
149 /// - `https://host:port/path?key=value`
150 /// - `ws://host:port/mqtt` (WebSocket)
151 /// - `wss://host:port/mqtt` (WebSocket Secure)
152 ///
153 /// # Example
154 ///
155 /// ```rust
156 /// use aimdb_core::connector::ConnectorUrl;
157 ///
158 /// let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
159 /// assert_eq!(url.scheme, "mqtt");
160 /// assert_eq!(url.host, "broker.example.com");
161 /// assert_eq!(url.port, Some(1883));
162 /// assert_eq!(url.username, Some("user".to_string()));
163 /// ```
164 pub fn parse(url: &str) -> DbResult<Self> {
165 parse_connector_url(url)
166 }
167
168 /// Returns the default port for this protocol scheme
169 pub fn default_port(&self) -> Option<u16> {
170 match self.scheme.as_str() {
171 "mqtt" | "ws" => Some(1883),
172 "mqtts" | "wss" => Some(8883),
173 "kafka" => Some(9092),
174 "http" => Some(80),
175 "https" => Some(443),
176 _ => None,
177 }
178 }
179
180 /// Returns the effective port (explicit or default)
181 pub fn effective_port(&self) -> Option<u16> {
182 self.port.or_else(|| self.default_port())
183 }
184
185 /// Returns true if this is a secure connection (TLS)
186 pub fn is_secure(&self) -> bool {
187 matches!(self.scheme.as_str(), "mqtts" | "https" | "wss")
188 }
189
190 /// Returns the URL scheme (protocol)
191 pub fn scheme(&self) -> &str {
192 &self.scheme
193 }
194
195 /// Returns the path component, or "/" if not specified
196 pub fn path(&self) -> &str {
197 self.path.as_deref().unwrap_or("/")
198 }
199
200 /// Returns the resource identifier for protocols where the URL specifies a topic/key
201 ///
202 /// This is designed for the simplified connector model where each connector manages
203 /// a single broker/server connection, and URLs only specify the resource (topic, key, path).
204 ///
205 /// # Examples
206 ///
207 /// - `mqtt://commands/temperature` → `"commands/temperature"` (topic)
208 /// - `mqtt://sensors/temp` → `"sensors/temp"` (topic)
209 /// - `kafka://events` → `"events"` (topic)
210 ///
211 /// The format is `scheme://resource` where resource = host + path combined.
212 pub fn resource_id(&self) -> String {
213 let path = self.path().trim_start_matches('/');
214
215 // Combine host and path to form the complete resource identifier
216 // For mqtt://commands/temperature: host="commands", path="/temperature"
217 // Result: "commands/temperature"
218 if !self.host.is_empty() && !path.is_empty() {
219 alloc::format!("{}/{}", self.host, path)
220 } else if !self.host.is_empty() {
221 self.host.clone()
222 } else if !path.is_empty() {
223 path.to_string()
224 } else {
225 String::new()
226 }
227 }
228}
229
230impl fmt::Display for ConnectorUrl {
231 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232 write!(f, "{}://", self.scheme)?;
233
234 if let Some(ref username) = self.username {
235 write!(f, "{}", username)?;
236 if self.password.is_some() {
237 write!(f, ":****")?; // Don't expose password in Display
238 }
239 write!(f, "@")?;
240 }
241
242 write!(f, "{}", self.host)?;
243
244 if let Some(port) = self.port {
245 write!(f, ":{}", port)?;
246 }
247
248 if let Some(ref path) = self.path {
249 if !path.starts_with('/') {
250 write!(f, "/")?;
251 }
252 write!(f, "{}", path)?;
253 }
254
255 Ok(())
256 }
257}
258
/// Type-erased handle to a protocol client
///
/// Every variant stores the client behind `Arc<dyn Any + Send + Sync>`, so
/// clients of different protocols can be kept in one place and cloned cheaply.
/// Protocol implementations recover the concrete type by downcasting.
///
/// # Design Note
///
/// This is intentionally minimal - the concrete client types are defined by
/// user extensions. The core only provides the infrastructure.
///
/// Works in both `std` and `no_std` (with `alloc`) environments.
#[derive(Clone)]
pub enum ConnectorClient {
    /// MQTT client (protocol-specific, user-provided)
    Mqtt(Arc<dyn core::any::Any + Send + Sync>),

    /// Kafka producer (protocol-specific, user-provided)
    Kafka(Arc<dyn core::any::Any + Send + Sync>),

    /// HTTP client (protocol-specific, user-provided)
    Http(Arc<dyn core::any::Any + Send + Sync>),

    /// Generic connector for custom protocols
    Generic {
        /// Name of the custom protocol; shown in the `Debug` output
        protocol: String,
        /// The type-erased client instance
        client: Arc<dyn core::any::Any + Send + Sync>,
    },
}
287
288impl Debug for ConnectorClient {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 match self {
291 ConnectorClient::Mqtt(_) => write!(f, "ConnectorClient::Mqtt(..)"),
292 ConnectorClient::Kafka(_) => write!(f, "ConnectorClient::Kafka(..)"),
293 ConnectorClient::Http(_) => write!(f, "ConnectorClient::Http(..)"),
294 ConnectorClient::Generic { protocol, .. } => {
295 write!(f, "ConnectorClient::Generic({})", protocol)
296 }
297 }
298 }
299}
300
301impl ConnectorClient {
302 /// Downcasts to a concrete client type
303 ///
304 /// # Example
305 ///
306 /// ```rust,ignore
307 /// use rumqttc::AsyncClient;
308 ///
309 /// if let Some(mqtt_client) = connector.downcast_ref::<Arc<AsyncClient>>() {
310 /// // Use the MQTT client
311 /// }
312 /// ```
313 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
314 match self {
315 ConnectorClient::Mqtt(arc) => arc.downcast_ref::<T>(),
316 ConnectorClient::Kafka(arc) => arc.downcast_ref::<T>(),
317 ConnectorClient::Http(arc) => arc.downcast_ref::<T>(),
318 ConnectorClient::Generic { client, .. } => client.downcast_ref::<T>(),
319 }
320 }
321}
322
323/// Configuration for a connector link
324///
325/// Stores the parsed URL and configuration until the record is built.
326/// The actual client creation and handler spawning happens during the build phase.
327#[derive(Clone)]
328pub struct ConnectorLink {
329 /// Parsed connector URL
330 pub url: ConnectorUrl,
331
332 /// Additional configuration options (protocol-specific)
333 pub config: Vec<(String, String)>,
334
335 /// Serialization callback that converts record values to bytes for publishing
336 ///
337 /// This is a type-erased function that takes `&dyn Any` and returns `Result<Vec<u8>, String>`.
338 /// The connector implementation will downcast to the concrete type and call the serializer.
339 ///
340 /// If `None`, the connector must provide a default serialization mechanism or fail.
341 ///
342 /// Available in both `std` and `no_std` (with `alloc` feature) environments.
343 pub serializer: Option<SerializerFn>,
344
345 /// Consumer factory callback (alloc feature)
346 ///
347 /// Creates ConsumerTrait from Arc<AimDb<R>> to enable type-safe subscription.
348 /// The factory captures the record type T at link_to() configuration time,
349 /// allowing the connector to subscribe without knowing T at compile time.
350 ///
351 /// Mirrors the producer_factory pattern used for inbound connectors.
352 ///
353 /// Available in both `std` and `no_std + alloc` environments.
354 #[cfg(feature = "alloc")]
355 pub consumer_factory: Option<ConsumerFactoryFn>,
356}
357
358impl Debug for ConnectorLink {
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 f.debug_struct("ConnectorLink")
361 .field("url", &self.url)
362 .field("config", &self.config)
363 .field(
364 "serializer",
365 &self.serializer.as_ref().map(|_| "<function>"),
366 )
367 .field(
368 "consumer_factory",
369 #[cfg(feature = "alloc")]
370 &self.consumer_factory.as_ref().map(|_| "<function>"),
371 #[cfg(not(feature = "alloc"))]
372 &None::<()>,
373 )
374 .finish()
375 }
376}
377
378impl ConnectorLink {
379 /// Creates a new connector link from a URL
380 pub fn new(url: ConnectorUrl) -> Self {
381 Self {
382 url,
383 config: Vec::new(),
384 serializer: None,
385 #[cfg(feature = "alloc")]
386 consumer_factory: None,
387 }
388 }
389
390 /// Adds a configuration option
391 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
392 self.config.push((key.into(), value.into()));
393 self
394 }
395
396 /// Creates a consumer using the stored factory (alloc feature)
397 ///
398 /// Takes an Arc<dyn Any> (which should contain Arc<AimDb<R>>) and invokes
399 /// the consumer factory to create a ConsumerTrait instance.
400 ///
401 /// Returns None if no factory is configured.
402 ///
403 /// Available in both `std` and `no_std + alloc` environments.
404 #[cfg(feature = "alloc")]
405 pub fn create_consumer(
406 &self,
407 db_any: Arc<dyn core::any::Any + Send + Sync>,
408 ) -> Option<Box<dyn ConsumerTrait>> {
409 self.consumer_factory.as_ref().map(|f| f(db_any))
410 }
411}
412
/// Shared, type-erased deserializer callback
///
/// The callback turns raw bytes into a `Box<dyn Any + Send>` (to be downcast
/// to the concrete record type by the caller) or reports failure as a
/// `String`. Erasing the type allows deserializers for different record types
/// to be stored in a unified collection.
pub type DeserializerFn =
    Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;
419
/// Shared factory callback that builds a type-erased producer (alloc feature)
///
/// The callback receives an `Arc<dyn Any>` (expected to contain `AimDb<R>`)
/// and returns a boxed `ProducerTrait`. The record type `T` is captured when
/// the factory is created at `link_from()` time, which lets the factory be
/// stored in the type-erased `InboundConnectorLink`.
///
/// Available in both `std` and `no_std + alloc` environments.
#[cfg(feature = "alloc")]
pub type ProducerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait> + Send + Sync>;
430
/// Type-erased producer trait for MQTT router
///
/// Lets a router call `produce_any()` for different record types without
/// knowing the concrete type at compile time: the value travels as
/// `Box<dyn Any + Send>` and the implementation downcasts it.
///
/// # Implementation Note
///
/// This trait uses manual futures instead of `#[async_trait]` to enable `no_std`
/// compatibility. The `async_trait` macro generates code that depends on `std`,
/// while manual `Pin<Box<dyn Future>>` works in both `std` and `no_std + alloc`.
pub trait ProducerTrait: Send + Sync {
    /// Produce a value into the record's buffer
    ///
    /// `value` is the type-erased payload; implementations downcast it to
    /// their concrete record type. The returned future resolves to `Ok(())` on
    /// success and to `Err(String)` when the downcast or the production fails.
    /// The future may borrow `self` for `'a`.
    fn produce_any<'a>(
        &'a self,
        value: Box<dyn core::any::Any + Send>,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
}
452
/// Shared factory callback that builds a type-erased consumer (alloc feature)
///
/// The callback receives an `Arc<dyn Any>` (expected to contain `AimDb<R>`)
/// and returns a boxed `ConsumerTrait`. The record type `T` is captured when
/// the factory is created at `link_to()` time, which lets the factory be
/// stored in the type-erased `ConnectorLink`.
///
/// Mirrors the `ProducerFactoryFn` pattern for symmetry between inbound and outbound.
///
/// Available in both `std` and `no_std + alloc` environments.
#[cfg(feature = "alloc")]
pub type ConsumerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ConsumerTrait> + Send + Sync>;
465
466/// Type-erased consumer trait for outbound routing
467///
468/// Mirrors ProducerTrait but for consumption. Allows connectors to subscribe
469/// to typed values without knowing the concrete type T at compile time.
470///
471/// # Implementation Note
472///
473/// Like ProducerTrait, this uses manual futures instead of `#[async_trait]`
474/// to enable `no_std` compatibility.
475pub trait ConsumerTrait: Send + Sync {
476 /// Subscribe to typed values from this record
477 ///
478 /// Returns a type-erased reader that can be polled for Box<dyn Any> values.
479 /// The connector will downcast to the expected type after deserialization.
480 fn subscribe_any<'a>(&'a self) -> SubscribeAnyFuture<'a>;
481}
482
483/// Type alias for the future returned by `ConsumerTrait::subscribe_any`
484type SubscribeAnyFuture<'a> =
485 Pin<Box<dyn Future<Output = DbResult<Box<dyn AnyReader>>> + Send + 'a>>;
486
487/// Type alias for the future returned by `AnyReader::recv_any`
488type RecvAnyFuture<'a> =
489 Pin<Box<dyn Future<Output = DbResult<Box<dyn core::any::Any + Send>>> + Send + 'a>>;
490
491/// Helper trait for type-erased reading
492///
493/// Allows reading values from a buffer without knowing the concrete type at compile time.
494/// The value is returned as Box<dyn Any> and must be downcast by the caller.
495pub trait AnyReader: Send {
496 /// Receive a type-erased value from the buffer
497 ///
498 /// Returns Box<dyn Any> which must be downcast to the concrete type.
499 /// Returns an error if the buffer is closed or an I/O error occurs.
500 fn recv_any<'a>(&'a mut self) -> RecvAnyFuture<'a>;
501}
502
503/// Configuration for an inbound connector link (External → AimDB)
504///
505/// Stores the parsed URL, configuration, deserializer, and a producer creation callback.
506/// The callback captures the type T at creation time, allowing type-safe producer creation
507/// later without needing PhantomData or type parameters.
508pub struct InboundConnectorLink {
509 /// Parsed connector URL
510 pub url: ConnectorUrl,
511
512 /// Additional configuration options (protocol-specific)
513 pub config: Vec<(String, String)>,
514
515 /// Deserialization callback that converts bytes to typed values
516 ///
517 /// This is a type-erased function that takes `&[u8]` and returns
518 /// `Result<Box<dyn Any + Send>, String>`. The spawned task will
519 /// downcast to the concrete type before producing.
520 ///
521 /// Available in both `std` and `no_std` (with `alloc` feature) environments.
522 pub deserializer: DeserializerFn,
523
524 /// Producer creation callback (alloc feature)
525 ///
526 /// Takes Arc<AimDb<R>> and returns Box<dyn ProducerTrait>.
527 /// Captures the record type T at link_from() call time.
528 ///
529 /// Available in both `std` and `no_std + alloc` environments.
530 #[cfg(feature = "alloc")]
531 pub producer_factory: Option<ProducerFactoryFn>,
532}
533
534impl Clone for InboundConnectorLink {
535 fn clone(&self) -> Self {
536 Self {
537 url: self.url.clone(),
538 config: self.config.clone(),
539 deserializer: self.deserializer.clone(),
540 #[cfg(feature = "alloc")]
541 producer_factory: self.producer_factory.clone(),
542 }
543 }
544}
545
546impl Debug for InboundConnectorLink {
547 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
548 f.debug_struct("InboundConnectorLink")
549 .field("url", &self.url)
550 .field("config", &self.config)
551 .field("deserializer", &"<function>")
552 .finish()
553 }
554}
555
556impl InboundConnectorLink {
557 /// Creates a new inbound connector link from a URL and deserializer
558 pub fn new(url: ConnectorUrl, deserializer: DeserializerFn) -> Self {
559 Self {
560 url,
561 config: Vec::new(),
562 deserializer,
563 #[cfg(feature = "alloc")]
564 producer_factory: None,
565 }
566 }
567
568 /// Sets the producer factory callback (alloc feature)
569 ///
570 /// Available in both `std` and `no_std + alloc` environments.
571 #[cfg(feature = "alloc")]
572 pub fn with_producer_factory<F>(mut self, factory: F) -> Self
573 where
574 F: Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait>
575 + Send
576 + Sync
577 + 'static,
578 {
579 self.producer_factory = Some(Arc::new(factory));
580 self
581 }
582
583 /// Creates a producer using the stored factory (alloc feature)
584 ///
585 /// Available in both `std` and `no_std + alloc` environments.
586 #[cfg(feature = "alloc")]
587 pub fn create_producer(
588 &self,
589 db_any: Arc<dyn core::any::Any + Send + Sync>,
590 ) -> Option<Box<dyn ProducerTrait>> {
591 self.producer_factory.as_ref().map(|f| f(db_any))
592 }
593}
594
595/// Configuration for an outbound connector link (AimDB → External)
596pub struct OutboundConnectorLink {
597 pub url: ConnectorUrl,
598 pub config: Vec<(String, String)>,
599}
600
601/// Parses a connector URL string into structured components
602///
603/// This is a simple parser that handles the most common URL formats.
604/// For production use, consider using the `url` crate with feature flags.
605fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
606 use crate::DbError;
607
608 // Split scheme from rest
609 let (scheme, rest) = url.split_once("://").ok_or({
610 #[cfg(feature = "std")]
611 {
612 DbError::InvalidOperation {
613 operation: "parse_connector_url".into(),
614 reason: format!("Missing scheme in URL: {}", url),
615 }
616 }
617 #[cfg(not(feature = "std"))]
618 {
619 DbError::InvalidOperation {
620 _operation: (),
621 _reason: (),
622 }
623 }
624 })?;
625
626 // Extract credentials if present (user:pass@host)
627 let (credentials, host_part) = if let Some(at_idx) = rest.find('@') {
628 let creds = &rest[..at_idx];
629 let host = &rest[at_idx + 1..];
630 (Some(creds), host)
631 } else {
632 (None, rest)
633 };
634
635 let (username, password) = if let Some(creds) = credentials {
636 if let Some((user, pass)) = creds.split_once(':') {
637 (Some(user.to_string()), Some(pass.to_string()))
638 } else {
639 (Some(creds.to_string()), None)
640 }
641 } else {
642 (None, None)
643 };
644
645 // Split path and query from host:port
646 let (host_port, path, query_params) = if let Some(slash_idx) = host_part.find('/') {
647 let hp = &host_part[..slash_idx];
648 let path_query = &host_part[slash_idx..];
649
650 // Split query parameters
651 let (path_part, query_part) = if let Some(q_idx) = path_query.find('?') {
652 (&path_query[..q_idx], Some(&path_query[q_idx + 1..]))
653 } else {
654 (path_query, None)
655 };
656
657 // Parse query parameters
658 let params = if let Some(query) = query_part {
659 query
660 .split('&')
661 .filter_map(|pair| {
662 let (k, v) = pair.split_once('=')?;
663 Some((k.to_string(), v.to_string()))
664 })
665 .collect()
666 } else {
667 Vec::new()
668 };
669
670 (hp, Some(path_part.to_string()), params)
671 } else {
672 (host_part, None, Vec::new())
673 };
674
675 // Split host and port
676 let (host, port) = if let Some(colon_idx) = host_port.rfind(':') {
677 let h = &host_port[..colon_idx];
678 let p = &host_port[colon_idx + 1..];
679 let port_num = p.parse::<u16>().ok();
680 (h.to_string(), port_num)
681 } else {
682 (host_port.to_string(), None)
683 };
684
685 Ok(ConnectorUrl {
686 scheme: scheme.to_string(),
687 host,
688 port,
689 path,
690 username,
691 password,
692 query_params,
693 })
694}
695
696/// Trait for building connectors after the database is constructed
697///
698/// Connectors that need to collect routes from the database (for inbound routing)
699/// implement this trait. The builder pattern allows connectors to be constructed
700/// in two phases:
701///
702/// 1. Configuration phase: User provides broker URLs and settings
703/// 2. Build phase: Connector collects routes from the database and initializes
704///
705/// # Example
706///
707/// ```rust,ignore
708/// pub struct MqttConnectorBuilder {
709/// broker_url: String,
710/// }
711///
712/// impl<R> ConnectorBuilder<R> for MqttConnectorBuilder
713/// where
714/// R: aimdb_executor::Spawn + 'static,
715/// {
716/// fn build<'a>(
717/// &'a self,
718/// db: &'a AimDb<R>,
719/// ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>> {
720/// Box::pin(async move {
721/// let routes = db.collect_inbound_routes(self.scheme());
722/// let router = RouterBuilder::from_routes(routes).build();
723/// let connector = MqttConnector::new(&self.broker_url, router).await?;
724/// Ok(Arc::new(connector) as Arc<dyn Connector>)
725/// })
726/// }
727///
728/// fn scheme(&self) -> &str {
729/// "mqtt"
730/// }
731/// }
732/// ```
733pub trait ConnectorBuilder<R>: Send + Sync
734where
735 R: aimdb_executor::Spawn + 'static,
736{
737 /// Build the connector using the database
738 ///
739 /// This method is called during `AimDbBuilder::build()` after the database
740 /// has been constructed. The builder can use the database to:
741 /// - Collect inbound routes via `db.collect_inbound_routes()`
742 /// - Access database configuration
743 /// - Register subscriptions
744 ///
745 /// # Arguments
746 /// * `db` - The constructed database instance
747 ///
748 /// # Returns
749 /// An `Arc<dyn Connector>` that will be registered with the database
750 #[allow(clippy::type_complexity)]
751 fn build<'a>(
752 &'a self,
753 db: &'a AimDb<R>,
754 ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>>;
755
756 /// The URL scheme this connector handles
757 ///
758 /// Returns the scheme (e.g., "mqtt", "kafka", "http") that this connector
759 /// will be registered under. Used for routing `.link_from()` and `.link_to()`
760 /// declarations to the appropriate connector.
761 fn scheme(&self) -> &str;
762}
763
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::format;

    /// Parses `input` and unwraps, so a parse failure fails the calling test.
    fn parsed(input: &str) -> ConnectorUrl {
        ConnectorUrl::parse(input).unwrap()
    }

    #[test]
    fn test_parse_simple_mqtt() {
        let url = parsed("mqtt://broker.example.com:1883");
        assert_eq!(url.scheme, "mqtt");
        assert_eq!(url.host, "broker.example.com");
        assert_eq!(url.port, Some(1883));
        assert_eq!(url.username, None);
        assert_eq!(url.password, None);
    }

    #[test]
    fn test_parse_mqtt_with_credentials() {
        let url = parsed("mqtt://user:pass@broker.example.com:1883");
        assert_eq!(url.scheme, "mqtt");
        assert_eq!(url.host, "broker.example.com");
        assert_eq!(url.port, Some(1883));
        assert_eq!(url.username.as_deref(), Some("user"));
        assert_eq!(url.password.as_deref(), Some("pass"));
    }

    #[test]
    fn test_parse_https_with_path() {
        let url = parsed("https://api.example.com:8443/events");
        assert_eq!(url.scheme, "https");
        assert_eq!(url.host, "api.example.com");
        assert_eq!(url.port, Some(8443));
        assert_eq!(url.path.as_deref(), Some("/events"));
    }

    #[test]
    fn test_parse_with_query_params() {
        let url = parsed("http://api.example.com/data?key=value&foo=bar");
        assert_eq!(url.scheme, "http");
        assert_eq!(url.host, "api.example.com");
        assert_eq!(url.path.as_deref(), Some("/data"));
        assert_eq!(url.query_params.len(), 2);

        let (first, second) = (&url.query_params[0], &url.query_params[1]);
        assert_eq!(*first, ("key".to_string(), "value".to_string()));
        assert_eq!(*second, ("foo".to_string(), "bar".to_string()));
    }

    #[test]
    fn test_default_ports() {
        // No explicit port in either URL, so the effective port is the default.
        let mqtt = parsed("mqtt://broker.local");
        assert_eq!(mqtt.default_port(), Some(1883));
        assert_eq!(mqtt.effective_port(), Some(1883));

        let https = parsed("https://api.example.com");
        assert_eq!(https.default_port(), Some(443));
        assert_eq!(https.effective_port(), Some(443));
    }

    #[test]
    fn test_is_secure() {
        let secure = [
            "mqtts://broker.local",
            "https://api.example.com",
            "wss://ws.example.com",
        ];
        for input in secure.iter() {
            assert!(parsed(input).is_secure());
        }

        let insecure = [
            "mqtt://broker.local",
            "http://api.example.com",
            "ws://ws.example.com",
        ];
        for input in insecure.iter() {
            assert!(!parsed(input).is_secure());
        }
    }

    #[test]
    fn test_display_hides_password() {
        let url = parsed("mqtt://user:secret@broker.local:1883");
        let display = format!("{}", url);
        assert!(display.contains("user:****"));
        assert!(!display.contains("secret"));
    }

    #[test]
    fn test_parse_kafka_style() {
        let url = parsed("kafka://broker1.local:9092,broker2.local:9092/my-topic");
        assert_eq!(url.scheme, "kafka");
        // Note: Our simple parser doesn't handle the second port in comma-separated hosts perfectly
        // It parses "broker1.local:9092,broker2.local" as the host and "9092" as the port
        // This is acceptable for now - production connectors can handle this in their client factories
        for broker in ["broker1.local", "broker2.local"].iter() {
            assert!(url.host.contains(broker));
        }
        assert_eq!(url.path.as_deref(), Some("/my-topic"));
    }

    #[test]
    fn test_parse_missing_scheme() {
        // Without "://" there is no scheme to split off, so parsing must fail.
        assert!(ConnectorUrl::parse("broker.example.com:1883").is_err());
    }
}