use crate::clients::consumer::{AutoCommit, AutoCommitAfter, IggyConsumer};
use crate::consumer_ext::{IggyConsumerMessageExt, MessageConsumer};
use crate::prelude::IggyError;
use async_trait::async_trait;
use futures_util::StreamExt;
use tokio::sync::oneshot;
use tracing::{error, info, trace};
#[async_trait]
impl<'a> IggyConsumerMessageExt<'a> for IggyConsumer {
async fn consume_messages<P>(
&mut self,
message_consumer: &'a P,
mut shutdown_rx: oneshot::Receiver<()>,
) -> Result<(), IggyError>
where
P: MessageConsumer + Sync,
{
let auto_commit = self.auto_commit();
let store_offset_after_each_message = matches!(
auto_commit,
AutoCommit::After(AutoCommitAfter::ConsumingEachMessage)
| AutoCommit::IntervalOrAfter(_, AutoCommitAfter::ConsumingEachMessage)
);
let store_offset_after_all_messages = matches!(
auto_commit,
AutoCommit::After(AutoCommitAfter::ConsumingAllMessages)
| AutoCommit::IntervalOrAfter(_, AutoCommitAfter::ConsumingAllMessages)
);
let store_after_every_nth_message = match auto_commit {
AutoCommit::After(AutoCommitAfter::ConsumingEveryNthMessage(n))
| AutoCommit::IntervalOrAfter(_, AutoCommitAfter::ConsumingEveryNthMessage(n)) => {
n as u64
}
_ => 0,
};
loop {
tokio::select! {
_ = &mut shutdown_rx => {
info!("Received shutdown signal, stopping message consumption from consumer {name} on topic: {topic} and stream: {stream}",
name = self.name(), topic = self.topic(), stream = self.stream());
break;
}
message = self.next() => {
match message {
Some(Ok(received_message)) => {
let partition_id = received_message.partition_id;
let current_offset = received_message.current_offset;
let message_offset = received_message.message.header.offset;
if let Err(err) = message_consumer.consume(received_message).await {
error!("Error while handling message at offset: {message_offset}/{current_offset}, partition: {partition_id} for consumer: {name} on topic: {topic} and stream: {stream} due to error: {err}",
name = self.name(), topic = self.topic(), stream = self.stream());
} else {
trace!("Message at offset: {message_offset}/{current_offset}, partition: {partition_id} has been handled by consumer: {name} on topic: {topic} and stream: {stream}",
name = self.name(), topic = self.topic(), stream = self.stream());
}
if store_offset_after_each_message {
trace!("Storing offset: {message_offset}/{current_offset}, partition: {partition_id}, after each message for consumer: {name} on topic: {topic} and stream: {stream}",
name = self.name(), topic = self.topic(), stream = self.stream());
self.send_store_offset(partition_id, message_offset);
} else if store_after_every_nth_message > 0 && message_offset % store_after_every_nth_message == 0 {
trace!("Storing offset: {message_offset}/{current_offset}, partition: {partition_id}, after every {store_after_every_nth_message} message for consumer: {name} on topic: {topic} and stream: {stream}",
store_after_every_nth_message = store_after_every_nth_message, name = self.name(), topic = self.topic(), stream = self.stream());
self.send_store_offset(partition_id, message_offset);
} else if store_offset_after_all_messages && message_offset == current_offset {
trace!("Storing offset: {message_offset}/{current_offset}, partition: {partition_id}, after all messages for consumer: {name} on topic: {topic} and stream: {stream}",
name = self.name(), topic = self.topic(), stream = self.stream());
self.send_store_offset(partition_id, message_offset);
}
}
Some(Err(err)) => {
match err {
IggyError::Disconnected |
IggyError::CannotEstablishConnection |
IggyError::StaleClient |
IggyError::InvalidServerAddress |
IggyError::InvalidClientAddress |
IggyError::NotConnected |
IggyError::ClientShutdown => {
error!("Client error: {err} for consumer: {name} on topic: {topic} and stream: {stream}",
name = self.name(), topic = self.topic(), stream = self.stream());
return Err(err);
}
_ => {
error!("Error while handling message: {err} for consumer: {name} on topic: {topic} and stream: {stream}",
name = self.name(), topic = self.topic(), stream = self.stream());
continue;
}
}
}
None => break,
}
}
}
}
Ok(())
}
}