Skip to main content

faucet_source_kafka/
state.rs

1//! Bookmark <-> JSON serialization for Kafka offset progress.
2//!
3//! Bookmark shape (round-trips through `serde_json::Value`):
4//!
5//! ```json
6//! {
7//!   "partition_offsets": [
8//!     {"topic": "orders", "partition": 0, "offset": 1234},
9//!     {"topic": "orders", "partition": 1, "offset":  987}
10//!   ]
11//! }
12//! ```
13//!
14//! `offset` is the next offset to read (i.e. one past the highest offset
15//! whose value has been written to the sink).
16
17use faucet_core::FaucetError;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::collections::HashMap;
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
23pub struct PartitionOffset {
24    pub topic: String,
25    pub partition: i32,
26    pub offset: i64,
27}
28
29#[derive(Debug, Clone, Default, Serialize, Deserialize)]
30pub struct Bookmark {
31    #[serde(default)]
32    pub partition_offsets: Vec<PartitionOffset>,
33}
34
35impl Bookmark {
36    pub fn from_value(v: Value) -> Result<Self, FaucetError> {
37        serde_json::from_value(v)
38            .map_err(|e| FaucetError::State(format!("kafka bookmark parse: {e}")))
39    }
40
41    pub fn to_value(&self) -> Result<Value, FaucetError> {
42        serde_json::to_value(self)
43            .map_err(|e| FaucetError::State(format!("kafka bookmark serialize: {e}")))
44    }
45
46    pub fn from_map(map: HashMap<(String, i32), i64>) -> Self {
47        let mut entries: Vec<PartitionOffset> = map
48            .into_iter()
49            .map(|((topic, partition), offset)| PartitionOffset {
50                topic,
51                partition,
52                offset,
53            })
54            .collect();
55        // Deterministic order makes diffs in state-store files reviewable.
56        entries
57            .sort_by(|a, b| (a.topic.as_str(), a.partition).cmp(&(b.topic.as_str(), b.partition)));
58        Self {
59            partition_offsets: entries,
60        }
61    }
62
63    /// Build the bookmark to persist for a run/page, merging three offset
64    /// sources in **increasing precedence** so no assigned partition is ever
65    /// silently dropped from the bookmark (the H9 data-loss fix):
66    ///
67    /// 1. `prior` — offsets from the bookmark applied at run start
68    ///    (carry-forward). A partition consumed in an earlier run but
69    ///    assigned-yet-empty in this one keeps its last-known offset instead
70    ///    of vanishing. Acts as the safety net when librdkafka has not yet
71    ///    resolved a position for an assigned partition.
72    /// 2. `positions` — the consumer's current position for every *assigned*
73    ///    partition (from `rdkafka::consumer::Consumer::position`). This is
74    ///    the crux of the fix: a partition that produced **no** message this
75    ///    run still records where the consumer actually sits, so the next
76    ///    resume seeks it to that offset instead of leaving it absent — an
77    ///    absent partition resets to `auto.offset.reset` (default `latest`)
78    ///    and silently skips any records that arrived in the meantime.
79    /// 3. `consumed` — the next offset after each message actually delivered
80    ///    this run (authoritative: we definitely read up to here, so this
81    ///    overrides a position/prior value for the same partition).
82    pub fn merged(
83        prior: Option<&Bookmark>,
84        positions: &[PartitionOffset],
85        consumed: &HashMap<(String, i32), i64>,
86    ) -> Self {
87        let mut map: HashMap<(String, i32), i64> = HashMap::new();
88        if let Some(prior) = prior {
89            for p in &prior.partition_offsets {
90                map.insert((p.topic.clone(), p.partition), p.offset);
91            }
92        }
93        for p in positions {
94            map.insert((p.topic.clone(), p.partition), p.offset);
95        }
96        for (&(ref topic, partition), &offset) in consumed {
97            map.insert((topic.clone(), partition), offset);
98        }
99        Self::from_map(map)
100    }
101}
102
/// Generate the `state_key` for a `(group_id, topics)` pair.
///
/// The key has the form `kafka:<group_id>:<topic>[:<topic>...]`. Topics are
/// put in sorted order first, so the key does not depend on the order in which
/// they were listed in the config.
///
/// The topic separator is `:` rather than `.`: a Kafka topic name may legally
/// contain `.` (e.g. `orders.eu`), so a `.` join made `["a.b","c"]` and
/// `["a","b.c"]` collide on a shared `group_id`. `:` is **not** a legal Kafka
/// topic character, so it is an unambiguous separator — and it is permitted in a
/// state key per [`faucet_core::state::validate_state_key`] (`[A-Za-z0-9_:.-]`).
///
/// NOTE(review): `group_id` is interpolated verbatim and duplicate topics are
/// kept as given. A `group_id` that itself contains `:` would presumably blur
/// the boundary between group and topics — confirm callers cannot pass one.
pub fn state_key(group_id: &str, topics: &[String]) -> String {
    // Sort borrowed views of the names; the caller's slice is left untouched.
    let mut ordered: Vec<&str> = topics.iter().map(String::as_str).collect();
    ordered.sort_unstable();
    let joined = ordered.join(":");
    format!("kafka:{group_id}:{joined}")
}
116
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashMap;

    /// Shorthand for building a [`PartitionOffset`].
    fn po(topic: &str, partition: i32, offset: i64) -> PartitionOffset {
        PartitionOffset {
            topic: topic.to_owned(),
            partition,
            offset,
        }
    }

    /// Flatten a bookmark into `(topic, partition, offset)` tuples, keeping
    /// the stored order, so expectations can be written as plain literals.
    fn offsets_of(b: &Bookmark) -> Vec<(&str, i32, i64)> {
        let mut flat = Vec::with_capacity(b.partition_offsets.len());
        for entry in &b.partition_offsets {
            flat.push((entry.topic.as_str(), entry.partition, entry.offset));
        }
        flat
    }

    /// Build a `(topic, partition) -> offset` map from literal triples.
    fn consumed_of(entries: &[(&str, i32, i64)]) -> HashMap<(String, i32), i64> {
        let mut map = HashMap::new();
        for &(topic, partition, offset) in entries {
            map.insert((topic.to_owned(), partition), offset);
        }
        map
    }

    #[test]
    fn round_trip_via_value() {
        let original = Bookmark {
            partition_offsets: vec![po("t", 0, 5), po("t", 1, 9)],
        };
        let encoded = original.to_value().unwrap();
        let decoded = Bookmark::from_value(encoded).unwrap();
        assert_eq!(decoded.partition_offsets, original.partition_offsets);
    }

    #[test]
    fn from_map_is_deterministic() {
        let unordered = consumed_of(&[("z", 0, 1), ("a", 1, 2), ("a", 0, 3)]);
        let bookmark = Bookmark::from_map(unordered);
        let mut keys = Vec::new();
        for entry in &bookmark.partition_offsets {
            keys.push((entry.topic.as_str(), entry.partition));
        }
        assert_eq!(keys, vec![("a", 0), ("a", 1), ("z", 0)]);
    }

    #[test]
    fn state_key_sorts_topics() {
        let topics = vec![String::from("beta"), String::from("alpha")];
        assert_eq!(state_key("g1", &topics), "kafka:g1:alpha:beta");
    }

    #[test]
    fn state_key_single_topic() {
        let topics = vec![String::from("only")];
        assert_eq!(state_key("g1", &topics), "kafka:g1:only");
    }

    #[test]
    fn state_key_distinguishes_topic_sets_containing_dots() {
        // Dots are legal in topic names. With a '.' separator the sets
        // ["a.b","c"] and ["a","b.c"] produced the same key under one
        // group_id, letting two distinct subscriptions overwrite each
        // other's bookmark.
        let dotted_first = vec![String::from("a.b"), String::from("c")];
        let dotted_second = vec![String::from("a"), String::from("b.c")];
        let k1 = state_key("g", &dotted_first);
        let k2 = state_key("g", &dotted_second);
        assert_ne!(k1, k2, "topic sets with dots must not produce the same key");
    }

    #[test]
    fn from_value_rejects_garbage() {
        let malformed = json!({"partition_offsets": "not an array"});
        let outcome = Bookmark::from_value(malformed);
        assert!(outcome.is_err());
    }

    #[test]
    fn empty_bookmark_round_trips() {
        let encoded = Bookmark::default().to_value().unwrap();
        let decoded = Bookmark::from_value(encoded).unwrap();
        assert!(decoded.partition_offsets.is_empty());
    }

    #[test]
    fn merged_consumed_overrides_position_and_prior() {
        // The partition delivered messages this run, so the per-message
        // next-offset is authoritative: it beats both the consumer position
        // and the carried-forward value.
        let prior = Bookmark {
            partition_offsets: vec![po("orders", 0, 50)],
        };
        let positions = [po("orders", 0, 90)];
        let consumed = consumed_of(&[("orders", 0, 100)]);

        let merged = Bookmark::merged(Some(&prior), &positions, &consumed);
        assert_eq!(offsets_of(&merged), vec![("orders", 0, 100)]);
    }

    #[test]
    fn merged_seeds_empty_assigned_partition_from_position() {
        // The H9 scenario: partition 1 is assigned but delivered nothing this
        // run, so it is absent from `consumed`. Its position must still be
        // written, or the next resume would find it missing, fall back to
        // `auto.offset.reset`, and skip records.
        let positions = [po("orders", 0, 100), po("orders", 1, 0)];
        let consumed = consumed_of(&[("orders", 0, 100)]);

        let merged = Bookmark::merged(None, &positions, &consumed);
        assert_eq!(
            offsets_of(&merged),
            vec![("orders", 0, 100), ("orders", 1, 0)],
            "the empty-this-run partition must be seeded from its position"
        );
    }

    #[test]
    fn merged_carries_forward_prior_when_no_position_or_message() {
        // A partition known from before, with no fresh position and no
        // delivered message this run, keeps its prior offset instead of being
        // dropped (the safety net for an unresolved position).
        let prior = Bookmark {
            partition_offsets: vec![po("orders", 2, 777)],
        };
        let nothing_consumed = HashMap::new();

        let merged = Bookmark::merged(Some(&prior), &[], &nothing_consumed);
        assert_eq!(offsets_of(&merged), vec![("orders", 2, 777)]);
    }

    #[test]
    fn merged_position_overrides_prior_but_not_consumed() {
        // The live position is trusted over a stale carry-forward value, yet
        // a consumed next-offset still outranks both.
        let prior = Bookmark {
            partition_offsets: vec![po("t", 0, 10), po("t", 1, 20)],
        };
        let positions = [po("t", 0, 15), po("t", 1, 25)];
        let consumed = consumed_of(&[("t", 1, 30)]);

        let merged = Bookmark::merged(Some(&prior), &positions, &consumed);
        assert_eq!(
            offsets_of(&merged),
            vec![("t", 0, 15), ("t", 1, 30)],
            "p0 takes the position; p1 takes the consumed next-offset"
        );
    }

    #[test]
    fn merged_all_empty_yields_empty_bookmark() {
        let nothing_consumed = HashMap::new();
        let merged = Bookmark::merged(None, &[], &nothing_consumed);
        assert!(merged.partition_offsets.is_empty());
    }
}