sea_streamer_redis/consumer/
mod.rs1mod cluster;
2mod future;
3mod node;
4mod options;
5mod shard;
6
7use cluster::*;
8use future::StreamFuture;
9pub use future::{NextFuture, StreamFuture as RedisMessageStream};
10use node::*;
11pub use options::*;
12use shard::*;
13
14use flume::{bounded, unbounded, Receiver, Sender};
15use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};
16
17use crate::{
18 from_seq_no, get_message_id, host_id, MessageField, MessageId, RedisCluster, RedisErr,
19 RedisResult, TimestampFormat, DEFAULT_TIMEOUT, MAX_MSG_ID,
20};
21use sea_streamer_runtime::{spawn_task, timeout};
22use sea_streamer_types::{
23 export::futures::FutureExt, Buffer, ConnectOptions, Consumer, ConsumerGroup, ConsumerId,
24 ConsumerMode, ConsumerOptions, Message, MessageHeader, SeqNo, SeqPos, ShardId, SharedMessage,
25 StreamErr, StreamKey, Timestamp, SEA_STREAMER_INTERNAL,
26};
27
28#[derive(Debug)]
29pub struct RedisConsumer {
31 config: ConsumerConfig,
32 streams: Vec<StreamShard>,
33 receiver: Receiver<RedisResult<SharedMessage>>,
34 handle: Sender<CtrlMsg>,
35}
36
37#[derive(Debug, Clone)]
38pub struct RedisConsumerOptions {
40 mode: ConsumerMode,
41 group_id: Option<ConsumerGroup>,
42 consumer_id: Option<ConsumerId>,
43 consumer_timeout: Option<Duration>,
44 auto_stream_reset: AutoStreamReset,
45 auto_commit: AutoCommit,
46 auto_commit_delay: Duration,
47 auto_commit_interval: Duration,
48 auto_claim_interval: Option<Duration>,
49 auto_claim_idle: Duration,
50 batch_size: usize,
51 shard_ownership: ShardOwnership,
52 mkstream: bool,
53 pub(crate) timestamp_format: TimestampFormat,
54 pub(crate) message_field: MessageField,
55}
56
57#[derive(Debug)]
58struct ConsumerConfig {
59 group_id: Option<ConsumerGroup>,
60 consumer_id: Option<ConsumerId>,
61 auto_ack: bool,
62 pre_fetch: bool,
63 timestamp_format: TimestampFormat,
64}
65
/// Default durations and batch sizes, plus the heartbeat period.
pub mod constants {
    use std::time::Duration;

    // Auto-commit defaults.
    pub const DEFAULT_AUTO_COMMIT_DELAY: Duration = Duration::from_secs(5);
    pub const DEFAULT_AUTO_COMMIT_INTERVAL: Duration = Duration::from_secs(1);

    // Auto-claim defaults.
    pub const DEFAULT_AUTO_CLAIM_INTERVAL: Duration = Duration::from_secs(30);
    pub const DEFAULT_AUTO_CLAIM_IDLE: Duration = Duration::from_secs(60);

    // Batch-size defaults.
    pub const DEFAULT_BATCH_SIZE: usize = 100;
    pub const DEFAULT_LOAD_BALANCED_BATCH_SIZE: usize = 10;

    /// Heartbeat period: one second when the `test` feature is enabled.
    #[cfg(feature = "test")]
    pub const HEARTBEAT: Duration = Duration::from_secs(1);
    /// Heartbeat period: ten seconds when the `test` feature is not enabled.
    #[cfg(not(feature = "test"))]
    pub const HEARTBEAT: Duration = Duration::from_secs(10);
}
81
82impl Consumer for RedisConsumer {
83 type Error = RedisErr;
84 type Message<'a> = SharedMessage;
85 type NextFuture<'a> = NextFuture<'a>;
86 type Stream<'a> = StreamFuture<'a>;
87
88 #[inline]
89 async fn seek(&mut self, ts: Timestamp) -> RedisResult<()> {
90 self.seek_to((
91 (match self.config.timestamp_format {
92 TimestampFormat::UnixTimestampMillis => ts.unix_timestamp_nanos() / 1_000_000,
93 #[cfg(feature = "nanosecond-timestamp")]
94 TimestampFormat::UnixTimestampNanos => ts.unix_timestamp_nanos(),
95 }) as u64,
96 u16::MAX,
97 ))
98 .await
99 }
100
101 #[inline]
102 async fn rewind(&mut self, pos: SeqPos) -> RedisResult<()> {
103 self.seek_to(match pos {
104 SeqPos::Beginning => (0, 0),
105 SeqPos::End => MAX_MSG_ID,
106 SeqPos::At(no) => from_seq_no(no),
107 })
108 .await
109 }
110
111 fn assign(&mut self, (stream, shard): StreamShard) -> RedisResult<()> {
112 if !self.streams.iter().any(|(s, _)| s == &stream) {
113 return Err(StreamErr::StreamKeyNotFound);
114 }
115 if !self
116 .streams
117 .iter()
118 .any(|(s, t)| (s, t) == (&stream, &shard))
119 {
120 self.streams.push((stream, shard));
121 }
122 Ok(())
123 }
124
125 fn unassign(&mut self, s: StreamShard) -> RedisResult<()> {
126 if let Some((i, _)) = self.streams.iter().enumerate().find(|(_, t)| &s == *t) {
127 self.streams.remove(i);
128 if self.streams.is_empty() {
129 Err(StreamErr::StreamKeyEmpty)
130 } else {
131 Ok(())
132 }
133 } else {
134 Err(StreamErr::StreamKeyNotFound)
135 }
136 }
137
138 fn next(&self) -> NextFuture<'_> {
139 NextFuture {
140 con: self,
141 fut: self.receiver.recv_async(),
142 read: false,
143 }
144 }
145
146 fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
147 StreamFuture::new(self)
148 }
149}
150
151impl RedisConsumer {
152 pub fn group_id(&self) -> Option<&ConsumerGroup> {
154 self.config.group_id.as_ref()
155 }
156
157 pub fn consumer_id(&self) -> Option<&ConsumerId> {
159 self.config.consumer_id.as_ref()
160 }
161
162 pub fn stream_shards(&self) -> &[StreamShard] {
165 &self.streams
166 }
167
168 pub async fn seek_to(&mut self, id: MessageId) -> RedisResult<()> {
170 if self
171 .handle
172 .try_send(CtrlMsg::Rewind(self.streams.clone(), id))
173 .is_ok()
174 {
175 while let Ok(msg) = self.receiver.recv_async().await {
177 let msg = msg?;
178 if msg.stream_key().name() == SEA_STREAMER_INTERNAL && msg.message().size() == 0 {
179 return Ok(());
180 }
181 }
182 }
183 Err(StreamErr::Backend(RedisErr::ConsumerDied))
184 }
185
186 #[inline]
187 pub fn ack(&self, msg: &SharedMessage) -> RedisResult<()> {
189 if self.config.auto_ack {
190 return Err(StreamErr::Backend(RedisErr::InvalidClientConfig(
191 "Please do not set AutoCommit to Delayed.".to_owned(),
192 )));
193 }
194 self.auto_ack(msg.header())
195 }
196
197 pub fn ack_with(
198 &self,
199 (stream_key, shard_id, sequence): &(StreamKey, ShardId, SeqNo),
200 ) -> RedisResult<()> {
201 if self.config.auto_ack {
202 return Err(StreamErr::Backend(RedisErr::InvalidClientConfig(
203 "Please do not set AutoCommit to Delayed.".to_owned(),
204 )));
205 }
206 if self
208 .handle
209 .try_send(CtrlMsg::Ack(
210 (stream_key.clone(), *shard_id),
211 from_seq_no(*sequence),
212 Timestamp::now_utc(),
213 ))
214 .is_ok()
215 {
216 Ok(())
217 } else {
218 Err(StreamErr::Backend(RedisErr::ConsumerDied))
219 }
220 }
221
222 fn auto_ack(&self, header: &MessageHeader) -> RedisResult<()> {
223 if self
225 .handle
226 .try_send(CtrlMsg::Ack(
227 (header.stream_key().clone(), *header.shard_id()),
228 get_message_id(header),
229 Timestamp::now_utc(),
230 ))
231 .is_ok()
232 {
233 Ok(())
234 } else {
235 Err(StreamErr::Backend(RedisErr::ConsumerDied))
236 }
237 }
238
239 pub fn commit(&mut self) -> RedisResult<impl Future<Output = RedisResult<()>>> {
241 if self.config.pre_fetch {
242 return Err(StreamErr::Backend(RedisErr::InvalidClientConfig(
243 "Manual commit is not allowed. Please use another AutoCommit option.".to_owned(),
244 )));
245 }
246 let (sender, notify) = bounded(1);
247 if self.handle.try_send(CtrlMsg::Commit(sender)).is_ok() {
249 Ok(notify.into_recv_async().map(|res| match res {
250 Ok(Ok(res)) => Ok(res),
251 Ok(Err(err)) => Err(err),
252 Err(_) => Err(StreamErr::Backend(RedisErr::ConsumerDied)),
253 }))
254 } else {
255 Err(StreamErr::Backend(RedisErr::ConsumerDied))
256 }
257 }
258
259 pub fn commit_asap(&mut self) -> RedisResult<()> {
261 let (sender, _) = bounded(1);
262 if self.handle.try_send(CtrlMsg::Commit(sender)).is_ok() {
264 Ok(())
265 } else {
266 Err(StreamErr::Backend(RedisErr::ConsumerDied))
267 }
268 }
269
270 pub async fn end(self) -> RedisResult<()> {
272 let (sender, notify) = bounded(1);
273 if self.handle.send_async(CtrlMsg::Kill(sender)).await.is_ok() {
274 let receiver = self.receiver;
275 spawn_task(async move { while receiver.recv_async().await.is_ok() {} });
277 notify.recv_async().await.ok();
278 }
279 Ok(())
280 }
281}
282
283pub(crate) async fn create_consumer(
284 mut conn: RedisCluster,
285 mut options: RedisConsumerOptions,
286 streams: Vec<StreamKey>,
287) -> RedisResult<RedisConsumer> {
288 let mode = *options.mode()?;
289 if mode != ConsumerMode::RealTime {
290 if options.consumer_group().is_err() {
291 options.set_consumer_group(group_id(&mode))?;
292 }
293 if options.consumer_id().is_none() {
294 options.set_consumer_id(match mode {
295 ConsumerMode::Resumable => ConsumerId::new(options.consumer_group()?.name()),
296 ConsumerMode::LoadBalanced => consumer_id(),
297 _ => unreachable!(),
298 });
299 }
300 }
301
302 if options.shard_ownership() == &ShardOwnership::Owned {
303 todo!("Hopefully this will come out in the next release.");
304 }
305
306 let options = Arc::new(options);
307 conn.reconnect_all().await?;
308 let mut shards = Vec::new();
309 for stream in streams {
310 shards.extend(discover_shards(&mut conn, stream).await?);
311 }
312 let stream_shards = shards.iter().map(|s| s.stream.clone()).collect();
313
314 let dur = conn.options.timeout().unwrap_or(DEFAULT_TIMEOUT);
315 let enable_cluster = conn.options.enable_cluster();
316 let config: ConsumerConfig = options.as_ref().into();
317 let (sender, receiver) = if config.pre_fetch {
318 bounded(0)
323 } else {
324 unbounded()
326 };
327 let (handle, response) = unbounded();
328 let (status, ready) = bounded(1);
329
330 if enable_cluster {
331 let cluster = Cluster::new(options.clone(), shards, sender)?;
332 spawn_task(cluster.run(conn, response, status));
333 } else {
334 if conn.cluster.nodes().len() != 1 {
335 return Err(StreamErr::Connect(
336 "There are multiple nodes in streamer URI, please enable the cluster option"
337 .to_owned(),
338 ));
339 }
340 let node = Node::new(conn, options.clone(), shards, handle.clone(), sender)?;
341 spawn_task(node.run(response, status));
342 }
343
344 match timeout(dur, ready.recv_async()).await {
345 Ok(Ok(StatusMsg::Ready)) => Ok(RedisConsumer {
346 config,
347 streams: stream_shards,
348 receiver,
349 handle,
350 }),
351 _ => Err(StreamErr::Connect(format!(
352 "Failed to initialize {}",
353 if enable_cluster { "cluster" } else { "node" }
354 ))),
355 }
356}
357
358pub fn group_id(mode: &ConsumerMode) -> ConsumerGroup {
360 let id = format!(
361 "{}:{}",
362 host_id(),
363 match mode {
364 ConsumerMode::RealTime => "!",
365 ConsumerMode::Resumable => "r",
366 ConsumerMode::LoadBalanced => "s",
367 }
368 );
369 ConsumerGroup::new(id)
370}
371
372pub fn consumer_id() -> ConsumerId {
374 let thread_id = format!("{:?}", std::thread::current().id());
375 let thread_id = thread_id
376 .trim_start_matches("ThreadId(")
377 .trim_end_matches(')');
378 let id = format!(
379 "{}:{}:{}:{}",
380 host_id(),
381 std::process::id(),
382 thread_id,
383 Timestamp::now_utc().unix_timestamp_nanos(),
384 );
385 ConsumerId::new(id)
386}