aimdb_core/transport.rs
1//! Transport connector traits for MQTT, Kafka, HTTP, shmem, and other protocols
2//!
3//! Provides a generic `Connector` trait that enables scheme-based routing
4//! to different transport protocols. Each connector manages a single connection
5//! to a specific endpoint (e.g., one MQTT broker, one shared memory segment, etc.).
6//!
7//! # Design Philosophy
8//!
9//! - **Scheme-based routing**: URL scheme (mqtt://, shmem://, kafka://) determines which connector handles requests
10//! - **Single endpoint per connector**: Each connector connects to ONE broker/resource
11//! - **Multi-transport publishing**: Same data can be published to multiple protocols
12//! - **Protocol-agnostic core**: Core doesn't know about MQTT, Kafka, etc. - just routes by scheme
13
14extern crate alloc;
15
16use alloc::{boxed::Box, string::String, vec::Vec};
17use core::future::Future;
18use core::pin::Pin;
19
/// Protocol-agnostic publishing options shared by every connector.
///
/// The fields are intentionally generic; each transport maps them onto its
/// own semantics.
///
/// # Protocol Interpretation
///
/// - **MQTT**: `qos` = QoS level, `retain` = retain flag, `timeout_ms` = publish timeout
/// - **Kafka**: `qos` = acks setting (0 = none, 1 = leader, 2 = all), `timeout_ms` = send timeout
/// - **HTTP**: `qos` = retry count, `timeout_ms` = request timeout
/// - **Shmem**: `qos` = priority, `retain` = pin in memory
///
/// # Comparison
///
/// The type is `PartialEq + Eq`, so two configurations can be compared with
/// `==` / `assert_eq!`. `protocol_options` is compared as an ordered list:
/// the same pairs in a different order are *not* equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectorConfig {
    /// Quality of Service / reliability level (0, 1, or 2).
    pub qos: u8,

    /// Whether to retain/persist the message.
    pub retain: bool,

    /// Optional timeout in milliseconds.
    pub timeout_ms: Option<u32>,

    /// Protocol-specific options as key-value pairs.
    ///
    /// Allows custom configuration without polluting the base struct.
    pub protocol_options: Vec<(String, String)>,
}
46
47impl Default for ConnectorConfig {
48 fn default() -> Self {
49 Self {
50 qos: 0,
51 retain: false,
52 timeout_ms: Some(5000),
53 protocol_options: Vec::new(),
54 }
55 }
56}
57
/// Reasons a connector can fail to publish a message.
///
/// A plain field-less enum (rather than a `String`) keeps errors cheap to
/// create and copy in `no_std` environments and makes defmt logging support
/// possible on Embassy.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PublishError {
    /// The connection to the endpoint could not be established.
    ConnectionFailed,
    /// The message payload does not fit in the available buffer.
    MessageTooLarge,
    /// The requested Quality of Service level is not supported.
    UnsupportedQoS,
    /// A network or operation timeout occurred.
    Timeout,
    /// The buffer is full, so the message cannot be queued.
    BufferFull,
    /// The destination (topic, segment, endpoint) is invalid.
    InvalidDestination,
}
77
/// defmt logging support: every variant is written as its own name.
#[cfg(feature = "defmt")]
impl defmt::Format for PublishError {
    fn format(&self, fmt: defmt::Formatter) {
        // One `defmt::write!` call per variant, each with its own string literal.
        match *self {
            PublishError::ConnectionFailed => defmt::write!(fmt, "ConnectionFailed"),
            PublishError::MessageTooLarge => defmt::write!(fmt, "MessageTooLarge"),
            PublishError::UnsupportedQoS => defmt::write!(fmt, "UnsupportedQoS"),
            PublishError::Timeout => defmt::write!(fmt, "Timeout"),
            PublishError::BufferFull => defmt::write!(fmt, "BufferFull"),
            PublishError::InvalidDestination => defmt::write!(fmt, "InvalidDestination"),
        }
    }
}
91
92#[cfg(feature = "std")]
93impl std::fmt::Display for PublishError {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 Self::ConnectionFailed => write!(f, "Failed to connect to endpoint"),
97 Self::MessageTooLarge => write!(f, "Message payload too large"),
98 Self::UnsupportedQoS => write!(f, "QoS level not supported"),
99 Self::Timeout => write!(f, "Operation timeout"),
100 Self::BufferFull => write!(f, "Buffer full, cannot queue message"),
101 Self::InvalidDestination => write!(f, "Invalid destination"),
102 }
103 }
104}
105
// Only compiled with the `std` feature, because the trait is named through `std::error`.
#[cfg(feature = "std")]
impl std::error::Error for PublishError {
    // No overrides: the trait's provided methods are used as-is.
}
108
109/// Generic transport connector trait for protocol-agnostic publishing
110///
111/// This trait enables multi-protocol publishing via scheme-based routing:
112/// - `mqtt://topic` → MQTT broker
113/// - `shmem://segment` → Shared memory
114/// - `kafka://topic` → Kafka cluster
115/// - `http://endpoint` → HTTP POST
116/// - `dds://topic` → DDS topic
117///
118/// Each connector manages ONE connection/endpoint. For multiple brokers/endpoints,
119/// create multiple connectors and register them with different schemes.
120///
121/// # Example Implementation
122///
123/// ```rust,ignore
124/// impl Connector for MqttConnector {
125/// fn publish(
126/// &self,
127/// destination: &str, // "sensors/temperature"
128/// config: &ConnectorConfig,
129/// payload: &[u8],
130/// ) -> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>> {
131/// Box::pin(async move {
132/// self.client.publish(destination, config.qos, config.retain, payload).await
133/// .map_err(|_| PublishError::ConnectionFailed)
134/// })
135/// }
136/// }
137/// ```
138///
139/// # Usage
140///
141/// ```rust,ignore
142/// let mqtt_connector = MqttConnector::new("mqtt://broker.local:1883").await?;
143///
144/// let db = AimDbBuilder::new()
145/// .runtime(runtime)
146/// .with_connector("mqtt", Arc::new(mqtt_connector))
147/// .configure::<Temperature>(|reg| {
148/// reg.link_to("mqtt://sensors/temp")
149/// .with_qos(1)
150/// .finish()
151/// })
152/// .build()?;
153/// ```
154///
155/// # Thread Safety
156///
157/// Requires Send + Sync for Tokio compatibility. For Embassy (single-threaded),
158/// use `unsafe impl Send + Sync` with safety documentation.
159pub trait Connector: Send + Sync {
160 /// Publish data to a protocol-specific destination
161 ///
162 /// # Arguments
163 /// * `destination` - Protocol-specific path (no broker/host info):
164 /// - MQTT: "sensors/temperature"
165 /// - Shmem: "temp_readings"
166 /// - Kafka: "production/events"
167 /// - HTTP: "api/v1/sensors"
168 /// * `config` - Publishing configuration (QoS, retain, timeout, protocol options)
169 /// * `payload` - Message payload as byte slice
170 ///
171 /// # Returns
172 /// `Ok(())` on success, `PublishError` on failure
173 fn publish(
174 &self,
175 destination: &str,
176 config: &ConnectorConfig,
177 payload: &[u8],
178 ) -> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>>;
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::sync::Arc;

    /// Connector stub whose `publish` future resolves to `Ok(())` without
    /// looking at any of its arguments.
    struct MockConnector;

    impl Connector for MockConnector {
        fn publish(
            &self,
            _destination: &str,
            _config: &ConnectorConfig,
            _payload: &[u8],
        ) -> Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send + '_>> {
            let ready = async move { Ok::<(), PublishError>(()) };
            Box::pin(ready)
        }
    }

    #[test]
    fn test_connector_trait() {
        let concrete = Arc::new(MockConnector);

        // Compile-time check: a connector can be stored as a trait object.
        let _as_object: Arc<dyn Connector> = concrete;
    }

    #[test]
    fn test_connector_config_default() {
        let defaults = ConnectorConfig::default();

        assert_eq!(defaults.qos, 0);
        assert!(!defaults.retain);
        assert_eq!(defaults.timeout_ms, Some(5000));
        assert!(defaults.protocol_options.is_empty());
    }

    #[test]
    fn test_publish_error_copy() {
        let original = PublishError::ConnectionFailed;
        // Using `original` after this assignment only compiles if the type is `Copy`.
        let duplicate = original;
        assert_eq!(original, duplicate);
    }
}
223}