1use bytes::Bytes;
7use tokio::sync::{mpsc, oneshot};
8
9use crate::dsl::windows::{Window, Windowed};
10use crate::error::StreamsClientError;
11use crate::processor::serde::Serde;
12use crate::runtime::iq::{IqError, IqOp, IqPayload, IqRequest};
13use crate::store::iq::StoreKind;
14
15async fn query(
17 tx: &mpsc::Sender<IqRequest>,
18 store: &str,
19 kind: StoreKind,
20 op: IqOp,
21) -> Result<IqPayload, StreamsClientError> {
22 let (reply, rx) = oneshot::channel();
23 tx.send(IqRequest {
24 store: store.to_string(),
25 kind,
26 op,
27 reply,
28 })
29 .await
30 .map_err(|_| StreamsClientError::InteractiveQuery(IqError::RebalanceInProgress))?;
31 rx.await
32 .map_err(|_| StreamsClientError::InteractiveQuery(IqError::RebalanceInProgress))?
33 .map_err(StreamsClientError::InteractiveQuery)
34}
35
36fn deser<T: 'static>(
37 topic: &str,
38 serde: &dyn Serde<T>,
39 bytes: &[u8],
40) -> Result<T, StreamsClientError> {
41 serde
42 .deserialize(topic, bytes)
43 .map_err(|e| StreamsClientError::Runtime(format!("iq deserialize: {e}")))
44}
45
46pub(crate) fn unexpected(p: &IqPayload) -> StreamsClientError {
47 StreamsClientError::Runtime(format!("iq: unexpected payload {p:?}"))
48}
49
50pub(crate) async fn validate(
52 tx: &mpsc::Sender<IqRequest>,
53 store: &str,
54 kind: StoreKind,
55) -> Result<(), StreamsClientError> {
56 match query(tx, store, kind, IqOp::Validate).await? {
57 IqPayload::Validated => Ok(()),
58 other => Err(unexpected(&other)),
59 }
60}
61
62pub struct ReadOnlyKeyValueStore<K, V> {
64 pub(crate) tx: mpsc::Sender<IqRequest>,
65 pub(crate) store: String,
66 pub(crate) key_serde: Box<dyn Serde<K>>,
67 pub(crate) value_serde: Box<dyn Serde<V>>,
68}
69
70impl<K: 'static, V: 'static> ReadOnlyKeyValueStore<K, V> {
71 pub async fn get(&self, key: &K) -> Result<Option<V>, StreamsClientError> {
73 let kb = self.key_serde.serialize(&self.store, key);
74 match query(
75 &self.tx,
76 &self.store,
77 StoreKind::KeyValue,
78 IqOp::KvGet { key: kb },
79 )
80 .await?
81 {
82 IqPayload::Value(Some(vb)) => Ok(Some(deser(&self.store, &*self.value_serde, &vb)?)),
83 IqPayload::Value(None) => Ok(None),
84 other => Err(unexpected(&other)),
85 }
86 }
87
88 pub async fn range(&self, lo: &K, hi: &K) -> Result<Vec<(K, V)>, StreamsClientError> {
90 let lo_b = self.key_serde.serialize(&self.store, lo);
91 let hi_b = self.key_serde.serialize(&self.store, hi);
92 match query(
93 &self.tx,
94 &self.store,
95 StoreKind::KeyValue,
96 IqOp::KvRange { lo: lo_b, hi: hi_b },
97 )
98 .await?
99 {
100 IqPayload::Entries(pairs) => self.decode_pairs(pairs),
101 other => Err(unexpected(&other)),
102 }
103 }
104
105 pub async fn all(&self) -> Result<Vec<(K, V)>, StreamsClientError> {
107 match query(&self.tx, &self.store, StoreKind::KeyValue, IqOp::KvAll).await? {
108 IqPayload::Entries(pairs) => self.decode_pairs(pairs),
109 other => Err(unexpected(&other)),
110 }
111 }
112
113 pub async fn approximate_num_entries(&self) -> Result<u64, StreamsClientError> {
115 match query(
116 &self.tx,
117 &self.store,
118 StoreKind::KeyValue,
119 IqOp::KvApproxCount,
120 )
121 .await?
122 {
123 IqPayload::Count(n) => Ok(n),
124 other => Err(unexpected(&other)),
125 }
126 }
127
128 fn decode_pairs(&self, pairs: Vec<(Bytes, Bytes)>) -> Result<Vec<(K, V)>, StreamsClientError> {
129 pairs
130 .into_iter()
131 .map(|(kb, vb)| {
132 Ok((
133 deser(&self.store, &*self.key_serde, &kb)?,
134 deser(&self.store, &*self.value_serde, &vb)?,
135 ))
136 })
137 .collect()
138 }
139}
140
141pub struct ReadOnlyWindowStore<K, V> {
143 pub(crate) tx: mpsc::Sender<IqRequest>,
144 pub(crate) store: String,
145 pub(crate) key_serde: Box<dyn Serde<K>>,
146 pub(crate) value_serde: Box<dyn Serde<V>>,
147}
148
149impl<K: 'static, V: 'static> ReadOnlyWindowStore<K, V> {
150 pub async fn fetch_single(
152 &self,
153 key: &K,
154 window_start: i64,
155 ) -> Result<Option<V>, StreamsClientError> {
156 let kb = self.key_serde.serialize(&self.store, key);
157 match query(
158 &self.tx,
159 &self.store,
160 StoreKind::Window,
161 IqOp::WindowFetchSingle {
162 key: kb,
163 window_start,
164 },
165 )
166 .await?
167 {
168 IqPayload::Value(Some(vb)) => Ok(Some(deser(&self.store, &*self.value_serde, &vb)?)),
169 IqPayload::Value(None) => Ok(None),
170 other => Err(unexpected(&other)),
171 }
172 }
173
174 pub async fn fetch(
177 &self,
178 key: &K,
179 time_from: i64,
180 time_to: i64,
181 ) -> Result<Vec<(i64, V)>, StreamsClientError> {
182 let kb = self.key_serde.serialize(&self.store, key);
183 match query(
184 &self.tx,
185 &self.store,
186 StoreKind::Window,
187 IqOp::WindowFetch {
188 key: kb,
189 time_from,
190 time_to,
191 },
192 )
193 .await?
194 {
195 IqPayload::WindowEntries(rows) => rows
196 .into_iter()
197 .map(|(t, vb)| Ok((t, deser(&self.store, &*self.value_serde, &vb)?)))
198 .collect(),
199 other => Err(unexpected(&other)),
200 }
201 }
202}
203
204pub struct ReadOnlySessionStore<K, V> {
207 pub(crate) tx: mpsc::Sender<IqRequest>,
208 pub(crate) store: String,
209 pub(crate) key_serde: Box<dyn Serde<K>>,
210 pub(crate) value_serde: Box<dyn Serde<V>>,
211}
212
213impl<K: 'static, V: 'static> ReadOnlySessionStore<K, V> {
214 pub async fn fetch(&self, key: &K) -> Result<Vec<(Windowed<K>, V)>, StreamsClientError> {
216 let kb = self.key_serde.serialize(&self.store, key);
217 match query(
218 &self.tx,
219 &self.store,
220 StoreKind::Session,
221 IqOp::SessionFetchKey { key: kb },
222 )
223 .await?
224 {
225 IqPayload::SessionEntries(rows) => rows
226 .into_iter()
227 .map(|((start, end), vb)| {
228 let k = deser(
230 &self.store,
231 &*self.key_serde,
232 &self.key_serde.serialize(&self.store, key),
233 )?;
234 Ok((
235 Windowed {
236 key: k,
237 window: Window { start, end },
238 },
239 deser(&self.store, &*self.value_serde, &vb)?,
240 ))
241 })
242 .collect(),
243 other => Err(unexpected(&other)),
244 }
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251 use crate::processor::serde::{I64Serde, StringSerde};
252 use crate::runtime::iq::answer_iq;
253 use crate::store::api::KeyValueStore;
254 use crate::store::kv::KeyValueBytesStore;
255 use crate::store::registry::StoreRegistry;
256
257 pub(super) fn servicer(reg: StoreRegistry) -> mpsc::Sender<IqRequest> {
259 let (tx, mut rx) = mpsc::channel::<IqRequest>(16);
260 tokio::spawn(async move {
261 while let Some(req) = rx.recv().await {
262 let matching = reg.iq_get(&req.store).into_iter().collect::<Vec<_>>();
263 let res = answer_iq(matching, req.kind, &req.op, &req.store, true).await;
264 let _ = req.reply.send(res);
265 }
266 });
267 tx
268 }
269
270 async fn kv_registry() -> StoreRegistry {
271 let mut s = KeyValueBytesStore::<String, i64>::in_memory(
272 "counts".into(),
273 Box::new(StringSerde),
274 Box::new(I64Serde),
275 "counts-changelog".into(),
276 );
277 for (k, v) in [("a", 1), ("b", 2), ("c", 3)] {
278 s.put(k.into(), v).await;
279 }
280 let mut reg = StoreRegistry::default();
281 reg.insert(Box::new(s));
282 reg
283 }
284
285 #[tokio::test]
286 async fn kv_view_get_range_all_count() {
287 let tx = servicer(kv_registry().await);
288 let view = ReadOnlyKeyValueStore::<String, i64> {
289 tx,
290 store: "counts".into(),
291 key_serde: Box::new(StringSerde),
292 value_serde: Box::new(I64Serde),
293 };
294 assert_eq!(view.get(&"b".to_string()).await.unwrap(), Some(2));
295 assert_eq!(view.get(&"z".to_string()).await.unwrap(), None);
296 let r = view
297 .range(&"a".to_string(), &"b".to_string())
298 .await
299 .unwrap();
300 assert_eq!(r, vec![("a".to_string(), 1), ("b".to_string(), 2)]);
301 assert_eq!(view.all().await.unwrap().len(), 3);
302 assert_eq!(view.approximate_num_entries().await.unwrap(), 3);
303 }
304
305 async fn window_registry() -> StoreRegistry {
306 use crate::store::window::{WindowBytesStore, WindowStore};
307 let mut s = WindowBytesStore::<String, i64>::in_memory(
308 "wc".into(),
309 Box::new(StringSerde),
310 Box::new(I64Serde),
311 "wc-changelog".into(),
312 1000,
313 );
314 s.put("k".into(), 0, 10, 5).await;
315 s.put("k".into(), 1000, 20, 1005).await;
316 let mut reg = StoreRegistry::default();
317 reg.insert(Box::new(s));
318 reg
319 }
320
321 #[tokio::test]
322 async fn window_view_fetch() {
323 let tx = servicer(window_registry().await);
324 let view = ReadOnlyWindowStore::<String, i64> {
325 tx,
326 store: "wc".into(),
327 key_serde: Box::new(StringSerde),
328 value_serde: Box::new(I64Serde),
329 };
330 assert_eq!(
331 view.fetch_single(&"k".to_string(), 0).await.unwrap(),
332 Some(10)
333 );
334 assert_eq!(view.fetch_single(&"k".to_string(), 5).await.unwrap(), None);
335 let r = view.fetch(&"k".to_string(), 0, 1000).await.unwrap();
336 assert_eq!(r, vec![(0, 10), (1000, 20)]);
337 }
338
339 async fn session_registry() -> StoreRegistry {
340 use crate::store::session::{SessionBytesStore, SessionStore};
341 let mut s = SessionBytesStore::<String, i64>::in_memory(
342 "sc".into(),
343 Box::new(StringSerde),
344 Box::new(I64Serde),
345 "sc-changelog".into(),
346 );
347 s.put("k".into(), 0, 10, 1).await;
348 s.put("k".into(), 20, 30, 2).await;
349 let mut reg = StoreRegistry::default();
350 reg.insert(Box::new(s));
351 reg
352 }
353
354 #[tokio::test]
355 async fn session_view_fetch() {
356 use crate::dsl::windows::Window;
357 let tx = servicer(session_registry().await);
358 let view = ReadOnlySessionStore::<String, i64> {
359 tx,
360 store: "sc".into(),
361 key_serde: Box::new(StringSerde),
362 value_serde: Box::new(I64Serde),
363 };
364 let rows = view.fetch(&"k".to_string()).await.unwrap();
365 let got: Vec<(Window, i64)> = rows.into_iter().map(|(w, v)| (w.window, v)).collect();
366 assert!(got.contains(&(Window { start: 0, end: 10 }, 1)));
367 assert!(got.contains(&(Window { start: 20, end: 30 }, 2)));
368 assert_eq!(got.len(), 2);
369 }
370}