// laminar_core/state/mod.rs
1//! # State Store Module
2//!
3//! High-performance state storage for streaming operators.
4//!
5//! ## Design Goals
6//!
7//! - **< 500ns lookup latency** for point queries
8//! - **Zero-copy** access where possible
9//! - **Lock-free** for single-threaded access
10//! - **Memory-mapped** for large state
11//!
12//! ## State Backends
13//!
14//! - **[`InMemoryStore`]**: BTreeMap-based, fast lookups with O(log n + k) prefix/range scans
15//! - **[`MmapStateStore`]**: Memory-mapped, supports larger-than-memory state with optional persistence
16//! - **Hybrid**: Combination with hot/cold separation (future)
17//!
18//! ## Example
19//!
20//! ```rust
21//! use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
22//!
23//! let mut store = InMemoryStore::new();
24//!
25//! // Basic key-value operations
26//! store.put(b"user:1", b"alice").unwrap();
27//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
28//!
29//! // Typed state access (requires StateStoreExt)
30//! store.put_typed(b"count", &42u64).unwrap();
31//! let count: u64 = store.get_typed(b"count").unwrap().unwrap();
32//! assert_eq!(count, 42);
33//!
34//! // Snapshots for checkpointing
35//! let snapshot = store.snapshot();
36//! store.delete(b"user:1").unwrap();
37//! assert!(store.get(b"user:1").is_none());
38//!
39//! // Restore from snapshot
40//! store.restore(snapshot);
41//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
42//! ```
43//!
44//! ## Memory-Mapped Store Example
45//!
46//! ```rust,no_run
47//! use laminar_core::state::{StateStore, MmapStateStore};
48//! use std::path::Path;
49//!
50//! // In-memory mode (fast, not persistent)
51//! let mut store = MmapStateStore::in_memory(1024 * 1024);
52//! store.put(b"key", b"value").unwrap();
53//!
54//! // Persistent mode (survives restarts)
55//! let mut persistent = MmapStateStore::persistent(
56//!     Path::new("/tmp/state.db"),
57//!     1024 * 1024
58//! ).unwrap();
59//! persistent.put(b"key", b"value").unwrap();
60//! persistent.flush().unwrap();
61//! ```
62
63use bytes::Bytes;
64use rkyv::{
65    api::high::{self, HighDeserializer, HighSerializer, HighValidator},
66    bytecheck::CheckBytes,
67    rancor::Error as RkyvError,
68    ser::allocator::ArenaHandle,
69    util::AlignedVec,
70    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
71};
72use std::cell::RefCell;
73use std::collections::BTreeMap;
74use std::ops::Bound;
75use std::ops::Range;
76
/// A single mutation entry within an incremental snapshot.
///
/// Captures the exact key/value bytes for each state change, enabling
/// delta-based checkpointing to object stores without full state copies.
///
/// - `Put(key, value)` — a key was inserted or updated
/// - `Delete(key)` — a key was removed
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChangeEntry {
    /// A key was inserted or updated: `(key, value)`.
    Put(Vec<u8>, Vec<u8>),
    /// A key was deleted. Replaying this entry removes the key.
    Delete(Vec<u8>),
}
91
/// An incremental snapshot capturing only mutations since the last checkpoint.
///
/// State stores that support incremental snapshots return this from
/// [`StateStore::incremental_snapshot`]. The recovery system applies
/// these deltas on top of a base snapshot to reconstruct full state.
#[derive(Debug, Clone)]
pub struct IncrementalSnapshot {
    /// The ordered list of mutations since `base_epoch`.
    /// Order matters: later entries for the same key supersede earlier ones.
    pub changes: Vec<ChangeEntry>,
    /// The epoch of the base (full) snapshot these changes apply to.
    pub base_epoch: u64,
    /// The epoch this incremental snapshot represents.
    pub epoch: u64,
}
106
107/// Compute the lexicographic successor of a byte prefix.
108///
109/// Returns `None` if no successor exists (empty prefix or all bytes are 0xFF).
110/// Used by `BTreeMap::range()` to efficiently bound prefix scans.
111pub(crate) fn prefix_successor(prefix: &[u8]) -> Option<smallvec::SmallVec<[u8; 64]>> {
112    if prefix.is_empty() {
113        return None;
114    }
115    let mut successor = smallvec::SmallVec::<[u8; 64]>::from_slice(prefix);
116    // Walk backwards, incrementing the last non-0xFF byte
117    while let Some(last) = successor.last_mut() {
118        if *last < 0xFF {
119            *last += 1;
120            return Some(successor);
121        }
122        successor.pop();
123    }
124    // All bytes were 0xFF — no successor exists
125    None
126}
127
/// Trait for state store implementations.
///
/// This is the core abstraction for operator state in Ring 0 (hot path).
/// All implementations must achieve < 500ns lookup latency for point queries.
///
/// # Thread Safety
///
/// State stores are `Send` but not `Sync`. They are designed for single-threaded
/// access within a reactor. Cross-thread communication uses SPSC queues.
///
/// # Memory Model
///
/// - `get()` returns `Bytes` which is a cheap reference-counted handle
/// - `put()` copies the input to internal storage
/// - Snapshots are copy-on-write where possible
///
/// # Dyn Compatibility
///
/// This trait is dyn-compatible for use with `Box<dyn StateStore>`. For generic
/// convenience methods like `get_typed` and `put_typed`, use the [`StateStoreExt`]
/// extension trait.
pub trait StateStore: Send {
    /// Get a value by key.
    ///
    /// Returns `None` if the key does not exist.
    ///
    /// # Performance
    ///
    /// Target: < 500ns for in-memory stores.
    fn get(&self, key: &[u8]) -> Option<Bytes>;

    /// Store a key-value pair.
    ///
    /// If the key already exists, the value is overwritten.
    ///
    /// # Errors
    ///
    /// Returns `StateError` if the operation fails (e.g., disk full for
    /// memory-mapped stores).
    fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError>;

    /// Delete a key.
    ///
    /// No error is returned if the key does not exist (idempotent delete).
    ///
    /// # Errors
    ///
    /// Returns `StateError` if the operation fails.
    fn delete(&mut self, key: &[u8]) -> Result<(), StateError>;

    /// Scan all keys with a given prefix.
    ///
    /// Returns an iterator over matching (key, value) pairs in
    /// lexicographic order.
    ///
    /// # Performance
    ///
    /// O(log n + k) where n is the total number of keys and k is the
    /// number of matching entries.
    fn prefix_scan<'a>(&'a self, prefix: &'a [u8])
        -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;

    /// Range scan between two keys (exclusive end).
    ///
    /// Returns an iterator over keys where `start <= key < end`
    /// in lexicographic order.
    ///
    /// NOTE(review): behavior for an inverted range (`start > end`) is not
    /// specified here and may vary by backend — confirm before relying on it.
    ///
    /// # Performance
    ///
    /// O(log n + k) where n is the total number of keys and k is the
    /// number of matching entries.
    fn range_scan<'a>(
        &'a self,
        range: Range<&'a [u8]>,
    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;

    /// Check if a key exists.
    ///
    /// More efficient than `get()` when you don't need the value.
    /// The default implementation materializes the value via `get()`;
    /// backends should override it when a cheaper existence check exists.
    fn contains(&self, key: &[u8]) -> bool {
        self.get(key).is_some()
    }

    /// Get approximate size in bytes.
    ///
    /// This includes both keys and values. The exact accounting may vary
    /// by implementation.
    fn size_bytes(&self) -> usize;

    /// Get the number of entries in the store.
    fn len(&self) -> usize;

    /// Check if the store is empty.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Create a snapshot for checkpointing.
    ///
    /// The snapshot captures the current state and can be used to restore
    /// the store to this point in time. Snapshots are serializable for
    /// persistence.
    ///
    /// # Implementation Notes
    ///
    /// For in-memory stores, this clones the data. For memory-mapped stores,
    /// this may use copy-on-write semantics.
    fn snapshot(&self) -> StateSnapshot;

    /// Restore from a snapshot.
    ///
    /// This replaces the current state with the snapshot's state.
    /// Any changes since the snapshot was taken are lost.
    fn restore(&mut self, snapshot: StateSnapshot);

    /// Clear all entries.
    fn clear(&mut self);

    /// Flush any pending writes to durable storage.
    ///
    /// For in-memory stores, this is a no-op. For memory-mapped or
    /// disk-backed stores, this ensures data is persisted.
    ///
    /// # Errors
    ///
    /// Returns `StateError` if the flush operation fails.
    fn flush(&mut self) -> Result<(), StateError> {
        Ok(()) // Default no-op for in-memory stores
    }

    /// Get a zero-copy reference to a value by key.
    ///
    /// Returns a direct `&[u8]` slice into the store's internal buffer,
    /// avoiding the `Bytes` ref-count overhead. Only backends that own
    /// their storage contiguously (e.g., `AHashMapStore`) can implement
    /// this; others return `None` and callers fall back to [`get`](Self::get).
    ///
    /// # Lifetime
    ///
    /// The returned slice borrows `self`, so no mutations are allowed
    /// while the reference is live.
    fn get_ref(&self, _key: &[u8]) -> Option<&[u8]> {
        None
    }

    /// Return an incremental snapshot of mutations since the last checkpoint.
    ///
    /// Backends that track per-mutation changelogs return
    /// `Some(IncrementalSnapshot)` containing only the puts and deletes
    /// since `base_epoch`. Backends without changelog support return `None`,
    /// and the checkpointer falls back to a full [`snapshot`](Self::snapshot).
    fn incremental_snapshot(&self) -> Option<IncrementalSnapshot> {
        None
    }

    /// Get a value or insert a default.
    ///
    /// If the key doesn't exist, the default is inserted and returned.
    /// Note the returned `Bytes` on the insert path is a fresh copy of
    /// `default`, not a handle into the store.
    ///
    /// # Errors
    ///
    /// Returns `StateError` if inserting the default value fails.
    fn get_or_insert(&mut self, key: &[u8], default: &[u8]) -> Result<Bytes, StateError> {
        if let Some(value) = self.get(key) {
            Ok(value)
        } else {
            self.put(key, default)?;
            Ok(Bytes::copy_from_slice(default))
        }
    }
}
299
/// Extension trait for [`StateStore`] providing typed access methods.
///
/// These methods use generics and thus cannot be part of the dyn-compatible
/// `StateStore` trait. Import this trait to use typed access on any state store.
///
/// Uses rkyv for zero-copy serialization. Types must derive `Archive`,
/// `rkyv::Serialize`, and `rkyv::Deserialize`.
///
/// # Example
///
/// ```rust,ignore
/// use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
/// use rkyv::{Archive, Deserialize, Serialize};
///
/// #[derive(Archive, Serialize, Deserialize)]
/// #[rkyv(check_bytes)]
/// struct Counter { value: u64 }
///
/// let mut store = InMemoryStore::new();
/// store.put_typed(b"count", &Counter { value: 42 }).unwrap();
/// let count: Counter = store.get_typed(b"count").unwrap().unwrap();
/// assert_eq!(count.value, 42);
/// ```
pub trait StateStoreExt: StateStore {
    /// Get a value and deserialize it using rkyv.
    ///
    /// Uses zero-copy access where possible, falling back to full
    /// deserialization to return an owned value.
    ///
    /// # Errors
    ///
    /// Returns `StateError::Serialization` if deserialization fails.
    fn get_typed<T>(&self, key: &[u8]) -> Result<Option<T>, StateError>
    where
        T: Archive,
        T::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
            + RkyvDeserialize<T, HighDeserializer<RkyvError>>,
    {
        // Prefer zero-copy get_ref when available.
        // `bytes_owned` keeps the fallback `Bytes` handle alive so the
        // `data` slice remains valid for the duration of deserialization.
        let bytes_ref = self.get_ref(key);
        let bytes_owned;
        let data: &[u8] = if let Some(r) = bytes_ref {
            r
        } else if let Some(b) = self.get(key) {
            bytes_owned = b;
            &bytes_owned
        } else {
            return Ok(None);
        };

        // `access` validates the archived bytes; `deserialize` then
        // materializes an owned T from the validated archive.
        let archived = rkyv::access::<T::Archived, RkyvError>(data)
            .map_err(|e| StateError::Serialization(e.to_string()))?;
        let value = rkyv::deserialize::<T, RkyvError>(archived)
            .map_err(|e| StateError::Serialization(e.to_string()))?;
        Ok(Some(value))
    }

    /// Serialize and store a value using rkyv.
    ///
    /// Uses a thread-local reusable buffer to avoid per-call heap allocation
    /// on the hot path. The buffer is cleared and reused between calls.
    ///
    /// # Errors
    ///
    /// Returns `StateError::Serialization` if serialization fails.
    fn put_typed<T>(&mut self, key: &[u8], value: &T) -> Result<(), StateError>
    where
        T: for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
    {
        thread_local! {
            static SERIALIZE_BUF: RefCell<AlignedVec> =
                RefCell::new(AlignedVec::with_capacity(256));
        }

        SERIALIZE_BUF.with(|buf| {
            let mut vec = buf.borrow_mut();
            vec.clear();
            // Take ownership to pass to to_bytes_in, then put it back.
            // `mem::take` leaves an empty AlignedVec in the RefCell while
            // serialization runs; the filled buffer (with its grown
            // capacity) is restored on success so later calls reuse it.
            let taken = std::mem::take(&mut *vec);
            match high::to_bytes_in::<_, RkyvError>(value, taken) {
                Ok(filled) => {
                    let result = self.put(key, &filled);
                    *vec = filled;
                    result
                }
                Err(e) => {
                    // Restore an empty buffer on error (the taken buffer
                    // was consumed by the failed serialization attempt).
                    *vec = AlignedVec::new();
                    Err(StateError::Serialization(e.to_string()))
                }
            }
        })
    }

    /// Update a value in place using a closure.
    ///
    /// The update function receives the current value (or None) and returns
    /// the new value. If `None` is returned, the key is deleted.
    ///
    /// # Errors
    ///
    /// Returns `StateError` if the put or delete operation fails.
    fn update<F>(&mut self, key: &[u8], f: F) -> Result<(), StateError>
    where
        F: FnOnce(Option<Bytes>) -> Option<Vec<u8>>,
    {
        let current = self.get(key);
        match f(current) {
            Some(new_value) => self.put(key, &new_value),
            None => self.delete(key),
        }
    }
}

// Blanket implementation: every StateStore (including trait objects via
// `?Sized`) gets the typed convenience methods for free.
impl<T: StateStore + ?Sized> StateStoreExt for T {}
416
/// A snapshot of state store contents for checkpointing.
///
/// Snapshots can be serialized for persistence and restored later.
/// They capture the complete state at a point in time.
///
/// Uses rkyv for zero-copy deserialization on the hot path.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct StateSnapshot {
    /// Serialized state data as owned (key, value) byte pairs
    data: Vec<(Vec<u8>, Vec<u8>)>,
    /// Timestamp when snapshot was created (nanoseconds since epoch)
    timestamp_ns: u64,
    /// Version for forward compatibility
    version: u32,
}
432
433impl StateSnapshot {
434    /// Create a new snapshot from key-value pairs.
435    #[must_use]
436    #[allow(clippy::cast_possible_truncation)]
437    pub fn new(data: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
438        Self {
439            data,
440            // Truncation is acceptable here - we won't hit u64 overflow until ~584 years from epoch
441            timestamp_ns: std::time::SystemTime::now()
442                .duration_since(std::time::UNIX_EPOCH)
443                .map(|d| d.as_nanos() as u64)
444                .unwrap_or(0),
445            version: 1,
446        }
447    }
448
449    /// Get the snapshot data.
450    #[must_use]
451    pub fn data(&self) -> &[(Vec<u8>, Vec<u8>)] {
452        &self.data
453    }
454
455    /// Get the snapshot timestamp.
456    #[must_use]
457    pub fn timestamp_ns(&self) -> u64 {
458        self.timestamp_ns
459    }
460
461    /// Get the number of entries in the snapshot.
462    #[must_use]
463    pub fn len(&self) -> usize {
464        self.data.len()
465    }
466
467    /// Check if the snapshot is empty.
468    #[must_use]
469    pub fn is_empty(&self) -> bool {
470        self.data.is_empty()
471    }
472
473    /// Get the approximate size in bytes.
474    #[must_use]
475    pub fn size_bytes(&self) -> usize {
476        self.data.iter().map(|(k, v)| k.len() + v.len()).sum()
477    }
478
479    /// Serialize the snapshot to bytes using rkyv.
480    ///
481    /// Returns an aligned byte vector for optimal zero-copy access.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if serialization fails.
486    pub fn to_bytes(&self) -> Result<AlignedVec, StateError> {
487        rkyv::to_bytes::<RkyvError>(self).map_err(|e| StateError::Serialization(e.to_string()))
488    }
489
490    /// Deserialize a snapshot from bytes using rkyv.
491    ///
492    /// Uses zero-copy access internally for performance.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if deserialization fails.
497    pub fn from_bytes(bytes: &[u8]) -> Result<Self, StateError> {
498        let archived = rkyv::access::<<Self as Archive>::Archived, RkyvError>(bytes)
499            .map_err(|e| StateError::Serialization(e.to_string()))?;
500        rkyv::deserialize::<Self, RkyvError>(archived)
501            .map_err(|e| StateError::Serialization(e.to_string()))
502    }
503}
504
/// In-memory state store using `BTreeMap` for sorted key access.
///
/// This state store is suitable for state that fits in memory. It uses
/// `BTreeMap` which provides O(log n + k) prefix and range scans, making
/// it efficient for join state and windowed aggregation lookups.
///
/// # Performance Characteristics
///
/// - **Get**: O(log n), < 500ns typical
/// - **Put**: O(log n), may allocate
/// - **Delete**: O(log n)
/// - **Prefix scan**: O(log n + k) where k is matching entries
/// - **Range scan**: O(log n + k) where k is matching entries
///
/// # Memory Usage
///
/// Keys and values are stored as owned `Vec<u8>` and `Bytes` respectively.
/// Use `size_bytes()` to monitor memory usage.
pub struct InMemoryStore {
    /// The underlying sorted map
    data: BTreeMap<Vec<u8>, Bytes>,
    /// Running total of key + value byte lengths, maintained by put/delete
    size_bytes: usize,
}
529
530impl InMemoryStore {
531    /// Creates a new empty in-memory store.
532    #[must_use]
533    pub fn new() -> Self {
534        Self {
535            data: BTreeMap::new(),
536            size_bytes: 0,
537        }
538    }
539
540    /// Creates a new in-memory store.
541    ///
542    /// The capacity hint is accepted for API compatibility but has no
543    /// effect — `BTreeMap` does not support pre-allocation.
544    #[must_use]
545    pub fn with_capacity(_capacity: usize) -> Self {
546        Self::new()
547    }
548
549    /// Returns the number of entries in the store.
550    ///
551    /// `BTreeMap` does not expose a capacity concept, so this returns
552    /// the current entry count.
553    #[must_use]
554    pub fn capacity(&self) -> usize {
555        self.data.len()
556    }
557
558    /// No-op for API compatibility.
559    ///
560    /// `BTreeMap` manages its own memory and does not support
561    /// explicit shrinking.
562    pub fn shrink_to_fit(&mut self) {
563        // BTreeMap does not support shrink_to_fit
564    }
565}
566
impl Default for InMemoryStore {
    /// Equivalent to [`InMemoryStore::new`]: an empty store.
    fn default() -> Self {
        Self::new()
    }
}
572
573impl StateStore for InMemoryStore {
574    #[inline]
575    fn get(&self, key: &[u8]) -> Option<Bytes> {
576        self.data.get(key).cloned()
577    }
578
579    #[inline]
580    fn get_ref(&self, key: &[u8]) -> Option<&[u8]> {
581        self.data.get(key).map(Bytes::as_ref)
582    }
583
584    #[inline]
585    fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError> {
586        let value_bytes = Bytes::copy_from_slice(value);
587
588        // Check-then-insert: avoids key.to_vec() allocation on the
589        // update path, which is the common case for accumulator state.
590        if let Some(existing) = self.data.get_mut(key) {
591            self.size_bytes -= existing.len();
592            self.size_bytes += value.len();
593            *existing = value_bytes;
594        } else {
595            self.size_bytes += key.len() + value.len();
596            self.data.insert(key.to_vec(), value_bytes);
597        }
598        Ok(())
599    }
600
601    fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
602        if let Some(old_value) = self.data.remove(key) {
603            self.size_bytes -= key.len() + old_value.len();
604        }
605        Ok(())
606    }
607
608    fn prefix_scan<'a>(
609        &'a self,
610        prefix: &'a [u8],
611    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
612        if prefix.is_empty() {
613            // Empty prefix matches everything
614            return Box::new(
615                self.data
616                    .iter()
617                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
618            );
619        }
620        if let Some(end) = prefix_successor(prefix) {
621            Box::new(
622                self.data
623                    .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
624                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
625            )
626        } else {
627            // All-0xFF prefix: scan from prefix to end
628            Box::new(
629                self.data
630                    .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
631                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
632            )
633        }
634    }
635
636    fn range_scan<'a>(
637        &'a self,
638        range: Range<&'a [u8]>,
639    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
640        Box::new(
641            self.data
642                .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
643                .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
644        )
645    }
646
647    #[inline]
648    fn contains(&self, key: &[u8]) -> bool {
649        self.data.contains_key(key.as_ref())
650    }
651
652    fn size_bytes(&self) -> usize {
653        self.size_bytes
654    }
655
656    fn len(&self) -> usize {
657        self.data.len()
658    }
659
660    fn snapshot(&self) -> StateSnapshot {
661        let data: Vec<(Vec<u8>, Vec<u8>)> = self
662            .data
663            .iter()
664            .map(|(k, v)| (k.clone(), v.to_vec()))
665            .collect();
666        StateSnapshot::new(data)
667    }
668
669    fn restore(&mut self, snapshot: StateSnapshot) {
670        self.data.clear();
671        self.size_bytes = 0;
672
673        for (key, value) in snapshot.data {
674            self.size_bytes += key.len() + value.len();
675            self.data.insert(key, Bytes::from(value));
676        }
677    }
678
679    fn clear(&mut self) {
680        self.data.clear();
681        self.size_bytes = 0;
682    }
683}
684
/// Errors that can occur in state operations.
#[derive(Debug, thiserror::Error)]
pub enum StateError {
    /// I/O error (auto-converted from `std::io::Error` via `#[from]`)
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization error
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Deserialization error
    #[error("Deserialization error: {0}")]
    Deserialization(String),

    /// Corruption error
    #[error("Corruption error: {0}")]
    Corruption(String),

    /// Operation not supported by this store type
    #[error("Operation not supported: {0}")]
    NotSupported(String),

    /// Key not found (for operations that require existing key)
    #[error("Key not found")]
    KeyNotFound,

    /// Store capacity exceeded
    #[error("Store capacity exceeded: {0}")]
    CapacityExceeded(String),
}
716
717mod mmap;
718
719/// AHashMap-backed state store with O(1) lookups and zero-copy reads.
720pub mod ahash_store;
721
722/// Changelog-aware state store wrapper.
723pub mod changelog_aware;
724
725/// Dictionary key encoder for low-cardinality GROUP BY keys.
726pub mod dict_encoder;
727
728// Re-export main types
729pub use self::StateError as Error;
730pub use ahash_store::AHashMapStore;
731pub use changelog_aware::{ChangelogAwareStore, ChangelogSink};
732pub use dict_encoder::DictionaryKeyEncoder;
733pub use mmap::MmapStateStore;
734
735#[cfg(test)]
736mod tests {
737    use super::*;
738
739    #[test]
740    fn test_in_memory_store_basic() {
741        let mut store = InMemoryStore::new();
742
743        // Test put and get
744        store.put(b"key1", b"value1").unwrap();
745        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
746        assert_eq!(store.len(), 1);
747
748        // Test overwrite
749        store.put(b"key1", b"value2").unwrap();
750        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value2"));
751        assert_eq!(store.len(), 1);
752
753        // Test delete
754        store.delete(b"key1").unwrap();
755        assert!(store.get(b"key1").is_none());
756        assert_eq!(store.len(), 0);
757
758        // Test delete non-existent key (should not error)
759        store.delete(b"nonexistent").unwrap();
760    }
761
762    #[test]
763    fn test_contains() {
764        let mut store = InMemoryStore::new();
765        assert!(!store.contains(b"key1"));
766
767        store.put(b"key1", b"value1").unwrap();
768        assert!(store.contains(b"key1"));
769
770        store.delete(b"key1").unwrap();
771        assert!(!store.contains(b"key1"));
772    }
773
774    #[test]
775    fn test_prefix_scan() {
776        let mut store = InMemoryStore::new();
777        store.put(b"prefix:1", b"value1").unwrap();
778        store.put(b"prefix:2", b"value2").unwrap();
779        store.put(b"prefix:10", b"value10").unwrap();
780        store.put(b"other:1", b"value3").unwrap();
781
782        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
783        assert_eq!(results.len(), 3);
784
785        // All results should have the prefix
786        for (key, _) in &results {
787            assert!(key.starts_with(b"prefix:"));
788        }
789
790        // Empty prefix returns all
791        let all: Vec<_> = store.prefix_scan(b"").collect();
792        assert_eq!(all.len(), 4);
793    }
794
795    #[test]
796    fn test_range_scan() {
797        let mut store = InMemoryStore::new();
798        store.put(b"a", b"1").unwrap();
799        store.put(b"b", b"2").unwrap();
800        store.put(b"c", b"3").unwrap();
801        store.put(b"d", b"4").unwrap();
802
803        let results: Vec<_> = store.range_scan(b"b".as_slice()..b"d".as_slice()).collect();
804        assert_eq!(results.len(), 2);
805
806        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref()).collect();
807        assert!(keys.contains(&b"b".as_slice()));
808        assert!(keys.contains(&b"c".as_slice()));
809    }
810
811    #[test]
812    fn test_snapshot_and_restore() {
813        let mut store = InMemoryStore::new();
814        store.put(b"key1", b"value1").unwrap();
815        store.put(b"key2", b"value2").unwrap();
816
817        // Take snapshot
818        let snapshot = store.snapshot();
819        assert_eq!(snapshot.len(), 2);
820
821        // Modify store
822        store.put(b"key1", b"modified").unwrap();
823        store.put(b"key3", b"value3").unwrap();
824        store.delete(b"key2").unwrap();
825
826        assert_eq!(store.len(), 2);
827        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("modified"));
828
829        // Restore from snapshot
830        store.restore(snapshot);
831
832        assert_eq!(store.len(), 2);
833        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
834        assert_eq!(store.get(b"key2").unwrap(), Bytes::from("value2"));
835        assert!(store.get(b"key3").is_none());
836    }
837
838    #[test]
839    fn test_snapshot_serialization() {
840        let mut store = InMemoryStore::new();
841        store.put(b"key1", b"value1").unwrap();
842        store.put(b"key2", b"value2").unwrap();
843
844        let snapshot = store.snapshot();
845
846        // Serialize and deserialize
847        let bytes = snapshot.to_bytes().unwrap();
848        let restored = StateSnapshot::from_bytes(&bytes).unwrap();
849
850        assert_eq!(restored.len(), snapshot.len());
851        assert_eq!(restored.data(), snapshot.data());
852    }
853
854    #[test]
855    fn test_typed_access() {
856        let mut store = InMemoryStore::new();
857
858        // Test with integer
859        store.put_typed(b"count", &42u64).unwrap();
860        let count: u64 = store.get_typed(b"count").unwrap().unwrap();
861        assert_eq!(count, 42);
862
863        // Test with string
864        store.put_typed(b"name", &String::from("alice")).unwrap();
865        let name: String = store.get_typed(b"name").unwrap().unwrap();
866        assert_eq!(name, "alice");
867
868        // Test with vector (complex type)
869        let nums = vec![1i64, 2, 3, 4, 5];
870        store.put_typed(b"nums", &nums).unwrap();
871        let restored: Vec<i64> = store.get_typed(b"nums").unwrap().unwrap();
872        assert_eq!(restored, nums);
873
874        // Test non-existent key
875        let missing: Option<u64> = store.get_typed(b"missing").unwrap();
876        assert!(missing.is_none());
877    }
878
879    #[test]
880    fn test_get_or_insert() {
881        let mut store = InMemoryStore::new();
882
883        // First call inserts default
884        let value = store.get_or_insert(b"key1", b"default").unwrap();
885        assert_eq!(value, Bytes::from("default"));
886        assert_eq!(store.len(), 1);
887
888        // Second call returns existing
889        store.put(b"key1", b"modified").unwrap();
890        let value = store.get_or_insert(b"key1", b"default").unwrap();
891        assert_eq!(value, Bytes::from("modified"));
892    }
893
    #[test]
    fn test_update() {
        let mut store = InMemoryStore::new();
        // Seed a little-endian u32 zero counter.
        store.put(b"counter", b"\x00\x00\x00\x00").unwrap();

        // Update existing: decode the LE u32, increment, re-encode.
        store
            .update(b"counter", |current| {
                let val = current.map_or(0u32, |b| {
                    u32::from_le_bytes(b.as_ref().try_into().unwrap_or([0; 4]))
                });
                Some((val + 1).to_le_bytes().to_vec())
            })
            .unwrap();

        let bytes = store.get(b"counter").unwrap();
        let val = u32::from_le_bytes(bytes.as_ref().try_into().unwrap());
        assert_eq!(val, 1);

        // Returning None from the closure deletes the key.
        store.update(b"counter", |_| None).unwrap();
        assert!(store.get(b"counter").is_none());
    }
917
    #[test]
    fn test_size_tracking() {
        // size_bytes must track key-length + value-length through
        // inserts, overwrites, deletes, and clear.
        let mut store = InMemoryStore::new();
        assert_eq!(store.size_bytes(), 0);

        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6); // "key1" + "value1"

        store.put(b"key2", b"value2").unwrap();
        assert_eq!(store.size_bytes(), (4 + 6) * 2);

        // Overwrite with smaller value
        store.put(b"key1", b"v1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 2 + 4 + 6); // "key1" + "v1" + "key2" + "value2"

        store.delete(b"key1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6);

        store.clear();
        assert_eq!(store.size_bytes(), 0);
    }
939
940    #[test]
941    fn test_with_capacity() {
942        let store = InMemoryStore::with_capacity(1000);
943        // BTreeMap does not pre-allocate; capacity() returns len() which is 0
944        assert_eq!(store.capacity(), 0);
945        assert!(store.is_empty());
946    }
947
948    #[test]
949    fn test_clear() {
950        let mut store = InMemoryStore::new();
951        store.put(b"key1", b"value1").unwrap();
952        store.put(b"key2", b"value2").unwrap();
953
954        assert_eq!(store.len(), 2);
955        assert!(store.size_bytes() > 0);
956
957        store.clear();
958
959        assert_eq!(store.len(), 0);
960        assert_eq!(store.size_bytes(), 0);
961        assert!(store.get(b"key1").is_none());
962    }
963
964    #[test]
965    fn test_prefix_successor() {
966        // Normal case
967        assert_eq!(prefix_successor(b"abc").as_deref(), Some(b"abd".as_slice()));
968
969        // Empty prefix
970        assert!(prefix_successor(b"").is_none());
971
972        // All 0xFF bytes — no successor
973        assert!(prefix_successor(&[0xFF, 0xFF, 0xFF]).is_none());
974
975        // Trailing 0xFF bytes are truncated and previous byte incremented
976        assert_eq!(
977            prefix_successor(&[0x01, 0xFF]).as_deref(),
978            Some([0x02].as_slice())
979        );
980        assert_eq!(
981            prefix_successor(&[0x01, 0x02, 0xFF]).as_deref(),
982            Some([0x01, 0x03].as_slice())
983        );
984
985        // Single byte
986        assert_eq!(
987            prefix_successor(&[0x00]).as_deref(),
988            Some([0x01].as_slice())
989        );
990        assert_eq!(
991            prefix_successor(&[0xFE]).as_deref(),
992            Some([0xFF].as_slice())
993        );
994        assert!(prefix_successor(&[0xFF]).is_none());
995    }
996
997    #[test]
998    fn test_prefix_scan_binary_keys() {
999        let mut store = InMemoryStore::new();
1000
1001        // Simulate join state keys: partition_prefix + key_hash
1002        let prefix_a = [0x00, 0x01]; // partition 0, stream 1
1003        let prefix_b = [0x00, 0x02]; // partition 0, stream 2
1004
1005        store.put(&[0x00, 0x01, 0xAA], b"val1").unwrap();
1006        store.put(&[0x00, 0x01, 0xBB], b"val2").unwrap();
1007        store.put(&[0x00, 0x02, 0xCC], b"val3").unwrap();
1008        store.put(&[0x00, 0x02, 0xDD], b"val4").unwrap();
1009        store.put(&[0x01, 0x01, 0xEE], b"val5").unwrap();
1010
1011        // Prefix scan for partition_a
1012        let results_a: Vec<_> = store.prefix_scan(&prefix_a).collect();
1013        assert_eq!(results_a.len(), 2);
1014        for (key, _) in &results_a {
1015            assert!(key.starts_with(&prefix_a));
1016        }
1017
1018        // Prefix scan for partition_b
1019        let results_b: Vec<_> = store.prefix_scan(&prefix_b).collect();
1020        assert_eq!(results_b.len(), 2);
1021        for (key, _) in &results_b {
1022            assert!(key.starts_with(&prefix_b));
1023        }
1024
1025        // Prefix scan with all-0xFF prefix
1026        let results_ff: Vec<_> = store.prefix_scan(&[0xFF, 0xFF]).collect();
1027        assert_eq!(results_ff.len(), 0);
1028    }
1029
1030    #[test]
1031    fn test_prefix_scan_returns_sorted() {
1032        let mut store = InMemoryStore::new();
1033        store.put(b"prefix:c", b"3").unwrap();
1034        store.put(b"prefix:a", b"1").unwrap();
1035        store.put(b"prefix:b", b"2").unwrap();
1036
1037        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
1038        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref().to_vec()).collect();
1039        assert_eq!(
1040            keys,
1041            vec![
1042                b"prefix:a".to_vec(),
1043                b"prefix:b".to_vec(),
1044                b"prefix:c".to_vec()
1045            ]
1046        );
1047    }
1048}