use crate::client::fetch_kp;
use crate::client::{CommitOffset, FetchPartition, KafkaClient};
use crate::error::{Error, KafkaCode, Result};
use std::collections::hash_map::{Entry, HashMap};
use std::slice;
use std::sync::Arc;
use tracing::debug;
pub use self::builder::Builder;
use self::state::TopicPartition;
pub use crate::client::FetchOffset;
pub use crate::client::GroupOffsetStorage;
pub use crate::protocol::fetch::OwnedMessage as Message;
pub use assignor::{PartitionAssignor, RangeAssignor, RoundRobinAssignor};
pub use group_coordinator::GroupCoordinator;
pub use rebalance::{RebalanceHandler, RebalanceListener};
mod assignment;
mod assignor;
mod builder;
mod config;
mod group_coordinator;
mod rebalance;
mod state;
/// Default upper bound for growing a partition's `max_bytes` on retry.
/// `0` effectively disables the grow-and-retry behavior (the limit check
/// in `process_fetch_responses` can never pass).
pub const DEFAULT_RETRY_MAX_BYTES_LIMIT: i32 = 0;
/// Offset to fall back to when no valid fetch offset is available.
pub const DEFAULT_FALLBACK_OFFSET: FetchOffset = FetchOffset::Latest;
/// A Kafka consumer that polls messages for a set of topic partitions,
/// tracking per-partition fetch offsets, consumed offsets, and pause state.
#[derive(Debug)]
pub struct Consumer {
    // Underlying client used for fetch and offset-commit requests.
    client: KafkaClient,
    // Per-partition bookkeeping: fetch offsets, consumed offsets,
    // paused partitions, retry queue, topic-name interning.
    state: state::State,
    // Consumer configuration (group name, retry_max_bytes_limit, ...).
    config: config::Config,
}
impl Consumer {
/// Starts building a consumer on top of an already configured client.
#[must_use]
pub fn from_client(client: KafkaClient) -> Builder {
    builder::new(Some(client), Vec::new())
}
/// Starts building a consumer that will connect to the given hosts.
#[must_use]
pub fn from_hosts(hosts: Vec<String>) -> Builder {
    builder::new(None, hosts)
}
/// Borrows the underlying client.
#[must_use]
pub fn client(&self) -> &KafkaClient {
    &self.client
}
/// Mutably borrows the underlying client.
#[must_use]
pub fn client_mut(&mut self) -> &mut KafkaClient {
    &mut self.client
}
/// Destroys this consumer, returning the underlying client.
#[must_use]
pub fn into_client(self) -> KafkaClient {
    self.client
}
/// Marks the given partitions of `topic` as paused. Paused entries are
/// recorded as `(topic, partition)` pairs in the consumer state.
pub fn pause(&mut self, topic: &str, partitions: &[i32]) {
    self.state
        .paused
        .extend(partitions.iter().map(|&p| (topic.to_owned(), p)));
    debug!("Paused partitions for topic '{}': {:?}", topic, partitions);
}
/// Removes the given partitions of `topic` from the paused set.
/// Resuming a partition that was never paused is a no-op.
pub fn resume(&mut self, topic: &str, partitions: &[i32]) {
    for p in partitions.iter().copied() {
        let key = (topic.to_owned(), p);
        self.state.paused.remove(&key);
    }
    debug!("Resumed partitions for topic '{}': {:?}", topic, partitions);
}
/// Returns `true` if the given partition is currently paused.
#[must_use]
pub fn is_paused(&self, topic: &str, partition: i32) -> bool {
    self.state.paused.contains(&(topic.to_owned(), partition))
}
/// Iterates over all currently paused `(topic, partition)` pairs.
pub fn paused_partitions(&self) -> impl Iterator<Item = &(String, i32)> {
    self.state.paused.iter()
}
/// Returns the partitions this consumer is currently fetching, grouped
/// by topic name (derived from the tracked fetch offsets).
#[must_use]
pub fn subscriptions(&self) -> HashMap<String, Vec<i32>> {
    // Capacity hint: at most one map entry per assigned topic.
    let mut subs: HashMap<String, Vec<i32>> =
        HashMap::with_capacity(self.state.assignments.as_slice().len());
    for tp in self.state.fetch_offsets.keys() {
        let topic = self.state.topic_name(tp.topic_ref);
        // Entry API: a single lookup instead of get_mut + insert.
        subs.entry(topic.to_owned()).or_default().push(tp.partition);
    }
    subs
}
/// Fetches the next batch of messages for all tracked partitions (or a
/// single retry partition, if one is queued) and post-processes the
/// responses into a `MessageSets`, advancing the stored fetch offsets.
///
/// # Errors
/// Propagates fetch failures and unrecoverable per-partition errors.
#[tracing::instrument(skip(self))]
pub fn poll(&mut self) -> Result<MessageSets> {
    let (n, resps) = self.fetch_messages();
    let resps = resps?;
    self.process_fetch_responses(n, resps)
}
/// `true` when exactly one partition is being fetched by this consumer.
#[must_use]
fn single_partition_consumer(&self) -> bool {
    self.state.fetch_offsets.len() == 1
}
/// The consumer group offsets are committed under (may be empty).
#[must_use]
pub fn group(&self) -> &str {
    &self.config.group
}
pub fn seek(&mut self, topic: &str, partition: i32, offset: i64) -> Result<()> {
let topic_ref = self.state.topic_ref(topic);
match topic_ref {
Some(topic_ref) => {
let tp = TopicPartition {
topic_ref,
partition,
};
let maybe_entry = self.state.fetch_offsets.entry(tp);
match maybe_entry {
Entry::Occupied(mut e) => {
e.get_mut().offset = offset;
Ok(())
}
Entry::Vacant(_) => Err(Error::TopicPartitionError {
topic_name: topic.to_string(),
partition_id: partition,
error_code: KafkaCode::UnknownTopicOrPartition,
}),
}
}
None => Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
}
}
/// Issues the fetch request(s) for one poll.
///
/// If a partition is queued for retry (see `process_fetch_responses`),
/// only that single partition is fetched; otherwise all partitions in
/// `state.fetch_offsets` are queried at once. Returns the number of
/// partitions queried together with the raw fetch result.
fn fetch_messages(&mut self) -> (u32, Result<Vec<fetch_kp::OwnedFetchResponse>>) {
    if let Some(tp) = self.state.retry_partitions.pop_front() {
        // A retry partition must still be tracked; otherwise report it
        // as unknown rather than panicking.
        let Some(s) = self.state.fetch_offsets.get(&tp) else {
            return (1, Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)));
        };
        let topic = self.state.topic_name(tp.topic_ref);
        debug!(
            "fetching retry messages: (fetch-offset: {{\"{}:{}\": {:?}}})",
            topic, tp.partition, s
        );
        (
            1,
            self.client.fetch_messages_kp(std::iter::once(
                &FetchPartition::new(topic, tp.partition, s.offset).with_max_bytes(s.max_bytes),
            )),
        )
    } else {
        // Split borrows: `client` mutably, `state` immutably.
        let client = &mut self.client;
        let state = &self.state;
        debug!(
            "fetching messages: (fetch-offsets: {:?})",
            state.fetch_offsets_debug()
        );
        // Build the full request list up front; the client call below
        // consumes an iterator of `&FetchPartition`.
        let reqs: Vec<FetchPartition<'_>> = state
            .fetch_offsets
            .iter()
            .map(|(tp, s)| {
                let topic = state.topic_name(tp.topic_ref);
                FetchPartition::new(topic, tp.partition, s.offset).with_max_bytes(s.max_bytes)
            })
            .collect();
        // Partition counts comfortably fit u32; truncation accepted.
        #[allow(clippy::cast_possible_truncation)]
        let num_partitions = state.fetch_offsets.len() as u32;
        (num_partitions, client.fetch_messages_kp(reqs.iter()))
    }
}
/// Post-processes the raw fetch responses of one poll.
///
/// For every partition in the responses this:
/// * advances the stored fetch offset past the last received message;
/// * on `OffsetOutOfRange`, resets the fetch offset to the partition's
///   highwatermark so the next poll can recover;
/// * when a partition returned no data although its fetch offset is
///   below the highwatermark, doubles its `max_bytes` (capped at
///   `config.retry_max_bytes_limit`) and, for multi-partition consumers,
///   queues the partition for a dedicated retry fetch.
///
/// # Errors
/// Propagates partition errors other than `OffsetOutOfRange`, and
/// returns `MessageSizeTooLarge` when a single queried partition cannot
/// make progress even at the configured `max_bytes` limit.
fn process_fetch_responses(
    &mut self,
    num_partitions_queried: u32,
    resps: Vec<fetch_kp::OwnedFetchResponse>,
) -> Result<MessageSets> {
    let single_partition_consumer = self.single_partition_consumer();
    let mut empty = true;
    let retry_partitions = &mut self.state.retry_partitions;
    for resp in &resps {
        for t in &resp.topics {
            let topic_ref = self
                .state
                .assignments
                .topic_ref(&t.topic)
                .expect("unknown topic in response");
            for p in &t.partitions {
                let tp = TopicPartition {
                    topic_ref,
                    partition: p.partition,
                };
                let data = match p.data() {
                    Ok(d) => d,
                    Err(e) => {
                        // Recoverable: the requested offset no longer
                        // exists on the broker; restart from the
                        // highwatermark on the next poll.
                        if let Error::TopicPartitionError {
                            error_code: KafkaCode::OffsetOutOfRange,
                            ..
                        } = e.as_ref()
                        {
                            if let Some(fetch_state) = self.state.fetch_offsets.get_mut(&tp) {
                                debug!(
                                    "OffsetOutOfRange for {}:{}, resetting to highwatermark {}",
                                    &t.topic, tp.partition, p.highwatermark
                                );
                                fetch_state.offset = p.highwatermark;
                            }
                            continue;
                        }
                        // Any other partition error aborts the poll.
                        return Err(Error::from(Arc::clone(e)));
                    }
                };
                let fetch_state = self
                    .state
                    .fetch_offsets
                    .get_mut(&tp)
                    .expect("non-requested partition");
                if let Some(last_msg) = data.messages.last() {
                    // Got data: continue after the last delivered message.
                    fetch_state.offset = last_msg.offset + 1;
                    empty = false;
                    // Shrink a previously grown max_bytes back to the
                    // client default once the partition makes progress.
                    if fetch_state.max_bytes != self.client.fetch_max_bytes_per_partition() {
                        let prev_max_bytes = fetch_state.max_bytes;
                        fetch_state.max_bytes = self.client.fetch_max_bytes_per_partition();
                        debug!(
                            "reset max_bytes for {}:{} from {} to {}",
                            &t.topic, tp.partition, prev_max_bytes, fetch_state.max_bytes
                        );
                    }
                } else {
                    debug!(
                        "no data received for {}:{} (max_bytes: {} / fetch_offset: {} / \
                         highwatermark_offset: {})",
                        &t.topic,
                        tp.partition,
                        fetch_state.max_bytes,
                        fetch_state.offset,
                        data.highwatermark_offset
                    );
                    if fetch_state.offset < data.highwatermark_offset {
                        // More data exists but did not fit the current
                        // max_bytes; try growing it.
                        if fetch_state.max_bytes < self.config.retry_max_bytes_limit {
                            let prev_max_bytes = fetch_state.max_bytes;
                            // NOTE(review): doubling could overflow i32
                            // for very large max_bytes; presumably the
                            // configured limit keeps values far below
                            // i32::MAX — confirm.
                            let incr_max_bytes = prev_max_bytes + prev_max_bytes;
                            if incr_max_bytes > self.config.retry_max_bytes_limit {
                                fetch_state.max_bytes = self.config.retry_max_bytes_limit;
                            } else {
                                fetch_state.max_bytes = incr_max_bytes;
                            }
                            debug!(
                                "increased max_bytes for {}:{} from {} to {}",
                                &t.topic, tp.partition, prev_max_bytes, fetch_state.max_bytes
                            );
                        } else if num_partitions_queried == 1 {
                            // Already at the limit and only one partition
                            // was queried: no way to make progress.
                            return Err(Error::Kafka(KafkaCode::MessageSizeTooLarge));
                        }
                        if !single_partition_consumer {
                            debug!("rescheduled for retry: {}:{}", &t.topic, tp.partition);
                            retry_partitions.push_back(tp);
                        }
                    }
                }
            }
        }
    }
    Ok(MessageSets {
        responses: resps,
        empty,
    })
}
/// Returns the offset of the last message marked as consumed for the
/// given partition (via `consume_message`/`consume_messageset`), if any.
#[must_use]
pub fn last_consumed_message(&self, topic: &str, partition: i32) -> Option<i64> {
    let topic_ref = self.state.topic_ref(topic)?;
    let key = TopicPartition {
        topic_ref,
        partition,
    };
    let consumed = self.state.consumed_offsets.get(&key)?;
    Some(consumed.offset)
}
/// Marks `offset` of the given partition as consumed. Consumed offsets
/// only move forward: an offset not greater than the recorded one is
/// ignored. Newly advanced entries are flagged dirty for the next
/// `commit_consumed`.
///
/// # Errors
/// `Error::Kafka(UnknownTopicOrPartition)` when the topic is not tracked.
pub fn consume_message(&mut self, topic: &str, partition: i32, offset: i64) -> Result<()> {
    let topic_ref = self
        .state
        .topic_ref(topic)
        .ok_or(Error::Kafka(KafkaCode::UnknownTopicOrPartition))?;
    let key = TopicPartition {
        topic_ref,
        partition,
    };
    self.state
        .consumed_offsets
        .entry(key)
        .and_modify(|co| {
            // Only advance; never move a consumed offset backwards.
            if offset > co.offset {
                co.offset = offset;
                co.dirty = true;
            }
        })
        .or_insert(state::ConsumedOffset {
            offset,
            dirty: true,
        });
    Ok(())
}
/// Marks the last message of the given message set as consumed.
/// An empty message set is a successful no-op.
pub fn consume_messageset(&mut self, msgs: &MessageSet) -> Result<()> {
    match msgs.messages.last() {
        Some(last) => self.consume_message(&msgs.topic, msgs.partition, last.offset),
        None => Ok(()),
    }
}
/// Commits all dirty consumed offsets under this consumer's group.
/// Offsets are committed as `last consumed + 1` (the next offset to
/// fetch, per Kafka convention). Dirty flags are cleared on success.
///
/// # Errors
/// Fails when no group id is configured, or when the client's commit
/// request fails (dirty flags are then left untouched).
pub fn commit_consumed(&mut self) -> Result<()> {
    if self.config.group.is_empty() {
        return Err(Error::unset_group_id());
    }
    debug!(
        // Fixed: the format string was missing its closing parenthesis.
        "commit_consumed: committing dirty-only consumer offsets (group: {} / offsets: {:?})",
        self.config.group,
        self.state.consumed_offsets_debug()
    );
    // Split borrows so `state` can lend topic names while `client` is
    // mutably borrowed for the request.
    let (client, state) = (&mut self.client, &mut self.state);
    client.commit_offsets(
        &self.config.group,
        state
            .consumed_offsets
            .iter()
            .filter(|&(_, o)| o.dirty)
            .map(|(tp, o)| {
                let topic = state.topic_name(tp.topic_ref);
                CommitOffset::new(topic, tp.partition, o.offset + 1)
            }),
    )?;
    // Everything dirty was just committed successfully.
    for co in state.consumed_offsets.values_mut() {
        co.dirty = false;
    }
    Ok(())
}
}
/// The result of a `poll`: the raw fetch responses plus a precomputed
/// flag recording whether any messages were received at all.
#[derive(Debug)]
pub struct MessageSets {
    responses: Vec<fetch_kp::OwnedFetchResponse>,
    // `true` when no partition in any response carried a message.
    empty: bool,
}
impl MessageSets {
    /// Wraps raw fetch responses, eagerly determining whether any
    /// successfully decoded partition carries at least one message.
    #[must_use]
    pub fn from_fetch_responses(responses: Vec<fetch_kp::OwnedFetchResponse>) -> Self {
        let empty = responses
            .iter()
            .flat_map(|r| r.topics.iter())
            .flat_map(|t| t.partitions.iter())
            .filter_map(|p| p.data().ok())
            .all(|d| d.messages.is_empty());
        Self { responses, empty }
    }
    /// `true` when the poll delivered no messages at all.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.empty
    }
    /// Iterates the non-empty message sets, one per topic partition.
    #[must_use]
    pub fn iter(&self) -> MessageSetsIter<'_> {
        MessageSetsIter::new(&self.responses)
    }
}
/// Allows iterating a borrowed `MessageSets` directly in a `for` loop.
impl<'a> IntoIterator for &'a MessageSets {
    type Item = MessageSet;
    type IntoIter = MessageSetsIter<'a>;
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
/// A batch of messages fetched from a single topic partition.
#[derive(Debug)]
pub struct MessageSet {
    topic: String,
    partition: i32,
    messages: Vec<Message>,
}
impl MessageSet {
    /// The topic these messages were fetched from.
    #[inline]
    #[must_use]
    pub fn topic(&self) -> &str {
        &self.topic
    }
    /// The partition these messages were fetched from.
    #[must_use]
    #[inline]
    pub fn partition(&self) -> i32 {
        self.partition
    }
    /// The fetched messages as a slice.
    #[must_use]
    #[inline]
    pub fn messages(&self) -> &[Message] {
        &self.messages
    }
    /// Iterates over the fetched messages.
    #[inline]
    pub fn iter(&self) -> slice::Iter<'_, Message> {
        self.messages.iter()
    }
}
/// Allows iterating a borrowed `MessageSet` directly in a `for` loop.
impl<'a> IntoIterator for &'a MessageSet {
    type Item = &'a Message;
    type IntoIter = slice::Iter<'a, Message>;
    fn into_iter(self) -> Self::IntoIter {
        // Delegate to the inherent iterator over the messages.
        self.iter()
    }
}
/// Iterator over the per-partition message sets of a poll result.
/// Walks responses -> topics -> partitions, tracking the current
/// position at each level.
pub struct MessageSetsIter<'a> {
    // Remaining responses to walk.
    responses: slice::Iter<'a, fetch_kp::OwnedFetchResponse>,
    // Topics of the response currently being walked, if any.
    topics: Option<slice::Iter<'a, fetch_kp::OwnedTopic>>,
    // Name of the topic currently being walked, if any.
    curr_topic: Option<&'a str>,
    // Partitions of the topic currently being walked, if any.
    partitions: Option<slice::Iter<'a, fetch_kp::OwnedPartition>>,
}
impl<'a> MessageSetsIter<'a> {
    /// Creates an iterator positioned before the first response.
    fn new(responses: &'a [fetch_kp::OwnedFetchResponse]) -> Self {
        Self {
            responses: responses.iter(),
            topics: None,
            curr_topic: None,
            partitions: None,
        }
    }
}
impl Iterator for MessageSetsIter<'_> {
    type Item = MessageSet;
    /// Yields one `MessageSet` per partition that decoded successfully
    /// and carries at least one message; empty or errored partitions
    /// are skipped silently.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Innermost level: drain the partitions of the current topic.
            if let Some(p) = self.partitions.as_mut().and_then(Iterator::next) {
                if let Ok(data) = p.data()
                    && !data.messages.is_empty()
                {
                    // `curr_topic` is set whenever `partitions` is; the
                    // unwrap_or("") is a defensive fallback.
                    let topic = self.curr_topic.unwrap_or("").to_owned();
                    return Some(MessageSet {
                        topic,
                        partition: p.partition,
                        messages: data.messages.clone(),
                    });
                }
                continue;
            }
            // Advance to the next topic of the current response.
            if let Some(t) = self.topics.as_mut().and_then(Iterator::next) {
                self.curr_topic = Some(&t.topic);
                self.partitions = Some(t.partitions.iter());
                continue;
            }
            // Advance to the next response.
            if let Some(r) = self.responses.next() {
                self.topics = Some(r.topics.iter());
                self.curr_topic = None;
                continue;
            }
            // All responses exhausted.
            return None;
        }
    }
}
#[cfg(test)]
mod pause_resume_tests {
    use std::collections::HashSet;

    /// Pausing and resuming behaves like set insertion/removal of
    /// `(topic, partition)` keys.
    #[test]
    fn test_pause_and_resume() {
        let mut paused: HashSet<(String, i32)> = HashSet::new();
        assert!(paused.is_empty());
        for p in [0, 1] {
            paused.insert(("t".to_owned(), p));
        }
        assert!(paused.contains(&("t".to_owned(), 0)));
        assert!(paused.contains(&("t".to_owned(), 1)));
        assert!(!paused.contains(&("t".to_owned(), 2)));
        paused.remove(&("t".to_owned(), 0));
        assert!(!paused.contains(&("t".to_owned(), 0)));
        assert!(paused.contains(&("t".to_owned(), 1)));
    }

    /// Multiple paused partitions of one topic are tracked independently.
    #[test]
    fn test_pause_multiple_partitions() {
        let mut paused: HashSet<(String, i32)> =
            (0..3).map(|p| ("t".to_owned(), p)).collect();
        assert_eq!(paused.len(), 3);
        paused.remove(&("t".to_owned(), 1));
        assert_eq!(paused.len(), 2);
    }

    /// Pausing a partition id that was never assigned is harmless.
    #[test]
    fn test_pause_nonexistent_partition_no_panic() {
        let mut paused = HashSet::new();
        paused.insert(("t".to_owned(), 999));
        assert_eq!(paused.len(), 1);
    }
}