crabka_client_streams/runtime/
iq.rs1use bytes::Bytes;
6use tokio::sync::oneshot;
7
8use crate::store::iq::{IqQueryable, StoreKind};
9
10#[derive(Debug)]
12pub(crate) enum IqOp {
13 Validate,
14 KvGet {
15 key: Bytes,
16 },
17 KvRange {
18 lo: Bytes,
19 hi: Bytes,
20 },
21 KvAll,
22 KvApproxCount,
23 WindowFetchSingle {
24 key: Bytes,
25 window_start: i64,
26 },
27 WindowFetch {
28 key: Bytes,
29 time_from: i64,
30 time_to: i64,
31 },
32 SessionFetchKey {
33 key: Bytes,
34 },
35}
36
37#[derive(Debug, PartialEq, Eq)]
39pub(crate) enum IqPayload {
40 Validated,
41 Value(Option<Bytes>),
42 Entries(Vec<(Bytes, Bytes)>),
43 WindowEntries(Vec<(i64, Bytes)>),
44 SessionEntries(Vec<((i64, i64), Bytes)>),
45 Count(u64),
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
50pub enum IqError {
51 #[error("state store {0:?} is not assigned to this instance")]
52 StoreNotFound(String),
53 #[error("state store {name:?} is a {found:?} store, not {requested:?}")]
54 WrongStoreKind {
55 name: String,
56 found: StoreKind,
57 requested: StoreKind,
58 },
59 #[error("streams instance is not running")]
60 NotRunning,
61 #[error("a rebalance is in progress; retry the query")]
62 RebalanceInProgress,
63}
64
65pub(crate) struct IqRequest {
67 pub store: String,
68 pub kind: StoreKind,
69 pub op: IqOp,
70 pub reply: oneshot::Sender<Result<IqPayload, IqError>>,
71}
72
73pub(crate) async fn answer_iq(
78 matching: Vec<&dyn IqQueryable>,
79 kind: StoreKind,
80 op: &IqOp,
81 store: &str,
82 any_tasks: bool,
83) -> Result<IqPayload, IqError> {
84 if matching.is_empty() {
85 return Err(if any_tasks {
86 IqError::StoreNotFound(store.to_string())
87 } else {
88 IqError::RebalanceInProgress
89 });
90 }
91 let found = matching[0].kind();
92 if found != kind {
93 return Err(IqError::WrongStoreKind {
94 name: store.to_string(),
95 found,
96 requested: kind,
97 });
98 }
99 Ok(match op {
100 IqOp::Validate => IqPayload::Validated,
101 IqOp::KvGet { key } => {
102 let mut hit = None;
103 for s in &matching {
104 if let Some(v) = s.iq_kv_get(key).await {
105 hit = Some(v);
106 break;
107 }
108 }
109 IqPayload::Value(hit)
110 }
111 IqOp::KvRange { lo, hi } => {
112 let mut out = Vec::new();
113 for s in &matching {
114 out.extend(s.iq_kv_range(lo, hi).await);
115 }
116 IqPayload::Entries(out)
117 }
118 IqOp::KvAll => {
119 let mut out = Vec::new();
120 for s in &matching {
121 out.extend(s.iq_kv_all().await);
122 }
123 IqPayload::Entries(out)
124 }
125 IqOp::KvApproxCount => {
126 let mut n = 0;
127 for s in &matching {
128 n += s.iq_kv_approx_count().await;
129 }
130 IqPayload::Count(n)
131 }
132 IqOp::WindowFetchSingle { key, window_start } => {
133 let mut hit = None;
134 for s in &matching {
135 if let Some(v) = s.iq_window_fetch_single(key, *window_start).await {
136 hit = Some(v);
137 break;
138 }
139 }
140 IqPayload::Value(hit)
141 }
142 IqOp::WindowFetch {
143 key,
144 time_from,
145 time_to,
146 } => {
147 let mut out = Vec::new();
148 for s in &matching {
149 out.extend(s.iq_window_fetch(key, *time_from, *time_to).await);
150 }
151 IqPayload::WindowEntries(out)
152 }
153 IqOp::SessionFetchKey { key } => {
154 let mut out = Vec::new();
155 for s in &matching {
156 out.extend(s.iq_session_fetch_key(key).await);
157 }
158 IqPayload::SessionEntries(out)
159 }
160 })
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::serde::{I64Serde, Serde, StringSerde};
    use crate::store::api::{KeyValueStore, StateStore};
    use crate::store::kv::KeyValueBytesStore;

    /// Exercises `answer_iq` against one in-memory key-value store: a
    /// successful validate, a successful get, a kind mismatch, and both
    /// flavours of the "no matching store" error.
    #[tokio::test]
    async fn answer_kv_get_validate_wrongkind_notfound() {
        // Store "c" holding the single entry "x" -> 7.
        let mut kv = KeyValueBytesStore::<String, i64>::in_memory(
            "c".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "c-changelog".into(),
        );
        kv.put("x".into(), 7).await;
        let queryable = kv.as_iq().unwrap();

        // Validation succeeds when the requested kind matches.
        let validated = answer_iq(
            vec![queryable],
            StoreKind::KeyValue,
            &IqOp::Validate,
            "c",
            true,
        )
        .await;
        assert_eq!(validated, Ok(IqPayload::Validated));

        // A get returns the serialized value stored under the key.
        let lookup = IqOp::KvGet {
            key: StringSerde.serialize("t", &"x".to_string()),
        };
        let fetched = answer_iq(vec![queryable], StoreKind::KeyValue, &lookup, "c", true).await;
        assert_eq!(
            fetched,
            Ok(IqPayload::Value(Some(I64Serde.serialize("t", &7))))
        );

        // Asking for a window store on a key-value store is rejected.
        let mismatched = answer_iq(
            vec![queryable],
            StoreKind::Window,
            &IqOp::Validate,
            "c",
            true,
        )
        .await;
        assert!(matches!(mismatched, Err(IqError::WrongStoreKind { .. })));

        // No matching stores with `any_tasks == true`: store not found.
        let not_found = answer_iq(
            Vec::new(),
            StoreKind::KeyValue,
            &IqOp::Validate,
            "missing",
            true,
        )
        .await;
        assert_eq!(not_found, Err(IqError::StoreNotFound("missing".into())));

        // No matching stores with `any_tasks == false`: rebalance in progress.
        let rebalancing = answer_iq(
            Vec::new(),
            StoreKind::KeyValue,
            &IqOp::Validate,
            "missing",
            false,
        )
        .await;
        assert_eq!(rebalancing, Err(IqError::RebalanceInProgress));
    }
}