1use rdkafka::{
2 config::ClientConfig,
3 consumer::{CommitMode, Consumer, DefaultConsumerContext, MessageStream as RawMessageStream},
4 message::BorrowedMessage as RawMessage,
5 Message as KafkaMessageTrait, Offset, TopicPartitionList,
6};
7use sea_streamer_runtime::spawn_blocking;
8use std::{collections::HashSet, fmt::Debug, time::Duration};
9
10use sea_streamer_types::{
11 export::futures::{
12 future::Map,
13 stream::{Map as StreamMap, StreamFuture},
14 FutureExt, StreamExt,
15 },
16 runtime_error, Consumer as ConsumerTrait, ConsumerGroup, ConsumerMode, ConsumerOptions,
17 Message, Payload, SeqNo, SeqPos, ShardId, StreamErr, StreamKey, StreamerUri, Timestamp,
18};
19
20use crate::{
21 cluster_uri, impl_into_string, stream_err, BaseOptionKey, KafkaConnectOptions, KafkaErr,
22 KafkaResult, DEFAULT_TIMEOUT,
23};
24
/// A Kafka consumer: the rdkafka client plus the bookkeeping this module keeps around it.
pub struct KafkaConsumer {
    /// Consumer mode copied from the options at construction time.
    /// `commit` refuses to run when this is `ConsumerMode::RealTime`.
    mode: ConsumerMode,
    /// The rdkafka client. It is `None` only while `async_func` has moved it
    /// into a blocking task; accessing it in that state panics.
    inner: Option<RawConsumer>,
    /// The `(stream, shard)` pairs this consumer tracks. `rewind` and `seek`
    /// build their partition lists from this vector.
    streams: Vec<(StreamKey, ShardId)>,
}
30
/// The concrete rdkafka `StreamConsumer` used by this module: default consumer
/// context, driven by this crate's `KafkaAsyncRuntime`.
pub type RawConsumer = rdkafka::consumer::StreamConsumer<
    rdkafka::consumer::DefaultConsumerContext,
    crate::KafkaAsyncRuntime,
>;
36
/// Newtype over rdkafka's `BorrowedMessage` so this crate's `Message` trait can
/// be implemented for it. `#[repr(transparent)]` keeps the layout identical to
/// the wrapped message.
#[repr(transparent)]
pub struct KafkaMessage<'a>(RawMessage<'a>);
39
/// Shard id 0: the placeholder shard `create_consumer` pairs with every stream key.
const ZERO: ShardId = ShardId::new(0);
41
/// Options used to configure a Kafka consumer.
///
/// Every `Option` field is written to the client configuration only when it is
/// `Some`; see `make_client_config` for the exact configuration keys.
#[derive(Debug, Default, Clone)]
pub struct KafkaConsumerOptions {
    /// Consumer mode; copied into the `KafkaConsumer` when it is created.
    mode: ConsumerMode,
    /// Consumer group, written as `group.id`.
    group_id: Option<ConsumerGroup>,
    /// Written as `session.timeout.ms` (in milliseconds).
    session_timeout: Option<Duration>,
    /// Written as `auto.offset.reset`.
    auto_offset_reset: Option<AutoOffsetReset>,
    /// Written as `enable.auto.commit`.
    enable_auto_commit: Option<bool>,
    /// Written as `auto.commit.interval.ms` (in milliseconds).
    auto_commit_interval: Option<Duration>,
    /// Written as `enable.auto.offset.store`.
    enable_auto_offset_store: Option<bool>,
    /// Free-form `(key, value)` pairs, written after all the typed options.
    custom_options: Vec<(String, String)>,
}
53
54#[derive(Debug, Copy, Clone, PartialEq, Eq)]
55pub enum KafkaConsumerOptionKey {
56 GroupId,
57 SessionTimeout,
58 AutoOffsetReset,
59 EnableAutoCommit,
60 AutoCommitInterval,
61 EnableAutoOffsetStore,
62}
63
/// Short module-local alias for [`KafkaConsumerOptionKey`].
type OptionKey = KafkaConsumerOptionKey;
65
66#[derive(Debug, Copy, Clone, PartialEq, Eq)]
67pub enum AutoOffsetReset {
68 Earliest,
70 Latest,
72 NoReset,
74}
75
/// Concrete type of the future returned by `KafkaConsumer::next`.
///
/// It is the "first item + rest of stream" future of the raw rdkafka message
/// stream, mapped through a plain function pointer that turns that pair into a
/// `KafkaResult<KafkaMessage>`. Spelling it out lets it be named as the
/// `NextFuture` associated type of the consumer trait.
pub type NextFuture<'a> = Map<
    StreamFuture<RawMessageStream<'a, DefaultConsumerContext>>,
    fn(
        (
            Option<Result<RawMessage<'a>, KafkaErr>>,
            RawMessageStream<'a, DefaultConsumerContext>,
        ),
    ) -> KafkaResult<KafkaMessage<'a>>,
>;
86
/// Concrete type of the stream returned by `KafkaConsumer::stream`: the raw
/// rdkafka message stream with every item mapped through a function pointer
/// into a `KafkaResult<KafkaMessage>`.
pub type KafkaMessageStream<'a> = StreamMap<
    RawMessageStream<'a, DefaultConsumerContext>,
    fn(Result<RawMessage<'a>, KafkaErr>) -> KafkaResult<KafkaMessage<'a>>,
>;
92
93impl KafkaConsumerOptions {
94 pub fn set_group_id(&mut self, id: ConsumerGroup) -> &mut Self {
100 self.group_id = Some(id);
101 self
102 }
103 pub fn group_id(&self) -> Option<&ConsumerGroup> {
104 self.group_id.as_ref()
105 }
106
107 pub fn set_session_timeout(&mut self, v: Duration) -> &mut Self {
114 self.session_timeout = Some(v);
115 self
116 }
117 pub fn session_timeout(&self) -> Option<&Duration> {
118 self.session_timeout.as_ref()
119 }
120
121 pub fn set_auto_offset_reset(&mut self, v: AutoOffsetReset) -> &mut Self {
128 self.auto_offset_reset = Some(v);
129 self
130 }
131 pub fn auto_offset_reset(&self) -> Option<&AutoOffsetReset> {
132 self.auto_offset_reset.as_ref()
133 }
134
135 pub fn set_enable_auto_commit(&mut self, v: bool) -> &mut Self {
141 self.enable_auto_commit = Some(v);
142 self
143 }
144 pub fn enable_auto_commit(&self) -> Option<&bool> {
145 self.enable_auto_commit.as_ref()
146 }
147
148 pub fn set_auto_commit_interval(&mut self, v: Duration) -> &mut Self {
153 self.auto_commit_interval = Some(v);
154 self
155 }
156 pub fn auto_commit_interval(&self) -> Option<&Duration> {
157 self.auto_commit_interval.as_ref()
158 }
159
160 pub fn set_enable_auto_offset_store(&mut self, v: bool) -> &mut Self {
166 self.enable_auto_offset_store = Some(v);
167 self
168 }
169 pub fn enable_auto_offset_store(&self) -> Option<&bool> {
170 self.enable_auto_offset_store.as_ref()
171 }
172
173 pub fn add_custom_option<K, V>(&mut self, key: K, value: V) -> &mut Self
176 where
177 K: Into<String>,
178 V: Into<String>,
179 {
180 self.custom_options.push((key.into(), value.into()));
181 self
182 }
183 pub fn custom_options(&self) -> impl Iterator<Item = (&str, &str)> {
184 self.custom_options
185 .iter()
186 .map(|(k, v)| (k.as_str(), v.as_str()))
187 }
188
189 fn make_client_config(&self, client_config: &mut ClientConfig) {
190 if let Some(group_id) = &self.group_id {
191 client_config.set(OptionKey::GroupId, group_id.name());
192 } else {
193 client_config.set(OptionKey::GroupId, "abcdefg");
197 }
198 if let Some(v) = self.session_timeout {
199 client_config.set(OptionKey::SessionTimeout, format!("{}", v.as_millis()));
200 }
201 if let Some(v) = self.auto_offset_reset {
202 client_config.set(OptionKey::AutoOffsetReset, v);
203 }
204 if let Some(v) = self.enable_auto_commit {
205 client_config.set(OptionKey::EnableAutoCommit, v.to_string());
206 }
207 if let Some(v) = self.auto_commit_interval {
208 client_config.set(OptionKey::AutoCommitInterval, format!("{}", v.as_millis()));
209 }
210 if let Some(v) = self.enable_auto_offset_store {
211 client_config.set(OptionKey::EnableAutoOffsetStore, v.to_string());
212 }
213 for (key, value) in self.custom_options() {
214 client_config.set(key, value);
215 }
216 }
217}
218
219impl OptionKey {
220 pub fn as_str(&self) -> &'static str {
221 match self {
222 Self::GroupId => "group.id",
223 Self::SessionTimeout => "session.timeout.ms",
224 Self::AutoOffsetReset => "auto.offset.reset",
225 Self::EnableAutoCommit => "enable.auto.commit",
226 Self::AutoCommitInterval => "auto.commit.interval.ms",
227 Self::EnableAutoOffsetStore => "enable.auto.offset.store",
228 }
229 }
230}
231
232impl AutoOffsetReset {
233 pub fn as_str(&self) -> &'static str {
234 match self {
235 Self::Earliest => "earliest",
236 Self::Latest => "latest",
237 Self::NoReset => "none",
238 }
239 }
240}
241
// Crate-local macro. Both types are passed straight to `ClientConfig::set` in
// `make_client_config`, so this presumably provides their conversion into
// `String` (likely via `as_str`) -- confirm against the macro definition.
impl_into_string!(OptionKey);
impl_into_string!(AutoOffsetReset);
244
245impl std::fmt::Debug for KafkaConsumer {
246 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 f.debug_struct("KafkaConsumer").finish()
248 }
249}
250
251impl ConsumerTrait for KafkaConsumer {
252 type Error = KafkaErr;
253 type Message<'a> = KafkaMessage<'a>;
254 type NextFuture<'a> = NextFuture<'a>;
256 type Stream<'a> = KafkaMessageStream<'a>;
257
258 #[inline]
265 async fn seek(&mut self, timestamp: Timestamp) -> KafkaResult<()> {
266 self.seek_with_timeout(timestamp, DEFAULT_TIMEOUT).await
267 }
268
269 async fn rewind(&mut self, offset: SeqPos) -> KafkaResult<()> {
273 let mut tpl = TopicPartitionList::new();
274
275 for (stream, shard) in self.streams.iter() {
276 tpl.add_partition_offset(
277 stream.name(),
278 shard.id() as i32,
279 match offset {
280 SeqPos::Beginning => Offset::Beginning,
281 SeqPos::End => Offset::End,
282 SeqPos::At(seq) => Offset::Offset(seq.try_into().expect("u64 out of range")),
283 },
284 )
285 .map_err(stream_err)?;
286 }
287
288 self.get().assign(&tpl).map_err(stream_err)?;
289 Ok(())
290 }
291
292 fn assign(&mut self, (stream, shard): (StreamKey, ShardId)) -> KafkaResult<()> {
293 if !self.streams.iter().any(|(s, _)| s == &stream) {
294 return Err(StreamErr::StreamKeyNotFound);
295 }
296 if !self
297 .streams
298 .iter()
299 .any(|(s, t)| (s, t) == (&stream, &shard))
300 {
301 self.streams.push((stream, shard));
302 }
303 Ok(())
304 }
305
306 fn unassign(&mut self, s: (StreamKey, ShardId)) -> KafkaResult<()> {
307 if let Some((i, _)) = self.streams.iter().enumerate().find(|(_, t)| &s == *t) {
308 self.streams.remove(i);
309 if self.streams.is_empty() {
310 Err(StreamErr::StreamKeyEmpty)
311 } else {
312 Ok(())
313 }
314 } else {
315 Err(StreamErr::StreamKeyNotFound)
316 }
317 }
318
319 fn next(&self) -> Self::NextFuture<'_> {
320 self.get().stream().into_future().map(|(res, _)| match res {
321 Some(res) => Self::process(res),
322 None => panic!("Kafka stream never ends"),
323 })
324 }
325
326 fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
328 self.get().stream().map(Self::process)
329 }
330}
331
332impl KafkaConsumer {
333 #[inline]
335 fn get(&self) -> &RawConsumer {
336 self.inner
337 .as_ref()
338 .expect("Client is still inside an async operation, please await the future")
339 }
340
341 pub fn inner(&mut self) -> &RawConsumer {
343 self.get()
344 }
345
346 fn process(res: Result<RawMessage, KafkaErr>) -> KafkaResult<KafkaMessage> {
347 match res {
348 Ok(mess) => Ok(KafkaMessage(mess)),
349 Err(err) => Err(StreamErr::Backend(err)),
350 }
351 }
352
353 pub fn stream_shards(&self) -> &[(StreamKey, ShardId)] {
357 &self.streams
358 }
359
360 #[inline]
361 async fn async_func<
362 T: Send + 'static,
363 F: FnOnce(&RawConsumer) -> Result<T, KafkaErr> + Send + 'static,
364 >(
365 &mut self,
366 func: F,
367 ) -> KafkaResult<T> {
368 if self.inner.is_none() {
369 panic!("An async operation is still in progress.");
370 }
371
372 let client = self.inner.take().unwrap();
380 let inner = spawn_blocking(move || match func(&client) {
381 Ok(res) => Ok((res, client)),
382 Err(err) => Err((err, client)),
383 })
384 .await
385 .map_err(runtime_error)?;
386
387 match inner {
388 Ok((res, inner)) => {
389 self.inner = Some(inner);
390 Ok(res)
391 }
392 Err((err, inner)) => {
393 self.inner = Some(inner);
394 Err(stream_err(err))
395 }
396 }
397 }
398
399 pub async fn reassign_partitions(&mut self) -> KafkaResult<()> {
401 let current: HashSet<StreamKey> = self.streams.iter().map(|(s, _)| s.clone()).collect();
402 let mut streams = Vec::new();
403 for stream_key in current {
404 let s = stream_key.clone();
405 let raw = self
406 .async_func(move |c| c.fetch_metadata(Some(s.name()), DEFAULT_TIMEOUT))
407 .await?;
408 let partitions = raw
409 .topics()
410 .first()
411 .unwrap()
412 .partitions()
413 .iter()
414 .map(|p| p.id() as u64);
415 for p in partitions {
416 streams.push((stream_key.clone(), ShardId::new(p)));
417 }
418 }
419 if streams.is_empty() {
420 return Err(StreamErr::Backend(KafkaErr::Subscription(
421 "No partitions found.".to_owned(),
422 )));
423 }
424 self.streams = streams;
425 Ok(())
426 }
427
428 async fn seek_with_timeout(
435 &mut self,
436 timestamp: Timestamp,
437 timeout: Duration,
438 ) -> KafkaResult<()> {
439 self.reassign_partitions().await?;
440
441 let mut tpl = TopicPartitionList::new();
442
443 for (stream, shard) in self.streams.iter() {
444 tpl.add_partition_offset(
445 stream.name(),
446 shard.id() as i32,
447 Offset::Offset(
448 (timestamp.unix_timestamp_nanos() / 1_000_000)
449 .try_into()
450 .expect("KafkaConsumer::seek: timestamp out of range"),
451 ),
452 )
453 .map_err(stream_err)?;
454 }
455
456 let tpl = self
457 .async_func(move |c| c.offsets_for_times(tpl, timeout))
458 .await?;
459
460 self.inner
461 .as_mut()
462 .unwrap()
463 .assign(&tpl)
464 .map_err(stream_err)?;
465
466 Ok(())
467 }
468
469 pub async fn commit_message(&mut self, mess: &KafkaMessage<'_>) -> KafkaResult<()> {
476 self.commit(&mess.stream_key(), &mess.shard_id(), &mess.sequence())
477 .await
478 }
479
480 pub async fn commit_with(
487 &mut self,
488 (stream_key, shard_id, sequence): &(StreamKey, ShardId, SeqNo),
489 ) -> KafkaResult<()> {
490 self.commit(stream_key, shard_id, sequence).await
491 }
492
493 pub async fn commit(
500 &mut self,
501 stream: &StreamKey,
502 shard: &ShardId,
503 seq: &SeqNo,
504 ) -> KafkaResult<()> {
505 if self.mode == ConsumerMode::RealTime {
506 return Err(StreamErr::CommitNotAllowed);
507 }
508 let mut tpl = TopicPartitionList::new();
509 tpl.add_partition_offset(
510 stream.name(),
511 shard.id() as i32,
512 Offset::Offset((*seq).try_into().expect("u64 out of range")),
513 )
514 .map_err(stream_err)?;
515
516 self.async_func(move |c| c.commit(&tpl, CommitMode::Sync))
517 .await
518 }
519
520 pub fn store_offset(
523 &mut self,
524 stream: &StreamKey,
525 shard: &ShardId,
526 seq: &SeqNo,
527 ) -> KafkaResult<()> {
528 self.get()
529 .store_offset(
530 stream.name(),
531 shard.id() as i32,
532 (*seq).try_into().expect("u64 out of range"),
533 )
534 .map_err(stream_err)
535 }
536
537 pub fn store_offset_for_message(&mut self, mess: &KafkaMessage<'_>) -> KafkaResult<()> {
540 self.store_offset(&mess.stream_key(), &mess.shard_id(), &mess.sequence())
541 }
542
543 pub fn store_offset_with(
546 &mut self,
547 (stream_key, shard_id, sequence): &(StreamKey, ShardId, SeqNo),
548 ) -> KafkaResult<()> {
549 self.store_offset(stream_key, shard_id, sequence)
550 }
551}
552
553impl<'a> KafkaMessage<'a> {
554 fn mess(&self) -> &RawMessage {
555 &self.0
556 }
557}
558
559impl<'a> Debug for KafkaMessage<'a> {
560 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
561 self.mess().fmt(f)
562 }
563}
564
565impl<'a> Message for KafkaMessage<'a> {
566 fn stream_key(&self) -> StreamKey {
567 StreamKey::new(self.mess().topic()).expect("A message should carry a valid stream key")
568 }
569
570 fn shard_id(&self) -> ShardId {
571 ShardId::new(self.mess().partition() as u64)
572 }
573
574 fn sequence(&self) -> SeqNo {
575 self.mess().offset() as SeqNo
576 }
577
578 fn timestamp(&self) -> Timestamp {
579 Timestamp::from_unix_timestamp_nanos(
580 self.mess()
581 .timestamp()
582 .to_millis()
583 .expect("message.timestamp() is None") as i128
584 * 1_000_000,
585 )
586 .expect("from_unix_timestamp_nanos")
587 }
588
589 fn message(&self) -> Payload {
590 Payload::new(self.mess().payload().unwrap_or_default())
591 }
592}
593
594impl ConsumerOptions for KafkaConsumerOptions {
595 type Error = KafkaErr;
596
597 fn new(mode: ConsumerMode) -> Self {
598 KafkaConsumerOptions {
599 mode,
600 ..Default::default()
601 }
602 }
603
604 fn mode(&self) -> KafkaResult<&ConsumerMode> {
605 Ok(&self.mode)
606 }
607
608 fn consumer_group(&self) -> KafkaResult<&ConsumerGroup> {
609 self.group_id.as_ref().ok_or(StreamErr::ConsumerGroupNotSet)
610 }
611
612 fn set_consumer_group(&mut self, group: ConsumerGroup) -> KafkaResult<&mut Self> {
630 self.group_id = Some(group);
631 Ok(self)
632 }
633}
634
635pub(crate) fn create_consumer(
636 streamer: &StreamerUri,
637 base_options: &KafkaConnectOptions,
638 options: &KafkaConsumerOptions,
639 streams: Vec<StreamKey>,
640) -> Result<KafkaConsumer, KafkaErr> {
641 let mut client_config = ClientConfig::new();
642 client_config.set(BaseOptionKey::BootstrapServers, cluster_uri(streamer)?);
643 base_options.make_client_config(&mut client_config);
644 options.make_client_config(&mut client_config);
645
646 let consumer: RawConsumer = client_config.create()?;
647
648 if !streams.is_empty() {
649 let topics: Vec<&str> = streams.iter().map(|s| s.name()).collect();
650 consumer.subscribe(&topics)?;
651 } else {
652 panic!("no topic?");
653 }
654
655 Ok(KafkaConsumer {
656 mode: options.mode,
657 inner: Some(consumer),
658 streams: streams.into_iter().map(|s| (s, ZERO)).collect(),
659 })
660}