sea_streamer_redis/consumer/
mod.rs

1mod cluster;
2mod future;
3mod node;
4mod options;
5mod shard;
6
7use cluster::*;
8use future::StreamFuture;
9pub use future::{NextFuture, StreamFuture as RedisMessageStream};
10use node::*;
11pub use options::*;
12use shard::*;
13
14use flume::{bounded, unbounded, Receiver, Sender};
15use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};
16
17use crate::{
18    from_seq_no, get_message_id, host_id, MessageField, MessageId, RedisCluster, RedisErr,
19    RedisResult, TimestampFormat, DEFAULT_TIMEOUT, MAX_MSG_ID,
20};
21use sea_streamer_runtime::{spawn_task, timeout};
22use sea_streamer_types::{
23    export::futures::FutureExt, Buffer, ConnectOptions, Consumer, ConsumerGroup, ConsumerId,
24    ConsumerMode, ConsumerOptions, Message, MessageHeader, SeqNo, SeqPos, ShardId, SharedMessage,
25    StreamErr, StreamKey, Timestamp, SEA_STREAMER_INTERNAL,
26};
27
28#[derive(Debug)]
29/// The Redis Consumer.
30pub struct RedisConsumer {
31    config: ConsumerConfig,
32    streams: Vec<StreamShard>,
33    receiver: Receiver<RedisResult<SharedMessage>>,
34    handle: Sender<CtrlMsg>,
35}
36
37#[derive(Debug, Clone)]
38/// Options for Consumers, including mode, group and other streaming mechanisms.
39pub struct RedisConsumerOptions {
40    mode: ConsumerMode,
41    group_id: Option<ConsumerGroup>,
42    consumer_id: Option<ConsumerId>,
43    consumer_timeout: Option<Duration>,
44    auto_stream_reset: AutoStreamReset,
45    auto_commit: AutoCommit,
46    auto_commit_delay: Duration,
47    auto_commit_interval: Duration,
48    auto_claim_interval: Option<Duration>,
49    auto_claim_idle: Duration,
50    batch_size: usize,
51    shard_ownership: ShardOwnership,
52    mkstream: bool,
53    pub(crate) timestamp_format: TimestampFormat,
54    pub(crate) message_field: MessageField,
55}
56
57#[derive(Debug)]
58struct ConsumerConfig {
59    group_id: Option<ConsumerGroup>,
60    consumer_id: Option<ConsumerId>,
61    auto_ack: bool,
62    pre_fetch: bool,
63    timestamp_format: TimestampFormat,
64}
65
/// More constants used throughout SeaStreamer Redis.
pub mod constants {
    use std::time::Duration;

    /// Default auto-commit delay: 5 seconds.
    pub const DEFAULT_AUTO_COMMIT_DELAY: Duration = Duration::from_secs(5);
    /// Default auto-commit interval: 1 second.
    pub const DEFAULT_AUTO_COMMIT_INTERVAL: Duration = Duration::from_secs(1);
    /// Default auto-claim interval: 30 seconds.
    pub const DEFAULT_AUTO_CLAIM_INTERVAL: Duration = Duration::from_secs(30);
    /// Default auto-claim idle time: 60 seconds.
    pub const DEFAULT_AUTO_CLAIM_IDLE: Duration = Duration::from_secs(60);
    /// Default batch size: 100.
    pub const DEFAULT_BATCH_SIZE: usize = 100;
    /// Default batch size for load-balanced consumers: 10.
    pub const DEFAULT_LOAD_BALANCED_BATCH_SIZE: usize = 10;
    /// Heartbeat duration: 10 seconds in normal builds.
    #[cfg(not(feature = "test"))]
    pub const HEARTBEAT: Duration = Duration::from_secs(10);
    /// Heartbeat duration: shortened to 1 second when the `test` feature is enabled.
    #[cfg(feature = "test")]
    pub const HEARTBEAT: Duration = Duration::from_secs(1);
}
81
82impl Consumer for RedisConsumer {
83    type Error = RedisErr;
84    type Message<'a> = SharedMessage;
85    type NextFuture<'a> = NextFuture<'a>;
86    type Stream<'a> = StreamFuture<'a>;
87
88    #[inline]
89    async fn seek(&mut self, ts: Timestamp) -> RedisResult<()> {
90        self.seek_to((
91            (match self.config.timestamp_format {
92                TimestampFormat::UnixTimestampMillis => ts.unix_timestamp_nanos() / 1_000_000,
93                #[cfg(feature = "nanosecond-timestamp")]
94                TimestampFormat::UnixTimestampNanos => ts.unix_timestamp_nanos(),
95            }) as u64,
96            u16::MAX,
97        ))
98        .await
99    }
100
101    #[inline]
102    async fn rewind(&mut self, pos: SeqPos) -> RedisResult<()> {
103        self.seek_to(match pos {
104            SeqPos::Beginning => (0, 0),
105            SeqPos::End => MAX_MSG_ID,
106            SeqPos::At(no) => from_seq_no(no),
107        })
108        .await
109    }
110
111    fn assign(&mut self, (stream, shard): StreamShard) -> RedisResult<()> {
112        if !self.streams.iter().any(|(s, _)| s == &stream) {
113            return Err(StreamErr::StreamKeyNotFound);
114        }
115        if !self
116            .streams
117            .iter()
118            .any(|(s, t)| (s, t) == (&stream, &shard))
119        {
120            self.streams.push((stream, shard));
121        }
122        Ok(())
123    }
124
125    fn unassign(&mut self, s: StreamShard) -> RedisResult<()> {
126        if let Some((i, _)) = self.streams.iter().enumerate().find(|(_, t)| &s == *t) {
127            self.streams.remove(i);
128            if self.streams.is_empty() {
129                Err(StreamErr::StreamKeyEmpty)
130            } else {
131                Ok(())
132            }
133        } else {
134            Err(StreamErr::StreamKeyNotFound)
135        }
136    }
137
138    fn next(&self) -> NextFuture<'_> {
139        NextFuture {
140            con: self,
141            fut: self.receiver.recv_async(),
142            read: false,
143        }
144    }
145
146    fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
147        StreamFuture::new(self)
148    }
149}
150
151impl RedisConsumer {
152    /// Get the assigned group id.
153    pub fn group_id(&self) -> Option<&ConsumerGroup> {
154        self.config.group_id.as_ref()
155    }
156
157    /// Get the assigned consumer id.
158    pub fn consumer_id(&self) -> Option<&ConsumerId> {
159        self.config.consumer_id.as_ref()
160    }
161
162    /// Return the stream-shards this consumer has been assigned.
163    /// On create, it will self-assign all shards.
164    pub fn stream_shards(&self) -> &[StreamShard] {
165        &self.streams
166    }
167
168    /// Like `Consumer::seek`, but with `MessageId`.
169    pub async fn seek_to(&mut self, id: MessageId) -> RedisResult<()> {
170        if self
171            .handle
172            .try_send(CtrlMsg::Rewind(self.streams.clone(), id))
173            .is_ok()
174        {
175            // we drain all messages until hitting the latch
176            while let Ok(msg) = self.receiver.recv_async().await {
177                let msg = msg?;
178                if msg.stream_key().name() == SEA_STREAMER_INTERNAL && msg.message().size() == 0 {
179                    return Ok(());
180                }
181            }
182        }
183        Err(StreamErr::Backend(RedisErr::ConsumerDied))
184    }
185
186    #[inline]
187    /// Mark a message as read. The ACK will be queued for commit.
188    pub fn ack(&self, msg: &SharedMessage) -> RedisResult<()> {
189        if self.config.auto_ack {
190            return Err(StreamErr::Backend(RedisErr::InvalidClientConfig(
191                "Please do not set AutoCommit to Delayed.".to_owned(),
192            )));
193        }
194        self.auto_ack(msg.header())
195    }
196
197    pub fn ack_with(
198        &self,
199        (stream_key, shard_id, sequence): &(StreamKey, ShardId, SeqNo),
200    ) -> RedisResult<()> {
201        if self.config.auto_ack {
202            return Err(StreamErr::Backend(RedisErr::InvalidClientConfig(
203                "Please do not set AutoCommit to Delayed.".to_owned(),
204            )));
205        }
206        // unbounded, so never blocks
207        if self
208            .handle
209            .try_send(CtrlMsg::Ack(
210                (stream_key.clone(), *shard_id),
211                from_seq_no(*sequence),
212                Timestamp::now_utc(),
213            ))
214            .is_ok()
215        {
216            Ok(())
217        } else {
218            Err(StreamErr::Backend(RedisErr::ConsumerDied))
219        }
220    }
221
222    fn auto_ack(&self, header: &MessageHeader) -> RedisResult<()> {
223        // unbounded, so never blocks
224        if self
225            .handle
226            .try_send(CtrlMsg::Ack(
227                (header.stream_key().clone(), *header.shard_id()),
228                get_message_id(header),
229                Timestamp::now_utc(),
230            ))
231            .is_ok()
232        {
233            Ok(())
234        } else {
235            Err(StreamErr::Backend(RedisErr::ConsumerDied))
236        }
237    }
238
239    /// Commit all pending acks and (optionally) wait for the result.
240    pub fn commit(&mut self) -> RedisResult<impl Future<Output = RedisResult<()>>> {
241        if self.config.pre_fetch {
242            return Err(StreamErr::Backend(RedisErr::InvalidClientConfig(
243                "Manual commit is not allowed. Please use another AutoCommit option.".to_owned(),
244            )));
245        }
246        let (sender, notify) = bounded(1);
247        // unbounded, so never blocks
248        if self.handle.try_send(CtrlMsg::Commit(sender)).is_ok() {
249            Ok(notify.into_recv_async().map(|res| match res {
250                Ok(Ok(res)) => Ok(res),
251                Ok(Err(err)) => Err(err),
252                Err(_) => Err(StreamErr::Backend(RedisErr::ConsumerDied)),
253            }))
254        } else {
255            Err(StreamErr::Backend(RedisErr::ConsumerDied))
256        }
257    }
258
259    /// Push a Commit request to the command queue, will be executed on the next cycle
260    pub fn commit_asap(&mut self) -> RedisResult<()> {
261        let (sender, _) = bounded(1);
262        // unbounded, so never blocks
263        if self.handle.try_send(CtrlMsg::Commit(sender)).is_ok() {
264            Ok(())
265        } else {
266            Err(StreamErr::Backend(RedisErr::ConsumerDied))
267        }
268    }
269
270    /// Commit all pending acks and end the consumer.
271    pub async fn end(self) -> RedisResult<()> {
272        let (sender, notify) = bounded(1);
273        if self.handle.send_async(CtrlMsg::Kill(sender)).await.is_ok() {
274            let receiver = self.receiver;
275            // drain the channel
276            spawn_task(async move { while receiver.recv_async().await.is_ok() {} });
277            notify.recv_async().await.ok();
278        }
279        Ok(())
280    }
281}
282
283pub(crate) async fn create_consumer(
284    mut conn: RedisCluster,
285    mut options: RedisConsumerOptions,
286    streams: Vec<StreamKey>,
287) -> RedisResult<RedisConsumer> {
288    let mode = *options.mode()?;
289    if mode != ConsumerMode::RealTime {
290        if options.consumer_group().is_err() {
291            options.set_consumer_group(group_id(&mode))?;
292        }
293        if options.consumer_id().is_none() {
294            options.set_consumer_id(match mode {
295                ConsumerMode::Resumable => ConsumerId::new(options.consumer_group()?.name()),
296                ConsumerMode::LoadBalanced => consumer_id(),
297                _ => unreachable!(),
298            });
299        }
300    }
301
302    if options.shard_ownership() == &ShardOwnership::Owned {
303        todo!("Hopefully this will come out in the next release.");
304    }
305
306    let options = Arc::new(options);
307    conn.reconnect_all().await?;
308    let mut shards = Vec::new();
309    for stream in streams {
310        shards.extend(discover_shards(&mut conn, stream).await?);
311    }
312    let stream_shards = shards.iter().map(|s| s.stream.clone()).collect();
313
314    let dur = conn.options.timeout().unwrap_or(DEFAULT_TIMEOUT);
315    let enable_cluster = conn.options.enable_cluster();
316    let config: ConsumerConfig = options.as_ref().into();
317    let (sender, receiver) = if config.pre_fetch {
318        // With pre-fetch, it will only read more if the channel is free.
319        // Zero-capacity channels are always blocking. It means that *at the moment* the consumer
320        // consumes the last item in the buffer, it will proceed to fetch more.
321        // This number could be made configurable in the future.
322        bounded(0)
323    } else {
324        // Without pre-fetch, it only fetches when next is called, aka. on demand.
325        unbounded()
326    };
327    let (handle, response) = unbounded();
328    let (status, ready) = bounded(1);
329
330    if enable_cluster {
331        let cluster = Cluster::new(options.clone(), shards, sender)?;
332        spawn_task(cluster.run(conn, response, status));
333    } else {
334        if conn.cluster.nodes().len() != 1 {
335            return Err(StreamErr::Connect(
336                "There are multiple nodes in streamer URI, please enable the cluster option"
337                    .to_owned(),
338            ));
339        }
340        let node = Node::new(conn, options.clone(), shards, handle.clone(), sender)?;
341        spawn_task(node.run(response, status));
342    }
343
344    match timeout(dur, ready.recv_async()).await {
345        Ok(Ok(StatusMsg::Ready)) => Ok(RedisConsumer {
346            config,
347            streams: stream_shards,
348            receiver,
349            handle,
350        }),
351        _ => Err(StreamErr::Connect(format!(
352            "Failed to initialize {}",
353            if enable_cluster { "cluster" } else { "node" }
354        ))),
355    }
356}
357
358/// Generate a new group id which should uniquely identify this host.
359pub fn group_id(mode: &ConsumerMode) -> ConsumerGroup {
360    let id = format!(
361        "{}:{}",
362        host_id(),
363        match mode {
364            ConsumerMode::RealTime => "!",
365            ConsumerMode::Resumable => "r",
366            ConsumerMode::LoadBalanced => "s",
367        }
368    );
369    ConsumerGroup::new(id)
370}
371
372/// Generate a new consumer id, which should never collide.
373pub fn consumer_id() -> ConsumerId {
374    let thread_id = format!("{:?}", std::thread::current().id());
375    let thread_id = thread_id
376        .trim_start_matches("ThreadId(")
377        .trim_end_matches(')');
378    let id = format!(
379        "{}:{}:{}:{}",
380        host_id(),
381        std::process::id(),
382        thread_id,
383        Timestamp::now_utc().unix_timestamp_nanos(),
384    );
385    ConsumerId::new(id)
386}