use crate::adapters::redis::RedisClient;
use crate::config::NotificationConfig;
use crate::domain::notification::{RealtimeNotification, UserEvent};
use redis::AsyncCommands;
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct NotificationRepository {
redis: Arc<RedisClient>,
channel_prefix: String,
push_queue_key: String,
global_channel_capacity: usize,
}
impl NotificationRepository {
#[must_use]
pub fn new(redis: Arc<RedisClient>, config: &NotificationConfig) -> Self {
Self {
redis,
channel_prefix: config.channel_prefix.clone(),
push_queue_key: config.push_queue_key.clone(),
global_channel_capacity: config.global_channel_capacity,
}
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn publish_realtime(&self, user_id: Uuid, event: UserEvent) -> anyhow::Result<()> {
let channel_name = format!("{}{user_id}", self.channel_prefix);
let payload = [event as u8];
let mut conn = self.redis.publisher();
conn.publish::<_, _, i64>(&channel_name, &payload).await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn subscribe_realtime(&self) -> anyhow::Result<broadcast::Receiver<RealtimeNotification>> {
let pattern = format!("{}*", self.channel_prefix);
let mut redis_rx = self.redis.subscribe(&pattern).await?;
let (tx, rx) = broadcast::channel(self.global_channel_capacity);
let prefix = self.channel_prefix.clone();
tokio::spawn(async move {
while let Ok(msg) = redis_rx.recv().await {
if let Some(user_id_str) = msg.channel.strip_prefix(&prefix)
&& let Ok(user_id) = Uuid::parse_str(user_id_str)
&& let Some(payload_byte) = msg.payload.first()
&& let Ok(event) = UserEvent::try_from(*payload_byte)
{
let _ = tx.send(RealtimeNotification { user_id, event });
}
}
});
Ok(rx)
}
#[allow(clippy::cast_precision_loss)]
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn push_job(&self, user_id: Uuid, delay_secs: u64) -> anyhow::Result<()> {
let run_at = time::OffsetDateTime::now_utc().unix_timestamp() + i64::try_from(delay_secs).unwrap_or(0);
let mut conn = self.redis.publisher();
let _: i64 = redis::cmd("ZADD")
.arg(&self.push_queue_key)
.arg("NX")
.arg(run_at as f64)
.arg(user_id.to_string())
.query_async(&mut conn)
.await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn cancel_job(&self, user_id: Uuid) -> anyhow::Result<()> {
let mut conn = self.redis.publisher();
let _: i64 = conn.zrem(&self.push_queue_key, user_id.to_string()).await?;
Ok(())
}
#[allow(clippy::cast_precision_loss)]
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn lease_due_jobs(&self, limit: isize, timeout_secs: u64) -> anyhow::Result<Vec<Uuid>> {
let now = time::OffsetDateTime::now_utc().unix_timestamp() as f64;
let lease_until = now + timeout_secs as f64;
let mut conn = self.redis.publisher();
let script = redis::Script::new(
r#"
local jobs = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
if #jobs > 0 then
for _, job in ipairs(jobs) do
redis.call('ZADD', KEYS[1], ARGV[3], job)
end
end
return jobs
"#,
);
let candidates: Vec<String> =
script.key(&self.push_queue_key).arg(now).arg(limit).arg(lease_until).invoke_async(&mut conn).await?;
let leased = candidates.into_iter().filter_map(|s| Uuid::parse_str(&s).ok()).collect();
Ok(leased)
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn delete_job(&self, user_id: Uuid) -> anyhow::Result<()> {
let mut conn = self.redis.publisher();
let _: i64 = conn.zrem(&self.push_queue_key, user_id.to_string()).await?;
Ok(())
}
}