ruststream_fred/
stream.rs1use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::Duration;
16
17use ruststream::SubscriptionSource;
18
19use crate::{RedisBroker, error::RedisError, subscriber::RedisSubscriber};
20
21const DEFAULT_COUNT: u64 = 64;
22const DEFAULT_BLOCK: Duration = Duration::from_secs(5);
23
/// Produces a fallback consumer name of the form `ruststream-<n>`.
///
/// `<n>` is taken from a counter that starts at zero and is incremented on
/// every call, so successive calls inside one process yield distinct names
/// (until the `u64` counter wraps). The counter is a plain `static`, so every
/// process starts again at `ruststream-0`.
fn auto_consumer() -> String {
    static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);
    // `Relaxed` is enough: only the atomicity of the increment matters, no
    // other memory is published through this counter.
    format!(
        "ruststream-{}",
        NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed)
    )
}
31
/// Position in a stream at which reading should begin.
///
/// Every variant resolves to a Redis stream ID string through `as_id`.
/// The type is comparable (`PartialEq`/`Eq`) so configurations can be checked
/// with `==` and `assert_eq!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum StreamStart {
    /// Resolves to `$`, which in Redis denotes the stream's current last
    /// entry, i.e. only entries added afterwards are seen. This is the
    /// default.
    #[default]
    New,
    /// Resolves to `0`, the smallest possible stream ID.
    Beginning,
    /// Resolves to the contained ID verbatim (for example `5-0`). The string
    /// is not validated here.
    Id(String),
}
44
45impl StreamStart {
46 pub(crate) fn as_id(&self) -> &str {
47 match self {
48 Self::New => "$",
49 Self::Beginning => "0",
50 Self::Id(id) => id,
51 }
52 }
53}
54
/// How a stream subscription obtains its entries.
#[derive(Clone, Debug)]
pub(crate) enum ReadMode {
    /// Regular reading; selected by `RedisStream::new`.
    Fresh,
    /// Reclaiming; selected by `RedisStream::reclaim`.
    Reclaim {
        /// Idle threshold supplied by the caller.
        // NOTE(review): presumably the minimum time an entry must have been
        // pending before it is claimed -- confirm against the subscriber.
        min_idle: Duration,
    },
}
62
63#[derive(Debug, Clone)]
79#[must_use]
80pub struct RedisStream {
81 key: String,
82 group: Option<String>,
83 consumer: Option<String>,
84 count: Option<u64>,
85 block: Option<Duration>,
86 start: StreamStart,
87 mode: ReadMode,
88}
89
90impl RedisStream {
91 pub fn new(key: impl Into<String>) -> Self {
95 Self {
96 key: key.into(),
97 group: None,
98 consumer: None,
99 count: None,
100 block: None,
101 start: StreamStart::New,
102 mode: ReadMode::Fresh,
103 }
104 }
105
106 pub fn reclaim(key: impl Into<String>, min_idle: Duration) -> Self {
113 Self {
114 key: key.into(),
115 group: None,
116 consumer: None,
117 count: None,
118 block: None,
119 start: StreamStart::New,
120 mode: ReadMode::Reclaim { min_idle },
121 }
122 }
123
124 pub fn group(mut self, group: impl Into<String>) -> Self {
126 self.group = Some(group.into());
127 self
128 }
129
130 pub fn consumer(mut self, consumer: impl Into<String>) -> Self {
132 self.consumer = Some(consumer.into());
133 self
134 }
135
136 pub const fn count(mut self, count: u64) -> Self {
138 self.count = Some(count);
139 self
140 }
141
142 pub const fn block(mut self, block: Duration) -> Self {
146 self.block = Some(block);
147 self
148 }
149
150 pub fn start_id(mut self, start: StreamStart) -> Self {
153 self.start = start;
154 self
155 }
156
157 #[must_use]
159 pub fn key(&self) -> &str {
160 &self.key
161 }
162
163 pub(crate) fn group_or_err(&self) -> Result<&str, RedisError> {
164 self.group.as_deref().ok_or_else(|| {
165 RedisError::InvalidOptions(format!(
166 "stream subscription on `{}` requires a consumer group: call .group(name)",
167 self.key
168 ))
169 })
170 }
171
172 pub(crate) fn consumer_or_auto(&self) -> String {
173 self.consumer.clone().unwrap_or_else(auto_consumer)
174 }
175
176 pub(crate) fn count_or_default(&self) -> u64 {
177 self.count.unwrap_or(DEFAULT_COUNT)
178 }
179
180 pub(crate) fn block_or_default(&self) -> Duration {
181 self.block.unwrap_or(DEFAULT_BLOCK)
182 }
183
184 pub(crate) const fn start(&self) -> &StreamStart {
185 &self.start
186 }
187
188 pub(crate) fn mode(&self) -> ReadMode {
189 self.mode.clone()
190 }
191}
192
193impl SubscriptionSource<RedisBroker> for RedisStream {
194 type Subscriber = RedisSubscriber;
195
196 fn name(&self) -> &str {
197 self.key()
198 }
199
200 async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError> {
201 broker.subscribe(self).await
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Without `.group(..)` the lookup fails with an `InvalidOptions` error
    /// whose message mentions the consumer group.
    #[test]
    fn group_is_required() {
        let stream = RedisStream::new("orders");
        let Err(RedisError::InvalidOptions(msg)) = stream.group_or_err() else {
            panic!("expected InvalidOptions for a stream without a group");
        };
        assert!(msg.contains("consumer group"));
    }

    /// A group set through the builder is returned as-is.
    #[test]
    fn group_set_resolves() {
        let stream = RedisStream::new("orders").group("workers");
        let group = stream.group_or_err().expect("group set");
        assert_eq!(group, "workers");
    }

    /// Each start position resolves to the expected Redis ID string.
    #[test]
    fn start_maps_to_redis_ids() {
        let cases = [
            (StreamStart::New, "$"),
            (StreamStart::Beginning, "0"),
            (StreamStart::Id("5-0".into()), "5-0"),
        ];
        for (start, expected) in cases {
            assert_eq!(start.as_id(), expected);
        }
    }

    /// `reclaim` stores the idle threshold in the read mode, and later
    /// builder calls leave it intact.
    #[test]
    fn reclaim_carries_min_idle() {
        let stream = RedisStream::reclaim("orders", Duration::from_secs(30)).group("g");
        let ReadMode::Reclaim { min_idle } = stream.mode() else {
            panic!("expected reclaim mode");
        };
        assert_eq!(min_idle.as_secs(), 30);
    }
}