Skip to main content

crabka_client_streams/runtime/iqv2/
request.rs

1//! `IQv2` request envelope: `Position`/`PositionBound` (KIP-796 bounded
2//! staleness), partition selection, and the finalized `StateQuery<Q>` that
3//! `KafkaStreams::query` consumes.
4
5use std::collections::{BTreeMap, BTreeSet};
6
7use super::query::Query;
8
9/// Source-topic consumed offsets folded into a store: topic → partition →
10/// offset (the next offset to read, i.e. one past the last consumed record).
11#[derive(Debug, Clone, Default, PartialEq, Eq)]
12pub struct Position(pub BTreeMap<String, BTreeMap<i32, i64>>);
13
14impl Position {
15    /// Offset recorded for one topic-partition, if any.
16    #[must_use]
17    pub fn offset(&self, topic: &str, partition: i32) -> Option<i64> {
18        self.0.get(topic).and_then(|m| m.get(&partition)).copied()
19    }
20
21    /// True if `self` meets or exceeds every `(topic, partition)` offset in
22    /// `bound`. A bound naming a partition `self` has never advanced fails.
23    #[must_use]
24    pub(crate) fn dominates(&self, bound: &Position) -> bool {
25        bound.0.iter().all(|(topic, parts)| {
26            parts
27                .iter()
28                .all(|(p, off)| self.offset(topic, *p).is_some_and(|cur| cur >= *off))
29        })
30    }
31}
32
33/// Freshness bound for a query (KIP-796). `At` requires each partition's
34/// `Position` to dominate the given one, else that partition fails fast with
35/// `NotUpToBound` — the query never blocks.
36#[derive(Debug, Clone, Default)]
37pub enum PositionBound {
38    #[default]
39    Unbounded,
40    At(Position),
41}
42
43/// Which local partitions to query.
44#[derive(Debug, Clone, Default)]
45pub(crate) enum PartitionSel {
46    #[default]
47    All,
48    Set(BTreeSet<i32>),
49}
50
51/// A finalized `IQv2` request: built via
52/// `StateQueryRequest::in_store(name).with_query(q)`.
53pub struct StateQuery<Q: Query> {
54    pub(crate) store: String,
55    pub(crate) query: Q,
56    pub(crate) partitions: PartitionSel,
57    pub(crate) bound: PositionBound,
58    pub(crate) require_active: bool,
59}
60
61impl<Q: Query> StateQuery<Q> {
62    /// Restrict to a specific set of local partitions (default: all).
63    #[must_use]
64    pub fn with_partitions(mut self, set: BTreeSet<i32>) -> Self {
65        self.partitions = PartitionSel::Set(set);
66        self
67    }
68
69    /// Query all locally assigned partitions (the default).
70    #[must_use]
71    pub fn with_all_partitions(mut self) -> Self {
72        self.partitions = PartitionSel::All;
73        self
74    }
75
76    /// Require each queried partition to meet a freshness bound.
77    #[must_use]
78    pub fn with_position_bound(mut self, bound: PositionBound) -> Self {
79        self.bound = bound;
80        self
81    }
82
83    /// Only serve from active (not standby/restoring) tasks.
84    #[must_use]
85    pub fn require_active(mut self) -> Self {
86        self.require_active = true;
87        self
88    }
89}
90
91/// Entry point namespace: `StateQueryRequest::in_store("s").with_query(q)`.
92pub struct StateQueryRequest;
93
94impl StateQueryRequest {
95    /// Begin a request against state store `name`.
96    #[must_use]
97    pub fn in_store(name: impl Into<String>) -> StateQueryRequestBuilder {
98        StateQueryRequestBuilder { store: name.into() }
99    }
100}
101
102/// Half-built request awaiting `.with_query(q)`.
103pub struct StateQueryRequestBuilder {
104    store: String,
105}
106
107impl StateQueryRequestBuilder {
108    /// Attach the query, finalizing the request.
109    #[must_use]
110    pub fn with_query<Q: Query>(self, query: Q) -> StateQuery<Q> {
111        StateQuery {
112            store: self.store,
113            query,
114            partitions: PartitionSel::All,
115            bound: PositionBound::Unbounded,
116            require_active: false,
117        }
118    }
119}
120
121#[cfg(test)]
122mod tests {
123    use super::*;
124
125    fn pos(entries: &[(&str, i32, i64)]) -> Position {
126        let mut m: BTreeMap<String, BTreeMap<i32, i64>> = BTreeMap::new();
127        for (t, p, o) in entries {
128            m.entry((*t).to_string()).or_default().insert(*p, *o);
129        }
130        Position(m)
131    }
132
133    #[test]
134    fn dominates_requires_all_bound_partitions_met() {
135        let cur = pos(&[("in", 0, 10), ("in", 1, 5)]);
136        assert!(cur.dominates(&pos(&[("in", 0, 10)])));
137        assert!(cur.dominates(&pos(&[("in", 0, 9), ("in", 1, 5)])));
138        assert!(!cur.dominates(&pos(&[("in", 0, 11)]))); // behind
139        assert!(!cur.dominates(&pos(&[("other", 0, 1)]))); // unknown tp
140    }
141
142    #[test]
143    fn position_offset_reads_present_and_absent_tps() {
144        let p = pos(&[("in", 0, 10), ("in", 1, 5)]);
145        assert_eq!(p.offset("in", 0), Some(10));
146        assert_eq!(p.offset("in", 1), Some(5));
147        assert_eq!(p.offset("in", 2), None); // present topic, absent partition
148        assert_eq!(p.offset("other", 0), None); // absent topic
149    }
150
151    #[test]
152    fn builder_defaults_and_store_name() {
153        use crate::runtime::iqv2::query::KeyQuery;
154
155        let q = StateQueryRequest::in_store("s")
156            .with_query(KeyQuery::<String, i64>::with_key("k".into()));
157        assert_eq!(q.store, "s");
158        assert!(matches!(q.partitions, PartitionSel::All));
159        assert!(matches!(q.bound, PositionBound::Unbounded));
160        assert!(!q.require_active);
161    }
162
163    #[test]
164    fn builder_chain_sets_each_field() {
165        use crate::runtime::iqv2::query::KeyQuery;
166
167        let set: BTreeSet<i32> = [0, 2].into_iter().collect();
168        let q = StateQueryRequest::in_store("s")
169            .with_query(KeyQuery::<String, i64>::with_key("k".into()))
170            .with_partitions(set.clone());
171        match &q.partitions {
172            PartitionSel::Set(s) => assert_eq!(s, &set),
173            PartitionSel::All => panic!("expected explicit partition set"),
174        }
175
176        // with_all_partitions resets back to All.
177        let q = q.with_all_partitions();
178        assert!(matches!(q.partitions, PartitionSel::All));
179
180        // with_position_bound stores the bound.
181        let bound_pos = pos(&[("in", 0, 7)]);
182        let q = q.with_position_bound(PositionBound::At(bound_pos.clone()));
183        match &q.bound {
184            PositionBound::At(p) => assert_eq!(p, &bound_pos),
185            PositionBound::Unbounded => panic!("expected At bound"),
186        }
187
188        // require_active flips the flag.
189        let q = q.require_active();
190        assert!(q.require_active);
191    }
192}