// laminar_core/state/mod.rs

//! # State Store Module
//!
//! High-performance state storage for streaming operators.
//!
//! ## Design Goals
//!
//! - **< 500ns lookup latency** for point queries
//! - **Zero-copy** access where possible
//! - **Lock-free**: stores are designed for single-threaded access, so no locking is needed
//! - **Memory-mapped** storage for large state
//!
//! ## State Backends
//!
//! - **[`InMemoryStore`]**: `BTreeMap`-based, with O(log n) point lookups and O(log n + k) prefix/range scans
//! - **[`MmapStateStore`]**: Memory-mapped, supports larger-than-memory state with optional persistence
//! - **Hybrid**: a combination of both with hot/cold separation (planned)
//!
//! ## Example
//!
//! ```rust
//! use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
//!
//! let mut store = InMemoryStore::new();
//!
//! // Basic key-value operations
//! store.put(b"user:1", b"alice").unwrap();
//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
//!
//! // Typed state access (requires StateStoreExt)
//! store.put_typed(b"count", &42u64).unwrap();
//! let count: u64 = store.get_typed(b"count").unwrap().unwrap();
//! assert_eq!(count, 42);
//!
//! // Snapshots for checkpointing
//! let snapshot = store.snapshot();
//! store.delete(b"user:1").unwrap();
//! assert!(store.get(b"user:1").is_none());
//!
//! // Restore from snapshot
//! store.restore(snapshot);
//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
//! ```
//!
//! ## Memory-Mapped Store Example
//!
//! ```rust,no_run
//! use laminar_core::state::{StateStore, MmapStateStore};
//! use std::path::Path;
//!
//! // In-memory mode (fast, not persistent)
//! let mut store = MmapStateStore::in_memory(1024 * 1024);
//! store.put(b"key", b"value").unwrap();
//!
//! // Persistent mode (survives restarts)
//! let mut persistent = MmapStateStore::persistent(
//!     Path::new("/tmp/state.db"),
//!     1024 * 1024
//! ).unwrap();
//! persistent.put(b"key", b"value").unwrap();
//! persistent.flush().unwrap();
//! ```

63use bytes::Bytes;
64use rkyv::{
65    api::high::{HighDeserializer, HighSerializer, HighValidator},
66    bytecheck::CheckBytes,
67    rancor::Error as RkyvError,
68    ser::allocator::ArenaHandle,
69    util::AlignedVec,
70    Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
71};
72use std::collections::BTreeMap;
73use std::ops::Bound;
74use std::ops::Range;
75
/// Returns the exclusive upper bound for a scan over all keys that start
/// with `prefix`: the smallest byte string greater than every such key.
///
/// Trailing `0xFF` bytes cannot be incremented, so they are dropped and the
/// last remaining byte is bumped by one (e.g. `[0x01, 0xFF]` becomes `[0x02]`).
///
/// Returns `None` when no such bound exists, which happens for the empty
/// prefix and for a prefix consisting solely of `0xFF` bytes; prefix scans
/// then run to the end of the map.
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    // Index of the right-most byte that can still be incremented.
    let pivot = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut bound = prefix[..=pivot].to_vec();
    bound[pivot] += 1;
    Some(bound)
}
96
97/// Trait for state store implementations.
98///
99/// This is the core abstraction for operator state in Ring 0 (hot path).
100/// All implementations must achieve < 500ns lookup latency for point queries.
101///
102/// # Thread Safety
103///
104/// State stores are `Send` but not `Sync`. They are designed for single-threaded
105/// access within a reactor. Cross-thread communication uses SPSC queues.
106///
107/// # Memory Model
108///
109/// - `get()` returns `Bytes` which is a cheap reference-counted handle
110/// - `put()` copies the input to internal storage
111/// - Snapshots are copy-on-write where possible
112///
113/// # Dyn Compatibility
114///
115/// This trait is dyn-compatible for use with `Box<dyn StateStore>`. For generic
116/// convenience methods like `get_typed` and `put_typed`, use the [`StateStoreExt`]
117/// extension trait.
118pub trait StateStore: Send {
119    /// Get a value by key.
120    ///
121    /// Returns `None` if the key does not exist.
122    ///
123    /// # Performance
124    ///
125    /// Target: < 500ns for in-memory stores.
126    fn get(&self, key: &[u8]) -> Option<Bytes>;
127
128    /// Store a key-value pair.
129    ///
130    /// If the key already exists, the value is overwritten.
131    ///
132    /// # Errors
133    ///
134    /// Returns `StateError` if the operation fails (e.g., disk full for
135    /// memory-mapped stores).
136    fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError>;
137
138    /// Delete a key.
139    ///
140    /// No error is returned if the key does not exist.
141    ///
142    /// # Errors
143    ///
144    /// Returns `StateError` if the operation fails.
145    fn delete(&mut self, key: &[u8]) -> Result<(), StateError>;
146
147    /// Scan all keys with a given prefix.
148    ///
149    /// Returns an iterator over matching (key, value) pairs in
150    /// lexicographic order.
151    ///
152    /// # Performance
153    ///
154    /// O(log n + k) where n is the total number of keys and k is the
155    /// number of matching entries.
156    fn prefix_scan<'a>(&'a self, prefix: &'a [u8])
157        -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
158
159    /// Range scan between two keys (exclusive end).
160    ///
161    /// Returns an iterator over keys where `start <= key < end`
162    /// in lexicographic order.
163    ///
164    /// # Performance
165    ///
166    /// O(log n + k) where n is the total number of keys and k is the
167    /// number of matching entries.
168    fn range_scan<'a>(
169        &'a self,
170        range: Range<&'a [u8]>,
171    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
172
173    /// Check if a key exists.
174    ///
175    /// More efficient than `get()` when you don't need the value.
176    fn contains(&self, key: &[u8]) -> bool {
177        self.get(key).is_some()
178    }
179
180    /// Get approximate size in bytes.
181    ///
182    /// This includes both keys and values. The exact accounting may vary
183    /// by implementation.
184    fn size_bytes(&self) -> usize;
185
186    /// Get the number of entries in the store.
187    fn len(&self) -> usize;
188
189    /// Check if the store is empty.
190    fn is_empty(&self) -> bool {
191        self.len() == 0
192    }
193
194    /// Create a snapshot for checkpointing.
195    ///
196    /// The snapshot captures the current state and can be used to restore
197    /// the store to this point in time. Snapshots are serializable for
198    /// persistence.
199    ///
200    /// # Implementation Notes
201    ///
202    /// For in-memory stores, this clones the data. For memory-mapped stores,
203    /// this may use copy-on-write semantics.
204    fn snapshot(&self) -> StateSnapshot;
205
206    /// Restore from a snapshot.
207    ///
208    /// This replaces the current state with the snapshot's state.
209    /// Any changes since the snapshot was taken are lost.
210    fn restore(&mut self, snapshot: StateSnapshot);
211
212    /// Clear all entries.
213    fn clear(&mut self);
214
215    /// Flush any pending writes to durable storage.
216    ///
217    /// For in-memory stores, this is a no-op. For memory-mapped or
218    /// disk-backed stores, this ensures data is persisted.
219    ///
220    /// # Errors
221    ///
222    /// Returns `StateError` if the flush operation fails.
223    fn flush(&mut self) -> Result<(), StateError> {
224        Ok(()) // Default no-op for in-memory stores
225    }
226
227    /// Get a value or insert a default.
228    ///
229    /// If the key doesn't exist, the default is inserted and returned.
230    ///
231    /// # Errors
232    ///
233    /// Returns `StateError` if inserting the default value fails.
234    fn get_or_insert(&mut self, key: &[u8], default: &[u8]) -> Result<Bytes, StateError> {
235        if let Some(value) = self.get(key) {
236            Ok(value)
237        } else {
238            self.put(key, default)?;
239            Ok(Bytes::copy_from_slice(default))
240        }
241    }
242}
243
244/// Extension trait for [`StateStore`] providing typed access methods.
245///
246/// These methods use generics and thus cannot be part of the dyn-compatible
247/// `StateStore` trait. Import this trait to use typed access on any state store.
248///
249/// Uses rkyv for zero-copy serialization. Types must derive `Archive`,
250/// `rkyv::Serialize`, and `rkyv::Deserialize`.
251///
252/// # Example
253///
254/// ```rust,ignore
255/// use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
256/// use rkyv::{Archive, Deserialize, Serialize};
257///
258/// #[derive(Archive, Serialize, Deserialize)]
259/// #[rkyv(check_bytes)]
260/// struct Counter { value: u64 }
261///
262/// let mut store = InMemoryStore::new();
263/// store.put_typed(b"count", &Counter { value: 42 }).unwrap();
264/// let count: Counter = store.get_typed(b"count").unwrap().unwrap();
265/// assert_eq!(count.value, 42);
266/// ```
267pub trait StateStoreExt: StateStore {
268    /// Get a value and deserialize it using rkyv.
269    ///
270    /// Uses zero-copy access where possible, falling back to full
271    /// deserialization to return an owned value.
272    ///
273    /// # Errors
274    ///
275    /// Returns `StateError::Serialization` if deserialization fails.
276    fn get_typed<T>(&self, key: &[u8]) -> Result<Option<T>, StateError>
277    where
278        T: Archive,
279        T::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
280            + RkyvDeserialize<T, HighDeserializer<RkyvError>>,
281    {
282        match self.get(key) {
283            Some(bytes) => {
284                let archived = rkyv::access::<T::Archived, RkyvError>(&bytes)
285                    .map_err(|e| StateError::Serialization(e.to_string()))?;
286                let value = rkyv::deserialize::<T, RkyvError>(archived)
287                    .map_err(|e| StateError::Serialization(e.to_string()))?;
288                Ok(Some(value))
289            }
290            None => Ok(None),
291        }
292    }
293
294    /// Serialize and store a value using rkyv.
295    ///
296    /// Uses aligned buffers for optimal performance on the hot path.
297    ///
298    /// # Errors
299    ///
300    /// Returns `StateError::Serialization` if serialization fails.
301    fn put_typed<T>(&mut self, key: &[u8], value: &T) -> Result<(), StateError>
302    where
303        T: for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
304    {
305        let bytes = rkyv::to_bytes::<RkyvError>(value)
306            .map_err(|e| StateError::Serialization(e.to_string()))?;
307        self.put(key, &bytes)
308    }
309
310    /// Update a value in place using a closure.
311    ///
312    /// The update function receives the current value (or None) and returns
313    /// the new value. If `None` is returned, the key is deleted.
314    ///
315    /// # Errors
316    ///
317    /// Returns `StateError` if the put or delete operation fails.
318    fn update<F>(&mut self, key: &[u8], f: F) -> Result<(), StateError>
319    where
320        F: FnOnce(Option<Bytes>) -> Option<Vec<u8>>,
321    {
322        let current = self.get(key);
323        match f(current) {
324            Some(new_value) => self.put(key, &new_value),
325            None => self.delete(key),
326        }
327    }
328}
329
330// Blanket implementation for all StateStore types
331impl<T: StateStore + ?Sized> StateStoreExt for T {}
332
333/// A snapshot of state store contents for checkpointing.
334///
335/// Snapshots can be serialized for persistence and restored later.
336/// They capture the complete state at a point in time.
337///
338/// Uses rkyv for zero-copy deserialization on the hot path.
339#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
340pub struct StateSnapshot {
341    /// Serialized state data
342    data: Vec<(Vec<u8>, Vec<u8>)>,
343    /// Timestamp when snapshot was created (nanoseconds since epoch)
344    timestamp_ns: u64,
345    /// Version for forward compatibility
346    version: u32,
347}
348
349impl StateSnapshot {
350    /// Create a new snapshot from key-value pairs.
351    #[must_use]
352    #[allow(clippy::cast_possible_truncation)]
353    pub fn new(data: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
354        Self {
355            data,
356            // Truncation is acceptable here - we won't hit u64 overflow until ~584 years from epoch
357            timestamp_ns: std::time::SystemTime::now()
358                .duration_since(std::time::UNIX_EPOCH)
359                .map(|d| d.as_nanos() as u64)
360                .unwrap_or(0),
361            version: 1,
362        }
363    }
364
365    /// Get the snapshot data.
366    #[must_use]
367    pub fn data(&self) -> &[(Vec<u8>, Vec<u8>)] {
368        &self.data
369    }
370
371    /// Get the snapshot timestamp.
372    #[must_use]
373    pub fn timestamp_ns(&self) -> u64 {
374        self.timestamp_ns
375    }
376
377    /// Get the number of entries in the snapshot.
378    #[must_use]
379    pub fn len(&self) -> usize {
380        self.data.len()
381    }
382
383    /// Check if the snapshot is empty.
384    #[must_use]
385    pub fn is_empty(&self) -> bool {
386        self.data.is_empty()
387    }
388
389    /// Get the approximate size in bytes.
390    #[must_use]
391    pub fn size_bytes(&self) -> usize {
392        self.data.iter().map(|(k, v)| k.len() + v.len()).sum()
393    }
394
395    /// Serialize the snapshot to bytes using rkyv.
396    ///
397    /// Returns an aligned byte vector for optimal zero-copy access.
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if serialization fails.
402    pub fn to_bytes(&self) -> Result<AlignedVec, StateError> {
403        rkyv::to_bytes::<RkyvError>(self).map_err(|e| StateError::Serialization(e.to_string()))
404    }
405
406    /// Deserialize a snapshot from bytes using rkyv.
407    ///
408    /// Uses zero-copy access internally for performance.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if deserialization fails.
413    pub fn from_bytes(bytes: &[u8]) -> Result<Self, StateError> {
414        let archived = rkyv::access::<<Self as Archive>::Archived, RkyvError>(bytes)
415            .map_err(|e| StateError::Serialization(e.to_string()))?;
416        rkyv::deserialize::<Self, RkyvError>(archived)
417            .map_err(|e| StateError::Serialization(e.to_string()))
418    }
419}
420
421/// In-memory state store using `BTreeMap` for sorted key access.
422///
423/// This state store is suitable for state that fits in memory. It uses
424/// `BTreeMap` which provides O(log n + k) prefix and range scans, making
425/// it efficient for join state and windowed aggregation lookups.
426///
427/// # Performance Characteristics
428///
429/// - **Get**: O(log n), < 500ns typical
430/// - **Put**: O(log n), may allocate
431/// - **Delete**: O(log n)
432/// - **Prefix scan**: O(log n + k) where k is matching entries
433/// - **Range scan**: O(log n + k) where k is matching entries
434///
435/// # Memory Usage
436///
437/// Keys and values are stored as owned `Vec<u8>` and `Bytes` respectively.
438/// Use `size_bytes()` to monitor memory usage.
439pub struct InMemoryStore {
440    /// The underlying sorted map
441    data: BTreeMap<Vec<u8>, Bytes>,
442    /// Track total size for monitoring
443    size_bytes: usize,
444}
445
446impl InMemoryStore {
447    /// Creates a new empty in-memory store.
448    #[must_use]
449    pub fn new() -> Self {
450        Self {
451            data: BTreeMap::new(),
452            size_bytes: 0,
453        }
454    }
455
456    /// Creates a new in-memory store.
457    ///
458    /// The capacity hint is accepted for API compatibility but has no
459    /// effect — `BTreeMap` does not support pre-allocation.
460    #[must_use]
461    pub fn with_capacity(_capacity: usize) -> Self {
462        Self::new()
463    }
464
465    /// Returns the number of entries in the store.
466    ///
467    /// `BTreeMap` does not expose a capacity concept, so this returns
468    /// the current entry count.
469    #[must_use]
470    pub fn capacity(&self) -> usize {
471        self.data.len()
472    }
473
474    /// No-op for API compatibility.
475    ///
476    /// `BTreeMap` manages its own memory and does not support
477    /// explicit shrinking.
478    pub fn shrink_to_fit(&mut self) {
479        // BTreeMap does not support shrink_to_fit
480    }
481}
482
483impl Default for InMemoryStore {
484    fn default() -> Self {
485        Self::new()
486    }
487}
488
489impl StateStore for InMemoryStore {
490    #[inline]
491    fn get(&self, key: &[u8]) -> Option<Bytes> {
492        self.data.get(key).cloned()
493    }
494
495    #[inline]
496    fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError> {
497        let value_bytes = Bytes::copy_from_slice(value);
498
499        // Entry API: single tree traversal for both insert and update
500        match self.data.entry(key.to_vec()) {
501            std::collections::btree_map::Entry::Occupied(mut entry) => {
502                self.size_bytes -= entry.get().len();
503                self.size_bytes += value.len();
504                *entry.get_mut() = value_bytes;
505            }
506            std::collections::btree_map::Entry::Vacant(entry) => {
507                self.size_bytes += key.len() + value.len();
508                entry.insert(value_bytes);
509            }
510        }
511        Ok(())
512    }
513
514    fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
515        if let Some(old_value) = self.data.remove(key) {
516            self.size_bytes -= key.len() + old_value.len();
517        }
518        Ok(())
519    }
520
521    fn prefix_scan<'a>(
522        &'a self,
523        prefix: &'a [u8],
524    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
525        if prefix.is_empty() {
526            // Empty prefix matches everything
527            return Box::new(
528                self.data
529                    .iter()
530                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
531            );
532        }
533        if let Some(end) = prefix_successor(prefix) {
534            Box::new(
535                self.data
536                    .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
537                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
538            )
539        } else {
540            // All-0xFF prefix: scan from prefix to end
541            Box::new(
542                self.data
543                    .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
544                    .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
545            )
546        }
547    }
548
549    fn range_scan<'a>(
550        &'a self,
551        range: Range<&'a [u8]>,
552    ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
553        Box::new(
554            self.data
555                .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
556                .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
557        )
558    }
559
560    #[inline]
561    fn contains(&self, key: &[u8]) -> bool {
562        self.data.contains_key(key.as_ref())
563    }
564
565    fn size_bytes(&self) -> usize {
566        self.size_bytes
567    }
568
569    fn len(&self) -> usize {
570        self.data.len()
571    }
572
573    fn snapshot(&self) -> StateSnapshot {
574        let data: Vec<(Vec<u8>, Vec<u8>)> = self
575            .data
576            .iter()
577            .map(|(k, v)| (k.clone(), v.to_vec()))
578            .collect();
579        StateSnapshot::new(data)
580    }
581
582    fn restore(&mut self, snapshot: StateSnapshot) {
583        self.data.clear();
584        self.size_bytes = 0;
585
586        for (key, value) in snapshot.data {
587            self.size_bytes += key.len() + value.len();
588            self.data.insert(key, Bytes::from(value));
589        }
590    }
591
592    fn clear(&mut self) {
593        self.data.clear();
594        self.size_bytes = 0;
595    }
596}
597
598/// Errors that can occur in state operations.
599#[derive(Debug, thiserror::Error)]
600pub enum StateError {
601    /// I/O error
602    #[error("I/O error: {0}")]
603    Io(#[from] std::io::Error),
604
605    /// Serialization error
606    #[error("Serialization error: {0}")]
607    Serialization(String),
608
609    /// Deserialization error
610    #[error("Deserialization error: {0}")]
611    Deserialization(String),
612
613    /// Corruption error
614    #[error("Corruption error: {0}")]
615    Corruption(String),
616
617    /// Operation not supported by this store type
618    #[error("Operation not supported: {0}")]
619    NotSupported(String),
620
621    /// Key not found (for operations that require existing key)
622    #[error("Key not found")]
623    KeyNotFound,
624
625    /// Store capacity exceeded
626    #[error("Store capacity exceeded: {0}")]
627    CapacityExceeded(String),
628}
629
630mod mmap;
631
632/// Changelog-aware state store wrapper (F-CKP-005).
633pub mod changelog_aware;
634
635// Re-export main types
636pub use self::StateError as Error;
637pub use changelog_aware::{ChangelogAwareStore, ChangelogSink};
638pub use mmap::MmapStateStore;
639
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_in_memory_store_basic() {
        let mut store = InMemoryStore::new();

        // Test put and get
        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.len(), 1);

        // Test overwrite
        store.put(b"key1", b"value2").unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value2"));
        assert_eq!(store.len(), 1);

        // Test delete
        store.delete(b"key1").unwrap();
        assert!(store.get(b"key1").is_none());
        assert_eq!(store.len(), 0);

        // Test delete non-existent key (should not error)
        store.delete(b"nonexistent").unwrap();
    }

    #[test]
    fn test_contains() {
        let mut store = InMemoryStore::new();
        assert!(!store.contains(b"key1"));

        store.put(b"key1", b"value1").unwrap();
        assert!(store.contains(b"key1"));

        store.delete(b"key1").unwrap();
        assert!(!store.contains(b"key1"));
    }

    #[test]
    fn test_prefix_scan() {
        let mut store = InMemoryStore::new();
        store.put(b"prefix:1", b"value1").unwrap();
        store.put(b"prefix:2", b"value2").unwrap();
        store.put(b"prefix:10", b"value10").unwrap();
        store.put(b"other:1", b"value3").unwrap();

        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        assert_eq!(results.len(), 3);

        // All results should have the prefix
        for (key, _) in &results {
            assert!(key.starts_with(b"prefix:"));
        }

        // Empty prefix returns all
        let all: Vec<_> = store.prefix_scan(b"").collect();
        assert_eq!(all.len(), 4);
    }

    #[test]
    fn test_prefix_scan_includes_exact_key() {
        let mut store = InMemoryStore::new();
        store.put(b"a", b"before").unwrap();
        store.put(b"ab", b"exact").unwrap();
        store.put(b"abc", b"child").unwrap();
        store.put(b"ac", b"sibling").unwrap();

        // A key equal to the prefix matches; neighbours on either side do not.
        let keys: Vec<Vec<u8>> = store.prefix_scan(b"ab").map(|(k, _)| k.to_vec()).collect();
        assert_eq!(keys, vec![b"ab".to_vec(), b"abc".to_vec()]);
    }

    #[test]
    fn test_prefix_scan_trailing_ff() {
        let mut store = InMemoryStore::new();
        store.put(&[0x01, 0xFE], b"before").unwrap();
        store.put(&[0x01, 0xFF], b"exact").unwrap();
        store.put(&[0x01, 0xFF, 0x00], b"child").unwrap();
        store.put(&[0x02], b"after").unwrap();

        // The successor of [0x01, 0xFF] is [0x02], which must stay excluded.
        let values: Vec<Bytes> = store.prefix_scan(&[0x01, 0xFF]).map(|(_, v)| v).collect();
        assert_eq!(values, vec![Bytes::from("exact"), Bytes::from("child")]);
    }

    #[test]
    fn test_range_scan() {
        let mut store = InMemoryStore::new();
        store.put(b"a", b"1").unwrap();
        store.put(b"b", b"2").unwrap();
        store.put(b"c", b"3").unwrap();
        store.put(b"d", b"4").unwrap();

        let results: Vec<_> = store.range_scan(b"b".as_slice()..b"d".as_slice()).collect();
        assert_eq!(results.len(), 2);

        // Start is inclusive, end is exclusive, and results are in key order.
        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref()).collect();
        assert_eq!(keys, vec![b"b".as_slice(), b"c".as_slice()]);
    }

    #[test]
    fn test_range_scan_bounds() {
        let mut store = InMemoryStore::new();
        for key in [b"a", b"b", b"c", b"d"] {
            store.put(key, b"v").unwrap();
        }

        // Equal bounds describe an empty half-open range.
        assert_eq!(store.range_scan(b"b".as_slice()..b"b".as_slice()).count(), 0);

        // Bounds do not have to be stored keys.
        let keys: Vec<Vec<u8>> = store
            .range_scan(b"aa".as_slice()..b"cc".as_slice())
            .map(|(k, _)| k.to_vec())
            .collect();
        assert_eq!(keys, vec![b"b".to_vec(), b"c".to_vec()]);
    }

    #[test]
    fn test_snapshot_and_restore() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        // Take snapshot
        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 2);

        // Modify store
        store.put(b"key1", b"modified").unwrap();
        store.put(b"key3", b"value3").unwrap();
        store.delete(b"key2").unwrap();

        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("modified"));

        // Restore from snapshot
        store.restore(snapshot);

        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.get(b"key2").unwrap(), Bytes::from("value2"));
        assert!(store.get(b"key3").is_none());
    }

    #[test]
    fn test_restore_resets_size_tracking() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        let snapshot = store.snapshot();
        let expected = snapshot.size_bytes();

        store.put(b"key2", b"a much longer value").unwrap();
        store.restore(snapshot);

        assert_eq!(store.size_bytes(), expected);
        assert_eq!(store.size_bytes(), 4 + 6);
    }

    #[test]
    fn test_snapshot_serialization() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        let snapshot = store.snapshot();

        // Serialize and deserialize
        let bytes = snapshot.to_bytes().unwrap();
        let restored = StateSnapshot::from_bytes(&bytes).unwrap();

        assert_eq!(restored.len(), snapshot.len());
        assert_eq!(restored.data(), snapshot.data());
        assert_eq!(restored.timestamp_ns(), snapshot.timestamp_ns());
    }

    #[test]
    fn test_empty_snapshot_round_trip() {
        let snapshot = InMemoryStore::new().snapshot();
        assert!(snapshot.is_empty());

        let restored = StateSnapshot::from_bytes(&snapshot.to_bytes().unwrap()).unwrap();
        assert!(restored.is_empty());
        assert_eq!(restored.size_bytes(), 0);
    }

    #[test]
    fn test_typed_access() {
        let mut store = InMemoryStore::new();

        // Test with integer
        store.put_typed(b"count", &42u64).unwrap();
        let count: u64 = store.get_typed(b"count").unwrap().unwrap();
        assert_eq!(count, 42);

        // Test with string
        store.put_typed(b"name", &String::from("alice")).unwrap();
        let name: String = store.get_typed(b"name").unwrap().unwrap();
        assert_eq!(name, "alice");

        // Test with vector (complex type)
        let nums = vec![1i64, 2, 3, 4, 5];
        store.put_typed(b"nums", &nums).unwrap();
        let restored: Vec<i64> = store.get_typed(b"nums").unwrap().unwrap();
        assert_eq!(restored, nums);

        // Test non-existent key
        let missing: Option<u64> = store.get_typed(b"missing").unwrap();
        assert!(missing.is_none());
    }

    #[test]
    fn test_get_or_insert() {
        let mut store = InMemoryStore::new();

        // First call inserts default
        let value = store.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(value, Bytes::from("default"));
        assert_eq!(store.len(), 1);

        // Second call returns existing
        store.put(b"key1", b"modified").unwrap();
        let value = store.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(value, Bytes::from("modified"));
    }

    #[test]
    fn test_update() {
        let mut store = InMemoryStore::new();
        store.put(b"counter", b"\x00\x00\x00\x00").unwrap();

        // Update existing
        store
            .update(b"counter", |current| {
                let val = current.map_or(0u32, |b| {
                    u32::from_le_bytes(b.as_ref().try_into().unwrap_or([0; 4]))
                });
                Some((val + 1).to_le_bytes().to_vec())
            })
            .unwrap();

        let bytes = store.get(b"counter").unwrap();
        let val = u32::from_le_bytes(bytes.as_ref().try_into().unwrap());
        assert_eq!(val, 1);

        // Update to delete
        store.update(b"counter", |_| None).unwrap();
        assert!(store.get(b"counter").is_none());
    }

    #[test]
    fn test_update_missing_key() {
        let mut store = InMemoryStore::new();

        // The closure sees `None` for an absent key and can create it.
        store
            .update(b"fresh", |current| {
                assert!(current.is_none());
                Some(b"created".to_vec())
            })
            .unwrap();
        assert_eq!(store.get(b"fresh").unwrap(), Bytes::from("created"));

        // Returning `None` for an absent key is a harmless delete.
        store.update(b"absent", |_| None).unwrap();
        assert!(!store.contains(b"absent"));
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_size_tracking() {
        let mut store = InMemoryStore::new();
        assert_eq!(store.size_bytes(), 0);

        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6); // "key1" + "value1"

        store.put(b"key2", b"value2").unwrap();
        assert_eq!(store.size_bytes(), (4 + 6) * 2);

        // Overwrite with smaller value
        store.put(b"key1", b"v1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 2 + 4 + 6); // "key1" + "v1" + "key2" + "value2"

        store.delete(b"key1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6);

        store.clear();
        assert_eq!(store.size_bytes(), 0);
    }

    #[test]
    fn test_with_capacity() {
        let store = InMemoryStore::with_capacity(1000);
        // BTreeMap does not pre-allocate; capacity() returns len() which is 0
        assert_eq!(store.capacity(), 0);
        assert!(store.is_empty());
    }

    #[test]
    fn test_clear() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.size_bytes() > 0);

        store.clear();

        assert_eq!(store.len(), 0);
        assert_eq!(store.size_bytes(), 0);
        assert!(store.get(b"key1").is_none());
    }

    #[test]
    fn test_prefix_successor() {
        // Normal case
        assert_eq!(prefix_successor(b"abc"), Some(b"abd".to_vec()));

        // Empty prefix
        assert_eq!(prefix_successor(b""), None);

        // All 0xFF bytes — no successor
        assert_eq!(prefix_successor(&[0xFF, 0xFF, 0xFF]), None);

        // Trailing 0xFF bytes are truncated and previous byte incremented
        assert_eq!(prefix_successor(&[0x01, 0xFF]), Some(vec![0x02]));
        assert_eq!(
            prefix_successor(&[0x01, 0x02, 0xFF]),
            Some(vec![0x01, 0x03])
        );

        // Single byte
        assert_eq!(prefix_successor(&[0x00]), Some(vec![0x01]));
        assert_eq!(prefix_successor(&[0xFE]), Some(vec![0xFF]));
        assert_eq!(prefix_successor(&[0xFF]), None);
    }

    #[test]
    fn test_prefix_scan_binary_keys() {
        let mut store = InMemoryStore::new();

        // Simulate join state keys: partition_prefix + key_hash
        let prefix_a = [0x00, 0x01]; // partition 0, stream 1
        let prefix_b = [0x00, 0x02]; // partition 0, stream 2

        store.put(&[0x00, 0x01, 0xAA], b"val1").unwrap();
        store.put(&[0x00, 0x01, 0xBB], b"val2").unwrap();
        store.put(&[0x00, 0x02, 0xCC], b"val3").unwrap();
        store.put(&[0x00, 0x02, 0xDD], b"val4").unwrap();
        store.put(&[0x01, 0x01, 0xEE], b"val5").unwrap();

        // Prefix scan for partition_a
        let results_a: Vec<_> = store.prefix_scan(&prefix_a).collect();
        assert_eq!(results_a.len(), 2);
        for (key, _) in &results_a {
            assert!(key.starts_with(&prefix_a));
        }

        // Prefix scan for partition_b
        let results_b: Vec<_> = store.prefix_scan(&prefix_b).collect();
        assert_eq!(results_b.len(), 2);
        for (key, _) in &results_b {
            assert!(key.starts_with(&prefix_b));
        }

        // Prefix scan with all-0xFF prefix
        let results_ff: Vec<_> = store.prefix_scan(&[0xFF, 0xFF]).collect();
        assert_eq!(results_ff.len(), 0);
    }

    #[test]
    fn test_prefix_scan_returns_sorted() {
        let mut store = InMemoryStore::new();
        store.put(b"prefix:c", b"3").unwrap();
        store.put(b"prefix:a", b"1").unwrap();
        store.put(b"prefix:b", b"2").unwrap();

        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref().to_vec()).collect();
        assert_eq!(
            keys,
            vec![
                b"prefix:a".to_vec(),
                b"prefix:b".to_vec(),
                b"prefix:c".to_vec()
            ]
        );
    }
}