use faucet_core::FaucetError;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// The offset recorded for one partition of one topic.
///
/// Serialized through serde using the field names `topic`, `partition` and `offset`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PartitionOffset {
/// Name of the topic the partition belongs to.
pub topic: String,
/// Partition number within `topic`.
pub partition: i32,
/// Offset recorded for this partition.
/// NOTE(review): the tests in this file call consumed values the "next-offset", so this is
/// presumably the next offset to read rather than the last one read — confirm with the caller.
pub offset: i64,
}
/// Persisted consumer state: one recorded offset per `(topic, partition)`.
///
/// The default value is an empty bookmark. Two bookmarks compare equal when their
/// `partition_offsets` lists are equal element by element, in order.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Bookmark {
    /// Recorded offsets, one entry per partition.
    ///
    /// `#[serde(default)]` makes a missing `partition_offsets` key deserialize to an empty
    /// list instead of failing.
    #[serde(default)]
    pub partition_offsets: Vec<PartitionOffset>,
}
impl Bookmark {
    /// Deserializes a bookmark from a JSON [`Value`].
    ///
    /// # Errors
    ///
    /// Returns [`FaucetError::State`] (message prefixed with `kafka bookmark parse: `) when
    /// `v` cannot be deserialized into a bookmark.
    pub fn from_value(v: Value) -> Result<Self, FaucetError> {
        match serde_json::from_value::<Self>(v) {
            Ok(bookmark) => Ok(bookmark),
            Err(e) => Err(FaucetError::State(format!("kafka bookmark parse: {e}"))),
        }
    }

    /// Serializes this bookmark into a JSON [`Value`].
    ///
    /// # Errors
    ///
    /// Returns [`FaucetError::State`] (message prefixed with `kafka bookmark serialize: `)
    /// when serialization fails.
    pub fn to_value(&self) -> Result<Value, FaucetError> {
        match serde_json::to_value(self) {
            Ok(value) => Ok(value),
            Err(e) => Err(FaucetError::State(format!("kafka bookmark serialize: {e}"))),
        }
    }

    /// Builds a bookmark from a `(topic, partition) -> offset` map.
    ///
    /// Entries are ordered by topic and then by partition, so the result does not depend on
    /// the iteration order of the `HashMap`.
    pub fn from_map(map: HashMap<(String, i32), i64>) -> Self {
        let mut partition_offsets = Vec::with_capacity(map.len());
        for ((topic, partition), offset) in map {
            partition_offsets.push(PartitionOffset {
                topic,
                partition,
                offset,
            });
        }
        partition_offsets.sort_by(|lhs, rhs| {
            lhs.topic
                .cmp(&rhs.topic)
                .then_with(|| lhs.partition.cmp(&rhs.partition))
        });
        Self { partition_offsets }
    }

    /// Combines up to three sources of offsets into one bookmark.
    ///
    /// For each `(topic, partition)` the value is taken from the highest-precedence source
    /// that mentions it: `consumed` wins over `positions`, which wins over `prior`.
    /// Partitions that appear in only one source are carried over unchanged. The result is
    /// ordered by topic and then partition (see [`Bookmark::from_map`]).
    pub fn merged(
        prior: Option<&Bookmark>,
        positions: &[PartitionOffset],
        consumed: &HashMap<(String, i32), i64>,
    ) -> Self {
        let mut latest: HashMap<(String, i32), i64> = HashMap::new();

        // Lowest precedence first: later inserts overwrite earlier ones for the same key.
        let seeded = prior
            .into_iter()
            .flat_map(|bookmark| bookmark.partition_offsets.iter())
            .chain(positions.iter());
        for entry in seeded {
            latest.insert((entry.topic.clone(), entry.partition), entry.offset);
        }

        // Consumed offsets are applied last so they override everything else.
        latest.extend(
            consumed
                .iter()
                .map(|((topic, partition), offset)| ((topic.clone(), *partition), *offset)),
        );

        Self::from_map(latest)
    }
}
/// Builds the state-store key for a consumer group and its set of topics.
///
/// The key has the form `kafka:<group_id>:<topic>:<topic>...`, with the topics in sorted
/// order so that the same topics given in a different order produce the same key.
///
/// The parts are joined with `:` and not escaped, so a `group_id` or topic that itself
/// contains `:` can make distinct inputs produce the same key.
pub fn state_key(group_id: &str, topics: &[String]) -> String {
    // Sort borrowed names; the caller's slice is left untouched.
    let mut ordered: Vec<&str> = topics.iter().map(String::as_str).collect();
    ordered.sort_unstable();
    let joined = ordered.join(":");
    format!("kafka:{group_id}:{joined}")
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashMap;

    /// Shorthand constructor for a `PartitionOffset`.
    fn po(topic: &str, partition: i32, offset: i64) -> PartitionOffset {
        PartitionOffset {
            topic: topic.into(),
            partition,
            offset,
        }
    }

    /// Flattens a bookmark into `(topic, partition, offset)` tuples for compact assertions.
    fn offsets_of(b: &Bookmark) -> Vec<(&str, i32, i64)> {
        b.partition_offsets
            .iter()
            .map(|p| (p.topic.as_str(), p.partition, p.offset))
            .collect()
    }

    /// Builds a consumed-offsets map from `(topic, partition, offset)` tuples.
    fn consumed_of(entries: &[(&str, i32, i64)]) -> HashMap<(String, i32), i64> {
        entries
            .iter()
            .map(|&(topic, partition, offset)| ((topic.to_string(), partition), offset))
            .collect()
    }

    #[test]
    fn round_trip_via_value() {
        let bookmark = Bookmark {
            partition_offsets: vec![po("t", 0, 5), po("t", 1, 9)],
        };
        let v = bookmark.to_value().unwrap();
        let parsed = Bookmark::from_value(v).unwrap();
        assert_eq!(parsed.partition_offsets, bookmark.partition_offsets);
    }

    #[test]
    fn to_value_uses_expected_field_names() {
        // Pins the persisted JSON shape so an accidental field rename is caught.
        let bookmark = Bookmark {
            partition_offsets: vec![po("t", 3, 42)],
        };
        assert_eq!(
            bookmark.to_value().unwrap(),
            json!({"partition_offsets": [{"topic": "t", "partition": 3, "offset": 42}]})
        );
    }

    #[test]
    fn from_map_is_deterministic() {
        let mut a: HashMap<(String, i32), i64> = HashMap::new();
        a.insert(("z".into(), 0), 1);
        a.insert(("a".into(), 1), 2);
        a.insert(("a".into(), 0), 3);
        let b = Bookmark::from_map(a);
        let topics: Vec<_> = b
            .partition_offsets
            .iter()
            .map(|p| (p.topic.as_str(), p.partition))
            .collect();
        assert_eq!(topics, vec![("a", 0), ("a", 1), ("z", 0)]);
    }

    #[test]
    fn state_key_sorts_topics() {
        assert_eq!(
            state_key("g1", &["beta".into(), "alpha".into()]),
            "kafka:g1:alpha:beta"
        );
    }

    #[test]
    fn state_key_single_topic() {
        assert_eq!(state_key("g1", &["only".into()]), "kafka:g1:only");
    }

    #[test]
    fn state_key_distinguishes_topic_sets_containing_dots() {
        let k1 = state_key("g", &["a.b".into(), "c".into()]);
        let k2 = state_key("g", &["a".into(), "b.c".into()]);
        assert_ne!(k1, k2, "topic sets with dots must not produce the same key");
    }

    #[test]
    fn from_value_rejects_garbage() {
        let v = json!({"partition_offsets": "not an array"});
        assert!(Bookmark::from_value(v).is_err());
    }

    #[test]
    fn from_value_defaults_missing_partition_offsets() {
        // `#[serde(default)]` on the field: an object without the key is an empty bookmark.
        let parsed = Bookmark::from_value(json!({})).unwrap();
        assert!(parsed.partition_offsets.is_empty());
    }

    #[test]
    fn empty_bookmark_round_trips() {
        let b = Bookmark::default();
        let v = b.to_value().unwrap();
        let parsed = Bookmark::from_value(v).unwrap();
        assert!(parsed.partition_offsets.is_empty());
    }

    #[test]
    fn merged_consumed_overrides_position_and_prior() {
        let prior = Bookmark {
            partition_offsets: vec![po("orders", 0, 50)],
        };
        let positions = vec![po("orders", 0, 90)];
        let consumed = consumed_of(&[("orders", 0, 100)]);
        let merged = Bookmark::merged(Some(&prior), &positions, &consumed);
        assert_eq!(offsets_of(&merged), vec![("orders", 0, 100)]);
    }

    #[test]
    fn merged_seeds_empty_assigned_partition_from_position() {
        let positions = vec![po("orders", 0, 100), po("orders", 1, 0)];
        let consumed = consumed_of(&[("orders", 0, 100)]);
        let merged = Bookmark::merged(None, &positions, &consumed);
        assert_eq!(
            offsets_of(&merged),
            vec![("orders", 0, 100), ("orders", 1, 0)],
            "the empty-this-run partition must be seeded from its position"
        );
    }

    #[test]
    fn merged_carries_forward_prior_when_no_position_or_message() {
        let prior = Bookmark {
            partition_offsets: vec![po("orders", 2, 777)],
        };
        let merged = Bookmark::merged(Some(&prior), &[], &HashMap::new());
        assert_eq!(offsets_of(&merged), vec![("orders", 2, 777)]);
    }

    #[test]
    fn merged_position_overrides_prior_but_not_consumed() {
        let prior = Bookmark {
            partition_offsets: vec![po("t", 0, 10), po("t", 1, 20)],
        };
        let positions = vec![po("t", 0, 15), po("t", 1, 25)];
        let consumed = consumed_of(&[("t", 1, 30)]);
        let merged = Bookmark::merged(Some(&prior), &positions, &consumed);
        assert_eq!(
            offsets_of(&merged),
            vec![("t", 0, 15), ("t", 1, 30)],
            "p0 takes the position; p1 takes the consumed next-offset"
        );
    }

    #[test]
    fn merged_output_is_sorted_by_topic_then_partition() {
        let prior = Bookmark {
            partition_offsets: vec![po("z", 1, 1)],
        };
        let positions = vec![po("b", 2, 2), po("b", 0, 3)];
        let consumed = consumed_of(&[("a", 5, 4)]);
        let merged = Bookmark::merged(Some(&prior), &positions, &consumed);
        assert_eq!(
            offsets_of(&merged),
            vec![("a", 5, 4), ("b", 0, 3), ("b", 2, 2), ("z", 1, 1)]
        );
    }

    #[test]
    fn merged_all_empty_yields_empty_bookmark() {
        let merged = Bookmark::merged(None, &[], &HashMap::new());
        assert!(merged.partition_offsets.is_empty());
    }
}