Skip to main content

crabka_client_streams/runtime/
iq.rs

//! Interactive-query channel protocol. The `KafkaStreams` handle sends byte-level
//! `IqRequest`s to the supervisor task, which resolves them against local stores
//! with `answer_iq` and replies on a `oneshot`.
4
5use bytes::Bytes;
6use tokio::sync::oneshot;
7
8use crate::store::iq::{IqQueryable, StoreKind};
9
10/// A byte-level query op. No `K`/`V` — the typed view (de)serializes.
11#[derive(Debug)]
12pub(crate) enum IqOp {
13    Validate,
14    KvGet {
15        key: Bytes,
16    },
17    KvRange {
18        lo: Bytes,
19        hi: Bytes,
20    },
21    KvAll,
22    KvApproxCount,
23    WindowFetchSingle {
24        key: Bytes,
25        window_start: i64,
26    },
27    WindowFetch {
28        key: Bytes,
29        time_from: i64,
30        time_to: i64,
31    },
32    SessionFetchKey {
33        key: Bytes,
34    },
35}
36
37/// A byte-level query result.
38#[derive(Debug, PartialEq, Eq)]
39pub(crate) enum IqPayload {
40    Validated,
41    Value(Option<Bytes>),
42    Entries(Vec<(Bytes, Bytes)>),
43    WindowEntries(Vec<(i64, Bytes)>),
44    SessionEntries(Vec<((i64, i64), Bytes)>),
45    Count(u64),
46}
47
48/// Why an interactive query failed.
49#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
50pub enum IqError {
51    #[error("state store {0:?} is not assigned to this instance")]
52    StoreNotFound(String),
53    #[error("state store {name:?} is a {found:?} store, not {requested:?}")]
54    WrongStoreKind {
55        name: String,
56        found: StoreKind,
57        requested: StoreKind,
58    },
59    #[error("streams instance is not running")]
60    NotRunning,
61    #[error("a rebalance is in progress; retry the query")]
62    RebalanceInProgress,
63}
64
65/// One query addressed to the supervisor.
66pub(crate) struct IqRequest {
67    pub store: String,
68    pub kind: StoreKind,
69    pub op: IqOp,
70    pub reply: oneshot::Sender<Result<IqPayload, IqError>>,
71}
72
73/// Resolve one op against every local store named `store` (composite across
74/// partitions). `matching` is the set of `IqQueryable` views for that name on
75/// this instance; `any_tasks` distinguishes "rebalancing" (no tasks at all)
76/// from "store genuinely not assigned" (have tasks, none host this store).
77pub(crate) async fn answer_iq(
78    matching: Vec<&dyn IqQueryable>,
79    kind: StoreKind,
80    op: &IqOp,
81    store: &str,
82    any_tasks: bool,
83) -> Result<IqPayload, IqError> {
84    if matching.is_empty() {
85        return Err(if any_tasks {
86            IqError::StoreNotFound(store.to_string())
87        } else {
88            IqError::RebalanceInProgress
89        });
90    }
91    let found = matching[0].kind();
92    if found != kind {
93        return Err(IqError::WrongStoreKind {
94            name: store.to_string(),
95            found,
96            requested: kind,
97        });
98    }
99    Ok(match op {
100        IqOp::Validate => IqPayload::Validated,
101        IqOp::KvGet { key } => {
102            let mut hit = None;
103            for s in &matching {
104                if let Some(v) = s.iq_kv_get(key).await {
105                    hit = Some(v);
106                    break;
107                }
108            }
109            IqPayload::Value(hit)
110        }
111        IqOp::KvRange { lo, hi } => {
112            let mut out = Vec::new();
113            for s in &matching {
114                out.extend(s.iq_kv_range(lo, hi).await);
115            }
116            IqPayload::Entries(out)
117        }
118        IqOp::KvAll => {
119            let mut out = Vec::new();
120            for s in &matching {
121                out.extend(s.iq_kv_all().await);
122            }
123            IqPayload::Entries(out)
124        }
125        IqOp::KvApproxCount => {
126            let mut n = 0;
127            for s in &matching {
128                n += s.iq_kv_approx_count().await;
129            }
130            IqPayload::Count(n)
131        }
132        IqOp::WindowFetchSingle { key, window_start } => {
133            let mut hit = None;
134            for s in &matching {
135                if let Some(v) = s.iq_window_fetch_single(key, *window_start).await {
136                    hit = Some(v);
137                    break;
138                }
139            }
140            IqPayload::Value(hit)
141        }
142        IqOp::WindowFetch {
143            key,
144            time_from,
145            time_to,
146        } => {
147            let mut out = Vec::new();
148            for s in &matching {
149                out.extend(s.iq_window_fetch(key, *time_from, *time_to).await);
150            }
151            IqPayload::WindowEntries(out)
152        }
153        IqOp::SessionFetchKey { key } => {
154            let mut out = Vec::new();
155            for s in &matching {
156                out.extend(s.iq_session_fetch_key(key).await);
157            }
158            IqPayload::SessionEntries(out)
159        }
160    })
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::serde::{I64Serde, Serde, StringSerde};
    use crate::store::api::{KeyValueStore, StateStore};
    use crate::store::kv::KeyValueBytesStore;

    /// Drives `answer_iq` through a successful validate, a key lookup, a
    /// kind mismatch, and both flavours of "no matching store".
    #[tokio::test]
    async fn answer_kv_get_validate_wrongkind_notfound() {
        // In-memory key-value store "c" holding the single entry x -> 7.
        let mut store = KeyValueBytesStore::<String, i64>::in_memory(
            "c".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "c-changelog".into(),
        );
        store.put("x".into(), 7).await;
        let view = store.as_iq().unwrap();

        // Matching kind: validation succeeds.
        let ok = answer_iq(vec![view], StoreKind::KeyValue, &IqOp::Validate, "c", true).await;
        assert_eq!(ok, Ok(IqPayload::Validated));

        // Point lookup returns the serialized value.
        let key = StringSerde.serialize("t", &"x".to_string());
        let got = answer_iq(
            vec![view],
            StoreKind::KeyValue,
            &IqOp::KvGet { key },
            "c",
            true,
        )
        .await;
        assert_eq!(got, Ok(IqPayload::Value(Some(I64Serde.serialize("t", &7)))));

        // Asking for a window store against a key-value store is rejected.
        let mismatch = answer_iq(vec![view], StoreKind::Window, &IqOp::Validate, "c", true).await;
        assert!(matches!(mismatch, Err(IqError::WrongStoreKind { .. })));

        // With no matching views, `any_tasks` selects the error.
        for (any_tasks, expected) in [
            (true, IqError::StoreNotFound("missing".into())),
            (false, IqError::RebalanceInProgress),
        ] {
            let got = answer_iq(
                vec![],
                StoreKind::KeyValue,
                &IqOp::Validate,
                "missing",
                any_tasks,
            )
            .await;
            assert_eq!(got, Err(expected));
        }
    }
}