use super::types::{CloudEvent, ResultEvent};
use anyhow::{Context, Result};
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, Client};
use std::time::Duration;
use tokio::time::sleep;
/// Options controlling how [`RedisStreamPublisher`] writes to streams.
#[derive(Debug, Clone, Default)]
pub struct PublisherConfig {
/// When `Some(n)`, every XADD applies approximate MAXLEN trimming so the
/// stream stays at roughly `n` entries; `None` leaves streams unbounded.
pub max_stream_length: Option<usize>,
}
/// Publishes serialized [`CloudEvent`]s to Redis Streams, with optional
/// approximate MAXLEN trimming and retry/backoff helpers.
pub struct RedisStreamPublisher {
// Redis client handle; a multiplexed connection is opened lazily per call.
client: Client,
// Publish-time options (currently just the optional stream-length cap).
config: PublisherConfig,
}
impl RedisStreamPublisher {
    /// Creates a publisher for the Redis instance at `redis_url`.
    ///
    /// The URL is validated eagerly, but no connection is opened until the
    /// first publish call.
    ///
    /// # Errors
    /// Returns an error if `redis_url` is not a valid Redis connection URL.
    pub fn new(redis_url: &str, config: PublisherConfig) -> Result<Self> {
        let client = Client::open(redis_url).context("Failed to create Redis client")?;
        Ok(Self { client, config })
    }

    /// Opens a multiplexed async connection, retrying transient failures up
    /// to 3 attempts with exponential backoff (100ms, 200ms, ...).
    async fn get_connection(&self) -> Result<MultiplexedConnection> {
        let max_retries = 3;
        let mut retry_delay = Duration::from_millis(100);
        let mut attempt = 0;
        // `loop` + counter instead of a range loop: every exit path is an
        // explicit `return`, so no unreachable fall-through is needed.
        loop {
            attempt += 1;
            match self.client.get_multiplexed_async_connection().await {
                Ok(conn) => return Ok(conn),
                Err(e) if attempt < max_retries => {
                    log::warn!(
                        "Failed to connect to Redis (attempt {attempt}/{max_retries}): {e}. Retrying in {retry_delay:?}"
                    );
                    sleep(retry_delay).await;
                    retry_delay *= 2;
                }
                Err(e) => {
                    return Err(e).context("Failed to connect to Redis after retries");
                }
            }
        }
    }

    /// Serializes `event` as JSON and appends it (as field `data`) to the
    /// stream named by `event.topic` via XADD. When
    /// [`PublisherConfig::max_stream_length`] is set, approximate MAXLEN
    /// trimming is applied in the same command.
    ///
    /// Returns the entry ID assigned by Redis.
    ///
    /// # Errors
    /// Fails if serialization, connecting, or the XADD command fails.
    pub async fn publish(&self, event: CloudEvent<ResultEvent>) -> Result<String> {
        // Serialize before connecting so a bad payload never costs a round trip.
        let json_data = serde_json::to_string(&event).context("Failed to serialize CloudEvent")?;
        let mut conn = self.get_connection().await?;
        let stream_key = &event.topic;
        let message_id: String = if let Some(max_len) = self.config.max_stream_length {
            conn.xadd_maxlen(
                stream_key,
                redis::streams::StreamMaxlen::Approx(max_len),
                "*",
                &[("data", json_data.as_str())],
            )
            .await
            .context("Failed to publish message to Redis Stream with MAXLEN")?
        } else {
            conn.xadd(stream_key, "*", &[("data", json_data.as_str())])
                .await
                .context("Failed to publish message to Redis Stream")?
        };
        log::debug!("Published CloudEvent to stream '{stream_key}' with message ID: {message_id}");
        Ok(message_id)
    }

    /// Like [`publish`](Self::publish), but retries failed attempts with
    /// exponential backoff starting at 100ms.
    ///
    /// `max_retries` is the total number of attempts; a value of 0 is treated
    /// as a single attempt.
    pub async fn publish_with_retry(
        &self,
        event: CloudEvent<ResultEvent>,
        max_retries: usize,
    ) -> Result<String> {
        // Always make at least one attempt: `max_retries == 0` previously
        // skipped the range loop entirely and panicked on `unreachable!()`.
        let max_retries = max_retries.max(1);
        let mut retry_delay = Duration::from_millis(100);
        let mut attempt = 0;
        loop {
            attempt += 1;
            match self.publish(event.clone()).await {
                Ok(message_id) => return Ok(message_id),
                Err(e) if attempt < max_retries => {
                    log::warn!(
                        "Failed to publish CloudEvent (attempt {attempt}/{max_retries}): {e}. Retrying in {retry_delay:?}"
                    );
                    sleep(retry_delay).await;
                    retry_delay *= 2;
                }
                Err(e) => {
                    return Err(e).context("Failed to publish CloudEvent after retries");
                }
            }
        }
    }

    /// Publishes a batch of events in a single atomic MULTI/EXEC pipeline of
    /// XADD commands (one per event, routed by each event's `topic`).
    ///
    /// Returns the entry IDs in the same order as `events`; an empty input
    /// returns an empty `Vec` without touching Redis.
    ///
    /// # Errors
    /// Fails if any event cannot be serialized or the pipeline fails; no
    /// partial results are returned in that case.
    pub async fn publish_batch(&self, events: Vec<CloudEvent<ResultEvent>>) -> Result<Vec<String>> {
        if events.is_empty() {
            return Ok(Vec::new());
        }
        let mut conn = self.get_connection().await?;
        let mut pipe = redis::pipe();
        // MULTI/EXEC so the batch is applied atomically on the server.
        pipe.atomic();
        for event in &events {
            let json_data =
                serde_json::to_string(event).context("Failed to serialize CloudEvent in batch")?;
            let stream_key = &event.topic;
            if let Some(max_len) = self.config.max_stream_length {
                pipe.xadd_maxlen(
                    stream_key,
                    redis::streams::StreamMaxlen::Approx(max_len),
                    "*",
                    &[("data", json_data.as_str())],
                );
            } else {
                pipe.xadd(stream_key, "*", &[("data", json_data.as_str())]);
            }
        }
        let message_ids: Vec<String> = pipe
            .query_async(&mut conn)
            .await
            .context("Failed to execute batch publish pipeline")?;
        log::debug!(
            "Published batch of {} CloudEvents to Redis streams",
            events.len()
        );
        Ok(message_ids)
    }

    /// Like [`publish_batch`](Self::publish_batch), but retries the whole
    /// batch with exponential backoff and, if every batch attempt fails,
    /// falls back to publishing each event individually (each with its own
    /// retries) so one poison event cannot sink the whole batch.
    ///
    /// `max_retries` is the total number of attempts; a value of 0 is treated
    /// as a single attempt.
    pub async fn publish_batch_with_retry(
        &self,
        events: Vec<CloudEvent<ResultEvent>>,
        max_retries: usize,
    ) -> Result<Vec<String>> {
        // Same guard as `publish_with_retry`: never skip the loop (and panic)
        // when the caller passes 0.
        let max_retries = max_retries.max(1);
        let mut retry_delay = Duration::from_millis(100);
        let mut attempt = 0;
        loop {
            attempt += 1;
            match self.publish_batch(events.clone()).await {
                Ok(message_ids) => return Ok(message_ids),
                Err(e) if attempt < max_retries => {
                    log::warn!(
                        "Failed to publish batch of {} CloudEvents (attempt {}/{}): {}. Retrying in {:?}",
                        events.len(),
                        attempt,
                        max_retries,
                        e,
                        retry_delay
                    );
                    sleep(retry_delay).await;
                    retry_delay *= 2;
                }
                Err(e) => {
                    log::error!(
                        "Failed to publish batch after {max_retries} retries, falling back to individual publishes"
                    );
                    // Every path below returns, so consuming `events` here is
                    // fine even though we are inside the retry loop.
                    let mut message_ids = Vec::with_capacity(events.len());
                    for event in events {
                        match self.publish_with_retry(event, max_retries).await {
                            Ok(msg_id) => message_ids.push(msg_id),
                            Err(individual_err) => {
                                return Err(e).context(format!(
                                    "Batch publish failed after retries, and individual fallback also failed: {individual_err}"
                                ));
                            }
                        }
                    }
                    return Ok(message_ids);
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// URL used by the constructor tests; `new` only validates it, so no
    /// server needs to be running.
    const TEST_URL: &str = "redis://localhost:6379";

    #[test]
    fn test_publisher_config_default() {
        // Default config must not impose a stream-length cap.
        assert_eq!(PublisherConfig::default().max_stream_length, None);
    }

    #[test]
    fn test_create_publisher() {
        let publisher = RedisStreamPublisher::new(TEST_URL, PublisherConfig::default());
        assert!(publisher.is_ok());
    }

    #[test]
    fn test_create_publisher_with_maxlen() {
        let config = PublisherConfig {
            max_stream_length: Some(10_000),
        };
        assert!(RedisStreamPublisher::new(TEST_URL, config).is_ok());
    }

    #[test]
    fn test_invalid_redis_url() {
        // A malformed URL is rejected at construction time.
        let publisher = RedisStreamPublisher::new("invalid-url", PublisherConfig::default());
        assert!(publisher.is_err());
    }
}