use super::persistence::{
MarkJobCompleted, MarkJobFailed, MoveToDeadLetterQueue, PersistJob,
};
use super::queue::QueuedJob;
use crate::jobs::{JobId, JobStatus};
use acton_reactive::prelude::*;
use redis::AsyncCommands;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tracing::{debug, error, warn};
/// Agent model that persists job lifecycle events (persist / complete /
/// fail / dead-letter) to Redis.
///
/// `Clone` is required because each message handler clones the connection
/// and the counter into a detached task; `Default` yields a disconnected
/// model (`redis_conn == None`) until `spawn` installs a live connection.
#[derive(Clone, Default)]
pub struct RedisPersistenceAgent {
    // Multiplexed async Redis connection; `None` until `spawn` connects.
    redis_conn: Option<redis::aio::MultiplexedConnection>,
    // Number of Redis operations that completed successfully, shared
    // (via Arc) with the spawned per-message tasks that increment it.
    operations_count: Arc<AtomicUsize>,
}
/// Manual `Debug`: the Redis connection handle is not `Debug`, so report
/// only whether a connection is present, plus the current operation count.
impl std::fmt::Debug for RedisPersistenceAgent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let connected = self.redis_conn.is_some();
        let ops = self.operations_count.load(Ordering::Relaxed);
        f.debug_struct("RedisPersistenceAgent")
            .field("redis_conn", &connected)
            .field("operations_count", &ops)
            .finish()
    }
}
impl RedisPersistenceAgent {
    /// Connect to Redis at `redis_url` and spawn the persistence agent on
    /// `runtime`, returning its handle.
    ///
    /// # Errors
    /// Returns an error if the Redis client cannot be created or the
    /// multiplexed async connection cannot be established.
    pub async fn spawn(
        redis_url: &str,
        runtime: &mut AgentRuntime,
    ) -> anyhow::Result<AgentHandle> {
        let client = redis::Client::open(redis_url)?;
        let conn = client.get_multiplexed_async_connection().await?;
        debug!("Connected to Redis at {}", redis_url);
        runtime
            .spawn_agent(|mut agent: ManagedAgent<Idle, Self>| {
                // Seed the agent model with the live connection and a fresh
                // operation counter before any handlers run.
                agent.model = Self {
                    redis_conn: Some(conn),
                    operations_count: Arc::new(AtomicUsize::new(0)),
                };
                Box::pin(async move {
                    // NOTE(review): a failure here panics inside the spawned
                    // agent instead of surfacing an Err to the caller of
                    // `spawn` — confirm this is the intended failure mode.
                    Self::configure_handlers(agent).await.expect("Failed to configure handlers")
                })
            })
            .await
    }

    /// Register one handler per persistence message type, then start the
    /// agent and return its handle.
    ///
    /// Every handler follows the same pattern: clone the (cheap-to-clone)
    /// connection, counter, and message out of the agent, then offload the
    /// actual Redis work onto a detached `tokio::spawn` task so the handler
    /// future returns immediately. Redis failures are logged, never
    /// propagated.
    ///
    /// NOTE(review): because the tasks are detached, there is no ordering
    /// guarantee between operations for the same job id (e.g. a PersistJob
    /// and a MarkJobCompleted could race) — verify callers tolerate this.
    async fn configure_handlers(
        mut builder: ManagedAgent<Idle, Self>,
    ) -> anyhow::Result<AgentHandle> {
        builder
            // Store a newly queued job and enqueue its id.
            .act_on::<PersistJob>(|agent, envelope| {
                let conn_opt = agent.model.redis_conn.clone();
                let job = envelope.message().job.clone();
                let ops_count = agent.model.operations_count.clone();
                Box::pin(async move {
                    tokio::spawn(async move {
                        // Silently a no-op when no connection was configured.
                        if let Some(mut conn) = conn_opt {
                            match persist_job_impl(&mut conn, &job).await {
                                Ok(()) => {
                                    ops_count.fetch_add(1, Ordering::Relaxed);
                                    debug!("Successfully persisted job {}", job.id);
                                }
                                Err(e) => {
                                    error!("Failed to persist job {}: {:?}", job.id, e);
                                }
                            }
                        }
                    });
                })
            })
            // Record a successful job completion.
            .act_on::<MarkJobCompleted>(|agent, envelope| {
                let conn_opt = agent.model.redis_conn.clone();
                let msg = envelope.message().clone();
                let ops_count = agent.model.operations_count.clone();
                Box::pin(async move {
                    tokio::spawn(async move {
                        if let Some(mut conn) = conn_opt {
                            match mark_completed_impl(&mut conn, msg.id, msg.execution_time_ms).await {
                                Ok(()) => {
                                    ops_count.fetch_add(1, Ordering::Relaxed);
                                    debug!("Successfully marked job {} as completed", msg.id);
                                }
                                Err(e) => {
                                    error!("Failed to mark job {} as completed: {:?}", msg.id, e);
                                }
                            }
                        }
                    });
                })
            })
            // Record a job failure (job stays on the pending queue).
            .act_on::<MarkJobFailed>(|agent, envelope| {
                let conn_opt = agent.model.redis_conn.clone();
                let msg = envelope.message().clone();
                let ops_count = agent.model.operations_count.clone();
                Box::pin(async move {
                    tokio::spawn(async move {
                        if let Some(mut conn) = conn_opt {
                            match mark_failed_impl(&mut conn, msg.id, &msg.error, msg.attempt).await {
                                Ok(()) => {
                                    ops_count.fetch_add(1, Ordering::Relaxed);
                                    debug!("Successfully marked job {} as failed", msg.id);
                                }
                                Err(e) => {
                                    error!("Failed to mark job {} as failed: {:?}", msg.id, e);
                                }
                            }
                        }
                    });
                })
            })
            // Move an exhausted job to the dead-letter queue.
            .act_on::<MoveToDeadLetterQueue>(|agent, envelope| {
                let conn_opt = agent.model.redis_conn.clone();
                let msg = envelope.message().clone();
                let ops_count = agent.model.operations_count.clone();
                Box::pin(async move {
                    tokio::spawn(async move {
                        if let Some(mut conn) = conn_opt {
                            match move_to_dlq_impl(&mut conn, msg.id, &msg.job, &msg.error).await {
                                Ok(()) => {
                                    ops_count.fetch_add(1, Ordering::Relaxed);
                                    warn!("Moved job {} to DLQ: {}", msg.id, msg.error);
                                }
                                Err(e) => {
                                    error!("Failed to move job {} to DLQ: {:?}", msg.id, e);
                                }
                            }
                        }
                    });
                })
            });
        Ok(builder.start().await)
    }
}
/// Persist a queued job to Redis and push its id onto the pending queue.
///
/// The job is stored as a hash at `job:{id}` (JSON payload under the `data`
/// field) with a 7-day TTL. It must be a hash — not a plain string — because
/// `mark_completed_impl` / `mark_failed_impl` later issue HSET against the
/// same key; writing the key with SET (as this function previously did via
/// `set_ex`) makes every subsequent HSET fail with a Redis WRONGTYPE error.
///
/// # Errors
/// Returns a `RedisError` if the job cannot be serialized or any Redis
/// command fails.
async fn persist_job_impl(
    redis: &mut redis::aio::MultiplexedConnection,
    job: &QueuedJob,
) -> Result<(), redis::RedisError> {
    let key = format!("job:{}", job.id);
    let json = serde_json::to_string(job).map_err(|e| {
        redis::RedisError::from((
            redis::ErrorKind::TypeError,
            "serialization error",
            e.to_string(),
        ))
    })?;
    // Hash storage keeps the key type compatible with later status updates.
    let _: () = redis.hset(&key, "data", json).await?;
    // Preserve the original 7-day (604 800 s) retention on the hash key.
    let _: bool = redis.expire(&key, 604_800).await?;
    let _: usize = redis.lpush("queue:pending", job.id.to_string()).await?;
    Ok(())
}
/// Record a job as completed in Redis: update the job hash's `status` and
/// `execution_time_ms` fields, drop the id from the pending queue, and push
/// it onto the completed queue.
///
/// # Errors
/// Returns a `RedisError` if status serialization or any Redis command fails.
async fn mark_completed_impl(
    redis: &mut redis::aio::MultiplexedConnection,
    id: JobId,
    execution_time_ms: u64,
) -> Result<(), redis::RedisError> {
    let job_key = format!("job:{id}");
    let completed = JobStatus::Completed {
        completed_at: chrono::Utc::now(),
    };
    // Map a serde failure into a Redis-flavored error so the caller sees a
    // single error type.
    let encoded = match serde_json::to_string(&completed) {
        Ok(json) => json,
        Err(e) => {
            return Err(redis::RedisError::from((
                redis::ErrorKind::TypeError,
                "serialization error",
                e.to_string(),
            )))
        }
    };
    let _: () = redis.hset(&job_key, "status", encoded).await?;
    let _: () = redis.hset(&job_key, "execution_time_ms", execution_time_ms).await?;
    let _: usize = redis.lrem("queue:pending", 1, id.to_string()).await?;
    let _: usize = redis.lpush("queue:completed", id.to_string()).await?;
    Ok(())
}
/// Record a job attempt as failed: update the job hash's `status` and
/// `attempts` fields. The id is intentionally left on the pending queue
/// (the job may be retried).
///
/// # Errors
/// Returns a `RedisError` if status serialization or any Redis command fails.
async fn mark_failed_impl(
    redis: &mut redis::aio::MultiplexedConnection,
    id: JobId,
    error: &str,
    attempt: u32,
) -> Result<(), redis::RedisError> {
    let job_key = format!("job:{id}");
    let failed = JobStatus::Failed {
        failed_at: chrono::Utc::now(),
        attempts: attempt,
        error: error.to_string(),
    };
    // Fold the serde error into the Redis error type expected by callers.
    let encoded = match serde_json::to_string(&failed) {
        Ok(json) => json,
        Err(e) => {
            return Err(redis::RedisError::from((
                redis::ErrorKind::TypeError,
                "serialization error",
                e.to_string(),
            )))
        }
    };
    let _: () = redis.hset(&job_key, "status", encoded).await?;
    let _: () = redis.hset(&job_key, "attempts", attempt).await?;
    Ok(())
}
async fn move_to_dlq_impl(
redis: &mut redis::aio::MultiplexedConnection,
id: JobId,
job: &QueuedJob,
error: &str,
) -> Result<(), redis::RedisError> {
let dlq_key = format!("dlq:{id}");
let json = serde_json::to_string(job).map_err(|e| {
redis::RedisError::from((
redis::ErrorKind::TypeError,
"serialization error",
e.to_string(),
))
})?;
let _: () = redis.hset(&dlq_key, "job", json).await?;
let _: () = redis.hset(&dlq_key, "error", error).await?;
let _: () = redis.hset(&dlq_key, "moved_at", chrono::Utc::now().to_rfc3339()).await?;
let _: usize = redis.lpush("queue:dlq", id.to_string()).await?;
let _: usize = redis.lrem("queue:pending", 1, id.to_string()).await?;
Ok(())
}