use crate::state::Bookmark;
use faucet_core::FaucetError;
use rdkafka::Offset;
use rdkafka::TopicPartitionList;
use rdkafka::client::ClientContext;
use rdkafka::consumer::base_consumer::BaseConsumer;
use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance, RebalanceProtocol};
use rdkafka::error::KafkaError;
use rdkafka::types::RDKafkaRespErr;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Shared state read and written by the rebalance callback implemented on this type.
///
/// Every field is an `Arc<Mutex<Option<..>>>`, so a clone of the context is a
/// handle onto the same slots rather than an independent copy. `Default`
/// leaves every slot empty (`None`).
#[derive(Clone, Default)]
pub(crate) struct BookmarkContext {
/// Bookmark waiting to be applied. The rebalance callback takes it (leaving
/// `None`) when partitions are assigned and uses its offsets as the starting
/// offsets of the assigned partitions it mentions.
pub(crate) pending_bookmark: Arc<Mutex<Option<Bookmark>>>,
/// NOTE(review): not read or written anywhere in this file; presumably
/// populated and consumed by other code in the crate — confirm against callers.
pub(crate) start_offsets: Arc<Mutex<Option<Bookmark>>>,
/// First error recorded from inside the callback (see `record_error`), if any.
/// The callback stores failures here instead of propagating them.
pub(crate) callback_error: Arc<Mutex<Option<FaucetError>>>,
}
impl BookmarkContext {
    /// Creates a context whose slots are all empty: no pending bookmark, no
    /// start offsets and no recorded callback error.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Stores `err` in `callback_error` unless an error is already recorded.
    ///
    /// Only the first error is kept; later ones are dropped so the original
    /// failure is what gets reported.
    ///
    /// A poisoned mutex is recovered instead of being treated as "nothing to
    /// do": the slot is a plain `Option` that is written in a single
    /// assignment, so there is no half-updated state to protect, and dropping
    /// the error here would make a failed callback look successful. Recovery
    /// does not clear the poison flag, so a reader that wants the stored error
    /// must recover the guard the same way.
    fn record_error(&self, err: FaucetError) {
        let mut slot = self
            .callback_error
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if slot.is_none() {
            *slot = Some(err);
        }
    }
}
// No `ClientContext` methods are overridden; the trait's default behaviour is used as-is.
// NOTE(review): presumably present because `ConsumerContext` requires it — confirm against the rdkafka docs.
impl ClientContext for BookmarkContext {}
impl ConsumerContext for BookmarkContext {
    /// Rebalance callback that starts newly assigned partitions at the offsets
    /// of the pending bookmark.
    ///
    /// Because `rebalance` is overridden, this method applies the assignment
    /// itself and calls `pre_rebalance` / `post_rebalance` around it.
    ///
    /// * **Assign** – takes the pending bookmark (if any), writes its offsets
    ///   onto the matching entries of `tpl`, then assigns `tpl` using the call
    ///   that matches the consumer's rebalance protocol.
    /// * **Revoke** – releases the assignment using the call that matches the
    ///   consumer's rebalance protocol.
    /// * **Any other code** – treated as a rebalance failure: the failure is
    ///   recorded and the assignment is released so the client's view of its
    ///   assignment is synchronised, as librdkafka requires of a rebalance
    ///   callback that receives an arbitrary error.
    ///
    /// Failures are never propagated from here; each one is handed to
    /// `record_error`, which keeps the first.
    fn rebalance(
        &self,
        base_consumer: &BaseConsumer<Self>,
        err: RDKafkaRespErr,
        tpl: &mut TopicPartitionList,
    ) {
        match err {
            RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => {
                self.pre_rebalance(base_consumer, &Rebalance::Assign(tpl));
                // Offsets must be written onto the list before it is handed
                // to the consumer.
                self.seek_to_pending_bookmark(tpl);
                self.install_assignment(base_consumer, tpl);
                self.post_rebalance(base_consumer, &Rebalance::Assign(tpl));
            }
            RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => {
                self.pre_rebalance(base_consumer, &Rebalance::Revoke(tpl));
                self.release_assignment(base_consumer, tpl);
                self.post_rebalance(base_consumer, &Rebalance::Revoke(tpl));
            }
            _ => {
                let error_code = rdkafka::error::RDKafkaErrorCode::from(err);
                // Build the message before the code is moved into the event.
                let failure = FaucetError::State(format!("kafka rebalance failed: {error_code}"));
                let rebalance = Rebalance::Error(KafkaError::Rebalance(error_code));
                self.pre_rebalance(base_consumer, &rebalance);
                // Record the rebalance failure first so it, rather than any
                // follow-up unassign error, is the one that is kept.
                self.record_error(failure);
                self.release_assignment(base_consumer, tpl);
                self.post_rebalance(base_consumer, &rebalance);
            }
        }
    }
}

impl BookmarkContext {
    /// Takes the pending bookmark and writes its offsets onto the entries of
    /// `tpl` that it mentions. Entries without a bookmark offset are left
    /// untouched.
    ///
    /// The bookmark is consumed: a later call finds the slot empty and leaves
    /// `tpl` unchanged. A poisoned `pending_bookmark` mutex is recorded as an
    /// error and treated as "no bookmark".
    fn seek_to_pending_bookmark(&self, tpl: &mut TopicPartitionList) {
        let bookmark = match self.pending_bookmark.lock() {
            Ok(mut guard) => guard.take(),
            Err(poisoned) => {
                self.record_error(FaucetError::State(format!(
                    "kafka pending_bookmark mutex poisoned: {poisoned}"
                )));
                None
            }
        };
        let Some(bookmark) = bookmark else {
            return;
        };

        let lookup: HashMap<(&str, i32), i64> = bookmark
            .partition_offsets
            .iter()
            .map(|p| ((p.topic.as_str(), p.partition), p.offset))
            .collect();

        // Collect owned (topic, partition, offset) triples first: the elements
        // borrow `tpl`, which has to be mutated below. The topic string is only
        // allocated for partitions that actually have a bookmark offset.
        let seeks: Vec<(String, i32, i64)> = tpl
            .elements()
            .into_iter()
            .filter_map(|elem| {
                let partition = elem.partition();
                lookup
                    .get(&(elem.topic(), partition))
                    .map(|&offset| (elem.topic().to_owned(), partition, offset))
            })
            .collect();

        for (topic, partition, offset) in seeks {
            if let Err(e) = tpl.set_partition_offset(&topic, partition, Offset::Offset(offset)) {
                self.record_error(FaucetError::State(format!(
                    "kafka set_partition_offset topic={topic} \
                     partition={partition} offset={offset}: {e}"
                )));
            }
        }
    }

    /// Assigns `tpl` to the consumer: `incremental_assign` under the
    /// cooperative protocol, `assign` otherwise. A failure is recorded.
    fn install_assignment(&self, base_consumer: &BaseConsumer<Self>, tpl: &TopicPartitionList) {
        match base_consumer.rebalance_protocol() {
            RebalanceProtocol::Cooperative => {
                if let Err(e) = base_consumer.incremental_assign(tpl) {
                    self.record_error(FaucetError::State(format!(
                        "kafka incremental_assign failed: {e}"
                    )));
                }
            }
            _ => {
                if let Err(e) = base_consumer.assign(tpl) {
                    self.record_error(FaucetError::State(format!("kafka assign failed: {e}")));
                }
            }
        }
    }

    /// Releases partitions from the consumer: `incremental_unassign(tpl)`
    /// under the cooperative protocol, `unassign()` otherwise. A failure is
    /// recorded.
    fn release_assignment(&self, base_consumer: &BaseConsumer<Self>, tpl: &TopicPartitionList) {
        match base_consumer.rebalance_protocol() {
            RebalanceProtocol::Cooperative => {
                if let Err(e) = base_consumer.incremental_unassign(tpl) {
                    self.record_error(FaucetError::State(format!(
                        "kafka incremental_unassign failed: {e}"
                    )));
                }
            }
            _ => {
                if let Err(e) = base_consumer.unassign() {
                    self.record_error(FaucetError::State(format!("kafka unassign failed: {e}")));
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{Bookmark, PartitionOffset};

    /// A bookmark written through a clone is visible through the original
    /// context, because both handles point at the same slot.
    #[test]
    fn shared_state_round_trips_through_clone() {
        let original = BookmarkContext::new();
        let alias = original.clone();
        let expected = Bookmark {
            partition_offsets: vec![PartitionOffset {
                topic: "t".into(),
                partition: 0,
                offset: 42,
            }],
        };

        // Write via the clone; the guard is released at the end of the scope.
        {
            let mut slot = alias.pending_bookmark.lock().unwrap();
            *slot = Some(expected.clone());
        }

        // Read via the original handle.
        let observed = original.pending_bookmark.lock().unwrap().clone();
        assert_eq!(
            observed.unwrap().partition_offsets,
            expected.partition_offsets
        );
    }

    /// Recording two errors in a row keeps the first and drops the second.
    #[test]
    fn record_error_keeps_first_only() {
        let ctx = BookmarkContext::new();
        for label in ["first", "second"] {
            ctx.record_error(FaucetError::State(label.into()));
        }

        let kept = ctx.callback_error.lock().unwrap().take().unwrap();
        match kept {
            FaucetError::State(msg) => assert_eq!(msg, "first"),
            other => panic!("expected State, got {other:?}"),
        }
    }
}