crabka_client_streams/runtime/iqv2/
request.rs1use std::collections::{BTreeMap, BTreeSet};
6
7use super::query::Query;
8
/// Offsets keyed first by topic name and then by partition id.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Position(pub BTreeMap<String, BTreeMap<i32, i64>>);

impl Position {
    /// Returns the offset stored for `partition` of `topic`.
    ///
    /// Yields `None` when the topic has no entry or the partition is not
    /// present under that topic.
    #[must_use]
    pub fn offset(&self, topic: &str, partition: i32) -> Option<i64> {
        let partitions = self.0.get(topic)?;
        partitions.get(&partition).copied()
    }

    /// Reports whether `self` is at or past `bound` for every
    /// topic-partition that `bound` lists.
    ///
    /// A topic-partition listed in `bound` but absent from `self` makes the
    /// result `false`. Entries that exist only in `self` are ignored, and a
    /// topic in `bound` with no partitions imposes no constraint.
    #[must_use]
    pub(crate) fn dominates(&self, bound: &Position) -> bool {
        for (topic, required) in &bound.0 {
            for (&partition, &minimum) in required {
                // Missing entries and entries below the minimum both fail.
                match self.offset(topic, partition) {
                    Some(current) if current >= minimum => {}
                    _ => return false,
                }
            }
        }
        true
    }
}
32
33#[derive(Debug, Clone, Default)]
37pub enum PositionBound {
38 #[default]
39 Unbounded,
40 At(Position),
41}
42
/// Partition selection attached to a state query.
///
/// Compares by value: two `Set` selections are equal exactly when they hold
/// the same partition ids.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) enum PartitionSel {
    /// No explicit partition list. This is the default variant, and the one
    /// `with_all_partitions` selects.
    #[default]
    All,
    /// An explicit set of partition ids, as supplied to `with_partitions`.
    Set(BTreeSet<i32>),
}
50
51pub struct StateQuery<Q: Query> {
54 pub(crate) store: String,
55 pub(crate) query: Q,
56 pub(crate) partitions: PartitionSel,
57 pub(crate) bound: PositionBound,
58 pub(crate) require_active: bool,
59}
60
61impl<Q: Query> StateQuery<Q> {
62 #[must_use]
64 pub fn with_partitions(mut self, set: BTreeSet<i32>) -> Self {
65 self.partitions = PartitionSel::Set(set);
66 self
67 }
68
69 #[must_use]
71 pub fn with_all_partitions(mut self) -> Self {
72 self.partitions = PartitionSel::All;
73 self
74 }
75
76 #[must_use]
78 pub fn with_position_bound(mut self, bound: PositionBound) -> Self {
79 self.bound = bound;
80 self
81 }
82
83 #[must_use]
85 pub fn require_active(mut self) -> Self {
86 self.require_active = true;
87 self
88 }
89}
90
91pub struct StateQueryRequest;
93
94impl StateQueryRequest {
95 #[must_use]
97 pub fn in_store(name: impl Into<String>) -> StateQueryRequestBuilder {
98 StateQueryRequestBuilder { store: name.into() }
99 }
100}
101
102pub struct StateQueryRequestBuilder {
104 store: String,
105}
106
107impl StateQueryRequestBuilder {
108 #[must_use]
110 pub fn with_query<Q: Query>(self, query: Q) -> StateQuery<Q> {
111 StateQuery {
112 store: self.store,
113 query,
114 partitions: PartitionSel::All,
115 bound: PositionBound::Unbounded,
116 require_active: false,
117 }
118 }
119}
120
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Position` from `(topic, partition, offset)` triples.
    fn pos(entries: &[(&str, i32, i64)]) -> Position {
        let mut by_topic = BTreeMap::<String, BTreeMap<i32, i64>>::new();
        for &(topic, partition, offset) in entries {
            by_topic
                .entry(topic.to_owned())
                .or_default()
                .insert(partition, offset);
        }
        Position(by_topic)
    }

    #[test]
    fn dominates_requires_all_bound_partitions_met() {
        let current = pos(&[("in", 0, 10), ("in", 1, 5)]);

        // Bounds that are met: equal offset, and a lower/equal pair.
        let satisfied = [
            pos(&[("in", 0, 10)]),
            pos(&[("in", 0, 9), ("in", 1, 5)]),
        ];
        for bound in &satisfied {
            assert!(current.dominates(bound));
        }

        // Bounds that are not met: offset too high, and an unknown topic.
        let unsatisfied = [pos(&[("in", 0, 11)]), pos(&[("other", 0, 1)])];
        for bound in &unsatisfied {
            assert!(!current.dominates(bound));
        }
    }

    #[test]
    fn position_offset_reads_present_and_absent_tps() {
        let position = pos(&[("in", 0, 10), ("in", 1, 5)]);

        let cases = [
            (("in", 0), Some(10)),
            (("in", 1), Some(5)),
            // Known topic, unknown partition.
            (("in", 2), None),
            // Unknown topic.
            (("other", 0), None),
        ];
        for ((topic, partition), expected) in cases {
            assert_eq!(position.offset(topic, partition), expected);
        }
    }

    #[test]
    fn builder_defaults_and_store_name() {
        use crate::runtime::iqv2::query::KeyQuery;

        let key_query = KeyQuery::<String, i64>::with_key("k".into());
        let q = StateQueryRequest::in_store("s").with_query(key_query);

        assert_eq!(q.store, "s");
        assert!(matches!(q.partitions, PartitionSel::All));
        assert!(matches!(q.bound, PositionBound::Unbounded));
        assert!(!q.require_active);
    }

    #[test]
    fn builder_chain_sets_each_field() {
        use crate::runtime::iqv2::query::KeyQuery;

        // Explicit partition set.
        let wanted: BTreeSet<i32> = BTreeSet::from([0, 2]);
        let q = StateQueryRequest::in_store("s")
            .with_query(KeyQuery::<String, i64>::with_key("k".into()))
            .with_partitions(wanted.clone());
        let PartitionSel::Set(selected) = &q.partitions else {
            panic!("expected explicit partition set");
        };
        assert_eq!(selected, &wanted);

        // Back to all partitions.
        let q = q.with_all_partitions();
        assert!(matches!(q.partitions, PartitionSel::All));

        // Position bound.
        let bound_pos = pos(&[("in", 0, 7)]);
        let q = q.with_position_bound(PositionBound::At(bound_pos.clone()));
        let PositionBound::At(stored) = &q.bound else {
            panic!("expected At bound");
        };
        assert_eq!(stored, &bound_pos);

        // Active-only flag.
        let q = q.require_active();
        assert!(q.require_active);
    }
}