use std::sync::Arc;
use anyhow::{Context, Result};
use serde_json;
use tracing::{debug, error, info, warn};
use crate::hub::BextHub;
use crate::message::HubEvent;
pub struct RedisRelay {
client: redis::Client,
prefix: String,
channel: String,
instance_id: String,
}
impl RedisRelay {
pub fn new(redis_url: &str, prefix: &str) -> Result<Self> {
if redis_url.starts_with("redis://") {
eprintln!("[SECURITY WARNING] Redis connection is unencrypted. Use rediss:// for TLS in production.");
}
let client = redis::Client::open(redis_url)
.with_context(|| format!("failed to open Redis connection to {}", redis_url))?;
let channel = format!("{}hub:events", prefix);
let instance_id = generate_instance_id();
info!(
channel = %channel,
instance_id = %instance_id,
"redis relay created"
);
Ok(Self {
client,
prefix: prefix.to_string(),
channel,
instance_id,
})
}
pub async fn publish(&self, topic: &str, event: &HubEvent) -> Result<()> {
let envelope = RelayEnvelope {
instance_id: self.instance_id.clone(),
event: event.clone(),
};
let payload =
serde_json::to_string(&envelope).context("failed to serialize relay envelope")?;
let mut conn = self
.client
.get_multiplexed_async_connection()
.await
.context("failed to get Redis connection")?;
redis::cmd("PUBLISH")
.arg(&self.channel)
.arg(&payload)
.query_async::<i64>(&mut conn)
.await
.with_context(|| format!("failed to PUBLISH to channel {}", self.channel))?;
debug!(
topic = %topic,
event_id = event.id,
channel = %self.channel,
"relayed event to Redis"
);
Ok(())
}
pub async fn subscribe_loop(self: Arc<Self>, hub: Arc<BextHub>) -> Result<()> {
let mut backoff_ms = 100u64;
let max_backoff_ms = 30_000u64;
loop {
match self.run_subscription(&hub).await {
Ok(()) => {
info!("redis relay subscription ended cleanly");
return Ok(());
}
Err(e) => {
warn!(
error = %e,
backoff_ms = backoff_ms,
"redis relay subscription failed, reconnecting"
);
tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(max_backoff_ms);
}
}
}
}
pub fn channel(&self) -> &str {
&self.channel
}
pub fn instance_id(&self) -> &str {
&self.instance_id
}
pub fn prefix(&self) -> &str {
&self.prefix
}
async fn run_subscription(&self, hub: &Arc<BextHub>) -> Result<()> {
let conn = self
.client
.get_async_pubsub()
.await
.context("failed to get Redis Pub/Sub connection")?;
let mut pubsub = conn;
pubsub
.subscribe(&self.channel)
.await
.with_context(|| format!("failed to SUBSCRIBE to {}", self.channel))?;
info!(channel = %self.channel, "redis relay subscribed");
let mut msg_stream = pubsub.on_message();
loop {
let msg: redis::Msg = match tokio::time::timeout(
std::time::Duration::from_secs(60),
futures_stream_next(&mut msg_stream),
)
.await
{
Ok(Some(msg)) => msg,
Ok(None) => {
warn!("redis pub/sub stream ended");
return Ok(());
}
Err(_timeout) => {
continue;
}
};
let payload: String = match msg.get_payload() {
Ok(p) => p,
Err(e) => {
error!(error = %e, "failed to decode Redis message payload");
continue;
}
};
let envelope: RelayEnvelope = match serde_json::from_str(&payload) {
Ok(env) => env,
Err(e) => {
error!(error = %e, "failed to deserialize relay envelope");
continue;
}
};
if envelope.instance_id == self.instance_id {
continue;
}
debug!(
event_id = envelope.event.id,
topic = %envelope.event.topic,
from_instance = %envelope.instance_id,
"received relayed event from Redis"
);
hub.publish_event(envelope.event);
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct RelayEnvelope {
instance_id: String,
event: HubEvent,
}
fn generate_instance_id() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
format!("bext-{}-{}", std::process::id(), ts % 1_000_000_000)
}
async fn futures_stream_next(
stream: &mut (impl futures_core::Stream<Item = redis::Msg> + Unpin),
) -> Option<redis::Msg> {
use std::pin::Pin;
std::future::poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use serde_json::json;
#[test]
fn relay_envelope_roundtrip() {
let envelope = RelayEnvelope {
instance_id: "bext-123-456".to_string(),
event: HubEvent {
id: 42,
topic: "app/deploy".to_string(),
data: json!({"version": "1.0"}),
timestamp: Utc::now(),
},
};
let json_str = serde_json::to_string(&envelope).unwrap();
let deserialized: RelayEnvelope = serde_json::from_str(&json_str).unwrap();
assert_eq!(deserialized.instance_id, "bext-123-456");
assert_eq!(deserialized.event.id, 42);
assert_eq!(deserialized.event.topic, "app/deploy");
}
#[test]
fn instance_id_is_unique() {
let id1 = generate_instance_id();
std::thread::sleep(std::time::Duration::from_millis(1));
let id2 = generate_instance_id();
assert!(id1.starts_with("bext-"));
assert!(id2.starts_with("bext-"));
}
#[test]
fn instance_id_format() {
let id = generate_instance_id();
assert!(id.starts_with("bext-"));
let parts: Vec<&str> = id.split('-').collect();
assert_eq!(parts.len(), 3); }
#[test]
fn relay_new_invalid_url() {
let result = RedisRelay::new("not-a-valid-url", "test:");
let _ = result;
}
#[test]
fn relay_channel_name() {
let relay = RedisRelay::new("redis://127.0.0.1:6379", "bext:").unwrap();
assert_eq!(relay.channel(), "bext:hub:events");
}
#[test]
fn relay_prefix() {
let relay = RedisRelay::new("redis://127.0.0.1:6379", "myapp:").unwrap();
assert_eq!(relay.prefix(), "myapp:");
assert_eq!(relay.channel(), "myapp:hub:events");
}
#[test]
fn relay_instance_id_set() {
let relay = RedisRelay::new("redis://127.0.0.1:6379", "test:").unwrap();
assert!(!relay.instance_id().is_empty());
assert!(relay.instance_id().starts_with("bext-"));
}
#[test]
fn echo_prevention_same_instance() {
let relay_id = "bext-123-456";
let envelope = RelayEnvelope {
instance_id: relay_id.to_string(),
event: HubEvent {
id: 1,
topic: "test".to_string(),
data: json!(null),
timestamp: Utc::now(),
},
};
assert_eq!(envelope.instance_id, relay_id);
}
#[test]
fn echo_prevention_different_instance() {
let relay_id = "bext-123-456";
let envelope = RelayEnvelope {
instance_id: "bext-789-012".to_string(),
event: HubEvent {
id: 1,
topic: "test".to_string(),
data: json!(null),
timestamp: Utc::now(),
},
};
assert_ne!(envelope.instance_id, relay_id);
}
#[test]
fn envelope_with_nested_data() {
let envelope = RelayEnvelope {
instance_id: "i1".to_string(),
event: HubEvent {
id: 99,
topic: "complex/topic".to_string(),
data: json!({
"users": [{"id": 1, "name": "Alice"}],
"metadata": {"version": 2, "flags": [true, false]}
}),
timestamp: Utc::now(),
},
};
let json_str = serde_json::to_string(&envelope).unwrap();
let deserialized: RelayEnvelope = serde_json::from_str(&json_str).unwrap();
assert_eq!(deserialized.event.data["users"][0]["name"], "Alice");
assert_eq!(deserialized.event.data["metadata"]["version"], 2);
}
}