use std::any::Any;
use std::collections::BTreeMap;
use async_trait::async_trait;
use bytes::Bytes;
use crate::processor::serde::Serde;
use crate::store::api::StateStore;
/// Serializes a buffer id as a 12-byte changelog key: the big-endian timestamp
/// (8 bytes) followed by the big-endian sequence number (4 bytes).
fn encode_id(id: (i64, u32)) -> Bytes {
    let (ts, seq) = id;
    let mut key = [0u8; 12];
    key[..8].copy_from_slice(&ts.to_be_bytes());
    key[8..].copy_from_slice(&seq.to_be_bytes());
    Bytes::from(key.to_vec())
}
/// Parses a changelog key written by `encode_id` back into `(ts, seq)`.
///
/// Only the first 12 bytes are read; any trailing bytes are ignored.
///
/// # Panics
/// Panics if `bytes` is shorter than 12 bytes.
fn decode_id(bytes: &[u8]) -> (i64, u32) {
    let ts_bytes: [u8; 8] = bytes[0..8].try_into().expect("8-byte ts prefix");
    let seq_bytes: [u8; 4] = bytes[8..12].try_into().expect("4-byte seq suffix");
    (i64::from_be_bytes(ts_bytes), u32::from_be_bytes(seq_bytes))
}
/// Packs serialized key and value bytes into one changelog payload.
///
/// Layout: 4-byte big-endian key length, then the key bytes, then the value bytes.
///
/// # Panics
/// Panics if the key is longer than `u32::MAX` bytes.
fn encode_payload(kb: &[u8], vb: &[u8]) -> Bytes {
    let len_prefix = u32::try_from(kb.len())
        .expect("key len fits u32")
        .to_be_bytes();
    // `concat` sizes the output exactly once from the three parts.
    Bytes::from([&len_prefix[..], kb, vb].concat())
}
/// Splits a payload produced by `encode_payload` back into `(key bytes, value bytes)`.
///
/// Layout: a 4-byte big-endian key length, the key bytes, then the value bytes
/// (everything that remains after the key).
///
/// # Panics
/// Panics with a descriptive message if the payload is too short to hold its length
/// prefix, or if the prefix claims more key bytes than the payload contains (a corrupt
/// changelog record).
fn decode_payload(bytes: &[u8]) -> (Bytes, Bytes) {
    assert!(
        bytes.len() >= 4,
        "grace buffer payload too short for key length prefix: {} bytes",
        bytes.len()
    );
    let (prefix, body) = bytes.split_at(4);
    let klen = u32::from_be_bytes(prefix.try_into().expect("4-byte key len")) as usize;
    // Compare against the remaining length instead of computing `4 + klen`, which can
    // overflow `usize` on 32-bit targets when the prefix is corrupt.
    assert!(
        klen <= body.len(),
        "grace buffer payload key length {} exceeds {} available bytes",
        klen,
        body.len()
    );
    let (kb, vb) = body.split_at(klen);
    (Bytes::copy_from_slice(kb), Bytes::copy_from_slice(vb))
}
/// Buffers serialized `(key, value)` records ordered by `(timestamp, insertion sequence)`
/// and hands them back through `drain_due` once a caller-supplied threshold reaches
/// their timestamp.
///
/// Records are never replaced by key: every `put` creates its own entry. While logging
/// is enabled, each insert and removal is also queued as a pending changelog entry.
pub struct JoinGraceBufferStore<K, V> {
    // ---- identity and configuration ----
    /// Name reported by `StateStore::name`.
    name: String,
    /// Changelog topic; also the topic name passed to both serdes.
    changelog_topic: String,
    /// Serde for record keys.
    key_serde: Box<dyn Serde<K>>,
    /// Serde for record values.
    value_serde: Box<dyn Serde<V>>,
    /// When `true`, mutations are appended to `changelog`.
    logging: bool,

    // ---- state ----
    /// Buffered records keyed by `(timestamp, sequence)`; values are
    /// `(serialized key, serialized value)`.
    buffer: BTreeMap<(i64, u32), (Bytes, Bytes)>,
    /// Sequence number for the next `put`; orders records that share a timestamp.
    seq: u32,
    /// Pending changelog entries: `Some(payload)` for inserts, `None` for removals.
    changelog: Vec<(Bytes, Option<Bytes>)>,
}
impl<K: 'static, V: 'static> JoinGraceBufferStore<K, V> {
    /// Creates an empty grace buffer with changelog logging enabled.
    ///
    /// `changelog_topic` is reported as the store's changelog topic and is also the
    /// topic name handed to `key_serde` / `value_serde` on every (de)serialization.
    #[must_use]
    pub(crate) fn new(
        name: String,
        key_serde: Box<dyn Serde<K>>,
        value_serde: Box<dyn Serde<V>>,
        changelog_topic: String,
    ) -> Self {
        Self {
            name,
            changelog_topic,
            key_serde,
            value_serde,
            buffer: BTreeMap::new(),
            seq: 0,
            changelog: Vec::new(),
            logging: true,
        }
    }

    /// Public constructor; identical to the crate-private `new`.
    #[must_use]
    // The unit tests use this constructor, but non-test code may not, so `dead_code`
    // is allowed rather than warned about.
    #[allow(dead_code)]
    pub fn in_memory(
        name: String,
        key_serde: Box<dyn Serde<K>>,
        value_serde: Box<dyn Serde<V>>,
        changelog_topic: String,
    ) -> Self {
        Self::new(name, key_serde, value_serde, changelog_topic)
    }

    /// Serializes and buffers one record under timestamp `ts`.
    ///
    /// Every call adds a new entry; records are never replaced by key. Records with the
    /// same `ts` drain in insertion order. When logging is on, the insert is queued on
    /// the changelog as `(encoded id, Some(encoded key+value))`.
    // The body never awaits; `async` is kept so the public signature is unchanged.
    #[allow(clippy::unused_async)]
    pub async fn put(&mut self, key: K, value: V, ts: i64) {
        let kb = self.key_serde.serialize(&self.changelog_topic, &key);
        let vb = self.value_serde.serialize(&self.changelog_topic, &value);
        let id = self.next_free_id(ts);
        if self.logging {
            self.changelog
                .push((encode_id(id), Some(encode_payload(&kb, &vb))));
        }
        self.buffer.insert(id, (kb, vb));
    }

    /// Allocates an unused `(ts, seq)` id and advances the sequence counter past it.
    ///
    /// `seq` wraps at `u32::MAX`, so after wrap-around the candidate id may still hold a
    /// buffered record. `BTreeMap::insert` would silently overwrite that record, and the
    /// overwrite would leave no tombstone. Skip forward to the next free slot instead. The
    /// loop ends because the buffer cannot realistically hold 2^32 records for one `ts`.
    fn next_free_id(&mut self, ts: i64) -> (i64, u32) {
        let mut id = (ts, self.seq);
        while self.buffer.contains_key(&id) {
            id.1 = id.1.wrapping_add(1);
        }
        self.seq = id.1.wrapping_add(1);
        id
    }

    /// Removes and returns every record with timestamp `<= threshold`, ordered by
    /// `(timestamp, insertion sequence)`, as `(key, value, timestamp)` triples.
    ///
    /// When logging is on, a tombstone `(encoded id, None)` is queued for each removed
    /// record.
    ///
    /// # Panics
    /// Panics if a buffered key or value fails to deserialize.
    // The body never awaits; `async` is kept so the public signature is unchanged.
    #[allow(clippy::unused_async)]
    pub async fn drain_due(&mut self, threshold: i64) -> Vec<(K, V, i64)> {
        // Split the map once rather than looking up and removing each id separately.
        // `split_off` returns every id `>= (threshold + 1, 0)`, which stays buffered;
        // what is left behind, i.e. everything `<= (threshold, u32::MAX)`, is due.
        let due = match threshold.checked_add(1) {
            Some(first_kept_ts) => {
                let kept = self.buffer.split_off(&(first_kept_ts, 0));
                std::mem::replace(&mut self.buffer, kept)
            }
            // threshold == i64::MAX: no timestamp can be later, so everything is due.
            None => std::mem::take(&mut self.buffer),
        };
        let mut out = Vec::with_capacity(due.len());
        for (id, (kb, vb)) in due {
            if self.logging {
                self.changelog.push((encode_id(id), None));
            }
            let key = self
                .key_serde
                .deserialize(&self.changelog_topic, &kb)
                .expect("grace buffer key deserialize");
            let value = self
                .value_serde
                .deserialize(&self.changelog_topic, &vb)
                .expect("grace buffer value deserialize");
            out.push((key, value, id.0));
        }
        out
    }

    /// Number of records currently buffered.
    // Used by the unit tests; may be unused by non-test code.
    #[allow(dead_code)]
    #[must_use]
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Whether no records are currently buffered.
    // Used by the unit tests; may be unused by non-test code.
    #[allow(dead_code)]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }
}
#[async_trait]
impl<K: 'static, V: 'static> StateStore for JoinGraceBufferStore<K, V> {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Nothing to flush: all state lives in the in-process `buffer` and `changelog`.
    async fn flush(&mut self) {}

    /// Nothing to release.
    fn close(&mut self) {}

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn changelog_topic(&self) -> &str {
        self.changelog_topic.as_str()
    }

    /// Hands over every pending changelog entry and leaves the pending list empty.
    fn take_changelog(&mut self) -> Vec<(Bytes, Option<Bytes>)> {
        std::mem::take(&mut self.changelog)
    }

    /// Replays one changelog record without logging it again.
    ///
    /// `Some(payload)` (re)inserts the record under the decoded `(ts, seq)` id, and
    /// `None` removes it. `seq` is raised to one past the replayed sequence number
    /// (unless that wraps to 0), so later `put`s continue after restored ids.
    async fn apply_changelog(&mut self, key: Bytes, value: Option<Bytes>) {
        let id = decode_id(&key);
        let after_replayed = id.1.wrapping_add(1);
        if after_replayed > self.seq {
            self.seq = after_replayed;
        }
        if let Some(payload) = value {
            self.buffer.insert(id, decode_payload(&payload));
        } else {
            self.buffer.remove(&id);
        }
    }

    fn set_logging(&mut self, on: bool) {
        self.logging = on;
    }

    /// Drops all buffered records and pending changelog entries and resets the
    /// sequence counter. No tombstones are queued.
    async fn clear(&mut self) {
        self.changelog.clear();
        self.buffer.clear();
        self.seq = 0;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::serde::{I64Serde, StringSerde};
    use assert2::check;

    /// Builds an empty `String -> i64` store with the name and topic every test uses.
    fn store() -> JoinGraceBufferStore<String, i64> {
        JoinGraceBufferStore::<String, i64>::in_memory(
            "jb".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "jb-cl".into(),
        )
    }

    #[tokio::test]
    async fn buffers_and_drains_in_ts_order() {
        let mut s = store();
        s.put("a".into(), 2, 200).await;
        s.put("a".into(), 1, 100).await;
        s.put("b".into(), 3, 150).await;

        let due = s.drain_due(150).await;
        check!(due == vec![("a".into(), 1, 100), ("b".into(), 3, 150)]);

        let rest = s.drain_due(i64::MAX).await;
        check!(rest == vec![("a".into(), 2, 200)]);
    }

    #[tokio::test]
    async fn keeps_every_record_no_replace_by_key() {
        let mut s = store();
        s.put("k".into(), 1, 100).await;
        s.put("k".into(), 2, 100).await;
        check!(s.len() == 2);

        let due = s.drain_due(100).await;
        check!(due == vec![("k".into(), 1, 100), ("k".into(), 2, 100)]);
        check!(s.is_empty());
    }

    #[tokio::test]
    async fn changelog_records_puts_then_tombstones_on_drain() {
        let mut s = store();
        s.put("a".into(), 1, 100).await;
        s.put("b".into(), 2, 200).await;

        let puts = s.take_changelog();
        check!(puts.len() == 2);
        check!(puts.iter().all(|(_, v)| v.is_some()));

        let _ = s.drain_due(100).await;
        let tombstones = s.take_changelog();
        check!(tombstones.len() == 1);
        check!(tombstones[0].1.is_none());
        // The tombstone must target the same changelog key as the put for "a".
        check!(tombstones[0].0 == puts[0].0);
    }

    #[tokio::test]
    async fn apply_changelog_round_trips_then_tombstone_removes() {
        let mut src = store();
        src.put("a".into(), 9, 200).await;
        src.put("b".into(), 7, 100).await;
        let puts = src.take_changelog();
        check!(puts.len() == 2);

        let mut dst = store();
        for (k, v) in puts {
            dst.apply_changelog(k, v).await;
        }
        check!(dst.len() == 2);
        // Replayed records must not be re-logged.
        check!(dst.take_changelog().is_empty());

        // Draining "b" on the source emits a tombstone that must remove it on the replica.
        let drained = src.drain_due(100).await;
        check!(drained == vec![("b".into(), 7, 100)]);
        let tombstones = src.take_changelog();
        check!(tombstones.len() == 1);
        for (k, v) in tombstones {
            dst.apply_changelog(k, v).await;
        }
        check!(dst.len() == 1);

        let out = dst.drain_due(i64::MAX).await;
        check!(out == vec![("a".into(), 9, 200)]);
    }

    #[tokio::test]
    async fn replay_advances_seq_past_restored_ids() {
        let mut src = store();
        src.put("a".into(), 1, 100).await;
        src.put("b".into(), 2, 100).await;

        let mut dst = store();
        for (k, v) in src.take_changelog() {
            dst.apply_changelog(k, v).await;
        }
        // A fresh put at the same timestamp must not reuse (100, 0) or (100, 1).
        dst.put("c".into(), 3, 100).await;
        check!(dst.len() == 3);

        let out = dst.drain_due(100).await;
        check!(out == vec![("a".into(), 1, 100), ("b".into(), 2, 100), ("c".into(), 3, 100)]);
    }
}