Skip to main content

crabka_client_streams/runtime/iqv2/
query.rs

1//! `IQv2` query objects. Each builder lowers (serde-free) to a
2//! `store::iq::Iq2Query`; the store supplies the actual serdes.
3
4use std::any::Any;
5use std::marker::PhantomData;
6
7use crate::store::iq::{Iq2Query, StoreKind};
8use crate::store::versioned::VersionedRecord;
9
10mod sealed {
11    pub trait Sealed {}
12}
13
14/// A typed `IQv2` query. `Result` is the type `KafkaStreams::query` returns per
15/// partition. Sealed: only the in-crate query types implement it.
16pub trait Query: sealed::Sealed {
17    /// What a successful `QueryResult` carries.
18    type Result: 'static;
19    #[doc(hidden)]
20    fn store_kind(&self) -> StoreKind;
21    #[doc(hidden)]
22    fn lower(self) -> Iq2Query;
23}
24
25/// Single-key lookup. Result: `Option<V>`.
26pub struct KeyQuery<K, V> {
27    key: K,
28    _v: PhantomData<fn() -> V>,
29}
30impl<K, V> KeyQuery<K, V> {
31    #[must_use]
32    pub fn with_key(key: K) -> Self {
33        Self {
34            key,
35            _v: PhantomData,
36        }
37    }
38}
39impl<K: Send + Sync + 'static, V: 'static> sealed::Sealed for KeyQuery<K, V> {}
40impl<K: Send + Sync + 'static, V: 'static> Query for KeyQuery<K, V> {
41    type Result = Option<V>;
42    fn store_kind(&self) -> StoreKind {
43        StoreKind::KeyValue
44    }
45    fn lower(self) -> Iq2Query {
46        Iq2Query::Key {
47            key: Box::new(self.key),
48        }
49    }
50}
51
52/// Key-range scan. `None` bound = unbounded that side. Result: `Vec<(K,V)>`.
53pub struct RangeQuery<K, V> {
54    lo: Option<K>,
55    hi: Option<K>,
56    descending: bool,
57    _v: PhantomData<fn() -> V>,
58}
59impl<K, V> RangeQuery<K, V> {
60    #[must_use]
61    pub fn with_range(lo: K, hi: K) -> Self {
62        Self {
63            lo: Some(lo),
64            hi: Some(hi),
65            descending: false,
66            _v: PhantomData,
67        }
68    }
69    #[must_use]
70    pub fn with_lower_bound(lo: K) -> Self {
71        Self {
72            lo: Some(lo),
73            hi: None,
74            descending: false,
75            _v: PhantomData,
76        }
77    }
78    #[must_use]
79    pub fn with_upper_bound(hi: K) -> Self {
80        Self {
81            lo: None,
82            hi: Some(hi),
83            descending: false,
84            _v: PhantomData,
85        }
86    }
87    #[must_use]
88    pub fn with_no_bounds() -> Self {
89        Self {
90            lo: None,
91            hi: None,
92            descending: false,
93            _v: PhantomData,
94        }
95    }
96    #[must_use]
97    pub fn with_ascending_keys(mut self) -> Self {
98        self.descending = false;
99        self
100    }
101    #[must_use]
102    pub fn with_descending_keys(mut self) -> Self {
103        self.descending = true;
104        self
105    }
106}
107impl<K: Send + Sync + 'static, V: 'static> sealed::Sealed for RangeQuery<K, V> {}
108impl<K: Send + Sync + 'static, V: 'static> Query for RangeQuery<K, V> {
109    type Result = Vec<(K, V)>;
110    fn store_kind(&self) -> StoreKind {
111        StoreKind::KeyValue
112    }
113    fn lower(self) -> Iq2Query {
114        let bx = |k: K| -> Box<dyn Any + Send + Sync> { Box::new(k) };
115        Iq2Query::Range {
116            lo: self.lo.map(bx),
117            hi: self.hi.map(bx),
118            descending: self.descending,
119        }
120    }
121}
122
123/// One key, window starts in `[from_time, to_time]`. Result: `Vec<(i64, V)>`.
124pub struct WindowKeyQuery<K, V> {
125    key: K,
126    from_ts: i64,
127    to_ts: i64,
128    _v: PhantomData<fn() -> V>,
129}
130impl<K, V> WindowKeyQuery<K, V> {
131    #[must_use]
132    pub fn with_key(key: K) -> Self {
133        Self {
134            key,
135            from_ts: i64::MIN,
136            to_ts: i64::MAX,
137            _v: PhantomData,
138        }
139    }
140    #[must_use]
141    pub fn from_time(mut self, t: i64) -> Self {
142        self.from_ts = t;
143        self
144    }
145    #[must_use]
146    pub fn to_time(mut self, t: i64) -> Self {
147        self.to_ts = t;
148        self
149    }
150}
151impl<K: Send + Sync + 'static, V: 'static> sealed::Sealed for WindowKeyQuery<K, V> {}
152impl<K: Send + Sync + 'static, V: 'static> Query for WindowKeyQuery<K, V> {
153    type Result = Vec<(i64, V)>;
154    fn store_kind(&self) -> StoreKind {
155        StoreKind::Window
156    }
157    fn lower(self) -> Iq2Query {
158        Iq2Query::WindowKey {
159            key: Box::new(self.key),
160            from_ts: self.from_ts,
161            to_ts: self.to_ts,
162        }
163    }
164}
165
166/// Key range × window-start range. Result: `Vec<((K, i64), V)>`.
167pub struct WindowRangeQuery<K, V> {
168    lo: Option<K>,
169    hi: Option<K>,
170    from_ts: i64,
171    to_ts: i64,
172    _v: PhantomData<fn() -> V>,
173}
174impl<K, V> WindowRangeQuery<K, V> {
175    #[must_use]
176    pub fn with_key_range(lo: K, hi: K) -> Self {
177        Self {
178            lo: Some(lo),
179            hi: Some(hi),
180            from_ts: i64::MIN,
181            to_ts: i64::MAX,
182            _v: PhantomData,
183        }
184    }
185    #[must_use]
186    pub fn with_all_keys() -> Self {
187        Self {
188            lo: None,
189            hi: None,
190            from_ts: i64::MIN,
191            to_ts: i64::MAX,
192            _v: PhantomData,
193        }
194    }
195    #[must_use]
196    pub fn from_time(mut self, t: i64) -> Self {
197        self.from_ts = t;
198        self
199    }
200    #[must_use]
201    pub fn to_time(mut self, t: i64) -> Self {
202        self.to_ts = t;
203        self
204    }
205}
206impl<K: Send + Sync + 'static, V: 'static> sealed::Sealed for WindowRangeQuery<K, V> {}
207impl<K: Send + Sync + 'static, V: 'static> Query for WindowRangeQuery<K, V> {
208    type Result = Vec<((K, i64), V)>;
209    fn store_kind(&self) -> StoreKind {
210        StoreKind::Window
211    }
212    fn lower(self) -> Iq2Query {
213        let bx = |k: K| -> Box<dyn Any + Send + Sync> { Box::new(k) };
214        Iq2Query::WindowRange {
215            lo: self.lo.map(bx),
216            hi: self.hi.map(bx),
217            from_ts: self.from_ts,
218            to_ts: self.to_ts,
219        }
220    }
221}
222
223/// Single versioned-key lookup (KIP-960). `as_of = None` ⇒ latest live version.
224/// Result: `Option<VersionedRecord<V>>`.
225pub struct VersionedKeyQuery<K, V> {
226    key: K,
227    as_of: Option<i64>,
228    _v: PhantomData<fn() -> V>,
229}
230impl<K, V> VersionedKeyQuery<K, V> {
231    #[must_use]
232    pub fn with_key(key: K) -> Self {
233        Self {
234            key,
235            as_of: None,
236            _v: PhantomData,
237        }
238    }
239    #[must_use]
240    pub fn as_of(mut self, timestamp: i64) -> Self {
241        self.as_of = Some(timestamp);
242        self
243    }
244}
245impl<K: Send + Sync + 'static, V: 'static> sealed::Sealed for VersionedKeyQuery<K, V> {}
246impl<K: Send + Sync + 'static, V: 'static> Query for VersionedKeyQuery<K, V> {
247    type Result = Option<VersionedRecord<V>>;
248    fn store_kind(&self) -> StoreKind {
249        StoreKind::Versioned
250    }
251    fn lower(self) -> Iq2Query {
252        Iq2Query::VersionedKey {
253            key: Box::new(self.key),
254            as_of: self.as_of,
255        }
256    }
257}
258
259/// All versions of a key whose validity overlaps `[from_time, to_time]`
260/// (KIP-968). `None` bound = unbounded that side; ascending by `valid_from`
261/// unless `with_descending_timestamps()`. Result: `Vec<VersionedRecord<V>>`.
262pub struct MultiVersionedKeyQuery<K, V> {
263    key: K,
264    from_ts: Option<i64>,
265    to_ts: Option<i64>,
266    descending: bool,
267    _v: PhantomData<fn() -> V>,
268}
269impl<K, V> MultiVersionedKeyQuery<K, V> {
270    #[must_use]
271    pub fn with_key(key: K) -> Self {
272        Self {
273            key,
274            from_ts: None,
275            to_ts: None,
276            descending: false,
277            _v: PhantomData,
278        }
279    }
280    #[must_use]
281    pub fn from_time(mut self, t: i64) -> Self {
282        self.from_ts = Some(t);
283        self
284    }
285    #[must_use]
286    pub fn to_time(mut self, t: i64) -> Self {
287        self.to_ts = Some(t);
288        self
289    }
290    #[must_use]
291    pub fn with_ascending_timestamps(mut self) -> Self {
292        self.descending = false;
293        self
294    }
295    #[must_use]
296    pub fn with_descending_timestamps(mut self) -> Self {
297        self.descending = true;
298        self
299    }
300}
301impl<K: Send + Sync + 'static, V: 'static> sealed::Sealed for MultiVersionedKeyQuery<K, V> {}
302impl<K: Send + Sync + 'static, V: 'static> Query for MultiVersionedKeyQuery<K, V> {
303    type Result = Vec<VersionedRecord<V>>;
304    fn store_kind(&self) -> StoreKind {
305        StoreKind::Versioned
306    }
307    fn lower(self) -> Iq2Query {
308        Iq2Query::MultiVersionedKey {
309            key: Box::new(self.key),
310            from_ts: self.from_ts,
311            to_ts: self.to_ts,
312            descending: self.descending,
313        }
314    }
315}
316
317#[cfg(test)]
318mod tests {
319    use super::*;
320
321    #[test]
322    fn lowering_picks_the_right_variant_and_kind() {
323        let kq = KeyQuery::<String, i64>::with_key("a".into());
324        assert_eq!(kq.store_kind(), StoreKind::KeyValue);
325        assert!(matches!(kq.lower(), Iq2Query::Key { .. }));
326
327        let rq = RangeQuery::<String, i64>::with_lower_bound("a".into()).with_descending_keys();
328        assert!(matches!(
329            rq.lower(),
330            Iq2Query::Range {
331                lo: Some(_),
332                hi: None,
333                descending: true
334            }
335        ));
336
337        let wk = WindowKeyQuery::<String, i64>::with_key("a".into())
338            .from_time(0)
339            .to_time(9);
340        assert_eq!(wk.store_kind(), StoreKind::Window);
341        assert!(matches!(
342            wk.lower(),
343            Iq2Query::WindowKey {
344                from_ts: 0,
345                to_ts: 9,
346                ..
347            }
348        ));
349
350        let wr = WindowRangeQuery::<String, i64>::with_all_keys();
351        assert!(matches!(
352            wr.lower(),
353            Iq2Query::WindowRange {
354                lo: None,
355                hi: None,
356                ..
357            }
358        ));
359    }
360
361    #[test]
362    fn versioned_queries_lower_correctly() {
363        let vk = VersionedKeyQuery::<String, i64>::with_key("k".into()).as_of(250);
364        assert_eq!(vk.store_kind(), StoreKind::Versioned);
365        assert!(matches!(
366            vk.lower(),
367            Iq2Query::VersionedKey {
368                as_of: Some(250),
369                ..
370            }
371        ));
372
373        let vk_latest = VersionedKeyQuery::<String, i64>::with_key("k".into());
374        assert!(matches!(
375            vk_latest.lower(),
376            Iq2Query::VersionedKey { as_of: None, .. }
377        ));
378
379        let mv = MultiVersionedKeyQuery::<String, i64>::with_key("k".into())
380            .from_time(150)
381            .to_time(250)
382            .with_descending_timestamps();
383        assert_eq!(mv.store_kind(), StoreKind::Versioned);
384        assert!(matches!(
385            mv.lower(),
386            Iq2Query::MultiVersionedKey {
387                from_ts: Some(150),
388                to_ts: Some(250),
389                descending: true,
390                ..
391            }
392        ));
393
394        let mv_all = MultiVersionedKeyQuery::<String, i64>::with_key("k".into());
395        assert!(matches!(
396            mv_all.lower(),
397            Iq2Query::MultiVersionedKey {
398                from_ts: None,
399                to_ts: None,
400                descending: false,
401                ..
402            }
403        ));
404
405        // `with_ascending_timestamps()` is the default; calling it explicitly
406        // (after a descending toggle) must restore ascending.
407        let mv_asc = MultiVersionedKeyQuery::<String, i64>::with_key("k".into())
408            .with_descending_timestamps()
409            .with_ascending_timestamps();
410        assert!(matches!(
411            mv_asc.lower(),
412            Iq2Query::MultiVersionedKey {
413                descending: false,
414                ..
415            }
416        ));
417    }
418
419    #[test]
420    fn range_query_bound_variants_lower_correctly() {
421        let both = RangeQuery::<String, i64>::with_range("a".into(), "b".into());
422        assert_eq!(both.store_kind(), StoreKind::KeyValue);
423        assert!(matches!(
424            both.lower(),
425            Iq2Query::Range {
426                lo: Some(_),
427                hi: Some(_),
428                descending: false,
429            }
430        ));
431
432        let upper = RangeQuery::<String, i64>::with_upper_bound("b".into());
433        assert!(matches!(
434            upper.lower(),
435            Iq2Query::Range {
436                lo: None,
437                hi: Some(_),
438                descending: false,
439            }
440        ));
441
442        let none = RangeQuery::<String, i64>::with_no_bounds().with_ascending_keys();
443        assert!(matches!(
444            none.lower(),
445            Iq2Query::Range {
446                lo: None,
447                hi: None,
448                descending: false,
449            }
450        ));
451    }
452
453    #[test]
454    fn window_key_query_default_and_explicit_bounds() {
455        let dflt = WindowKeyQuery::<String, i64>::with_key("a".into());
456        assert!(matches!(
457            dflt.lower(),
458            Iq2Query::WindowKey {
459                from_ts: i64::MIN,
460                to_ts: i64::MAX,
461                ..
462            }
463        ));
464
465        let bounded = WindowKeyQuery::<String, i64>::with_key("a".into())
466            .from_time(5)
467            .to_time(9);
468        assert!(matches!(
469            bounded.lower(),
470            Iq2Query::WindowKey {
471                from_ts: 5,
472                to_ts: 9,
473                ..
474            }
475        ));
476    }
477
478    #[test]
479    fn window_range_query_key_range_and_all_keys() {
480        let ranged = WindowRangeQuery::<String, i64>::with_key_range("a".into(), "b".into())
481            .from_time(1)
482            .to_time(2);
483        assert_eq!(ranged.store_kind(), StoreKind::Window);
484        assert!(matches!(
485            ranged.lower(),
486            Iq2Query::WindowRange {
487                lo: Some(_),
488                hi: Some(_),
489                from_ts: 1,
490                to_ts: 2,
491            }
492        ));
493
494        let all = WindowRangeQuery::<String, i64>::with_all_keys();
495        assert!(matches!(
496            all.lower(),
497            Iq2Query::WindowRange {
498                lo: None,
499                hi: None,
500                ..
501            }
502        ));
503    }
504}