1use std::collections::{HashMap, VecDeque};
4use std::fmt::{Debug, Formatter};
5use std::time::Duration;
6
7use fred::clients::Pool;
8use fred::interfaces::StreamsInterface;
9use fred::types::streams::XReadValue;
10use futures::Stream;
11use futures::stream::unfold;
12use ruststream::{BatchSubscriber, Subscriber};
13
14use crate::convert::{HEADER_PREFIX, parts_from_fields};
15use crate::deadletter::{
16 self, DELIVERY_COUNT_HEADER, IDLE_MS_HEADER, PoisonPolicy, REASON_MAX_DELIVERIES,
17};
18use crate::delay::{self, DelayConfig};
19use crate::{error::RedisError, message::RedisMessage, stream::ReadMode};
20
/// A single stream entry: the entry ID paired with its field map.
type Entry = (String, HashMap<String, Vec<u8>>);

/// Decoded shape of the `xreadgroup` reply: for each stream key, a list of
/// `(entry id, [(field, value)])` records.
type RawStreams = Vec<(String, Vec<(String, Vec<(String, Vec<u8>)>)>)>;

/// Cursor value a freshly constructed subscriber hands to its first
/// `xautoclaim_values` call.
const RECLAIM_START: &str = "0-0";
32
/// Converts a [`Duration`] to whole milliseconds.
///
/// Sub-millisecond remainders are discarded; a duration whose millisecond
/// count does not fit in a `u64` saturates to `u64::MAX`.
fn duration_to_millis(d: Duration) -> u64 {
    match u64::try_from(d.as_millis()) {
        Ok(millis) => millis,
        // `as_millis` yields a `u128`; anything above `u64::MAX` is clamped.
        Err(_) => u64::MAX,
    }
}
36
/// Consumer-group subscriber over a single Redis stream key.
///
/// Entries are fetched in batches into an internal buffer and handed out as
/// [`RedisMessage`]s through the `Subscriber` / `BatchSubscriber` impls.
pub struct RedisSubscriber {
    /// Connection pool used for every stream command; cloned into each message.
    pool: Pool,
    /// Stream key that is read from.
    key: String,
    /// Consumer group name passed to the stream commands.
    group: String,
    /// Consumer name passed to the stream commands.
    consumer: String,
    /// Count limit passed to `xreadgroup`, `xautoclaim_values` and `xpending`.
    count: u64,
    /// Block timeout for `xreadgroup`; also the sleep applied after a reclaim
    /// pass that claimed nothing.
    block: Duration,
    /// Selects between reading fresh entries and reclaiming idle ones.
    mode: ReadMode,
    /// Poison policy consulted for reclaimed entries; cloned into each message.
    policy: PoisonPolicy,
    /// Optional delay configuration; when set, `delay::sweep_due` runs before
    /// every fetch. Cloned into each message.
    delay: Option<DelayConfig>,
    /// Cursor for the next `xautoclaim_values` call (reclaim mode).
    cursor: String,
    /// Entries fetched from Redis that have not been yielded yet.
    buffer: VecDeque<Entry>,
}
58
59impl Debug for RedisSubscriber {
60 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("RedisSubscriber")
62 .field("key", &self.key)
63 .field("group", &self.group)
64 .field("consumer", &self.consumer)
65 .field("mode", &self.mode)
66 .finish_non_exhaustive()
67 }
68}
69
70impl RedisSubscriber {
71 #[allow(
72 clippy::too_many_arguments,
73 reason = "internal constructor mirroring the descriptor"
74 )]
75 pub(crate) fn new(
76 pool: Pool,
77 key: String,
78 group: String,
79 consumer: String,
80 count: u64,
81 block: Duration,
82 mode: ReadMode,
83 policy: PoisonPolicy,
84 delay: Option<DelayConfig>,
85 ) -> Self {
86 Self {
87 pool,
88 key,
89 group,
90 consumer,
91 count,
92 block,
93 mode,
94 policy,
95 delay,
96 cursor: RECLAIM_START.to_owned(),
97 buffer: VecDeque::new(),
98 }
99 }
100
101 fn message(&self, id: String, fields: HashMap<String, Vec<u8>>) -> RedisMessage {
102 let (payload, headers) = parts_from_fields(fields);
103 RedisMessage::new(
104 self.pool.clone(),
105 self.key.clone(),
106 self.group.clone(),
107 id,
108 payload,
109 headers,
110 self.policy.clone(),
111 self.delay.clone(),
112 )
113 }
114
115 async fn fetch(&mut self) -> Result<(), RedisError> {
118 if let Some(cfg) = &self.delay {
121 delay::sweep_due(&self.pool, cfg, &self.key).await?;
122 }
123 let entries = match self.mode.clone() {
124 ReadMode::Fresh => self.fetch_fresh().await?,
125 ReadMode::Reclaim { min_idle } => self.fetch_reclaim(min_idle).await?,
126 };
127 self.buffer.extend(entries);
128 Ok(())
129 }
130
131 async fn fetch_fresh(&self) -> Result<Vec<Entry>, RedisError> {
132 let resp: RawStreams = self
133 .pool
134 .xreadgroup(
135 self.group.as_str(),
136 self.consumer.as_str(),
137 Some(self.count),
138 Some(duration_to_millis(self.block)),
139 false,
140 self.key.as_str(),
141 ">",
142 )
143 .await
144 .map_err(RedisError::stream)?;
145 let entries = resp
146 .into_iter()
147 .find(|(key, _)| key == &self.key)
148 .map(|(_, entries)| entries)
149 .unwrap_or_default();
150 Ok(entries
151 .into_iter()
152 .map(|(id, fields)| (id, fields.into_iter().collect()))
153 .collect())
154 }
155
156 async fn fetch_reclaim(&mut self, min_idle: Duration) -> Result<Vec<Entry>, RedisError> {
157 let (cursor, entries): (String, Vec<XReadValue<String, String, Vec<u8>>>) = self
158 .pool
159 .xautoclaim_values(
160 self.key.as_str(),
161 self.group.as_str(),
162 self.consumer.as_str(),
163 duration_to_millis(min_idle),
164 self.cursor.as_str(),
165 Some(self.count),
166 false,
167 )
168 .await
169 .map_err(RedisError::stream)?;
170 self.cursor = cursor;
171 if entries.is_empty() {
173 tokio::time::sleep(self.block).await;
174 return Ok(entries);
175 }
176 if !self.policy.is_active() {
178 return Ok(entries);
179 }
180 self.enrich_reclaimed(entries).await
181 }
182
183 async fn enrich_reclaimed(&self, entries: Vec<Entry>) -> Result<Vec<Entry>, RedisError> {
186 let meta = self.pending_meta().await?;
187 let mut out = Vec::with_capacity(entries.len());
188 for (id, mut fields) in entries {
189 let (idle, count) = meta.get(&id).copied().unwrap_or((0, 0));
190 if self.policy.is_poison(count) {
191 self.dead_letter_reclaimed(&id, &fields).await?;
192 continue;
193 }
194 insert_meta_header(&mut fields, DELIVERY_COUNT_HEADER, count);
195 insert_meta_header(&mut fields, IDLE_MS_HEADER, idle);
196 out.push((id, fields));
197 }
198 Ok(out)
199 }
200
201 async fn pending_meta(&self) -> Result<HashMap<String, (u64, u64)>, RedisError> {
204 let rows: Vec<(String, String, u64, u64)> = self
205 .pool
206 .xpending(
207 self.key.as_str(),
208 self.group.as_str(),
209 (0_u64, "-", "+", self.count, self.consumer.as_str()),
210 )
211 .await
212 .map_err(RedisError::stream)?;
213 Ok(rows
214 .into_iter()
215 .map(|(id, _consumer, idle, count)| (id, (idle, count)))
216 .collect())
217 }
218
219 async fn dead_letter_reclaimed(
222 &self,
223 id: &str,
224 fields: &HashMap<String, Vec<u8>>,
225 ) -> Result<(), RedisError> {
226 let (payload, headers) = parts_from_fields(fields.clone());
227 deadletter::settle_poison_stream(
228 &self.pool,
229 &self.policy,
230 &payload,
231 &headers,
232 REASON_MAX_DELIVERIES,
233 )
234 .await
235 .map_err(RedisError::stream)?;
236 let _: i64 = self
237 .pool
238 .xack(self.key.as_str(), self.group.as_str(), id)
239 .await
240 .map_err(RedisError::stream)?;
241 Ok(())
242 }
243}
244
245fn insert_meta_header(fields: &mut HashMap<String, Vec<u8>>, name: &str, value: u64) {
248 fields.insert(
249 format!("{HEADER_PREFIX}{name}"),
250 value.to_string().into_bytes(),
251 );
252}
253
254impl Subscriber for RedisSubscriber {
255 type Message = RedisMessage;
256 type Error = RedisError;
257
258 fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
266 unfold(self, |s| async move {
267 loop {
268 if let Some((id, fields)) = s.buffer.pop_front() {
269 return Some((Ok(s.message(id, fields)), s));
270 }
271 if let Err(err) = s.fetch().await {
273 return Some((Err(err), s));
274 }
275 }
276 })
277 }
278}
279
280impl BatchSubscriber for RedisSubscriber {
281 type Batch = Vec<RedisMessage>;
282
283 fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
291 unfold(self, |s| async move {
292 loop {
293 if !s.buffer.is_empty() {
294 let entries = std::mem::take(&mut s.buffer);
297 let batch = entries
298 .into_iter()
299 .map(|(id, fields)| s.message(id, fields))
300 .collect::<Vec<_>>();
301 return Some((Ok(batch), s));
302 }
303 if let Err(err) = s.fetch().await {
304 return Some((Err(err), s));
305 }
306 }
307 })
308 }
309}