use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::time::Duration;
use fred::clients::Pool;
use fred::interfaces::StreamsInterface;
use fred::types::streams::XReadValue;
use futures::Stream;
use futures::stream::unfold;
use ruststream::{BatchSubscriber, Subscriber};
use crate::{
convert::parts_from_fields, error::RedisError, message::RedisMessage, stream::ReadMode,
};
/// One stream entry: its ID paired with the entry's field map.
type Entry = (String, HashMap<String, Vec<u8>>);
/// Shape the `xreadgroup` reply is decoded into: a list of
/// `(stream key, entries)` pairs, each entry being `(id, [(field, value), ...])`.
type RawStreams = Vec<(String, Vec<(String, Vec<(String, Vec<u8>)>)>)>;
/// Initial value of the reclaim cursor handed to `xautoclaim_values`.
const RECLAIM_START: &str = "0-0";
/// Converts `d` to whole milliseconds, saturating at `u64::MAX`.
///
/// `Duration::as_millis` yields a `u128` and drops any sub-millisecond
/// remainder; values that do not fit in a `u64` are clamped rather than
/// wrapped.
fn duration_to_millis(d: Duration) -> u64 {
    match u64::try_from(d.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
/// Consumer-group reader over a single Redis stream key.
///
/// Entries are pulled in pages (via `xreadgroup` or `xautoclaim_values`,
/// depending on `mode`), parked in `buffer`, and handed out through the
/// `Subscriber` / `BatchSubscriber` implementations.
pub struct RedisSubscriber {
/// Connection pool used for stream commands; also cloned into every produced message.
pool: Pool,
/// Stream key that is read from.
key: String,
/// Consumer group name passed to the stream commands.
group: String,
/// Consumer name passed to the stream commands.
consumer: String,
/// Value passed as the count argument of each fetch.
count: u64,
/// Block timeout for `xreadgroup` (converted to milliseconds); also the
/// sleep applied after a reclaim call that returned no entries.
block: Duration,
/// Selects which fetch path is used (`Fresh` or `Reclaim`).
mode: ReadMode,
/// Start ID handed to `xautoclaim_values`; begins at `RECLAIM_START` and is
/// replaced by the cursor each call returns.
cursor: String,
/// Entries fetched but not yet yielded to the caller.
buffer: VecDeque<Entry>,
}
impl Debug for RedisSubscriber {
    /// Formats the identifying fields (`key`, `group`, `consumer`, `mode`) and
    /// marks the output as non-exhaustive; the remaining fields are omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            key,
            group,
            consumer,
            mode,
            ..
        } = self;

        let mut out = f.debug_struct("RedisSubscriber");
        out.field("key", key);
        out.field("group", group);
        out.field("consumer", consumer);
        out.field("mode", mode);
        out.finish_non_exhaustive()
    }
}
impl RedisSubscriber {
    /// Creates a subscriber for `key` acting as `consumer` within `group`.
    ///
    /// The reclaim cursor starts at [`RECLAIM_START`] and the entry buffer
    /// starts empty; every other field is stored as given.
    pub(crate) fn new(
        pool: Pool,
        key: String,
        group: String,
        consumer: String,
        count: u64,
        block: Duration,
        mode: ReadMode,
    ) -> Self {
        let cursor = String::from(RECLAIM_START);
        let buffer = VecDeque::new();
        Self {
            pool,
            key,
            group,
            consumer,
            count,
            block,
            mode,
            cursor,
            buffer,
        }
    }

    /// Wraps one raw entry into a [`RedisMessage`].
    ///
    /// The field map is split into payload and headers by `parts_from_fields`;
    /// the message also receives clones of the pool, stream key and group.
    fn message(&self, id: String, fields: HashMap<String, Vec<u8>>) -> RedisMessage {
        let (payload, headers) = parts_from_fields(fields);
        let pool = self.pool.clone();
        let key = self.key.clone();
        let group = self.group.clone();
        RedisMessage::new(pool, key, group, id, payload, headers)
    }

    /// Fetches one page of entries using the configured mode and appends it to
    /// the internal buffer.
    ///
    /// # Errors
    ///
    /// Propagates whatever the selected fetch path returns.
    async fn fetch(&mut self) -> Result<(), RedisError> {
        let mode = self.mode.clone();
        let fetched = match mode {
            ReadMode::Reclaim { min_idle } => self.fetch_reclaim(min_idle).await,
            ReadMode::Fresh => self.fetch_fresh().await,
        }?;
        self.buffer.extend(fetched);
        Ok(())
    }

    /// Reads up to `count` entries for this consumer via `xreadgroup` with the
    /// `>` ID, blocking for at most `block`.
    ///
    /// Returns the entries of the first reply stream whose key equals
    /// `self.key`, or an empty list when the reply contains no such stream.
    ///
    /// # Errors
    ///
    /// Client errors are mapped through `RedisError::stream`.
    async fn fetch_fresh(&self) -> Result<Vec<Entry>, RedisError> {
        // NOTE(review): a `block` shorter than one millisecond truncates to 0
        // here, and Redis documents `BLOCK 0` as "wait indefinitely" — confirm
        // that is the intended behaviour for such configurations.
        let block_ms = duration_to_millis(self.block);
        let streams: RawStreams = self
            .pool
            .xreadgroup(
                self.group.as_str(),
                self.consumer.as_str(),
                Some(self.count),
                Some(block_ms),
                false,
                self.key.as_str(),
                ">",
            )
            .await
            .map_err(RedisError::stream)?;

        let wanted = streams
            .into_iter()
            .find(|(stream_key, _)| *stream_key == self.key);
        let Some((_, records)) = wanted else {
            return Ok(Vec::new());
        };

        // Turn each entry's field list into a map.
        let mut entries = Vec::with_capacity(records.len());
        for (id, pairs) in records {
            let fields: HashMap<String, Vec<u8>> = pairs.into_iter().collect();
            entries.push((id, fields));
        }
        Ok(entries)
    }

    /// Claims up to `count` pending entries idle for at least `min_idle` via
    /// `xautoclaim_values`, starting from the stored cursor.
    ///
    /// The cursor returned by the call replaces the stored one. When the call
    /// yields no entries the task sleeps for `block` before returning, which
    /// paces the caller's retry loop.
    ///
    /// # Errors
    ///
    /// Client errors are mapped through `RedisError::stream`.
    async fn fetch_reclaim(&mut self, min_idle: Duration) -> Result<Vec<Entry>, RedisError> {
        let idle_ms = duration_to_millis(min_idle);
        let claimed: (String, Vec<XReadValue<String, String, Vec<u8>>>) = self
            .pool
            .xautoclaim_values(
                self.key.as_str(),
                self.group.as_str(),
                self.consumer.as_str(),
                idle_ms,
                self.cursor.as_str(),
                Some(self.count),
                false,
            )
            .await
            .map_err(RedisError::stream)?;

        let (next_cursor, entries) = claimed;
        self.cursor = next_cursor;

        // NOTE(review): the sleep also applies when the returned cursor has not
        // wrapped back to the start — confirm that pacing is wanted mid-scan.
        if entries.is_empty() {
            tokio::time::sleep(self.block).await;
        }
        Ok(entries)
    }
}
impl Subscriber for RedisSubscriber {
type Message = RedisMessage;
type Error = RedisError;
fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
unfold(self, |s| async move {
loop {
if let Some((id, fields)) = s.buffer.pop_front() {
return Some((Ok(s.message(id, fields)), s));
}
if let Err(err) = s.fetch().await {
return Some((Err(err), s));
}
}
})
}
}
impl BatchSubscriber for RedisSubscriber {
    type Batch = Vec<RedisMessage>;

    /// Yields messages in batches.
    ///
    /// Each item is everything currently buffered, converted to messages; the
    /// buffer is refilled by fetching until it is non-empty. A failed fetch is
    /// yielded as an `Err` item and the next poll attempts a new fetch.
    fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
        unfold(self, |subscriber| async move {
            // Refill until at least one entry is available.
            while subscriber.buffer.is_empty() {
                if let Err(err) = subscriber.fetch().await {
                    return Some((Err(err), subscriber));
                }
            }

            // Take the whole buffer, leaving an empty one behind.
            let drained = std::mem::take(&mut subscriber.buffer);
            let mut batch = Vec::with_capacity(drained.len());
            for (id, fields) in drained {
                batch.push(subscriber.message(id, fields));
            }
            Some((Ok(batch), subscriber))
        })
    }
}