Skip to main content

crabka_client_streams/runtime/
iq_view.rs

1//! Typed, read-only composite views over local state stores (Interactive
2//! Queries). Each view owns its Serdes and round-trips byte-level `IqRequest`s
3//! to the supervisor; results are eagerly materialized `Vec`s (one intentional
4//! divergence from the JVM's lazy `KeyValueIterator`).
5
6use bytes::Bytes;
7use tokio::sync::{mpsc, oneshot};
8
9use crate::dsl::windows::{Window, Windowed};
10use crate::error::StreamsClientError;
11use crate::processor::serde::Serde;
12use crate::runtime::iq::{IqError, IqOp, IqPayload, IqRequest};
13use crate::store::iq::StoreKind;
14
15/// Round-trip one op to the supervisor. Shared by all three views.
16async fn query(
17    tx: &mpsc::Sender<IqRequest>,
18    store: &str,
19    kind: StoreKind,
20    op: IqOp,
21) -> Result<IqPayload, StreamsClientError> {
22    let (reply, rx) = oneshot::channel();
23    tx.send(IqRequest {
24        store: store.to_string(),
25        kind,
26        op,
27        reply,
28    })
29    .await
30    .map_err(|_| StreamsClientError::InteractiveQuery(IqError::RebalanceInProgress))?;
31    rx.await
32        .map_err(|_| StreamsClientError::InteractiveQuery(IqError::RebalanceInProgress))?
33        .map_err(StreamsClientError::InteractiveQuery)
34}
35
36fn deser<T: 'static>(
37    topic: &str,
38    serde: &dyn Serde<T>,
39    bytes: &[u8],
40) -> Result<T, StreamsClientError> {
41    serde
42        .deserialize(topic, bytes)
43        .map_err(|e| StreamsClientError::Runtime(format!("iq deserialize: {e}")))
44}
45
46pub(crate) fn unexpected(p: &IqPayload) -> StreamsClientError {
47    StreamsClientError::Runtime(format!("iq: unexpected payload {p:?}"))
48}
49
50/// Validate a store exists locally + has the requested kind (eager, in accessors).
51pub(crate) async fn validate(
52    tx: &mpsc::Sender<IqRequest>,
53    store: &str,
54    kind: StoreKind,
55) -> Result<(), StreamsClientError> {
56    match query(tx, store, kind, IqOp::Validate).await? {
57        IqPayload::Validated => Ok(()),
58        other => Err(unexpected(&other)),
59    }
60}
61
62/// Read-only composite KV store view (Interactive Queries).
63pub struct ReadOnlyKeyValueStore<K, V> {
64    pub(crate) tx: mpsc::Sender<IqRequest>,
65    pub(crate) store: String,
66    pub(crate) key_serde: Box<dyn Serde<K>>,
67    pub(crate) value_serde: Box<dyn Serde<V>>,
68}
69
70impl<K: 'static, V: 'static> ReadOnlyKeyValueStore<K, V> {
71    /// Value for `key`, or `None` if absent.
72    pub async fn get(&self, key: &K) -> Result<Option<V>, StreamsClientError> {
73        let kb = self.key_serde.serialize(&self.store, key);
74        match query(
75            &self.tx,
76            &self.store,
77            StoreKind::KeyValue,
78            IqOp::KvGet { key: kb },
79        )
80        .await?
81        {
82            IqPayload::Value(Some(vb)) => Ok(Some(deser(&self.store, &*self.value_serde, &vb)?)),
83            IqPayload::Value(None) => Ok(None),
84            other => Err(unexpected(&other)),
85        }
86    }
87
88    /// Inclusive `[lo, hi]` range, ascending memcmp key order.
89    pub async fn range(&self, lo: &K, hi: &K) -> Result<Vec<(K, V)>, StreamsClientError> {
90        let lo_b = self.key_serde.serialize(&self.store, lo);
91        let hi_b = self.key_serde.serialize(&self.store, hi);
92        match query(
93            &self.tx,
94            &self.store,
95            StoreKind::KeyValue,
96            IqOp::KvRange { lo: lo_b, hi: hi_b },
97        )
98        .await?
99        {
100            IqPayload::Entries(pairs) => self.decode_pairs(pairs),
101            other => Err(unexpected(&other)),
102        }
103    }
104
105    /// Every entry.
106    pub async fn all(&self) -> Result<Vec<(K, V)>, StreamsClientError> {
107        match query(&self.tx, &self.store, StoreKind::KeyValue, IqOp::KvAll).await? {
108            IqPayload::Entries(pairs) => self.decode_pairs(pairs),
109            other => Err(unexpected(&other)),
110        }
111    }
112
113    /// Approximate entry count (exact for in-memory; summed across partitions).
114    pub async fn approximate_num_entries(&self) -> Result<u64, StreamsClientError> {
115        match query(
116            &self.tx,
117            &self.store,
118            StoreKind::KeyValue,
119            IqOp::KvApproxCount,
120        )
121        .await?
122        {
123            IqPayload::Count(n) => Ok(n),
124            other => Err(unexpected(&other)),
125        }
126    }
127
128    fn decode_pairs(&self, pairs: Vec<(Bytes, Bytes)>) -> Result<Vec<(K, V)>, StreamsClientError> {
129        pairs
130            .into_iter()
131            .map(|(kb, vb)| {
132                Ok((
133                    deser(&self.store, &*self.key_serde, &kb)?,
134                    deser(&self.store, &*self.value_serde, &vb)?,
135                ))
136            })
137            .collect()
138    }
139}
140
141/// Read-only composite window store view. `fetch` yields `(windowStart, V)`.
142pub struct ReadOnlyWindowStore<K, V> {
143    pub(crate) tx: mpsc::Sender<IqRequest>,
144    pub(crate) store: String,
145    pub(crate) key_serde: Box<dyn Serde<K>>,
146    pub(crate) value_serde: Box<dyn Serde<V>>,
147}
148
149impl<K: 'static, V: 'static> ReadOnlyWindowStore<K, V> {
150    /// Value of the window for `key` starting exactly at `window_start`, else `None`.
151    pub async fn fetch_single(
152        &self,
153        key: &K,
154        window_start: i64,
155    ) -> Result<Option<V>, StreamsClientError> {
156        let kb = self.key_serde.serialize(&self.store, key);
157        match query(
158            &self.tx,
159            &self.store,
160            StoreKind::Window,
161            IqOp::WindowFetchSingle {
162                key: kb,
163                window_start,
164            },
165        )
166        .await?
167        {
168            IqPayload::Value(Some(vb)) => Ok(Some(deser(&self.store, &*self.value_serde, &vb)?)),
169            IqPayload::Value(None) => Ok(None),
170            other => Err(unexpected(&other)),
171        }
172    }
173
174    /// Windows for `key` with start in inclusive `[time_from, time_to]`,
175    /// ascending by start. Each item is `(windowStart, value)`.
176    pub async fn fetch(
177        &self,
178        key: &K,
179        time_from: i64,
180        time_to: i64,
181    ) -> Result<Vec<(i64, V)>, StreamsClientError> {
182        let kb = self.key_serde.serialize(&self.store, key);
183        match query(
184            &self.tx,
185            &self.store,
186            StoreKind::Window,
187            IqOp::WindowFetch {
188                key: kb,
189                time_from,
190                time_to,
191            },
192        )
193        .await?
194        {
195            IqPayload::WindowEntries(rows) => rows
196                .into_iter()
197                .map(|(t, vb)| Ok((t, deser(&self.store, &*self.value_serde, &vb)?)))
198                .collect(),
199            other => Err(unexpected(&other)),
200        }
201    }
202}
203
204/// Read-only composite session store view. `fetch` yields each session as a
205/// `Windowed<K>` (key + `[start, end]`) with its value.
206pub struct ReadOnlySessionStore<K, V> {
207    pub(crate) tx: mpsc::Sender<IqRequest>,
208    pub(crate) store: String,
209    pub(crate) key_serde: Box<dyn Serde<K>>,
210    pub(crate) value_serde: Box<dyn Serde<V>>,
211}
212
213impl<K: 'static, V: 'static> ReadOnlySessionStore<K, V> {
214    /// All sessions for `key`, in store order.
215    pub async fn fetch(&self, key: &K) -> Result<Vec<(Windowed<K>, V)>, StreamsClientError> {
216        let kb = self.key_serde.serialize(&self.store, key);
217        match query(
218            &self.tx,
219            &self.store,
220            StoreKind::Session,
221            IqOp::SessionFetchKey { key: kb },
222        )
223        .await?
224        {
225            IqPayload::SessionEntries(rows) => rows
226                .into_iter()
227                .map(|((start, end), vb)| {
228                    // Re-deserialize the key per row (avoids a `K: Clone` bound).
229                    let k = deser(
230                        &self.store,
231                        &*self.key_serde,
232                        &self.key_serde.serialize(&self.store, key),
233                    )?;
234                    Ok((
235                        Windowed {
236                            key: k,
237                            window: Window { start, end },
238                        },
239                        deser(&self.store, &*self.value_serde, &vb)?,
240                    ))
241                })
242                .collect(),
243            other => Err(unexpected(&other)),
244        }
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use crate::processor::serde::{I64Serde, StringSerde};
252    use crate::runtime::iq::answer_iq;
253    use crate::store::api::KeyValueStore;
254    use crate::store::kv::KeyValueBytesStore;
255    use crate::store::registry::StoreRegistry;
256
257    /// Spawn a tiny servicer over one registry; returns the sender the views use.
258    pub(super) fn servicer(reg: StoreRegistry) -> mpsc::Sender<IqRequest> {
259        let (tx, mut rx) = mpsc::channel::<IqRequest>(16);
260        tokio::spawn(async move {
261            while let Some(req) = rx.recv().await {
262                let matching = reg.iq_get(&req.store).into_iter().collect::<Vec<_>>();
263                let res = answer_iq(matching, req.kind, &req.op, &req.store, true).await;
264                let _ = req.reply.send(res);
265            }
266        });
267        tx
268    }
269
270    async fn kv_registry() -> StoreRegistry {
271        let mut s = KeyValueBytesStore::<String, i64>::in_memory(
272            "counts".into(),
273            Box::new(StringSerde),
274            Box::new(I64Serde),
275            "counts-changelog".into(),
276        );
277        for (k, v) in [("a", 1), ("b", 2), ("c", 3)] {
278            s.put(k.into(), v).await;
279        }
280        let mut reg = StoreRegistry::default();
281        reg.insert(Box::new(s));
282        reg
283    }
284
285    #[tokio::test]
286    async fn kv_view_get_range_all_count() {
287        let tx = servicer(kv_registry().await);
288        let view = ReadOnlyKeyValueStore::<String, i64> {
289            tx,
290            store: "counts".into(),
291            key_serde: Box::new(StringSerde),
292            value_serde: Box::new(I64Serde),
293        };
294        assert_eq!(view.get(&"b".to_string()).await.unwrap(), Some(2));
295        assert_eq!(view.get(&"z".to_string()).await.unwrap(), None);
296        let r = view
297            .range(&"a".to_string(), &"b".to_string())
298            .await
299            .unwrap();
300        assert_eq!(r, vec![("a".to_string(), 1), ("b".to_string(), 2)]);
301        assert_eq!(view.all().await.unwrap().len(), 3);
302        assert_eq!(view.approximate_num_entries().await.unwrap(), 3);
303    }
304
305    async fn window_registry() -> StoreRegistry {
306        use crate::store::window::{WindowBytesStore, WindowStore};
307        let mut s = WindowBytesStore::<String, i64>::in_memory(
308            "wc".into(),
309            Box::new(StringSerde),
310            Box::new(I64Serde),
311            "wc-changelog".into(),
312            1000,
313        );
314        s.put("k".into(), 0, 10, 5).await;
315        s.put("k".into(), 1000, 20, 1005).await;
316        let mut reg = StoreRegistry::default();
317        reg.insert(Box::new(s));
318        reg
319    }
320
321    #[tokio::test]
322    async fn window_view_fetch() {
323        let tx = servicer(window_registry().await);
324        let view = ReadOnlyWindowStore::<String, i64> {
325            tx,
326            store: "wc".into(),
327            key_serde: Box::new(StringSerde),
328            value_serde: Box::new(I64Serde),
329        };
330        assert_eq!(
331            view.fetch_single(&"k".to_string(), 0).await.unwrap(),
332            Some(10)
333        );
334        assert_eq!(view.fetch_single(&"k".to_string(), 5).await.unwrap(), None);
335        let r = view.fetch(&"k".to_string(), 0, 1000).await.unwrap();
336        assert_eq!(r, vec![(0, 10), (1000, 20)]);
337    }
338
339    async fn session_registry() -> StoreRegistry {
340        use crate::store::session::{SessionBytesStore, SessionStore};
341        let mut s = SessionBytesStore::<String, i64>::in_memory(
342            "sc".into(),
343            Box::new(StringSerde),
344            Box::new(I64Serde),
345            "sc-changelog".into(),
346        );
347        s.put("k".into(), 0, 10, 1).await;
348        s.put("k".into(), 20, 30, 2).await;
349        let mut reg = StoreRegistry::default();
350        reg.insert(Box::new(s));
351        reg
352    }
353
354    #[tokio::test]
355    async fn session_view_fetch() {
356        use crate::dsl::windows::Window;
357        let tx = servicer(session_registry().await);
358        let view = ReadOnlySessionStore::<String, i64> {
359            tx,
360            store: "sc".into(),
361            key_serde: Box::new(StringSerde),
362            value_serde: Box::new(I64Serde),
363        };
364        let rows = view.fetch(&"k".to_string()).await.unwrap();
365        let got: Vec<(Window, i64)> = rows.into_iter().map(|(w, v)| (w.window, v)).collect();
366        assert!(got.contains(&(Window { start: 0, end: 10 }, 1)));
367        assert!(got.contains(&(Window { start: 20, end: 30 }, 2)));
368        assert_eq!(got.len(), 2);
369    }
370}