use crate::adapters::redis::RedisClient;
use crate::config::NotificationConfig;
use crate::domain::notification::{RealtimeNotification, UserEvent};
use redis::AsyncCommands;
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;
/// Redis-backed repository for realtime device notifications and the
/// delayed push-job queue.
///
/// Realtime events are published on per-device pub/sub channels whose names
/// are `channel_prefix` followed by the device id; push jobs are stored as
/// members of the sorted set at `push_queue_key`.
#[derive(Debug, Clone)]
pub struct NotificationRepository {
/// Shared Redis client; provides the connection used for commands
/// (`publisher()`) and pattern subscriptions (`subscribe()`).
redis: Arc<RedisClient>,
/// Prefix prepended to a device id to form its pub/sub channel name.
channel_prefix: String,
/// Key of the Redis sorted set that holds scheduled push jobs.
push_queue_key: String,
/// Capacity of the broadcast channel created by `subscribe_realtime`.
global_channel_capacity: usize,
}
impl NotificationRepository {
/// Builds a repository on top of `redis`, copying the channel prefix, the
/// push-queue key and the broadcast capacity out of `config`.
#[must_use]
pub fn new(redis: Arc<RedisClient>, config: &NotificationConfig) -> Self {
    let channel_prefix = config.channel_prefix.clone();
    let push_queue_key = config.push_queue_key.clone();
    let global_channel_capacity = config.global_channel_capacity;

    Self {
        redis,
        channel_prefix,
        push_queue_key,
        global_channel_capacity,
    }
}
/// Publishes `event` on the realtime channel of every device in `device_ids`.
///
/// Each channel name is `channel_prefix` followed by the device id, and the
/// payload is the single byte produced by `event as u8`. One `PUBLISH` per
/// device is queued on a single pipeline, which is then executed once. An
/// empty `device_ids` slice returns `Ok(())` without touching Redis.
///
/// # Errors
///
/// Returns an error if executing the pipeline fails.
#[tracing::instrument(level = "debug", skip(self, device_ids), err)]
pub async fn publish_realtime(&self, device_ids: &[Uuid], event: UserEvent) -> anyhow::Result<()> {
    if device_ids.is_empty() {
        return Ok(());
    }

    let event_byte = [event as u8];
    let prefix = &self.channel_prefix;
    let mut pipeline = redis::pipe();
    device_ids
        .iter()
        .map(|device_id| format!("{prefix}{device_id}"))
        .for_each(|channel| {
            pipeline.publish(&channel, &event_byte);
        });

    let mut connection = self.redis.publisher();
    let _: () = pipeline.query_async(&mut connection).await?;
    Ok(())
}
/// Subscribes to every realtime channel under `channel_prefix` and returns a
/// broadcast receiver of the decoded notifications.
///
/// A background task forwards messages from the Redis pattern subscription
/// (`<channel_prefix>*`) into a broadcast channel of capacity
/// `global_channel_capacity`. A message is forwarded only when the channel
/// suffix parses as a UUID and the first payload byte converts into a
/// [`UserEvent`]; anything else is skipped.
///
/// The task ends when the Redis receiver returns an error, or when a
/// notification can no longer be delivered because every broadcast receiver
/// has been dropped.
///
/// # Errors
///
/// Returns an error if `global_channel_capacity` is zero or if the Redis
/// subscription cannot be established.
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn subscribe_realtime(&self) -> anyhow::Result<broadcast::Receiver<RealtimeNotification>> {
    // `broadcast::channel` panics on a zero capacity; report the
    // misconfiguration as an error before opening a Redis subscription.
    anyhow::ensure!(
        self.global_channel_capacity > 0,
        "global_channel_capacity must be greater than zero"
    );

    let pattern = format!("{}*", self.channel_prefix);
    let mut redis_rx = self.redis.subscribe(&pattern).await?;
    let (tx, rx) = broadcast::channel(self.global_channel_capacity);
    let prefix = self.channel_prefix.clone();
    tokio::spawn(async move {
        // NOTE(review): any `Err` from `recv()` ends the loop; if the Redis
        // receiver can report a recoverable condition (e.g. lag), confirm
        // that stopping is the intended reaction.
        while let Ok(msg) = redis_rx.recv().await {
            if let Some(device_id_str) = msg.channel.strip_prefix(&prefix)
                && let Ok(device_id) = Uuid::parse_str(device_id_str)
                && let Some(payload_byte) = msg.payload.first()
                && let Ok(event) = UserEvent::try_from(*payload_byte)
            {
                // `send` fails only when no receiver is left. This task owns
                // the sole `Sender`, so no new receiver can ever appear:
                // stop instead of holding the subscription open forever.
                if tx.send(RealtimeNotification { device_id, event }).is_err() {
                    break;
                }
            }
        }
    });
    Ok(rx)
}
/// Schedules a push job for every device in `device_ids`, due `delay_secs`
/// seconds from now.
///
/// Each device id is added to the sorted set at `push_queue_key` with the
/// due time (Unix seconds) as its score. `ZADD` is issued with `NX`, so a
/// device that is already queued keeps its existing score. An empty
/// `device_ids` slice returns `Ok(())` without touching Redis.
///
/// # Errors
///
/// Returns an error if executing the pipeline fails.
// The due time is sent as a floating-point sorted-set score; rounding of very
// large second counts is acceptable for a schedule time.
#[allow(clippy::cast_precision_loss)]
#[tracing::instrument(level = "debug", skip(self, device_ids), err)]
pub async fn push_jobs(&self, device_ids: &[Uuid], delay_secs: u64) -> anyhow::Result<()> {
    if device_ids.is_empty() {
        return Ok(());
    }

    // A delay that does not fit in `i64` must mean "as late as possible",
    // not "no delay"; saturate both the conversion and the addition so an
    // oversized value can neither run immediately nor overflow.
    let delay = i64::try_from(delay_secs).unwrap_or(i64::MAX);
    let run_at = time::OffsetDateTime::now_utc().unix_timestamp().saturating_add(delay);
    let score = run_at as f64;

    let mut pipe = redis::pipe();
    for device_id in device_ids {
        pipe.cmd("ZADD").arg(&self.push_queue_key).arg("NX").arg(score).arg(device_id.to_string());
    }
    let mut conn = self.redis.publisher();
    let _: () = pipe.query_async(&mut conn).await?;
    Ok(())
}
/// Cancels the pending push job of `device_id` by removing its member from
/// the sorted set at `push_queue_key`.
///
/// The integer reply of `ZREM` is read and discarded, so the call succeeds
/// whether or not a job was queued.
///
/// # Errors
///
/// Returns an error if the `ZREM` command fails.
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn cancel_job(&self, device_id: Uuid) -> anyhow::Result<()> {
    let mut connection = self.redis.publisher();
    let member = device_id.to_string();
    let _removed: i64 = connection.zrem(&self.push_queue_key, member).await?;
    Ok(())
}
#[allow(clippy::cast_precision_loss)]
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn lease_due_jobs(&self, limit: isize, timeout_secs: u64) -> anyhow::Result<Vec<Uuid>> {
let now = time::OffsetDateTime::now_utc().unix_timestamp() as f64;
let lease_until = now + timeout_secs as f64;
let mut conn = self.redis.publisher();
let script = redis::Script::new(
r#"
local jobs = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
if #jobs > 0 then
for _, job in ipairs(jobs) do
redis.call('ZADD', KEYS[1], ARGV[3], job)
end
end
return jobs
"#,
);
let candidates: Vec<String> =
script.key(&self.push_queue_key).arg(now).arg(limit).arg(lease_until).invoke_async(&mut conn).await?;
let leased = candidates.into_iter().filter_map(|s| Uuid::parse_str(&s).ok()).collect();
Ok(leased)
}
/// Deletes the push job of `device_id` by removing its member from the
/// sorted set at `push_queue_key`.
///
/// The integer reply of `ZREM` is read and discarded, so the call succeeds
/// whether or not the job was still present.
///
/// # Errors
///
/// Returns an error if the `ZREM` command fails.
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn delete_job(&self, device_id: Uuid) -> anyhow::Result<()> {
    let mut connection = self.redis.publisher();
    let member = device_id.to_string();
    let _removed: i64 = connection.zrem(&self.push_queue_key, member).await?;
    Ok(())
}
}