use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use bytes::Bytes;
use tokio::sync::Mutex as AsyncMutex;
use crate::processor::record::RecordContext;
use crate::store::byte::ByteKeyValueStore;
use crate::store::cache::entry::LruCacheEntry;
use crate::store::cache::named::NamedCache;
use crate::store::window_schema::{key_bytes_of, store_key, window_start_of};
/// Window store that layers a [`NamedCache`] on top of a [`ByteKeyValueStore`].
///
/// `put`/`delete` only touch the cache; `flush`/`flush_with_old` write the
/// flushed entries through to the inner store. Read methods combine both
/// layers, with cache entries (including tombstones, i.e. entries whose value
/// is `None`) taking precedence over the inner store.
pub(crate) struct CachingWindowStore {
/// Cache layer, shared and guarded by a synchronous `std::sync::Mutex`.
cache: Arc<Mutex<NamedCache>>,
/// Backing store, shared and guarded by an async (tokio) mutex.
inner: Arc<AsyncMutex<Box<dyn ByteKeyValueStore>>>,
/// Name handed to `NamedCache::new` when `clear` rebuilds the cache.
/// Empty when the store was built with `new`.
name: String,
}
impl CachingWindowStore {
pub fn new(
cache: Arc<Mutex<NamedCache>>,
inner: Arc<AsyncMutex<Box<dyn ByteKeyValueStore>>>,
) -> Self {
Self {
cache,
inner,
name: String::new(),
}
}
pub fn with_name(
cache: Arc<Mutex<NamedCache>>,
inner: Arc<AsyncMutex<Box<dyn ByteKeyValueStore>>>,
name: String,
) -> Self {
Self { cache, inner, name }
}
/// Looks up `key`, preferring the cache over the inner store.
///
/// A cache hit is authoritative even when the cached value is `None`
/// (a tombstone): in that case `None` is returned without consulting the
/// inner store. Only a cache miss falls through to the inner store.
pub async fn get(&self, key: &[u8]) -> Option<Bytes> {
    let owned_key = Bytes::copy_from_slice(key);
    // Outer Option: was the key in the cache? Inner Option: value or tombstone.
    let hit: Option<Option<Bytes>> = {
        // Keep the synchronous guard confined to this block so it is
        // released before the await below.
        let mut guard = self.cache.lock().unwrap();
        guard.get_promote(&owned_key).map(|entry| entry.value.clone())
    };
    if let Some(value) = hit {
        return value;
    }
    self.inner.lock().await.get(&owned_key).await
}
pub async fn range(&self, lo: &[u8], hi: &[u8]) -> Vec<(Bytes, Bytes)> {
let mut merged: BTreeMap<Bytes, Bytes> = {
let inner = self.inner.lock().await;
inner.range(lo, hi).await.into_iter().collect()
};
let cached = {
let cache = self.cache.lock().unwrap();
cache.range(lo, hi)
};
for (k, e) in cached {
match e.value {
Some(v) => {
merged.insert(k, v);
}
None => {
merged.remove(&k);
}
}
}
merged.into_iter().collect()
}
pub async fn scan_all(&self) -> Vec<(Bytes, Bytes)> {
let mut merged: BTreeMap<Bytes, Bytes> = {
let inner = self.inner.lock().await;
inner.scan_all().await.into_iter().collect()
};
let cached = {
let cache = self.cache.lock().unwrap();
cache.all()
};
for (k, e) in cached {
match e.value {
Some(v) => {
merged.insert(k, v);
}
None => {
merged.remove(&k);
}
}
}
merged.into_iter().collect()
}
/// Writes `key`/`value` straight to the inner store; the cache is not touched.
pub async fn put_inner(&self, key: Bytes, value: Bytes) {
    let backing = &self.inner;
    backing.lock().await.put(key, value).await;
}
/// Deletes `key` straight from the inner store; the cache is not touched.
/// Whatever the inner `delete` returns is discarded.
pub async fn delete_inner(&self, key: &[u8]) {
    let backing = &self.inner;
    backing.lock().await.delete(key).await;
}
/// Drops all cached entries and clears the inner store.
///
/// The cache is replaced wholesale by a new `NamedCache` built from
/// `self.name`, so pending (unflushed) entries are discarded, not written
/// through. NOTE(review): `self.name` is empty for stores built with `new`,
/// so the replacement cache will not carry the original cache's name.
pub async fn clear(&self) {
    // Single statement: the synchronous guard is a temporary that is released
    // at the semicolon, before the inner lock is awaited.
    *self.cache.lock().unwrap() = NamedCache::new(self.name.clone());
    self.inner.lock().await.clear().await;
}
/// Stores `value` under `key_schema_bytes` in the cache only; the inner store
/// is updated later by `flush`/`flush_with_old`.
pub fn put(&self, key_schema_bytes: Bytes, value: Bytes, ctx: RecordContext) {
    // The `true` flag is presumably the entry's "dirty" marker — confirm
    // against `LruCacheEntry::new`.
    let entry = LruCacheEntry::new(Some(value), true, ctx);
    self.cache.lock().unwrap().put(key_schema_bytes, entry);
}
/// Records a delete for `key_schema_bytes` in the cache only; the inner store
/// is updated later by `flush`/`flush_with_old`.
pub fn delete(&self, key_schema_bytes: Bytes, ctx: RecordContext) {
    self.cache.lock().unwrap().delete(key_schema_bytes, ctx);
}
/// Returns the merged cache/inner entries of `key` whose store keys lie
/// between `store_key(key, time_from, 0)` and
/// `store_key(key, time_to + 1, 0)` (the `+ 1` saturates at `i64::MAX`).
///
/// Entries in that byte range that belong to a different record key are
/// filtered out via `key_bytes_of`. Cached values replace inner values and
/// cached tombstones hide them; the result is ordered by store key.
///
/// An inverted window (`time_from > time_to`) yields an empty result.
pub async fn fetch(&self, key: &[u8], time_from: i64, time_to: i64) -> Vec<(Bytes, Bytes)> {
    // An inverted window cannot match anything. Return early instead of
    // handing a start-after-end range to the cache and the inner store:
    // ordered-map range queries commonly panic on such bounds.
    if time_from > time_to {
        return Vec::new();
    }
    let lo = store_key(key, time_from, 0);
    let hi = store_key(key, time_to.saturating_add(1), 0);
    let cached: Vec<(Bytes, LruCacheEntry)> = {
        // Synchronous guard stays inside this block so it is released
        // before the await on the inner store.
        let cache = self.cache.lock().unwrap();
        cache
            .range(&lo, &hi)
            .into_iter()
            .filter(|(k, _)| key_bytes_of(k) == key)
            .collect()
    };
    let from_inner: Vec<(Bytes, Bytes)> = self
        .inner
        .lock()
        .await
        .range(&lo, &hi)
        .await
        .into_iter()
        .filter(|(k, _)| key_bytes_of(k) == key)
        .collect();
    merge(cached, from_inner)
}
/// Returns the merged cache/inner entries, across all record keys, whose
/// window start (per `window_start_of`) lies in `time_from..=time_to`.
///
/// Both layers are scanned in full and filtered. Cached values replace inner
/// values and cached tombstones hide them; the result is ordered by store key.
///
/// An inverted window (`time_from > time_to`) yields an empty result without
/// locking or scanning either layer.
pub async fn fetch_all(&self, time_from: i64, time_to: i64) -> Vec<(Bytes, Bytes)> {
    // No window start can satisfy an inverted window, so skip both full scans.
    if time_from > time_to {
        return Vec::new();
    }
    let in_range = |k: &[u8]| {
        let ws = window_start_of(k);
        ws >= time_from && ws <= time_to
    };
    let cached: Vec<(Bytes, LruCacheEntry)> = {
        // Synchronous guard stays inside this block so it is released
        // before the await on the inner store.
        let cache = self.cache.lock().unwrap();
        cache
            .all()
            .into_iter()
            .filter(|(k, _)| in_range(k))
            .collect()
    };
    let from_inner: Vec<(Bytes, Bytes)> = self
        .inner
        .lock()
        .await
        .scan_all()
        .await
        .into_iter()
        .filter(|(k, _)| in_range(k))
        .collect();
    merge(cached, from_inner)
}
/// Flushes the cache and writes the flushed entries through to the inner store.
///
/// Every entry the cache hands to its flush listener is collected, then
/// applied to the inner store in that order: a value is `put`, a tombstone
/// (`None`) is `delete`d. The collected entries are returned to the caller.
///
/// NOTE(review): the cache lock is released before the inner lock is taken,
/// so the two steps are not atomic with respect to concurrent callers.
pub async fn flush(&self) -> Vec<(Bytes, LruCacheEntry)> {
    let mut drained = Vec::<(Bytes, LruCacheEntry)>::new();
    {
        let mut guard = self.cache.lock().unwrap();
        let mut on_entry = |key: &Bytes, entry: &LruCacheEntry| {
            drained.push((key.clone(), entry.clone()))
        };
        guard.flush(&mut on_entry);
    }
    {
        // One inner-lock acquisition covers the whole write-through batch.
        let mut backing = self.inner.lock().await;
        for (key, entry) in drained.iter() {
            if let Some(value) = entry.value.as_ref() {
                backing.put(key.clone(), value.clone()).await;
            } else {
                backing.delete(key).await;
            }
        }
    }
    drained
}
/// Flushes the cache, writes the flushed entries through to the inner store,
/// and reports the value each key had in the inner store beforehand.
///
/// Returns one `(key, old, new, context)` tuple per flushed entry, where
/// `old` is what the inner store held before this write and `new` is the
/// flushed value (`None` for a tombstone, which is applied as a `delete`).
pub async fn flush_with_old(
    &self,
) -> Vec<(Bytes, Option<Bytes>, Option<Bytes>, RecordContext)> {
    let mut pending = Vec::<(Bytes, LruCacheEntry)>::new();
    {
        let mut guard = self.cache.lock().unwrap();
        let mut on_entry = |key: &Bytes, entry: &LruCacheEntry| {
            pending.push((key.clone(), entry.clone()))
        };
        guard.flush(&mut on_entry);
    }
    let mut changes = Vec::with_capacity(pending.len());
    // Held for the whole batch so each old-value read and its write are not
    // interleaved with other users of the inner store.
    let mut backing = self.inner.lock().await;
    for (key, entry) in pending {
        let previous = backing.get(&key).await;
        if let Some(value) = entry.value.as_ref() {
            backing.put(key.clone(), value.clone()).await;
        } else {
            backing.delete(&key).await;
        }
        changes.push((key, previous, entry.value, entry.context));
    }
    drop(backing);
    changes
}
}
fn merge(
cached: Vec<(Bytes, LruCacheEntry)>,
from_inner: Vec<(Bytes, Bytes)>,
) -> Vec<(Bytes, Bytes)> {
use std::collections::BTreeMap;
let mut merged: BTreeMap<Bytes, Bytes> = from_inner.into_iter().collect();
for (k, e) in cached {
match e.value {
Some(v) => {
merged.insert(k, v);
}
None => {
merged.remove(&k);
}
}
}
merged.into_iter().collect()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::byte::InMemoryBytes;
use crate::store::window_schema::wrap_value;
/// Fixed record context used by every test: topic "t", everything else zero.
fn ctx() -> RecordContext {
    let topic = String::from("t");
    RecordContext {
        topic,
        partition: 0,
        offset: 0,
        timestamp: 0,
    }
}
/// Fresh store under test: a cache named "w" over an empty in-memory backend.
fn store() -> CachingWindowStore {
    let backend: Box<dyn ByteKeyValueStore> = Box::new(InMemoryBytes::default());
    let inner = Arc::new(AsyncMutex::new(backend));
    let cache = Arc::new(Mutex::new(NamedCache::new("w".to_string())));
    CachingWindowStore::new(cache, inner)
}
/// Shorthand for `wrap_value(record_ts, raw)`, used to build stored values.
fn wrapped(record_ts: i64, raw: &[u8]) -> Bytes {
wrap_value(record_ts, raw)
}
/// Seeds the inner store directly, bypassing the cache.
async fn inner_put(s: &CachingWindowStore, k: Bytes, v: Bytes) {
    let backing = &s.inner;
    backing.lock().await.put(k, v).await;
}
/// Reads the inner store directly, bypassing the cache.
async fn inner_get(s: &CachingWindowStore, k: &Bytes) -> Option<Bytes> {
    let backing = &s.inner;
    backing.lock().await.get(k).await
}
// A value that only exists in the cache is visible through `fetch`.
#[tokio::test]
async fn fetch_returns_cached() {
    let window_store = store();
    let record_key = b"a";
    let composite = store_key(record_key, 0, 0);
    let payload = wrapped(5, b"v1");

    window_store.put(composite.clone(), payload.clone(), ctx());

    let fetched = window_store.fetch(record_key, 0, 100).await;
    assert_eq!(fetched, vec![(composite, payload)]);
}
// `flush` returns the cached entry and writes its value to the inner store.
#[tokio::test]
async fn flush_writes_through_and_returns_entries() {
    let window_store = store();
    let record_key = b"a";
    let composite = store_key(record_key, 0, 0);
    let payload = wrapped(5, b"v1");

    window_store.put(composite.clone(), payload.clone(), ctx());
    let flushed = window_store.flush().await;

    assert_eq!(flushed.len(), 1);
    assert_eq!(flushed[0].0, composite);
    assert_eq!(flushed[0].1.value, Some(payload.clone()));
    assert_eq!(inner_get(&window_store, &composite).await, Some(payload));
}
// A cached delete is flushed as a tombstone and removes the inner value.
#[tokio::test]
async fn flush_deletes_tombstone_through() {
    let window_store = store();
    let record_key = b"a";
    let composite = store_key(record_key, 0, 0);

    inner_put(&window_store, composite.clone(), wrapped(1, b"old")).await;
    window_store.delete(composite.clone(), ctx());
    let flushed = window_store.flush().await;

    assert_eq!(flushed.len(), 1);
    assert_eq!(flushed[0].1.value, None);
    let remaining = inner_get(&window_store, &composite).await;
    assert_eq!(remaining, None, "tombstone deleted the inner value");
}
// Cache overrides the inner value for window 0, contributes window 10 on its
// own, and the inner-only window 20 still shows up — all in window order, for
// both `fetch` and `fetch_all`.
#[tokio::test]
async fn fetch_merges_cache_and_underlying_in_window_order() {
    let window_store = store();
    let record_key = b"a";
    let win_lo = store_key(record_key, 0, 0);
    let win_mid = store_key(record_key, 10, 0);
    let win_hi = store_key(record_key, 20, 0);

    inner_put(&window_store, win_lo.clone(), wrapped(1, b"inner-lo")).await;
    inner_put(&window_store, win_hi.clone(), wrapped(2, b"inner-hi")).await;

    let cache_lo = wrapped(9, b"cache-lo");
    let cache_mid = wrapped(9, b"cache-mid");
    window_store.put(win_lo.clone(), cache_lo.clone(), ctx());
    window_store.put(win_mid.clone(), cache_mid.clone(), ctx());

    let expected = vec![
        (win_lo, cache_lo),
        (win_mid, cache_mid),
        (win_hi, wrapped(2, b"inner-hi")),
    ];

    let by_key = window_store.fetch(record_key, 0, 20).await;
    assert_eq!(by_key, expected);

    let across_keys = window_store.fetch_all(0, 20).await;
    assert_eq!(across_keys, expected);
}
// An unflushed cached delete hides the value still present in the inner store.
#[tokio::test]
async fn fetch_tombstone_hides_inner() {
    let window_store = store();
    let record_key = b"a";
    let composite = store_key(record_key, 0, 0);

    inner_put(&window_store, composite.clone(), wrapped(1, b"inner")).await;
    window_store.delete(composite.clone(), ctx());

    let fetched = window_store.fetch(record_key, 0, 100).await;
    assert!(fetched.is_empty(), "cache tombstone hides the inner value");
}
// `fetch_all` keeps only entries whose window start is inside the requested
// range, regardless of record key or which layer holds them.
#[tokio::test]
async fn fetch_all_filters_by_window_start() {
    let window_store = store();
    let a0 = store_key(b"a", 0, 0);
    let b50 = store_key(b"b", 50, 0);

    window_store.put(a0.clone(), wrapped(1, b"a0"), ctx());
    inner_put(&window_store, b50.clone(), wrapped(2, b"b50")).await;

    let narrow = window_store.fetch_all(0, 10).await;
    assert_eq!(narrow, vec![(a0.clone(), wrapped(1, b"a0"))]);

    let wide = window_store.fetch_all(0, 50).await;
    let expected_wide = vec![(a0, wrapped(1, b"a0")), (b50, wrapped(2, b"b50"))];
    assert_eq!(wide, expected_wide);
}
// `flush_with_old` reports the pre-flush inner value as `old`, the cached
// value as `new`, and leaves the new value in the inner store.
#[tokio::test]
async fn flush_with_old_returns_inner_old_then_writes_through() {
    let window_store = store();
    let record_key = b"a";
    let composite = store_key(record_key, 0, 0);

    inner_put(&window_store, composite.clone(), wrapped(1, b"old")).await;
    window_store.put(composite.clone(), wrapped(2, b"new"), ctx());

    let drained = window_store.flush_with_old().await;
    assert_eq!(drained.len(), 1);

    let (flushed_key, old, new, _ctx) = &drained[0];
    assert_eq!(flushed_key, &composite);
    assert_eq!(old.as_ref(), Some(&wrapped(1, b"old")));
    assert_eq!(new.as_ref(), Some(&wrapped(2, b"new")));
    assert_eq!(
        inner_get(&window_store, &composite).await,
        Some(wrapped(2, b"new"))
    );
}
// For a cached delete, `flush_with_old` reports the old inner value, a `None`
// new value, and removes the key from the inner store.
#[tokio::test]
async fn flush_with_old_tombstone_deletes_through() {
    let window_store = store();
    let record_key = b"a";
    let composite = store_key(record_key, 0, 0);

    inner_put(&window_store, composite.clone(), wrapped(1, b"old")).await;
    window_store.delete(composite.clone(), ctx());

    let drained = window_store.flush_with_old().await;
    assert_eq!(drained.len(), 1);

    let (flushed_key, old, new, _ctx) = &drained[0];
    assert_eq!(flushed_key, &composite);
    assert_eq!(old.as_ref(), Some(&wrapped(1, b"old")));
    assert_eq!(new.as_ref(), None);
    assert_eq!(inner_get(&window_store, &composite).await, None);
}
// `range` keeps the inner-only entry, prefers the cached value where both
// layers have one, and drops the entry covered by a cached tombstone.
#[tokio::test]
async fn range_merges_cache_over_inner_with_tombstone() {
    let window_store = store();
    let record_key = b"a";
    let k0 = store_key(record_key, 0, 0);
    let k1 = store_key(record_key, 10, 0);
    let k2 = store_key(record_key, 20, 0);

    inner_put(&window_store, k0.clone(), wrapped(1, b"i0")).await;
    inner_put(&window_store, k1.clone(), wrapped(1, b"i1")).await;
    inner_put(&window_store, k2.clone(), wrapped(1, b"i2")).await;

    window_store.put(k1.clone(), wrapped(9, b"c1"), ctx());
    window_store.delete(k2.clone(), ctx());

    let lower = store_key(record_key, 0, 0);
    let upper = store_key(record_key, i64::MAX, 0);
    let ranged = window_store.range(&lower, &upper).await;

    assert_eq!(
        ranged,
        vec![(k0, wrapped(1, b"i0")), (k1, wrapped(9, b"c1"))]
    );
}
// `scan_all` combines an inner-only entry, a cached override, a cache-only
// entry, and omits the entry covered by a cached tombstone.
#[tokio::test]
async fn scan_all_merges_cache_and_underlying() {
    let window_store = store();
    let record_key = b"a";
    let k0 = store_key(record_key, 0, 0);
    let k1 = store_key(record_key, 10, 0);
    let k3 = store_key(record_key, 30, 0);

    inner_put(&window_store, k0.clone(), wrapped(1, b"i0")).await;
    inner_put(&window_store, k1.clone(), wrapped(1, b"i1")).await;
    inner_put(&window_store, k3.clone(), wrapped(1, b"i3")).await;

    window_store.put(k1.clone(), wrapped(9, b"c1"), ctx());
    let k2 = store_key(record_key, 20, 0);
    window_store.put(k2.clone(), wrapped(9, b"c2"), ctx());
    window_store.delete(k3.clone(), ctx());

    let scanned = window_store.scan_all().await;
    let expected = vec![
        (k0, wrapped(1, b"i0")),
        (k1, wrapped(9, b"c1")),
        (k2, wrapped(9, b"c2")),
    ];
    assert_eq!(scanned, expected);
}
// `put_inner`/`delete_inner` change what `get` sees without leaving anything
// for `flush` to write.
#[tokio::test]
async fn put_and_delete_inner_bypass_the_cache() {
    let window_store = store();
    let composite = store_key(b"a", 0, 0);

    window_store.put_inner(composite.clone(), wrapped(1, b"v")).await;
    assert_eq!(window_store.get(&composite).await, Some(wrapped(1, b"v")));
    assert!(window_store.flush().await.is_empty());

    window_store.delete_inner(&composite).await;
    assert_eq!(window_store.get(&composite).await, None);
    assert!(window_store.flush().await.is_empty());
}
// After `clear`, neither layer holds data and nothing is left to flush.
#[tokio::test]
async fn clear_empties_cache_and_inner() {
    let window_store = store();
    let inner_key = store_key(b"a", 0, 0);
    let cached_key = store_key(b"a", 10, 0);

    inner_put(&window_store, inner_key.clone(), wrapped(1, b"i")).await;
    window_store.put(cached_key, wrapped(9, b"c"), ctx());

    window_store.clear().await;

    assert!(window_store.scan_all().await.is_empty());
    assert!(window_store.flush().await.is_empty());
    assert_eq!(window_store.get(&inner_key).await, None);
}
}