use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use async_trait::async_trait;
use bytes::Bytes;
use crate::dsl::processors::change::Change;
use crate::processor::serde::Serde;
use crate::store::api::StateStore;
use crate::store::suppress_bufval::{
SuppressRecordCtx, deserialize_buffer_change, serialize_buffer_change,
};
/// A state store that buffers suppressed change records per key and hands
/// them back out through eviction.
///
/// Evicted items are `(key, change, timestamp)` tuples. In the
/// `SuppressBytesStore` implementation in this file the trailing `i64` is the
/// timestamp of the record context supplied to `put`.
#[async_trait]
pub(crate) trait SuppressStore<K: Send + Sync, V: Send>: StateStore {
/// Buffers `change` for `key` at `buffer_time`, together with the record
/// context `ctx`. `SuppressBytesStore` replaces any entry already buffered
/// for the same key.
async fn put(&mut self, key: K, buffer_time: i64, change: Change<V>, ctx: SuppressRecordCtx);
/// Removes and returns buffered entries relative to `threshold`.
/// `SuppressBytesStore` evicts every entry whose buffer time is
/// `<= threshold`, in ascending buffer-time order.
async fn evict_while(&mut self, threshold: i64) -> Vec<(K, Change<V>, i64)>;
/// Removes and returns a single entry, or `None` when nothing is buffered.
/// `SuppressBytesStore` picks the entry with the lowest buffer time.
async fn evict_oldest(&mut self) -> Option<(K, Change<V>, i64)>;
/// Number of entries currently buffered.
fn len(&self) -> usize;
/// Returns `true` when no entries are buffered.
// NOTE(review): the lint is silenced without a stated reason; presumably the
// method is only exercised from tests -- confirm.
#[allow(dead_code)]
fn is_empty(&self) -> bool;
/// Size accounting, in bytes, of the buffered entries as tracked by the
/// implementation.
fn byte_size(&self) -> usize;
}
/// One buffered record, kept entirely in serialized form.
struct Entry {
/// Serialized key; also used as the lookup key of the store's index.
key_bytes: Bytes,
/// Serialized `new` side of the buffered change, if present.
new_bytes: Option<Bytes>,
/// Serialized `old` side of the buffered change, if present.
old_bytes: Option<Bytes>,
/// Timestamp copied from the record context when the entry was stored;
/// returned as the third tuple element on eviction.
record_ts: i64,
}
fn entry_size(key_bytes: &Bytes, new_bytes: Option<&Bytes>) -> usize {
key_bytes.len() + new_bytes.map_or(0, Bytes::len)
}
/// In-memory suppress buffer that stores keys and values as serialized bytes.
///
/// Entries live in `entries`, ordered by `(buffer_time, seq)`; `index` maps a
/// serialized key to the slot that currently holds it, so each key has at
/// most one buffered entry.
pub struct SuppressBytesStore<K, V> {
/// Store name, returned by `StateStore::name`.
name: String,
/// Changelog topic name, returned by `StateStore::changelog_topic`.
changelog_topic: String,
/// When `true`, puts and evictions append records to `changelog`.
logging: bool,
/// Converts keys to and from their byte representation.
key_serde: Box<dyn Serde<K>>,
/// Converts values to and from their byte representation.
value_serde: Box<dyn Serde<V>>,
/// Buffered entries keyed by `(buffer_time, seq)`, so iteration yields the
/// lowest buffer time first and insertion order breaks ties.
entries: BTreeMap<(i64, u64), Entry>,
/// Serialized key -> slot in `entries` holding that key's entry.
index: HashMap<Bytes, (i64, u64)>,
/// Insertion counter used as the second half of the next slot; incremented
/// on every insert and reset to zero by `clear`.
seq: u64,
/// Running total of `entry_size` over all buffered entries.
byte_size: usize,
/// Pending changelog records (`None` value = tombstone) until they are
/// drained by `take_changelog`.
changelog: Vec<(Bytes, Option<Bytes>)>,
}
impl<K: 'static, V: 'static> SuppressBytesStore<K, V> {
#[must_use]
pub(crate) fn new(
name: String,
key_serde: Box<dyn Serde<K>>,
value_serde: Box<dyn Serde<V>>,
changelog_topic: String,
) -> Self {
Self {
name,
changelog_topic,
logging: true,
key_serde,
value_serde,
entries: BTreeMap::new(),
index: HashMap::new(),
seq: 0,
byte_size: 0,
changelog: Vec::new(),
}
}
#[must_use]
pub fn in_memory(
name: String,
key_serde: Box<dyn Serde<K>>,
value_serde: Box<dyn Serde<V>>,
changelog_topic: String,
) -> Self {
Self::new(name, key_serde, value_serde, changelog_topic)
}
fn remove_existing(&mut self, key_bytes: &Bytes) -> Option<Bytes> {
let slot = self.index.remove(key_bytes)?;
let entry = self.entries.remove(&slot).expect("indexed slot present");
self.byte_size -= entry_size(&entry.key_bytes, entry.new_bytes.as_ref());
entry.new_bytes
}
fn insert_slot(&mut self, buffer_time: i64, entry: Entry) {
let slot = (buffer_time, self.seq);
self.seq += 1;
self.index.insert(entry.key_bytes.clone(), slot);
self.byte_size += entry_size(&entry.key_bytes, entry.new_bytes.as_ref());
self.entries.insert(slot, entry);
}
fn pop_slot(&mut self, slot: (i64, u64)) -> (K, Change<V>, i64) {
let entry = self.entries.remove(&slot).expect("slot present");
self.index.remove(&entry.key_bytes);
self.byte_size -= entry_size(&entry.key_bytes, entry.new_bytes.as_ref());
if self.logging {
self.changelog.push((entry.key_bytes.clone(), None));
}
let key = self
.key_serde
.deserialize(&entry.key_bytes)
.expect("suppress key deserialize");
let change = Change {
old: entry.old_bytes.map(|b| {
self.value_serde
.deserialize(&b)
.expect("old value deserialize")
}),
new: entry.new_bytes.map(|b| {
self.value_serde
.deserialize(&b)
.expect("new value deserialize")
}),
};
(key, change, entry.record_ts)
}
}
#[async_trait]
impl<K: 'static, V: 'static> StateStore for SuppressBytesStore<K, V> {
    /// Name given to the store at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// No-op: this store keeps nothing outside of memory.
    async fn flush(&mut self) {}

    /// No-op: there is nothing to release.
    fn close(&mut self) {}

    /// Exposes the store as `&mut dyn Any` so callers can downcast it.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    /// Changelog topic name given to the store at construction.
    fn changelog_topic(&self) -> &str {
        self.changelog_topic.as_str()
    }

    /// Hands over all pending changelog records, leaving the internal list
    /// empty.
    fn take_changelog(&mut self) -> Vec<(Bytes, Option<Bytes>)> {
        std::mem::take(&mut self.changelog)
    }

    /// Replays one changelog record into the buffer.
    ///
    /// A `Some` value is decoded and stored under `key`, replacing whatever
    /// was buffered for that key; a `None` value (tombstone) just removes the
    /// key. Nothing is appended to the pending changelog.
    async fn apply_changelog(&mut self, key: Bytes, value: Option<Bytes>) {
        if let Some(payload) = value {
            // Decode first, so the existing entry is only dropped once the
            // payload has been read.
            let decoded = deserialize_buffer_change(&payload);
            self.remove_existing(&key);
            let restored = Entry {
                key_bytes: key,
                new_bytes: decoded.new.map(Bytes::from),
                old_bytes: decoded.old.map(Bytes::from),
                record_ts: decoded.ctx.timestamp,
            };
            self.insert_slot(decoded.buffer_time, restored);
        } else {
            self.remove_existing(&key);
        }
    }

    /// Enables or disables changelog capture for later puts and evictions.
    fn set_logging(&mut self, on: bool) {
        self.logging = on;
    }

    /// Drops every buffered entry and pending changelog record and resets
    /// the slot counter and byte counter to zero. The logging flag is left
    /// as it is.
    async fn clear(&mut self) {
        self.changelog.clear();
        self.index.clear();
        self.entries.clear();
        self.byte_size = 0;
        self.seq = 0;
    }
}
#[async_trait]
impl<K: Send + Sync + 'static, V: Send + 'static> SuppressStore<K, V> for SuppressBytesStore<K, V> {
    /// Buffers `change` for `key` at `buffer_time`, replacing any entry
    /// already held for that key.
    ///
    /// With logging enabled a changelog record is queued as well. Its "prior"
    /// field is the `new` value of the replaced entry when there was one,
    /// otherwise the serialized `old` side of `change`.
    async fn put(&mut self, key: K, buffer_time: i64, change: Change<V>, ctx: SuppressRecordCtx) {
        let key_bytes = self.key_serde.serialize(&key);
        let new_bytes = change.new.as_ref().map(|v| self.value_serde.serialize(v));
        let old_bytes = change.old.as_ref().map(|v| self.value_serde.serialize(v));

        let prior_bytes = self
            .remove_existing(&key_bytes)
            .or_else(|| old_bytes.clone());

        self.insert_slot(
            buffer_time,
            Entry {
                key_bytes: key_bytes.clone(),
                new_bytes: new_bytes.clone(),
                old_bytes: old_bytes.clone(),
                record_ts: ctx.timestamp,
            },
        );

        if !self.logging {
            return;
        }
        let record = serialize_buffer_change(
            &ctx,
            prior_bytes.as_deref(),
            old_bytes.as_deref(),
            new_bytes.as_deref(),
            buffer_time,
        );
        self.changelog.push((key_bytes, Some(record)));
    }

    /// Evicts, lowest buffer time first, every entry whose buffer time is
    /// `<= threshold`, and returns them in eviction order.
    async fn evict_while(&mut self, threshold: i64) -> Vec<(K, Change<V>, i64)> {
        let mut evicted = Vec::new();
        loop {
            // The first key of the ordered map is always the oldest slot.
            let slot = match self.entries.keys().next() {
                Some(&head) if head.0 <= threshold => head,
                _ => break,
            };
            evicted.push(self.pop_slot(slot));
        }
        evicted
    }

    /// Evicts the entry with the lowest buffer time, or returns `None` when
    /// the buffer is empty.
    async fn evict_oldest(&mut self) -> Option<(K, Change<V>, i64)> {
        let oldest = self.entries.keys().next().copied()?;
        Some(self.pop_slot(oldest))
    }

    /// Number of buffered entries.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// `true` when nothing is buffered.
    fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Current value of the running byte counter.
    fn byte_size(&self) -> usize {
        self.byte_size
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::serde::{I64Serde, StringSerde};
    use assert2::check;

    /// Empty `String -> i64` buffer with logging left at its default (on).
    fn store() -> SuppressBytesStore<String, i64> {
        SuppressBytesStore::<String, i64>::in_memory(
            "sup".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "app-sup-changelog".into(),
        )
    }

    /// Record context whose only varying field is the timestamp.
    fn ctx(timestamp: i64) -> SuppressRecordCtx {
        SuppressRecordCtx {
            topic: "in".into(),
            partition: 0,
            offset: 0,
            timestamp,
        }
    }

    #[tokio::test]
    async fn put_then_evict_while_in_buffer_time_order() {
        let mut buf = store();
        buf.put("a".into(), 30, Change::update(None, 1), ctx(30))
            .await;
        buf.put("b".into(), 10, Change::update(None, 2), ctx(10))
            .await;
        check!(buf.len() == 2);
        check!(!buf.is_empty());
        check!(buf.byte_size() > 0);

        // "b" was inserted second but has the lower buffer time.
        let evicted = buf.evict_while(30).await;
        check!(evicted.len() == 2);
        check!(evicted[0].0 == "b" && evicted[0].1.new == Some(2) && evicted[0].2 == 10);
        check!(evicted[1].0 == "a" && evicted[1].1.new == Some(1) && evicted[1].2 == 30);

        check!(buf.len() == 0);
        check!(buf.is_empty());
        check!(buf.byte_size() == 0);
    }

    #[tokio::test]
    async fn replace_by_key_keeps_one_entry_with_latest_change() {
        let mut buf = store();
        buf.put("k".into(), 10, Change::update(None, 1), ctx(10))
            .await;
        buf.put("k".into(), 10, Change::update(Some(1), 2), ctx(12))
            .await;
        check!(buf.len() == 1);

        let evicted = buf.evict_while(10).await;
        check!(evicted.len() == 1);
        check!(evicted[0].1.new == Some(2));
        check!(evicted[0].1.old == Some(1));
    }

    #[tokio::test]
    async fn evict_oldest_pops_lowest_buffer_time_then_none() {
        let mut buf = store();
        buf.put("a".into(), 30, Change::update(None, 1), ctx(30))
            .await;
        buf.put("b".into(), 10, Change::update(None, 2), ctx(10))
            .await;

        let first = buf.evict_oldest().await.unwrap();
        check!(first.0 == "b" && first.1.new == Some(2));
        let second = buf.evict_oldest().await.unwrap();
        check!(second.0 == "a" && second.1.new == Some(1));
        check!(buf.evict_oldest().await.is_none());
    }

    #[tokio::test]
    async fn changelog_records_puts_then_tombstones_on_evict() {
        let mut buf = store();
        buf.put("a".into(), 10, Change::update(None, 1), ctx(10))
            .await;
        buf.put("b".into(), 20, Change::update(None, 2), ctx(20))
            .await;

        // Each put yields one record carrying a value.
        let puts = buf.take_changelog();
        check!(puts.len() == 2);
        check!(puts.iter().all(|(_, v)| v.is_some()));

        // Each eviction yields one tombstone, in eviction order.
        let _ = buf.evict_while(20).await;
        let tombstones = buf.take_changelog();
        check!(tombstones.len() == 2);
        check!(tombstones.iter().all(|(_, v)| v.is_none()));
        check!(tombstones[0].0.as_ref() == b"a");
        check!(tombstones[1].0.as_ref() == b"b");
    }

    #[tokio::test]
    async fn apply_changelog_round_trips_then_tombstone_removes() {
        let mut src = store();
        src.put("k".into(), 42, Change::update(Some(7), 9), ctx(42))
            .await;
        let records = src.take_changelog();
        check!(records.len() == 1);
        let (key, value) = records.into_iter().next().unwrap();

        // Restoring must not queue changelog records of its own.
        let mut dst = store();
        dst.apply_changelog(key.clone(), value).await;
        check!(dst.len() == 1);
        check!(dst.take_changelog().is_empty());

        let evicted = dst.evict_while(42).await;
        check!(evicted.len() == 1);
        check!(evicted[0].0 == "k");
        check!(evicted[0].1.new == Some(9));
        check!(evicted[0].1.old == Some(7));
        check!(evicted[0].2 == 42);

        // Re-apply a hand-built record, then remove it with a tombstone.
        dst.apply_changelog(
            key.clone(),
            Some(serialize_buffer_change(
                &ctx(42),
                None,
                Some(&7i64.to_be_bytes()),
                Some(&9i64.to_be_bytes()),
                42,
            )),
        )
        .await;
        check!(dst.len() == 1);
        dst.apply_changelog(key, None).await;
        check!(dst.is_empty());
    }

    #[tokio::test]
    async fn logging_off_suppresses_changelog() {
        let mut buf = store();
        buf.set_logging(false);
        buf.put("k".into(), 10, Change::update(None, 1), ctx(10))
            .await;
        check!(buf.take_changelog().is_empty());
        let _ = buf.evict_while(10).await;
        check!(buf.take_changelog().is_empty());
    }

    #[tokio::test]
    async fn windowed_buffer_restores_and_emits_on_close() {
        use crate::dsl::windows::{TimeWindowedSerde, Window, Windowed};

        /// Empty buffer keyed by windowed strings.
        fn win_store() -> SuppressBytesStore<Windowed<String>, i64> {
            SuppressBytesStore::<Windowed<String>, i64>::in_memory(
                "sup".into(),
                Box::new(TimeWindowedSerde::new(StringSerde, 10)),
                Box::new(I64Serde),
                "app-sup-changelog".into(),
            )
        }

        /// Windowed key covering `[start, start + 10)`.
        fn wk(key: &str, start: i64) -> Windowed<String> {
            Windowed {
                key: key.into(),
                window: Window {
                    start,
                    end: start + 10,
                },
            }
        }

        let mut src = win_store();
        src.put(wk("a", 0), 10, Change::update(None, 2), ctx(5))
            .await;
        src.put(wk("b", 20), 30, Change::update(None, 7), ctx(25))
            .await;
        let changelog = src.take_changelog();
        check!(changelog.len() == 2);

        // Rebuild a second store purely from the changelog.
        let mut restored = win_store();
        for (key, value) in changelog {
            restored.apply_changelog(key, value).await;
        }
        check!(restored.len() == 2);
        check!(restored.take_changelog().is_empty());

        let closed = restored.evict_while(10).await;
        check!(closed.len() == 1);
        check!(closed[0].0 == wk("a", 0));
        check!(closed[0].1.new == Some(2));
        check!(restored.len() == 1);

        let rest = restored.evict_while(30).await;
        check!(rest.len() == 1);
        check!(rest[0].0 == wk("b", 20));
        check!(rest[0].1.new == Some(7));
    }
}