mockforge_core/protocol_abstraction/
streaming.rs1use crate::Result;
7use async_trait::async_trait;
8use futures::Stream;
9use std::pin::Pin;
10use std::sync::Arc;
11
12#[derive(Debug, Clone)]
14pub struct ProtocolMessage {
15    pub id: Option<String>,
17    pub topic: String,
19    pub payload: Vec<u8>,
21    pub metadata: std::collections::HashMap<String, String>,
23    pub qos: Option<u8>,
25    pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
27}
28
29pub type MessageStream = Pin<Box<dyn Stream<Item = Result<ProtocolMessage>> + Send>>;
31
32#[derive(Debug, Clone)]
34pub struct StreamingMetadata {
35    pub protocol: super::Protocol,
37    pub connection_id: String,
39    pub server_info: Option<String>,
41    pub subscriptions: Vec<String>,
43    pub connected: bool,
45}
46
47#[async_trait]
49pub trait StreamingProtocol: Send + Sync {
50    async fn subscribe(&self, topic: &str, consumer_id: &str) -> Result<MessageStream>;
52
53    async fn publish(&self, topic: &str, message: ProtocolMessage) -> Result<()>;
55
56    async fn unsubscribe(&self, _topic: &str, _consumer_id: &str) -> Result<()> {
58        Ok(())
60    }
61
62    fn get_metadata(&self) -> StreamingMetadata;
64
65    fn is_connected(&self) -> bool {
67        self.get_metadata().connected
68    }
69}
70
71pub struct StreamingProtocolRegistry {
73    handlers: std::collections::HashMap<super::Protocol, Arc<dyn StreamingProtocol>>,
74}
75
76impl StreamingProtocolRegistry {
77    pub fn new() -> Self {
79        Self {
80            handlers: std::collections::HashMap::new(),
81        }
82    }
83
84    pub fn register_handler(
86        &mut self,
87        protocol: super::Protocol,
88        handler: Arc<dyn StreamingProtocol>,
89    ) {
90        self.handlers.insert(protocol, handler);
91    }
92
93    pub fn get_handler(&self, protocol: &super::Protocol) -> Option<&Arc<dyn StreamingProtocol>> {
95        self.handlers.get(protocol)
96    }
97
98    pub fn registered_protocols(&self) -> Vec<super::Protocol> {
100        self.handlers.keys().cloned().collect()
101    }
102
103    pub fn supports_protocol(&self, protocol: &super::Protocol) -> bool {
105        self.handlers.contains_key(protocol)
106    }
107}
108
109impl Default for StreamingProtocolRegistry {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115pub struct MessageBuilder {
117    message: ProtocolMessage,
118}
119
120impl MessageBuilder {
121    pub fn new(topic: impl Into<String>) -> Self {
123        Self {
124            message: ProtocolMessage {
125                id: None,
126                topic: topic.into(),
127                payload: Vec::new(),
128                metadata: std::collections::HashMap::new(),
129                qos: None,
130                timestamp: Some(chrono::Utc::now()),
131            },
132        }
133    }
134
135    pub fn id(mut self, id: impl Into<String>) -> Self {
137        self.message.id = Some(id.into());
138        self
139    }
140
141    pub fn payload(mut self, payload: impl Into<Vec<u8>>) -> Self {
143        self.message.payload = payload.into();
144        self
145    }
146
147    pub fn text(mut self, text: impl AsRef<str>) -> Self {
149        self.message.payload = text.as_ref().as_bytes().to_vec();
150        self
151    }
152
153    pub fn json<T: serde::Serialize>(mut self, value: &T) -> Result<Self> {
155        self.message.payload = serde_json::to_vec(value)?;
156        self.message
157            .metadata
158            .insert("content-type".to_string(), "application/json".to_string());
159        Ok(self)
160    }
161
162    pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164        self.message.metadata.insert(key.into(), value.into());
165        self
166    }
167
168    pub fn qos(mut self, qos: u8) -> Self {
170        self.message.qos = Some(qos);
171        self
172    }
173
174    pub fn build(self) -> ProtocolMessage {
176        self.message
177    }
178}
179
180#[cfg(test)]
181mod tests {
182    use super::*;
183
184    #[test]
185    fn test_message_builder() {
186        let message = MessageBuilder::new("test-topic")
187            .id("msg-123")
188            .text("Hello, World!")
189            .metadata("priority", "high")
190            .qos(1)
191            .build();
192
193        assert_eq!(message.topic, "test-topic");
194        assert_eq!(message.id, Some("msg-123".to_string()));
195        assert_eq!(message.payload, b"Hello, World!");
196        assert_eq!(message.metadata.get("priority"), Some(&"high".to_string()));
197        assert_eq!(message.qos, Some(1));
198        assert!(message.timestamp.is_some());
199    }
200
201    #[test]
202    fn test_message_builder_json() {
203        #[derive(serde::Serialize)]
204        struct TestData {
205            name: String,
206            value: i32,
207        }
208
209        let data = TestData {
210            name: "test".to_string(),
211            value: 42,
212        };
213
214        let message = MessageBuilder::new("json-topic").json(&data).unwrap().build();
215
216        assert_eq!(message.topic, "json-topic");
217        assert_eq!(message.metadata.get("content-type"), Some(&"application/json".to_string()));
218        assert!(!message.payload.is_empty());
219    }
220
221    #[test]
222    fn test_streaming_registry() {
223        let registry = StreamingProtocolRegistry::new();
224
225        assert!(!registry.supports_protocol(&crate::protocol_abstraction::Protocol::Mqtt));
227        assert_eq!(registry.registered_protocols().len(), 0);
228
229        }
232}