bext-realtime 0.2.0

Realtime pub/sub for bext — WebSocket and SSE with optional Redis relay
Documentation
//! Wire types for the realtime hub: [`HubEvent`] (server-originated),
//! [`ClientMessage`] (subscribe/publish from clients), and [`ServerMessage`] (responses).

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;

/// A hub event flowing through the pub/sub system.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct HubEvent {
    /// Monotonically increasing event ID (used for Last-Event-ID replay).
    pub id: u64,
    /// Topic the event was published to (e.g. `app/marketing/events`).
    pub topic: String,
    /// Arbitrary JSON payload.
    pub data: Value,
    /// UTC timestamp when the event was created.
    pub timestamp: DateTime<Utc>,
}

impl HubEvent {
    /// Format as a valid SSE text block.
    ///
    /// Output:
    /// ```text
    /// event: {topic}
    /// data: {json}
    /// id: {id}
    ///
    /// ```
    pub fn to_sse_string(&self) -> String {
        let json = serde_json::to_string(&self.data).unwrap_or_else(|_| "null".to_string());
        format!("event: {}\ndata: {}\nid: {}\n\n", self.topic, json, self.id)
    }
}

/// Messages sent by WebSocket clients to the server.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ClientMessage {
    /// Subscribe to one or more topics (supports wildcards).
    Subscribe { topics: Vec<String> },
    /// Unsubscribe from one or more topics.
    Unsubscribe { topics: Vec<String> },
    /// Publish an event to a topic.
    Publish { topic: String, data: Value },
    /// Pong response to a server Ping.
    Pong,
}

/// Messages sent by the server to WebSocket clients.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ServerMessage {
    /// A topic event delivered to the client.
    Event { topic: String, data: Value, id: u64 },
    /// Heartbeat ping — client should respond with Pong.
    Ping,
    /// An error message (e.g. authorization failure).
    Error { message: String },
    /// Acknowledgement of a successful subscription.
    Subscribed { topics: Vec<String> },
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use serde_json::json;

    fn sample_event() -> HubEvent {
        HubEvent {
            id: 42,
            topic: "app/deploy".to_string(),
            data: json!({"version": "1.2.3"}),
            timestamp: Utc.with_ymd_and_hms(2026, 1, 15, 10, 30, 0).unwrap(),
        }
    }

    // ── HubEvent serialization ──────────────────────────────────────

    #[test]
    fn hub_event_serializes_to_json() {
        let evt = sample_event();
        let json_str = serde_json::to_string(&evt).unwrap();
        assert!(json_str.contains("\"id\":42"));
        assert!(json_str.contains("\"topic\":\"app/deploy\""));
        assert!(json_str.contains("\"version\":\"1.2.3\""));
    }

    #[test]
    fn hub_event_roundtrip() {
        let evt = sample_event();
        let json_str = serde_json::to_string(&evt).unwrap();
        let deserialized: HubEvent = serde_json::from_str(&json_str).unwrap();
        assert_eq!(evt, deserialized);
    }

    #[test]
    fn hub_event_with_null_data() {
        let evt = HubEvent {
            id: 1,
            topic: "test".to_string(),
            data: Value::Null,
            timestamp: Utc::now(),
        };
        let json_str = serde_json::to_string(&evt).unwrap();
        let deserialized: HubEvent = serde_json::from_str(&json_str).unwrap();
        assert_eq!(evt.data, deserialized.data);
    }

    #[test]
    fn hub_event_with_nested_data() {
        let evt = HubEvent {
            id: 99,
            topic: "complex".to_string(),
            data: json!({
                "users": [{"name": "Alice"}, {"name": "Bob"}],
                "count": 2,
                "active": true
            }),
            timestamp: Utc::now(),
        };
        let json_str = serde_json::to_string(&evt).unwrap();
        let deserialized: HubEvent = serde_json::from_str(&json_str).unwrap();
        assert_eq!(evt, deserialized);
    }

    // ── SSE formatting ──────────────────────────────────────────────

    #[test]
    fn sse_string_format() {
        let evt = sample_event();
        let sse = evt.to_sse_string();
        assert_eq!(
            sse,
            "event: app/deploy\ndata: {\"version\":\"1.2.3\"}\nid: 42\n\n"
        );
    }

    #[test]
    fn sse_string_with_null() {
        let evt = HubEvent {
            id: 0,
            topic: "t".to_string(),
            data: Value::Null,
            timestamp: Utc::now(),
        };
        let sse = evt.to_sse_string();
        assert_eq!(sse, "event: t\ndata: null\nid: 0\n\n");
    }

    #[test]
    fn sse_string_with_string_data() {
        let evt = HubEvent {
            id: 5,
            topic: "msg".to_string(),
            data: json!("hello world"),
            timestamp: Utc::now(),
        };
        let sse = evt.to_sse_string();
        assert_eq!(sse, "event: msg\ndata: \"hello world\"\nid: 5\n\n");
    }

    #[test]
    fn sse_string_with_array_data() {
        let evt = HubEvent {
            id: 7,
            topic: "arr".to_string(),
            data: json!([1, 2, 3]),
            timestamp: Utc::now(),
        };
        let sse = evt.to_sse_string();
        assert!(sse.starts_with("event: arr\n"));
        assert!(sse.contains("data: [1,2,3]\n"));
        assert!(sse.ends_with("id: 7\n\n"));
    }

    #[test]
    fn sse_string_ends_with_double_newline() {
        let evt = sample_event();
        let sse = evt.to_sse_string();
        assert!(sse.ends_with("\n\n"));
        // Ensure exactly two trailing newlines (not three)
        assert!(!sse.ends_with("\n\n\n"));
    }

    // ── ClientMessage serialization ─────────────────────────────────

    #[test]
    fn client_subscribe_roundtrip() {
        let msg = ClientMessage::Subscribe {
            topics: vec!["app/*".to_string(), "system/deploy".to_string()],
        };
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ClientMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(msg, deserialized);
    }

    #[test]
    fn client_unsubscribe_roundtrip() {
        let msg = ClientMessage::Unsubscribe {
            topics: vec!["app/events".to_string()],
        };
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ClientMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(msg, deserialized);
    }

    #[test]
    fn client_publish_roundtrip() {
        let msg = ClientMessage::Publish {
            topic: "custom/chat".to_string(),
            data: json!({"text": "hello"}),
        };
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ClientMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(msg, deserialized);
    }

    #[test]
    fn client_pong_roundtrip() {
        let msg = ClientMessage::Pong;
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ClientMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(msg, deserialized);
    }

    #[test]
    fn client_message_tagged_format() {
        let msg = ClientMessage::Subscribe {
            topics: vec!["t".to_string()],
        };
        let json_str = serde_json::to_string(&msg).unwrap();
        // Should use snake_case tag
        assert!(json_str.contains("\"type\":\"subscribe\""));
    }

    #[test]
    fn client_pong_minimal_json() {
        let msg = ClientMessage::Pong;
        let json_str = serde_json::to_string(&msg).unwrap();
        assert_eq!(json_str, r#"{"type":"pong"}"#);
    }

    // ── ServerMessage serialization ─────────────────────────────────

    #[test]
    fn server_event_roundtrip() {
        let msg = ServerMessage::Event {
            topic: "app/deploy".to_string(),
            data: json!({"v": 1}),
            id: 10,
        };
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ServerMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(msg, deserialized);
    }

    #[test]
    fn server_ping_roundtrip() {
        let msg = ServerMessage::Ping;
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ServerMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(msg, deserialized);
    }

    #[test]
    fn server_error_roundtrip() {
        let msg = ServerMessage::Error {
            message: "not authorized".to_string(),
        };
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ServerMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(msg, deserialized);
    }

    #[test]
    fn server_subscribed_roundtrip() {
        let msg = ServerMessage::Subscribed {
            topics: vec!["a".to_string(), "b".to_string()],
        };
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ServerMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(msg, deserialized);
    }

    #[test]
    fn server_message_tagged_format() {
        let msg = ServerMessage::Ping;
        let json_str = serde_json::to_string(&msg).unwrap();
        assert!(json_str.contains("\"type\":\"ping\""));
    }

    // ── Deserialization error handling ───────────────────────────────

    #[test]
    fn invalid_client_message_type() {
        let json_str = r#"{"type":"invalid_type"}"#;
        let result: Result<ClientMessage, _> = serde_json::from_str(json_str);
        assert!(result.is_err());
    }

    #[test]
    fn missing_required_fields() {
        // Subscribe without topics
        let json_str = r#"{"type":"subscribe"}"#;
        let result: Result<ClientMessage, _> = serde_json::from_str(json_str);
        assert!(result.is_err());
    }

    #[test]
    fn empty_json_fails() {
        let result: Result<ClientMessage, _> = serde_json::from_str("{}");
        assert!(result.is_err());
    }
}