use std::any::Any;
use async_trait::async_trait;
use bytes::Bytes;
use crate::processor::serde::Serde;
use crate::store::api::{KeyValueStore, StateStore};
use crate::store::byte::{ByteKeyValueStore, InMemoryBytes};
/// Typed key-value store layered over a byte-oriented backend.
///
/// Keys and values are converted to and from bytes with the configured serdes
/// before they reach `backend`. Writes made through `put`/`delete` are also
/// buffered in `changelog` (while `logging` is on) until `take_changelog`
/// hands them out.
pub struct KeyValueBytesStore<K, V> {
/// Store name, returned by `StateStore::name`.
name: String,
/// Changelog topic name, returned by `StateStore::changelog_topic`.
changelog_topic: String,
/// Byte-level storage that every read and write is delegated to.
backend: Box<dyn ByteKeyValueStore>,
/// Converts typed keys to and from their byte form.
key_serde: Box<dyn Serde<K>>,
/// Converts typed values to and from their byte form.
value_serde: Box<dyn Serde<V>>,
/// Pending changelog records as `(key, value)`; `None` is recorded for a delete.
changelog: Vec<(Bytes, Option<Bytes>)>,
/// When `false`, `put`/`delete` skip recording into `changelog`.
logging: bool,
}
impl<K: 'static, V: 'static> KeyValueBytesStore<K, V> {
    /// Assembles a store on top of an arbitrary byte-level `backend`.
    ///
    /// The returned store has an empty changelog buffer and changelog
    /// recording switched on.
    #[must_use]
    pub(crate) fn new(
        name: String,
        backend: Box<dyn ByteKeyValueStore>,
        key_serde: Box<dyn Serde<K>>,
        value_serde: Box<dyn Serde<V>>,
        changelog_topic: String,
    ) -> Self {
        Self {
            // Nothing has been written yet, so there is nothing to hand out.
            changelog: Vec::new(),
            // Writes are recorded until `set_logging(false)` is called.
            logging: true,
            name,
            changelog_topic,
            backend,
            key_serde,
            value_serde,
        }
    }

    /// Builds a store backed by a default-constructed [`InMemoryBytes`].
    ///
    /// All other arguments are forwarded unchanged to [`Self::new`].
    #[must_use]
    pub fn in_memory(
        name: String,
        key_serde: Box<dyn Serde<K>>,
        value_serde: Box<dyn Serde<V>>,
        changelog_topic: String,
    ) -> Self {
        let backend: Box<dyn ByteKeyValueStore> = Box::new(InMemoryBytes::default());
        Self::new(name, backend, key_serde, value_serde, changelog_topic)
    }
}
#[async_trait]
impl<K: Send + 'static, V: Send + 'static> StateStore for KeyValueBytesStore<K, V> {
    /// Returns the name given to the store at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Does nothing in this implementation.
    async fn flush(&mut self) {}

    /// Does nothing in this implementation.
    fn close(&mut self) {}

    /// Exposes the store as `&mut dyn Any`, e.g. so a caller can downcast it
    /// back to its concrete type.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    /// Returns the changelog topic name given to the store at construction.
    fn changelog_topic(&self) -> &str {
        self.changelog_topic.as_str()
    }

    /// Hands out every buffered changelog record and leaves the buffer empty.
    fn take_changelog(&mut self) -> Vec<(Bytes, Option<Bytes>)> {
        let pending = &mut self.changelog;
        std::mem::take(pending)
    }

    /// Applies one raw changelog record straight to the backend.
    ///
    /// `Some(value)` is written under `key`; `None` removes `key`. The record
    /// is not pushed onto the changelog buffer, whatever the logging flag is.
    async fn apply_changelog(&mut self, key: Bytes, value: Option<Bytes>) {
        if let Some(payload) = value {
            self.backend.put(key, payload).await;
        } else {
            // The previously stored bytes, if any, are of no interest here.
            self.backend.delete(&key).await;
        }
    }

    /// Turns recording of `put`/`delete` into the changelog buffer on or off.
    fn set_logging(&mut self, on: bool) {
        self.logging = on;
    }

    /// Always returns `Some`: this store implements `IqQueryable`.
    fn as_iq(&self) -> Option<&dyn crate::store::iq::IqQueryable> {
        let queryable: &dyn crate::store::iq::IqQueryable = self;
        Some(queryable)
    }

    /// Clears the backend, then discards any changelog records still buffered.
    async fn clear(&mut self) {
        self.backend.clear().await;
        self.changelog.clear();
    }
}
#[async_trait::async_trait]
impl<K: 'static, V: 'static> crate::store::iq::IqQueryable for KeyValueBytesStore<K, V> {
fn kind(&self) -> crate::store::iq::StoreKind {
crate::store::iq::StoreKind::KeyValue
}
async fn iq_kv_get(&self, key: &[u8]) -> Option<bytes::Bytes> {
self.backend.get(key).await
}
async fn iq_kv_range(&self, lo: &[u8], hi: &[u8]) -> Vec<(bytes::Bytes, bytes::Bytes)> {
let mut hi_succ = hi.to_vec();
hi_succ.push(0);
self.backend.range(lo, &hi_succ).await
}
async fn iq_kv_all(&self) -> Vec<(bytes::Bytes, bytes::Bytes)> {
self.backend.scan_all().await
}
async fn iq_kv_approx_count(&self) -> u64 {
self.backend.approx_len().await
}
}
#[async_trait]
impl<K: Send + Sync + 'static, V: Send + 'static> KeyValueStore<K, V> for KeyValueBytesStore<K, V> {
    /// Fetches and decodes the value stored under `key`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the stored bytes cannot be decoded by the value serde.
    async fn get(&self, key: &K) -> Option<V> {
        let key_bytes = self.key_serde.serialize(key);
        let stored = self.backend.get(&key_bytes).await?;
        Some(
            self.value_serde
                .deserialize(&stored)
                .expect("store value deserialize"),
        )
    }

    /// Stores `value` under `key` and, while logging is on, buffers the
    /// serialized pair as a changelog record.
    async fn put(&mut self, key: K, value: V) {
        let key_bytes = self.key_serde.serialize(&key);
        let value_bytes = self.value_serde.serialize(&value);
        self.backend
            .put(key_bytes.clone(), value_bytes.clone())
            .await;
        if !self.logging {
            return;
        }
        self.changelog.push((key_bytes, Some(value_bytes)));
    }

    /// Removes `key` and returns the decoded value it held, if any.
    ///
    /// While logging is on, a `(key, None)` record is buffered whether or not
    /// the key was present.
    ///
    /// # Panics
    ///
    /// Panics if the removed bytes cannot be decoded by the value serde.
    async fn delete(&mut self, key: &K) -> Option<V> {
        let key_bytes = self.key_serde.serialize(key);
        let removed = self.backend.delete(&key_bytes).await;
        // Decode before logging so a decode failure leaves the changelog untouched.
        let previous = removed.map(|raw| {
            self.value_serde
                .deserialize(&raw)
                .expect("store value deserialize")
        });
        if self.logging {
            self.changelog.push((key_bytes, None));
        }
        previous
    }

    /// Returns the decoded entries the backend yields for the serialized
    /// bounds `lo` and `hi`, in the backend's order.
    ///
    /// # Panics
    ///
    /// Panics if any returned key or value cannot be decoded by its serde.
    async fn range(&self, lo: &K, hi: &K) -> Vec<(K, V)> {
        let lower = self.key_serde.serialize(lo);
        let upper = self.key_serde.serialize(hi);
        let raw_entries = self.backend.range(&lower, &upper).await;

        let mut decoded = Vec::with_capacity(raw_entries.len());
        for (raw_key, raw_value) in raw_entries {
            let key = self
                .key_serde
                .deserialize(&raw_key)
                .expect("kv range key deserialize");
            let value = self
                .value_serde
                .deserialize(&raw_value)
                .expect("kv range value deserialize");
            decoded.push((key, value));
        }
        decoded
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::serde::{I64Serde, StringSerde};
    use assert2::check;

    /// Fresh in-memory `String -> i64` store shared by the typed tests.
    fn store() -> KeyValueBytesStore<String, i64> {
        KeyValueBytesStore::in_memory(
            String::from("s"),
            Box::new(StringSerde),
            Box::new(I64Serde),
            String::from("s-changelog"),
        )
    }

    #[tokio::test]
    async fn put_get_delete_and_changelog_buffer() {
        let key = String::from("a");
        let mut kv = store();

        kv.put(key.clone(), 1).await;
        kv.put(key.clone(), 2).await;
        check!(kv.get(&key).await == Some(2));
        check!(kv.delete(&key).await == Some(2));
        check!(kv.get(&key).await == None);

        // Two puts plus one delete: three records, the last one a tombstone.
        let records = kv.take_changelog();
        check!(records.len() == 3);
        check!(records[2].1.is_none());
        // Taking the changelog drains the buffer.
        check!(kv.take_changelog().is_empty());
    }

    #[tokio::test]
    async fn range_returns_ordered_half_open() {
        use crate::processor::serde::BytesSerde;
        use bytes::Bytes;

        let first = (Bytes::from_static(&[1, 0]), Bytes::from_static(b"a"));
        let second = (Bytes::from_static(&[1, 5]), Bytes::from_static(b"b"));
        let third = (Bytes::from_static(&[2, 0]), Bytes::from_static(b"c"));

        let mut kv = KeyValueBytesStore::<Bytes, Bytes>::in_memory(
            String::from("r"),
            Box::new(BytesSerde),
            Box::new(BytesSerde),
            String::from("r-cl"),
        );
        kv.put(first.0.clone(), first.1.clone()).await;
        kv.put(second.0.clone(), second.1.clone()).await;
        kv.put(third.0.clone(), third.1.clone()).await;

        // The upper bound is exclusive, so the third entry must be absent.
        let found = kv.range(&first.0, &third.0).await;
        assert_eq!(found, vec![first, second]);
    }

    #[tokio::test]
    async fn apply_changelog_restores_without_re_logging() {
        let key = String::from("k");
        let raw_key = bytes::Bytes::from_static(b"k");
        // Same eight bytes as `[0, 0, 0, 0, 0, 0, 0, 7]`.
        let raw_seven = bytes::Bytes::copy_from_slice(&7i64.to_be_bytes());
        let mut kv = store();

        kv.apply_changelog(raw_key.clone(), Some(raw_seven)).await;
        check!(kv.get(&key).await == Some(7));
        check!(kv.take_changelog().is_empty());

        kv.apply_changelog(raw_key, None).await;
        check!(kv.get(&key).await == None);
    }
}