use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use fred::clients::Pool;
use fred::error::ErrorKind;
use fred::interfaces::{KeysInterface, ListInterface};
use fred::types::lists::LMoveDirection;
use futures::Stream;
use futures::stream::unfold;
use ruststream::codec::Codec;
use ruststream::runtime::RETRY_COUNT_HEADER;
use ruststream::{AckError, Headers, IncomingMessage, Partitioned, SubscriptionSource};
use crate::deadletter::{self, PoisonPolicy, REASON_DROPPED, REASON_MAX_DELIVERIES};
use crate::envelope::{SharedEnvelope, frame, unframe};
use crate::recovery::{self, RecoveryConfig};
use crate::{RedisBroker, error::RedisError, message::PARTITION_KEY_HEADER};
const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
const PROCESSING_SUFFIX: &str = ".processing";
fn block_secs(block: Duration) -> f64 {
block.as_secs_f64()
}
fn empty_on_timeout<T>(
result: Result<Option<T>, fred::error::Error>,
) -> Result<Option<T>, RedisError> {
match result {
Ok(value) => Ok(value),
Err(err) if matches!(err.kind(), ErrorKind::Timeout) => Ok(None),
Err(err) => Err(RedisError::stream(err)),
}
}
#[derive(Clone)]
#[must_use]
pub struct RedisList {
key: String,
reliable: bool,
processing: Option<String>,
block: Option<Duration>,
codec: Option<SharedEnvelope>,
dead_letter: Option<String>,
max_deliveries: Option<u64>,
min_idle: Option<Duration>,
recovery_zset: Option<String>,
recovery_ttl: Option<Duration>,
}
impl Debug for RedisList {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisList")
.field("key", &self.key)
.field("reliable", &self.reliable)
.field("processing", &self.processing)
.field("codec", &self.codec.is_some())
.field("dead_letter", &self.dead_letter)
.field("max_deliveries", &self.max_deliveries)
.field("recovery_zset", &self.recovery_zset)
.field("recovery_ttl", &self.recovery_ttl)
.finish_non_exhaustive()
}
}
impl RedisList {
pub fn new(key: impl Into<String>) -> Self {
Self {
key: key.into(),
reliable: false,
processing: None,
block: None,
codec: None,
dead_letter: None,
max_deliveries: None,
min_idle: None,
recovery_zset: None,
recovery_ttl: None,
}
}
pub const fn reliable(mut self) -> Self {
self.reliable = true;
self
}
pub fn processing(mut self, key: impl Into<String>) -> Self {
self.processing = Some(key.into());
self
}
pub const fn block(mut self, block: Duration) -> Self {
self.block = Some(block);
self
}
pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
self.codec = Some(Arc::new(codec));
self
}
pub fn dead_letter(mut self, key: impl Into<String>) -> Self {
self.dead_letter = Some(key.into());
self
}
pub const fn max_deliveries(mut self, max: u64) -> Self {
self.max_deliveries = Some(max);
self
}
pub const fn min_idle(mut self, min_idle: Duration) -> Self {
self.min_idle = Some(min_idle);
self
}
pub fn recovery_zset(mut self, key: impl Into<String>) -> Self {
self.recovery_zset = Some(key.into());
self.reliable = true;
self
}
pub const fn recovery_ttl(mut self, ttl: Duration) -> Self {
self.recovery_ttl = Some(ttl);
self
}
#[must_use]
pub fn key(&self) -> &str {
&self.key
}
pub(crate) const fn is_reliable(&self) -> bool {
self.reliable
}
pub(crate) fn processing_or_default(&self) -> String {
self.processing
.clone()
.unwrap_or_else(|| format!("{}{PROCESSING_SUFFIX}", self.key))
}
pub(crate) fn block_or_default(&self) -> Duration {
self.block.unwrap_or(DEFAULT_BLOCK)
}
pub(crate) fn codec_handle(&self) -> Option<SharedEnvelope> {
self.codec.clone()
}
pub(crate) fn poison_policy(&self) -> PoisonPolicy {
PoisonPolicy {
dead_letter: self.dead_letter.clone(),
max_deliveries: self.max_deliveries,
}
}
pub(crate) fn recovery_config(&self) -> Result<Option<RecoveryConfig>, RedisError> {
let Some(zset_key) = self.recovery_zset.clone() else {
return Ok(None);
};
let min_idle = self.min_idle.ok_or_else(|| {
RedisError::InvalidOptions(format!(
"reliable list recovery on `{}` needs a min_idle: call .min_idle(duration) \
alongside .recovery_zset(key)",
self.key
))
})?;
Ok(Some(RecoveryConfig {
zset_key,
min_idle,
ttl: self.recovery_ttl,
}))
}
}
impl SubscriptionSource<RedisBroker> for RedisList {
type Subscriber = RedisListSubscriber;
fn name(&self) -> &str {
self.key()
}
async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
broker.subscribe_list(self).await
}
}
pub struct RedisListSubscriber {
pool: Pool,
key: String,
reliable: bool,
processing: String,
block: Duration,
codec: Option<SharedEnvelope>,
policy: PoisonPolicy,
recovery: Option<RecoveryConfig>,
}
impl Debug for RedisListSubscriber {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisListSubscriber")
.field("key", &self.key)
.field("reliable", &self.reliable)
.field("poison", &self.policy.is_active())
.field("recovery", &self.recovery.is_some())
.finish_non_exhaustive()
}
}
impl RedisListSubscriber {
#[allow(
clippy::too_many_arguments,
reason = "internal constructor mirroring the descriptor"
)]
pub(crate) fn new(
pool: Pool,
key: String,
reliable: bool,
processing: String,
block: Duration,
codec: Option<SharedEnvelope>,
policy: PoisonPolicy,
recovery: Option<RecoveryConfig>,
) -> Self {
Self {
pool,
key,
reliable,
processing,
block,
codec,
policy,
recovery,
}
}
fn simple_message(&self, raw: &[u8]) -> RedisListMessage {
let (payload, headers) = unframe(self.codec.as_ref(), raw);
RedisListMessage {
payload,
headers,
ack: None,
}
}
fn reliable_message(&self, raw: Vec<u8>, recovery: Option<RecoveryHandle>) -> RedisListMessage {
let (payload, headers) = unframe(self.codec.as_ref(), &raw);
RedisListMessage {
payload,
headers,
ack: Some(ListAck {
pool: self.pool.clone(),
main_key: self.key.clone(),
processing_key: self.processing.clone(),
value: raw,
codec: self.codec.clone(),
policy: self.policy.clone(),
recovery,
}),
}
}
async fn next_entry(&self) -> Result<Option<RedisListMessage>, RedisError> {
let secs = block_secs(self.block);
if self.reliable {
if let Some(cfg) = &self.recovery {
recovery::sweep_orphans(&self.pool, cfg, &self.key, &self.processing).await?;
}
let value: Option<Vec<u8>> = empty_on_timeout(
self.pool
.blmove(
self.key.as_str(),
self.processing.as_str(),
LMoveDirection::Right,
LMoveDirection::Left,
secs,
)
.await,
)?;
let Some(value) = value else {
return Ok(None);
};
let handle = match &self.recovery {
Some(cfg) => {
let member = recovery::record_claim(&self.pool, cfg, &value).await?;
Some(RecoveryHandle {
zset_key: cfg.zset_key.clone(),
member,
})
}
None => None,
};
Ok(Some(self.reliable_message(value, handle)))
} else {
let popped: Option<(String, Vec<u8>)> =
empty_on_timeout(self.pool.brpop(self.key.as_str(), secs).await)?;
Ok(popped.map(|(_, v)| self.simple_message(&v)))
}
}
}
impl ruststream::Subscriber for RedisListSubscriber {
type Message = RedisListMessage;
type Error = RedisError;
fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
unfold(&*self, |s| async move {
loop {
match s.next_entry().await {
Ok(Some(msg)) => return Some((Ok(msg), s)),
Ok(None) => {}
Err(err) => return Some((Err(err), s)),
}
}
})
}
}
struct ListAck {
pool: Pool,
main_key: String,
processing_key: String,
value: Vec<u8>,
codec: Option<SharedEnvelope>,
policy: PoisonPolicy,
recovery: Option<RecoveryHandle>,
}
struct RecoveryHandle {
zset_key: String,
member: Vec<u8>,
}
pub struct RedisListMessage {
payload: Bytes,
headers: Headers,
ack: Option<ListAck>,
}
impl Debug for RedisListMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisListMessage")
.field("payload_len", &self.payload.len())
.field("reliable", &self.ack.is_some())
.finish_non_exhaustive()
}
}
impl IncomingMessage for RedisListMessage {
fn payload(&self) -> &[u8] {
&self.payload
}
fn headers(&self) -> &Headers {
&self.headers
}
async fn ack(self) -> Result<(), AckError> {
let Some(handle) = self.ack else {
return Err(AckError::Unsupported);
};
settle(&handle).await
}
async fn nack(self, requeue: bool) -> Result<(), AckError> {
let Some(handle) = self.ack else {
return Err(AckError::Unsupported);
};
if requeue {
if handle.policy.is_active() {
let next = next_retry_count(&self.headers);
if handle.policy.is_poison(next) {
list_dead_letter(&handle, &self.payload, &self.headers, REASON_MAX_DELIVERIES)
.await?;
} else {
let mut headers = self.headers.clone();
headers.insert(RETRY_COUNT_HEADER, next.to_string());
let body = frame(handle.codec.as_ref(), &self.payload, &headers);
lpush(&handle.pool, handle.main_key.as_str(), body).await?;
}
} else {
lpush(&handle.pool, handle.main_key.as_str(), handle.value.clone()).await?;
}
} else if handle.policy.is_active() {
list_dead_letter(&handle, &self.payload, &self.headers, REASON_DROPPED).await?;
}
settle(&handle).await
}
}
fn ack_broker(err: fred::error::Error) -> AckError {
AckError::Broker(Box::new(err))
}
fn next_retry_count(headers: &Headers) -> u64 {
headers
.get_str(RETRY_COUNT_HEADER)
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(0)
+ 1
}
async fn lpush(pool: &Pool, key: &str, body: Vec<u8>) -> Result<(), AckError> {
let _: i64 = pool.lpush(key, body).await.map_err(ack_broker)?;
Ok(())
}
async fn list_dead_letter(
handle: &ListAck,
payload: &[u8],
headers: &Headers,
reason: &'static str,
) -> Result<(), AckError> {
if let Some(dlq) = handle.policy.dead_letter_key() {
let body = frame(
handle.codec.as_ref(),
payload,
&deadletter::with_reason(headers, reason),
);
lpush(&handle.pool, dlq, body).await?;
}
Ok(())
}
async fn settle(handle: &ListAck) -> Result<(), AckError> {
let _: i64 = handle
.pool
.lrem(handle.processing_key.as_str(), 1, handle.value.clone())
.await
.map_err(ack_broker)?;
if let Some(rec) = &handle.recovery {
recovery::forget(&handle.pool, &rec.zset_key, &rec.member).await?;
}
Ok(())
}
impl Partitioned for RedisListMessage {
fn partition_key(&self) -> Option<&[u8]> {
self.headers().get(PARTITION_KEY_HEADER)
}
}
#[derive(Clone)]
pub struct RedisListPublisher {
pool: Arc<tokio::sync::OnceCell<Pool>>,
codec: Option<SharedEnvelope>,
ttl: Option<Duration>,
}
impl Debug for RedisListPublisher {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisListPublisher")
.field("codec", &self.codec.is_some())
.field("ttl", &self.ttl)
.finish_non_exhaustive()
}
}
impl RedisListPublisher {
pub(crate) fn new(pool: Arc<tokio::sync::OnceCell<Pool>>) -> Self {
Self {
pool,
codec: None,
ttl: None,
}
}
#[must_use]
pub fn codec(mut self, codec: impl Codec + 'static) -> Self {
self.codec = Some(Arc::new(codec));
self
}
#[must_use]
pub const fn ttl(mut self, ttl: Duration) -> Self {
self.ttl = Some(ttl);
self
}
}
fn ttl_millis(ttl: Duration) -> i64 {
i64::try_from(ttl.as_millis()).unwrap_or(i64::MAX).max(1)
}
impl ruststream::Publisher for RedisListPublisher {
type Error = RedisError;
async fn publish(&self, msg: ruststream::OutgoingMessage<'_>) -> Result<(), Self::Error> {
let pool = self.pool.get().cloned().ok_or(RedisError::NotConnected)?;
let body = frame(self.codec.as_ref(), msg.payload(), msg.headers());
let Some(ttl) = self.ttl else {
let _: i64 = pool
.lpush(msg.name(), body)
.await
.map_err(RedisError::publish)?;
return Ok(());
};
let pipeline = pool.next().pipeline();
let _: () = pipeline
.lpush(msg.name(), body)
.await
.map_err(RedisError::publish)?;
let _: () = pipeline
.pexpire(msg.name(), ttl_millis(ttl), None)
.await
.map_err(RedisError::publish)?;
let _: Vec<fred::types::Value> = pipeline.all().await.map_err(RedisError::publish)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ttl_millis_converts_and_clamps() {
assert_eq!(ttl_millis(Duration::from_secs(60)), 60_000);
assert_eq!(ttl_millis(Duration::from_millis(1)), 1);
assert_eq!(ttl_millis(Duration::from_nanos(1)), 1);
assert_eq!(ttl_millis(Duration::ZERO), 1);
}
}