use fred::clients::Pool;
use fred::interfaces::StreamsInterface;
use ruststream::Headers;
use crate::convert::fields_for_publish;
/// Name of the header that `with_reason` sets to record why a message was dead-lettered.
pub const DEAD_LETTER_REASON_HEADER: &str = "x-dead-letter-reason";
/// Header name for a message's delivery count.
// NOTE(review): not referenced in this part of the file; presumably filled in by the
// consumer from Redis pending-entry metadata — confirm against the call site.
pub const DELIVERY_COUNT_HEADER: &str = "redis-delivery-count";
/// Header name for a message's idle time in milliseconds.
// NOTE(review): not referenced in this part of the file; presumably filled in by the
// consumer from Redis pending-entry metadata — confirm against the call site.
pub const IDLE_MS_HEADER: &str = "redis-idle-ms";
/// Dead-letter reason value `"dropped"`.
// NOTE(review): only the unit tests use this here; the production caller that decides a
// message was dropped is outside this view.
pub(crate) const REASON_DROPPED: &str = "dropped";
/// Dead-letter reason value `"max-deliveries"`.
// NOTE(review): presumably passed when `PoisonPolicy::is_poison` trips — confirm against
// the caller, which is outside this view.
pub(crate) const REASON_MAX_DELIVERIES: &str = "max-deliveries";
/// Settings that govern how unprocessable ("poison") messages are handled.
///
/// The two settings are optional and independent of each other; the `Default`
/// value has neither configured.
#[derive(Debug, Clone, Default)]
pub(crate) struct PoisonPolicy {
    /// Key of the stream that dead-lettered messages are appended to, if any.
    pub(crate) dead_letter: Option<String>,
    /// Delivery count at which a message is classed as poison, if capped.
    pub(crate) max_deliveries: Option<u64>,
}

impl PoisonPolicy {
    /// Reports whether at least one of the two settings is configured.
    pub(crate) const fn is_active(&self) -> bool {
        // De Morgan form of "either is set": inactive only when both are absent.
        !(self.dead_letter.is_none() && self.max_deliveries.is_none())
    }

    /// Reports whether `count` deliveries has reached the configured cap.
    ///
    /// The cap is inclusive (`count == cap` is already poison). Without a cap
    /// this is always `false`.
    pub(crate) fn is_poison(&self, count: u64) -> bool {
        match self.max_deliveries {
            Some(cap) => cap <= count,
            None => false,
        }
    }

    /// Borrows the dead-letter stream key, or `None` when none is configured.
    pub(crate) fn dead_letter_key(&self) -> Option<&str> {
        self.dead_letter.as_deref()
    }
}
pub(crate) fn with_reason(headers: &Headers, reason: &'static str) -> Headers {
let mut tagged = headers.clone();
tagged.insert(DEAD_LETTER_REASON_HEADER, reason);
tagged
}
/// Appends a poison message to the policy's dead-letter stream, if one is configured.
///
/// The entry is built by `fields_for_publish` from `payload` and a copy of
/// `headers` tagged with `reason` (see [`with_reason`]). When the policy has no
/// dead-letter key, nothing is sent and the call succeeds immediately.
///
/// # Errors
///
/// Returns the `fred` error if the `XADD` call fails.
pub(crate) async fn settle_poison_stream(
    pool: &Pool,
    policy: &PoisonPolicy,
    payload: &[u8],
    headers: &Headers,
    reason: &'static str,
) -> Result<(), fred::error::Error> {
    // Guard clause: without a dead-letter target there is nothing to publish.
    let Some(dlq) = policy.dead_letter_key() else {
        return Ok(());
    };

    let fields = fields_for_publish(payload, &with_reason(headers, reason));
    // `false`: NOMKSTREAM is not requested; `None`: no trimming cap;
    // "*": let the server assign the entry ID. The returned ID is discarded.
    let _: String = pool.xadd(dlq, false, None::<()>, "*", fields).await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The delivery cap is inclusive: reaching it already counts as poison.
    #[test]
    fn is_poison_only_when_count_reaches_the_cap() {
        let capped = PoisonPolicy {
            max_deliveries: Some(3),
            ..PoisonPolicy::default()
        };

        assert!(capped.is_active());
        assert!(!capped.is_poison(2));
        assert!(capped.is_poison(3));
        assert!(capped.is_poison(4));
    }

    /// A default policy has no cap, so no delivery count is ever poison.
    #[test]
    fn no_cap_is_never_poison() {
        let unconfigured = PoisonPolicy::default();

        assert!(!unconfigured.is_active());
        assert!(!unconfigured.is_poison(u64::MAX));
    }

    /// Tagging adds the reason header and keeps the headers that were already present.
    #[test]
    fn with_reason_tags_without_dropping_originals() {
        let original = {
            let mut seed = Headers::new();
            seed.insert("content-type", "application/json");
            seed
        };

        let tagged = with_reason(&original, REASON_DROPPED);

        assert_eq!(tagged.content_type(), Some("application/json"));
        assert_eq!(tagged.get_str(DEAD_LETTER_REASON_HEADER), Some("dropped"));
    }
}