Skip to main content

laminar_core/state/
mod.rs

1//! # State Store Module
2//!
3//! High-performance state storage for streaming operators.
4//!
5//! ## Design Goals
6//!
7//! - **< 500ns lookup latency** for point queries
8//! - **Zero-copy** access where possible
9//! - **Lock-free** for single-threaded access
10//! - **Memory-mapped** for large state
11//!
12//! ## State Backends
13//!
14//! - **[`InMemoryStore`]**: BTreeMap-based, fast lookups with O(log n + k) prefix/range scans
15//! - **[`MmapStateStore`]**: Memory-mapped, supports larger-than-memory state with optional persistence
16//! - **Hybrid**: Combination with hot/cold separation (future)
17//!
18//! ## Example
19//!
20//! ```rust
21//! use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
22//!
23//! let mut store = InMemoryStore::new();
24//!
25//! // Basic key-value operations
26//! store.put(b"user:1", b"alice").unwrap();
27//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
28//!
29//! // Typed state access (requires StateStoreExt)
30//! store.put_typed(b"count", &42u64).unwrap();
31//! let count: u64 = store.get_typed(b"count").unwrap().unwrap();
32//! assert_eq!(count, 42);
33//!
34//! // Snapshots for checkpointing
35//! let snapshot = store.snapshot();
36//! store.delete(b"user:1").unwrap();
37//! assert!(store.get(b"user:1").is_none());
38//!
39//! // Restore from snapshot
40//! store.restore(snapshot);
41//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
42//! ```
43//!
44//! ## Memory-Mapped Store Example
45//!
46//! ```rust,no_run
47//! use laminar_core::state::{StateStore, MmapStateStore};
48//! use std::path::Path;
49//!
50//! // In-memory mode (fast, not persistent)
51//! let mut store = MmapStateStore::in_memory(1024 * 1024);
52//! store.put(b"key", b"value").unwrap();
53//!
54//! // Persistent mode (survives restarts)
55//! let mut persistent = MmapStateStore::persistent(
56//!     Path::new("/tmp/state.db"),
57//!     1024 * 1024
58//! ).unwrap();
59//! persistent.put(b"key", b"value").unwrap();
60//! persistent.flush().unwrap();
61//! ```
62
63use bytes::Bytes;
64use rkyv::{
65    api::high::{HighDeserializer, HighSerializer, HighValidator},
66    bytecheck::CheckBytes,
67    rancor::Error as RkyvError,
68    ser::allocator::ArenaHandle,
69    util::AlignedVec,
70    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
71};
72use std::collections::BTreeMap;
73use std::ops::Bound;
74use std::ops::Range;
75
/// Smallest byte string that sorts strictly after every key starting with `prefix`.
///
/// The result is built by dropping any trailing `0xFF` bytes and bumping the
/// byte that precedes them. It serves as the exclusive upper bound for
/// `BTreeMap::range()` when performing a prefix scan.
///
/// Returns `None` when there is no such bound: the prefix is empty, or it
/// consists solely of `0xFF` bytes.
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    // Index of the right-most byte that can still be incremented.
    let pivot = prefix.iter().rposition(|&byte| byte != u8::MAX)?;

    // Keep everything up to and including the pivot; the 0xFF tail is dropped.
    let mut bound = prefix[..=pivot].to_vec();
    bound[pivot] += 1;
    Some(bound)
}
96
97/// Trait for state store implementations.
98///
99/// This is the core abstraction for operator state in Ring 0 (hot path).
100/// All implementations must achieve < 500ns lookup latency for point queries.
101///
102/// # Thread Safety
103///
104/// State stores are `Send` but not `Sync`. They are designed for single-threaded
105/// access within a reactor. Cross-thread communication uses SPSC queues.
106///
107/// # Memory Model
108///
109/// - `get()` returns `Bytes` which is a cheap reference-counted handle
110/// - `put()` copies the input to internal storage
111/// - Snapshots are copy-on-write where possible
112///
113/// # Dyn Compatibility
114///
115/// This trait is dyn-compatible for use with `Box<dyn StateStore>`. For generic
116/// convenience methods like `get_typed` and `put_typed`, use the [`StateStoreExt`]
117/// extension trait.
118pub trait StateStore: Send {
119    /// Get a value by key.
120    ///
121    /// Returns `None` if the key does not exist.
122    ///
123    /// # Performance
124    ///
125    /// Target: < 500ns for in-memory stores.
126    fn get(&self, key: &[u8]) -> Option<Bytes>;
127
128    /// Store a key-value pair.
129    ///
130    /// If the key already exists, the value is overwritten.
131    ///
132    /// # Errors
133    ///
134    /// Returns `StateError` if the operation fails (e.g., disk full for
135    /// memory-mapped stores).
136    fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError>;
137
138    /// Delete a key.
139    ///
140    /// No error is returned if the key does not exist.
141    ///
142    /// # Errors
143    ///
144    /// Returns `StateError` if the operation fails.
145    fn delete(&mut self, key: &[u8]) -> Result<(), StateError>;
146
147    /// Scan all keys with a given prefix.
148    ///
149    /// Returns an iterator over matching (key, value) pairs in
150    /// lexicographic order.
151    ///
152    /// # Performance
153    ///
154    /// O(log n + k) where n is the total number of keys and k is the
155    /// number of matching entries.
156    fn prefix_scan<'a>(&'a self, prefix: &'a [u8])
157        -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
158
159    /// Range scan between two keys (exclusive end).
160    ///
161    /// Returns an iterator over keys where `start <= key < end`
162    /// in lexicographic order.
163    ///
164    /// # Performance
165    ///
166    /// O(log n + k) where n is the total number of keys and k is the
167    /// number of matching entries.
168    fn range_scan<'a>(
169        &'a self,
170        range: Range<&'a [u8]>,
171    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
172
173    /// Check if a key exists.
174    ///
175    /// More efficient than `get()` when you don't need the value.
176    fn contains(&self, key: &[u8]) -> bool {
177        self.get(key).is_some()
178    }
179
180    /// Get approximate size in bytes.
181    ///
182    /// This includes both keys and values. The exact accounting may vary
183    /// by implementation.
184    fn size_bytes(&self) -> usize;
185
186    /// Get the number of entries in the store.
187    fn len(&self) -> usize;
188
189    /// Check if the store is empty.
190    fn is_empty(&self) -> bool {
191        self.len() == 0
192    }
193
194    /// Create a snapshot for checkpointing.
195    ///
196    /// The snapshot captures the current state and can be used to restore
197    /// the store to this point in time. Snapshots are serializable for
198    /// persistence.
199    ///
200    /// # Implementation Notes
201    ///
202    /// For in-memory stores, this clones the data. For memory-mapped stores,
203    /// this may use copy-on-write semantics.
204    fn snapshot(&self) -> StateSnapshot;
205
206    /// Restore from a snapshot.
207    ///
208    /// This replaces the current state with the snapshot's state.
209    /// Any changes since the snapshot was taken are lost.
210    fn restore(&mut self, snapshot: StateSnapshot);
211
212    /// Clear all entries.
213    fn clear(&mut self);
214
215    /// Flush any pending writes to durable storage.
216    ///
217    /// For in-memory stores, this is a no-op. For memory-mapped or
218    /// disk-backed stores, this ensures data is persisted.
219    ///
220    /// # Errors
221    ///
222    /// Returns `StateError` if the flush operation fails.
223    fn flush(&mut self) -> Result<(), StateError> {
224        Ok(()) // Default no-op for in-memory stores
225    }
226
227    /// Get a value or insert a default.
228    ///
229    /// If the key doesn't exist, the default is inserted and returned.
230    ///
231    /// # Errors
232    ///
233    /// Returns `StateError` if inserting the default value fails.
234    fn get_or_insert(&mut self, key: &[u8], default: &[u8]) -> Result<Bytes, StateError> {
235        if let Some(value) = self.get(key) {
236            Ok(value)
237        } else {
238            self.put(key, default)?;
239            Ok(Bytes::copy_from_slice(default))
240        }
241    }
242}
243
244/// Extension trait for [`StateStore`] providing typed access methods.
245///
246/// These methods use generics and thus cannot be part of the dyn-compatible
247/// `StateStore` trait. Import this trait to use typed access on any state store.
248///
249/// Uses rkyv for zero-copy serialization. Types must derive `Archive`,
250/// `rkyv::Serialize`, and `rkyv::Deserialize`.
251///
252/// # Example
253///
254/// ```rust,ignore
255/// use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
256/// use rkyv::{Archive, Deserialize, Serialize};
257///
258/// #[derive(Archive, Serialize, Deserialize)]
259/// #[rkyv(check_bytes)]
260/// struct Counter { value: u64 }
261///
262/// let mut store = InMemoryStore::new();
263/// store.put_typed(b"count", &Counter { value: 42 }).unwrap();
264/// let count: Counter = store.get_typed(b"count").unwrap().unwrap();
265/// assert_eq!(count.value, 42);
266/// ```
267pub trait StateStoreExt: StateStore {
268    /// Get a value and deserialize it using rkyv.
269    ///
270    /// Uses zero-copy access where possible, falling back to full
271    /// deserialization to return an owned value.
272    ///
273    /// # Errors
274    ///
275    /// Returns `StateError::Serialization` if deserialization fails.
276    fn get_typed<T>(&self, key: &[u8]) -> Result<Option<T>, StateError>
277    where
278        T: Archive,
279        T::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
280            + RkyvDeserialize<T, HighDeserializer<RkyvError>>,
281    {
282        match self.get(key) {
283            Some(bytes) => {
284                let archived = rkyv::access::<T::Archived, RkyvError>(&bytes)
285                    .map_err(|e| StateError::Serialization(e.to_string()))?;
286                let value = rkyv::deserialize::<T, RkyvError>(archived)
287                    .map_err(|e| StateError::Serialization(e.to_string()))?;
288                Ok(Some(value))
289            }
290            None => Ok(None),
291        }
292    }
293
294    /// Serialize and store a value using rkyv.
295    ///
296    /// Uses aligned buffers for optimal performance on the hot path.
297    ///
298    /// # Errors
299    ///
300    /// Returns `StateError::Serialization` if serialization fails.
301    fn put_typed<T>(&mut self, key: &[u8], value: &T) -> Result<(), StateError>
302    where
303        T: for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
304    {
305        let bytes = rkyv::to_bytes::<RkyvError>(value)
306            .map_err(|e| StateError::Serialization(e.to_string()))?;
307        self.put(key, &bytes)
308    }
309
310    /// Update a value in place using a closure.
311    ///
312    /// The update function receives the current value (or None) and returns
313    /// the new value. If `None` is returned, the key is deleted.
314    ///
315    /// # Errors
316    ///
317    /// Returns `StateError` if the put or delete operation fails.
318    fn update<F>(&mut self, key: &[u8], f: F) -> Result<(), StateError>
319    where
320        F: FnOnce(Option<Bytes>) -> Option<Vec<u8>>,
321    {
322        let current = self.get(key);
323        match f(current) {
324            Some(new_value) => self.put(key, &new_value),
325            None => self.delete(key),
326        }
327    }
328}
329
// Blanket implementation: every `StateStore` automatically gets the
// `StateStoreExt` helpers. The `?Sized` bound extends this to unsized
// implementors such as `dyn StateStore`.
impl<T: StateStore + ?Sized> StateStoreExt for T {}
332
/// A snapshot of state store contents for checkpointing.
///
/// Snapshots can be serialized for persistence and restored later.
/// They capture the complete state at a point in time.
///
/// Derives the rkyv `Archive`/`Serialize`/`Deserialize` traits, which is what
/// `to_bytes()` and `from_bytes()` rely on.
// NOTE(review): the archived layout presumably follows this field list, so
// reordering or retyping fields may invalidate persisted snapshots — confirm
// before changing.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct StateSnapshot {
    /// Captured entries as owned (key, value) byte pairs
    data: Vec<(Vec<u8>, Vec<u8>)>,
    /// Timestamp when snapshot was created (nanoseconds since epoch)
    timestamp_ns: u64,
    /// Format version for forward compatibility (set to 1 by `new`)
    version: u32,
}
348
349impl StateSnapshot {
350    /// Create a new snapshot from key-value pairs.
351    #[must_use]
352    #[allow(clippy::cast_possible_truncation)]
353    pub fn new(data: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
354        Self {
355            data,
356            // Truncation is acceptable here - we won't hit u64 overflow until ~584 years from epoch
357            timestamp_ns: std::time::SystemTime::now()
358                .duration_since(std::time::UNIX_EPOCH)
359                .map(|d| d.as_nanos() as u64)
360                .unwrap_or(0),
361            version: 1,
362        }
363    }
364
365    /// Get the snapshot data.
366    #[must_use]
367    pub fn data(&self) -> &[(Vec<u8>, Vec<u8>)] {
368        &self.data
369    }
370
371    /// Get the snapshot timestamp.
372    #[must_use]
373    pub fn timestamp_ns(&self) -> u64 {
374        self.timestamp_ns
375    }
376
377    /// Get the number of entries in the snapshot.
378    #[must_use]
379    pub fn len(&self) -> usize {
380        self.data.len()
381    }
382
383    /// Check if the snapshot is empty.
384    #[must_use]
385    pub fn is_empty(&self) -> bool {
386        self.data.is_empty()
387    }
388
389    /// Get the approximate size in bytes.
390    #[must_use]
391    pub fn size_bytes(&self) -> usize {
392        self.data.iter().map(|(k, v)| k.len() + v.len()).sum()
393    }
394
395    /// Serialize the snapshot to bytes using rkyv.
396    ///
397    /// Returns an aligned byte vector for optimal zero-copy access.
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if serialization fails.
402    pub fn to_bytes(&self) -> Result<AlignedVec, StateError> {
403        rkyv::to_bytes::<RkyvError>(self).map_err(|e| StateError::Serialization(e.to_string()))
404    }
405
406    /// Deserialize a snapshot from bytes using rkyv.
407    ///
408    /// Uses zero-copy access internally for performance.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if deserialization fails.
413    pub fn from_bytes(bytes: &[u8]) -> Result<Self, StateError> {
414        let archived = rkyv::access::<<Self as Archive>::Archived, RkyvError>(bytes)
415            .map_err(|e| StateError::Serialization(e.to_string()))?;
416        rkyv::deserialize::<Self, RkyvError>(archived)
417            .map_err(|e| StateError::Serialization(e.to_string()))
418    }
419}
420
/// In-memory state store using `BTreeMap` for sorted key access.
///
/// This state store is suitable for state that fits in memory. It uses
/// `BTreeMap` which provides O(log n + k) prefix and range scans, making
/// it efficient for join state and windowed aggregation lookups.
///
/// # Performance Characteristics
///
/// - **Get**: O(log n), < 500ns typical
/// - **Put**: O(log n), may allocate
/// - **Delete**: O(log n)
/// - **Prefix scan**: O(log n + k) where k is matching entries
/// - **Range scan**: O(log n + k) where k is matching entries
///
/// # Memory Usage
///
/// Keys and values are stored as owned `Vec<u8>` and `Bytes` respectively.
/// Use `size_bytes()` to monitor memory usage.
pub struct InMemoryStore {
    /// The underlying sorted map from key bytes to value bytes
    data: BTreeMap<Vec<u8>, Bytes>,
    /// Running total of key and value lengths across all entries, kept for
    /// monitoring (map overhead is not counted)
    size_bytes: usize,
}
445
446impl InMemoryStore {
447    /// Creates a new empty in-memory store.
448    #[must_use]
449    pub fn new() -> Self {
450        Self {
451            data: BTreeMap::new(),
452            size_bytes: 0,
453        }
454    }
455
456    /// Creates a new in-memory store.
457    ///
458    /// The capacity hint is accepted for API compatibility but has no
459    /// effect — `BTreeMap` does not support pre-allocation.
460    #[must_use]
461    pub fn with_capacity(_capacity: usize) -> Self {
462        Self::new()
463    }
464
465    /// Returns the number of entries in the store.
466    ///
467    /// `BTreeMap` does not expose a capacity concept, so this returns
468    /// the current entry count.
469    #[must_use]
470    pub fn capacity(&self) -> usize {
471        self.data.len()
472    }
473
474    /// No-op for API compatibility.
475    ///
476    /// `BTreeMap` manages its own memory and does not support
477    /// explicit shrinking.
478    pub fn shrink_to_fit(&mut self) {
479        // BTreeMap does not support shrink_to_fit
480    }
481}
482
483impl Default for InMemoryStore {
484    fn default() -> Self {
485        Self::new()
486    }
487}
488
489impl StateStore for InMemoryStore {
490    #[inline]
491    fn get(&self, key: &[u8]) -> Option<Bytes> {
492        self.data.get(key).cloned()
493    }
494
495    #[inline]
496    fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError> {
497        let value_bytes = Bytes::copy_from_slice(value);
498
499        // Entry API: single tree traversal for both insert and update
500        match self.data.entry(key.to_vec()) {
501            std::collections::btree_map::Entry::Occupied(mut entry) => {
502                self.size_bytes -= entry.get().len();
503                self.size_bytes += value.len();
504                *entry.get_mut() = value_bytes;
505            }
506            std::collections::btree_map::Entry::Vacant(entry) => {
507                self.size_bytes += key.len() + value.len();
508                entry.insert(value_bytes);
509            }
510        }
511        Ok(())
512    }
513
514    fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
515        if let Some(old_value) = self.data.remove(key) {
516            self.size_bytes -= key.len() + old_value.len();
517        }
518        Ok(())
519    }
520
521    fn prefix_scan<'a>(
522        &'a self,
523        prefix: &'a [u8],
524    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
525        if prefix.is_empty() {
526            // Empty prefix matches everything
527            return Box::new(
528                self.data
529                    .iter()
530                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
531            );
532        }
533        if let Some(end) = prefix_successor(prefix) {
534            Box::new(
535                self.data
536                    .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
537                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
538            )
539        } else {
540            // All-0xFF prefix: scan from prefix to end
541            Box::new(
542                self.data
543                    .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
544                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
545            )
546        }
547    }
548
549    fn range_scan<'a>(
550        &'a self,
551        range: Range<&'a [u8]>,
552    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
553        Box::new(
554            self.data
555                .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
556                .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
557        )
558    }
559
560    #[inline]
561    fn contains(&self, key: &[u8]) -> bool {
562        self.data.contains_key(key.as_ref())
563    }
564
565    fn size_bytes(&self) -> usize {
566        self.size_bytes
567    }
568
569    fn len(&self) -> usize {
570        self.data.len()
571    }
572
573    fn snapshot(&self) -> StateSnapshot {
574        let data: Vec<(Vec<u8>, Vec<u8>)> = self
575            .data
576            .iter()
577            .map(|(k, v)| (k.clone(), v.to_vec()))
578            .collect();
579        StateSnapshot::new(data)
580    }
581
582    fn restore(&mut self, snapshot: StateSnapshot) {
583        self.data.clear();
584        self.size_bytes = 0;
585
586        for (key, value) in snapshot.data {
587            self.size_bytes += key.len() + value.len();
588            self.data.insert(key, Bytes::from(value));
589        }
590    }
591
592    fn clear(&mut self) {
593        self.data.clear();
594        self.size_bytes = 0;
595    }
596}
597
/// Errors that can occur in state operations.
///
/// `Display` and `std::error::Error` are generated by `thiserror` from the
/// `#[error(...)]` attributes on each variant.
#[derive(Debug, thiserror::Error)]
pub enum StateError {
    /// I/O error; `#[from]` lets `?` convert a `std::io::Error` into this variant
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization error, carrying the underlying error's message
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Deserialization error, carrying a description of the failure
    #[error("Deserialization error: {0}")]
    Deserialization(String),

    /// Corruption error, carrying a description of what was found
    #[error("Corruption error: {0}")]
    Corruption(String),

    /// Operation not supported by this store type
    #[error("Operation not supported: {0}")]
    NotSupported(String),

    /// Key not found (for operations that require existing key)
    #[error("Key not found")]
    KeyNotFound,

    /// Store capacity exceeded
    #[error("Store capacity exceeded: {0}")]
    CapacityExceeded(String),
}
629
630mod mmap;
631
632// Re-export main types
633pub use self::StateError as Error;
634pub use mmap::MmapStateStore;
635
/// Unit tests for `InMemoryStore`, `StateSnapshot`, the `StateStoreExt`
/// helpers, and `prefix_successor`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Put, overwrite, and delete round-trip, including deleting a missing key.
    #[test]
    fn test_in_memory_store_basic() {
        let mut store = InMemoryStore::new();

        // Test put and get
        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.len(), 1);

        // Test overwrite
        store.put(b"key1", b"value2").unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value2"));
        assert_eq!(store.len(), 1);

        // Test delete
        store.delete(b"key1").unwrap();
        assert!(store.get(b"key1").is_none());
        assert_eq!(store.len(), 0);

        // Test delete non-existent key (should not error)
        store.delete(b"nonexistent").unwrap();
    }

    /// `contains` tracks presence across put and delete.
    #[test]
    fn test_contains() {
        let mut store = InMemoryStore::new();
        assert!(!store.contains(b"key1"));

        store.put(b"key1", b"value1").unwrap();
        assert!(store.contains(b"key1"));

        store.delete(b"key1").unwrap();
        assert!(!store.contains(b"key1"));
    }

    /// Prefix scan returns only matching keys; an empty prefix returns all.
    #[test]
    fn test_prefix_scan() {
        let mut store = InMemoryStore::new();
        store.put(b"prefix:1", b"value1").unwrap();
        store.put(b"prefix:2", b"value2").unwrap();
        store.put(b"prefix:10", b"value10").unwrap();
        store.put(b"other:1", b"value3").unwrap();

        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        assert_eq!(results.len(), 3);

        // All results should have the prefix
        for (key, _) in &results {
            assert!(key.starts_with(b"prefix:"));
        }

        // Empty prefix returns all
        let all: Vec<_> = store.prefix_scan(b"").collect();
        assert_eq!(all.len(), 4);
    }

    /// Range scan includes the start key and excludes the end key.
    #[test]
    fn test_range_scan() {
        let mut store = InMemoryStore::new();
        store.put(b"a", b"1").unwrap();
        store.put(b"b", b"2").unwrap();
        store.put(b"c", b"3").unwrap();
        store.put(b"d", b"4").unwrap();

        let results: Vec<_> = store.range_scan(b"b".as_slice()..b"d".as_slice()).collect();
        assert_eq!(results.len(), 2);

        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref()).collect();
        assert!(keys.contains(&b"b".as_slice()));
        assert!(keys.contains(&b"c".as_slice()));
    }

    /// Restoring a snapshot discards every change made after it was taken.
    #[test]
    fn test_snapshot_and_restore() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        // Take snapshot
        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 2);

        // Modify store
        store.put(b"key1", b"modified").unwrap();
        store.put(b"key3", b"value3").unwrap();
        store.delete(b"key2").unwrap();

        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("modified"));

        // Restore from snapshot
        store.restore(snapshot);

        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.get(b"key2").unwrap(), Bytes::from("value2"));
        assert!(store.get(b"key3").is_none());
    }

    /// A snapshot survives a `to_bytes` / `from_bytes` round trip unchanged.
    #[test]
    fn test_snapshot_serialization() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        let snapshot = store.snapshot();

        // Serialize and deserialize
        let bytes = snapshot.to_bytes().unwrap();
        let restored = StateSnapshot::from_bytes(&bytes).unwrap();

        assert_eq!(restored.len(), snapshot.len());
        assert_eq!(restored.data(), snapshot.data());
    }

    /// `put_typed` / `get_typed` round-trip integers, strings, and vectors.
    #[test]
    fn test_typed_access() {
        let mut store = InMemoryStore::new();

        // Test with integer
        store.put_typed(b"count", &42u64).unwrap();
        let count: u64 = store.get_typed(b"count").unwrap().unwrap();
        assert_eq!(count, 42);

        // Test with string
        store.put_typed(b"name", &String::from("alice")).unwrap();
        let name: String = store.get_typed(b"name").unwrap().unwrap();
        assert_eq!(name, "alice");

        // Test with vector (complex type)
        let nums = vec![1i64, 2, 3, 4, 5];
        store.put_typed(b"nums", &nums).unwrap();
        let restored: Vec<i64> = store.get_typed(b"nums").unwrap().unwrap();
        assert_eq!(restored, nums);

        // Test non-existent key
        let missing: Option<u64> = store.get_typed(b"missing").unwrap();
        assert!(missing.is_none());
    }

    /// `get_or_insert` inserts the default once and then returns the stored value.
    #[test]
    fn test_get_or_insert() {
        let mut store = InMemoryStore::new();

        // First call inserts default
        let value = store.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(value, Bytes::from("default"));
        assert_eq!(store.len(), 1);

        // Second call returns existing
        store.put(b"key1", b"modified").unwrap();
        let value = store.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(value, Bytes::from("modified"));
    }

    /// `update` rewrites a value via the closure and deletes when it returns `None`.
    #[test]
    fn test_update() {
        let mut store = InMemoryStore::new();
        store.put(b"counter", b"\x00\x00\x00\x00").unwrap();

        // Update existing
        store
            .update(b"counter", |current| {
                let val = current.map_or(0u32, |b| {
                    u32::from_le_bytes(b.as_ref().try_into().unwrap_or([0; 4]))
                });
                Some((val + 1).to_le_bytes().to_vec())
            })
            .unwrap();

        let bytes = store.get(b"counter").unwrap();
        let val = u32::from_le_bytes(bytes.as_ref().try_into().unwrap());
        assert_eq!(val, 1);

        // Update to delete
        store.update(b"counter", |_| None).unwrap();
        assert!(store.get(b"counter").is_none());
    }

    /// `size_bytes` follows inserts, overwrites, deletes, and `clear`.
    #[test]
    fn test_size_tracking() {
        let mut store = InMemoryStore::new();
        assert_eq!(store.size_bytes(), 0);

        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6); // "key1" + "value1"

        store.put(b"key2", b"value2").unwrap();
        assert_eq!(store.size_bytes(), (4 + 6) * 2);

        // Overwrite with smaller value
        store.put(b"key1", b"v1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 2 + 4 + 6); // "key1" + "v1" + "key2" + "value2"

        store.delete(b"key1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6);

        store.clear();
        assert_eq!(store.size_bytes(), 0);
    }

    /// `with_capacity` ignores its hint and yields an empty store.
    #[test]
    fn test_with_capacity() {
        let store = InMemoryStore::with_capacity(1000);
        // BTreeMap does not pre-allocate; capacity() returns len() which is 0
        assert_eq!(store.capacity(), 0);
        assert!(store.is_empty());
    }

    /// `clear` removes every entry and zeroes the tracked size.
    #[test]
    fn test_clear() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.size_bytes() > 0);

        store.clear();

        assert_eq!(store.len(), 0);
        assert_eq!(store.size_bytes(), 0);
        assert!(store.get(b"key1").is_none());
    }

    /// `prefix_successor` edge cases: empty, all-0xFF, trailing 0xFF, single byte.
    #[test]
    fn test_prefix_successor() {
        // Normal case
        assert_eq!(prefix_successor(b"abc"), Some(b"abd".to_vec()));

        // Empty prefix
        assert_eq!(prefix_successor(b""), None);

        // All 0xFF bytes — no successor
        assert_eq!(prefix_successor(&[0xFF, 0xFF, 0xFF]), None);

        // Trailing 0xFF bytes are truncated and previous byte incremented
        assert_eq!(prefix_successor(&[0x01, 0xFF]), Some(vec![0x02]));
        assert_eq!(
            prefix_successor(&[0x01, 0x02, 0xFF]),
            Some(vec![0x01, 0x03])
        );

        // Single byte
        assert_eq!(prefix_successor(&[0x00]), Some(vec![0x01]));
        assert_eq!(prefix_successor(&[0xFE]), Some(vec![0xFF]));
        assert_eq!(prefix_successor(&[0xFF]), None);
    }

    /// Prefix scans over binary (non-text) keys, including an all-0xFF prefix.
    #[test]
    fn test_prefix_scan_binary_keys() {
        let mut store = InMemoryStore::new();

        // Simulate join state keys: partition_prefix + key_hash
        let prefix_a = [0x00, 0x01]; // partition 0, stream 1
        let prefix_b = [0x00, 0x02]; // partition 0, stream 2

        store.put(&[0x00, 0x01, 0xAA], b"val1").unwrap();
        store.put(&[0x00, 0x01, 0xBB], b"val2").unwrap();
        store.put(&[0x00, 0x02, 0xCC], b"val3").unwrap();
        store.put(&[0x00, 0x02, 0xDD], b"val4").unwrap();
        store.put(&[0x01, 0x01, 0xEE], b"val5").unwrap();

        // Prefix scan for partition_a
        let results_a: Vec<_> = store.prefix_scan(&prefix_a).collect();
        assert_eq!(results_a.len(), 2);
        for (key, _) in &results_a {
            assert!(key.starts_with(&prefix_a));
        }

        // Prefix scan for partition_b
        let results_b: Vec<_> = store.prefix_scan(&prefix_b).collect();
        assert_eq!(results_b.len(), 2);
        for (key, _) in &results_b {
            assert!(key.starts_with(&prefix_b));
        }

        // Prefix scan with all-0xFF prefix
        let results_ff: Vec<_> = store.prefix_scan(&[0xFF, 0xFF]).collect();
        assert_eq!(results_ff.len(), 0);
    }

    /// Prefix scan yields keys in lexicographic order regardless of insert order.
    #[test]
    fn test_prefix_scan_returns_sorted() {
        let mut store = InMemoryStore::new();
        store.put(b"prefix:c", b"3").unwrap();
        store.put(b"prefix:a", b"1").unwrap();
        store.put(b"prefix:b", b"2").unwrap();

        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref().to_vec()).collect();
        assert_eq!(
            keys,
            vec![
                b"prefix:a".to_vec(),
                b"prefix:b".to_vec(),
                b"prefix:c".to_vec()
            ]
        );
    }
}