use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use ruststream::SubscriptionSource;
use crate::deadletter::PoisonPolicy;
use crate::delay::{DelayConfig, DelayedRetry};
use crate::{RedisBroker, error::RedisError, subscriber::RedisSubscriber};
/// Fallback returned by `RedisStream::count_or_default` when no count was configured.
const DEFAULT_COUNT: u64 = 64;
/// Fallback returned by `RedisStream::block_or_default` when no block duration was
/// configured (five seconds).
const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
/// Builds a fallback consumer name of the form `ruststream-<n>`.
///
/// `<n>` is taken from a function-local static counter that starts at zero and is
/// bumped on every call, so two calls in the same process never return the same
/// name. The counter is process-local: separate processes each start again at
/// `ruststream-0`.
fn auto_consumer() -> String {
    static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);
    // `fetch_add` returns the value *before* the increment, so the first name ends in 0.
    let suffix = NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed);
    format!("ruststream-{suffix}")
}
/// Start position of a stream subscription, expressed as a Redis stream ID.
///
/// NOTE(review): presumably this ID is used when the consumer group is created —
/// confirm against the subscriber code.
#[derive(Debug, Clone, Default)]
pub enum StreamStart {
    /// The default; resolves to the Redis special ID `$`.
    #[default]
    New,
    /// Resolves to the ID `0`.
    Beginning,
    /// An explicit, caller-supplied ID that is passed through unchanged.
    Id(String),
}

impl StreamStart {
    /// Returns the ID string this start position stands for.
    ///
    /// The two fixed variants yield static literals; [`StreamStart::Id`] borrows the
    /// string it carries.
    pub(crate) fn as_id(&self) -> &str {
        match self {
            Self::Id(explicit) => explicit.as_str(),
            Self::Beginning => "0",
            Self::New => "$",
        }
    }
}
/// Crate-internal read mode of a `RedisStream`; exposed through `RedisStream::mode`.
#[derive(Debug, Clone)]
pub(crate) enum ReadMode {
/// Mode selected by `RedisStream::new`.
Fresh,
/// Mode selected by `RedisStream::reclaim`; `min_idle` is the duration given to that
/// constructor.
/// NOTE(review): presumably the minimum idle time a pending entry must reach before it
/// is claimed — confirm against the subscriber code.
Reclaim { min_idle: Duration },
}
/// Builder-style description of a subscription to one Redis stream.
///
/// Created with `RedisStream::new` or `RedisStream::reclaim`, refined with the chained
/// setters, and finally handed to `RedisBroker::subscribe` through the
/// `SubscriptionSource` implementation. The type is `#[must_use]`, so discarding the
/// value returned by a setter triggers a compiler warning.
#[derive(Debug, Clone)]
#[must_use]
pub struct RedisStream {
// Stream key; also reported as the subscription name.
key: String,
// Consumer group; `group_or_err` fails with `InvalidOptions` while this is `None`.
group: Option<String>,
// Explicit consumer name; `consumer_or_auto` generates one when this is `None`.
consumer: Option<String>,
// Configured count; `count_or_default` falls back to `DEFAULT_COUNT`.
count: Option<u64>,
// Configured block duration; `block_or_default` falls back to `DEFAULT_BLOCK`.
block: Option<Duration>,
// Start position; both constructors initialise it to `StreamStart::New`.
start: StreamStart,
// Read mode fixed by the constructor (`new` -> `Fresh`, `reclaim` -> `Reclaim`).
mode: ReadMode,
// Copied into `PoisonPolicy::dead_letter` by `poison_policy`.
dead_letter: Option<String>,
// Copied into `PoisonPolicy::max_deliveries` by `poison_policy`.
max_deliveries: Option<u64>,
// Converted with `DelayConfig::from_retry` by `delay_config`.
delayed_retry: Option<DelayedRetry>,
}
impl RedisStream {
pub fn new(key: impl Into<String>) -> Self {
Self {
key: key.into(),
group: None,
consumer: None,
count: None,
block: None,
start: StreamStart::New,
mode: ReadMode::Fresh,
dead_letter: None,
max_deliveries: None,
delayed_retry: None,
}
}
pub fn reclaim(key: impl Into<String>, min_idle: Duration) -> Self {
Self {
key: key.into(),
group: None,
consumer: None,
count: None,
block: None,
start: StreamStart::New,
mode: ReadMode::Reclaim { min_idle },
dead_letter: None,
max_deliveries: None,
delayed_retry: None,
}
}
pub fn group(mut self, group: impl Into<String>) -> Self {
self.group = Some(group.into());
self
}
pub fn consumer(mut self, consumer: impl Into<String>) -> Self {
self.consumer = Some(consumer.into());
self
}
pub const fn count(mut self, count: u64) -> Self {
self.count = Some(count);
self
}
pub const fn block(mut self, block: Duration) -> Self {
self.block = Some(block);
self
}
pub fn start_id(mut self, start: StreamStart) -> Self {
self.start = start;
self
}
pub fn dead_letter(mut self, key: impl Into<String>) -> Self {
self.dead_letter = Some(key.into());
self
}
pub const fn max_deliveries(mut self, max: u64) -> Self {
self.max_deliveries = Some(max);
self
}
pub fn delayed_retry(mut self, retry: DelayedRetry) -> Self {
self.delayed_retry = Some(retry);
self
}
#[must_use]
pub fn key(&self) -> &str {
&self.key
}
pub(crate) fn group_or_err(&self) -> Result<&str, RedisError> {
self.group.as_deref().ok_or_else(|| {
RedisError::InvalidOptions(format!(
"stream subscription on `{}` requires a consumer group: call .group(name)",
self.key
))
})
}
pub(crate) fn consumer_or_auto(&self) -> String {
self.consumer.clone().unwrap_or_else(auto_consumer)
}
pub(crate) fn count_or_default(&self) -> u64 {
self.count.unwrap_or(DEFAULT_COUNT)
}
pub(crate) fn block_or_default(&self) -> Duration {
self.block.unwrap_or(DEFAULT_BLOCK)
}
pub(crate) const fn start(&self) -> &StreamStart {
&self.start
}
pub(crate) fn mode(&self) -> ReadMode {
self.mode.clone()
}
pub(crate) fn poison_policy(&self) -> PoisonPolicy {
PoisonPolicy {
dead_letter: self.dead_letter.clone(),
max_deliveries: self.max_deliveries,
}
}
pub(crate) fn delay_config(&self) -> Option<DelayConfig> {
self.delayed_retry.as_ref().map(DelayConfig::from_retry)
}
}
impl SubscriptionSource<RedisBroker> for RedisStream {
    type Subscriber = RedisSubscriber;

    /// Reports the stream key as the name of this subscription source.
    fn name(&self) -> &str {
        self.key.as_str()
    }

    /// Consumes the description and delegates to `RedisBroker::subscribe`,
    /// returning whatever subscriber or error the broker produces.
    async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
        let pending = broker.subscribe(self);
        pending.await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Without `.group(..)` the lookup must fail with an options error that
    /// mentions the missing consumer group.
    #[test]
    fn group_is_required() {
        let stream = RedisStream::new("orders");
        let err = stream.group_or_err().unwrap_err();
        let mentions_group =
            matches!(err, RedisError::InvalidOptions(msg) if msg.contains("consumer group"));
        assert!(mentions_group);
    }

    /// A configured group is returned verbatim.
    #[test]
    fn group_set_resolves() {
        let stream = RedisStream::new("orders").group("workers");
        let group = stream.group_or_err().expect("group set");
        assert_eq!(group, "workers");
    }

    /// Each start position maps to its Redis ID string.
    #[test]
    fn start_maps_to_redis_ids() {
        let cases = [
            (StreamStart::New, "$"),
            (StreamStart::Beginning, "0"),
            (StreamStart::Id("5-0".into()), "5-0"),
        ];
        for (start, expected) in cases {
            assert_eq!(start.as_id(), expected);
        }
    }

    /// The reclaim constructor stores the idle threshold in the read mode.
    #[test]
    fn reclaim_carries_min_idle() {
        let stream = RedisStream::reclaim("orders", Duration::from_secs(30)).group("g");
        match stream.mode() {
            ReadMode::Reclaim { min_idle } => assert_eq!(min_idle.as_secs(), 30),
            ReadMode::Fresh => panic!("expected the reclaim read mode"),
        }
    }
}