Skip to main content

aimdb_core/
connector.rs

1//! Connector infrastructure for external protocol integration
2//!
3//! Provides the `.link()` builder API for ergonomic connector setup with
4//! automatic client lifecycle management. Connectors bridge AimDB records
5//! to external systems (MQTT, Kafka, HTTP, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **User Extensions**: Connector implementations are provided by users
10//! - **Shared Clients**: Single client instance shared across tasks (Arc or static)
11//! - **No Buffering**: Direct access to protocol clients, no intermediate queues
12//! - **Type Safety**: Compile-time guarantees via typed handlers
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use aimdb_core::{RecordConfig, BufferCfg};
18//!
19//! fn weather_alert_record() -> RecordConfig<WeatherAlert> {
20//!     RecordConfig::builder()
21//!         .buffer(BufferCfg::SingleLatest)
22//!         .link_to("mqtt://broker.example.com:1883")
23//!             .out::<WeatherAlert>(|reader, mqtt| {
24//!                 publish_alerts_to_mqtt(reader, mqtt)
25//!             })
26//!         .build()
27//! }
28//! ```
29
30use core::fmt::{self, Debug};
31use core::future::Future;
32use core::pin::Pin;
33
34extern crate alloc;
35
36use alloc::{
37    boxed::Box,
38    string::{String, ToString},
39    sync::Arc,
40    vec::Vec,
41};
42
43#[cfg(feature = "std")]
44use alloc::format;
45
46use crate::{builder::AimDb, transport::Connector, DbResult};
47
48/// Error that can occur during serialization
49///
50/// Uses an enum instead of String for better performance in `no_std` environments
51/// and to enable defmt logging support in Embassy.
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub enum SerializeError {
54    /// Output buffer is too small for the serialized data
55    BufferTooSmall,
56
57    /// Type mismatch in serializer (wrong type passed)
58    TypeMismatch,
59
60    /// Invalid data that cannot be serialized
61    InvalidData,
62}
63
64#[cfg(feature = "defmt")]
65impl defmt::Format for SerializeError {
66    fn format(&self, f: defmt::Formatter) {
67        match self {
68            Self::BufferTooSmall => defmt::write!(f, "BufferTooSmall"),
69            Self::TypeMismatch => defmt::write!(f, "TypeMismatch"),
70            Self::InvalidData => defmt::write!(f, "InvalidData"),
71        }
72    }
73}
74
75#[cfg(feature = "std")]
76impl std::fmt::Display for SerializeError {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        match self {
79            Self::BufferTooSmall => write!(f, "Output buffer too small"),
80            Self::TypeMismatch => write!(f, "Type mismatch in serializer"),
81            Self::InvalidData => write!(f, "Invalid data for serialization"),
82        }
83    }
84}
85
86#[cfg(feature = "std")]
87impl std::error::Error for SerializeError {}
88
89/// Type alias for serializer callbacks (reduces type complexity)
90///
91/// Requires the `alloc` feature for `Arc` and `Vec` (available in both std and no_std+alloc).
92/// Serializers convert record values to bytes for publishing to external systems.
93///
94/// # Current Implementation
95///
96/// Returns `Vec<u8>` which requires heap allocation. This works in:
97/// - ✅ `std` environments (full standard library)
98/// - ✅ `no_std + alloc` environments (embedded with allocator, e.g., ESP32, STM32 with heap)
99/// - ❌ `no_std` without `alloc` (bare-metal MCUs without allocator)
100///
101/// # Future Considerations
102///
103/// For zero-allocation embedded environments, future versions may support buffer-based
104/// serialization using `&mut [u8]` output or static lifetime slices.
105pub type SerializerFn =
106    Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
107
108/// Type alias for context-aware type-erased serializer callbacks
109///
110/// Like `SerializerFn`, but receives a type-erased runtime context
111/// for platform-independent timestamps and logging during serialization.
112///
113/// The first argument is the type-erased runtime (as `Arc<dyn Any + Send + Sync>`),
114/// which is downcast to the concrete runtime type via `RuntimeContext::extract_from_any`.
115pub type ContextSerializerFn = Arc<
116    dyn Fn(
117            Arc<dyn core::any::Any + Send + Sync>,
118            &dyn core::any::Any,
119        ) -> Result<Vec<u8>, SerializeError>
120        + Send
121        + Sync,
122>;
123
124/// Which serializer variant is registered for an outbound link
125///
126/// Enforces mutual exclusivity between raw value-only serializers
127/// and context-aware serializers.
128#[derive(Clone)]
129pub enum SerializerKind {
130    /// Plain value-only serializer (from `.with_serializer_raw()`)
131    Raw(SerializerFn),
132    /// Context-aware serializer (from `.with_serializer()`)
133    Context(ContextSerializerFn),
134}
135
136// ============================================================================
137// TopicProvider - Dynamic topic/destination routing
138// ============================================================================
139
140/// Trait for dynamic topic providers (outbound only)
141///
142/// Implement this trait to dynamically determine MQTT topics (or KNX group addresses)
143/// based on the data being published. This enables reusable routing logic that
144/// can be shared across multiple record types.
145///
146/// # Type Safety
147///
148/// The trait is generic over `T`, providing compile-time type safety
149/// at the implementation site. Type erasure occurs only at storage time.
150///
151/// # no_std Compatibility
152///
153/// Works in both `std` and `no_std + alloc` environments.
154///
155/// # Example
156///
157/// ```rust,ignore
158/// use aimdb_core::connector::TopicProvider;
159///
160/// struct SensorTopicProvider;
161///
162/// impl TopicProvider<Temperature> for SensorTopicProvider {
163///     fn topic(&self, value: &Temperature) -> Option<String> {
164///         Some(format!("sensors/temp/{}", value.sensor_id))
165///     }
166/// }
167/// ```
168pub trait TopicProvider<T>: Send + Sync {
169    /// Determine the topic/destination for a given value
170    ///
171    /// Returns `Some(topic)` to use a dynamic topic, or `None` to fall back
172    /// to the static topic from the `link_to()` URL.
173    fn topic(&self, value: &T) -> Option<String>;
174}
175
176/// Type-erased topic provider trait (internal)
177///
178/// Allows storing providers for different types in a unified collection.
179/// The concrete type is recovered via `Any::downcast_ref()` at runtime.
180pub trait TopicProviderAny: Send + Sync {
181    /// Get the topic for a type-erased value
182    ///
183    /// Returns `None` if the value type doesn't match or if the provider
184    /// returns `None` for this value.
185    fn topic_any(&self, value: &dyn core::any::Any) -> Option<String>;
186}
187
188/// Wrapper struct for type-erasing a `TopicProvider<T>`
189///
190/// This wraps a concrete `TopicProvider<T>` implementation and provides
191/// the `TopicProviderAny` interface for type-erased storage.
192///
193/// Uses `PhantomData<fn(T) -> T>` instead of `PhantomData<T>` to avoid
194/// inheriting Send/Sync bounds from T (the data type isn't stored).
195pub struct TopicProviderWrapper<T, P>
196where
197    T: 'static,
198    P: TopicProvider<T>,
199{
200    provider: P,
201    // Use fn(T) -> T to avoid Send/Sync variance issues with T
202    _phantom: core::marker::PhantomData<fn(T) -> T>,
203}
204
205impl<T, P> TopicProviderWrapper<T, P>
206where
207    T: 'static,
208    P: TopicProvider<T>,
209{
210    /// Create a new wrapper for a topic provider
211    pub fn new(provider: P) -> Self {
212        Self {
213            provider,
214            _phantom: core::marker::PhantomData,
215        }
216    }
217}
218
219impl<T, P> TopicProviderAny for TopicProviderWrapper<T, P>
220where
221    T: 'static,
222    P: TopicProvider<T> + Send + Sync,
223{
224    fn topic_any(&self, value: &dyn core::any::Any) -> Option<String> {
225        value
226            .downcast_ref::<T>()
227            .and_then(|v| self.provider.topic(v))
228    }
229}
230
231/// Type alias for stored topic provider (no_std compatible)
232///
233/// Uses `Arc<dyn TopicProviderAny>` for shared ownership across async tasks.
234pub type TopicProviderFn = Arc<dyn TopicProviderAny>;
235
236/// Parsed connector URL with protocol, host, port, and credentials
237///
238/// Supports multiple protocol schemes:
239/// - MQTT: `mqtt://host:port`, `mqtts://host:port`
240/// - Kafka: `kafka://broker1:port,broker2:port/topic`
241/// - HTTP: `http://host:port/path`, `https://host:port/path`
242/// - WebSocket: `ws://host:port/path`, `wss://host:port/path`
243#[derive(Clone, Debug, PartialEq)]
244pub struct ConnectorUrl {
245    /// Protocol scheme (mqtt, mqtts, kafka, http, https, ws, wss)
246    pub scheme: String,
247
248    /// Host or comma-separated list of hosts (for Kafka)
249    pub host: String,
250
251    /// Port number (optional, protocol-specific defaults)
252    pub port: Option<u16>,
253
254    /// Path component (for HTTP/WebSocket)
255    pub path: Option<String>,
256
257    /// Username for authentication (optional)
258    pub username: Option<String>,
259
260    /// Password for authentication (optional)
261    pub password: Option<String>,
262
263    /// Query parameters (optional, parsed from URL)
264    pub query_params: Vec<(String, String)>,
265}
266
267impl ConnectorUrl {
268    /// Parses a connector URL string
269    ///
270    /// # Supported Formats
271    ///
272    /// - `mqtt://host:port`
273    /// - `mqtt://user:pass@host:port`
274    /// - `mqtts://host:port` (TLS)
275    /// - `kafka://broker1:9092,broker2:9092/topic`
276    /// - `http://host:port/path`
277    /// - `https://host:port/path?key=value`
278    /// - `ws://host:port/mqtt` (WebSocket)
279    /// - `wss://host:port/mqtt` (WebSocket Secure)
280    ///
281    /// # Example
282    ///
283    /// ```rust
284    /// use aimdb_core::connector::ConnectorUrl;
285    ///
286    /// let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
287    /// assert_eq!(url.scheme, "mqtt");
288    /// assert_eq!(url.host, "broker.example.com");
289    /// assert_eq!(url.port, Some(1883));
290    /// assert_eq!(url.username, Some("user".to_string()));
291    /// ```
292    pub fn parse(url: &str) -> DbResult<Self> {
293        parse_connector_url(url)
294    }
295
296    /// Returns the default port for this protocol scheme
297    pub fn default_port(&self) -> Option<u16> {
298        match self.scheme.as_str() {
299            "mqtt" | "ws" => Some(1883),
300            "mqtts" | "wss" => Some(8883),
301            "kafka" => Some(9092),
302            "http" => Some(80),
303            "https" => Some(443),
304            _ => None,
305        }
306    }
307
308    /// Returns the effective port (explicit or default)
309    pub fn effective_port(&self) -> Option<u16> {
310        self.port.or_else(|| self.default_port())
311    }
312
313    /// Returns true if this is a secure connection (TLS)
314    pub fn is_secure(&self) -> bool {
315        matches!(self.scheme.as_str(), "mqtts" | "https" | "wss")
316    }
317
318    /// Returns the URL scheme (protocol)
319    pub fn scheme(&self) -> &str {
320        &self.scheme
321    }
322
323    /// Returns the path component, or "/" if not specified
324    pub fn path(&self) -> &str {
325        self.path.as_deref().unwrap_or("/")
326    }
327
328    /// Returns the resource identifier for protocols where the URL specifies a topic/key
329    ///
330    /// This is designed for the simplified connector model where each connector manages
331    /// a single broker/server connection, and URLs only specify the resource (topic, key, path).
332    ///
333    /// # Examples
334    ///
335    /// - `mqtt://commands/temperature` → `"commands/temperature"` (topic)
336    /// - `mqtt://sensors/temp` → `"sensors/temp"` (topic)
337    /// - `kafka://events` → `"events"` (topic)
338    ///
339    /// The format is `scheme://resource` where resource = host + path combined.
340    pub fn resource_id(&self) -> String {
341        let path = self.path().trim_start_matches('/');
342
343        // Combine host and path to form the complete resource identifier
344        // For mqtt://commands/temperature: host="commands", path="/temperature"
345        // Result: "commands/temperature"
346        if !self.host.is_empty() && !path.is_empty() {
347            alloc::format!("{}/{}", self.host, path)
348        } else if !self.host.is_empty() {
349            self.host.clone()
350        } else if !path.is_empty() {
351            path.to_string()
352        } else {
353            String::new()
354        }
355    }
356}
357
358impl fmt::Display for ConnectorUrl {
359    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360        write!(f, "{}://", self.scheme)?;
361
362        if let Some(ref username) = self.username {
363            write!(f, "{}", username)?;
364            if self.password.is_some() {
365                write!(f, ":****")?; // Don't expose password in Display
366            }
367            write!(f, "@")?;
368        }
369
370        write!(f, "{}", self.host)?;
371
372        if let Some(port) = self.port {
373            write!(f, ":{}", port)?;
374        }
375
376        if let Some(ref path) = self.path {
377            if !path.starts_with('/') {
378                write!(f, "/")?;
379            }
380            write!(f, "{}", path)?;
381        }
382
383        Ok(())
384    }
385}
386
387/// Connector client types (type-erased for storage)
388///
389/// This enum allows storing different connector client types in a unified way.
390/// Actual protocol implementations will downcast to their concrete types.
391///
392/// # Design Note
393///
394/// This is intentionally minimal - actual client types are defined by
395/// user extensions. The core only provides the infrastructure.
396///
397/// Works in both `std` and `no_std` (with `alloc`) environments.
398#[derive(Clone)]
399pub enum ConnectorClient {
400    /// MQTT client (protocol-specific, user-provided)
401    Mqtt(Arc<dyn core::any::Any + Send + Sync>),
402
403    /// Kafka producer (protocol-specific, user-provided)
404    Kafka(Arc<dyn core::any::Any + Send + Sync>),
405
406    /// HTTP client (protocol-specific, user-provided)
407    Http(Arc<dyn core::any::Any + Send + Sync>),
408
409    /// Generic connector for custom protocols
410    Generic {
411        protocol: String,
412        client: Arc<dyn core::any::Any + Send + Sync>,
413    },
414}
415
416impl Debug for ConnectorClient {
417    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418        match self {
419            ConnectorClient::Mqtt(_) => write!(f, "ConnectorClient::Mqtt(..)"),
420            ConnectorClient::Kafka(_) => write!(f, "ConnectorClient::Kafka(..)"),
421            ConnectorClient::Http(_) => write!(f, "ConnectorClient::Http(..)"),
422            ConnectorClient::Generic { protocol, .. } => {
423                write!(f, "ConnectorClient::Generic({})", protocol)
424            }
425        }
426    }
427}
428
429impl ConnectorClient {
430    /// Downcasts to a concrete client type
431    ///
432    /// # Example
433    ///
434    /// ```rust,ignore
435    /// use rumqttc::AsyncClient;
436    ///
437    /// if let Some(mqtt_client) = connector.downcast_ref::<Arc<AsyncClient>>() {
438    ///     // Use the MQTT client
439    /// }
440    /// ```
441    pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
442        match self {
443            ConnectorClient::Mqtt(arc) => arc.downcast_ref::<T>(),
444            ConnectorClient::Kafka(arc) => arc.downcast_ref::<T>(),
445            ConnectorClient::Http(arc) => arc.downcast_ref::<T>(),
446            ConnectorClient::Generic { client, .. } => client.downcast_ref::<T>(),
447        }
448    }
449}
450
451/// Configuration for a connector link
452///
453/// Stores the parsed URL and configuration until the record is built.
454/// The actual client creation and handler spawning happens during the build phase.
455#[derive(Clone)]
456pub struct ConnectorLink {
457    /// Parsed connector URL
458    pub url: ConnectorUrl,
459
460    /// Additional configuration options (protocol-specific)
461    pub config: Vec<(String, String)>,
462
463    /// Serialization callback that converts record values to bytes for publishing
464    ///
465    /// Either a plain value-only serializer (`Raw`) or a context-aware
466    /// serializer (`Context`) that receives `RuntimeContext` for timestamps
467    /// and logging.
468    ///
469    /// If `None`, the connector must provide a default serialization mechanism or fail.
470    ///
471    /// Available in both `std` and `no_std` (with `alloc` feature) environments.
472    pub serializer: Option<SerializerKind>,
473
474    /// Consumer factory callback (alloc feature)
475    ///
476    /// Creates ConsumerTrait from Arc<AimDb<R>> to enable type-safe subscription.
477    /// The factory captures the record type T at link_to() configuration time,
478    /// allowing the connector to subscribe without knowing T at compile time.
479    ///
480    /// Mirrors the producer_factory pattern used for inbound connectors.
481    ///
482    /// Available in both `std` and `no_std + alloc` environments.
483    #[cfg(feature = "alloc")]
484    pub consumer_factory: Option<ConsumerFactoryFn>,
485
486    /// Optional dynamic topic provider
487    ///
488    /// When set, the provider is called with each value to determine the
489    /// topic/destination dynamically. If the provider returns `None`,
490    /// the static topic from the URL is used as fallback.
491    ///
492    /// Available in both `std` and `no_std + alloc` environments.
493    pub topic_provider: Option<TopicProviderFn>,
494}
495
496impl Debug for ConnectorLink {
497    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498        f.debug_struct("ConnectorLink")
499            .field("url", &self.url)
500            .field("config", &self.config)
501            .field(
502                "serializer",
503                &self.serializer.as_ref().map(|s| match s {
504                    SerializerKind::Raw(_) => "<raw>",
505                    SerializerKind::Context(_) => "<context>",
506                }),
507            )
508            .field(
509                "consumer_factory",
510                #[cfg(feature = "alloc")]
511                &self.consumer_factory.as_ref().map(|_| "<function>"),
512                #[cfg(not(feature = "alloc"))]
513                &None::<()>,
514            )
515            .field(
516                "topic_provider",
517                &self.topic_provider.as_ref().map(|_| "<function>"),
518            )
519            .finish()
520    }
521}
522
523impl ConnectorLink {
524    /// Creates a new connector link from a URL
525    pub fn new(url: ConnectorUrl) -> Self {
526        Self {
527            url,
528            config: Vec::new(),
529            serializer: None,
530            #[cfg(feature = "alloc")]
531            consumer_factory: None,
532            topic_provider: None,
533        }
534    }
535
536    /// Adds a configuration option
537    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
538        self.config.push((key.into(), value.into()));
539        self
540    }
541
542    /// Creates a consumer using the stored factory (alloc feature)
543    ///
544    /// Takes an Arc<dyn Any> (which should contain Arc<AimDb<R>>) and invokes
545    /// the consumer factory to create a ConsumerTrait instance.
546    ///
547    /// Returns None if no factory is configured.
548    ///
549    /// Available in both `std` and `no_std + alloc` environments.
550    #[cfg(feature = "alloc")]
551    pub fn create_consumer(
552        &self,
553        db_any: Arc<dyn core::any::Any + Send + Sync>,
554    ) -> Option<Box<dyn ConsumerTrait>> {
555        self.consumer_factory.as_ref().map(|f| f(db_any))
556    }
557}
558
559/// Type alias for type-erased deserializer callbacks
560///
561/// Converts raw bytes to a boxed Any that can be downcast to the concrete type.
562/// This allows storing deserializers for different types in a unified collection.
563pub type DeserializerFn =
564    Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;
565
566/// Type alias for context-aware type-erased deserializer callbacks
567///
568/// Like `DeserializerFn`, but receives a type-erased runtime context
569/// for platform-independent timestamps and logging during deserialization.
570///
571/// The first argument is the type-erased runtime (as `Arc<dyn Any + Send + Sync>`),
572/// which is downcast to the concrete runtime type via `RuntimeContext::extract_from_any`.
573pub type ContextDeserializerFn = Arc<
574    dyn Fn(
575            Arc<dyn core::any::Any + Send + Sync>,
576            &[u8],
577        ) -> Result<Box<dyn core::any::Any + Send>, String>
578        + Send
579        + Sync,
580>;
581
582/// Which deserializer variant is registered for an inbound link
583///
584/// Enforces mutual exclusivity between raw bytes-only deserializers
585/// and context-aware deserializers.
586#[derive(Clone)]
587pub enum DeserializerKind {
588    /// Plain bytes-only deserializer (from `.with_deserializer_raw()`)
589    Raw(DeserializerFn),
590    /// Context-aware deserializer (from `.with_deserializer()`)
591    Context(ContextDeserializerFn),
592}
593
594/// Type alias for producer factory callback (alloc feature)
595///
596/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ProducerTrait.
597/// This allows capturing the record type T at link_from() time while storing
598/// the factory in a type-erased InboundConnectorLink.
599///
600/// Available in both `std` and `no_std + alloc` environments.
601#[cfg(feature = "alloc")]
602pub type ProducerFactoryFn =
603    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait> + Send + Sync>;
604
605/// Topic resolver function for inbound connections (late-binding)
606///
607/// Called once at connector startup to resolve the subscription topic.
608/// Returns `Some(topic)` to use a dynamic topic, or `None` to fall back
609/// to the static topic from the `link_from()` URL.
610///
611/// # Use Cases
612///
613/// - Topics determined from smart contracts at runtime
614/// - Service discovery integration
615/// - Environment-specific topic configuration
616/// - Topics read from configuration files or databases
617///
618/// # no_std Compatibility
619///
620/// Works in both `std` and `no_std + alloc` environments.
621pub type TopicResolverFn = Arc<dyn Fn() -> Option<String> + Send + Sync>;
622
623/// Type-erased producer trait for MQTT router
624///
625/// Allows the router to call produce() on different record types without knowing
626/// the concrete type at compile time. The value is passed as Box<dyn Any> and
627/// downcast to the correct type inside the implementation.
628///
629/// # Implementation Note
630///
631/// This trait uses manual futures instead of `#[async_trait]` to enable `no_std`
632/// compatibility. The `async_trait` macro generates code that depends on `std`,
633/// while manual `Pin<Box<dyn Future>>` works in both `std` and `no_std + alloc`.
634pub trait ProducerTrait: Send + Sync {
635    /// Produce a value into the record's buffer
636    ///
637    /// The value must be passed as Box<dyn Any> and will be downcast to the correct type.
638    /// Returns an error if the downcast fails or if production fails.
639    fn produce_any<'a>(
640        &'a self,
641        value: Box<dyn core::any::Any + Send>,
642    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
643}
644
645/// Type alias for consumer factory callback (alloc feature)
646///
647/// Takes Arc<dyn Any> (which contains AimDb<R>) and returns a boxed ConsumerTrait.
648/// This allows capturing the record type T at link_to() time while storing
649/// the factory in a type-erased ConnectorLink.
650///
651/// Mirrors the ProducerFactoryFn pattern for symmetry between inbound and outbound.
652///
653/// Available in both `std` and `no_std + alloc` environments.
654#[cfg(feature = "alloc")]
655pub type ConsumerFactoryFn =
656    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ConsumerTrait> + Send + Sync>;
657
658/// Type-erased consumer trait for outbound routing
659///
660/// Mirrors ProducerTrait but for consumption. Allows connectors to subscribe
661/// to typed values without knowing the concrete type T at compile time.
662///
663/// # Implementation Note
664///
665/// Like ProducerTrait, this uses manual futures instead of `#[async_trait]`
666/// to enable `no_std` compatibility.
667pub trait ConsumerTrait: Send + Sync {
668    /// Subscribe to typed values from this record
669    ///
670    /// Returns a type-erased reader that can be polled for Box<dyn Any> values.
671    /// The connector will downcast to the expected type after deserialization.
672    fn subscribe_any<'a>(&'a self) -> SubscribeAnyFuture<'a>;
673}
674
675/// Type alias for the future returned by `ConsumerTrait::subscribe_any`
676type SubscribeAnyFuture<'a> =
677    Pin<Box<dyn Future<Output = DbResult<Box<dyn AnyReader>>> + Send + 'a>>;
678
679/// Type alias for the future returned by `AnyReader::recv_any`
680type RecvAnyFuture<'a> =
681    Pin<Box<dyn Future<Output = DbResult<Box<dyn core::any::Any + Send>>> + Send + 'a>>;
682
683/// Helper trait for type-erased reading
684///
685/// Allows reading values from a buffer without knowing the concrete type at compile time.
686/// The value is returned as Box<dyn Any> and must be downcast by the caller.
687pub trait AnyReader: Send {
688    /// Receive a type-erased value from the buffer
689    ///
690    /// Returns Box<dyn Any> which must be downcast to the concrete type.
691    /// Returns an error if the buffer is closed or an I/O error occurs.
692    fn recv_any<'a>(&'a mut self) -> RecvAnyFuture<'a>;
693}
694
695/// Configuration for an inbound connector link (External → AimDB)
696///
697/// Stores the parsed URL, configuration, deserializer, and a producer creation callback.
698/// The callback captures the type T at creation time, allowing type-safe producer creation
699/// later without needing PhantomData or type parameters.
700pub struct InboundConnectorLink {
701    /// Parsed connector URL
702    pub url: ConnectorUrl,
703
704    /// Additional configuration options (protocol-specific)
705    pub config: Vec<(String, String)>,
706
707    /// Deserialization callback that converts bytes to typed values
708    ///
709    /// Either a plain bytes-only deserializer (`Raw`) or a context-aware
710    /// deserializer (`Context`) that receives `RuntimeContext` for timestamps
711    /// and logging.
712    ///
713    /// Available in both `std` and `no_std` (with `alloc` feature) environments.
714    pub deserializer: DeserializerKind,
715
716    /// Producer creation callback (alloc feature)
717    ///
718    /// Takes Arc<AimDb<R>> and returns Box<dyn ProducerTrait>.
719    /// Captures the record type T at link_from() call time.
720    ///
721    /// Available in both `std` and `no_std + alloc` environments.
722    #[cfg(feature = "alloc")]
723    pub producer_factory: Option<ProducerFactoryFn>,
724
725    /// Optional dynamic topic resolver (late-binding)
726    ///
727    /// Called once at connector startup to determine the subscription topic.
728    /// If the resolver returns `None`, the static topic from the URL is used.
729    ///
730    /// Available in both `std` and `no_std + alloc` environments.
731    pub topic_resolver: Option<TopicResolverFn>,
732}
733
734impl Clone for InboundConnectorLink {
735    fn clone(&self) -> Self {
736        Self {
737            url: self.url.clone(),
738            config: self.config.clone(),
739            deserializer: self.deserializer.clone(),
740            #[cfg(feature = "alloc")]
741            producer_factory: self.producer_factory.clone(),
742            topic_resolver: self.topic_resolver.clone(),
743        }
744    }
745}
746
747impl Debug for InboundConnectorLink {
748    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
749        f.debug_struct("InboundConnectorLink")
750            .field("url", &self.url)
751            .field("config", &self.config)
752            .field("deserializer", &"<function>")
753            .field(
754                "topic_resolver",
755                &self.topic_resolver.as_ref().map(|_| "<function>"),
756            )
757            .finish()
758    }
759}
760
761impl InboundConnectorLink {
762    /// Creates a new inbound connector link from a URL and deserializer
763    pub fn new(url: ConnectorUrl, deserializer: DeserializerKind) -> Self {
764        Self {
765            url,
766            config: Vec::new(),
767            deserializer,
768            #[cfg(feature = "alloc")]
769            producer_factory: None,
770            topic_resolver: None,
771        }
772    }
773
774    /// Sets the producer factory callback (alloc feature)
775    ///
776    /// Available in both `std` and `no_std + alloc` environments.
777    #[cfg(feature = "alloc")]
778    pub fn with_producer_factory<F>(mut self, factory: F) -> Self
779    where
780        F: Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait>
781            + Send
782            + Sync
783            + 'static,
784    {
785        self.producer_factory = Some(Arc::new(factory));
786        self
787    }
788
789    /// Creates a producer using the stored factory (alloc feature)
790    ///
791    /// Available in both `std` and `no_std + alloc` environments.
792    #[cfg(feature = "alloc")]
793    pub fn create_producer(
794        &self,
795        db_any: Arc<dyn core::any::Any + Send + Sync>,
796    ) -> Option<Box<dyn ProducerTrait>> {
797        self.producer_factory.as_ref().map(|f| f(db_any))
798    }
799
800    /// Resolves the subscription topic for this link
801    ///
802    /// If a topic resolver is configured, calls it to determine the topic.
803    /// Otherwise, returns the static topic from the URL.
804    ///
805    /// This is called once at connector startup.
806    pub fn resolve_topic(&self) -> String {
807        self.topic_resolver
808            .as_ref()
809            .and_then(|resolver| resolver())
810            .unwrap_or_else(|| self.url.resource_id())
811    }
812}
813
814/// Configuration for an outbound connector link (AimDB → External)
815pub struct OutboundConnectorLink {
816    pub url: ConnectorUrl,
817    pub config: Vec<(String, String)>,
818}
819
820/// Parses a connector URL string into structured components
821///
822/// This is a simple parser that handles the most common URL formats.
823/// For production use, consider using the `url` crate with feature flags.
824fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
825    use crate::DbError;
826
827    // Split scheme from rest
828    let (scheme, rest) = url.split_once("://").ok_or({
829        #[cfg(feature = "std")]
830        {
831            DbError::InvalidOperation {
832                operation: "parse_connector_url".into(),
833                reason: format!("Missing scheme in URL: {}", url),
834            }
835        }
836        #[cfg(not(feature = "std"))]
837        {
838            DbError::InvalidOperation {
839                _operation: (),
840                _reason: (),
841            }
842        }
843    })?;
844
845    // Extract credentials if present (user:pass@host)
846    let (credentials, host_part) = if let Some(at_idx) = rest.find('@') {
847        let creds = &rest[..at_idx];
848        let host = &rest[at_idx + 1..];
849        (Some(creds), host)
850    } else {
851        (None, rest)
852    };
853
854    let (username, password) = if let Some(creds) = credentials {
855        if let Some((user, pass)) = creds.split_once(':') {
856            (Some(user.to_string()), Some(pass.to_string()))
857        } else {
858            (Some(creds.to_string()), None)
859        }
860    } else {
861        (None, None)
862    };
863
864    // Split path and query from host:port
865    let (host_port, path, query_params) = if let Some(slash_idx) = host_part.find('/') {
866        let hp = &host_part[..slash_idx];
867        let path_query = &host_part[slash_idx..];
868
869        // Split query parameters
870        let (path_part, query_part) = if let Some(q_idx) = path_query.find('?') {
871            (&path_query[..q_idx], Some(&path_query[q_idx + 1..]))
872        } else {
873            (path_query, None)
874        };
875
876        // Parse query parameters
877        let params = if let Some(query) = query_part {
878            query
879                .split('&')
880                .filter_map(|pair| {
881                    let (k, v) = pair.split_once('=')?;
882                    Some((k.to_string(), v.to_string()))
883                })
884                .collect()
885        } else {
886            Vec::new()
887        };
888
889        (hp, Some(path_part.to_string()), params)
890    } else {
891        (host_part, None, Vec::new())
892    };
893
894    // Split host and port
895    let (host, port) = if let Some(colon_idx) = host_port.rfind(':') {
896        let h = &host_port[..colon_idx];
897        let p = &host_port[colon_idx + 1..];
898        let port_num = p.parse::<u16>().ok();
899        (h.to_string(), port_num)
900    } else {
901        (host_port.to_string(), None)
902    };
903
904    Ok(ConnectorUrl {
905        scheme: scheme.to_string(),
906        host,
907        port,
908        path,
909        username,
910        password,
911        query_params,
912    })
913}
914
915/// Trait for building connectors after the database is constructed
916///
917/// Connectors that need to collect routes from the database (for inbound routing)
918/// implement this trait. The builder pattern allows connectors to be constructed
919/// in two phases:
920///
921/// 1. Configuration phase: User provides broker URLs and settings
922/// 2. Build phase: Connector collects routes from the database and initializes
923///
924/// # Example
925///
926/// ```rust,ignore
927/// pub struct MqttConnectorBuilder {
928///     broker_url: String,
929/// }
930///
931/// impl<R> ConnectorBuilder<R> for MqttConnectorBuilder
932/// where
933///     R: aimdb_executor::Spawn + 'static,
934/// {
935///     fn build<'a>(
936///         &'a self,
937///         db: &'a AimDb<R>,
938///     ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>> {
939///         Box::pin(async move {
940///             let routes = db.collect_inbound_routes(self.scheme());
941///             let router = RouterBuilder::from_routes(routes).build();
942///             let connector = MqttConnector::new(&self.broker_url, router).await?;
943///             Ok(Arc::new(connector) as Arc<dyn Connector>)
944///         })
945///     }
946///     
947///     fn scheme(&self) -> &str {
948///         "mqtt"
949///     }
950/// }
951/// ```
952pub trait ConnectorBuilder<R>: Send + Sync
953where
954    R: aimdb_executor::Spawn + 'static,
955{
956    /// Build the connector using the database
957    ///
958    /// This method is called during `AimDbBuilder::build()` after the database
959    /// has been constructed. The builder can use the database to:
960    /// - Collect inbound routes via `db.collect_inbound_routes()`
961    /// - Access database configuration
962    /// - Register subscriptions
963    ///
964    /// # Arguments
965    /// * `db` - The constructed database instance
966    ///
967    /// # Returns
968    /// An `Arc<dyn Connector>` that will be registered with the database
969    #[allow(clippy::type_complexity)]
970    fn build<'a>(
971        &'a self,
972        db: &'a AimDb<R>,
973    ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>>;
974
975    /// The URL scheme this connector handles
976    ///
977    /// Returns the scheme (e.g., "mqtt", "kafka", "http") that this connector
978    /// will be registered under. Used for routing `.link_from()` and `.link_to()`
979    /// declarations to the appropriate connector.
980    fn scheme(&self) -> &str;
981}
982
983#[cfg(test)]
984mod tests {
985    use super::*;
986    use alloc::format;
987
988    #[test]
989    fn test_parse_simple_mqtt() {
990        let url = ConnectorUrl::parse("mqtt://broker.example.com:1883").unwrap();
991        assert_eq!(url.scheme, "mqtt");
992        assert_eq!(url.host, "broker.example.com");
993        assert_eq!(url.port, Some(1883));
994        assert_eq!(url.username, None);
995        assert_eq!(url.password, None);
996    }
997
998    #[test]
999    fn test_parse_mqtt_with_credentials() {
1000        let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
1001        assert_eq!(url.scheme, "mqtt");
1002        assert_eq!(url.host, "broker.example.com");
1003        assert_eq!(url.port, Some(1883));
1004        assert_eq!(url.username, Some("user".to_string()));
1005        assert_eq!(url.password, Some("pass".to_string()));
1006    }
1007
1008    #[test]
1009    fn test_parse_https_with_path() {
1010        let url = ConnectorUrl::parse("https://api.example.com:8443/events").unwrap();
1011        assert_eq!(url.scheme, "https");
1012        assert_eq!(url.host, "api.example.com");
1013        assert_eq!(url.port, Some(8443));
1014        assert_eq!(url.path, Some("/events".to_string()));
1015    }
1016
1017    #[test]
1018    fn test_parse_with_query_params() {
1019        let url = ConnectorUrl::parse("http://api.example.com/data?key=value&foo=bar").unwrap();
1020        assert_eq!(url.scheme, "http");
1021        assert_eq!(url.host, "api.example.com");
1022        assert_eq!(url.path, Some("/data".to_string()));
1023        assert_eq!(url.query_params.len(), 2);
1024        assert_eq!(
1025            url.query_params[0],
1026            ("key".to_string(), "value".to_string())
1027        );
1028        assert_eq!(url.query_params[1], ("foo".to_string(), "bar".to_string()));
1029    }
1030
1031    #[test]
1032    fn test_default_ports() {
1033        let mqtt = ConnectorUrl::parse("mqtt://broker.local").unwrap();
1034        assert_eq!(mqtt.default_port(), Some(1883));
1035        assert_eq!(mqtt.effective_port(), Some(1883));
1036
1037        let https = ConnectorUrl::parse("https://api.example.com").unwrap();
1038        assert_eq!(https.default_port(), Some(443));
1039        assert_eq!(https.effective_port(), Some(443));
1040    }
1041
1042    #[test]
1043    fn test_is_secure() {
1044        assert!(ConnectorUrl::parse("mqtts://broker.local")
1045            .unwrap()
1046            .is_secure());
1047        assert!(ConnectorUrl::parse("https://api.example.com")
1048            .unwrap()
1049            .is_secure());
1050        assert!(ConnectorUrl::parse("wss://ws.example.com")
1051            .unwrap()
1052            .is_secure());
1053
1054        assert!(!ConnectorUrl::parse("mqtt://broker.local")
1055            .unwrap()
1056            .is_secure());
1057        assert!(!ConnectorUrl::parse("http://api.example.com")
1058            .unwrap()
1059            .is_secure());
1060        assert!(!ConnectorUrl::parse("ws://ws.example.com")
1061            .unwrap()
1062            .is_secure());
1063    }
1064
1065    #[test]
1066    fn test_display_hides_password() {
1067        let url = ConnectorUrl::parse("mqtt://user:secret@broker.local:1883").unwrap();
1068        let display = format!("{}", url);
1069        assert!(display.contains("user:****"));
1070        assert!(!display.contains("secret"));
1071    }
1072
1073    #[test]
1074    fn test_parse_kafka_style() {
1075        let url =
1076            ConnectorUrl::parse("kafka://broker1.local:9092,broker2.local:9092/my-topic").unwrap();
1077        assert_eq!(url.scheme, "kafka");
1078        // Note: Our simple parser doesn't handle the second port in comma-separated hosts perfectly
1079        // It parses "broker1.local:9092,broker2.local" as the host and "9092" as the port
1080        // This is acceptable for now - production connectors can handle this in their client factories
1081        assert!(url.host.contains("broker1.local"));
1082        assert!(url.host.contains("broker2.local"));
1083        assert_eq!(url.path, Some("/my-topic".to_string()));
1084    }
1085
1086    #[test]
1087    fn test_parse_missing_scheme() {
1088        let result = ConnectorUrl::parse("broker.example.com:1883");
1089        assert!(result.is_err());
1090    }
1091
1092    // ========================================================================
1093    // TopicProvider Tests
1094    // ========================================================================
1095
1096    #[allow(dead_code)]
1097    #[derive(Debug, Clone)]
1098    struct TestTemperature {
1099        sensor_id: String,
1100        celsius: f32,
1101    }
1102
1103    struct TestTopicProvider;
1104
1105    impl super::TopicProvider<TestTemperature> for TestTopicProvider {
1106        fn topic(&self, value: &TestTemperature) -> Option<String> {
1107            Some(format!("sensors/temp/{}", value.sensor_id))
1108        }
1109    }
1110
1111    #[test]
1112    fn test_topic_provider_type_erasure() {
1113        use super::{TopicProviderAny, TopicProviderWrapper};
1114
1115        let provider: Arc<dyn TopicProviderAny> =
1116            Arc::new(TopicProviderWrapper::new(TestTopicProvider));
1117        let temp = TestTemperature {
1118            sensor_id: "kitchen-001".into(),
1119            celsius: 22.5,
1120        };
1121
1122        assert_eq!(
1123            provider.topic_any(&temp),
1124            Some("sensors/temp/kitchen-001".into())
1125        );
1126    }
1127
1128    #[test]
1129    fn test_topic_provider_type_mismatch() {
1130        use super::{TopicProviderAny, TopicProviderWrapper};
1131
1132        let provider: Arc<dyn TopicProviderAny> =
1133            Arc::new(TopicProviderWrapper::new(TestTopicProvider));
1134        let wrong_type = "not a temperature";
1135
1136        // Type mismatch returns None (falls back to default topic)
1137        assert_eq!(provider.topic_any(&wrong_type), None);
1138    }
1139
1140    #[test]
1141    fn test_topic_provider_returns_none() {
1142        struct OptionalTopicProvider;
1143
1144        impl super::TopicProvider<TestTemperature> for OptionalTopicProvider {
1145            fn topic(&self, temp: &TestTemperature) -> Option<String> {
1146                if temp.sensor_id.is_empty() {
1147                    None // Fall back to default topic
1148                } else {
1149                    Some(format!("sensors/{}", temp.sensor_id))
1150                }
1151            }
1152        }
1153
1154        use super::{TopicProviderAny, TopicProviderWrapper};
1155
1156        let provider: Arc<dyn TopicProviderAny> =
1157            Arc::new(TopicProviderWrapper::new(OptionalTopicProvider));
1158
1159        // Non-empty sensor_id returns dynamic topic
1160        let temp_with_id = TestTemperature {
1161            sensor_id: "abc".into(),
1162            celsius: 20.0,
1163        };
1164        assert_eq!(
1165            provider.topic_any(&temp_with_id),
1166            Some("sensors/abc".into())
1167        );
1168
1169        // Empty sensor_id returns None (fallback)
1170        let temp_without_id = TestTemperature {
1171            sensor_id: String::new(),
1172            celsius: 20.0,
1173        };
1174        assert_eq!(provider.topic_any(&temp_without_id), None);
1175    }
1176
1177    // ========================================================================
1178    // TopicResolverFn Tests
1179    // ========================================================================
1180
1181    #[test]
1182    fn test_topic_resolver_returns_some() {
1183        let resolver: super::TopicResolverFn = Arc::new(|| Some("resolved/topic".into()));
1184
1185        assert_eq!(resolver(), Some("resolved/topic".into()));
1186    }
1187
1188    #[test]
1189    fn test_topic_resolver_returns_none() {
1190        let resolver: super::TopicResolverFn = Arc::new(|| None);
1191
1192        // Returns None, should fall back to default topic
1193        assert_eq!(resolver(), None);
1194    }
1195
1196    #[cfg(feature = "std")]
1197    #[test]
1198    fn test_topic_resolver_with_captured_state() {
1199        use std::sync::Mutex;
1200
1201        let config = Arc::new(Mutex::new(Some("dynamic/topic".to_string())));
1202        let config_clone = config.clone();
1203
1204        let resolver: super::TopicResolverFn =
1205            Arc::new(move || config_clone.lock().unwrap().clone());
1206
1207        assert_eq!(resolver(), Some("dynamic/topic".into()));
1208
1209        // Clear config
1210        *config.lock().unwrap() = None;
1211        assert_eq!(resolver(), None);
1212    }
1213
1214    #[test]
1215    fn test_inbound_connector_link_resolve_topic_default() {
1216        use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
1217
1218        let url = ConnectorUrl::parse("mqtt://sensors/temperature").unwrap();
1219        let deserializer: DeserializerFn =
1220            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1221        let link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
1222
1223        // No resolver configured, should return static topic from URL
1224        assert_eq!(link.resolve_topic(), "sensors/temperature");
1225    }
1226
1227    #[test]
1228    fn test_inbound_connector_link_resolve_topic_dynamic() {
1229        use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
1230
1231        let url = ConnectorUrl::parse("mqtt://sensors/default").unwrap();
1232        let deserializer: DeserializerFn =
1233            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1234        let mut link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
1235
1236        // Configure dynamic resolver
1237        link.topic_resolver = Some(Arc::new(|| Some("sensors/dynamic/kitchen".into())));
1238
1239        // Should return resolved topic, not URL topic
1240        assert_eq!(link.resolve_topic(), "sensors/dynamic/kitchen");
1241    }
1242
1243    #[test]
1244    fn test_inbound_connector_link_resolve_topic_fallback() {
1245        use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
1246
1247        let url = ConnectorUrl::parse("mqtt://sensors/fallback").unwrap();
1248        let deserializer: DeserializerFn =
1249            Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1250        let mut link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
1251
1252        // Configure resolver that returns None
1253        link.topic_resolver = Some(Arc::new(|| None));
1254
1255        // Should fall back to static topic from URL
1256        assert_eq!(link.resolve_topic(), "sensors/fallback");
1257    }
1258}