use std::collections::hash_map::{Entry, HashMap};
use std::slice;
use crate::client::fetch;
use crate::client::{CommitOffset, FetchPartition, KafkaClient};
use crate::error::{Error, KafkaCode, Result};
pub use self::builder::Builder;
pub use crate::client::fetch::Message;
pub use crate::client::FetchOffset;
pub use crate::client::GroupOffsetStorage;
mod assignment;
mod builder;
mod config;
mod state;
/// Default retry max-bytes limit.
///
/// NOTE(review): presumably this is what `Builder` stores into
/// `config.retry_max_bytes_limit` when the caller does not override it —
/// confirm in the `builder`/`config` modules. With a limit of `0`, the
/// `max_bytes < retry_max_bytes_limit` check in `process_fetch_responses`
/// cannot succeed for a non-negative `max_bytes`, so the per-partition
/// fetch size is never grown on an empty fetch.
pub const DEFAULT_RETRY_MAX_BYTES_LIMIT: i32 = 0;
/// Default fallback offset: `FetchOffset::Latest`.
///
/// NOTE(review): presumably used by `Builder` as the offset to start from
/// when no usable committed offset exists — confirm in the `builder` module.
pub const DEFAULT_FALLBACK_OFFSET: FetchOffset = FetchOffset::Latest;
/// A Kafka consumer built around a `KafkaClient`.
///
/// Instances are created through a `Builder` obtained from
/// `Consumer::from_client` or `Consumer::from_hosts`.
#[derive(Debug)]
pub struct Consumer {
// The underlying client used to fetch messages and to commit offsets.
client: KafkaClient,
// Book-keeping: topic assignments, per-partition fetch offsets, the queue
// of partitions scheduled for a retry and the consumed offsets.
state: state::State,
// Consumer settings read by this module: the group name and the
// retry max-bytes limit.
config: config::Config,
}
impl Consumer {
/// Starts building a consumer on top of an already existing client.
///
/// Returns a `Builder` seeded with `client` and an empty host list.
pub fn from_client(client: KafkaClient) -> Builder {
    let hosts = Vec::new();
    builder::new(Some(client), hosts)
}
/// Starts building a consumer from a list of hosts.
///
/// Returns a `Builder` seeded with `hosts` and no pre-existing client.
pub fn from_hosts(hosts: Vec<String>) -> Builder {
    let client = None;
    builder::new(client, hosts)
}
pub fn client(&self) -> &KafkaClient {
&self.client
}
pub fn client_mut(&mut self) -> &mut KafkaClient {
&mut self.client
}
pub fn into_client(self) -> KafkaClient {
self.client
}
/// Returns the partitions this consumer currently tracks, grouped by topic.
///
/// The map is built from the keys of the consumer's fetch-offset table:
/// each key contributes its partition id to the list stored under its
/// topic name. Partition ids appear in the iteration order of that table.
pub fn subscriptions(&self) -> HashMap<String, Vec<i32>> {
    let capacity = self.state.assignments.as_slice().len();
    let mut by_topic: HashMap<String, Vec<i32>> = HashMap::with_capacity(capacity);

    for tp in self.state.fetch_offsets.keys() {
        let topic = self.state.topic_name(tp.topic_ref);
        // Look up by `&str` first so an owned key is only allocated the
        // first time a topic is seen.
        match by_topic.get_mut(topic) {
            Some(partitions) => partitions.push(tp.partition),
            None => {
                by_topic.insert(topic.to_owned(), vec![tp.partition]);
            }
        }
    }

    by_topic
}
/// Fetches the next batch of messages.
///
/// Issues a fetch through `fetch_messages` and passes the responses,
/// together with the number of partitions that were queried, on to
/// `process_fetch_responses`.
///
/// # Errors
///
/// Returns the fetch error, if any, or whatever error
/// `process_fetch_responses` reports.
pub fn poll(&mut self) -> Result<MessageSets> {
    let (num_partitions_queried, fetched) = self.fetch_messages();
    let responses = fetched?;
    self.process_fetch_responses(num_partitions_queried, responses)
}
/// Tells whether exactly one partition is tracked in the fetch-offset table.
fn single_partition_consumer(&self) -> bool {
    let tracked_partitions = self.state.fetch_offsets.len();
    tracked_partitions == 1
}
/// Returns the group name this consumer is configured with.
///
/// The name may be empty; `commit_consumed` rejects an empty group.
pub fn group(&self) -> &str {
    let group: &str = &self.config.group;
    group
}
/// Issues the next fetch request.
///
/// If a partition is queued for a retry, only that partition is fetched
/// (using its currently recorded offset and `max_bytes`). Otherwise all
/// partitions in the fetch-offset table are fetched in one request.
///
/// Returns the number of partitions queried along with the fetch result.
/// A queued retry partition that is missing from the fetch-offset table
/// yields `KafkaCode::UnknownTopicOrPartition`.
fn fetch_messages(&mut self) -> (u32, Result<Vec<fetch::Response>>) {
    // A pending retry takes precedence over the regular fetch.
    if let Some(tp) = self.state.retry_partitions.pop_front() {
        let fetch_state = match self.state.fetch_offsets.get(&tp) {
            Some(fetch_state) => fetch_state,
            None => return (1, Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))),
        };
        let topic = self.state.topic_name(tp.topic_ref);
        debug!(
            "fetching retry messages: (fetch-offset: {{\"{}:{}\": {:?}}})",
            topic, tp.partition, fetch_state
        );
        let request = FetchPartition::new(topic, tp.partition, fetch_state.offset)
            .with_max_bytes(fetch_state.max_bytes);
        return (1, self.client.fetch_messages_for_partition(&request));
    }

    // Regular case: query every tracked partition.
    let state = &self.state;
    debug!(
        "fetching messages: (fetch-offsets: {:?})",
        state.fetch_offsets_debug()
    );
    let requests = state.fetch_offsets.iter().map(|(tp, fetch_state)| {
        let topic = state.topic_name(tp.topic_ref);
        FetchPartition::new(topic, tp.partition, fetch_state.offset)
            .with_max_bytes(fetch_state.max_bytes)
    });
    let num_partitions = state.fetch_offsets.len() as u32;
    (num_partitions, self.client.fetch_messages(requests))
}
fn process_fetch_responses(
&mut self,
num_partitions_queried: u32,
resps: Vec<fetch::Response>,
) -> Result<MessageSets> {
let single_partition_consumer = self.single_partition_consumer();
let mut empty = true;
let retry_partitions = &mut self.state.retry_partitions;
for resp in &resps {
for t in resp.topics() {
let topic_ref = self
.state
.assignments
.topic_ref(t.topic())
.expect("unknown topic in response");
for p in t.partitions() {
let tp = state::TopicPartition {
topic_ref,
partition: p.partition(),
};
let data = p.data()?;
let mut fetch_state = self
.state
.fetch_offsets
.get_mut(&tp)
.expect("non-requested partition");
if let Some(last_msg) = data.messages().last() {
fetch_state.offset = last_msg.offset + 1;
empty = false;
if fetch_state.max_bytes != self.client.fetch_max_bytes_per_partition() {
let prev_max_bytes = fetch_state.max_bytes;
fetch_state.max_bytes = self.client.fetch_max_bytes_per_partition();
debug!(
"reset max_bytes for {}:{} from {} to {}",
t.topic(),
tp.partition,
prev_max_bytes,
fetch_state.max_bytes
);
}
} else {
debug!(
"no data received for {}:{} (max_bytes: {} / fetch_offset: {} / \
highwatermark_offset: {})",
t.topic(),
tp.partition,
fetch_state.max_bytes,
fetch_state.offset,
data.highwatermark_offset()
);
if fetch_state.offset < data.highwatermark_offset() {
if fetch_state.max_bytes < self.config.retry_max_bytes_limit {
let prev_max_bytes = fetch_state.max_bytes;
let incr_max_bytes = prev_max_bytes + prev_max_bytes;
if incr_max_bytes > self.config.retry_max_bytes_limit {
fetch_state.max_bytes = self.config.retry_max_bytes_limit;
} else {
fetch_state.max_bytes = incr_max_bytes;
}
debug!(
"increased max_bytes for {}:{} from {} to {}",
t.topic(),
tp.partition,
prev_max_bytes,
fetch_state.max_bytes
);
} else if num_partitions_queried == 1 {
return Err(Error::Kafka(KafkaCode::MessageSizeTooLarge));
}
if !single_partition_consumer {
debug!("rescheduled for retry: {}:{}", t.topic(), tp.partition);
retry_partitions.push_back(tp)
}
}
}
}
}
}
Ok(MessageSets {
responses: resps,
empty,
})
}
/// Returns the offset last recorded as consumed for the given topic
/// partition.
///
/// Yields `None` if the topic is not known to this consumer or if no
/// offset has been recorded for the partition.
pub fn last_consumed_message(&self, topic: &str, partition: i32) -> Option<i64> {
    let topic_ref = self.state.topic_ref(topic)?;
    let tp = state::TopicPartition {
        topic_ref,
        partition,
    };
    self.state.consumed_offsets.get(&tp).map(|co| co.offset)
}
pub fn consume_message(&mut self, topic: &str, partition: i32, offset: i64) -> Result<()> {
let topic_ref = match self.state.topic_ref(topic) {
None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
Some(topic_ref) => topic_ref,
};
let tp = state::TopicPartition {
topic_ref,
partition,
};
match self.state.consumed_offsets.entry(tp) {
Entry::Vacant(v) => {
v.insert(state::ConsumedOffset {
offset,
dirty: true,
});
}
Entry::Occupied(mut v) => {
let o = v.get_mut();
if offset > o.offset {
o.offset = offset;
o.dirty = true;
}
}
}
Ok(())
}
/// Records all messages of the given set as consumed.
///
/// This is `consume_message` applied to the last message of the set; an
/// empty set is a no-op that returns `Ok(())`.
///
/// # Errors
///
/// Same as `consume_message`.
pub fn consume_messageset(&mut self, msgs: MessageSet<'_>) -> Result<()> {
    match msgs.messages.last() {
        Some(last_msg) => self.consume_message(msgs.topic, msgs.partition, last_msg.offset),
        None => Ok(()),
    }
}
/// Commits the consumed offsets that changed since the last commit.
///
/// Only entries flagged dirty are sent. For each of them the committed
/// value is the recorded consumed offset plus one. After a successful
/// commit all dirty flags are cleared; if the commit fails the flags are
/// left untouched so the same offsets are sent again next time.
///
/// # Errors
///
/// * `Error::UnsetGroupId` if this consumer has an empty group name.
/// * Any error reported by the client while committing.
pub fn commit_consumed(&mut self) -> Result<()> {
    if self.config.group.is_empty() {
        return Err(Error::UnsetGroupId);
    }
    debug!(
        "commit_consumed: committing dirty-only consumer offsets (group: {} / offsets: {:?})",
        self.config.group,
        self.state.consumed_offsets_debug()
    );

    // Split the borrows so the client can be used mutably while the state
    // is being read.
    let (client, state) = (&mut self.client, &mut self.state);
    client.commit_offsets(
        &self.config.group,
        state
            .consumed_offsets
            .iter()
            .filter(|&(_, o)| o.dirty)
            .map(|(tp, o)| {
                let topic = state.topic_name(tp.topic_ref);
                CommitOffset::new(topic, tp.partition, o.offset + 1)
            }),
    )?;

    // Everything that was dirty has been committed by now.
    for co in state.consumed_offsets.values_mut() {
        co.dirty = false;
    }
    Ok(())
}
}
/// The messages obtained from one `Consumer::poll` call.
///
/// Wraps the raw fetch responses; use `iter` to walk the contained
/// messages grouped by topic partition.
#[derive(Debug)]
pub struct MessageSets {
    // The raw fetch responses the message sets borrow from.
    responses: Vec<fetch::Response>,
    // `true` if none of the partitions in `responses` delivered a message.
    empty: bool,
}

impl MessageSets {
    /// Tells whether no messages at all were delivered.
    pub fn is_empty(&self) -> bool {
        self.empty
    }

    /// Returns an iterator over the contained message sets.
    ///
    /// The iterator starts out positioned at the first topic of the first
    /// response, if there is one.
    pub fn iter(&self) -> MessageSetsIter<'_> {
        let mut responses = self.responses.iter();
        let mut topics = responses.next().map(|r| r.topics().iter());
        let first_topic = topics.as_mut().and_then(|t| t.next());
        let (curr_topic, partitions) = match first_topic {
            Some(t) => (t.topic(), Some(t.partitions().iter())),
            None => ("", None),
        };
        MessageSetsIter {
            responses,
            topics,
            curr_topic,
            partitions,
        }
    }
}
/// A group of messages that all belong to the same topic partition.
pub struct MessageSet<'a> {
    // Name of the topic the messages belong to.
    topic: &'a str,
    // Id of the partition the messages belong to.
    partition: i32,
    // The messages themselves.
    messages: &'a [Message<'a>],
}

impl<'a> MessageSet<'a> {
    /// The name of the topic these messages belong to.
    #[inline]
    pub fn topic(&self) -> &'a str {
        self.topic
    }

    /// The id of the partition these messages belong to.
    #[inline]
    pub fn partition(&self) -> i32 {
        self.partition
    }

    /// The messages of this set.
    #[inline]
    pub fn messages(&self) -> &'a [Message<'a>] {
        self.messages
    }
}
/// An iterator over the message sets contained in a `MessageSets`.
///
/// Walks responses, then the topics of each response, then the partitions
/// of each topic, yielding one `MessageSet` per partition that holds at
/// least one message.
pub struct MessageSetsIter<'a> {
    // Responses not yet started.
    responses: slice::Iter<'a, fetch::Response>,
    // Remaining topics of the response currently being walked.
    topics: Option<slice::Iter<'a, fetch::Topic<'a>>>,
    // Name of the topic currently being walked ("" if none).
    curr_topic: &'a str,
    // Remaining partitions of the topic currently being walked.
    partitions: Option<slice::Iter<'a, fetch::Partition<'a>>>,
}

impl<'a> Iterator for MessageSetsIter<'a> {
    type Item = MessageSet<'a>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // First, drain what is left of the current topic's partitions.
            if let Some(partitions) = self.partitions.as_mut() {
                for p in partitions {
                    // Partitions carrying an error and partitions without
                    // messages are skipped.
                    if let Ok(pdata) = p.data() {
                        let msgs = pdata.messages();
                        if !msgs.is_empty() {
                            return Some(MessageSet {
                                topic: self.curr_topic,
                                partition: p.partition(),
                                messages: msgs,
                            });
                        }
                    }
                }
            }

            // Then move on to the next topic of the current response ...
            if let Some(t) = self.topics.as_mut().and_then(|topics| topics.next()) {
                self.curr_topic = t.topic();
                self.partitions = Some(t.partitions().iter());
                continue;
            }

            // ... or to the next response; without one we are done.
            match self.responses.next() {
                Some(r) => {
                    self.curr_topic = "";
                    self.topics = Some(r.topics().iter());
                }
                None => return None,
            }
        }
    }
}