use async_trait::async_trait;
use redis::AsyncCommands;
use redis::aio::MultiplexedConnection;
use serde_json;
use super::{EventPublisher, QueueError, QueueResult};
use crate::runtime::AgentEvent;
/// Publishes `AgentEvent`s to a Redis Stream.
///
/// Wraps a multiplexed async Redis connection together with the key of the
/// stream that receives the entries.
pub struct RedisStreamsPublisher {
/// Connection handle; `publish` clones it for each call.
conn: MultiplexedConnection,
/// Key of the Redis Stream that `publish` appends to.
stream_key: String,
}
impl RedisStreamsPublisher {
    /// Builds a publisher from an already-established multiplexed connection.
    ///
    /// `stream_key` is the key of the stream that published entries are
    /// appended to; it is converted into an owned `String`.
    #[must_use]
    pub fn new(conn: MultiplexedConnection, stream_key: impl Into<String>) -> Self {
        let stream_key = stream_key.into();
        Self { conn, stream_key }
    }

    /// Opens a Redis client for `url`, establishes a multiplexed async
    /// connection, and wraps it in a publisher targeting `stream_key`.
    ///
    /// # Errors
    ///
    /// Returns `QueueError::ConnectionFailed` when the client cannot be
    /// created from `url` or when the async connection cannot be obtained.
    pub async fn connect(url: &str, stream_key: impl Into<String>) -> QueueResult<Self> {
        let client = redis::Client::open(url).map_err(|err| QueueError::ConnectionFailed {
            message: format!("Redis client creation failed: {err}"),
        })?;

        let connection = client.get_multiplexed_async_connection().await;
        let conn = connection.map_err(|err| QueueError::ConnectionFailed {
            message: format!("Redis async connection failed: {err}"),
        })?;

        Ok(Self::new(conn, stream_key))
    }

    /// Returns the key of the stream this publisher writes to.
    #[must_use]
    pub fn stream_key(&self) -> &str {
        self.stream_key.as_str()
    }
}
#[async_trait]
impl EventPublisher for RedisStreamsPublisher {
    /// Serializes `event` to JSON and appends it to the configured stream.
    ///
    /// The entry is written with `XADD`, passing `*` as the entry ID and a
    /// single field named `event` holding the JSON payload. The ID returned
    /// by Redis is emitted in a debug-level trace event.
    ///
    /// # Errors
    ///
    /// Returns `QueueError::SerializationFailed` when the event cannot be
    /// serialized, and `QueueError::PublishFailed` when the `XADD` call fails.
    async fn publish(&self, event: AgentEvent) -> QueueResult<()> {
        let serialized = serde_json::to_string(&event);
        let payload = serialized.map_err(|err| QueueError::SerializationFailed {
            message: format!("failed to serialize AgentEvent: {err}"),
        })?;

        // The command is issued on a mutable binding, while `publish` only
        // has `&self`, so work on a clone of the connection handle.
        let mut connection = self.conn.clone();
        let fields = [("event", &payload)];
        let id: String = connection
            .xadd(&self.stream_key, "*", &fields)
            .await
            .map_err(|err| QueueError::PublishFailed {
                message: format!("Redis XADD failed: {err}"),
            })?;

        tracing::debug!(
            stream_key = %self.stream_key,
            entry_id = %id,
            "published AgentEvent to Redis Stream",
        );

        Ok(())
    }
}