use rdkafka::{
config::ClientConfig,
consumer::{CommitMode, Consumer, MessageStream as RawMessageStream},
message::BorrowedMessage as RawMessage,
Message as KafkaMessageTrait, Offset, TopicPartitionList,
};
use sea_streamer_runtime::spawn_blocking;
use std::{collections::HashSet, fmt::Debug, time::Duration};
use sea_streamer_types::{
export::{
async_trait,
futures::{
future::Map,
stream::{Map as StreamMap, StreamFuture},
FutureExt, StreamExt,
},
},
runtime_error, Consumer as ConsumerTrait, ConsumerGroup, ConsumerMode, ConsumerOptions,
Message, Payload, SeqNo, SeqPos, ShardId, StreamErr, StreamKey, StreamerUri, Timestamp,
};
use crate::{
cluster_uri, impl_into_string, stream_err, BaseOptionKey, KafkaConnectOptions, KafkaErr,
KafkaResult, DEFAULT_TIMEOUT,
};
/// A high-level Kafka consumer wrapping rdkafka's `StreamConsumer`.
pub struct KafkaConsumer {
    // Consumer mode chosen at creation; `commit` is rejected in `RealTime` mode.
    mode: ConsumerMode,
    // The underlying rdkafka consumer. `None` only while an async operation
    // (see `async_func`) has temporarily moved it onto a blocking thread.
    inner: Option<RawConsumer>,
    // The (stream key, shard/partition) pairs currently assigned to this consumer.
    streams: Vec<(StreamKey, ShardId)>,
}
/// The concrete rdkafka consumer type used throughout this crate,
/// parameterized over the crate's async runtime.
pub type RawConsumer = rdkafka::consumer::StreamConsumer<
    rdkafka::consumer::DefaultConsumerContext,
    crate::KafkaAsyncRuntime,
>;

/// Zero-cost wrapper over a borrowed rdkafka message.
#[repr(transparent)]
pub struct KafkaMessage<'a>(RawMessage<'a>);

// Default shard (partition 0) assigned to each subscribed stream before
// `reassign_partitions` discovers the real partition set.
const ZERO: ShardId = ShardId::new(0);
/// Options for building a Kafka consumer: a typed subset of librdkafka's
/// consumer configuration, plus free-form custom options applied last.
#[derive(Debug, Default, Clone)]
pub struct KafkaConsumerOptions {
    // Consumer mode; commits are rejected in `RealTime` mode (see `KafkaConsumer::commit`).
    mode: ConsumerMode,
    // `group.id`
    group_id: Option<ConsumerGroup>,
    // `session.timeout.ms`
    session_timeout: Option<Duration>,
    // `auto.offset.reset`
    auto_offset_reset: Option<AutoOffsetReset>,
    // `enable.auto.commit`
    enable_auto_commit: Option<bool>,
    // `auto.commit.interval.ms`
    auto_commit_interval: Option<Duration>,
    // `enable.auto.offset.store`
    enable_auto_offset_store: Option<bool>,
    // Raw key/value pairs applied after the typed options, so they can override them.
    custom_options: Vec<(String, String)>,
}
/// Keys for the librdkafka consumer options this crate exposes as typed setters.
/// Each maps to a librdkafka configuration string via `as_str`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum KafkaConsumerOptionKey {
    GroupId,
    SessionTimeout,
    AutoOffsetReset,
    EnableAutoCommit,
    AutoCommitInterval,
    EnableAutoOffsetStore,
}

// Module-local shorthand for the verbose enum name above.
type OptionKey = KafkaConsumerOptionKey;

/// Where to start consuming when there is no committed offset
/// (librdkafka's `auto.offset.reset`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AutoOffsetReset {
    /// Start from the earliest available offset.
    Earliest,
    /// Start from the latest offset.
    Latest,
    /// Maps to `"none"`: no automatic reset.
    NoReset,
}
/// Future returned by `KafkaConsumer::next`: polls the raw message stream once
/// and converts the result. Spelled out as named combinator types (with a plain
/// `fn` pointer) so the associated type needs no boxing.
pub type NextFuture<'a> = Map<
    StreamFuture<RawMessageStream<'a>>,
    fn(
        (
            Option<Result<RawMessage<'a>, KafkaErr>>,
            RawMessageStream<'a>,
        ),
    ) -> KafkaResult<KafkaMessage<'a>>,
>;

/// Stream returned by `KafkaConsumer::stream`: the raw rdkafka stream with each
/// item converted into this crate's message / error types.
pub type KafkaMessageStream<'a> = StreamMap<
    RawMessageStream<'a>,
    fn(Result<RawMessage<'a>, KafkaErr>) -> KafkaResult<KafkaMessage<'a>>,
>;
impl KafkaConsumerOptions {
    /// Set the consumer group id (`group.id`).
    ///
    /// If no group is set, a unique throwaway id is generated when the client
    /// config is built, because librdkafka requires `group.id` to be present.
    pub fn set_group_id(&mut self, id: ConsumerGroup) -> &mut Self {
        self.group_id = Some(id);
        self
    }

    /// Get the consumer group id, if set.
    pub fn group_id(&self) -> Option<&ConsumerGroup> {
        self.group_id.as_ref()
    }

    /// Set `session.timeout.ms`.
    pub fn set_session_timeout(&mut self, v: Duration) -> &mut Self {
        self.session_timeout = Some(v);
        self
    }

    /// Get the session timeout, if set.
    pub fn session_timeout(&self) -> Option<&Duration> {
        self.session_timeout.as_ref()
    }

    /// Set `auto.offset.reset`: where to start when there is no committed offset.
    pub fn set_auto_offset_reset(&mut self, v: AutoOffsetReset) -> &mut Self {
        self.auto_offset_reset = Some(v);
        self
    }

    /// Get the auto-offset-reset policy, if set.
    pub fn auto_offset_reset(&self) -> Option<&AutoOffsetReset> {
        self.auto_offset_reset.as_ref()
    }

    /// Set `enable.auto.commit`.
    pub fn set_enable_auto_commit(&mut self, v: bool) -> &mut Self {
        self.enable_auto_commit = Some(v);
        self
    }

    /// Get the auto-commit flag, if set.
    pub fn enable_auto_commit(&self) -> Option<&bool> {
        self.enable_auto_commit.as_ref()
    }

    /// Set `auto.commit.interval.ms`.
    pub fn set_auto_commit_interval(&mut self, v: Duration) -> &mut Self {
        self.auto_commit_interval = Some(v);
        self
    }

    /// Get the auto-commit interval, if set.
    pub fn auto_commit_interval(&self) -> Option<&Duration> {
        self.auto_commit_interval.as_ref()
    }

    /// Set `enable.auto.offset.store`.
    pub fn set_enable_auto_offset_store(&mut self, v: bool) -> &mut Self {
        self.enable_auto_offset_store = Some(v);
        self
    }

    /// Get the auto-offset-store flag, if set.
    pub fn enable_auto_offset_store(&self) -> Option<&bool> {
        self.enable_auto_offset_store.as_ref()
    }

    /// Add an arbitrary librdkafka option as a raw key/value pair.
    /// Custom options are applied last, so they override the typed setters.
    pub fn add_custom_option<K, V>(&mut self, key: K, value: V) -> &mut Self
    where
        K: Into<String>,
        V: Into<String>,
    {
        self.custom_options.push((key.into(), value.into()));
        self
    }

    /// Iterate over all custom options added so far.
    pub fn custom_options(&self) -> impl Iterator<Item = (&str, &str)> {
        self.custom_options
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_str()))
    }

    /// Apply every set option onto the given rdkafka `ClientConfig`.
    fn make_client_config(&self, client_config: &mut ClientConfig) {
        if let Some(group_id) = &self.group_id {
            client_config.set(OptionKey::GroupId, group_id.name());
        } else {
            // librdkafka refuses to create a subscribing consumer without a
            // `group.id`. The previous hard-coded placeholder ("abcdefg") made
            // every anonymous consumer join the SAME group, so independent
            // consumers would load-balance partitions and steal messages from
            // one another. Generate a unique throwaway group id instead:
            // pid + wall-clock nanos + a process-wide counter.
            static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
            let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            let nanos = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or_default();
            client_config.set(
                OptionKey::GroupId,
                format!("sea-streamer-{}-{}-{}", std::process::id(), nanos, seq),
            );
        }
        if let Some(v) = self.session_timeout {
            client_config.set(OptionKey::SessionTimeout, format!("{}", v.as_millis()));
        }
        if let Some(v) = self.auto_offset_reset {
            client_config.set(OptionKey::AutoOffsetReset, v);
        }
        if let Some(v) = self.enable_auto_commit {
            client_config.set(OptionKey::EnableAutoCommit, v.to_string());
        }
        if let Some(v) = self.auto_commit_interval {
            client_config.set(OptionKey::AutoCommitInterval, format!("{}", v.as_millis()));
        }
        if let Some(v) = self.enable_auto_offset_store {
            client_config.set(OptionKey::EnableAutoOffsetStore, v.to_string());
        }
        // Applied last on purpose: custom options take precedence.
        for (key, value) in self.custom_options() {
            client_config.set(key, value);
        }
    }
}
impl OptionKey {
pub fn as_str(&self) -> &'static str {
match self {
Self::GroupId => "group.id",
Self::SessionTimeout => "session.timeout.ms",
Self::AutoOffsetReset => "auto.offset.reset",
Self::EnableAutoCommit => "enable.auto.commit",
Self::AutoCommitInterval => "auto.commit.interval.ms",
Self::EnableAutoOffsetStore => "enable.auto.offset.store",
}
}
}
impl AutoOffsetReset {
pub fn as_str(&self) -> &'static str {
match self {
Self::Earliest => "earliest",
Self::Latest => "latest",
Self::NoReset => "none",
}
}
}
// Let both key enums be passed directly wherever `Into<String>` is expected
// (e.g. the key argument of `ClientConfig::set`).
impl_into_string!(OptionKey);
impl_into_string!(AutoOffsetReset);
impl std::fmt::Debug for KafkaConsumer {
    /// Opaque debug representation: just the type name, since the inner
    /// rdkafka consumer is not `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Identical output to `debug_struct("KafkaConsumer").finish()` with no fields.
        write!(f, "KafkaConsumer")
    }
}
#[async_trait]
impl ConsumerTrait for KafkaConsumer {
    type Error = KafkaErr;
    type Message<'a> = KafkaMessage<'a>;
    // Pre-composed combinator types (see the `NextFuture` / `KafkaMessageStream`
    // aliases) so these associated types require no boxing.
    type NextFuture<'a> = NextFuture<'a>;
    type Stream<'a> = KafkaMessageStream<'a>;

    /// Seek all assigned streams to the given timestamp, using the default timeout.
    #[inline]
    async fn seek(&mut self, timestamp: Timestamp) -> KafkaResult<()> {
        self.seek_with_timeout(timestamp, DEFAULT_TIMEOUT).await
    }

    /// Rewind all assigned (stream, shard) pairs to the given position.
    async fn rewind(&mut self, offset: SeqPos) -> KafkaResult<()> {
        let mut tpl = TopicPartitionList::new();
        for (stream, shard) in self.streams.iter() {
            tpl.add_partition_offset(
                stream.name(),
                shard.id() as i32,
                match offset {
                    SeqPos::Beginning => Offset::Beginning,
                    SeqPos::End => Offset::End,
                    SeqPos::At(seq) => Offset::Offset(seq.try_into().expect("u64 out of range")),
                },
            )
            .map_err(stream_err)?;
        }
        // Replaces the consumer's current assignment wholesale.
        self.get().assign(&tpl).map_err(stream_err)?;
        Ok(())
    }

    /// Assign a specific shard of an already-known stream key.
    /// Errors with `StreamKeyNotFound` if the stream key is not subscribed.
    fn assign(&mut self, (stream, shard): (StreamKey, ShardId)) -> KafkaResult<()> {
        if !self.streams.iter().any(|(s, _)| s == &stream) {
            return Err(StreamErr::StreamKeyNotFound);
        }
        // Only push if this exact (stream, shard) pair is not already present.
        if !self
            .streams
            .iter()
            .any(|(s, t)| (s, t) == (&stream, &shard))
        {
            self.streams.push((stream, shard));
        }
        Ok(())
    }

    /// Remove a (stream, shard) pair from the assignment.
    /// Removing the last pair still removes it, then reports `StreamKeyEmpty`.
    fn unassign(&mut self, s: (StreamKey, ShardId)) -> KafkaResult<()> {
        if let Some((i, _)) = self.streams.iter().enumerate().find(|(_, t)| &s == *t) {
            self.streams.remove(i);
            if self.streams.is_empty() {
                Err(StreamErr::StreamKeyEmpty)
            } else {
                Ok(())
            }
        } else {
            Err(StreamErr::StreamKeyNotFound)
        }
    }

    /// Poll the next message. The underlying rdkafka stream never terminates,
    /// so a `None` item is treated as an impossible state.
    fn next(&self) -> Self::NextFuture<'_> {
        self.get().stream().into_future().map(|(res, _)| match res {
            Some(res) => Self::process(res),
            None => panic!("Kafka stream never ends"),
        })
    }

    /// Borrow the consumer as a continuous message stream.
    fn stream<'a, 'b: 'a>(&'b mut self) -> Self::Stream<'a> {
        self.get().stream().map(Self::process)
    }
}
impl KafkaConsumer {
    /// Borrow the inner consumer.
    ///
    /// # Panics
    ///
    /// Panics if the consumer has been temporarily moved out by an in-flight
    /// async operation (see `async_func`).
    #[inline]
    fn get(&self) -> &RawConsumer {
        self.inner
            .as_ref()
            .expect("Client is still inside an async operation, please await the future")
    }

    /// Public accessor for the underlying rdkafka consumer.
    pub fn inner(&mut self) -> &RawConsumer {
        self.get()
    }

    /// Wrap a raw poll result into this crate's message / error types.
    fn process(res: Result<RawMessage, KafkaErr>) -> KafkaResult<KafkaMessage> {
        match res {
            Ok(mess) => Ok(KafkaMessage(mess)),
            Err(err) => Err(StreamErr::Backend(err)),
        }
    }

    /// The (stream, shard) pairs currently assigned to this consumer.
    pub fn stream_shards(&self) -> &[(StreamKey, ShardId)] {
        &self.streams
    }

    /// Run a blocking rdkafka operation on a blocking thread.
    ///
    /// The inner consumer is moved out of `self` for the duration of the call
    /// and restored afterwards on both the success and failure paths — this is
    /// why `self.inner` is an `Option`.
    #[inline]
    async fn async_func<
        T: Send + 'static,
        F: FnOnce(&RawConsumer) -> Result<T, KafkaErr> + Send + 'static,
    >(
        &mut self,
        func: F,
    ) -> KafkaResult<T> {
        if self.inner.is_none() {
            // Can only happen if a previous operation's future was dropped mid-flight.
            panic!("An async operation is still in progress.");
        }
        let client = self.inner.take().unwrap();
        // Thread the client through the closure's result so it can be put back.
        let inner = spawn_blocking(move || match func(&client) {
            Ok(res) => Ok((res, client)),
            Err(err) => Err((err, client)),
        })
        .await
        .map_err(runtime_error)?;
        match inner {
            Ok((res, inner)) => {
                self.inner = Some(inner);
                Ok(res)
            }
            Err((err, inner)) => {
                self.inner = Some(inner);
                Err(stream_err(err))
            }
        }
    }

    /// Fetch topic metadata and rebuild `self.streams` with every partition of
    /// every currently-assigned stream key.
    pub async fn reassign_partitions(&mut self) -> KafkaResult<()> {
        // Deduplicate stream keys; shards are rediscovered from metadata.
        let current: HashSet<StreamKey> = self.streams.iter().map(|(s, _)| s.clone()).collect();
        let mut streams = Vec::new();
        for stream_key in current {
            let s = stream_key.clone();
            let raw = self
                .async_func(move |c| c.fetch_metadata(Some(s.name()), DEFAULT_TIMEOUT))
                .await?;
            // We requested a single topic, so metadata should contain exactly one.
            // NOTE(review): `.first().unwrap()` assumes the broker always echoes the
            // topic back — confirm against rdkafka's fetch_metadata contract.
            let partitions = raw
                .topics()
                .first()
                .unwrap()
                .partitions()
                .iter()
                .map(|p| p.id() as u64);
            for p in partitions {
                streams.push((stream_key.clone(), ShardId::new(p)));
            }
        }
        if streams.is_empty() {
            return Err(StreamErr::Backend(KafkaErr::Subscription(
                "No partitions found.".to_owned(),
            )));
        }
        self.streams = streams;
        Ok(())
    }

    /// Seek every assigned partition to the first offset at or after `timestamp`.
    async fn seek_with_timeout(
        &mut self,
        timestamp: Timestamp,
        timeout: Duration,
    ) -> KafkaResult<()> {
        // Discover all partitions before seeking, so none are missed.
        self.reassign_partitions().await?;
        let mut tpl = TopicPartitionList::new();
        for (stream, shard) in self.streams.iter() {
            tpl.add_partition_offset(
                stream.name(),
                shard.id() as i32,
                // `offsets_for_times` takes the timestamp in milliseconds,
                // smuggled through the `Offset` field.
                Offset::Offset(
                    (timestamp.unix_timestamp_nanos() / 1_000_000)
                        .try_into()
                        .expect("KafkaConsumer::seek: timestamp out of range"),
                ),
            )
            .map_err(stream_err)?;
        }
        // Translate timestamps into concrete offsets, then assign them.
        let tpl = self
            .async_func(move |c| c.offsets_for_times(tpl, timeout))
            .await?;
        // `async_func` has just restored `inner`, so the unwrap cannot fail here.
        self.inner
            .as_mut()
            .unwrap()
            .assign(&tpl)
            .map_err(stream_err)?;
        Ok(())
    }

    /// Commit the offset of this exact message (synchronous, on a blocking thread).
    pub async fn commit_message(&mut self, mess: &KafkaMessage<'_>) -> KafkaResult<()> {
        self.commit(&mess.stream_key(), &mess.shard_id(), &mess.sequence())
            .await
    }

    /// Commit a (stream, shard, sequence) triple.
    pub async fn commit_with(
        &mut self,
        (stream_key, shard_id, sequence): &(StreamKey, ShardId, SeqNo),
    ) -> KafkaResult<()> {
        self.commit(stream_key, shard_id, sequence).await
    }

    /// Commit a specific offset. Not allowed in `RealTime` mode.
    ///
    /// NOTE(review): `seq` is committed as-is; Kafka convention is to commit the
    /// offset of the *next* message to consume — verify what callers pass here.
    pub async fn commit(
        &mut self,
        stream: &StreamKey,
        shard: &ShardId,
        seq: &SeqNo,
    ) -> KafkaResult<()> {
        if self.mode == ConsumerMode::RealTime {
            return Err(StreamErr::CommitNotAllowed);
        }
        let mut tpl = TopicPartitionList::new();
        tpl.add_partition_offset(
            stream.name(),
            shard.id() as i32,
            Offset::Offset((*seq).try_into().expect("u64 out of range")),
        )
        .map_err(stream_err)?;
        self.async_func(move |c| c.commit(&tpl, CommitMode::Sync))
            .await
    }

    /// Store an offset locally, to be committed later by the auto-commit machinery.
    pub fn store_offset(
        &mut self,
        stream: &StreamKey,
        shard: &ShardId,
        seq: &SeqNo,
    ) -> KafkaResult<()> {
        self.get()
            .store_offset(
                stream.name(),
                shard.id() as i32,
                (*seq).try_into().expect("u64 out of range"),
            )
            .map_err(stream_err)
    }

    /// Store the offset of this exact message.
    pub fn store_offset_for_message(&mut self, mess: &KafkaMessage<'_>) -> KafkaResult<()> {
        self.store_offset(&mess.stream_key(), &mess.shard_id(), &mess.sequence())
    }

    /// Store an offset given as a (stream, shard, sequence) triple.
    pub fn store_offset_with(
        &mut self,
        (stream_key, shard_id, sequence): &(StreamKey, ShardId, SeqNo),
    ) -> KafkaResult<()> {
        self.store_offset(stream_key, shard_id, sequence)
    }
}
impl<'a> KafkaMessage<'a> {
fn mess(&self) -> &RawMessage {
&self.0
}
}
impl<'a> Debug for KafkaMessage<'a> {
    /// Delegate to the inner rdkafka message's debug output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self.mess(), f)
    }
}
impl<'a> Message for KafkaMessage<'a> {
    /// The Kafka topic name, as a stream key.
    fn stream_key(&self) -> StreamKey {
        StreamKey::new(self.mess().topic()).expect("A message should carry a valid stream key")
    }

    /// The Kafka partition number.
    fn shard_id(&self) -> ShardId {
        ShardId::new(self.mess().partition() as u64)
    }

    /// The Kafka offset within the partition.
    fn sequence(&self) -> SeqNo {
        self.mess().offset() as SeqNo
    }

    /// The message timestamp, converted from milliseconds to nanoseconds.
    ///
    /// # Panics
    ///
    /// Panics if the message carries no timestamp.
    fn timestamp(&self) -> Timestamp {
        Timestamp::from_unix_timestamp_nanos(
            self.mess()
                .timestamp()
                .to_millis()
                .expect("message.timestamp() is None") as i128
                * 1_000_000,
        )
        .expect("from_unix_timestamp_nanos")
    }

    /// The message payload; an absent payload is treated as empty.
    fn message(&self) -> Payload {
        Payload::new(self.mess().payload().unwrap_or_default())
    }
}
impl ConsumerOptions for KafkaConsumerOptions {
    type Error = KafkaErr;

    /// Create options with the given mode; every other setting starts unset.
    fn new(mode: ConsumerMode) -> Self {
        Self {
            mode,
            ..Self::default()
        }
    }

    /// The consumer mode chosen at construction.
    fn mode(&self) -> KafkaResult<&ConsumerMode> {
        Ok(&self.mode)
    }

    /// The configured consumer group, or `ConsumerGroupNotSet`.
    fn consumer_group(&self) -> KafkaResult<&ConsumerGroup> {
        match &self.group_id {
            Some(group) => Ok(group),
            None => Err(StreamErr::ConsumerGroupNotSet),
        }
    }

    /// Set the consumer group; always succeeds for Kafka.
    fn set_consumer_group(&mut self, group: ConsumerGroup) -> KafkaResult<&mut Self> {
        self.group_id = Some(group);
        Ok(self)
    }
}
pub(crate) fn create_consumer(
streamer: &StreamerUri,
base_options: &KafkaConnectOptions,
options: &KafkaConsumerOptions,
streams: Vec<StreamKey>,
) -> Result<KafkaConsumer, KafkaErr> {
let mut client_config = ClientConfig::new();
client_config.set(BaseOptionKey::BootstrapServers, cluster_uri(streamer)?);
base_options.make_client_config(&mut client_config);
options.make_client_config(&mut client_config);
let consumer: RawConsumer = client_config.create()?;
if !streams.is_empty() {
let topics: Vec<&str> = streams.iter().map(|s| s.name()).collect();
consumer.subscribe(&topics)?;
} else {
panic!("no topic?");
}
Ok(KafkaConsumer {
mode: options.mode,
inner: Some(consumer),
streams: streams.into_iter().map(|s| (s, ZERO)).collect(),
})
}