aimdb_core/
connector.rs

1//! Connector infrastructure for external protocol integration
2//!
3//! Provides the `.link()` builder API for ergonomic connector setup with
4//! automatic client lifecycle management. Connectors bridge AimDB records
5//! to external systems (MQTT, Kafka, HTTP, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **User Extensions**: Connector implementations are provided by users
10//! - **Shared Clients**: Single client instance shared across tasks (Arc or static)
11//! - **No Buffering**: Direct access to protocol clients, no intermediate queues
12//! - **Type Safety**: Compile-time guarantees via typed handlers
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use aimdb_core::{RecordConfig, BufferCfg};
18//!
19//! fn weather_alert_record() -> RecordConfig<WeatherAlert> {
20//!     RecordConfig::builder()
21//!         .buffer(BufferCfg::SingleLatest)
22//!         .link("mqtt://broker.example.com:1883")
23//!             .out::<WeatherAlert>(|reader, mqtt| {
24//!                 publish_alerts_to_mqtt(reader, mqtt)
25//!             })
26//!         .build()
27//! }
28//! ```
29
30use core::fmt::{self, Debug};
31
32extern crate alloc;
33
34use alloc::{
35    string::{String, ToString},
36    sync::Arc,
37    vec::Vec,
38};
39
40#[cfg(feature = "std")]
41use alloc::format;
42
43use crate::DbResult;
44
/// Error that can occur during serialization
///
/// This is the error type returned by [`SerializerFn`] callbacks.
///
/// Uses an enum instead of String for better performance in `no_std` environments
/// and to enable defmt logging support in Embassy. Every variant is a unit variant,
/// so the type is `Copy` and carries no heap-allocated payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializeError {
    /// Output buffer is too small for the serialized data
    BufferTooSmall,

    /// Type mismatch in serializer (wrong type passed)
    ///
    /// NOTE(review): presumably reported when the `&dyn Any` handed to a serializer cannot
    /// be downcast to the record type it expects — confirm against serializer implementations.
    TypeMismatch,

    /// Invalid data that cannot be serialized
    InvalidData,
}
60
#[cfg(feature = "defmt")]
impl defmt::Format for SerializeError {
    /// Emits the variant name as the `defmt` representation of the error.
    fn format(&self, f: defmt::Formatter) {
        // `SerializeError` is `Copy`, so the value can be matched directly.
        match *self {
            SerializeError::BufferTooSmall => defmt::write!(f, "BufferTooSmall"),
            SerializeError::TypeMismatch => defmt::write!(f, "TypeMismatch"),
            SerializeError::InvalidData => defmt::write!(f, "InvalidData"),
        }
    }
}
71
#[cfg(feature = "std")]
impl std::fmt::Display for SerializeError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the message first, then emit it with a single write.
        let message = match *self {
            SerializeError::BufferTooSmall => "Output buffer too small",
            SerializeError::TypeMismatch => "Type mismatch in serializer",
            SerializeError::InvalidData => "Invalid data for serialization",
        };
        f.write_str(message)
    }
}
82
// Implements `std::error::Error` for `SerializeError` when the `std` feature is enabled.
// No trait methods are overridden, so the defaults apply.
#[cfg(feature = "std")]
impl std::error::Error for SerializeError {}
85
/// Type alias for serializer callbacks (reduces type complexity)
///
/// Requires the `alloc` feature for `Arc` and `Vec` (available in both std and no_std+alloc).
/// Serializers convert record values to bytes for publishing to external systems.
///
/// # Signature
///
/// The callback receives the value type-erased as `&dyn Any` and returns either the
/// serialized bytes or a [`SerializeError`]. It is stored behind an `Arc` and bounded by
/// `Send + Sync`, so one serializer can be cloned cheaply and shared between threads.
///
/// # Current Implementation
///
/// Returns `Vec<u8>` which requires heap allocation. This works in:
/// - ✅ `std` environments (full standard library)
/// - ✅ `no_std + alloc` environments (embedded with allocator, e.g., ESP32, STM32 with heap)
/// - ❌ `no_std` without `alloc` (bare-metal MCUs without allocator)
///
/// # Future Considerations
///
/// For zero-allocation embedded environments, future versions may support buffer-based
/// serialization using `&mut [u8]` output or static lifetime slices.
pub type SerializerFn =
    Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
104
/// Parsed connector URL with protocol, host, port, and credentials
///
/// Supports multiple protocol schemes:
/// - MQTT: `mqtt://host:port`, `mqtts://host:port`
/// - Kafka: `kafka://broker1:port,broker2:port/topic`
/// - HTTP: `http://host:port/path`, `https://host:port/path`
/// - WebSocket: `ws://host:port/path`, `wss://host:port/path`
///
/// # Credentials and logging
///
/// The `Debug` implementation prints every field except the password, which is shown as
/// `Some("****")` when present. This keeps `{:?}` output (including that of types which
/// embed a `ConnectorUrl`) from leaking the secret into logs. The real value stays
/// available through the public `password` field.
#[derive(Clone, PartialEq)]
pub struct ConnectorUrl {
    /// Protocol scheme (mqtt, mqtts, kafka, http, https, ws, wss)
    pub scheme: String,

    /// Host or comma-separated list of hosts (for Kafka)
    pub host: String,

    /// Port number (optional, protocol-specific defaults)
    pub port: Option<u16>,

    /// Path component (for HTTP/WebSocket)
    pub path: Option<String>,

    /// Username for authentication (optional)
    pub username: Option<String>,

    /// Password for authentication (optional)
    pub password: Option<String>,

    /// Query parameters (optional, parsed from URL)
    pub query_params: Vec<(String, String)>,
}

impl fmt::Debug for ConnectorUrl {
    /// Formats the URL like a derived `Debug` would, but with the password masked.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only reveal whether a password is set, never its value.
        let masked_password = self.password.as_ref().map(|_| "****");

        f.debug_struct("ConnectorUrl")
            .field("scheme", &self.scheme)
            .field("host", &self.host)
            .field("port", &self.port)
            .field("path", &self.path)
            .field("username", &self.username)
            .field("password", &masked_password)
            .field("query_params", &self.query_params)
            .finish()
    }
}
135
136impl ConnectorUrl {
137    /// Parses a connector URL string
138    ///
139    /// # Supported Formats
140    ///
141    /// - `mqtt://host:port`
142    /// - `mqtt://user:pass@host:port`
143    /// - `mqtts://host:port` (TLS)
144    /// - `kafka://broker1:9092,broker2:9092/topic`
145    /// - `http://host:port/path`
146    /// - `https://host:port/path?key=value`
147    /// - `ws://host:port/mqtt` (WebSocket)
148    /// - `wss://host:port/mqtt` (WebSocket Secure)
149    ///
150    /// # Example
151    ///
152    /// ```rust
153    /// use aimdb_core::connector::ConnectorUrl;
154    ///
155    /// let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
156    /// assert_eq!(url.scheme, "mqtt");
157    /// assert_eq!(url.host, "broker.example.com");
158    /// assert_eq!(url.port, Some(1883));
159    /// assert_eq!(url.username, Some("user".to_string()));
160    /// ```
161    pub fn parse(url: &str) -> DbResult<Self> {
162        parse_connector_url(url)
163    }
164
165    /// Returns the default port for this protocol scheme
166    pub fn default_port(&self) -> Option<u16> {
167        match self.scheme.as_str() {
168            "mqtt" | "ws" => Some(1883),
169            "mqtts" | "wss" => Some(8883),
170            "kafka" => Some(9092),
171            "http" => Some(80),
172            "https" => Some(443),
173            _ => None,
174        }
175    }
176
177    /// Returns the effective port (explicit or default)
178    pub fn effective_port(&self) -> Option<u16> {
179        self.port.or_else(|| self.default_port())
180    }
181
182    /// Returns true if this is a secure connection (TLS)
183    pub fn is_secure(&self) -> bool {
184        matches!(self.scheme.as_str(), "mqtts" | "https" | "wss")
185    }
186
187    /// Returns the URL scheme (protocol)
188    pub fn scheme(&self) -> &str {
189        &self.scheme
190    }
191
192    /// Returns the path component, or "/" if not specified
193    pub fn path(&self) -> &str {
194        self.path.as_deref().unwrap_or("/")
195    }
196}
197
198impl fmt::Display for ConnectorUrl {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        write!(f, "{}://", self.scheme)?;
201
202        if let Some(ref username) = self.username {
203            write!(f, "{}", username)?;
204            if self.password.is_some() {
205                write!(f, ":****")?; // Don't expose password in Display
206            }
207            write!(f, "@")?;
208        }
209
210        write!(f, "{}", self.host)?;
211
212        if let Some(port) = self.port {
213            write!(f, ":{}", port)?;
214        }
215
216        if let Some(ref path) = self.path {
217            if !path.starts_with('/') {
218                write!(f, "/")?;
219            }
220            write!(f, "{}", path)?;
221        }
222
223        Ok(())
224    }
225}
226
/// Connector client types (type-erased for storage)
///
/// This enum allows storing different connector client types in a unified way.
/// Actual protocol implementations will downcast to their concrete types.
///
/// Every variant holds its client as `Arc<dyn Any + Send + Sync>`, so cloning a
/// `ConnectorClient` clones the `Arc` handle rather than the client behind it.
///
/// # Design Note
///
/// This is intentionally minimal - actual client types are defined by
/// user extensions. The core only provides the infrastructure.
///
/// Works in both `std` and `no_std` (with `alloc`) environments.
#[derive(Clone)]
pub enum ConnectorClient {
    /// MQTT client (protocol-specific, user-provided)
    Mqtt(Arc<dyn core::any::Any + Send + Sync>),

    /// Kafka producer (protocol-specific, user-provided)
    Kafka(Arc<dyn core::any::Any + Send + Sync>),

    /// HTTP client (protocol-specific, user-provided)
    Http(Arc<dyn core::any::Any + Send + Sync>),

    /// Generic connector for custom protocols
    Generic {
        // Name of the custom protocol; this is the text shown by the `Debug` output.
        protocol: String,
        // Type-erased client handle for that protocol.
        client: Arc<dyn core::any::Any + Send + Sync>,
    },
}
255
256impl Debug for ConnectorClient {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        match self {
259            ConnectorClient::Mqtt(_) => write!(f, "ConnectorClient::Mqtt(..)"),
260            ConnectorClient::Kafka(_) => write!(f, "ConnectorClient::Kafka(..)"),
261            ConnectorClient::Http(_) => write!(f, "ConnectorClient::Http(..)"),
262            ConnectorClient::Generic { protocol, .. } => {
263                write!(f, "ConnectorClient::Generic({})", protocol)
264            }
265        }
266    }
267}
268
269impl ConnectorClient {
270    /// Downcasts to a concrete client type
271    ///
272    /// # Example
273    ///
274    /// ```rust,ignore
275    /// use rumqttc::AsyncClient;
276    ///
277    /// if let Some(mqtt_client) = connector.downcast_ref::<Arc<AsyncClient>>() {
278    ///     // Use the MQTT client
279    /// }
280    /// ```
281    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
282        match self {
283            ConnectorClient::Mqtt(arc) => arc.downcast_ref::<T>(),
284            ConnectorClient::Kafka(arc) => arc.downcast_ref::<T>(),
285            ConnectorClient::Http(arc) => arc.downcast_ref::<T>(),
286            ConnectorClient::Generic { client, .. } => client.downcast_ref::<T>(),
287        }
288    }
289}
290
/// Configuration for a connector link
///
/// Stores the parsed URL and configuration until the record is built.
/// The actual client creation and handler spawning happens during the build phase.
#[derive(Clone)]
pub struct ConnectorLink {
    /// Parsed connector URL
    pub url: ConnectorUrl,

    /// Additional configuration options (protocol-specific)
    ///
    /// Stored as `(key, value)` pairs in insertion order; duplicate keys are not merged.
    pub config: Vec<(String, String)>,

    /// Serialization callback that converts record values to bytes for publishing
    ///
    /// This is a type-erased function that takes `&dyn Any` and returns
    /// `Result<Vec<u8>, SerializeError>` (see [`SerializerFn`]).
    /// The connector implementation will downcast to the concrete type and call the serializer.
    ///
    /// If `None`, the connector must provide a default serialization mechanism or fail.
    ///
    /// Available in both `std` and `no_std` (with `alloc` feature) environments.
    pub serializer: Option<SerializerFn>,
}
313
314impl Debug for ConnectorLink {
315    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316        f.debug_struct("ConnectorLink")
317            .field("url", &self.url)
318            .field("config", &self.config)
319            .field(
320                "serializer",
321                &self.serializer.as_ref().map(|_| "<function>"),
322            )
323            .finish()
324    }
325}
326
327impl ConnectorLink {
328    /// Creates a new connector link from a URL
329    pub fn new(url: ConnectorUrl) -> Self {
330        Self {
331            url,
332            config: Vec::new(),
333            serializer: None,
334        }
335    }
336
337    /// Adds a configuration option
338    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
339        self.config.push((key.into(), value.into()));
340        self
341    }
342}
343
344/// Parses a connector URL string into structured components
345///
346/// This is a simple parser that handles the most common URL formats.
347/// For production use, consider using the `url` crate with feature flags.
348fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
349    use crate::DbError;
350
351    // Split scheme from rest
352    let (scheme, rest) = url.split_once("://").ok_or({
353        #[cfg(feature = "std")]
354        {
355            DbError::InvalidOperation {
356                operation: "parse_connector_url".into(),
357                reason: format!("Missing scheme in URL: {}", url),
358            }
359        }
360        #[cfg(not(feature = "std"))]
361        {
362            DbError::InvalidOperation {
363                _operation: (),
364                _reason: (),
365            }
366        }
367    })?;
368
369    // Extract credentials if present (user:pass@host)
370    let (credentials, host_part) = if let Some(at_idx) = rest.find('@') {
371        let creds = &rest[..at_idx];
372        let host = &rest[at_idx + 1..];
373        (Some(creds), host)
374    } else {
375        (None, rest)
376    };
377
378    let (username, password) = if let Some(creds) = credentials {
379        if let Some((user, pass)) = creds.split_once(':') {
380            (Some(user.to_string()), Some(pass.to_string()))
381        } else {
382            (Some(creds.to_string()), None)
383        }
384    } else {
385        (None, None)
386    };
387
388    // Split path and query from host:port
389    let (host_port, path, query_params) = if let Some(slash_idx) = host_part.find('/') {
390        let hp = &host_part[..slash_idx];
391        let path_query = &host_part[slash_idx..];
392
393        // Split query parameters
394        let (path_part, query_part) = if let Some(q_idx) = path_query.find('?') {
395            (&path_query[..q_idx], Some(&path_query[q_idx + 1..]))
396        } else {
397            (path_query, None)
398        };
399
400        // Parse query parameters
401        let params = if let Some(query) = query_part {
402            query
403                .split('&')
404                .filter_map(|pair| {
405                    let (k, v) = pair.split_once('=')?;
406                    Some((k.to_string(), v.to_string()))
407                })
408                .collect()
409        } else {
410            Vec::new()
411        };
412
413        (hp, Some(path_part.to_string()), params)
414    } else {
415        (host_part, None, Vec::new())
416    };
417
418    // Split host and port
419    let (host, port) = if let Some(colon_idx) = host_port.rfind(':') {
420        let h = &host_port[..colon_idx];
421        let p = &host_port[colon_idx + 1..];
422        let port_num = p.parse::<u16>().ok();
423        (h.to_string(), port_num)
424    } else {
425        (host_port.to_string(), None)
426    };
427
428    Ok(ConnectorUrl {
429        scheme: scheme.to_string(),
430        host,
431        port,
432        path,
433        username,
434        password,
435        query_params,
436    })
437}
438
// Unit tests for `ConnectorUrl` parsing, default ports, TLS detection and `Display` output.
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::format;

    // Plain `scheme://host:port` with no credentials.
    #[test]
    fn test_parse_simple_mqtt() {
        let url = ConnectorUrl::parse("mqtt://broker.example.com:1883").unwrap();
        assert_eq!(url.scheme, "mqtt");
        assert_eq!(url.host, "broker.example.com");
        assert_eq!(url.port, Some(1883));
        assert_eq!(url.username, None);
        assert_eq!(url.password, None);
    }

    // `user:pass@` in front of the host is split into username and password.
    #[test]
    fn test_parse_mqtt_with_credentials() {
        let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
        assert_eq!(url.scheme, "mqtt");
        assert_eq!(url.host, "broker.example.com");
        assert_eq!(url.port, Some(1883));
        assert_eq!(url.username, Some("user".to_string()));
        assert_eq!(url.password, Some("pass".to_string()));
    }

    // The path keeps its leading '/'.
    #[test]
    fn test_parse_https_with_path() {
        let url = ConnectorUrl::parse("https://api.example.com:8443/events").unwrap();
        assert_eq!(url.scheme, "https");
        assert_eq!(url.host, "api.example.com");
        assert_eq!(url.port, Some(8443));
        assert_eq!(url.path, Some("/events".to_string()));
    }

    // Query parameters are returned as (key, value) pairs in order of appearance.
    #[test]
    fn test_parse_with_query_params() {
        let url = ConnectorUrl::parse("http://api.example.com/data?key=value&foo=bar").unwrap();
        assert_eq!(url.scheme, "http");
        assert_eq!(url.host, "api.example.com");
        assert_eq!(url.path, Some("/data".to_string()));
        assert_eq!(url.query_params.len(), 2);
        assert_eq!(
            url.query_params[0],
            ("key".to_string(), "value".to_string())
        );
        assert_eq!(url.query_params[1], ("foo".to_string(), "bar".to_string()));
    }

    // Without an explicit port, `effective_port` falls back to the scheme default.
    #[test]
    fn test_default_ports() {
        let mqtt = ConnectorUrl::parse("mqtt://broker.local").unwrap();
        assert_eq!(mqtt.default_port(), Some(1883));
        assert_eq!(mqtt.effective_port(), Some(1883));

        let https = ConnectorUrl::parse("https://api.example.com").unwrap();
        assert_eq!(https.default_port(), Some(443));
        assert_eq!(https.effective_port(), Some(443));
    }

    // `mqtts`, `https` and `wss` are secure; their plain counterparts are not.
    #[test]
    fn test_is_secure() {
        assert!(ConnectorUrl::parse("mqtts://broker.local")
            .unwrap()
            .is_secure());
        assert!(ConnectorUrl::parse("https://api.example.com")
            .unwrap()
            .is_secure());
        assert!(ConnectorUrl::parse("wss://ws.example.com")
            .unwrap()
            .is_secure());

        assert!(!ConnectorUrl::parse("mqtt://broker.local")
            .unwrap()
            .is_secure());
        assert!(!ConnectorUrl::parse("http://api.example.com")
            .unwrap()
            .is_secure());
        assert!(!ConnectorUrl::parse("ws://ws.example.com")
            .unwrap()
            .is_secure());
    }

    // `Display` must replace the password with `****`.
    #[test]
    fn test_display_hides_password() {
        let url = ConnectorUrl::parse("mqtt://user:secret@broker.local:1883").unwrap();
        let display = format!("{}", url);
        assert!(display.contains("user:****"));
        assert!(!display.contains("secret"));
    }

    // Comma-separated broker lists are kept inside `host`; only the last port is split off.
    #[test]
    fn test_parse_kafka_style() {
        let url =
            ConnectorUrl::parse("kafka://broker1.local:9092,broker2.local:9092/my-topic").unwrap();
        assert_eq!(url.scheme, "kafka");
        // Note: Our simple parser doesn't handle the second port in comma-separated hosts perfectly
        // It parses "broker1.local:9092,broker2.local" as the host and "9092" as the port
        // This is acceptable for now - production connectors can handle this in their client factories
        assert!(url.host.contains("broker1.local"));
        assert!(url.host.contains("broker2.local"));
        assert_eq!(url.path, Some("/my-topic".to_string()));
    }

    // A URL without the `scheme://` prefix is rejected.
    #[test]
    fn test_parse_missing_scheme() {
        let result = ConnectorUrl::parse("broker.example.com:1883");
        assert!(result.is_err());
    }
}
547}