mockforge_core/protocol_abstraction/
streaming.rs1use crate::Result;
7use async_trait::async_trait;
8use futures::Stream;
9use std::pin::Pin;
10use std::sync::Arc;
11
12#[derive(Debug, Clone)]
14pub struct ProtocolMessage {
15 pub id: Option<String>,
17 pub topic: String,
19 pub payload: Vec<u8>,
21 pub metadata: std::collections::HashMap<String, String>,
23 pub qos: Option<u8>,
25 pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
27}
28
29pub type MessageStream = Pin<Box<dyn Stream<Item = Result<ProtocolMessage>> + Send>>;
31
32#[derive(Debug, Clone)]
34pub struct StreamingMetadata {
35 pub protocol: super::Protocol,
37 pub connection_id: String,
39 pub server_info: Option<String>,
41 pub subscriptions: Vec<String>,
43 pub connected: bool,
45}
46
47#[async_trait]
49pub trait StreamingProtocol: Send + Sync {
50 async fn subscribe(&self, topic: &str, consumer_id: &str) -> Result<MessageStream>;
52
53 async fn publish(&self, topic: &str, message: ProtocolMessage) -> Result<()>;
55
56 async fn unsubscribe(&self, _topic: &str, _consumer_id: &str) -> Result<()> {
58 Ok(())
60 }
61
62 fn get_metadata(&self) -> StreamingMetadata;
64
65 fn is_connected(&self) -> bool {
67 self.get_metadata().connected
68 }
69}
70
71pub struct StreamingProtocolRegistry {
73 handlers: std::collections::HashMap<super::Protocol, Arc<dyn StreamingProtocol>>,
74}
75
76impl StreamingProtocolRegistry {
77 pub fn new() -> Self {
79 Self {
80 handlers: std::collections::HashMap::new(),
81 }
82 }
83
84 pub fn register_handler(
86 &mut self,
87 protocol: super::Protocol,
88 handler: Arc<dyn StreamingProtocol>,
89 ) {
90 self.handlers.insert(protocol, handler);
91 }
92
93 pub fn get_handler(&self, protocol: &super::Protocol) -> Option<&Arc<dyn StreamingProtocol>> {
95 self.handlers.get(protocol)
96 }
97
98 pub fn registered_protocols(&self) -> Vec<super::Protocol> {
100 self.handlers.keys().cloned().collect()
101 }
102
103 pub fn supports_protocol(&self, protocol: &super::Protocol) -> bool {
105 self.handlers.contains_key(protocol)
106 }
107}
108
109impl Default for StreamingProtocolRegistry {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
115pub struct MessageBuilder {
117 message: ProtocolMessage,
118}
119
120impl MessageBuilder {
121 pub fn new(topic: impl Into<String>) -> Self {
123 Self {
124 message: ProtocolMessage {
125 id: None,
126 topic: topic.into(),
127 payload: Vec::new(),
128 metadata: std::collections::HashMap::new(),
129 qos: None,
130 timestamp: Some(chrono::Utc::now()),
131 },
132 }
133 }
134
135 pub fn id(mut self, id: impl Into<String>) -> Self {
137 self.message.id = Some(id.into());
138 self
139 }
140
141 pub fn payload(mut self, payload: impl Into<Vec<u8>>) -> Self {
143 self.message.payload = payload.into();
144 self
145 }
146
147 pub fn text(mut self, text: impl AsRef<str>) -> Self {
149 self.message.payload = text.as_ref().as_bytes().to_vec();
150 self
151 }
152
153 pub fn json<T: serde::Serialize>(mut self, value: &T) -> Result<Self> {
155 self.message.payload = serde_json::to_vec(value)?;
156 self.message
157 .metadata
158 .insert("content-type".to_string(), "application/json".to_string());
159 Ok(self)
160 }
161
162 pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
164 self.message.metadata.insert(key.into(), value.into());
165 self
166 }
167
168 pub fn qos(mut self, qos: u8) -> Self {
170 self.message.qos = Some(qos);
171 self
172 }
173
174 pub fn build(self) -> ProtocolMessage {
176 self.message
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183
184 #[test]
185 fn test_message_builder() {
186 let message = MessageBuilder::new("test-topic")
187 .id("msg-123")
188 .text("Hello, World!")
189 .metadata("priority", "high")
190 .qos(1)
191 .build();
192
193 assert_eq!(message.topic, "test-topic");
194 assert_eq!(message.id, Some("msg-123".to_string()));
195 assert_eq!(message.payload, b"Hello, World!");
196 assert_eq!(message.metadata.get("priority"), Some(&"high".to_string()));
197 assert_eq!(message.qos, Some(1));
198 assert!(message.timestamp.is_some());
199 }
200
201 #[test]
202 fn test_message_builder_json() {
203 #[derive(serde::Serialize)]
204 struct TestData {
205 name: String,
206 value: i32,
207 }
208
209 let data = TestData {
210 name: "test".to_string(),
211 value: 42,
212 };
213
214 let message = MessageBuilder::new("json-topic").json(&data).unwrap().build();
215
216 assert_eq!(message.topic, "json-topic");
217 assert_eq!(message.metadata.get("content-type"), Some(&"application/json".to_string()));
218 assert!(!message.payload.is_empty());
219 }
220
221 #[test]
222 fn test_streaming_registry() {
223 let registry = StreamingProtocolRegistry::new();
224
225 assert!(!registry.supports_protocol(&crate::protocol_abstraction::Protocol::Mqtt));
227 assert_eq!(registry.registered_protocols().len(), 0);
228
229 }
232}