use std::fmt::{Debug, Formatter};
use bytes::Bytes;
use fred::clients::Pool;
use fred::interfaces::StreamsInterface;
use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
use crate::convert::fields_for_publish;
/// Name of the message header that [`RedisMessage`] reads to obtain its
/// partition key (see its `Partitioned` implementation).
pub const PARTITION_KEY_HEADER: &str = "redis-partition-key";
/// Everything required to settle one delivered entry: the client pool plus
/// the key / group / id triple that is passed to `XACK`.
struct AckHandle {
/// Client pool used to issue the `XACK` (and, on requeue, `XADD`) command.
pool: Pool,
/// Key passed as the first `XACK` argument; also the target of a requeue `XADD`.
key: String,
/// Group passed as the second `XACK` argument.
group: String,
/// Entry id passed as the third `XACK` argument.
id: String,
}
/// A received message: its payload bytes, its headers, and the handle used
/// to acknowledge it.
pub struct RedisMessage {
/// Raw message body, exposed through `IncomingMessage::payload`.
payload: Bytes,
/// Message headers, exposed through `IncomingMessage::headers`.
headers: Headers,
/// Settlement handle; populated by `new` and taken out by `ack` / `nack`.
ack: Option<AckHandle>,
}
impl Debug for RedisMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("RedisMessage");
s.field("payload_len", &self.payload.len());
if let Some(ack) = &self.ack {
s.field("key", &ack.key).field("id", &ack.id);
}
s.finish_non_exhaustive()
}
}
impl RedisMessage {
    /// Builds a message together with the handle needed to settle it later.
    ///
    /// `pool`, `key`, `group` and `id` are stored in the ack handle;
    /// `payload` and `headers` become the message content.
    pub(crate) fn new(
        pool: Pool,
        key: String,
        group: String,
        id: String,
        payload: Bytes,
        headers: Headers,
    ) -> Self {
        let handle = AckHandle {
            pool,
            key,
            group,
            id,
        };
        Self {
            payload,
            headers,
            ack: Some(handle),
        }
    }

    /// Returns the entry id held by the ack handle, or `None` when the
    /// handle is no longer present.
    #[must_use]
    pub fn id(&self) -> Option<&str> {
        let handle = self.ack.as_ref()?;
        Some(handle.id.as_str())
    }
}
impl Partitioned for RedisMessage {
    /// Looks up the [`PARTITION_KEY_HEADER`] entry in the message headers and
    /// returns its value, if any.
    fn partition_key(&self) -> Option<&[u8]> {
        let headers = &self.headers;
        headers.get(PARTITION_KEY_HEADER)
    }
}
impl IncomingMessage for RedisMessage {
    /// Borrows the raw payload bytes.
    fn payload(&self) -> &[u8] {
        &self.payload
    }

    /// Borrows the message headers.
    fn headers(&self) -> &Headers {
        &self.headers
    }

    /// Acknowledges the entry by issuing `XACK` with the stored key, group
    /// and id.
    ///
    /// # Errors
    ///
    /// Returns [`AckError::Broker`] wrapping the client error if the command
    /// fails.
    ///
    /// # Panics
    ///
    /// Panics if the ack handle has already been taken.
    async fn ack(mut self) -> Result<(), AckError> {
        let settle = self.ack.take().expect("RedisMessage settled twice");
        xack(&settle).await
    }

    /// Negatively acknowledges the entry.
    ///
    /// With `requeue == true`, the payload and headers are first re-published
    /// to the same key with id `"*"` via `XADD`; only if that succeeds is the
    /// original entry acknowledged. With `requeue == false`, the entry is
    /// simply acknowledged.
    ///
    /// # Errors
    ///
    /// Returns [`AckError::Broker`] wrapping the client error if either the
    /// `XADD` or the `XACK` fails.
    ///
    /// # Panics
    ///
    /// Panics if the ack handle has already been taken.
    async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
        let settle = self.ack.take().expect("RedisMessage settled twice");
        if !requeue {
            return xack(&settle).await;
        }

        // Re-publish before acknowledging, so a failed XADD leaves the
        // original entry unacknowledged.
        let fields = fields_for_publish(&self.payload, &self.headers);
        let republished: Result<String, _> = settle
            .pool
            .xadd(settle.key.as_str(), false, None::<()>, "*", fields)
            .await;
        republished.map_err(|err| AckError::Broker(Box::new(err)))?;

        xack(&settle).await
    }
}
async fn xack(handle: &AckHandle) -> Result<(), AckError> {
let _: i64 = handle
.pool
.xack(
handle.key.as_str(),
handle.group.as_str(),
handle.id.as_str(),
)
.await
.map_err(|err| AckError::Broker(Box::new(err)))?;
Ok(())
}