use std::fmt::{Debug, Formatter};
use std::time::Duration;
use bytes::Bytes;
use fred::clients::Pool;
use fred::interfaces::StreamsInterface;
use ruststream::runtime::RETRY_COUNT_HEADER;
use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
use crate::convert::fields_for_publish;
use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
use crate::delay::{self, DelayConfig};
pub const PARTITION_KEY_HEADER: &str = "redis-partition-key";
struct AckHandle {
pool: Pool,
key: String,
group: String,
id: String,
}
pub struct RedisMessage {
payload: Bytes,
headers: Headers,
ack: Option<AckHandle>,
policy: PoisonPolicy,
delay: Option<DelayConfig>,
}
impl Debug for RedisMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("RedisMessage");
s.field("payload_len", &self.payload.len());
if let Some(ack) = &self.ack {
s.field("key", &ack.key).field("id", &ack.id);
}
s.finish_non_exhaustive()
}
}
impl RedisMessage {
#[allow(
clippy::too_many_arguments,
reason = "internal constructor mirroring the descriptor"
)]
pub(crate) fn new(
pool: Pool,
key: String,
group: String,
id: String,
payload: Bytes,
headers: Headers,
policy: PoisonPolicy,
delay: Option<DelayConfig>,
) -> Self {
Self {
payload,
headers,
ack: Some(AckHandle {
pool,
key,
group,
id,
}),
policy,
delay,
}
}
#[must_use]
pub fn id(&self) -> Option<&str> {
self.ack.as_ref().map(|a| a.id.as_str())
}
}
impl Partitioned for RedisMessage {
fn partition_key(&self) -> Option<&[u8]> {
self.headers().get(PARTITION_KEY_HEADER)
}
}
impl IncomingMessage for RedisMessage {
fn payload(&self) -> &[u8] {
&self.payload
}
fn headers(&self) -> &Headers {
&self.headers
}
async fn ack(mut self) -> Result<(), AckError> {
let handle = self.ack.take().expect("RedisMessage settled twice");
xack(&handle).await
}
async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
let handle = self.ack.take().expect("RedisMessage settled twice");
if requeue {
if self.policy.is_active() {
let next = next_retry_count(&self.headers);
if self.policy.is_poison(next) {
deadletter::settle_poison_stream(
&handle.pool,
&self.policy,
&self.payload,
&self.headers,
REASON_MAX_DELIVERIES,
)
.await
.map_err(broker_err)?;
} else {
let mut headers = self.headers.clone();
headers.insert(RETRY_COUNT_HEADER, next.to_string());
republish(&handle, &self.payload, &headers).await?;
}
} else {
republish(&handle, &self.payload, &self.headers).await?;
}
} else if self.policy.is_active() {
deadletter::settle_poison_stream(
&handle.pool,
&self.policy,
&self.payload,
&self.headers,
REASON_DROPPED,
)
.await
.map_err(broker_err)?;
}
xack(&handle).await
}
fn supports_nack_after(&self) -> bool {
self.delay.is_some()
}
async fn nack_after(mut self, delay: Duration) -> Result<(), AckError> {
let handle = self.ack.take().expect("RedisMessage settled twice");
let Some(cfg) = self.delay.as_ref() else {
return Err(AckError::Unsupported);
};
delay::schedule(
&handle.pool,
cfg,
&handle.id,
&self.payload,
&self.headers,
delay,
)
.await?;
xack(&handle).await
}
}
fn next_retry_count(headers: &Headers) -> u64 {
headers
.get_str(RETRY_COUNT_HEADER)
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(0)
+ 1
}
fn broker_err(err: fred::error::Error) -> AckError {
AckError::Broker(Box::new(err))
}
async fn republish(handle: &AckHandle, payload: &[u8], headers: &Headers) -> Result<(), AckError> {
let fields = fields_for_publish(payload, headers);
let _: String = handle
.pool
.xadd(handle.key.as_str(), false, None::<()>, "*", fields)
.await
.map_err(broker_err)?;
Ok(())
}
async fn xack(handle: &AckHandle) -> Result<(), AckError> {
let _: i64 = handle
.pool
.xack(
handle.key.as_str(),
handle.group.as_str(),
handle.id.as_str(),
)
.await
.map_err(broker_err)?;
Ok(())
}