use bytes::Bytes;
use tokio::sync::oneshot;
use crate::store::iq::{IqQueryable, StoreKind};
#[derive(Debug)]
pub(crate) enum IqOp {
Validate,
KvGet {
key: Bytes,
},
KvRange {
lo: Bytes,
hi: Bytes,
},
KvAll,
KvApproxCount,
WindowFetchSingle {
key: Bytes,
window_start: i64,
},
WindowFetch {
key: Bytes,
time_from: i64,
time_to: i64,
},
SessionFetchKey {
key: Bytes,
},
}
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum IqPayload {
Validated,
Value(Option<Bytes>),
Entries(Vec<(Bytes, Bytes)>),
WindowEntries(Vec<(i64, Bytes)>),
SessionEntries(Vec<((i64, i64), Bytes)>),
Count(u64),
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum IqError {
#[error("state store {0:?} is not assigned to this instance")]
StoreNotFound(String),
#[error("state store {name:?} is a {found:?} store, not {requested:?}")]
WrongStoreKind {
name: String,
found: StoreKind,
requested: StoreKind,
},
#[error("streams instance is not running")]
NotRunning,
#[error("a rebalance is in progress; retry the query")]
RebalanceInProgress,
}
pub(crate) struct IqRequest {
pub store: String,
pub kind: StoreKind,
pub op: IqOp,
pub reply: oneshot::Sender<Result<IqPayload, IqError>>,
}
pub(crate) async fn answer_iq(
matching: Vec<&dyn IqQueryable>,
kind: StoreKind,
op: &IqOp,
store: &str,
any_tasks: bool,
) -> Result<IqPayload, IqError> {
if matching.is_empty() {
return Err(if any_tasks {
IqError::StoreNotFound(store.to_string())
} else {
IqError::RebalanceInProgress
});
}
let found = matching[0].kind();
if found != kind {
return Err(IqError::WrongStoreKind {
name: store.to_string(),
found,
requested: kind,
});
}
Ok(match op {
IqOp::Validate => IqPayload::Validated,
IqOp::KvGet { key } => {
let mut hit = None;
for s in &matching {
if let Some(v) = s.iq_kv_get(key).await {
hit = Some(v);
break;
}
}
IqPayload::Value(hit)
}
IqOp::KvRange { lo, hi } => {
let mut out = Vec::new();
for s in &matching {
out.extend(s.iq_kv_range(lo, hi).await);
}
IqPayload::Entries(out)
}
IqOp::KvAll => {
let mut out = Vec::new();
for s in &matching {
out.extend(s.iq_kv_all().await);
}
IqPayload::Entries(out)
}
IqOp::KvApproxCount => {
let mut n = 0;
for s in &matching {
n += s.iq_kv_approx_count().await;
}
IqPayload::Count(n)
}
IqOp::WindowFetchSingle { key, window_start } => {
let mut hit = None;
for s in &matching {
if let Some(v) = s.iq_window_fetch_single(key, *window_start).await {
hit = Some(v);
break;
}
}
IqPayload::Value(hit)
}
IqOp::WindowFetch {
key,
time_from,
time_to,
} => {
let mut out = Vec::new();
for s in &matching {
out.extend(s.iq_window_fetch(key, *time_from, *time_to).await);
}
IqPayload::WindowEntries(out)
}
IqOp::SessionFetchKey { key } => {
let mut out = Vec::new();
for s in &matching {
out.extend(s.iq_session_fetch_key(key).await);
}
IqPayload::SessionEntries(out)
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::processor::serde::{I64Serde, Serde, StringSerde};
use crate::store::api::{KeyValueStore, StateStore};
use crate::store::kv::KeyValueBytesStore;
#[tokio::test]
async fn answer_kv_get_validate_wrongkind_notfound() {
let mut s = KeyValueBytesStore::<String, i64>::in_memory(
"c".into(),
Box::new(StringSerde),
Box::new(I64Serde),
"c-changelog".into(),
);
s.put("x".into(), 7).await;
let q = s.as_iq().unwrap();
assert_eq!(
answer_iq(vec![q], StoreKind::KeyValue, &IqOp::Validate, "c", true).await,
Ok(IqPayload::Validated)
);
let got = answer_iq(
vec![q],
StoreKind::KeyValue,
&IqOp::KvGet {
key: StringSerde.serialize("t", &"x".to_string()),
},
"c",
true,
)
.await;
assert_eq!(got, Ok(IqPayload::Value(Some(I64Serde.serialize("t", &7)))));
assert!(matches!(
answer_iq(vec![q], StoreKind::Window, &IqOp::Validate, "c", true).await,
Err(IqError::WrongStoreKind { .. })
));
assert_eq!(
answer_iq(
vec![],
StoreKind::KeyValue,
&IqOp::Validate,
"missing",
true
)
.await,
Err(IqError::StoreNotFound("missing".into()))
);
assert_eq!(
answer_iq(
vec![],
StoreKind::KeyValue,
&IqOp::Validate,
"missing",
false
)
.await,
Err(IqError::RebalanceInProgress)
);
}
}