aimdb_core/connector.rs
1//! Connector infrastructure for external protocol integration
2//!
3//! Provides the `.link()` builder API for ergonomic connector setup with
4//! automatic client lifecycle management. Connectors bridge AimDB records
5//! to external systems (MQTT, Kafka, HTTP, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **User Extensions**: Connector implementations are provided by users
10//! - **Shared Clients**: Single client instance shared across tasks (Arc or static)
11//! - **No Buffering**: Direct access to protocol clients, no intermediate queues
12//! - **Type Safety**: Compile-time guarantees via typed handlers
13//!
14//! # Example
15//!
16//! ```rust,ignore
17//! use aimdb_core::{RecordConfig, BufferCfg};
18//!
19//! fn weather_alert_record() -> RecordConfig<WeatherAlert> {
20//! RecordConfig::builder()
21//! .buffer(BufferCfg::SingleLatest)
22//! .link_to("mqtt://broker.example.com:1883")
23//! .out::<WeatherAlert>(|reader, mqtt| {
24//! publish_alerts_to_mqtt(reader, mqtt)
25//! })
26//! .build()
27//! }
28//! ```
29
30use core::fmt::{self, Debug};
31use core::future::Future;
32use core::pin::Pin;
33
34extern crate alloc;
35
36use alloc::{
37 boxed::Box,
38 string::{String, ToString},
39 sync::Arc,
40 vec::Vec,
41};
42
43#[cfg(feature = "std")]
44use alloc::format;
45
46use crate::{builder::AimDb, transport::Connector, DbResult};
47
/// Failure modes reported by serializer callbacks.
///
/// A fieldless enum is used instead of a `String` message so the error stays
/// cheap in `no_std` environments and can be logged through `defmt` on Embassy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializeError {
    /// The output buffer cannot hold the serialized data.
    BufferTooSmall,

    /// The serializer was handed a value of the wrong type.
    TypeMismatch,

    /// The data is invalid and cannot be serialized.
    InvalidData,
}
63
// `defmt` logging support: each variant is written out under its own name.
#[cfg(feature = "defmt")]
impl defmt::Format for SerializeError {
    fn format(&self, fmt: defmt::Formatter) {
        match *self {
            SerializeError::BufferTooSmall => defmt::write!(fmt, "BufferTooSmall"),
            SerializeError::TypeMismatch => defmt::write!(fmt, "TypeMismatch"),
            SerializeError::InvalidData => defmt::write!(fmt, "InvalidData"),
        }
    }
}
74
75#[cfg(feature = "std")]
76impl std::fmt::Display for SerializeError {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 match self {
79 Self::BufferTooSmall => write!(f, "Output buffer too small"),
80 Self::TypeMismatch => write!(f, "Type mismatch in serializer"),
81 Self::InvalidData => write!(f, "Invalid data for serialization"),
82 }
83 }
84}
85
86#[cfg(feature = "std")]
87impl std::error::Error for SerializeError {}
88
89/// Type alias for serializer callbacks (reduces type complexity)
90///
91/// Requires the `alloc` feature for `Arc` and `Vec` (available in both std and no_std+alloc).
92/// Serializers convert record values to bytes for publishing to external systems.
93///
94/// # Current Implementation
95///
96/// Returns `Vec<u8>` which requires heap allocation. This works in:
97/// - ✅ `std` environments (full standard library)
98/// - ✅ `no_std + alloc` environments (embedded with allocator, e.g., ESP32, STM32 with heap)
99/// - ❌ `no_std` without `alloc` (bare-metal MCUs without allocator)
100///
101/// # Future Considerations
102///
103/// For zero-allocation embedded environments, future versions may support buffer-based
104/// serialization using `&mut [u8]` output or static lifetime slices.
105pub type SerializerFn =
106 Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
107
108/// Type alias for context-aware type-erased serializer callbacks
109///
110/// Like `SerializerFn`, but receives a type-erased runtime context
111/// for platform-independent timestamps and logging during serialization.
112///
113/// The first argument is the type-erased runtime (as `Arc<dyn Any + Send + Sync>`),
114/// which is downcast to the concrete runtime type via `RuntimeContext::extract_from_any`.
115pub type ContextSerializerFn = Arc<
116 dyn Fn(
117 Arc<dyn core::any::Any + Send + Sync>,
118 &dyn core::any::Any,
119 ) -> Result<Vec<u8>, SerializeError>
120 + Send
121 + Sync,
122>;
123
124/// Which serializer variant is registered for an outbound link
125///
126/// Enforces mutual exclusivity between raw value-only serializers
127/// and context-aware serializers.
128#[derive(Clone)]
129pub enum SerializerKind {
130 /// Plain value-only serializer (from `.with_serializer_raw()`)
131 Raw(SerializerFn),
132 /// Context-aware serializer (from `.with_serializer()`)
133 Context(ContextSerializerFn),
134}
135
136// ============================================================================
137// TopicProvider - Dynamic topic/destination routing
138// ============================================================================
139
/// Chooses the outbound topic/destination from the value being published.
///
/// Implementations derive an MQTT topic (or KNX group address) from each
/// value, so routing logic can be written once and shared between record types.
/// This trait is used for outbound links only.
///
/// # Type Safety
///
/// The trait is generic over `T`, so implementations work with the concrete
/// value type. Type erasure happens only when the provider is stored.
///
/// # no_std Compatibility
///
/// Usable in both `std` and `no_std + alloc` environments.
///
/// # Example
///
/// ```rust,ignore
/// use aimdb_core::connector::TopicProvider;
///
/// struct SensorTopicProvider;
///
/// impl TopicProvider<Temperature> for SensorTopicProvider {
///     fn topic(&self, value: &Temperature) -> Option<String> {
///         Some(format!("sensors/temp/{}", value.sensor_id))
///     }
/// }
/// ```
pub trait TopicProvider<T>: Send + Sync {
    /// Returns the topic/destination to use for `value`.
    ///
    /// `Some(topic)` selects a dynamic topic; `None` means "use the static
    /// topic from the `link_to()` URL".
    fn topic(&self, value: &T) -> Option<String>;
}
175
/// Object-safe, type-erased counterpart of [`TopicProvider`] (internal).
///
/// Lets providers for different value types live in one collection. The
/// concrete value type is recovered at runtime with `Any::downcast_ref()`.
pub trait TopicProviderAny: Send + Sync {
    /// Returns the topic for a type-erased `value`.
    ///
    /// Yields `None` when the value is not of the expected type, or when the
    /// underlying provider returns `None` for it.
    fn topic_any(&self, value: &dyn core::any::Any) -> Option<String>;
}
187
188/// Wrapper struct for type-erasing a `TopicProvider<T>`
189///
190/// This wraps a concrete `TopicProvider<T>` implementation and provides
191/// the `TopicProviderAny` interface for type-erased storage.
192///
193/// Uses `PhantomData<fn(T) -> T>` instead of `PhantomData<T>` to avoid
194/// inheriting Send/Sync bounds from T (the data type isn't stored).
195pub struct TopicProviderWrapper<T, P>
196where
197 T: 'static,
198 P: TopicProvider<T>,
199{
200 provider: P,
201 // Use fn(T) -> T to avoid Send/Sync variance issues with T
202 _phantom: core::marker::PhantomData<fn(T) -> T>,
203}
204
205impl<T, P> TopicProviderWrapper<T, P>
206where
207 T: 'static,
208 P: TopicProvider<T>,
209{
210 /// Create a new wrapper for a topic provider
211 pub fn new(provider: P) -> Self {
212 Self {
213 provider,
214 _phantom: core::marker::PhantomData,
215 }
216 }
217}
218
219impl<T, P> TopicProviderAny for TopicProviderWrapper<T, P>
220where
221 T: 'static,
222 P: TopicProvider<T> + Send + Sync,
223{
224 fn topic_any(&self, value: &dyn core::any::Any) -> Option<String> {
225 value
226 .downcast_ref::<T>()
227 .and_then(|v| self.provider.topic(v))
228 }
229}
230
231/// Type alias for stored topic provider (no_std compatible)
232///
233/// Uses `Arc<dyn TopicProviderAny>` for shared ownership across async tasks.
234pub type TopicProviderFn = Arc<dyn TopicProviderAny>;
235
/// A connector URL broken down into protocol, host, port, credentials, path
/// and query parameters.
///
/// Supported protocol schemes:
/// - MQTT: `mqtt://host:port`, `mqtts://host:port`
/// - Kafka: `kafka://broker1:port,broker2:port/topic`
/// - HTTP: `http://host:port/path`, `https://host:port/path`
/// - WebSocket: `ws://host:port/path`, `wss://host:port/path`
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectorUrl {
    /// Protocol scheme (mqtt, mqtts, kafka, http, https, ws, wss).
    pub scheme: String,

    /// Host name, or a comma-separated host list (Kafka).
    pub host: String,

    /// Explicit port, if the URL carried one; defaults are protocol-specific.
    pub port: Option<u16>,

    /// Path component (HTTP/WebSocket), if present.
    pub path: Option<String>,

    /// Authentication user name, if present.
    pub username: Option<String>,

    /// Authentication password, if present.
    pub password: Option<String>,

    /// Query parameters parsed from the URL (empty when there are none).
    pub query_params: Vec<(String, String)>,
}
266
267impl ConnectorUrl {
268 /// Parses a connector URL string
269 ///
270 /// # Supported Formats
271 ///
272 /// - `mqtt://host:port`
273 /// - `mqtt://user:pass@host:port`
274 /// - `mqtts://host:port` (TLS)
275 /// - `kafka://broker1:9092,broker2:9092/topic`
276 /// - `http://host:port/path`
277 /// - `https://host:port/path?key=value`
278 /// - `ws://host:port/mqtt` (WebSocket)
279 /// - `wss://host:port/mqtt` (WebSocket Secure)
280 ///
281 /// # Example
282 ///
283 /// ```rust
284 /// use aimdb_core::connector::ConnectorUrl;
285 ///
286 /// let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
287 /// assert_eq!(url.scheme, "mqtt");
288 /// assert_eq!(url.host, "broker.example.com");
289 /// assert_eq!(url.port, Some(1883));
290 /// assert_eq!(url.username, Some("user".to_string()));
291 /// ```
292 pub fn parse(url: &str) -> DbResult<Self> {
293 parse_connector_url(url)
294 }
295
296 /// Returns the default port for this protocol scheme
297 pub fn default_port(&self) -> Option<u16> {
298 match self.scheme.as_str() {
299 "mqtt" | "ws" => Some(1883),
300 "mqtts" | "wss" => Some(8883),
301 "kafka" => Some(9092),
302 "http" => Some(80),
303 "https" => Some(443),
304 _ => None,
305 }
306 }
307
308 /// Returns the effective port (explicit or default)
309 pub fn effective_port(&self) -> Option<u16> {
310 self.port.or_else(|| self.default_port())
311 }
312
313 /// Returns true if this is a secure connection (TLS)
314 pub fn is_secure(&self) -> bool {
315 matches!(self.scheme.as_str(), "mqtts" | "https" | "wss")
316 }
317
318 /// Returns the URL scheme (protocol)
319 pub fn scheme(&self) -> &str {
320 &self.scheme
321 }
322
323 /// Returns the path component, or "/" if not specified
324 pub fn path(&self) -> &str {
325 self.path.as_deref().unwrap_or("/")
326 }
327
328 /// Returns the resource identifier for protocols where the URL specifies a topic/key
329 ///
330 /// This is designed for the simplified connector model where each connector manages
331 /// a single broker/server connection, and URLs only specify the resource (topic, key, path).
332 ///
333 /// # Examples
334 ///
335 /// - `mqtt://commands/temperature` → `"commands/temperature"` (topic)
336 /// - `mqtt://sensors/temp` → `"sensors/temp"` (topic)
337 /// - `kafka://events` → `"events"` (topic)
338 ///
339 /// The format is `scheme://resource` where resource = host + path combined.
340 pub fn resource_id(&self) -> String {
341 let path = self.path().trim_start_matches('/');
342
343 // Combine host and path to form the complete resource identifier
344 // For mqtt://commands/temperature: host="commands", path="/temperature"
345 // Result: "commands/temperature"
346 if !self.host.is_empty() && !path.is_empty() {
347 alloc::format!("{}/{}", self.host, path)
348 } else if !self.host.is_empty() {
349 self.host.clone()
350 } else if !path.is_empty() {
351 path.to_string()
352 } else {
353 String::new()
354 }
355 }
356}
357
358impl fmt::Display for ConnectorUrl {
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 write!(f, "{}://", self.scheme)?;
361
362 if let Some(ref username) = self.username {
363 write!(f, "{}", username)?;
364 if self.password.is_some() {
365 write!(f, ":****")?; // Don't expose password in Display
366 }
367 write!(f, "@")?;
368 }
369
370 write!(f, "{}", self.host)?;
371
372 if let Some(port) = self.port {
373 write!(f, ":{}", port)?;
374 }
375
376 if let Some(ref path) = self.path {
377 if !path.starts_with('/') {
378 write!(f, "/")?;
379 }
380 write!(f, "{}", path)?;
381 }
382
383 Ok(())
384 }
385}
386
/// Type-erased handle to a protocol client.
///
/// The enum gives differently typed connector clients one common storage
/// type. Protocol implementations downcast the payload back to their concrete
/// client type.
///
/// # Design Note
///
/// Deliberately minimal: the concrete client types come from user extensions,
/// while the core only supplies this storage infrastructure.
///
/// Usable in both `std` and `no_std` (with `alloc`) environments.
#[derive(Clone)]
pub enum ConnectorClient {
    /// MQTT client (protocol-specific, user-provided).
    Mqtt(Arc<dyn core::any::Any + Send + Sync>),

    /// Kafka producer (protocol-specific, user-provided).
    Kafka(Arc<dyn core::any::Any + Send + Sync>),

    /// HTTP client (protocol-specific, user-provided).
    Http(Arc<dyn core::any::Any + Send + Sync>),

    /// Client for a custom protocol, tagged with the protocol name.
    Generic {
        protocol: String,
        client: Arc<dyn core::any::Any + Send + Sync>,
    },
}
415
416impl Debug for ConnectorClient {
417 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418 match self {
419 ConnectorClient::Mqtt(_) => write!(f, "ConnectorClient::Mqtt(..)"),
420 ConnectorClient::Kafka(_) => write!(f, "ConnectorClient::Kafka(..)"),
421 ConnectorClient::Http(_) => write!(f, "ConnectorClient::Http(..)"),
422 ConnectorClient::Generic { protocol, .. } => {
423 write!(f, "ConnectorClient::Generic({})", protocol)
424 }
425 }
426 }
427}
428
429impl ConnectorClient {
430 /// Downcasts to a concrete client type
431 ///
432 /// # Example
433 ///
434 /// ```rust,ignore
435 /// use rumqttc::AsyncClient;
436 ///
437 /// if let Some(mqtt_client) = connector.downcast_ref::<Arc<AsyncClient>>() {
438 /// // Use the MQTT client
439 /// }
440 /// ```
441 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
442 match self {
443 ConnectorClient::Mqtt(arc) => arc.downcast_ref::<T>(),
444 ConnectorClient::Kafka(arc) => arc.downcast_ref::<T>(),
445 ConnectorClient::Http(arc) => arc.downcast_ref::<T>(),
446 ConnectorClient::Generic { client, .. } => client.downcast_ref::<T>(),
447 }
448 }
449}
450
451/// Configuration for a connector link
452///
453/// Stores the parsed URL and configuration until the record is built.
454/// The actual client creation and handler spawning happens during the build phase.
455#[derive(Clone)]
456pub struct ConnectorLink {
457 /// Parsed connector URL
458 pub url: ConnectorUrl,
459
460 /// Additional configuration options (protocol-specific)
461 pub config: Vec<(String, String)>,
462
463 /// Serialization callback that converts record values to bytes for publishing
464 ///
465 /// Either a plain value-only serializer (`Raw`) or a context-aware
466 /// serializer (`Context`) that receives `RuntimeContext` for timestamps
467 /// and logging.
468 ///
469 /// If `None`, the connector must provide a default serialization mechanism or fail.
470 ///
471 /// Available in both `std` and `no_std` (with `alloc` feature) environments.
472 pub serializer: Option<SerializerKind>,
473
474 /// Consumer factory callback (alloc feature)
475 ///
476 /// Creates ConsumerTrait from Arc<AimDb<R>> to enable type-safe subscription.
477 /// The factory captures the record type T at link_to() configuration time,
478 /// allowing the connector to subscribe without knowing T at compile time.
479 ///
480 /// Mirrors the producer_factory pattern used for inbound connectors.
481 ///
482 /// Available in both `std` and `no_std + alloc` environments.
483 #[cfg(feature = "alloc")]
484 pub consumer_factory: Option<ConsumerFactoryFn>,
485
486 /// Optional dynamic topic provider
487 ///
488 /// When set, the provider is called with each value to determine the
489 /// topic/destination dynamically. If the provider returns `None`,
490 /// the static topic from the URL is used as fallback.
491 ///
492 /// Available in both `std` and `no_std + alloc` environments.
493 pub topic_provider: Option<TopicProviderFn>,
494}
495
496impl Debug for ConnectorLink {
497 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498 f.debug_struct("ConnectorLink")
499 .field("url", &self.url)
500 .field("config", &self.config)
501 .field(
502 "serializer",
503 &self.serializer.as_ref().map(|s| match s {
504 SerializerKind::Raw(_) => "<raw>",
505 SerializerKind::Context(_) => "<context>",
506 }),
507 )
508 .field(
509 "consumer_factory",
510 #[cfg(feature = "alloc")]
511 &self.consumer_factory.as_ref().map(|_| "<function>"),
512 #[cfg(not(feature = "alloc"))]
513 &None::<()>,
514 )
515 .field(
516 "topic_provider",
517 &self.topic_provider.as_ref().map(|_| "<function>"),
518 )
519 .finish()
520 }
521}
522
523impl ConnectorLink {
524 /// Creates a new connector link from a URL
525 pub fn new(url: ConnectorUrl) -> Self {
526 Self {
527 url,
528 config: Vec::new(),
529 serializer: None,
530 #[cfg(feature = "alloc")]
531 consumer_factory: None,
532 topic_provider: None,
533 }
534 }
535
536 /// Adds a configuration option
537 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
538 self.config.push((key.into(), value.into()));
539 self
540 }
541
542 /// Creates a consumer using the stored factory (alloc feature)
543 ///
544 /// Takes an Arc<dyn Any> (which should contain Arc<AimDb<R>>) and invokes
545 /// the consumer factory to create a ConsumerTrait instance.
546 ///
547 /// Returns None if no factory is configured.
548 ///
549 /// Available in both `std` and `no_std + alloc` environments.
550 #[cfg(feature = "alloc")]
551 pub fn create_consumer(
552 &self,
553 db_any: Arc<dyn core::any::Any + Send + Sync>,
554 ) -> Option<Box<dyn ConsumerTrait>> {
555 self.consumer_factory.as_ref().map(|f| f(db_any))
556 }
557}
558
/// Shared, type-erased deserializer callback.
///
/// Turns raw bytes into a boxed `Any` that the caller downcasts to the
/// concrete record type, or returns an error message. Erasing the type lets
/// deserializers for different record types share one collection.
pub type DeserializerFn =
    Arc<dyn Fn(&[u8]) -> Result<Box<dyn core::any::Any + Send>, String> + Send + Sync>;
565
/// Context-aware variant of [`DeserializerFn`].
///
/// The callback additionally receives the runtime as its first argument
/// (`Arc<dyn Any + Send + Sync>`), giving deserializers access to
/// platform-independent timestamps and logging. The runtime is downcast to its
/// concrete type via `RuntimeContext::extract_from_any`.
pub type ContextDeserializerFn = Arc<
    dyn Fn(
            Arc<dyn core::any::Any + Send + Sync>,
            &[u8],
        ) -> Result<Box<dyn core::any::Any + Send>, String>
        + Send
        + Sync,
>;
581
582/// Which deserializer variant is registered for an inbound link
583///
584/// Enforces mutual exclusivity between raw bytes-only deserializers
585/// and context-aware deserializers.
586#[derive(Clone)]
587pub enum DeserializerKind {
588 /// Plain bytes-only deserializer (from `.with_deserializer_raw()`)
589 Raw(DeserializerFn),
590 /// Context-aware deserializer (from `.with_deserializer()`)
591 Context(ContextDeserializerFn),
592}
593
/// Producer factory callback (alloc feature).
///
/// Receives an `Arc<dyn Any>` (which contains `AimDb<R>`) and returns a boxed
/// `ProducerTrait`. The record type `T` is captured when `link_from()` is
/// called, while the factory itself is stored in the type-erased
/// `InboundConnectorLink`.
///
/// Available in both `std` and `no_std + alloc` environments.
#[cfg(feature = "alloc")]
pub type ProducerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait> + Send + Sync>;
604
/// Late-binding topic resolver for inbound connections.
///
/// Called once at connector startup to determine the subscription topic.
/// `Some(topic)` selects a dynamic topic; `None` falls back to the static
/// topic from the `link_from()` URL.
///
/// # Use Cases
///
/// - Topics determined from smart contracts at runtime
/// - Service discovery integration
/// - Environment-specific topic configuration
/// - Topics read from configuration files or databases
///
/// # no_std Compatibility
///
/// Usable in both `std` and `no_std + alloc` environments.
pub type TopicResolverFn = Arc<dyn Fn() -> Option<String> + Send + Sync>;
622
/// Type-erased producer used by the MQTT router.
///
/// The router calls `produce_any()` on records of different types without
/// knowing the concrete type at compile time. Values travel as
/// `Box<dyn Any>` and are downcast inside the implementation.
///
/// # Implementation Note
///
/// The method returns a hand-written `Pin<Box<dyn Future>>` instead of relying
/// on `#[async_trait]`: the macro generates code that depends on `std`,
/// whereas the manual form works in both `std` and `no_std + alloc`.
pub trait ProducerTrait: Send + Sync {
    /// Produces `value` into the record's buffer.
    ///
    /// The value arrives as `Box<dyn Any>` and is downcast to the record type.
    /// The future resolves to an error when the downcast or the production
    /// itself fails.
    fn produce_any<'a>(
        &'a self,
        value: Box<dyn core::any::Any + Send>,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
}
644
/// Consumer factory callback (alloc feature).
///
/// Receives an `Arc<dyn Any>` (which contains `AimDb<R>`) and returns a boxed
/// `ConsumerTrait`. The record type `T` is captured when `link_to()` is
/// called, while the factory itself is stored in the type-erased
/// `ConnectorLink`.
///
/// The outbound counterpart of `ProducerFactoryFn`, kept symmetric with it.
///
/// Available in both `std` and `no_std + alloc` environments.
#[cfg(feature = "alloc")]
pub type ConsumerFactoryFn =
    Arc<dyn Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ConsumerTrait> + Send + Sync>;
657
658/// Type-erased consumer trait for outbound routing
659///
660/// Mirrors ProducerTrait but for consumption. Allows connectors to subscribe
661/// to typed values without knowing the concrete type T at compile time.
662///
663/// # Implementation Note
664///
665/// Like ProducerTrait, this uses manual futures instead of `#[async_trait]`
666/// to enable `no_std` compatibility.
667pub trait ConsumerTrait: Send + Sync {
668 /// Subscribe to typed values from this record
669 ///
670 /// Returns a type-erased reader that can be polled for Box<dyn Any> values.
671 /// The connector will downcast to the expected type after deserialization.
672 fn subscribe_any<'a>(&'a self) -> SubscribeAnyFuture<'a>;
673}
674
675/// Type alias for the future returned by `ConsumerTrait::subscribe_any`
676type SubscribeAnyFuture<'a> =
677 Pin<Box<dyn Future<Output = DbResult<Box<dyn AnyReader>>> + Send + 'a>>;
678
679/// Type alias for the future returned by `AnyReader::recv_any`
680type RecvAnyFuture<'a> =
681 Pin<Box<dyn Future<Output = DbResult<Box<dyn core::any::Any + Send>>> + Send + 'a>>;
682
683/// Helper trait for type-erased reading
684///
685/// Allows reading values from a buffer without knowing the concrete type at compile time.
686/// The value is returned as Box<dyn Any> and must be downcast by the caller.
687pub trait AnyReader: Send {
688 /// Receive a type-erased value from the buffer
689 ///
690 /// Returns Box<dyn Any> which must be downcast to the concrete type.
691 /// Returns an error if the buffer is closed or an I/O error occurs.
692 fn recv_any<'a>(&'a mut self) -> RecvAnyFuture<'a>;
693}
694
695/// Configuration for an inbound connector link (External → AimDB)
696///
697/// Stores the parsed URL, configuration, deserializer, and a producer creation callback.
698/// The callback captures the type T at creation time, allowing type-safe producer creation
699/// later without needing PhantomData or type parameters.
700pub struct InboundConnectorLink {
701 /// Parsed connector URL
702 pub url: ConnectorUrl,
703
704 /// Additional configuration options (protocol-specific)
705 pub config: Vec<(String, String)>,
706
707 /// Deserialization callback that converts bytes to typed values
708 ///
709 /// Either a plain bytes-only deserializer (`Raw`) or a context-aware
710 /// deserializer (`Context`) that receives `RuntimeContext` for timestamps
711 /// and logging.
712 ///
713 /// Available in both `std` and `no_std` (with `alloc` feature) environments.
714 pub deserializer: DeserializerKind,
715
716 /// Producer creation callback (alloc feature)
717 ///
718 /// Takes Arc<AimDb<R>> and returns Box<dyn ProducerTrait>.
719 /// Captures the record type T at link_from() call time.
720 ///
721 /// Available in both `std` and `no_std + alloc` environments.
722 #[cfg(feature = "alloc")]
723 pub producer_factory: Option<ProducerFactoryFn>,
724
725 /// Optional dynamic topic resolver (late-binding)
726 ///
727 /// Called once at connector startup to determine the subscription topic.
728 /// If the resolver returns `None`, the static topic from the URL is used.
729 ///
730 /// Available in both `std` and `no_std + alloc` environments.
731 pub topic_resolver: Option<TopicResolverFn>,
732}
733
734impl Clone for InboundConnectorLink {
735 fn clone(&self) -> Self {
736 Self {
737 url: self.url.clone(),
738 config: self.config.clone(),
739 deserializer: self.deserializer.clone(),
740 #[cfg(feature = "alloc")]
741 producer_factory: self.producer_factory.clone(),
742 topic_resolver: self.topic_resolver.clone(),
743 }
744 }
745}
746
747impl Debug for InboundConnectorLink {
748 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
749 f.debug_struct("InboundConnectorLink")
750 .field("url", &self.url)
751 .field("config", &self.config)
752 .field("deserializer", &"<function>")
753 .field(
754 "topic_resolver",
755 &self.topic_resolver.as_ref().map(|_| "<function>"),
756 )
757 .finish()
758 }
759}
760
761impl InboundConnectorLink {
762 /// Creates a new inbound connector link from a URL and deserializer
763 pub fn new(url: ConnectorUrl, deserializer: DeserializerKind) -> Self {
764 Self {
765 url,
766 config: Vec::new(),
767 deserializer,
768 #[cfg(feature = "alloc")]
769 producer_factory: None,
770 topic_resolver: None,
771 }
772 }
773
774 /// Sets the producer factory callback (alloc feature)
775 ///
776 /// Available in both `std` and `no_std + alloc` environments.
777 #[cfg(feature = "alloc")]
778 pub fn with_producer_factory<F>(mut self, factory: F) -> Self
779 where
780 F: Fn(Arc<dyn core::any::Any + Send + Sync>) -> Box<dyn ProducerTrait>
781 + Send
782 + Sync
783 + 'static,
784 {
785 self.producer_factory = Some(Arc::new(factory));
786 self
787 }
788
789 /// Creates a producer using the stored factory (alloc feature)
790 ///
791 /// Available in both `std` and `no_std + alloc` environments.
792 #[cfg(feature = "alloc")]
793 pub fn create_producer(
794 &self,
795 db_any: Arc<dyn core::any::Any + Send + Sync>,
796 ) -> Option<Box<dyn ProducerTrait>> {
797 self.producer_factory.as_ref().map(|f| f(db_any))
798 }
799
800 /// Resolves the subscription topic for this link
801 ///
802 /// If a topic resolver is configured, calls it to determine the topic.
803 /// Otherwise, returns the static topic from the URL.
804 ///
805 /// This is called once at connector startup.
806 pub fn resolve_topic(&self) -> String {
807 self.topic_resolver
808 .as_ref()
809 .and_then(|resolver| resolver())
810 .unwrap_or_else(|| self.url.resource_id())
811 }
812}
813
814/// Configuration for an outbound connector link (AimDB → External)
815pub struct OutboundConnectorLink {
816 pub url: ConnectorUrl,
817 pub config: Vec<(String, String)>,
818}
819
820/// Parses a connector URL string into structured components
821///
822/// This is a simple parser that handles the most common URL formats.
823/// For production use, consider using the `url` crate with feature flags.
824fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
825 use crate::DbError;
826
827 // Split scheme from rest
828 let (scheme, rest) = url.split_once("://").ok_or({
829 #[cfg(feature = "std")]
830 {
831 DbError::InvalidOperation {
832 operation: "parse_connector_url".into(),
833 reason: format!("Missing scheme in URL: {}", url),
834 }
835 }
836 #[cfg(not(feature = "std"))]
837 {
838 DbError::InvalidOperation {
839 _operation: (),
840 _reason: (),
841 }
842 }
843 })?;
844
845 // Extract credentials if present (user:pass@host)
846 let (credentials, host_part) = if let Some(at_idx) = rest.find('@') {
847 let creds = &rest[..at_idx];
848 let host = &rest[at_idx + 1..];
849 (Some(creds), host)
850 } else {
851 (None, rest)
852 };
853
854 let (username, password) = if let Some(creds) = credentials {
855 if let Some((user, pass)) = creds.split_once(':') {
856 (Some(user.to_string()), Some(pass.to_string()))
857 } else {
858 (Some(creds.to_string()), None)
859 }
860 } else {
861 (None, None)
862 };
863
864 // Split path and query from host:port
865 let (host_port, path, query_params) = if let Some(slash_idx) = host_part.find('/') {
866 let hp = &host_part[..slash_idx];
867 let path_query = &host_part[slash_idx..];
868
869 // Split query parameters
870 let (path_part, query_part) = if let Some(q_idx) = path_query.find('?') {
871 (&path_query[..q_idx], Some(&path_query[q_idx + 1..]))
872 } else {
873 (path_query, None)
874 };
875
876 // Parse query parameters
877 let params = if let Some(query) = query_part {
878 query
879 .split('&')
880 .filter_map(|pair| {
881 let (k, v) = pair.split_once('=')?;
882 Some((k.to_string(), v.to_string()))
883 })
884 .collect()
885 } else {
886 Vec::new()
887 };
888
889 (hp, Some(path_part.to_string()), params)
890 } else {
891 (host_part, None, Vec::new())
892 };
893
894 // Split host and port
895 let (host, port) = if let Some(colon_idx) = host_port.rfind(':') {
896 let h = &host_port[..colon_idx];
897 let p = &host_port[colon_idx + 1..];
898 let port_num = p.parse::<u16>().ok();
899 (h.to_string(), port_num)
900 } else {
901 (host_port.to_string(), None)
902 };
903
904 Ok(ConnectorUrl {
905 scheme: scheme.to_string(),
906 host,
907 port,
908 path,
909 username,
910 password,
911 query_params,
912 })
913}
914
915/// Trait for building connectors after the database is constructed
916///
917/// Connectors that need to collect routes from the database (for inbound routing)
918/// implement this trait. The builder pattern allows connectors to be constructed
919/// in two phases:
920///
921/// 1. Configuration phase: User provides broker URLs and settings
922/// 2. Build phase: Connector collects routes from the database and initializes
923///
924/// # Example
925///
926/// ```rust,ignore
927/// pub struct MqttConnectorBuilder {
928/// broker_url: String,
929/// }
930///
931/// impl<R> ConnectorBuilder<R> for MqttConnectorBuilder
932/// where
933/// R: aimdb_executor::Spawn + 'static,
934/// {
935/// fn build<'a>(
936/// &'a self,
937/// db: &'a AimDb<R>,
938/// ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>> {
939/// Box::pin(async move {
940/// let routes = db.collect_inbound_routes(self.scheme());
941/// let router = RouterBuilder::from_routes(routes).build();
942/// let connector = MqttConnector::new(&self.broker_url, router).await?;
943/// Ok(Arc::new(connector) as Arc<dyn Connector>)
944/// })
945/// }
946///
947/// fn scheme(&self) -> &str {
948/// "mqtt"
949/// }
950/// }
951/// ```
952pub trait ConnectorBuilder<R>: Send + Sync
953where
954 R: aimdb_executor::Spawn + 'static,
955{
956 /// Build the connector using the database
957 ///
958 /// This method is called during `AimDbBuilder::build()` after the database
959 /// has been constructed. The builder can use the database to:
960 /// - Collect inbound routes via `db.collect_inbound_routes()`
961 /// - Access database configuration
962 /// - Register subscriptions
963 ///
964 /// # Arguments
965 /// * `db` - The constructed database instance
966 ///
967 /// # Returns
968 /// An `Arc<dyn Connector>` that will be registered with the database
969 #[allow(clippy::type_complexity)]
970 fn build<'a>(
971 &'a self,
972 db: &'a AimDb<R>,
973 ) -> Pin<Box<dyn Future<Output = DbResult<Arc<dyn Connector>>> + Send + 'a>>;
974
975 /// The URL scheme this connector handles
976 ///
977 /// Returns the scheme (e.g., "mqtt", "kafka", "http") that this connector
978 /// will be registered under. Used for routing `.link_from()` and `.link_to()`
979 /// declarations to the appropriate connector.
980 fn scheme(&self) -> &str;
981}
982
983#[cfg(test)]
984mod tests {
985 use super::*;
986 use alloc::format;
987
988 #[test]
989 fn test_parse_simple_mqtt() {
990 let url = ConnectorUrl::parse("mqtt://broker.example.com:1883").unwrap();
991 assert_eq!(url.scheme, "mqtt");
992 assert_eq!(url.host, "broker.example.com");
993 assert_eq!(url.port, Some(1883));
994 assert_eq!(url.username, None);
995 assert_eq!(url.password, None);
996 }
997
998 #[test]
999 fn test_parse_mqtt_with_credentials() {
1000 let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
1001 assert_eq!(url.scheme, "mqtt");
1002 assert_eq!(url.host, "broker.example.com");
1003 assert_eq!(url.port, Some(1883));
1004 assert_eq!(url.username, Some("user".to_string()));
1005 assert_eq!(url.password, Some("pass".to_string()));
1006 }
1007
1008 #[test]
1009 fn test_parse_https_with_path() {
1010 let url = ConnectorUrl::parse("https://api.example.com:8443/events").unwrap();
1011 assert_eq!(url.scheme, "https");
1012 assert_eq!(url.host, "api.example.com");
1013 assert_eq!(url.port, Some(8443));
1014 assert_eq!(url.path, Some("/events".to_string()));
1015 }
1016
1017 #[test]
1018 fn test_parse_with_query_params() {
1019 let url = ConnectorUrl::parse("http://api.example.com/data?key=value&foo=bar").unwrap();
1020 assert_eq!(url.scheme, "http");
1021 assert_eq!(url.host, "api.example.com");
1022 assert_eq!(url.path, Some("/data".to_string()));
1023 assert_eq!(url.query_params.len(), 2);
1024 assert_eq!(
1025 url.query_params[0],
1026 ("key".to_string(), "value".to_string())
1027 );
1028 assert_eq!(url.query_params[1], ("foo".to_string(), "bar".to_string()));
1029 }
1030
1031 #[test]
1032 fn test_default_ports() {
1033 let mqtt = ConnectorUrl::parse("mqtt://broker.local").unwrap();
1034 assert_eq!(mqtt.default_port(), Some(1883));
1035 assert_eq!(mqtt.effective_port(), Some(1883));
1036
1037 let https = ConnectorUrl::parse("https://api.example.com").unwrap();
1038 assert_eq!(https.default_port(), Some(443));
1039 assert_eq!(https.effective_port(), Some(443));
1040 }
1041
1042 #[test]
1043 fn test_is_secure() {
1044 assert!(ConnectorUrl::parse("mqtts://broker.local")
1045 .unwrap()
1046 .is_secure());
1047 assert!(ConnectorUrl::parse("https://api.example.com")
1048 .unwrap()
1049 .is_secure());
1050 assert!(ConnectorUrl::parse("wss://ws.example.com")
1051 .unwrap()
1052 .is_secure());
1053
1054 assert!(!ConnectorUrl::parse("mqtt://broker.local")
1055 .unwrap()
1056 .is_secure());
1057 assert!(!ConnectorUrl::parse("http://api.example.com")
1058 .unwrap()
1059 .is_secure());
1060 assert!(!ConnectorUrl::parse("ws://ws.example.com")
1061 .unwrap()
1062 .is_secure());
1063 }
1064
1065 #[test]
1066 fn test_display_hides_password() {
1067 let url = ConnectorUrl::parse("mqtt://user:secret@broker.local:1883").unwrap();
1068 let display = format!("{}", url);
1069 assert!(display.contains("user:****"));
1070 assert!(!display.contains("secret"));
1071 }
1072
1073 #[test]
1074 fn test_parse_kafka_style() {
1075 let url =
1076 ConnectorUrl::parse("kafka://broker1.local:9092,broker2.local:9092/my-topic").unwrap();
1077 assert_eq!(url.scheme, "kafka");
1078 // Note: Our simple parser doesn't handle the second port in comma-separated hosts perfectly
1079 // It parses "broker1.local:9092,broker2.local" as the host and "9092" as the port
1080 // This is acceptable for now - production connectors can handle this in their client factories
1081 assert!(url.host.contains("broker1.local"));
1082 assert!(url.host.contains("broker2.local"));
1083 assert_eq!(url.path, Some("/my-topic".to_string()));
1084 }
1085
1086 #[test]
1087 fn test_parse_missing_scheme() {
1088 let result = ConnectorUrl::parse("broker.example.com:1883");
1089 assert!(result.is_err());
1090 }
1091
1092 // ========================================================================
1093 // TopicProvider Tests
1094 // ========================================================================
1095
    /// Minimal record type used as the payload in the `TopicProvider` tests.
    // `celsius` is only ever assigned by the tests in this module and never read,
    // so the dead-code lint is silenced for this fixture.
    #[allow(dead_code)]
    #[derive(Debug, Clone)]
    struct TestTemperature {
        /// Sensor identifier; the test providers derive topic names from it.
        sensor_id: String,
        /// Temperature reading; set by the tests but not inspected.
        celsius: f32,
    }
1102
1103 struct TestTopicProvider;
1104
1105 impl super::TopicProvider<TestTemperature> for TestTopicProvider {
1106 fn topic(&self, value: &TestTemperature) -> Option<String> {
1107 Some(format!("sensors/temp/{}", value.sensor_id))
1108 }
1109 }
1110
1111 #[test]
1112 fn test_topic_provider_type_erasure() {
1113 use super::{TopicProviderAny, TopicProviderWrapper};
1114
1115 let provider: Arc<dyn TopicProviderAny> =
1116 Arc::new(TopicProviderWrapper::new(TestTopicProvider));
1117 let temp = TestTemperature {
1118 sensor_id: "kitchen-001".into(),
1119 celsius: 22.5,
1120 };
1121
1122 assert_eq!(
1123 provider.topic_any(&temp),
1124 Some("sensors/temp/kitchen-001".into())
1125 );
1126 }
1127
1128 #[test]
1129 fn test_topic_provider_type_mismatch() {
1130 use super::{TopicProviderAny, TopicProviderWrapper};
1131
1132 let provider: Arc<dyn TopicProviderAny> =
1133 Arc::new(TopicProviderWrapper::new(TestTopicProvider));
1134 let wrong_type = "not a temperature";
1135
1136 // Type mismatch returns None (falls back to default topic)
1137 assert_eq!(provider.topic_any(&wrong_type), None);
1138 }
1139
1140 #[test]
1141 fn test_topic_provider_returns_none() {
1142 struct OptionalTopicProvider;
1143
1144 impl super::TopicProvider<TestTemperature> for OptionalTopicProvider {
1145 fn topic(&self, temp: &TestTemperature) -> Option<String> {
1146 if temp.sensor_id.is_empty() {
1147 None // Fall back to default topic
1148 } else {
1149 Some(format!("sensors/{}", temp.sensor_id))
1150 }
1151 }
1152 }
1153
1154 use super::{TopicProviderAny, TopicProviderWrapper};
1155
1156 let provider: Arc<dyn TopicProviderAny> =
1157 Arc::new(TopicProviderWrapper::new(OptionalTopicProvider));
1158
1159 // Non-empty sensor_id returns dynamic topic
1160 let temp_with_id = TestTemperature {
1161 sensor_id: "abc".into(),
1162 celsius: 20.0,
1163 };
1164 assert_eq!(
1165 provider.topic_any(&temp_with_id),
1166 Some("sensors/abc".into())
1167 );
1168
1169 // Empty sensor_id returns None (fallback)
1170 let temp_without_id = TestTemperature {
1171 sensor_id: String::new(),
1172 celsius: 20.0,
1173 };
1174 assert_eq!(provider.topic_any(&temp_without_id), None);
1175 }
1176
1177 // ========================================================================
1178 // TopicResolverFn Tests
1179 // ========================================================================
1180
1181 #[test]
1182 fn test_topic_resolver_returns_some() {
1183 let resolver: super::TopicResolverFn = Arc::new(|| Some("resolved/topic".into()));
1184
1185 assert_eq!(resolver(), Some("resolved/topic".into()));
1186 }
1187
1188 #[test]
1189 fn test_topic_resolver_returns_none() {
1190 let resolver: super::TopicResolverFn = Arc::new(|| None);
1191
1192 // Returns None, should fall back to default topic
1193 assert_eq!(resolver(), None);
1194 }
1195
1196 #[cfg(feature = "std")]
1197 #[test]
1198 fn test_topic_resolver_with_captured_state() {
1199 use std::sync::Mutex;
1200
1201 let config = Arc::new(Mutex::new(Some("dynamic/topic".to_string())));
1202 let config_clone = config.clone();
1203
1204 let resolver: super::TopicResolverFn =
1205 Arc::new(move || config_clone.lock().unwrap().clone());
1206
1207 assert_eq!(resolver(), Some("dynamic/topic".into()));
1208
1209 // Clear config
1210 *config.lock().unwrap() = None;
1211 assert_eq!(resolver(), None);
1212 }
1213
1214 #[test]
1215 fn test_inbound_connector_link_resolve_topic_default() {
1216 use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
1217
1218 let url = ConnectorUrl::parse("mqtt://sensors/temperature").unwrap();
1219 let deserializer: DeserializerFn =
1220 Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1221 let link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
1222
1223 // No resolver configured, should return static topic from URL
1224 assert_eq!(link.resolve_topic(), "sensors/temperature");
1225 }
1226
1227 #[test]
1228 fn test_inbound_connector_link_resolve_topic_dynamic() {
1229 use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
1230
1231 let url = ConnectorUrl::parse("mqtt://sensors/default").unwrap();
1232 let deserializer: DeserializerFn =
1233 Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1234 let mut link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
1235
1236 // Configure dynamic resolver
1237 link.topic_resolver = Some(Arc::new(|| Some("sensors/dynamic/kitchen".into())));
1238
1239 // Should return resolved topic, not URL topic
1240 assert_eq!(link.resolve_topic(), "sensors/dynamic/kitchen");
1241 }
1242
1243 #[test]
1244 fn test_inbound_connector_link_resolve_topic_fallback() {
1245 use super::{ConnectorUrl, DeserializerFn, DeserializerKind, InboundConnectorLink};
1246
1247 let url = ConnectorUrl::parse("mqtt://sensors/fallback").unwrap();
1248 let deserializer: DeserializerFn =
1249 Arc::new(|_| Ok(Box::new(()) as Box<dyn core::any::Any + Send>));
1250 let mut link = InboundConnectorLink::new(url, DeserializerKind::Raw(deserializer));
1251
1252 // Configure resolver that returns None
1253 link.topic_resolver = Some(Arc::new(|| None));
1254
1255 // Should fall back to static topic from URL
1256 assert_eq!(link.resolve_topic(), "sensors/fallback");
1257 }
1258}