bext-realtime 0.2.0

Realtime pub/sub for bext — WebSocket and SSE with optional Redis relay
Documentation
//! Cross-instance event relay using Redis Pub/Sub, allowing events published
//! on one bext instance to reach subscribers on all instances behind a load balancer.

use std::sync::Arc;

use anyhow::{Context, Result};
use serde_json;
use tracing::{debug, error, info, warn};

use crate::hub::BextHub;
use crate::message::HubEvent;

/// Cross-instance event relay using Redis Pub/Sub.
///
/// When multiple bext instances run behind a load balancer, events published
/// on one instance need to reach subscribers on all instances. `RedisRelay`
/// bridges local hub events to a shared Redis Pub/Sub channel and vice versa.
pub struct RedisRelay {
    /// Redis client handle. `publish` and the subscribe loop each open their
    /// own connection from it.
    client: redis::Client,
    /// Channel prefix as supplied by the caller (e.g. `bext:` → channel `bext:hub:events`).
    prefix: String,
    /// The full channel name, built as `{prefix}hub:events`.
    channel: String,
    /// Instance ID stamped on every outgoing envelope and compared against
    /// incoming ones to prevent echo (re-delivering our own publishes).
    instance_id: String,
}

impl RedisRelay {
    /// Create a new relay.
    ///
    /// - `redis_url` — Redis connection string (e.g. `redis://127.0.0.1:6379`)
    /// - `prefix` — namespace prefix for the Pub/Sub channel; the relay uses
    ///   the channel `{prefix}hub:events`
    ///
    /// A warning is logged when the URL uses the plaintext `redis://` scheme.
    /// No connection is opened here; connections are created lazily by
    /// [`publish`](Self::publish) and [`subscribe_loop`](Self::subscribe_loop).
    ///
    /// # Errors
    ///
    /// Returns an error if `redis::Client::open` rejects `redis_url`. Any
    /// credentials embedded in the URL are masked in the error message.
    pub fn new(redis_url: &str, prefix: &str) -> Result<Self> {
        if redis_url.starts_with("redis://") {
            // Emitted through `tracing` like every other diagnostic in this
            // module, so it honours the application's log configuration.
            warn!("[SECURITY WARNING] Redis connection is unencrypted. Use rediss:// for TLS in production.");
        }

        // The URL may carry `user:password@`; never echo it verbatim.
        let client = redis::Client::open(redis_url).with_context(|| {
            format!(
                "failed to open Redis connection to {}",
                Self::redact_url(redis_url)
            )
        })?;

        let channel = format!("{}hub:events", prefix);
        let instance_id = generate_instance_id();

        info!(
            channel = %channel,
            instance_id = %instance_id,
            "redis relay created"
        );

        Ok(Self {
            client,
            prefix: prefix.to_string(),
            channel,
            instance_id,
        })
    }

    /// Publish a hub event to Redis so other instances can receive it.
    ///
    /// The event is wrapped in a `RelayEnvelope` carrying this relay's
    /// instance ID, serialized to JSON and sent with `PUBLISH` on the relay
    /// channel. `topic` is used for logging only; the event itself is what
    /// gets serialized.
    ///
    /// # Errors
    ///
    /// Returns an error if the envelope cannot be serialized, a Redis
    /// connection cannot be obtained, or the `PUBLISH` command fails.
    pub async fn publish(&self, topic: &str, event: &HubEvent) -> Result<()> {
        let envelope = RelayEnvelope {
            instance_id: self.instance_id.clone(),
            event: event.clone(),
        };

        let payload =
            serde_json::to_string(&envelope).context("failed to serialize relay envelope")?;

        // NOTE(review): a connection is requested from the client on every
        // call. Caching one would need a new struct field — consider it if
        // publish throughput matters.
        let mut conn = self
            .client
            .get_multiplexed_async_connection()
            .await
            .context("failed to get Redis connection")?;

        redis::cmd("PUBLISH")
            .arg(&self.channel)
            .arg(&payload)
            .query_async::<i64>(&mut conn)
            .await
            .with_context(|| format!("failed to PUBLISH to channel {}", self.channel))?;

        debug!(
            topic = %topic,
            event_id = event.id,
            channel = %self.channel,
            "relayed event to Redis"
        );

        Ok(())
    }

    /// Start the background subscribe loop that listens on Redis Pub/Sub
    /// and feeds incoming events into the local hub.
    ///
    /// This is a long-running task — spawn it with `tokio::spawn`.
    ///
    /// The loop reconnects automatically, both when connecting/subscribing
    /// fails and when an established subscription's message stream ends.
    /// Consecutive failed attempts back off exponentially (100 ms doubling up
    /// to 30 s); the delay is reset once a subscription has been established.
    ///
    /// The returned future does not resolve on its own; stop the relay by
    /// aborting or dropping the task. The `Result` return type is kept for
    /// API compatibility.
    pub async fn subscribe_loop(self: Arc<Self>, hub: Arc<BextHub>) -> Result<()> {
        const INITIAL_BACKOFF_MS: u64 = 100;
        const MAX_BACKOFF_MS: u64 = 30_000;

        let mut backoff_ms = INITIAL_BACKOFF_MS;

        loop {
            match self.run_subscription(&hub).await {
                Ok(()) => {
                    // `run_subscription` only returns `Ok` after it subscribed
                    // successfully and the message stream later ended. The
                    // connection worked, so start over from the shortest
                    // delay and reconnect instead of stopping the relay.
                    backoff_ms = INITIAL_BACKOFF_MS;
                    warn!(
                        backoff_ms = backoff_ms,
                        "redis relay subscription ended, reconnecting"
                    );
                }
                Err(e) => {
                    warn!(
                        error = %e,
                        backoff_ms = backoff_ms,
                        "redis relay subscription failed, reconnecting"
                    );
                }
            }

            tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
            backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
        }
    }

    /// Get the channel name used by this relay.
    pub fn channel(&self) -> &str {
        &self.channel
    }

    /// Get the instance ID.
    pub fn instance_id(&self) -> &str {
        &self.instance_id
    }

    /// Get the prefix.
    pub fn prefix(&self) -> &str {
        &self.prefix
    }

    // ── Private ─────────────────────────────────────────────────────

    /// Mask everything between the scheme and the last `@` of a connection
    /// URL (the `user:password` part) so credentials never reach error
    /// messages or logs. URLs without an `@` are returned unchanged.
    fn redact_url(url: &str) -> String {
        match url.rfind('@') {
            Some(at) => {
                // Keep `scheme://` when present; `@` and `://` are ASCII, so
                // both byte offsets fall on character boundaries.
                let scheme_len = url[..at].find("://").map_or(0, |idx| idx + "://".len());
                format!("{}***@{}", &url[..scheme_len], &url[at + 1..])
            }
            None => url.to_string(),
        }
    }

    /// Run a single subscription session.
    ///
    /// Opens a Pub/Sub connection, subscribes to the relay channel and then
    /// forwards every envelope that did not originate from this instance to
    /// `hub`.
    ///
    /// Returns `Ok(())` when the message stream ends after a successful
    /// subscribe, and `Err` when the connection or the `SUBSCRIBE` itself
    /// fails. Undecodable payloads and malformed envelopes are logged and
    /// skipped without ending the session.
    async fn run_subscription(&self, hub: &Arc<BextHub>) -> Result<()> {
        let mut pubsub = self
            .client
            .get_async_pubsub()
            .await
            .context("failed to get Redis Pub/Sub connection")?;

        pubsub
            .subscribe(&self.channel)
            .await
            .with_context(|| format!("failed to SUBSCRIBE to {}", self.channel))?;

        info!(channel = %self.channel, "redis relay subscribed");

        let mut msg_stream = pubsub.on_message();

        loop {
            let msg: redis::Msg = match tokio::time::timeout(
                std::time::Duration::from_secs(60),
                futures_stream_next(&mut msg_stream),
            )
            .await
            {
                Ok(Some(msg)) => msg,
                Ok(None) => {
                    warn!("redis pub/sub stream ended");
                    return Ok(());
                }
                Err(_timeout) => {
                    // A timeout only means the channel was idle for 60s;
                    // keep waiting on the same stream.
                    continue;
                }
            };

            let payload: String = match msg.get_payload() {
                Ok(p) => p,
                Err(e) => {
                    error!(error = %e, "failed to decode Redis message payload");
                    continue;
                }
            };

            let envelope: RelayEnvelope = match serde_json::from_str(&payload) {
                Ok(env) => env,
                Err(e) => {
                    error!(error = %e, "failed to deserialize relay envelope");
                    continue;
                }
            };

            // Skip messages from this instance (prevent echo)
            if envelope.instance_id == self.instance_id {
                continue;
            }

            debug!(
                event_id = envelope.event.id,
                topic = %envelope.event.topic,
                from_instance = %envelope.instance_id,
                "received relayed event from Redis"
            );

            hub.publish_event(envelope.event);
        }
    }
}

/// Envelope wrapping a HubEvent with the originating instance ID.
///
/// This is the value that gets serialized to JSON and sent over the Redis
/// channel, so the field names are part of the wire format.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct RelayEnvelope {
    /// ID of the instance that published the event; a receiver skips
    /// envelopes whose ID equals its own.
    instance_id: String,
    /// The hub event being relayed.
    event: HubEvent,
}

/// Generate a unique instance ID for echo prevention.
///
/// The ID has the shape `bext-<pid>-<suffix>`, where `<suffix>` is 16 hex
/// digits derived from the current wall-clock time, the process ID and the
/// randomly seeded keys of a fresh `RandomState`.
///
/// The PID and the clock alone are not a safe discriminator: two instances
/// that share a PID (NOTE(review): e.g. the main process in separate
/// containers — confirm for your deployment) would be told apart only by
/// their start time. A collision makes both instances discard each other's
/// events as echoes, so random entropy is mixed in.
fn generate_instance_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    let pid = std::process::id();
    // A clock set before the epoch degrades to 0 instead of failing; the
    // random hasher keys below still keep the ID unique in that case.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    // Every `RandomState::new()` carries fresh random keys, so the digest
    // differs between processes (and between calls) even for equal inputs.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(nanos);
    hasher.write_u32(pid);

    format!("bext-{}-{:016x}", pid, hasher.finish())
}

/// Await the next message of a Redis Pub/Sub message stream.
///
/// Resolves to `Some(msg)` when the stream yields an item and to `None` once
/// the stream is exhausted. Built on `std::future::poll_fn`, which polls the
/// stream's `poll_next` directly.
async fn futures_stream_next(
    stream: &mut (impl futures_core::Stream<Item = redis::Msg> + Unpin),
) -> Option<redis::Msg> {
    use std::future::poll_fn;
    use std::pin::Pin;

    // The stream is `Unpin`, so it can be pinned once up front and re-borrowed
    // on every poll.
    let mut pinned = Pin::new(stream);
    poll_fn(move |cx| pinned.as_mut().poll_next(cx)).await
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use serde_json::json;

    /// Build a `HubEvent` fixture with id `1` and the current timestamp.
    /// Tests that need another id override it with struct-update syntax.
    fn sample_event(topic: &str, data: serde_json::Value) -> HubEvent {
        HubEvent {
            id: 1,
            topic: topic.to_string(),
            data,
            timestamp: Utc::now(),
        }
    }

    // ── RelayEnvelope serialization ─────────────────────────────────

    #[test]
    fn relay_envelope_roundtrip() {
        let envelope = RelayEnvelope {
            instance_id: "bext-123-456".to_string(),
            event: HubEvent {
                id: 42,
                ..sample_event("app/deploy", json!({"version": "1.0"}))
            },
        };

        let json_str = serde_json::to_string(&envelope).unwrap();
        let deserialized: RelayEnvelope = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.instance_id, "bext-123-456");
        assert_eq!(deserialized.event.id, 42);
        assert_eq!(deserialized.event.topic, "app/deploy");
    }

    // ── Instance ID generation ──────────────────────────────────────

    #[test]
    fn instance_id_is_unique() {
        let id1 = generate_instance_id();
        // Sleep briefly so the clock-derived part cannot repeat.
        std::thread::sleep(std::time::Duration::from_millis(1));
        let id2 = generate_instance_id();

        assert!(id1.starts_with("bext-"));
        assert!(id2.starts_with("bext-"));
        // The property the test is named after: two IDs generated at
        // different times in the same process must differ.
        assert_ne!(id1, id2);
    }

    #[test]
    fn instance_id_format() {
        let id = generate_instance_id();
        assert!(id.starts_with("bext-"));
        let parts: Vec<&str> = id.split('-').collect();
        assert_eq!(parts.len(), 3); // "bext", pid, suffix
    }

    // ── RedisRelay construction (no real Redis needed) ──────────────

    #[test]
    fn relay_new_invalid_url() {
        // NOTE(review): whether `redis::Client::open` rejects this string
        // depends on the redis crate's URL parsing — confirm before asserting
        // `is_err()`. What matters here is that construction never panics.
        let result = RedisRelay::new("not-a-valid-url", "test:");
        let _ = result;
    }

    #[test]
    fn relay_channel_name() {
        let relay = RedisRelay::new("redis://127.0.0.1:6379", "bext:").unwrap();
        assert_eq!(relay.channel(), "bext:hub:events");
    }

    #[test]
    fn relay_prefix() {
        let relay = RedisRelay::new("redis://127.0.0.1:6379", "myapp:").unwrap();
        assert_eq!(relay.prefix(), "myapp:");
        assert_eq!(relay.channel(), "myapp:hub:events");
    }

    #[test]
    fn relay_instance_id_set() {
        let relay = RedisRelay::new("redis://127.0.0.1:6379", "test:").unwrap();
        assert!(!relay.instance_id().is_empty());
        assert!(relay.instance_id().starts_with("bext-"));
    }

    // ── Echo prevention logic ───────────────────────────────────────
    //
    // These mirror the comparison made in the subscription loop; the loop
    // itself needs a live Redis server and is not exercised here.

    #[test]
    fn echo_prevention_same_instance() {
        let relay_id = "bext-123-456";
        let envelope = RelayEnvelope {
            instance_id: relay_id.to_string(),
            event: sample_event("test", json!(null)),
        };

        // Same instance — should skip
        assert_eq!(envelope.instance_id, relay_id);
    }

    #[test]
    fn echo_prevention_different_instance() {
        let relay_id = "bext-123-456";
        let envelope = RelayEnvelope {
            instance_id: "bext-789-012".to_string(),
            event: sample_event("test", json!(null)),
        };

        // Different instance — should process
        assert_ne!(envelope.instance_id, relay_id);
    }

    // ── Envelope with complex data ──────────────────────────────────

    #[test]
    fn envelope_with_nested_data() {
        let envelope = RelayEnvelope {
            instance_id: "i1".to_string(),
            event: HubEvent {
                id: 99,
                ..sample_event(
                    "complex/topic",
                    json!({
                        "users": [{"id": 1, "name": "Alice"}],
                        "metadata": {"version": 2, "flags": [true, false]}
                    }),
                )
            },
        };

        let json_str = serde_json::to_string(&envelope).unwrap();
        let deserialized: RelayEnvelope = serde_json::from_str(&json_str).unwrap();

        assert_eq!(deserialized.event.data["users"][0]["name"], "Alice");
        assert_eq!(deserialized.event.data["metadata"]["version"], 2);
    }
}