Skip to main content

sea_streamer_kafka/
consumer.rs

1use rdkafka::{
2    config::ClientConfig,
3    consumer::{CommitMode, Consumer, DefaultConsumerContext, MessageStream as RawMessageStream},
4    message::BorrowedMessage as RawMessage,
5    Message as KafkaMessageTrait, Offset, TopicPartitionList,
6};
7use sea_streamer_runtime::spawn_blocking;
8use std::{collections::HashSet, fmt::Debug, time::Duration};
9
10use sea_streamer_types::{
11    export::futures::{
12        future::Map,
13        stream::{Map as StreamMap, StreamFuture},
14        FutureExt, StreamExt,
15    },
16    runtime_error, Consumer as ConsumerTrait, ConsumerGroup, ConsumerMode, ConsumerOptions,
17    Message, Payload, SeqNo, SeqPos, ShardId, StreamErr, StreamKey, StreamerUri, Timestamp,
18};
19
20use crate::{
21    cluster_uri, impl_into_string, stream_err, BaseOptionKey, KafkaConnectOptions, KafkaErr,
22    KafkaResult, DEFAULT_TIMEOUT,
23};
24
25pub struct KafkaConsumer {
26    mode: ConsumerMode,
27    inner: Option<RawConsumer>,
28    streams: Vec<(StreamKey, ShardId)>,
29}
30
31/// rdkafka's StreamConsumer type
32pub type RawConsumer = rdkafka::consumer::StreamConsumer<
33    rdkafka::consumer::DefaultConsumerContext,
34    crate::KafkaAsyncRuntime,
35>;
36
37#[repr(transparent)]
38pub struct KafkaMessage<'a>(RawMessage<'a>);
39
40const ZERO: ShardId = ShardId::new(0);
41
42#[derive(Debug, Default, Clone)]
43pub struct KafkaConsumerOptions {
44    mode: ConsumerMode,
45    group_id: Option<ConsumerGroup>,
46    session_timeout: Option<Duration>,
47    auto_offset_reset: Option<AutoOffsetReset>,
48    enable_auto_commit: Option<bool>,
49    auto_commit_interval: Option<Duration>,
50    enable_auto_offset_store: Option<bool>,
51    custom_options: Vec<(String, String)>,
52}
53
/// The librdkafka consumer properties that [`KafkaConsumerOptions`] can set directly.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum KafkaConsumerOptionKey {
    /// `group.id`
    GroupId,
    /// `session.timeout.ms`
    SessionTimeout,
    /// `auto.offset.reset`
    AutoOffsetReset,
    /// `enable.auto.commit`
    EnableAutoCommit,
    /// `auto.commit.interval.ms`
    AutoCommitInterval,
    /// `enable.auto.offset.store`
    EnableAutoOffsetStore,
}

/// Short local name for [`KafkaConsumerOptionKey`].
type OptionKey = KafkaConsumerOptionKey;
65
/// Policy for `auto.offset.reset`: where to start when the group has no usable committed offset.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Start from the earliest available offset (`earliest`).
    Earliest,
    /// Start from the latest offset, i.e. only new messages (`latest`).
    Latest,
    /// Do not reset; report an error to the consumer when no previous offset exists (`none`).
    NoReset,
}
75
76/// Concrete type of Future that will yield the next message.
77pub type NextFuture<'a> = Map<
78    StreamFuture<RawMessageStream<'a, DefaultConsumerContext>>,
79    fn(
80        (
81            Option<Result<RawMessage<'a>, KafkaErr>>,
82            RawMessageStream<'a, DefaultConsumerContext>,
83        ),
84    ) -> KafkaResult<KafkaMessage<'a>>,
85>;
86
87/// Concrete type of Stream that will yield the next messages.
88pub type KafkaMessageStream<'a> = StreamMap<
89    RawMessageStream<'a, DefaultConsumerContext>,
90    fn(Result<RawMessage<'a>, KafkaErr>) -> KafkaResult<KafkaMessage<'a>>,
91>;
92
93impl KafkaConsumerOptions {
94    /// A unique string that identifies the consumer group this consumer belongs to.
95    /// This property is required if the consumer uses either the group management functionality
96    /// by using subscribe(topic) or the Kafka-based offset management strategy.
97    ///
98    /// <https://kafka.apache.org/documentation/#connectconfigs_group.id>
99    pub fn set_group_id(&mut self, id: ConsumerGroup) -> &mut Self {
100        self.group_id = Some(id);
101        self
102    }
103    pub fn group_id(&self) -> Option<&ConsumerGroup> {
104        self.group_id.as_ref()
105    }
106
107    /// The timeout used to detect worker failures. The worker sends periodic heartbeats
108    /// to indicate its liveness to the broker. If no heartbeats are received by the broker
109    /// before the expiration of this session timeout, then the broker will remove the worker
110    /// from the group and initiate a rebalance.
111    ///
112    /// <https://kafka.apache.org/documentation/#connectconfigs_session.timeout.ms>
113    pub fn set_session_timeout(&mut self, v: Duration) -> &mut Self {
114        self.session_timeout = Some(v);
115        self
116    }
117    pub fn session_timeout(&self) -> Option<&Duration> {
118        self.session_timeout.as_ref()
119    }
120
121    /// Where to stream from when there is no initial offset in Kafka or if the current offset does
122    /// not exist any more on the server.
123    ///
124    /// If unset, defaults to Latest.
125    ///
126    /// <https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset>
127    pub fn set_auto_offset_reset(&mut self, v: AutoOffsetReset) -> &mut Self {
128        self.auto_offset_reset = Some(v);
129        self
130    }
131    pub fn auto_offset_reset(&self) -> Option<&AutoOffsetReset> {
132        self.auto_offset_reset.as_ref()
133    }
134
135    /// If enabled, the consumer's offset will be periodically committed in the background.
136    ///
137    /// If unset, defaults to true.
138    ///
139    /// <https://kafka.apache.org/documentation/#consumerconfigs_enable.auto.commit>
140    pub fn set_enable_auto_commit(&mut self, v: bool) -> &mut Self {
141        self.enable_auto_commit = Some(v);
142        self
143    }
144    pub fn enable_auto_commit(&self) -> Option<&bool> {
145        self.enable_auto_commit.as_ref()
146    }
147
148    /// The interval for offsets to be auto-committed. If `enable_auto_commit` is set to false,
149    /// this will have no effect.
150    ///
151    /// <https://kafka.apache.org/documentation/#consumerconfigs_auto.commit.interval.ms>
152    pub fn set_auto_commit_interval(&mut self, v: Duration) -> &mut Self {
153        self.auto_commit_interval = Some(v);
154        self
155    }
156    pub fn auto_commit_interval(&self) -> Option<&Duration> {
157        self.auto_commit_interval.as_ref()
158    }
159
160    /// If enabled, the consumer's offset will be updated as the messages are *read*. This does
161    /// not equate to them being *processed*. So if you want to make sure to commit only what
162    /// have been processed, set it to false. You will have to manually update these offsets.
163    ///
164    /// If unset, defaults to true.
165    pub fn set_enable_auto_offset_store(&mut self, v: bool) -> &mut Self {
166        self.enable_auto_offset_store = Some(v);
167        self
168    }
169    pub fn enable_auto_offset_store(&self) -> Option<&bool> {
170        self.enable_auto_offset_store.as_ref()
171    }
172
173    /// Add a custom option. If you have an option you frequently use,
174    /// please consider open a PR and add it to above.
175    pub fn add_custom_option<K, V>(&mut self, key: K, value: V) -> &mut Self
176    where
177        K: Into<String>,
178        V: Into<String>,
179    {
180        self.custom_options.push((key.into(), value.into()));
181        self
182    }
183    pub fn custom_options(&self) -> impl Iterator<Item = (&str, &str)> {
184        self.custom_options
185            .iter()
186            .map(|(k, v)| (k.as_str(), v.as_str()))
187    }
188
189    fn make_client_config(&self, client_config: &mut ClientConfig) {
190        if let Some(group_id) = &self.group_id {
191            client_config.set(OptionKey::GroupId, group_id.name());
192        } else {
193            // https://github.com/edenhill/librdkafka/issues/3261
194            // librdkafka always require a group_id even when not joining a consumer group
195            // But this is purely a client side issue
196            client_config.set(OptionKey::GroupId, "abcdefg");
197        }
198        if let Some(v) = self.session_timeout {
199            client_config.set(OptionKey::SessionTimeout, format!("{}", v.as_millis()));
200        }
201        if let Some(v) = self.auto_offset_reset {
202            client_config.set(OptionKey::AutoOffsetReset, v);
203        }
204        if let Some(v) = self.enable_auto_commit {
205            client_config.set(OptionKey::EnableAutoCommit, v.to_string());
206        }
207        if let Some(v) = self.auto_commit_interval {
208            client_config.set(OptionKey::AutoCommitInterval, format!("{}", v.as_millis()));
209        }
210        if let Some(v) = self.enable_auto_offset_store {
211            client_config.set(OptionKey::EnableAutoOffsetStore, v.to_string());
212        }
213        for (key, value) in self.custom_options() {
214            client_config.set(key, value);
215        }
216    }
217}
218
219impl OptionKey {
220    pub fn as_str(&self) -> &'static str {
221        match self {
222            Self::GroupId => "group.id",
223            Self::SessionTimeout => "session.timeout.ms",
224            Self::AutoOffsetReset => "auto.offset.reset",
225            Self::EnableAutoCommit => "enable.auto.commit",
226            Self::AutoCommitInterval => "auto.commit.interval.ms",
227            Self::EnableAutoOffsetStore => "enable.auto.offset.store",
228        }
229    }
230}
231
232impl AutoOffsetReset {
233    pub fn as_str(&self) -> &'static str {
234        match self {
235            Self::Earliest => "earliest",
236            Self::Latest => "latest",
237            Self::NoReset => "none",
238        }
239    }
240}
241
242impl_into_string!(OptionKey);
243impl_into_string!(AutoOffsetReset);
244
245impl std::fmt::Debug for KafkaConsumer {
246    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247        f.debug_struct("KafkaConsumer").finish()
248    }
249}
250
251impl ConsumerTrait for KafkaConsumer {
252    type Error = KafkaErr;
253    type Message<'a> = KafkaMessage<'a>;
254    // See we don't actually have to Box these! Looking forward to `type_alias_impl_trait`
255    type NextFuture<'a> = NextFuture<'a>;
256    type Stream<'a> = KafkaMessageStream<'a>;
257
258    /// Seek all streams to the given point in time, with all partitions assigned.
259    ///
260    /// # Warning
261    ///
262    /// This async method is not cancel safe. You must await this future,
263    /// and this Consumer will be unusable for any operations until it finishes.
264    #[inline]
265    async fn seek(&mut self, timestamp: Timestamp) -> KafkaResult<()> {
266        self.seek_with_timeout(timestamp, DEFAULT_TIMEOUT).await
267    }
268
269    /// Rewind all streams across all assigned partitions.
270    /// Call [`Consumer::assign`] to assign a partition beforehand,
271    /// or [`KafkaConsumer::reassign_partitions`] to assign all partitions.
272    async fn rewind(&mut self, offset: SeqPos) -> KafkaResult<()> {
273        let mut tpl = TopicPartitionList::new();
274
275        for (stream, shard) in self.streams.iter() {
276            tpl.add_partition_offset(
277                stream.name(),
278                shard.id() as i32,
279                match offset {
280                    SeqPos::Beginning => Offset::Beginning,
281                    SeqPos::End => Offset::End,
282                    SeqPos::At(seq) => Offset::Offset(seq.try_into().expect("u64 out of range")),
283                },
284            )
285            .map_err(stream_err)?;
286        }
287
288        self.get().assign(&tpl).map_err(stream_err)?;
289        Ok(())
290    }
291
292    fn assign(&mut self, (stream, shard): (StreamKey, ShardId)) -> KafkaResult<()> {
293        if !self.streams.iter().any(|(s, _)| s == &stream) {
294            return Err(StreamErr::StreamKeyNotFound);
295        }
296        if !self
297            .streams
298            .iter()
299            .any(|(s, t)| (s, t) == (&stream, &shard))
300        {
301            self.streams.push((stream, shard));
302        }
303        Ok(())
304    }
305
306    fn unassign(&mut self, s: (StreamKey, ShardId)) -> KafkaResult<()> {
307        if let Some((i, _)) = self.streams.iter().enumerate().find(|(_, t)| &s == *t) {
308            self.streams.remove(i);
309            if self.streams.is_empty() {
310                Err(StreamErr::StreamKeyEmpty)
311            } else {
312                Ok(())
313            }
314        } else {
315            Err(StreamErr::StreamKeyNotFound)
316        }
317    }
318
319    fn next(&self) -> Self::NextFuture<'_> {
320        self.get().stream().into_future().map(|(res, _)| match res {
321            Some(res) => Self::process(res),
322            None => panic!("Kafka stream never ends"),
323        })
324    }
325
326    /// Note: Kafka stream never ends.
327    fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
328        self.get().stream().map(Self::process)
329    }
330}
331
332impl KafkaConsumer {
333    /// Get the underlying StreamConsumer
334    #[inline]
335    fn get(&self) -> &RawConsumer {
336        self.inner
337            .as_ref()
338            .expect("Client is still inside an async operation, please await the future")
339    }
340
341    /// Borrow the inner KafkaConsumer. Use at your own risk.
342    pub fn inner(&mut self) -> &RawConsumer {
343        self.get()
344    }
345
346    fn process(res: Result<RawMessage, KafkaErr>) -> KafkaResult<KafkaMessage> {
347        match res {
348            Ok(mess) => Ok(KafkaMessage(mess)),
349            Err(err) => Err(StreamErr::Backend(err)),
350        }
351    }
352
353    /// Stream-shards this consumer has been (manually) assigned to.
354    /// Note that since this can be changing due to load-balancing,
355    /// only ZERO is assigned by default.
356    pub fn stream_shards(&self) -> &[(StreamKey, ShardId)] {
357        &self.streams
358    }
359
360    #[inline]
361    async fn async_func<
362        T: Send + 'static,
363        F: FnOnce(&RawConsumer) -> Result<T, KafkaErr> + Send + 'static,
364    >(
365        &mut self,
366        func: F,
367    ) -> KafkaResult<T> {
368        if self.inner.is_none() {
369            panic!("An async operation is still in progress.");
370        }
371
372        // Sadly, `commit` is a sync function, but we want an async API
373        // to await when transaction finishes. To avoid Client being dropped while
374        // committing, we transfer the ownership into the handler thread,
375        // and we take back the ownership after it finishes.
376        // Meanwhile, any Client operation will panic.
377        // This constraint should be held by the `&mut` signature of this method,
378        // but if someone ignores or discards this future, this Consumer will be broken.
379        let client = self.inner.take().unwrap();
380        let inner = spawn_blocking(move || match func(&client) {
381            Ok(res) => Ok((res, client)),
382            Err(err) => Err((err, client)),
383        })
384        .await
385        .map_err(runtime_error)?;
386
387        match inner {
388            Ok((res, inner)) => {
389                self.inner = Some(inner);
390                Ok(res)
391            }
392            Err((err, inner)) => {
393                self.inner = Some(inner);
394                Err(stream_err(err))
395            }
396        }
397    }
398
399    /// Fetch the partition list of subscribed topics and assign all partitions.
400    pub async fn reassign_partitions(&mut self) -> KafkaResult<()> {
401        let current: HashSet<StreamKey> = self.streams.iter().map(|(s, _)| s.clone()).collect();
402        let mut streams = Vec::new();
403        for stream_key in current {
404            let s = stream_key.clone();
405            let raw = self
406                .async_func(move |c| c.fetch_metadata(Some(s.name()), DEFAULT_TIMEOUT))
407                .await?;
408            let partitions = raw
409                .topics()
410                .first()
411                .unwrap()
412                .partitions()
413                .iter()
414                .map(|p| p.id() as u64);
415            for p in partitions {
416                streams.push((stream_key.clone(), ShardId::new(p)));
417            }
418        }
419        if streams.is_empty() {
420            return Err(StreamErr::Backend(KafkaErr::Subscription(
421                "No partitions found.".to_owned(),
422            )));
423        }
424        self.streams = streams;
425        Ok(())
426    }
427
428    /// Seek all streams to the given point in time, with all partitions assigned.
429    ///
430    /// # Warning
431    ///
432    /// This async method is not cancel safe. You must await this future,
433    /// and this Consumer will be unusable for any operations until it finishes.
434    async fn seek_with_timeout(
435        &mut self,
436        timestamp: Timestamp,
437        timeout: Duration,
438    ) -> KafkaResult<()> {
439        self.reassign_partitions().await?;
440
441        let mut tpl = TopicPartitionList::new();
442
443        for (stream, shard) in self.streams.iter() {
444            tpl.add_partition_offset(
445                stream.name(),
446                shard.id() as i32,
447                Offset::Offset(
448                    (timestamp.unix_timestamp_nanos() / 1_000_000)
449                        .try_into()
450                        .expect("KafkaConsumer::seek: timestamp out of range"),
451                ),
452            )
453            .map_err(stream_err)?;
454        }
455
456        let tpl = self
457            .async_func(move |c| c.offsets_for_times(tpl, timeout))
458            .await?;
459
460        self.inner
461            .as_mut()
462            .unwrap()
463            .assign(&tpl)
464            .map_err(stream_err)?;
465
466        Ok(())
467    }
468
469    /// Commit an "ack" to broker for having processed this message.
470    ///
471    /// # Warning
472    ///
473    /// This async method is not cancel safe. You must await this future,
474    /// and this Consumer will be unusable for any operations until it finishes.
475    pub async fn commit_message(&mut self, mess: &KafkaMessage<'_>) -> KafkaResult<()> {
476        self.commit(&mess.stream_key(), &mess.shard_id(), &mess.sequence())
477            .await
478    }
479
480    /// Commit an "ack" to broker for having processed up to this cursor.
481    ///
482    /// # Warning
483    ///
484    /// This async method is not cancel safe. You must await this future,
485    /// and this Consumer will be unusable for any operations until it finishes.
486    pub async fn commit_with(
487        &mut self,
488        (stream_key, shard_id, sequence): &(StreamKey, ShardId, SeqNo),
489    ) -> KafkaResult<()> {
490        self.commit(stream_key, shard_id, sequence).await
491    }
492
493    /// Commit an "ack" to broker for having processed up to this cursor.
494    ///
495    /// # Warning
496    ///
497    /// This async method is not cancel safe. You must await this future,
498    /// and this Consumer will be unusable for any operations until it finishes.
499    pub async fn commit(
500        &mut self,
501        stream: &StreamKey,
502        shard: &ShardId,
503        seq: &SeqNo,
504    ) -> KafkaResult<()> {
505        if self.mode == ConsumerMode::RealTime {
506            return Err(StreamErr::CommitNotAllowed);
507        }
508        let mut tpl = TopicPartitionList::new();
509        tpl.add_partition_offset(
510            stream.name(),
511            shard.id() as i32,
512            Offset::Offset((*seq).try_into().expect("u64 out of range")),
513        )
514        .map_err(stream_err)?;
515
516        self.async_func(move |c| c.commit(&tpl, CommitMode::Sync))
517            .await
518    }
519
520    /// Store the offset so that it will be committed.
521    /// You must have `set_enable_auto_offset_store` to false.
522    pub fn store_offset(
523        &mut self,
524        stream: &StreamKey,
525        shard: &ShardId,
526        seq: &SeqNo,
527    ) -> KafkaResult<()> {
528        self.get()
529            .store_offset(
530                stream.name(),
531                shard.id() as i32,
532                (*seq).try_into().expect("u64 out of range"),
533            )
534            .map_err(stream_err)
535    }
536
537    /// Store the offset for this message so that it will be committed.
538    /// You must have `set_enable_auto_offset_store` to false.
539    pub fn store_offset_for_message(&mut self, mess: &KafkaMessage<'_>) -> KafkaResult<()> {
540        self.store_offset(&mess.stream_key(), &mess.shard_id(), &mess.sequence())
541    }
542
543    /// Store the offset with message identifier so that it will be committed.
544    /// You must have `set_enable_auto_offset_store` to false.
545    pub fn store_offset_with(
546        &mut self,
547        (stream_key, shard_id, sequence): &(StreamKey, ShardId, SeqNo),
548    ) -> KafkaResult<()> {
549        self.store_offset(stream_key, shard_id, sequence)
550    }
551}
552
553impl<'a> KafkaMessage<'a> {
554    fn mess(&self) -> &RawMessage {
555        &self.0
556    }
557}
558
559impl<'a> Debug for KafkaMessage<'a> {
560    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561        self.mess().fmt(f)
562    }
563}
564
565impl<'a> Message for KafkaMessage<'a> {
566    fn stream_key(&self) -> StreamKey {
567        StreamKey::new(self.mess().topic()).expect("A message should carry a valid stream key")
568    }
569
570    fn shard_id(&self) -> ShardId {
571        ShardId::new(self.mess().partition() as u64)
572    }
573
574    fn sequence(&self) -> SeqNo {
575        self.mess().offset() as SeqNo
576    }
577
578    fn timestamp(&self) -> Timestamp {
579        Timestamp::from_unix_timestamp_nanos(
580            self.mess()
581                .timestamp()
582                .to_millis()
583                .expect("message.timestamp() is None") as i128
584                * 1_000_000,
585        )
586        .expect("from_unix_timestamp_nanos")
587    }
588
589    fn message(&self) -> Payload {
590        Payload::new(self.mess().payload().unwrap_or_default())
591    }
592}
593
594impl ConsumerOptions for KafkaConsumerOptions {
595    type Error = KafkaErr;
596
597    fn new(mode: ConsumerMode) -> Self {
598        KafkaConsumerOptions {
599            mode,
600            ..Default::default()
601        }
602    }
603
604    fn mode(&self) -> KafkaResult<&ConsumerMode> {
605        Ok(&self.mode)
606    }
607
608    fn consumer_group(&self) -> KafkaResult<&ConsumerGroup> {
609        self.group_id.as_ref().ok_or(StreamErr::ConsumerGroupNotSet)
610    }
611
612    /// If multiple consumers shares the same group, only one consumer in the group will
613    /// receive a message, i.e. it is load-balanced.
614    ///
615    /// However, the load-balancing mechanism is what makes Kafka different:
616    ///
617    /// Each stream is divided into multiple shards (known as partition),
618    /// and each partition will be assigned to only one consumer in a group.
619    ///
620    /// Say there are 2 consumers (in the group) and 2 partitions, then each consumer
621    /// will receive messages from one partition, and they are thus load-balanced.
622    ///
623    /// If there are 2 consumers and 3 partitions, then one consumer will be assigned
624    /// 2 partitions, and the other will be assigned only 1.
625    ///
626    /// However if the stream has only 1 partition, even if there are many consumers,
627    /// these messages will only be received by the assigned consumer, and other consumers
628    /// will be in stand-by mode, resulting in a hot-failover setup.
629    fn set_consumer_group(&mut self, group: ConsumerGroup) -> KafkaResult<&mut Self> {
630        self.group_id = Some(group);
631        Ok(self)
632    }
633}
634
635pub(crate) fn create_consumer(
636    streamer: &StreamerUri,
637    base_options: &KafkaConnectOptions,
638    options: &KafkaConsumerOptions,
639    streams: Vec<StreamKey>,
640) -> Result<KafkaConsumer, KafkaErr> {
641    let mut client_config = ClientConfig::new();
642    client_config.set(BaseOptionKey::BootstrapServers, cluster_uri(streamer)?);
643    base_options.make_client_config(&mut client_config);
644    options.make_client_config(&mut client_config);
645
646    let consumer: RawConsumer = client_config.create()?;
647
648    if !streams.is_empty() {
649        let topics: Vec<&str> = streams.iter().map(|s| s.name()).collect();
650        consumer.subscribe(&topics)?;
651    } else {
652        panic!("no topic?");
653    }
654
655    Ok(KafkaConsumer {
656        mode: options.mode,
657        inner: Some(consumer),
658        streams: streams.into_iter().map(|s| (s, ZERO)).collect(),
659    })
660}