use std::collections::hash_map::{Entry, HashMap};
use std::slice;
use crate::client::fetch;
use crate::client::{CommitOffset, FetchPartition, KafkaClient};
use crate::error::{Error, KafkaCode, Result};
use crate::protocol;
pub use self::builder::Builder;
use self::state::TopicPartition;
pub use crate::client::FetchOffset;
pub use crate::client::GroupOffsetStorage;
pub use crate::client::fetch::Message;
mod assignment;
mod builder;
mod config;
mod state;
/// Default upper limit for growing a partition's `max_bytes` on fetch
/// retries. With `0` the growth condition in `process_fetch_responses`
/// (`max_bytes < retry_max_bytes_limit`) is never true, i.e. automatic
/// growth is disabled.
pub const DEFAULT_RETRY_MAX_BYTES_LIMIT: i32 = 0;
/// Default offset to fall back to when no other start offset is available
/// (presumably when the group has no committed offset or the stored offset
/// is out of range — behavior lives in the client/builder, confirm there).
pub const DEFAULT_FALLBACK_OFFSET: FetchOffset = FetchOffset::Latest;
/// A Kafka consumer: pairs a `KafkaClient` with per-partition fetch and
/// consume-offset state plus the configuration it was built with.
/// Construct via [`Consumer::from_client`] or [`Consumer::from_hosts`]
/// (both return a [`Builder`]).
#[derive(Debug)]
pub struct Consumer {
    // Underlying client used for all fetch/commit round-trips.
    client: KafkaClient,
    // Fetch offsets, consumed offsets, topic assignments, retry queue.
    state: state::State,
    // Group name, retry limits, etc. (see the `config` module).
    config: config::Config,
}
impl Consumer {
    /// Starts building a consumer on top of an already configured
    /// `KafkaClient`.
    #[must_use]
    pub fn from_client(client: KafkaClient) -> Builder {
        builder::new(Some(client), Vec::new())
    }

    /// Starts building a consumer that will create its own client for the
    /// given bootstrap hosts.
    #[must_use]
    pub fn from_hosts(hosts: Vec<String>) -> Builder {
        builder::new(None, hosts)
    }

    /// Shared access to the underlying client.
    #[must_use]
    pub fn client(&self) -> &KafkaClient {
        &self.client
    }

    /// Mutable access to the underlying client.
    #[must_use]
    pub fn client_mut(&mut self) -> &mut KafkaClient {
        &mut self.client
    }

    /// Consumes this consumer, yielding its underlying client.
    #[must_use]
    pub fn into_client(self) -> KafkaClient {
        self.client
    }

    /// Returns the partitions this consumer is currently fetching from,
    /// grouped by topic name.
    #[must_use]
    pub fn subscriptions(&self) -> HashMap<String, Vec<i32>> {
        // Capacity is a heuristic: the number of assignments, not the
        // (unknown at this point) number of distinct topics.
        let mut h: HashMap<String, Vec<i32>> =
            HashMap::with_capacity(self.state.assignments.as_slice().len());
        let tps = self
            .state
            .fetch_offsets
            .keys()
            .map(|tp| (self.state.topic_name(tp.topic_ref), tp.partition));
        for tp in tps {
            // get_mut-then-insert (instead of the entry API) avoids cloning
            // the topic name when an entry already exists.
            if let Some(ps) = h.get_mut(tp.0) {
                ps.push(tp.1);
                continue;
            }
            h.insert(tp.0.to_owned(), vec![tp.1]);
        }
        h
    }

    /// Polls the brokers for new messages: either all subscribed
    /// partitions at once, or a single previously failed partition
    /// (see `fetch_messages`).
    ///
    /// # Errors
    /// Propagates fetch errors from the client and per-partition error
    /// codes from the responses.
    pub fn poll(&mut self) -> Result<MessageSets> {
        let (n, resps) = self.fetch_messages();
        self.process_fetch_responses(n, resps?)
    }

    /// True when exactly one topic partition is being fetched.
    #[must_use]
    fn single_partition_consumer(&self) -> bool {
        self.state.fetch_offsets.len() == 1
    }

    /// The consumer group offsets are committed to; may be empty when no
    /// group was configured (see `commit_consumed`).
    #[must_use]
    pub fn group(&self) -> &str {
        &self.config.group
    }

    /// Repositions the fetch offset of an already subscribed topic
    /// partition.
    ///
    /// # Errors
    /// `Error::Kafka(UnknownTopicOrPartition)` when the topic itself is
    /// unknown; `Error::TopicPartitionError` when the topic is known but
    /// the partition is not subscribed.
    pub fn seek(&mut self, topic: &str, partition: i32, offset: i64) -> Result<()> {
        let topic_ref = self.state.topic_ref(topic);
        match topic_ref {
            Some(topic_ref) => {
                let tp = TopicPartition {
                    topic_ref,
                    partition,
                };
                let maybe_entry = self.state.fetch_offsets.entry(tp);
                match maybe_entry {
                    Entry::Occupied(mut e) => {
                        e.get_mut().offset = offset;
                        Ok(())
                    }
                    // Deliberately never inserts: seeking must not
                    // implicitly subscribe a new partition.
                    Entry::Vacant(_) => Err(Error::TopicPartitionError {
                        topic_name: topic.to_string(),
                        partition_id: partition,
                        error_code: KafkaCode::UnknownTopicOrPartition,
                    }),
                }
            }
            None => Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
        }
    }

    /// Issues the next fetch request(s), returning the number of
    /// partitions queried together with the raw responses.
    ///
    /// Partitions scheduled for retry are fetched one at a time (popped
    /// from the front of the retry queue); otherwise all subscribed
    /// partitions are queried in one multi-partition request.
    fn fetch_messages(&mut self) -> (u32, Result<Vec<fetch::Response>>) {
        if let Some(tp) = self.state.retry_partitions.pop_front() {
            let Some(s) = self.state.fetch_offsets.get(&tp) else {
                // The partition disappeared between being scheduled for
                // retry and now (e.g. subscription change).
                return (1, Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)));
            };
            let topic = self.state.topic_name(tp.topic_ref);
            debug!(
                "fetching retry messages: (fetch-offset: {{\"{}:{}\": {:?}}})",
                topic, tp.partition, s
            );
            (
                1,
                self.client.fetch_messages_for_partition(
                    &FetchPartition::new(topic, tp.partition, s.offset).with_max_bytes(s.max_bytes),
                ),
            )
        } else {
            let client = &mut self.client;
            let state = &self.state;
            debug!(
                "fetching messages: (fetch-offsets: {:?})",
                state.fetch_offsets_debug()
            );
            let reqs = state.fetch_offsets.iter().map(|(tp, s)| {
                let topic = state.topic_name(tp.topic_ref);
                FetchPartition::new(topic, tp.partition, s.offset).with_max_bytes(s.max_bytes)
            });
            (
                state.fetch_offsets.len() as u32,
                client.fetch_messages(reqs),
            )
        }
    }

    /// Post-processes fetch responses: advances each partition's fetch
    /// offset past the received messages, and manages the `max_bytes`
    /// growth/retry strategy for partitions that returned no data despite
    /// lagging behind the highwatermark (typically a message larger than
    /// the current per-partition `max_bytes`).
    ///
    /// # Errors
    /// Propagates per-partition error codes, and returns
    /// `MessageSizeTooLarge` when the only queried partition still lags
    /// but can no longer grow its `max_bytes`.
    fn process_fetch_responses(
        &mut self,
        num_partitions_queried: u32,
        resps: Vec<fetch::Response>,
    ) -> Result<MessageSets> {
        let single_partition_consumer = self.single_partition_consumer();
        let mut empty = true;
        let retry_partitions = &mut self.state.retry_partitions;
        for resp in &resps {
            for t in resp.topics() {
                let topic_ref = self
                    .state
                    .assignments
                    .topic_ref(t.topic())
                    .expect("unknown topic in response");
                for p in t.partitions() {
                    let tp = state::TopicPartition {
                        topic_ref,
                        partition: p.partition(),
                    };
                    // Surfaces the partition's error code, if any.
                    let data = p.data()?;
                    let fetch_state = self
                        .state
                        .fetch_offsets
                        .get_mut(&tp)
                        .expect("non-requested partition");
                    if let Some(last_msg) = data.messages().last() {
                        // Got data: resume after the last received message.
                        fetch_state.offset = last_msg.offset + 1;
                        empty = false;
                        // Once the partition delivers again, shrink a
                        // previously grown max_bytes back to the client's
                        // default.
                        if fetch_state.max_bytes != self.client.fetch_max_bytes_per_partition() {
                            let prev_max_bytes = fetch_state.max_bytes;
                            fetch_state.max_bytes = self.client.fetch_max_bytes_per_partition();
                            debug!(
                                "reset max_bytes for {}:{} from {} to {}",
                                t.topic(),
                                tp.partition,
                                prev_max_bytes,
                                fetch_state.max_bytes
                            );
                        }
                    } else {
                        debug!(
                            "no data received for {}:{} (max_bytes: {} / fetch_offset: {} / \
                             highwatermark_offset: {})",
                            t.topic(),
                            tp.partition,
                            fetch_state.max_bytes,
                            fetch_state.offset,
                            data.highwatermark_offset()
                        );
                        // Empty response although more data exists: the
                        // next message is probably bigger than max_bytes.
                        if fetch_state.offset < data.highwatermark_offset() {
                            if fetch_state.max_bytes < self.config.retry_max_bytes_limit {
                                let prev_max_bytes = fetch_state.max_bytes;
                                // Doubling via self-addition.
                                // NOTE(review): could overflow (panic in
                                // debug builds) for very large max_bytes;
                                // consider saturating_add.
                                let incr_max_bytes = prev_max_bytes + prev_max_bytes;
                                if incr_max_bytes > self.config.retry_max_bytes_limit {
                                    fetch_state.max_bytes = self.config.retry_max_bytes_limit;
                                } else {
                                    fetch_state.max_bytes = incr_max_bytes;
                                }
                                debug!(
                                    "increased max_bytes for {}:{} from {} to {}",
                                    t.topic(),
                                    tp.partition,
                                    prev_max_bytes,
                                    fetch_state.max_bytes
                                );
                            } else if num_partitions_queried == 1 {
                                // Cannot grow further and this was the only
                                // partition queried: give up loudly.
                                return Err(Error::Kafka(KafkaCode::MessageSizeTooLarge));
                            }
                            // Multi-partition consumers retry the stalled
                            // partition individually on a later poll.
                            if !single_partition_consumer {
                                debug!("rescheduled for retry: {}:{}", t.topic(), tp.partition);
                                retry_partitions.push_back(tp);
                            }
                        }
                    }
                }
            }
        }
        Ok(MessageSets {
            responses: resps,
            empty,
        })
    }

    /// Offset of the last message marked consumed for the given topic
    /// partition, or `None` when unknown/not consumed.
    #[must_use]
    pub fn last_consumed_message(&self, topic: &str, partition: i32) -> Option<i64> {
        self.state
            .topic_ref(topic)
            .and_then(|tref| {
                self.state.consumed_offsets.get(&state::TopicPartition {
                    topic_ref: tref,
                    partition,
                })
            })
            .map(|co| co.offset)
    }

    /// Marks the given message offset as consumed; a later
    /// `commit_consumed` will commit it.
    ///
    /// # Errors
    /// `UnknownTopicOrPartition` when the topic is not known to this
    /// consumer.
    pub fn consume_message(&mut self, topic: &str, partition: i32, offset: i64) -> Result<()> {
        let topic_ref = self
            .state
            .topic_ref(topic)
            .ok_or_else(|| Error::Kafka(KafkaCode::UnknownTopicOrPartition))?;
        let tp = state::TopicPartition {
            topic_ref,
            partition,
        };
        match self.state.consumed_offsets.entry(tp) {
            Entry::Vacant(v) => {
                v.insert(state::ConsumedOffset {
                    offset,
                    dirty: true,
                });
            }
            Entry::Occupied(mut v) => {
                let o = v.get_mut();
                // Offsets only ever move forward; stale calls are ignored.
                if offset > o.offset {
                    o.offset = offset;
                    o.dirty = true;
                }
            }
        }
        Ok(())
    }

    /// Marks a whole message set as consumed (up to its last message);
    /// a no-op for an empty set.
    pub fn consume_messageset(&mut self, msgs: &MessageSet<'_>) -> Result<()> {
        if let Some(last) = msgs.messages().last() {
            self.consume_message(msgs.topic(), msgs.partition(), last.offset)
        } else {
            Ok(())
        }
    }

    /// Commits all dirty (consumed but not yet committed) offsets to the
    /// configured group.
    ///
    /// # Errors
    /// `Error::UnsetGroupId` when no group is configured; otherwise
    /// propagates commit failures from the client (dirty flags are left
    /// set in that case, so a later call retries).
    pub fn commit_consumed(&mut self) -> Result<()> {
        if self.config.group.is_empty() {
            return Err(Error::UnsetGroupId);
        }
        // NOTE(review): the log message below is missing its closing ')'.
        debug!(
            "commit_consumed: committing dirty-only consumer offsets (group: {} / offsets: {:?}",
            self.config.group,
            self.state.consumed_offsets_debug()
        );
        let (client, state) = (&mut self.client, &mut self.state);
        client.commit_offsets(
            &self.config.group,
            state
                .consumed_offsets
                .iter()
                .filter(|&(_, o)| o.dirty)
                .map(|(tp, o)| {
                    let topic = state.topic_name(tp.topic_ref);
                    // `+ 1`: the committed value is the next offset to
                    // consume (Kafka's commit convention).
                    CommitOffset::new(topic, tp.partition, o.offset + 1)
                }),
        )?;
        // Clear dirty flags only after a successful commit.
        for co in state.consumed_offsets.values_mut() {
            if co.dirty {
                co.dirty = false;
            }
        }
        Ok(())
    }
}
/// Messages retrieved from Kafka during one [`Consumer::poll`] call,
/// grouping the raw fetch responses with an "anything received?" flag.
#[derive(Debug)]
pub struct MessageSets {
    // Raw fetch responses; iteration flattens them into `MessageSet`s.
    responses: Vec<fetch::Response>,
    // True when no partition in `responses` delivered any message
    // (precomputed in `process_fetch_responses`).
    empty: bool,
}
impl MessageSets {
    /// Reports whether this poll result contains no messages at all.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.empty
    }

    /// Iterates over the non-empty message sets of this poll result.
    #[must_use]
    pub fn iter(&self) -> MessageSetsIter<'_> {
        // Delegates to the `IntoIterator` impl for `&MessageSets`.
        self.into_iter()
    }
}
impl<'a> IntoIterator for &'a MessageSets {
    type Item = MessageSet<'a>;
    type IntoIter = MessageSetsIter<'a>;

    /// Builds an iterator primed on the first topic of the first response
    /// (if any); subsequent responses/topics are pulled lazily by `next`.
    fn into_iter(self) -> Self::IntoIter {
        let mut responses = self.responses.iter();
        let mut topics = responses.next().map(|r| r.topics().iter());
        let (curr_topic, partitions) = match topics.as_mut().and_then(Iterator::next) {
            Some(t) => (t.topic(), Some(t.partitions().iter())),
            None => ("", None),
        };
        MessageSetsIter { responses, topics, curr_topic, partitions }
    }
}
/// A batch of messages fetched from one partition of one topic, borrowing
/// from the underlying fetch response.
pub struct MessageSet<'a> {
    // Topic the messages came from.
    topic: &'a str,
    // Partition the messages came from.
    partition: i32,
    // The fetched messages; never empty when produced by the iterator.
    messages: &'a [Message<'a>],
}
impl<'a> MessageSet<'a> {
    /// The topic these messages were fetched from.
    #[inline]
    #[must_use]
    pub fn topic(&self) -> &'a str {
        self.topic
    }

    /// The partition these messages were fetched from.
    #[inline]
    #[must_use]
    pub fn partition(&self) -> i32 {
        self.partition
    }

    /// The fetched messages themselves.
    #[inline]
    #[must_use]
    pub fn messages(&self) -> &'a [Message<'a>] {
        self.messages
    }
}
/// Iterator over the non-empty [`MessageSet`]s inside a [`MessageSets`],
/// flattening the response → topic → partition hierarchy.
pub struct MessageSetsIter<'a> {
    // Remaining responses not yet entered.
    responses: slice::Iter<'a, fetch::Response>,
    // Topics of the response currently being walked, if any.
    topics: Option<slice::Iter<'a, fetch::Topic<'a>>>,
    // Name of the topic currently being walked ("" between topics).
    curr_topic: &'a str,
    // Partitions of the topic currently being walked, if any.
    partitions: Option<slice::Iter<'a, fetch::Partition<'a>>>,
}
impl<'a> Iterator for MessageSetsIter<'a> {
    type Item = MessageSet<'a>;

    /// Advances through the three nested levels (partitions, then topics,
    /// then responses), yielding only partitions that carry at least one
    /// message.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // 1) Drain the current partition iterator first.
            if let Some(p) = self.partitions.as_mut().and_then(Iterator::next) {
                // `.ok()` silently skips partitions whose data is an error
                // (errors are surfaced separately in `poll`'s processing).
                if let Some(messages) = p
                    .data()
                    .ok()
                    .map(protocol::fetch::Data::messages)
                    .filter(|msgs| !msgs.is_empty())
                {
                    return Some(MessageSet {
                        topic: self.curr_topic,
                        partition: p.partition(),
                        messages,
                    });
                }
                continue;
            }
            // 2) Move on to the next topic of the current response.
            if let Some(t) = self.topics.as_mut().and_then(Iterator::next) {
                self.curr_topic = t.topic();
                self.partitions = Some(t.partitions().iter());
                continue;
            }
            // 3) Move on to the next response.
            if let Some(r) = self.responses.next() {
                self.curr_topic = "";
                self.topics = Some(r.topics().iter());
                continue;
            }
            // All levels exhausted.
            return None;
        }
    }
}