use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::time::Duration;
use fred::clients::Pool;
use fred::interfaces::StreamsInterface;
use fred::types::streams::XReadValue;
use futures::Stream;
use futures::stream::unfold;
use ruststream::{BatchSubscriber, Subscriber};
use crate::convert::{HEADER_PREFIX, parts_from_fields};
use crate::deadletter::{
self, DELIVERY_COUNT_HEADER, IDLE_MS_HEADER, PoisonPolicy, REASON_MAX_DELIVERIES,
};
use crate::delay::{self, DelayConfig};
use crate::{error::RedisError, message::RedisMessage, stream::ReadMode};
type Entry = (String, HashMap<String, Vec<u8>>);
type RawStreams = Vec<(String, Vec<(String, Vec<(String, Vec<u8>)>)>)>;
const RECLAIM_START: &str = "0-0";
fn duration_to_millis(d: Duration) -> u64 {
u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
}
pub struct RedisSubscriber {
pool: Pool,
key: String,
group: String,
consumer: String,
count: u64,
block: Duration,
mode: ReadMode,
policy: PoisonPolicy,
delay: Option<DelayConfig>,
cursor: String,
buffer: VecDeque<Entry>,
}
impl Debug for RedisSubscriber {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisSubscriber")
.field("key", &self.key)
.field("group", &self.group)
.field("consumer", &self.consumer)
.field("mode", &self.mode)
.finish_non_exhaustive()
}
}
impl RedisSubscriber {
#[allow(
clippy::too_many_arguments,
reason = "internal constructor mirroring the descriptor"
)]
pub(crate) fn new(
pool: Pool,
key: String,
group: String,
consumer: String,
count: u64,
block: Duration,
mode: ReadMode,
policy: PoisonPolicy,
delay: Option<DelayConfig>,
) -> Self {
Self {
pool,
key,
group,
consumer,
count,
block,
mode,
policy,
delay,
cursor: RECLAIM_START.to_owned(),
buffer: VecDeque::new(),
}
}
fn message(&self, id: String, fields: HashMap<String, Vec<u8>>) -> RedisMessage {
let (payload, headers) = parts_from_fields(fields);
RedisMessage::new(
self.pool.clone(),
self.key.clone(),
self.group.clone(),
id,
payload,
headers,
self.policy.clone(),
self.delay.clone(),
)
}
async fn fetch(&mut self) -> Result<(), RedisError> {
if let Some(cfg) = &self.delay {
delay::sweep_due(&self.pool, cfg, &self.key).await?;
}
let entries = match self.mode.clone() {
ReadMode::Fresh => self.fetch_fresh().await?,
ReadMode::Reclaim { min_idle } => self.fetch_reclaim(min_idle).await?,
};
self.buffer.extend(entries);
Ok(())
}
async fn fetch_fresh(&self) -> Result<Vec<Entry>, RedisError> {
let resp: RawStreams = self
.pool
.xreadgroup(
self.group.as_str(),
self.consumer.as_str(),
Some(self.count),
Some(duration_to_millis(self.block)),
false,
self.key.as_str(),
">",
)
.await
.map_err(RedisError::stream)?;
let entries = resp
.into_iter()
.find(|(key, _)| key == &self.key)
.map(|(_, entries)| entries)
.unwrap_or_default();
Ok(entries
.into_iter()
.map(|(id, fields)| (id, fields.into_iter().collect()))
.collect())
}
async fn fetch_reclaim(&mut self, min_idle: Duration) -> Result<Vec<Entry>, RedisError> {
let (cursor, entries): (String, Vec<XReadValue<String, String, Vec<u8>>>) = self
.pool
.xautoclaim_values(
self.key.as_str(),
self.group.as_str(),
self.consumer.as_str(),
duration_to_millis(min_idle),
self.cursor.as_str(),
Some(self.count),
false,
)
.await
.map_err(RedisError::stream)?;
self.cursor = cursor;
if entries.is_empty() {
tokio::time::sleep(self.block).await;
return Ok(entries);
}
if !self.policy.is_active() {
return Ok(entries);
}
self.enrich_reclaimed(entries).await
}
async fn enrich_reclaimed(&self, entries: Vec<Entry>) -> Result<Vec<Entry>, RedisError> {
let meta = self.pending_meta().await?;
let mut out = Vec::with_capacity(entries.len());
for (id, mut fields) in entries {
let (idle, count) = meta.get(&id).copied().unwrap_or((0, 0));
if self.policy.is_poison(count) {
self.dead_letter_reclaimed(&id, &fields).await?;
continue;
}
insert_meta_header(&mut fields, DELIVERY_COUNT_HEADER, count);
insert_meta_header(&mut fields, IDLE_MS_HEADER, idle);
out.push((id, fields));
}
Ok(out)
}
async fn pending_meta(&self) -> Result<HashMap<String, (u64, u64)>, RedisError> {
let rows: Vec<(String, String, u64, u64)> = self
.pool
.xpending(
self.key.as_str(),
self.group.as_str(),
(0_u64, "-", "+", self.count, self.consumer.as_str()),
)
.await
.map_err(RedisError::stream)?;
Ok(rows
.into_iter()
.map(|(id, _consumer, idle, count)| (id, (idle, count)))
.collect())
}
async fn dead_letter_reclaimed(
&self,
id: &str,
fields: &HashMap<String, Vec<u8>>,
) -> Result<(), RedisError> {
let (payload, headers) = parts_from_fields(fields.clone());
deadletter::settle_poison_stream(
&self.pool,
&self.policy,
&payload,
&headers,
REASON_MAX_DELIVERIES,
)
.await
.map_err(RedisError::stream)?;
let _: i64 = self
.pool
.xack(self.key.as_str(), self.group.as_str(), id)
.await
.map_err(RedisError::stream)?;
Ok(())
}
}
fn insert_meta_header(fields: &mut HashMap<String, Vec<u8>>, name: &str, value: u64) {
fields.insert(
format!("{HEADER_PREFIX}{name}"),
value.to_string().into_bytes(),
);
}
impl Subscriber for RedisSubscriber {
type Message = RedisMessage;
type Error = RedisError;
fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
unfold(self, |s| async move {
loop {
if let Some((id, fields)) = s.buffer.pop_front() {
return Some((Ok(s.message(id, fields)), s));
}
if let Err(err) = s.fetch().await {
return Some((Err(err), s));
}
}
})
}
}
impl BatchSubscriber for RedisSubscriber {
type Batch = Vec<RedisMessage>;
fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
unfold(self, |s| async move {
loop {
if !s.buffer.is_empty() {
let entries = std::mem::take(&mut s.buffer);
let batch = entries
.into_iter()
.map(|(id, fields)| s.message(id, fields))
.collect::<Vec<_>>();
return Some((Ok(batch), s));
}
if let Err(err) = s.fetch().await {
return Some((Err(err), s));
}
}
})
}
}