crabka-client-streams 0.3.2

KIP-1071 Kafka Streams rebalance-protocol client for Apache Kafka in Rust
Documentation
//! `KeyValueBytesStore<K,V>`: the single typed store the registry holds and
//! downcasts to. Serde + changelog-buffer logic over a pluggable `ByteKeyValueStore`.
use std::any::Any;

use async_trait::async_trait;
use bytes::Bytes;

use crate::processor::serde::Serde;
use crate::store::api::{KeyValueStore, StateStore};
use crate::store::byte::{ByteKeyValueStore, InMemoryBytes};

/// Typed key-value store layered over a byte-oriented backend.
///
/// Keys and values are turned into bytes by the boxed serdes before they reach
/// `backend`. Mutations made through the typed `KeyValueStore` API are also
/// appended to `changelog` while `logging` is enabled.
pub struct KeyValueBytesStore<K, V> {
    /// Store name, handed back by `StateStore::name`.
    name: String,
    /// Changelog topic name, handed back by `StateStore::changelog_topic`.
    changelog_topic: String,
    /// Byte-level storage that holds the serialized entries.
    backend: Box<dyn ByteKeyValueStore>,
    /// Converts typed keys to and from their byte form.
    key_serde: Box<dyn Serde<K>>,
    /// Converts typed values to and from their byte form.
    value_serde: Box<dyn Serde<V>>,
    /// Pending changelog entries: `(key, Some(value))` for a put and
    /// `(key, None)` for a delete. Drained by `take_changelog`.
    changelog: Vec<(Bytes, Option<Bytes>)>,
    /// When `false`, `put`/`delete` skip appending to `changelog`.
    /// Starts as `true`; toggled through `set_logging`.
    logging: bool,
}

impl<K: 'static, V: 'static> KeyValueBytesStore<K, V> {
    /// Assembles a store from an arbitrary byte backend and a pair of serdes.
    ///
    /// The returned store has an empty changelog buffer and changelog logging
    /// switched on.
    #[must_use]
    pub(crate) fn new(
        name: String,
        backend: Box<dyn ByteKeyValueStore>,
        key_serde: Box<dyn Serde<K>>,
        value_serde: Box<dyn Serde<V>>,
        changelog_topic: String,
    ) -> Self {
        // Initial mutable state: nothing buffered yet, every mutation recorded.
        let changelog = Vec::new();
        let logging = true;
        Self {
            name,
            changelog_topic,
            backend,
            key_serde,
            value_serde,
            changelog,
            logging,
        }
    }

    /// Builds a store whose backend is a default-constructed `InMemoryBytes`.
    ///
    /// Intended as a shortcut for tests; everything else is forwarded to `new`.
    #[must_use]
    pub fn in_memory(
        name: String,
        key_serde: Box<dyn Serde<K>>,
        value_serde: Box<dyn Serde<V>>,
        changelog_topic: String,
    ) -> Self {
        let backend: Box<dyn ByteKeyValueStore> = Box::new(InMemoryBytes::default());
        Self::new(name, backend, key_serde, value_serde, changelog_topic)
    }
}

#[async_trait]
impl<K: Send + 'static, V: Send + 'static> StateStore for KeyValueBytesStore<K, V> {
    /// Name this store was constructed with.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// No-op for this store.
    async fn flush(&mut self) {}

    /// No-op for this store.
    fn close(&mut self) {}

    /// Exposes the concrete store as `Any` so callers can downcast to it.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    /// Changelog topic name this store was constructed with.
    fn changelog_topic(&self) -> &str {
        self.changelog_topic.as_str()
    }

    /// Hands over every buffered changelog entry and leaves the buffer empty.
    fn take_changelog(&mut self) -> Vec<(Bytes, Option<Bytes>)> {
        std::mem::take(&mut self.changelog)
    }

    /// Applies one already-serialized changelog record straight to the backend.
    ///
    /// `Some(value)` upserts the key and `None` removes it. The changelog
    /// buffer is not touched, so applied records are never re-logged.
    async fn apply_changelog(&mut self, key: Bytes, value: Option<Bytes>) {
        if let Some(payload) = value {
            self.backend.put(key, payload).await;
        } else {
            self.backend.delete(&key).await;
        }
    }

    /// Turns changelog buffering for `put`/`delete` on or off.
    fn set_logging(&mut self, on: bool) {
        self.logging = on;
    }

    /// This store always supports interactive queries.
    fn as_iq(&self) -> Option<&dyn crate::store::iq::IqQueryable> {
        let queryable: &dyn crate::store::iq::IqQueryable = self;
        Some(queryable)
    }

    /// Empties the backend, then discards any buffered changelog entries.
    async fn clear(&mut self) {
        self.backend.clear().await;
        self.changelog.clear();
    }
}

// `K` and `V` never appear as bare fields: the struct only stores
// `Box<dyn Serde<_>>` plus byte buffers. It is therefore `Send + Sync` for
// *any* `K`/`V`, and this impl needs no `Sync` bound on them.
#[async_trait::async_trait]
impl<K: 'static, V: 'static> crate::store::iq::IqQueryable for KeyValueBytesStore<K, V> {
    /// Reports this store as a key-value store.
    fn kind(&self) -> crate::store::iq::StoreKind {
        crate::store::iq::StoreKind::KeyValue
    }

    /// Point lookup on the raw, already-serialized key.
    async fn iq_kv_get(&self, key: &[u8]) -> Option<bytes::Bytes> {
        let found = self.backend.get(key).await;
        found
    }

    /// Range scan with an *inclusive* upper bound, `[lo, hi]`.
    async fn iq_kv_range(&self, lo: &[u8], hi: &[u8]) -> Vec<(bytes::Bytes, bytes::Bytes)> {
        // The JVM `range` contract is inclusive on both ends, whereas the byte
        // backend scans the half-open interval `[lo, hi)`. Appending a single
        // 0x00 byte to `hi` yields the least key strictly greater than `hi`,
        // so scanning `[lo, hi ++ 0x00)` covers exactly `[lo, hi]`.
        let mut exclusive_upper = Vec::with_capacity(hi.len() + 1);
        exclusive_upper.extend_from_slice(hi);
        exclusive_upper.push(0);
        self.backend.range(lo, &exclusive_upper).await
    }

    /// Returns every entry held by the backend.
    async fn iq_kv_all(&self) -> Vec<(bytes::Bytes, bytes::Bytes)> {
        let everything = self.backend.scan_all().await;
        everything
    }

    /// Approximate entry count, as reported by the backend.
    async fn iq_kv_approx_count(&self) -> u64 {
        let estimate = self.backend.approx_len().await;
        estimate
    }
}

#[async_trait]
impl<K: Send + Sync + 'static, V: Send + 'static> KeyValueStore<K, V> for KeyValueBytesStore<K, V> {
    /// Looks up `key` and decodes the stored value, if any.
    ///
    /// Panics if the stored bytes cannot be decoded by the value serde.
    async fn get(&self, key: &K) -> Option<V> {
        let key_bytes = self.key_serde.serialize(key);
        let stored = self.backend.get(&key_bytes).await?;
        let decoded = self
            .value_serde
            .deserialize(&stored)
            .expect("store value deserialize");
        Some(decoded)
    }

    /// Stores `value` under `key`, buffering a changelog record when logging is on.
    async fn put(&mut self, key: K, value: V) {
        let key_bytes = self.key_serde.serialize(&key);
        let value_bytes = self.value_serde.serialize(&value);
        self.backend
            .put(key_bytes.clone(), value_bytes.clone())
            .await;
        if !self.logging {
            return;
        }
        self.changelog.push((key_bytes, Some(value_bytes)));
    }

    /// Removes `key`, returning the previous value if there was one.
    ///
    /// When logging is on, a tombstone (`None`) is buffered whether or not the
    /// key was present. Panics if the removed bytes cannot be decoded.
    async fn delete(&mut self, key: &K) -> Option<V> {
        let key_bytes = self.key_serde.serialize(key);
        let removed = self.backend.delete(&key_bytes).await;
        // Decode before logging so a decode panic leaves the buffer untouched.
        let previous = removed.map(|raw| {
            self.value_serde
                .deserialize(&raw)
                .expect("store value deserialize")
        });
        if self.logging {
            self.changelog.push((key_bytes, None));
        }
        previous
    }

    /// Scans the backend between the serialized forms of `lo` and `hi` and
    /// decodes every hit, preserving the order the backend returned.
    ///
    /// The bounds are passed to the backend unchanged. Panics if any returned
    /// key or value cannot be decoded.
    async fn range(&self, lo: &K, hi: &K) -> Vec<(K, V)> {
        let lower = self.key_serde.serialize(lo);
        let upper = self.key_serde.serialize(hi);
        let raw_entries = self.backend.range(&lower, &upper).await;

        let mut typed = Vec::with_capacity(raw_entries.len());
        for (raw_key, raw_value) in raw_entries {
            let typed_key = self
                .key_serde
                .deserialize(&raw_key)
                .expect("kv range key deserialize");
            let typed_value = self
                .value_serde
                .deserialize(&raw_value)
                .expect("kv range value deserialize");
            typed.push((typed_key, typed_value));
        }
        typed
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::serde::{I64Serde, StringSerde};
    use assert2::check;

    /// In-memory `String -> i64` store shared by the tests below.
    fn store() -> KeyValueBytesStore<String, i64> {
        let name = String::from("s");
        let topic = String::from("s-changelog");
        KeyValueBytesStore::in_memory(name, Box::new(StringSerde), Box::new(I64Serde), topic)
    }

    /// Two puts and one delete are visible through `get` and produce exactly
    /// three buffered changelog records, the last one a tombstone.
    #[tokio::test]
    async fn put_get_delete_and_changelog_buffer() {
        let mut kv = store();
        let key = "a".to_string();

        kv.put(key.clone(), 1).await;
        kv.put(key.clone(), 2).await;
        check!(kv.get(&key).await == Some(2));
        check!(kv.delete(&key).await == Some(2));
        check!(kv.get(&key).await == None);

        let drained = kv.take_changelog();
        check!(drained.len() == 3);
        check!(drained[2].1.is_none());
        // Draining empties the buffer.
        check!(kv.take_changelog().is_empty());
    }

    /// The typed `range` excludes the upper bound and returns hits in order.
    #[tokio::test]
    async fn range_returns_ordered_half_open() {
        use crate::processor::serde::BytesSerde;
        use bytes::Bytes;

        let mut kv = KeyValueBytesStore::<Bytes, Bytes>::in_memory(
            "r".into(),
            Box::new(BytesSerde),
            Box::new(BytesSerde),
            "r-cl".into(),
        );

        let first = (Bytes::from_static(&[1, 0]), Bytes::from_static(b"a"));
        let second = (Bytes::from_static(&[1, 5]), Bytes::from_static(b"b"));
        let at_upper_bound = (Bytes::from_static(&[2, 0]), Bytes::from_static(b"c"));
        for (k, v) in [first.clone(), second.clone(), at_upper_bound.clone()] {
            kv.put(k, v).await;
        }

        // Scan [lo, hi): the entry sitting exactly on `hi` must be left out.
        let hits = kv.range(&first.0, &at_upper_bound.0).await;
        assert_eq!(hits, vec![first, second]);
    }

    /// Records applied via `apply_changelog` land in the backend but are not
    /// buffered again for the changelog.
    #[tokio::test]
    async fn apply_changelog_restores_without_re_logging() {
        let mut kv = store();
        let lookup = "k".to_string();

        let seven = bytes::Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 7]);
        kv.apply_changelog(b"k".to_vec().into(), Some(seven)).await;
        check!(kv.get(&lookup).await == Some(7));
        check!(kv.take_changelog().is_empty());

        // A `None` record removes the key.
        kv.apply_changelog(b"k".to_vec().into(), None).await;
        check!(kv.get(&lookup).await == None);
    }
}