use async_trait::async_trait;
use bytes::Bytes;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StoreKind {
KeyValue,
Window,
Session,
}
#[doc(hidden)]
#[async_trait]
pub trait IqQueryable: Send + Sync {
fn kind(&self) -> StoreKind;
async fn iq_kv_get(&self, _key: &[u8]) -> Option<Bytes> {
None
}
async fn iq_kv_range(&self, _lo: &[u8], _hi: &[u8]) -> Vec<(Bytes, Bytes)> {
Vec::new()
}
async fn iq_kv_all(&self) -> Vec<(Bytes, Bytes)> {
Vec::new()
}
async fn iq_kv_approx_count(&self) -> u64 {
0
}
async fn iq_window_fetch_single(&self, _key: &[u8], _window_start: i64) -> Option<Bytes> {
None
}
async fn iq_window_fetch(
&self,
_key: &[u8],
_time_from: i64,
_time_to: i64,
) -> Vec<(i64, Bytes)> {
Vec::new()
}
async fn iq_session_fetch_key(&self, _key: &[u8]) -> Vec<((i64, i64), Bytes)> {
Vec::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::processor::serde::{I64Serde, Serde, StringSerde};
use crate::store::api::{KeyValueStore, StateStore};
use crate::store::kv::KeyValueBytesStore;
use crate::store::session::{SessionBytesStore, SessionStore};
use crate::store::window::{WindowBytesStore, WindowStore};
#[tokio::test]
async fn kv_get_range_all_count_inclusive() {
let mut s = KeyValueBytesStore::<String, i64>::in_memory(
"c".into(),
Box::new(StringSerde),
Box::new(I64Serde),
"c-changelog".into(),
);
for (k, v) in [("a", 1), ("b", 2), ("c", 3)] {
s.put(k.into(), v).await;
}
let q: &dyn IqQueryable = s.as_iq().unwrap();
assert_eq!(q.iq_kv_get(b"b").await, Some(I64Serde.serialize(&2)));
assert_eq!(q.iq_kv_get(b"z").await, None);
let r = q.iq_kv_range(b"a", b"b").await; assert_eq!(r.len(), 2);
assert_eq!(r[0].0, bytes::Bytes::from_static(b"a"));
assert_eq!(r[1].0, bytes::Bytes::from_static(b"b"));
assert!(q.iq_kv_range(b"c", b"a").await.is_empty()); assert_eq!(q.iq_kv_all().await.len(), 3);
assert_eq!(q.iq_kv_approx_count().await, 3);
}
#[tokio::test]
async fn window_fetch_point_and_range() {
let mut s = WindowBytesStore::<String, i64>::in_memory(
"w".into(),
Box::new(StringSerde),
Box::new(I64Serde),
"w-changelog".into(),
);
s.put("k".into(), 0, 10, 5).await;
s.put("k".into(), 1000, 20, 1005).await;
let q: &dyn IqQueryable = s.as_iq().unwrap();
assert_eq!(
q.iq_window_fetch_single(b"k", 0).await,
Some(I64Serde.serialize(&10))
);
assert_eq!(q.iq_window_fetch_single(b"k", 500).await, None);
let r = q.iq_window_fetch(b"k", 0, 1000).await;
assert_eq!(r.iter().map(|(t, _)| *t).collect::<Vec<_>>(), vec![0, 1000]);
}
#[tokio::test]
async fn session_fetch_key_carries_start_end() {
let mut s = SessionBytesStore::<String, i64>::in_memory(
"s".into(),
Box::new(StringSerde),
Box::new(I64Serde),
"s-changelog".into(),
);
s.put("k".into(), 0, 10, 1).await;
s.put("k".into(), 20, 30, 2).await;
let q: &dyn IqQueryable = s.as_iq().unwrap();
let r = q.iq_session_fetch_key(b"k").await;
let windows: Vec<(i64, i64)> = r.iter().map(|((st, en), _)| (*st, *en)).collect();
assert!(windows.contains(&(0, 10)) && windows.contains(&(20, 30)));
}
}