aimdb_core/
transport.rs

1//! Transport connector traits for MQTT, Kafka, HTTP, shmem, and other protocols
2//!
3//! Provides a generic `Connector` trait that enables scheme-based routing
4//! to different transport protocols. Each connector manages a single connection
5//! to a specific endpoint (e.g., one MQTT broker, one shared memory segment, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **Scheme-based routing**: URL scheme (mqtt://, shmem://, kafka://) determines which connector handles requests
10//! - **Single endpoint per connector**: Each connector connects to ONE broker/resource
11//! - **Multi-transport publishing**: Same data can be published to multiple protocols
12//! - **Protocol-agnostic core**: Core doesn't know about MQTT, Kafka, etc. - just routes by scheme
13
14extern crate alloc;
15
16use alloc::{boxed::Box, string::String, vec::Vec};
17use core::future::Future;
18use core::pin::Pin;
19
/// Protocol-agnostic connector configuration
///
/// Bundles the publishing options that are common to every transport.
/// Each protocol interprets these fields according to its own semantics.
///
/// # Protocol Interpretation
///
/// - **MQTT**: qos=QoS level, retain=retain flag, timeout_ms=publish timeout
/// - **Kafka**: qos=acks setting (0=none, 1=leader, 2=all), timeout_ms=send timeout
/// - **HTTP**: qos=retry count, timeout_ms=request timeout
/// - **Shmem**: qos=priority, retain=pin in memory
///
/// # Comparison
///
/// Two configurations are equal when all four fields are equal; for
/// `protocol_options` that includes the order of the key-value pairs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectorConfig {
    /// Quality of Service / reliability level.
    ///
    /// MQTT and Kafka use the values 0, 1, or 2; HTTP and shmem reinterpret
    /// the number as listed under "Protocol Interpretation" above.
    pub qos: u8,

    /// Whether to retain/persist the message
    pub retain: bool,

    /// Optional timeout in milliseconds (`None` means no timeout is configured)
    pub timeout_ms: Option<u32>,

    /// Protocol-specific options as key-value pairs
    ///
    /// Allows custom configuration without polluting the base struct.
    pub protocol_options: Vec<(String, String)>,
}
46
47impl Default for ConnectorConfig {
48    fn default() -> Self {
49        Self {
50            qos: 0,
51            retain: false,
52            timeout_ms: Some(5000),
53            protocol_options: Vec::new(),
54        }
55    }
56}
57
/// Reasons a connector publish operation can fail
///
/// Modelled as a fieldless enum rather than a `String` for better performance
/// in `no_std` environments and to enable defmt logging support in Embassy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PublishError {
    /// The connection to the endpoint could not be established
    ConnectionFailed,
    /// The message payload does not fit into the buffer
    MessageTooLarge,
    /// The requested Quality of Service level is not supported
    UnsupportedQoS,
    /// A network or operation timeout occurred
    Timeout,
    /// The buffer is full, so the message cannot be queued
    BufferFull,
    /// The destination (topic, segment, endpoint) is invalid
    InvalidDestination,
}
77
#[cfg(feature = "defmt")]
impl defmt::Format for PublishError {
    /// Logs the variant name through defmt.
    fn format(&self, f: defmt::Formatter) {
        // Each arm passes a string literal to `defmt::write!`, one per variant.
        match *self {
            PublishError::ConnectionFailed => defmt::write!(f, "ConnectionFailed"),
            PublishError::MessageTooLarge => defmt::write!(f, "MessageTooLarge"),
            PublishError::UnsupportedQoS => defmt::write!(f, "UnsupportedQoS"),
            PublishError::Timeout => defmt::write!(f, "Timeout"),
            PublishError::BufferFull => defmt::write!(f, "BufferFull"),
            PublishError::InvalidDestination => defmt::write!(f, "InvalidDestination"),
        }
    }
}
91
#[cfg(feature = "std")]
impl std::fmt::Display for PublishError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the static message first, then emit it with a single write.
        let description = match *self {
            PublishError::ConnectionFailed => "Failed to connect to endpoint",
            PublishError::MessageTooLarge => "Message payload too large",
            PublishError::UnsupportedQoS => "QoS level not supported",
            PublishError::Timeout => "Operation timeout",
            PublishError::BufferFull => "Buffer full, cannot queue message",
            PublishError::InvalidDestination => "Invalid destination",
        };
        f.write_str(description)
    }
}
105
// Marker impl, only available with the `std` feature: every provided
// `Error` method keeps its default implementation.
#[cfg(feature = "std")]
impl std::error::Error for PublishError {
    // Intentionally empty.
}
108
109/// Generic transport connector trait for protocol-agnostic publishing
110///
111/// This trait enables multi-protocol publishing via scheme-based routing:
112/// - `mqtt://topic` → MQTT broker
113/// - `shmem://segment` → Shared memory
114/// - `kafka://topic` → Kafka cluster
115/// - `http://endpoint` → HTTP POST
116/// - `dds://topic` → DDS topic
117///
118/// Each connector manages ONE connection/endpoint. For multiple brokers/endpoints,
119/// create multiple connectors and register them with different schemes.
120///
121/// # Example Implementation
122///
123/// ```rust,ignore
124/// impl Connector for MqttConnector {
125///     fn publish(
126///         &self,
127///         destination: &str,  // "sensors/temperature"
128///         config: &ConnectorConfig,
129///         payload: &[u8],
130///     ) -> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>> {
131///         Box::pin(async move {
132///             self.client.publish(destination, config.qos, config.retain, payload).await
133///                 .map_err(|_| PublishError::ConnectionFailed)
134///         })
135///     }
136/// }
137/// ```
138///
139/// # Usage
140///
141/// ```rust,ignore
142/// let mqtt_connector = MqttConnector::new("mqtt://broker.local:1883").await?;
143///
144/// let db = AimDbBuilder::new()
145///     .runtime(runtime)
146///     .with_connector("mqtt", Arc::new(mqtt_connector))
147///     .configure::<Temperature>(|reg| {
148///         reg.link_to("mqtt://sensors/temp")
149///            .with_qos(1)
150///            .finish()
151///     })
152///     .build()?;
153/// ```
154///
155/// # Thread Safety
156///
157/// Requires Send + Sync for Tokio compatibility. For Embassy (single-threaded),
158/// use `unsafe impl Send + Sync` with safety documentation.
159pub trait Connector: Send + Sync {
160    /// Publish data to a protocol-specific destination
161    ///
162    /// # Arguments
163    /// * `destination` - Protocol-specific path (no broker/host info):
164    ///   - MQTT: "sensors/temperature"
165    ///   - Shmem: "temp_readings"
166    ///   - Kafka: "production/events"
167    ///   - HTTP: "api/v1/sensors"
168    /// * `config` - Publishing configuration (QoS, retain, timeout, protocol options)
169    /// * `payload` - Message payload as byte slice
170    ///
171    /// # Returns
172    /// `Ok(())` on success, `PublishError` on failure
173    fn publish(
174        &self,
175        destination: &str,
176        config: &ConnectorConfig,
177        payload: &[u8],
178    ) -> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>>;
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::sync::Arc;

    /// Connector stub that ignores its arguments and always reports success.
    struct NoopConnector;

    impl Connector for NoopConnector {
        fn publish(
            &self,
            _destination: &str,
            _config: &ConnectorConfig,
            _payload: &[u8],
        ) -> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>> {
            Box::pin(async { Ok(()) })
        }
    }

    /// The trait must stay usable behind `Arc<dyn Connector>`.
    #[test]
    fn test_connector_trait() {
        // Compiles only while `Connector` can be used as a trait object.
        let _trait_obj: Arc<dyn Connector> = Arc::new(NoopConnector);
    }

    /// Pins the values produced by `ConnectorConfig::default()`.
    #[test]
    fn test_connector_config_default() {
        let ConnectorConfig {
            qos,
            retain,
            timeout_ms,
            protocol_options,
        } = ConnectorConfig::default();

        assert_eq!(qos, 0);
        assert!(!retain);
        assert_eq!(timeout_ms, Some(5000));
        assert_eq!(protocol_options.len(), 0);
    }

    /// `PublishError` must remain `Copy`: the value is used after being copied.
    #[test]
    fn test_publish_error_copy() {
        let original = PublishError::ConnectionFailed;
        let duplicate = original;
        assert_eq!(original, duplicate);
    }
}