Skip to main content

bext_realtime/
message.rs

1//! Wire types for the realtime hub: [`HubEvent`] (server-originated),
2//! [`ClientMessage`] (subscribe/publish from clients), and [`ServerMessage`] (responses).
3
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8/// A hub event flowing through the pub/sub system.
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
10pub struct HubEvent {
11    /// Monotonically increasing event ID (used for Last-Event-ID replay).
12    pub id: u64,
13    /// Topic the event was published to (e.g. `app/marketing/events`).
14    pub topic: String,
15    /// Arbitrary JSON payload.
16    pub data: Value,
17    /// UTC timestamp when the event was created.
18    pub timestamp: DateTime<Utc>,
19}
20
21impl HubEvent {
22    /// Format as a valid SSE text block.
23    ///
24    /// Output:
25    /// ```text
26    /// event: {topic}
27    /// data: {json}
28    /// id: {id}
29    ///
30    /// ```
31    pub fn to_sse_string(&self) -> String {
32        let json = serde_json::to_string(&self.data).unwrap_or_else(|_| "null".to_string());
33        format!("event: {}\ndata: {}\nid: {}\n\n", self.topic, json, self.id)
34    }
35}
36
37/// Messages sent by WebSocket clients to the server.
38#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
39#[serde(tag = "type", rename_all = "snake_case")]
40pub enum ClientMessage {
41    /// Subscribe to one or more topics (supports wildcards).
42    Subscribe { topics: Vec<String> },
43    /// Unsubscribe from one or more topics.
44    Unsubscribe { topics: Vec<String> },
45    /// Publish an event to a topic.
46    Publish { topic: String, data: Value },
47    /// Pong response to a server Ping.
48    Pong,
49}
50
51/// Messages sent by the server to WebSocket clients.
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53#[serde(tag = "type", rename_all = "snake_case")]
54pub enum ServerMessage {
55    /// A topic event delivered to the client.
56    Event { topic: String, data: Value, id: u64 },
57    /// Heartbeat ping — client should respond with Pong.
58    Ping,
59    /// An error message (e.g. authorization failure).
60    Error { message: String },
61    /// Acknowledgement of a successful subscription.
62    Subscribed { topics: Vec<String> },
63}
64
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use serde_json::json;

    // ---- shared helpers -------------------------------------------------

    /// Fully deterministic event (fixed id, topic, payload and timestamp).
    fn sample_event() -> HubEvent {
        let timestamp = Utc.with_ymd_and_hms(2026, 1, 15, 10, 30, 0).unwrap();
        HubEvent {
            id: 42,
            topic: String::from("app/deploy"),
            data: json!({"version": "1.2.3"}),
            timestamp,
        }
    }

    /// Event stamped with the current time, for tests that do not inspect
    /// the timestamp itself.
    fn event_now(id: u64, topic: &str, data: Value) -> HubEvent {
        HubEvent {
            id,
            topic: topic.to_owned(),
            data,
            timestamp: Utc::now(),
        }
    }

    /// Encode `value` as JSON text and decode it again.
    fn roundtrip<T>(value: &T) -> T
    where
        T: Serialize + for<'de> Deserialize<'de>,
    {
        let encoded = serde_json::to_string(value).unwrap();
        serde_json::from_str(&encoded).unwrap()
    }

    /// Turn a list of string literals into owned topic names.
    fn topics(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_owned()).collect()
    }

    // ---- HubEvent: JSON encoding ----------------------------------------

    #[test]
    fn hub_event_serializes_to_json() {
        let encoded = serde_json::to_string(&sample_event()).unwrap();
        let expected_fragments = [
            "\"id\":42",
            "\"topic\":\"app/deploy\"",
            "\"version\":\"1.2.3\"",
        ];
        for fragment in &expected_fragments {
            assert!(encoded.contains(*fragment));
        }
    }

    #[test]
    fn hub_event_roundtrip() {
        let original = sample_event();
        assert_eq!(original, roundtrip(&original));
    }

    #[test]
    fn hub_event_with_null_data() {
        let original = event_now(1, "test", Value::Null);
        let decoded = roundtrip(&original);
        // Only the payload is compared here.
        assert_eq!(original.data, decoded.data);
    }

    #[test]
    fn hub_event_with_nested_data() {
        let payload = json!({
            "users": [{"name": "Alice"}, {"name": "Bob"}],
            "count": 2,
            "active": true
        });
        let original = event_now(99, "complex", payload);
        assert_eq!(original, roundtrip(&original));
    }

    // ---- HubEvent: SSE rendering ----------------------------------------

    #[test]
    fn sse_string_format() {
        let rendered = sample_event().to_sse_string();
        let expected = "event: app/deploy\ndata: {\"version\":\"1.2.3\"}\nid: 42\n\n";
        assert_eq!(rendered, expected);
    }

    #[test]
    fn sse_string_with_null() {
        let rendered = event_now(0, "t", Value::Null).to_sse_string();
        assert_eq!(rendered, "event: t\ndata: null\nid: 0\n\n");
    }

    #[test]
    fn sse_string_with_string_data() {
        let rendered = event_now(5, "msg", json!("hello world")).to_sse_string();
        assert_eq!(rendered, "event: msg\ndata: \"hello world\"\nid: 5\n\n");
    }

    #[test]
    fn sse_string_with_array_data() {
        let rendered = event_now(7, "arr", json!([1, 2, 3])).to_sse_string();
        assert!(rendered.starts_with("event: arr\n"));
        assert!(rendered.contains("data: [1,2,3]\n"));
        assert!(rendered.ends_with("id: 7\n\n"));
    }

    #[test]
    fn sse_string_ends_with_double_newline() {
        let rendered = sample_event().to_sse_string();
        // Exactly two trailing newlines: no fewer, no more.
        let trailing_newlines = rendered.len() - rendered.trim_end_matches('\n').len();
        assert_eq!(trailing_newlines, 2);
    }

    // ---- ClientMessage --------------------------------------------------

    #[test]
    fn client_subscribe_roundtrip() {
        let message = ClientMessage::Subscribe {
            topics: topics(&["app/*", "system/deploy"]),
        };
        assert_eq!(message, roundtrip(&message));
    }

    #[test]
    fn client_unsubscribe_roundtrip() {
        let message = ClientMessage::Unsubscribe {
            topics: topics(&["app/events"]),
        };
        assert_eq!(message, roundtrip(&message));
    }

    #[test]
    fn client_publish_roundtrip() {
        let message = ClientMessage::Publish {
            topic: String::from("custom/chat"),
            data: json!({"text": "hello"}),
        };
        assert_eq!(message, roundtrip(&message));
    }

    #[test]
    fn client_pong_roundtrip() {
        let message = ClientMessage::Pong;
        assert_eq!(message, roundtrip(&message));
    }

    #[test]
    fn client_message_tagged_format() {
        let message = ClientMessage::Subscribe {
            topics: topics(&["t"]),
        };
        let encoded = serde_json::to_string(&message).unwrap();
        // The variant tag must be written in snake_case.
        assert!(encoded.contains("\"type\":\"subscribe\""));
    }

    #[test]
    fn client_pong_minimal_json() {
        let encoded = serde_json::to_string(&ClientMessage::Pong).unwrap();
        assert_eq!(encoded, r#"{"type":"pong"}"#);
    }

    // ---- ServerMessage --------------------------------------------------

    #[test]
    fn server_event_roundtrip() {
        let message = ServerMessage::Event {
            topic: String::from("app/deploy"),
            data: json!({"v": 1}),
            id: 10,
        };
        assert_eq!(message, roundtrip(&message));
    }

    #[test]
    fn server_ping_roundtrip() {
        let message = ServerMessage::Ping;
        assert_eq!(message, roundtrip(&message));
    }

    #[test]
    fn server_error_roundtrip() {
        let message = ServerMessage::Error {
            message: String::from("not authorized"),
        };
        assert_eq!(message, roundtrip(&message));
    }

    #[test]
    fn server_subscribed_roundtrip() {
        let message = ServerMessage::Subscribed {
            topics: topics(&["a", "b"]),
        };
        assert_eq!(message, roundtrip(&message));
    }

    #[test]
    fn server_message_tagged_format() {
        let encoded = serde_json::to_string(&ServerMessage::Ping).unwrap();
        assert!(encoded.contains("\"type\":\"ping\""));
    }

    // ---- Rejecting malformed client input -------------------------------

    #[test]
    fn invalid_client_message_type() {
        let parsed = serde_json::from_str::<ClientMessage>(r#"{"type":"invalid_type"}"#);
        assert!(parsed.is_err());
    }

    #[test]
    fn missing_required_fields() {
        // A subscribe frame that omits `topics`.
        let parsed = serde_json::from_str::<ClientMessage>(r#"{"type":"subscribe"}"#);
        assert!(parsed.is_err());
    }

    #[test]
    fn empty_json_fails() {
        let parsed = serde_json::from_str::<ClientMessage>("{}");
        assert!(parsed.is_err());
    }
}