mockforge_core/protocol_abstraction/
streaming.rs

1//! Streaming protocol abstractions for pub/sub and messaging patterns
2//!
3//! This module provides traits and types for protocols that support streaming,
4//! pub/sub, and asynchronous messaging patterns like MQTT, Kafka, and RabbitMQ.
5
6use crate::Result;
7use async_trait::async_trait;
8use futures::Stream;
9use std::pin::Pin;
10use std::sync::Arc;
11
/// A message in a streaming protocol
///
/// This is the value accepted by `StreamingProtocol::publish` and the `Ok` item
/// yielded by a `MessageStream`. Only `topic` and `payload` are always present;
/// the remaining fields are optional or may be empty.
#[derive(Debug, Clone)]
pub struct ProtocolMessage {
    /// Message ID or sequence number (`None` when no ID has been assigned)
    pub id: Option<String>,
    /// Topic or channel name
    pub topic: String,
    /// Message payload as raw bytes; no encoding is implied by this type
    pub payload: Vec<u8>,
    /// Message metadata (headers, properties, etc.) as string key/value pairs.
    /// `MessageBuilder::json` records a `content-type` entry here.
    pub metadata: std::collections::HashMap<String, String>,
    /// Quality of Service level.
    /// NOTE(review): the value is not validated here; presumably it follows the
    /// target protocol's convention (e.g. MQTT 0-2) — confirm against the handlers.
    pub qos: Option<u8>,
    /// Timestamp when message was received (UTC).
    /// `MessageBuilder::new` pre-fills this with the time the builder was created.
    pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
}
28
/// A stream of protocol messages
///
/// A heap-allocated, pinned, type-erased `Stream` that is `Send`. Each item is a
/// `Result`, so a stream can report a per-item failure instead of a message.
pub type MessageStream = Pin<Box<dyn Stream<Item = Result<ProtocolMessage>> + Send>>;
31
/// Metadata about a streaming connection
///
/// Returned by value from `StreamingProtocol::get_metadata`, so it is a snapshot
/// rather than a live view of the connection.
#[derive(Debug, Clone)]
pub struct StreamingMetadata {
    /// Protocol being used
    pub protocol: super::Protocol,
    /// Connection identifier
    pub connection_id: String,
    /// Server information (`None` when nothing is known about the server)
    pub server_info: Option<String>,
    /// Active subscriptions/topics
    pub subscriptions: Vec<String>,
    /// Connection status; this is the flag the default
    /// `StreamingProtocol::is_connected` implementation returns
    pub connected: bool,
}
46
47/// Trait for protocols that support streaming and pub/sub patterns
48#[async_trait]
49pub trait StreamingProtocol: Send + Sync {
50    /// Subscribe to a topic and return a stream of messages
51    async fn subscribe(&self, topic: &str, consumer_id: &str) -> Result<MessageStream>;
52
53    /// Publish a message to a topic
54    async fn publish(&self, topic: &str, message: ProtocolMessage) -> Result<()>;
55
56    /// Unsubscribe from a topic
57    async fn unsubscribe(&self, _topic: &str, _consumer_id: &str) -> Result<()> {
58        // Default implementation does nothing
59        Ok(())
60    }
61
62    /// Get metadata about the streaming connection
63    fn get_metadata(&self) -> StreamingMetadata;
64
65    /// Check if the connection is healthy
66    fn is_connected(&self) -> bool {
67        self.get_metadata().connected
68    }
69}
70
/// A registry for managing multiple streaming protocol handlers
///
/// Handlers are keyed by protocol, so the registry holds at most one handler per
/// protocol. Each handler is a shared, type-erased `Arc<dyn StreamingProtocol>`.
pub struct StreamingProtocolRegistry {
    // Protocol -> handler lookup table; `register_handler` inserts into it and
    // thereby replaces any entry already stored under the same key.
    handlers: std::collections::HashMap<super::Protocol, Arc<dyn StreamingProtocol>>,
}
75
76impl StreamingProtocolRegistry {
77    /// Create a new empty registry
78    pub fn new() -> Self {
79        Self {
80            handlers: std::collections::HashMap::new(),
81        }
82    }
83
84    /// Register a streaming protocol handler
85    pub fn register_handler(
86        &mut self,
87        protocol: super::Protocol,
88        handler: Arc<dyn StreamingProtocol>,
89    ) {
90        self.handlers.insert(protocol, handler);
91    }
92
93    /// Get a handler for a specific protocol
94    pub fn get_handler(&self, protocol: &super::Protocol) -> Option<&Arc<dyn StreamingProtocol>> {
95        self.handlers.get(protocol)
96    }
97
98    /// Get all registered protocols
99    pub fn registered_protocols(&self) -> Vec<super::Protocol> {
100        self.handlers.keys().cloned().collect()
101    }
102
103    /// Check if a protocol is supported
104    pub fn supports_protocol(&self, protocol: &super::Protocol) -> bool {
105        self.handlers.contains_key(protocol)
106    }
107}
108
109impl Default for StreamingProtocolRegistry {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
/// Helper for creating protocol messages
///
/// A consuming builder: every setter takes the builder by value and returns it,
/// and `build` hands back the accumulated `ProtocolMessage`.
pub struct MessageBuilder {
    // The message under construction; returned as-is by `build`.
    message: ProtocolMessage,
}
119
120impl MessageBuilder {
121    /// Create a new message builder
122    pub fn new(topic: impl Into<String>) -> Self {
123        Self {
124            message: ProtocolMessage {
125                id: None,
126                topic: topic.into(),
127                payload: Vec::new(),
128                metadata: std::collections::HashMap::new(),
129                qos: None,
130                timestamp: Some(chrono::Utc::now()),
131            },
132        }
133    }
134
135    /// Set the message ID
136    pub fn id(mut self, id: impl Into<String>) -> Self {
137        self.message.id = Some(id.into());
138        self
139    }
140
141    /// Set the message payload
142    pub fn payload(mut self, payload: impl Into<Vec<u8>>) -> Self {
143        self.message.payload = payload.into();
144        self
145    }
146
147    /// Set the message payload from a string
148    pub fn text(mut self, text: impl AsRef<str>) -> Self {
149        self.message.payload = text.as_ref().as_bytes().to_vec();
150        self
151    }
152
153    /// Set the message payload from JSON
154    pub fn json<T: serde::Serialize>(mut self, value: &T) -> Result<Self> {
155        self.message.payload = serde_json::to_vec(value)?;
156        self.message
157            .metadata
158            .insert("content-type".to_string(), "application/json".to_string());
159        Ok(self)
160    }
161
162    /// Add metadata
163    pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164        self.message.metadata.insert(key.into(), value.into());
165        self
166    }
167
168    /// Set QoS level
169    pub fn qos(mut self, qos: u8) -> Self {
170        self.message.qos = Some(qos);
171        self
172    }
173
174    /// Build the message
175    pub fn build(self) -> ProtocolMessage {
176        self.message
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Every fluent setter must be reflected in the built message.
    #[test]
    fn test_message_builder() {
        let builder = MessageBuilder::new("test-topic")
            .id("msg-123")
            .text("Hello, World!")
            .metadata("priority", "high")
            .qos(1);
        let message = builder.build();

        assert_eq!(message.topic.as_str(), "test-topic");
        assert_eq!(message.id.as_deref(), Some("msg-123"));
        assert_eq!(message.payload, b"Hello, World!".to_vec());
        assert_eq!(message.metadata.get("priority").map(String::as_str), Some("high"));
        assert!(matches!(message.qos, Some(1)));
        // The constructor stamps the message with a creation time.
        assert!(message.timestamp.is_some());
    }

    /// `json` must fill the payload and tag the message with a JSON content type.
    #[test]
    fn test_message_builder_json() {
        #[derive(serde::Serialize)]
        struct Sample {
            name: String,
            value: i32,
        }

        let sample = Sample {
            name: String::from("test"),
            value: 42,
        };

        let builder = MessageBuilder::new("json-topic").json(&sample).unwrap();
        let message = builder.build();

        assert_eq!(message.topic.as_str(), "json-topic");
        assert_eq!(
            message.metadata.get("content-type").map(String::as_str),
            Some("application/json")
        );
        assert_ne!(message.payload.len(), 0);
    }

    /// A freshly created registry knows no protocols.
    #[test]
    fn test_streaming_registry() {
        let registry = StreamingProtocolRegistry::new();
        let mqtt = crate::protocol_abstraction::Protocol::Mqtt;

        // Registry should start empty
        assert!(!registry.supports_protocol(&mqtt));
        assert!(registry.registered_protocols().is_empty());

        // Registering a handler is not covered here: it would need a mock
        // implementation of `StreamingProtocol`.
    }
}