crabka-client-streams 0.3.2

KIP-1071 Kafka Streams rebalance-protocol client for Apache Kafka in Rust
Documentation
//! Byte-level read surface for Interactive Queries. The supervisor reaches these
//! methods by calling `as_iq()` on a `&dyn StateStore`, which lets it serve
//! `KafkaStreams::*_store` queries without knowing `K`/`V` — the typed view owns
//! (de)serialization. All reads take `&self`; only key/value **bytes** cross this trait.

use async_trait::async_trait;
use bytes::Bytes;

/// Which kind of store a query targets. Public so it can appear in `IqError`.
///
/// Reported by `IqQueryable::kind`, which the supervisor checks before
/// dispatching a read to a store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StoreKind {
    /// Key-value store: the kind the `iq_kv_*` reads apply to.
    KeyValue,
    /// Window store: the kind the `iq_window_*` reads apply to.
    Window,
    /// Session store: the kind the `iq_session_*` reads apply to.
    Session,
}

/// Byte-level Interactive Query reads, implemented by the three materialized
/// `*Bytes` stores.
///
/// Keys go in as raw bytes and keys/values come back as [`Bytes`]; no typed
/// (de)serialization happens at this layer. Every read has a default body that
/// yields an empty result, so a store only overrides the family matching its
/// [`StoreKind`]. A mismatched kind is caught earlier by the supervisor's
/// `kind()` check, and even if it were not, the defaults can only ever return
/// "nothing" rather than wrong data.
#[doc(hidden)]
#[async_trait]
pub trait IqQueryable: Send + Sync {
    /// The kind of store behind this trait object.
    fn kind(&self) -> StoreKind;

    // ---- key-value reads -------------------------------------------------

    /// Point lookup of the value bytes stored under `key`.
    ///
    /// The default implementation always returns `None`.
    async fn iq_kv_get(&self, _key: &[u8]) -> Option<Bytes> {
        None
    }

    /// Range scan over the inclusive bounds `[lo, hi]`, compared in memcmp
    /// order. Each entry is `(key bytes, value bytes)`.
    ///
    /// The default implementation always returns an empty vector.
    async fn iq_kv_range(&self, _lo: &[u8], _hi: &[u8]) -> Vec<(Bytes, Bytes)> {
        vec![]
    }

    /// Every `(key bytes, value bytes)` entry in the store.
    ///
    /// The default implementation always returns an empty vector.
    async fn iq_kv_all(&self) -> Vec<(Bytes, Bytes)> {
        vec![]
    }

    /// Approximate number of entries in the store.
    ///
    /// The default implementation always returns `0`.
    async fn iq_kv_approx_count(&self) -> u64 {
        0
    }

    // ---- window reads ----------------------------------------------------

    /// Value bytes for `key` in the window starting exactly at `window_start`.
    ///
    /// The default implementation always returns `None`.
    async fn iq_window_fetch_single(&self, _key: &[u8], _window_start: i64) -> Option<Bytes> {
        None
    }

    /// Windows for `key` whose start lies in the inclusive range
    /// `[time_from, time_to]`, ascending by window start. Each entry is
    /// `(window start, value bytes)`.
    ///
    /// The default implementation always returns an empty vector.
    async fn iq_window_fetch(
        &self,
        _key: &[u8],
        _time_from: i64,
        _time_to: i64,
    ) -> Vec<(i64, Bytes)> {
        vec![]
    }

    // ---- session reads ---------------------------------------------------

    /// All sessions recorded for `key`, in store order. Each entry is
    /// `((start, end), value bytes)`.
    ///
    /// The default implementation always returns an empty vector.
    async fn iq_session_fetch_key(&self, _key: &[u8]) -> Vec<((i64, i64), Bytes)> {
        vec![]
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::processor::serde::{I64Serde, Serde, StringSerde};
    use crate::store::api::{KeyValueStore, StateStore};
    use crate::store::kv::KeyValueBytesStore;
    use crate::store::session::{SessionBytesStore, SessionStore};
    use crate::store::window::{WindowBytesStore, WindowStore};

    // Key-value reads: point get, inclusive range, full scan and count.
    #[tokio::test]
    async fn kv_get_range_all_count_inclusive() {
        let mut store = KeyValueBytesStore::<String, i64>::in_memory(
            "c".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "c-changelog".into(),
        );
        for (key, value) in [("a", 1), ("b", 2), ("c", 3)] {
            store.put(key.into(), value).await;
        }
        let iq: &dyn IqQueryable = store.as_iq().unwrap();

        // Point lookups: a present key yields its serialized value, a missing one `None`.
        let hit = iq.iq_kv_get(b"b").await;
        assert_eq!(hit, Some(I64Serde.serialize(&2)));
        let miss = iq.iq_kv_get(b"z").await;
        assert_eq!(miss, None);

        // Both bounds are inclusive, so `[a, b]` must yield exactly `a` then `b`.
        let in_range = iq.iq_kv_range(b"a", b"b").await;
        let keys: Vec<&[u8]> = in_range.iter().map(|(key, _)| &key[..]).collect();
        assert_eq!(keys, [&b"a"[..], &b"b"[..]]);

        // An inverted range (`lo > hi`) selects nothing.
        let inverted = iq.iq_kv_range(b"c", b"a").await;
        assert!(inverted.is_empty());

        let everything = iq.iq_kv_all().await;
        assert_eq!(everything.len(), 3);
        let count = iq.iq_kv_approx_count().await;
        assert_eq!(count, 3);
    }

    // Window reads: exact-start lookup and an inclusive time-range fetch.
    #[tokio::test]
    async fn window_fetch_point_and_range() {
        let mut store = WindowBytesStore::<String, i64>::in_memory(
            "w".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "w-changelog".into(),
        );
        // Two windows for "k", starting at 0 and 1000, holding 10 and 20.
        // NOTE(review): the last argument looks like a record timestamp — confirm.
        store.put("k".into(), 0, 10, 5).await;
        store.put("k".into(), 1000, 20, 1005).await;
        let iq: &dyn IqQueryable = store.as_iq().unwrap();

        let at_zero = iq.iq_window_fetch_single(b"k", 0).await;
        assert_eq!(at_zero, Some(I64Serde.serialize(&10)));
        // No window starts at 500.
        let at_500 = iq.iq_window_fetch_single(b"k", 500).await;
        assert_eq!(at_500, None);

        // Both ends of the range are inclusive and results ascend by window start.
        let fetched = iq.iq_window_fetch(b"k", 0, 1000).await;
        let starts: Vec<i64> = fetched.iter().map(|entry| entry.0).collect();
        assert_eq!(starts, [0, 1000]);
    }

    // Session reads: each result carries the session's `(start, end)` bounds.
    #[tokio::test]
    async fn session_fetch_key_carries_start_end() {
        let mut store = SessionBytesStore::<String, i64>::in_memory(
            "s".into(),
            Box::new(StringSerde),
            Box::new(I64Serde),
            "s-changelog".into(),
        );
        // Two sessions for "k": [0, 10] holding 1 and [20, 30] holding 2.
        store.put("k".into(), 0, 10, 1).await;
        store.put("k".into(), 20, 30, 2).await;
        let iq: &dyn IqQueryable = store.as_iq().unwrap();

        let sessions = iq.iq_session_fetch_key(b"k").await;
        let bounds: Vec<(i64, i64)> = sessions.iter().map(|(span, _)| *span).collect();
        // Store order is unspecified here, so only membership is checked.
        assert!(bounds.contains(&(0, 10)));
        assert!(bounds.contains(&(20, 30)));
    }
}