use crate::job::now_ms;
use crate::redis::keys::events_key;
use fred::clients::Client;
use fred::interfaces::ClientLike;
use fred::types::{ClusterHash, CustomCommand, Value};
use std::sync::Arc;
#[derive(Clone)]
pub(crate) struct EventsWriter {
inner: Inner,
}
#[derive(Clone)]
enum Inner {
Disabled,
Enabled {
client: Client,
stream_key: Arc<str>,
max_stream_len: u64,
},
}
impl EventsWriter {
pub(crate) fn new(client: Client, queue_name: &str, max_stream_len: u64) -> Self {
Self {
inner: Inner::Enabled {
client,
stream_key: Arc::from(events_key(queue_name)),
max_stream_len,
},
}
}
pub(crate) fn disabled() -> Self {
Self {
inner: Inner::Disabled,
}
}
pub(crate) fn is_enabled(&self) -> bool {
matches!(self.inner, Inner::Enabled { .. })
}
pub(crate) async fn emit_waiting(&self, id: &str, name: &str) {
self.xadd("waiting", id, name, &[]).await;
}
pub(crate) async fn emit_active(&self, id: &str, name: &str, attempt: u32) {
let attempt_s = attempt.to_string();
self.xadd("active", id, name, &[("attempt", &attempt_s)])
.await;
}
pub(crate) async fn emit_completed(
&self,
id: &str,
name: &str,
attempt: u32,
duration_us: u64,
) {
let attempt_s = attempt.to_string();
let duration_s = duration_us.to_string();
self.xadd(
"completed",
id,
name,
&[("attempt", &attempt_s), ("duration_us", &duration_s)],
)
.await;
}
pub(crate) async fn emit_failed(
&self,
id: &str,
name: &str,
attempt: u32,
reason: &str,
duration_us: Option<u64>,
) {
let attempt_s = attempt.to_string();
match duration_us {
Some(us) => {
let duration_s = us.to_string();
self.xadd(
"failed",
id,
name,
&[
("attempt", &attempt_s),
("reason", reason),
("duration_us", &duration_s),
],
)
.await;
}
None => {
self.xadd(
"failed",
id,
name,
&[("attempt", &attempt_s), ("reason", reason)],
)
.await;
}
}
}
pub(crate) async fn emit_retry_scheduled(
&self,
id: &str,
name: &str,
attempt: u32,
backoff_ms: u64,
) {
let attempt_s = attempt.to_string();
let backoff_s = backoff_ms.to_string();
self.xadd(
"retry-scheduled",
id,
name,
&[("attempt", &attempt_s), ("backoff_ms", &backoff_s)],
)
.await;
}
#[allow(dead_code)]
pub(crate) async fn emit_delayed(&self, id: &str, name: &str, delay_ms: u64) {
let delay_s = delay_ms.to_string();
self.xadd("delayed", id, name, &[("delay_ms", &delay_s)])
.await;
}
pub(crate) async fn emit_dlq(&self, id: &str, name: &str, reason: &str, attempt: u32) {
let attempt_s = attempt.to_string();
self.xadd(
"dlq",
id,
name,
&[("attempt", &attempt_s), ("reason", reason)],
)
.await;
}
pub(crate) async fn emit_drained(&self) {
self.xadd("drained", "", "", &[]).await;
}
async fn xadd(&self, event_name: &str, id: &str, name: &str, extra: &[(&str, &str)]) {
let (client, stream_key, max_stream_len) = match &self.inner {
Inner::Disabled => return,
Inner::Enabled {
client,
stream_key,
max_stream_len,
} => (client, stream_key, *max_stream_len),
};
let now = now_ms();
let mut args: Vec<Value> = Vec::with_capacity(8 + 6 + 2 + extra.len() * 2);
args.push(Value::from(stream_key.as_ref()));
args.push(Value::from("MAXLEN"));
args.push(Value::from("~"));
args.push(Value::from(max_stream_len as i64));
args.push(Value::from("*"));
args.push(Value::from("e"));
args.push(Value::from(event_name));
args.push(Value::from("id"));
args.push(Value::from(id));
args.push(Value::from("ts"));
args.push(Value::from(now.to_string()));
if !name.is_empty() {
args.push(Value::from("n"));
args.push(Value::from(name));
}
for (k, v) in extra {
args.push(Value::from(*k));
args.push(Value::from(*v));
}
let cmd = CustomCommand::new_static("XADD", ClusterHash::FirstKey, false);
let res: std::result::Result<Value, _> = client.custom(cmd, args).await;
if let Err(e) = res {
tracing::warn!(
event = event_name,
stream_key = %stream_key,
error = %e,
"events: XADD failed; event dropped"
);
}
}
}