1use faucet_core::FaucetError;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::collections::HashMap;
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
23pub struct PartitionOffset {
24 pub topic: String,
25 pub partition: i32,
26 pub offset: i64,
27}
28
29#[derive(Debug, Clone, Default, Serialize, Deserialize)]
30pub struct Bookmark {
31 #[serde(default)]
32 pub partition_offsets: Vec<PartitionOffset>,
33}
34
35impl Bookmark {
36 pub fn from_value(v: Value) -> Result<Self, FaucetError> {
37 serde_json::from_value(v)
38 .map_err(|e| FaucetError::State(format!("kafka bookmark parse: {e}")))
39 }
40
41 pub fn to_value(&self) -> Result<Value, FaucetError> {
42 serde_json::to_value(self)
43 .map_err(|e| FaucetError::State(format!("kafka bookmark serialize: {e}")))
44 }
45
46 pub fn from_map(map: HashMap<(String, i32), i64>) -> Self {
47 let mut entries: Vec<PartitionOffset> = map
48 .into_iter()
49 .map(|((topic, partition), offset)| PartitionOffset {
50 topic,
51 partition,
52 offset,
53 })
54 .collect();
55 entries
57 .sort_by(|a, b| (a.topic.as_str(), a.partition).cmp(&(b.topic.as_str(), b.partition)));
58 Self {
59 partition_offsets: entries,
60 }
61 }
62
63 pub fn merged(
83 prior: Option<&Bookmark>,
84 positions: &[PartitionOffset],
85 consumed: &HashMap<(String, i32), i64>,
86 ) -> Self {
87 let mut map: HashMap<(String, i32), i64> = HashMap::new();
88 if let Some(prior) = prior {
89 for p in &prior.partition_offsets {
90 map.insert((p.topic.clone(), p.partition), p.offset);
91 }
92 }
93 for p in positions {
94 map.insert((p.topic.clone(), p.partition), p.offset);
95 }
96 for (&(ref topic, partition), &offset) in consumed {
97 map.insert((topic.clone(), partition), offset);
98 }
99 Self::from_map(map)
100 }
101}
102
103pub fn state_key(group_id: &str, topics: &[String]) -> String {
112 let mut sorted = topics.to_vec();
113 sorted.sort();
114 format!("kafka:{group_id}:{}", sorted.join(":"))
115}
116
117#[cfg(test)]
118mod tests {
119 use super::*;
120 use serde_json::json;
121 use std::collections::HashMap;
122
123 #[test]
124 fn round_trip_via_value() {
125 let bookmark = Bookmark {
126 partition_offsets: vec![
127 PartitionOffset {
128 topic: "t".into(),
129 partition: 0,
130 offset: 5,
131 },
132 PartitionOffset {
133 topic: "t".into(),
134 partition: 1,
135 offset: 9,
136 },
137 ],
138 };
139 let v = bookmark.to_value().unwrap();
140 let parsed = Bookmark::from_value(v).unwrap();
141 assert_eq!(parsed.partition_offsets, bookmark.partition_offsets);
142 }
143
144 #[test]
145 fn from_map_is_deterministic() {
146 let mut a: HashMap<(String, i32), i64> = HashMap::new();
147 a.insert(("z".into(), 0), 1);
148 a.insert(("a".into(), 1), 2);
149 a.insert(("a".into(), 0), 3);
150 let b = Bookmark::from_map(a);
151 let topics: Vec<_> = b
152 .partition_offsets
153 .iter()
154 .map(|p| (p.topic.as_str(), p.partition))
155 .collect();
156 assert_eq!(topics, vec![("a", 0), ("a", 1), ("z", 0)]);
157 }
158
159 #[test]
160 fn state_key_sorts_topics() {
161 assert_eq!(
162 state_key("g1", &["beta".into(), "alpha".into()]),
163 "kafka:g1:alpha:beta"
164 );
165 }
166
167 #[test]
168 fn state_key_single_topic() {
169 assert_eq!(state_key("g1", &["only".into()]), "kafka:g1:only");
170 }
171
172 #[test]
173 fn state_key_distinguishes_topic_sets_containing_dots() {
174 let k1 = state_key("g", &["a.b".into(), "c".into()]);
178 let k2 = state_key("g", &["a".into(), "b.c".into()]);
179 assert_ne!(k1, k2, "topic sets with dots must not produce the same key");
180 }
181
182 #[test]
183 fn from_value_rejects_garbage() {
184 let v = json!({"partition_offsets": "not an array"});
185 assert!(Bookmark::from_value(v).is_err());
186 }
187
188 #[test]
189 fn empty_bookmark_round_trips() {
190 let b = Bookmark::default();
191 let v = b.to_value().unwrap();
192 let parsed = Bookmark::from_value(v).unwrap();
193 assert!(parsed.partition_offsets.is_empty());
194 }
195
196 fn po(topic: &str, partition: i32, offset: i64) -> PartitionOffset {
197 PartitionOffset {
198 topic: topic.into(),
199 partition,
200 offset,
201 }
202 }
203
204 fn offsets_of(b: &Bookmark) -> Vec<(&str, i32, i64)> {
205 b.partition_offsets
206 .iter()
207 .map(|p| (p.topic.as_str(), p.partition, p.offset))
208 .collect()
209 }
210
211 #[test]
212 fn merged_consumed_overrides_position_and_prior() {
213 let prior = Bookmark {
217 partition_offsets: vec![po("orders", 0, 50)],
218 };
219 let positions = vec![po("orders", 0, 90)];
220 let mut consumed = HashMap::new();
221 consumed.insert(("orders".to_string(), 0), 100);
222
223 let merged = Bookmark::merged(Some(&prior), &positions, &consumed);
224 assert_eq!(offsets_of(&merged), vec![("orders", 0, 100)]);
225 }
226
227 #[test]
228 fn merged_seeds_empty_assigned_partition_from_position() {
229 let positions = vec![po("orders", 0, 100), po("orders", 1, 0)];
234 let mut consumed = HashMap::new();
235 consumed.insert(("orders".to_string(), 0), 100);
236
237 let merged = Bookmark::merged(None, &positions, &consumed);
238 assert_eq!(
239 offsets_of(&merged),
240 vec![("orders", 0, 100), ("orders", 1, 0)],
241 "the empty-this-run partition must be seeded from its position"
242 );
243 }
244
245 #[test]
246 fn merged_carries_forward_prior_when_no_position_or_message() {
247 let prior = Bookmark {
251 partition_offsets: vec![po("orders", 2, 777)],
252 };
253 let merged = Bookmark::merged(Some(&prior), &[], &HashMap::new());
254 assert_eq!(offsets_of(&merged), vec![("orders", 2, 777)]);
255 }
256
257 #[test]
258 fn merged_position_overrides_prior_but_not_consumed() {
259 let prior = Bookmark {
262 partition_offsets: vec![po("t", 0, 10), po("t", 1, 20)],
263 };
264 let positions = vec![po("t", 0, 15), po("t", 1, 25)];
265 let mut consumed = HashMap::new();
266 consumed.insert(("t".to_string(), 1), 30);
267
268 let merged = Bookmark::merged(Some(&prior), &positions, &consumed);
269 assert_eq!(
270 offsets_of(&merged),
271 vec![("t", 0, 15), ("t", 1, 30)],
272 "p0 takes the position; p1 takes the consumed next-offset"
273 );
274 }
275
276 #[test]
277 fn merged_all_empty_yields_empty_bookmark() {
278 let merged = Bookmark::merged(None, &[], &HashMap::new());
279 assert!(merged.partition_offsets.is_empty());
280 }
281}