// laminar_core/state/mod.rs
1//! # State Store Module
2//!
3//! High-performance state storage for streaming operators.
4//!
5//! ## Design Goals
6//!
7//! - **< 500ns lookup latency** for point queries
8//! - **Zero-copy** access where possible
9//! - **Lock-free** for single-threaded access
10//! - **Memory-mapped** for large state
11//!
12//! ## State Backends
13//!
14//! - **[`InMemoryStore`]**: BTreeMap-based, fast lookups with O(log n + k) prefix/range scans
15//! - **[`MmapStateStore`]**: Memory-mapped, supports larger-than-memory state with optional persistence
16//! - **Hybrid**: Combination with hot/cold separation (future)
17//!
18//! ## Example
19//!
20//! ```rust
21//! use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
22//!
23//! let mut store = InMemoryStore::new();
24//!
25//! // Basic key-value operations
26//! store.put(b"user:1", b"alice").unwrap();
27//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
28//!
29//! // Typed state access (requires StateStoreExt)
30//! store.put_typed(b"count", &42u64).unwrap();
31//! let count: u64 = store.get_typed(b"count").unwrap().unwrap();
32//! assert_eq!(count, 42);
33//!
34//! // Snapshots for checkpointing
35//! let snapshot = store.snapshot();
36//! store.delete(b"user:1").unwrap();
37//! assert!(store.get(b"user:1").is_none());
38//!
39//! // Restore from snapshot
40//! store.restore(snapshot);
41//! assert_eq!(store.get(b"user:1").unwrap().as_ref(), b"alice");
42//! ```
43//!
44//! ## Memory-Mapped Store Example
45//!
46//! ```rust,no_run
47//! use laminar_core::state::{StateStore, MmapStateStore};
48//! use std::path::Path;
49//!
50//! // In-memory mode (fast, not persistent)
51//! let mut store = MmapStateStore::in_memory(1024 * 1024);
52//! store.put(b"key", b"value").unwrap();
53//!
54//! // Persistent mode (survives restarts)
55//! let mut persistent = MmapStateStore::persistent(
56//! Path::new("/tmp/state.db"),
57//! 1024 * 1024
58//! ).unwrap();
59//! persistent.put(b"key", b"value").unwrap();
60//! persistent.flush().unwrap();
61//! ```
62
63use bytes::Bytes;
64use rkyv::{
65 api::high::{self, HighDeserializer, HighSerializer, HighValidator},
66 bytecheck::CheckBytes,
67 rancor::Error as RkyvError,
68 ser::allocator::ArenaHandle,
69 util::AlignedVec,
70 Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
71};
72use std::cell::RefCell;
73use std::collections::BTreeMap;
74use std::ops::Bound;
75use std::ops::Range;
76
/// A single mutation entry within an incremental snapshot.
///
/// Captures the exact key/value bytes for each state change, enabling
/// delta-based checkpointing to object stores without full state copies.
/// Keys and values are owned `Vec<u8>`s so entries remain valid
/// independently of the store that produced them.
///
/// - `Put(key, value)` — a key was inserted or updated
/// - `Delete(key)` — a key was removed
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChangeEntry {
    /// A key was inserted or updated: `(key, value)`.
    Put(Vec<u8>, Vec<u8>),
    /// A key was deleted.
    Delete(Vec<u8>),
}
91
/// An incremental snapshot capturing only mutations since the last checkpoint.
///
/// State stores that support incremental snapshots return this from
/// [`StateStore::incremental_snapshot`]. The recovery system applies
/// these deltas on top of a base snapshot to reconstruct full state.
#[derive(Debug, Clone)]
pub struct IncrementalSnapshot {
    /// The ordered list of mutations since `base_epoch`. Replaying them
    /// in order on top of the base snapshot reproduces this epoch's state.
    pub changes: Vec<ChangeEntry>,
    /// The epoch of the base (full) snapshot these changes apply to.
    pub base_epoch: u64,
    /// The epoch this incremental snapshot represents.
    pub epoch: u64,
}
106
107/// Compute the lexicographic successor of a byte prefix.
108///
109/// Returns `None` if no successor exists (empty prefix or all bytes are 0xFF).
110/// Used by `BTreeMap::range()` to efficiently bound prefix scans.
111pub(crate) fn prefix_successor(prefix: &[u8]) -> Option<smallvec::SmallVec<[u8; 64]>> {
112 if prefix.is_empty() {
113 return None;
114 }
115 let mut successor = smallvec::SmallVec::<[u8; 64]>::from_slice(prefix);
116 // Walk backwards, incrementing the last non-0xFF byte
117 while let Some(last) = successor.last_mut() {
118 if *last < 0xFF {
119 *last += 1;
120 return Some(successor);
121 }
122 successor.pop();
123 }
124 // All bytes were 0xFF — no successor exists
125 None
126}
127
128/// Trait for state store implementations.
129///
130/// This is the core abstraction for operator state in Ring 0 (hot path).
131/// All implementations must achieve < 500ns lookup latency for point queries.
132///
133/// # Thread Safety
134///
135/// State stores are `Send` but not `Sync`. They are designed for single-threaded
136/// access within a reactor. Cross-thread communication uses SPSC queues.
137///
138/// # Memory Model
139///
140/// - `get()` returns `Bytes` which is a cheap reference-counted handle
141/// - `put()` copies the input to internal storage
142/// - Snapshots are copy-on-write where possible
143///
144/// # Dyn Compatibility
145///
146/// This trait is dyn-compatible for use with `Box<dyn StateStore>`. For generic
147/// convenience methods like `get_typed` and `put_typed`, use the [`StateStoreExt`]
148/// extension trait.
149pub trait StateStore: Send {
150 /// Get a value by key.
151 ///
152 /// Returns `None` if the key does not exist.
153 ///
154 /// # Performance
155 ///
156 /// Target: < 500ns for in-memory stores.
157 fn get(&self, key: &[u8]) -> Option<Bytes>;
158
159 /// Store a key-value pair.
160 ///
161 /// If the key already exists, the value is overwritten.
162 ///
163 /// # Errors
164 ///
165 /// Returns `StateError` if the operation fails (e.g., disk full for
166 /// memory-mapped stores).
167 fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError>;
168
169 /// Delete a key.
170 ///
171 /// No error is returned if the key does not exist.
172 ///
173 /// # Errors
174 ///
175 /// Returns `StateError` if the operation fails.
176 fn delete(&mut self, key: &[u8]) -> Result<(), StateError>;
177
178 /// Scan all keys with a given prefix.
179 ///
180 /// Returns an iterator over matching (key, value) pairs in
181 /// lexicographic order.
182 ///
183 /// # Performance
184 ///
185 /// O(log n + k) where n is the total number of keys and k is the
186 /// number of matching entries.
187 fn prefix_scan<'a>(&'a self, prefix: &'a [u8])
188 -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
189
190 /// Range scan between two keys (exclusive end).
191 ///
192 /// Returns an iterator over keys where `start <= key < end`
193 /// in lexicographic order.
194 ///
195 /// # Performance
196 ///
197 /// O(log n + k) where n is the total number of keys and k is the
198 /// number of matching entries.
199 fn range_scan<'a>(
200 &'a self,
201 range: Range<&'a [u8]>,
202 ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a>;
203
204 /// Check if a key exists.
205 ///
206 /// More efficient than `get()` when you don't need the value.
207 fn contains(&self, key: &[u8]) -> bool {
208 self.get(key).is_some()
209 }
210
211 /// Get approximate size in bytes.
212 ///
213 /// This includes both keys and values. The exact accounting may vary
214 /// by implementation.
215 fn size_bytes(&self) -> usize;
216
217 /// Get the number of entries in the store.
218 fn len(&self) -> usize;
219
220 /// Check if the store is empty.
221 fn is_empty(&self) -> bool {
222 self.len() == 0
223 }
224
225 /// Create a snapshot for checkpointing.
226 ///
227 /// The snapshot captures the current state and can be used to restore
228 /// the store to this point in time. Snapshots are serializable for
229 /// persistence.
230 ///
231 /// # Implementation Notes
232 ///
233 /// For in-memory stores, this clones the data. For memory-mapped stores,
234 /// this may use copy-on-write semantics.
235 fn snapshot(&self) -> StateSnapshot;
236
237 /// Restore from a snapshot.
238 ///
239 /// This replaces the current state with the snapshot's state.
240 /// Any changes since the snapshot was taken are lost.
241 fn restore(&mut self, snapshot: StateSnapshot);
242
243 /// Clear all entries.
244 fn clear(&mut self);
245
246 /// Flush any pending writes to durable storage.
247 ///
248 /// For in-memory stores, this is a no-op. For memory-mapped or
249 /// disk-backed stores, this ensures data is persisted.
250 ///
251 /// # Errors
252 ///
253 /// Returns `StateError` if the flush operation fails.
254 fn flush(&mut self) -> Result<(), StateError> {
255 Ok(()) // Default no-op for in-memory stores
256 }
257
258 /// Get a zero-copy reference to a value by key.
259 ///
260 /// Returns a direct `&[u8]` slice into the store's internal buffer,
261 /// avoiding the `Bytes` ref-count overhead. Only backends that own
262 /// their storage contiguously (e.g., `AHashMapStore`) can implement
263 /// this; others return `None` and callers fall back to [`get`](Self::get).
264 ///
265 /// # Lifetime
266 ///
267 /// The returned slice borrows `self`, so no mutations are allowed
268 /// while the reference is live.
269 fn get_ref(&self, _key: &[u8]) -> Option<&[u8]> {
270 None
271 }
272
273 /// Return an incremental snapshot of mutations since the last checkpoint.
274 ///
275 /// Backends that track per-mutation changelogs return
276 /// `Some(IncrementalSnapshot)` containing only the puts and deletes
277 /// since `base_epoch`. Backends without changelog support return `None`,
278 /// and the checkpointer falls back to a full [`snapshot`](Self::snapshot).
279 fn incremental_snapshot(&self) -> Option<IncrementalSnapshot> {
280 None
281 }
282
283 /// Get a value or insert a default.
284 ///
285 /// If the key doesn't exist, the default is inserted and returned.
286 ///
287 /// # Errors
288 ///
289 /// Returns `StateError` if inserting the default value fails.
290 fn get_or_insert(&mut self, key: &[u8], default: &[u8]) -> Result<Bytes, StateError> {
291 if let Some(value) = self.get(key) {
292 Ok(value)
293 } else {
294 self.put(key, default)?;
295 Ok(Bytes::copy_from_slice(default))
296 }
297 }
298}
299
/// Extension trait for [`StateStore`] providing typed access methods.
///
/// These methods use generics and thus cannot be part of the dyn-compatible
/// `StateStore` trait. Import this trait to use typed access on any state store.
///
/// Uses rkyv for zero-copy serialization. Types must derive `Archive`,
/// `rkyv::Serialize`, and `rkyv::Deserialize`.
///
/// # Example
///
/// ```rust,ignore
/// use laminar_core::state::{StateStore, StateStoreExt, InMemoryStore};
/// use rkyv::{Archive, Deserialize, Serialize};
///
/// #[derive(Archive, Serialize, Deserialize)]
/// #[rkyv(check_bytes)]
/// struct Counter { value: u64 }
///
/// let mut store = InMemoryStore::new();
/// store.put_typed(b"count", &Counter { value: 42 }).unwrap();
/// let count: Counter = store.get_typed(b"count").unwrap().unwrap();
/// assert_eq!(count.value, 42);
/// ```
pub trait StateStoreExt: StateStore {
    /// Get a value and deserialize it using rkyv.
    ///
    /// Uses zero-copy access where possible, falling back to full
    /// deserialization to return an owned value.
    ///
    /// # Errors
    ///
    /// Returns `StateError::Serialization` if deserialization fails.
    fn get_typed<T>(&self, key: &[u8]) -> Result<Option<T>, StateError>
    where
        T: Archive,
        T::Archived: for<'a> CheckBytes<HighValidator<'a, RkyvError>>
            + RkyvDeserialize<T, HighDeserializer<RkyvError>>,
    {
        // Prefer zero-copy get_ref when available
        let bytes_ref = self.get_ref(key);
        // `bytes_owned` keeps the `Bytes` handle alive for the duration of
        // the borrow in `data` when the get_ref fast path is unavailable.
        let bytes_owned;
        let data: &[u8] = if let Some(r) = bytes_ref {
            r
        } else if let Some(b) = self.get(key) {
            bytes_owned = b;
            &bytes_owned
        } else {
            // Key absent: not an error, just no value.
            return Ok(None);
        };

        // NOTE(review): `rkyv::access` validates the archived bytes but also
        // assumes the buffer satisfies the archived type's alignment —
        // presumably true for store-produced buffers; confirm for slices
        // returned by mmap-backed `get_ref` implementations.
        let archived = rkyv::access::<T::Archived, RkyvError>(data)
            .map_err(|e| StateError::Serialization(e.to_string()))?;
        let value = rkyv::deserialize::<T, RkyvError>(archived)
            .map_err(|e| StateError::Serialization(e.to_string()))?;
        Ok(Some(value))
    }

    /// Serialize and store a value using rkyv.
    ///
    /// Uses a thread-local reusable buffer to avoid per-call heap allocation
    /// on the hot path. The buffer is cleared and reused between calls.
    ///
    /// # Errors
    ///
    /// Returns `StateError::Serialization` if serialization fails.
    fn put_typed<T>(&mut self, key: &[u8], value: &T) -> Result<(), StateError>
    where
        T: for<'a> RkyvSerialize<HighSerializer<AlignedVec, ArenaHandle<'a>, RkyvError>>,
    {
        thread_local! {
            // One serialization buffer per thread; grows to fit the largest
            // value serialized on this thread and is then reused.
            static SERIALIZE_BUF: RefCell<AlignedVec> =
                RefCell::new(AlignedVec::with_capacity(256));
        }

        SERIALIZE_BUF.with(|buf| {
            let mut vec = buf.borrow_mut();
            vec.clear();
            // Take ownership to pass to to_bytes_in, then put it back
            // (`take` leaves a fresh empty AlignedVec in the slot while
            // serialization runs).
            let taken = std::mem::take(&mut *vec);
            match high::to_bytes_in::<_, RkyvError>(value, taken) {
                Ok(filled) => {
                    // Write through to the store, then return the filled
                    // buffer to the thread-local so its capacity is reused.
                    let result = self.put(key, &filled);
                    *vec = filled;
                    result
                }
                Err(e) => {
                    // Restore an empty buffer on error
                    *vec = AlignedVec::new();
                    Err(StateError::Serialization(e.to_string()))
                }
            }
        })
    }

    /// Update a value in place using a closure.
    ///
    /// The update function receives the current value (or None) and returns
    /// the new value. If `None` is returned, the key is deleted.
    ///
    /// # Errors
    ///
    /// Returns `StateError` if the put or delete operation fails.
    fn update<F>(&mut self, key: &[u8], f: F) -> Result<(), StateError>
    where
        F: FnOnce(Option<Bytes>) -> Option<Vec<u8>>,
    {
        // Read-modify-write: `f` decides between upsert and delete.
        let current = self.get(key);
        match f(current) {
            Some(new_value) => self.put(key, &new_value),
            None => self.delete(key),
        }
    }
}
413
// Blanket implementation for all StateStore types. `?Sized` includes trait
// objects, so `Box<dyn StateStore>` gets the typed helpers too.
impl<T: StateStore + ?Sized> StateStoreExt for T {}
416
/// A snapshot of state store contents for checkpointing.
///
/// Snapshots can be serialized for persistence and restored later.
/// They capture the complete state at a point in time.
///
/// Uses rkyv for zero-copy deserialization on the hot path.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct StateSnapshot {
    /// Serialized state data as owned (key, value) pairs.
    data: Vec<(Vec<u8>, Vec<u8>)>,
    /// Timestamp when snapshot was created (nanoseconds since epoch)
    timestamp_ns: u64,
    /// Version for forward compatibility (currently always 1; see `new`).
    version: u32,
}
432
433impl StateSnapshot {
434 /// Create a new snapshot from key-value pairs.
435 #[must_use]
436 #[allow(clippy::cast_possible_truncation)]
437 pub fn new(data: Vec<(Vec<u8>, Vec<u8>)>) -> Self {
438 Self {
439 data,
440 // Truncation is acceptable here - we won't hit u64 overflow until ~584 years from epoch
441 timestamp_ns: std::time::SystemTime::now()
442 .duration_since(std::time::UNIX_EPOCH)
443 .map(|d| d.as_nanos() as u64)
444 .unwrap_or(0),
445 version: 1,
446 }
447 }
448
449 /// Get the snapshot data.
450 #[must_use]
451 pub fn data(&self) -> &[(Vec<u8>, Vec<u8>)] {
452 &self.data
453 }
454
455 /// Get the snapshot timestamp.
456 #[must_use]
457 pub fn timestamp_ns(&self) -> u64 {
458 self.timestamp_ns
459 }
460
461 /// Get the number of entries in the snapshot.
462 #[must_use]
463 pub fn len(&self) -> usize {
464 self.data.len()
465 }
466
467 /// Check if the snapshot is empty.
468 #[must_use]
469 pub fn is_empty(&self) -> bool {
470 self.data.is_empty()
471 }
472
473 /// Get the approximate size in bytes.
474 #[must_use]
475 pub fn size_bytes(&self) -> usize {
476 self.data.iter().map(|(k, v)| k.len() + v.len()).sum()
477 }
478
479 /// Serialize the snapshot to bytes using rkyv.
480 ///
481 /// Returns an aligned byte vector for optimal zero-copy access.
482 ///
483 /// # Errors
484 ///
485 /// Returns an error if serialization fails.
486 pub fn to_bytes(&self) -> Result<AlignedVec, StateError> {
487 rkyv::to_bytes::<RkyvError>(self).map_err(|e| StateError::Serialization(e.to_string()))
488 }
489
490 /// Deserialize a snapshot from bytes using rkyv.
491 ///
492 /// Uses zero-copy access internally for performance.
493 ///
494 /// # Errors
495 ///
496 /// Returns an error if deserialization fails.
497 pub fn from_bytes(bytes: &[u8]) -> Result<Self, StateError> {
498 let archived = rkyv::access::<<Self as Archive>::Archived, RkyvError>(bytes)
499 .map_err(|e| StateError::Serialization(e.to_string()))?;
500 rkyv::deserialize::<Self, RkyvError>(archived)
501 .map_err(|e| StateError::Serialization(e.to_string()))
502 }
503}
504
/// In-memory state store using `BTreeMap` for sorted key access.
///
/// This state store is suitable for state that fits in memory. It uses
/// `BTreeMap` which provides O(log n + k) prefix and range scans, making
/// it efficient for join state and windowed aggregation lookups.
///
/// # Performance Characteristics
///
/// - **Get**: O(log n), < 500ns typical
/// - **Put**: O(log n), may allocate
/// - **Delete**: O(log n)
/// - **Prefix scan**: O(log n + k) where k is matching entries
/// - **Range scan**: O(log n + k) where k is matching entries
///
/// # Memory Usage
///
/// Keys and values are stored as owned `Vec<u8>` and `Bytes` respectively.
/// Use `size_bytes()` to monitor memory usage.
pub struct InMemoryStore {
    /// The underlying sorted map
    data: BTreeMap<Vec<u8>, Bytes>,
    /// Track total size for monitoring. Invariant: equals the sum of
    /// key + value lengths of all entries; maintained incrementally by
    /// `put`/`delete`/`restore`/`clear`.
    size_bytes: usize,
}
529
530impl InMemoryStore {
531 /// Creates a new empty in-memory store.
532 #[must_use]
533 pub fn new() -> Self {
534 Self {
535 data: BTreeMap::new(),
536 size_bytes: 0,
537 }
538 }
539
540 /// Creates a new in-memory store.
541 ///
542 /// The capacity hint is accepted for API compatibility but has no
543 /// effect — `BTreeMap` does not support pre-allocation.
544 #[must_use]
545 pub fn with_capacity(_capacity: usize) -> Self {
546 Self::new()
547 }
548
549 /// Returns the number of entries in the store.
550 ///
551 /// `BTreeMap` does not expose a capacity concept, so this returns
552 /// the current entry count.
553 #[must_use]
554 pub fn capacity(&self) -> usize {
555 self.data.len()
556 }
557
558 /// No-op for API compatibility.
559 ///
560 /// `BTreeMap` manages its own memory and does not support
561 /// explicit shrinking.
562 pub fn shrink_to_fit(&mut self) {
563 // BTreeMap does not support shrink_to_fit
564 }
565}
566
impl Default for InMemoryStore {
    /// Equivalent to [`InMemoryStore::new`]: an empty store.
    fn default() -> Self {
        Self::new()
    }
}
572
573impl StateStore for InMemoryStore {
574 #[inline]
575 fn get(&self, key: &[u8]) -> Option<Bytes> {
576 self.data.get(key).cloned()
577 }
578
579 #[inline]
580 fn get_ref(&self, key: &[u8]) -> Option<&[u8]> {
581 self.data.get(key).map(Bytes::as_ref)
582 }
583
584 #[inline]
585 fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), StateError> {
586 let value_bytes = Bytes::copy_from_slice(value);
587
588 // Check-then-insert: avoids key.to_vec() allocation on the
589 // update path, which is the common case for accumulator state.
590 if let Some(existing) = self.data.get_mut(key) {
591 self.size_bytes -= existing.len();
592 self.size_bytes += value.len();
593 *existing = value_bytes;
594 } else {
595 self.size_bytes += key.len() + value.len();
596 self.data.insert(key.to_vec(), value_bytes);
597 }
598 Ok(())
599 }
600
601 fn delete(&mut self, key: &[u8]) -> Result<(), StateError> {
602 if let Some(old_value) = self.data.remove(key) {
603 self.size_bytes -= key.len() + old_value.len();
604 }
605 Ok(())
606 }
607
608 fn prefix_scan<'a>(
609 &'a self,
610 prefix: &'a [u8],
611 ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
612 if prefix.is_empty() {
613 // Empty prefix matches everything
614 return Box::new(
615 self.data
616 .iter()
617 .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
618 );
619 }
620 if let Some(end) = prefix_successor(prefix) {
621 Box::new(
622 self.data
623 .range::<[u8], _>((Bound::Included(prefix), Bound::Excluded(end.as_slice())))
624 .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
625 )
626 } else {
627 // All-0xFF prefix: scan from prefix to end
628 Box::new(
629 self.data
630 .range::<[u8], _>((Bound::Included(prefix), Bound::Unbounded))
631 .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
632 )
633 }
634 }
635
636 fn range_scan<'a>(
637 &'a self,
638 range: Range<&'a [u8]>,
639 ) -> Box<dyn Iterator<Item = (Bytes, Bytes)> + 'a> {
640 Box::new(
641 self.data
642 .range::<[u8], _>((Bound::Included(range.start), Bound::Excluded(range.end)))
643 .map(|(k, v)| (Bytes::copy_from_slice(k), v.clone())),
644 )
645 }
646
647 #[inline]
648 fn contains(&self, key: &[u8]) -> bool {
649 self.data.contains_key(key.as_ref())
650 }
651
652 fn size_bytes(&self) -> usize {
653 self.size_bytes
654 }
655
656 fn len(&self) -> usize {
657 self.data.len()
658 }
659
660 fn snapshot(&self) -> StateSnapshot {
661 let data: Vec<(Vec<u8>, Vec<u8>)> = self
662 .data
663 .iter()
664 .map(|(k, v)| (k.clone(), v.to_vec()))
665 .collect();
666 StateSnapshot::new(data)
667 }
668
669 fn restore(&mut self, snapshot: StateSnapshot) {
670 self.data.clear();
671 self.size_bytes = 0;
672
673 for (key, value) in snapshot.data {
674 self.size_bytes += key.len() + value.len();
675 self.data.insert(key, Bytes::from(value));
676 }
677 }
678
679 fn clear(&mut self) {
680 self.data.clear();
681 self.size_bytes = 0;
682 }
683}
684
/// Errors that can occur in state operations.
///
/// `Io` converts automatically from `std::io::Error` via `#[from]`, so
/// `?` works directly on I/O calls inside store implementations.
#[derive(Debug, thiserror::Error)]
pub enum StateError {
    /// I/O error
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization error
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// Deserialization error
    #[error("Deserialization error: {0}")]
    Deserialization(String),

    /// Corruption error
    #[error("Corruption error: {0}")]
    Corruption(String),

    /// Operation not supported by this store type
    #[error("Operation not supported: {0}")]
    NotSupported(String),

    /// Key not found (for operations that require existing key)
    #[error("Key not found")]
    KeyNotFound,

    /// Store capacity exceeded
    #[error("Store capacity exceeded: {0}")]
    CapacityExceeded(String),
}
716
717mod mmap;
718
719/// AHashMap-backed state store with O(1) lookups and zero-copy reads.
720pub mod ahash_store;
721
722/// Changelog-aware state store wrapper.
723pub mod changelog_aware;
724
725/// Dictionary key encoder for low-cardinality GROUP BY keys.
726pub mod dict_encoder;
727
728// Re-export main types
729pub use self::StateError as Error;
730pub use ahash_store::AHashMapStore;
731pub use changelog_aware::{ChangelogAwareStore, ChangelogSink};
732pub use dict_encoder::DictionaryKeyEncoder;
733pub use mmap::MmapStateStore;
734
// Unit tests covering the in-memory store, snapshot round-trips, typed
// (rkyv) access, and the prefix-successor helper.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_in_memory_store_basic() {
        let mut store = InMemoryStore::new();

        // Test put and get
        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.len(), 1);

        // Test overwrite
        store.put(b"key1", b"value2").unwrap();
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value2"));
        assert_eq!(store.len(), 1);

        // Test delete
        store.delete(b"key1").unwrap();
        assert!(store.get(b"key1").is_none());
        assert_eq!(store.len(), 0);

        // Test delete non-existent key (should not error)
        store.delete(b"nonexistent").unwrap();
    }

    #[test]
    fn test_contains() {
        let mut store = InMemoryStore::new();
        assert!(!store.contains(b"key1"));

        store.put(b"key1", b"value1").unwrap();
        assert!(store.contains(b"key1"));

        store.delete(b"key1").unwrap();
        assert!(!store.contains(b"key1"));
    }

    #[test]
    fn test_prefix_scan() {
        let mut store = InMemoryStore::new();
        store.put(b"prefix:1", b"value1").unwrap();
        store.put(b"prefix:2", b"value2").unwrap();
        store.put(b"prefix:10", b"value10").unwrap();
        store.put(b"other:1", b"value3").unwrap();

        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        assert_eq!(results.len(), 3);

        // All results should have the prefix
        for (key, _) in &results {
            assert!(key.starts_with(b"prefix:"));
        }

        // Empty prefix returns all
        let all: Vec<_> = store.prefix_scan(b"").collect();
        assert_eq!(all.len(), 4);
    }

    #[test]
    fn test_range_scan() {
        let mut store = InMemoryStore::new();
        store.put(b"a", b"1").unwrap();
        store.put(b"b", b"2").unwrap();
        store.put(b"c", b"3").unwrap();
        store.put(b"d", b"4").unwrap();

        // Exclusive end: "d" itself is not included.
        let results: Vec<_> = store.range_scan(b"b".as_slice()..b"d".as_slice()).collect();
        assert_eq!(results.len(), 2);

        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref()).collect();
        assert!(keys.contains(&b"b".as_slice()));
        assert!(keys.contains(&b"c".as_slice()));
    }

    #[test]
    fn test_snapshot_and_restore() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        // Take snapshot
        let snapshot = store.snapshot();
        assert_eq!(snapshot.len(), 2);

        // Modify store
        store.put(b"key1", b"modified").unwrap();
        store.put(b"key3", b"value3").unwrap();
        store.delete(b"key2").unwrap();

        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("modified"));

        // Restore from snapshot
        store.restore(snapshot);

        // All post-snapshot mutations must be rolled back.
        assert_eq!(store.len(), 2);
        assert_eq!(store.get(b"key1").unwrap(), Bytes::from("value1"));
        assert_eq!(store.get(b"key2").unwrap(), Bytes::from("value2"));
        assert!(store.get(b"key3").is_none());
    }

    #[test]
    fn test_snapshot_serialization() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        let snapshot = store.snapshot();

        // Serialize and deserialize
        let bytes = snapshot.to_bytes().unwrap();
        let restored = StateSnapshot::from_bytes(&bytes).unwrap();

        assert_eq!(restored.len(), snapshot.len());
        assert_eq!(restored.data(), snapshot.data());
    }

    #[test]
    fn test_typed_access() {
        let mut store = InMemoryStore::new();

        // Test with integer
        store.put_typed(b"count", &42u64).unwrap();
        let count: u64 = store.get_typed(b"count").unwrap().unwrap();
        assert_eq!(count, 42);

        // Test with string
        store.put_typed(b"name", &String::from("alice")).unwrap();
        let name: String = store.get_typed(b"name").unwrap().unwrap();
        assert_eq!(name, "alice");

        // Test with vector (complex type)
        let nums = vec![1i64, 2, 3, 4, 5];
        store.put_typed(b"nums", &nums).unwrap();
        let restored: Vec<i64> = store.get_typed(b"nums").unwrap().unwrap();
        assert_eq!(restored, nums);

        // Test non-existent key
        let missing: Option<u64> = store.get_typed(b"missing").unwrap();
        assert!(missing.is_none());
    }

    #[test]
    fn test_get_or_insert() {
        let mut store = InMemoryStore::new();

        // First call inserts default
        let value = store.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(value, Bytes::from("default"));
        assert_eq!(store.len(), 1);

        // Second call returns existing
        store.put(b"key1", b"modified").unwrap();
        let value = store.get_or_insert(b"key1", b"default").unwrap();
        assert_eq!(value, Bytes::from("modified"));
    }

    #[test]
    fn test_update() {
        let mut store = InMemoryStore::new();
        store.put(b"counter", b"\x00\x00\x00\x00").unwrap();

        // Update existing: read a little-endian u32 and increment it.
        store
            .update(b"counter", |current| {
                let val = current.map_or(0u32, |b| {
                    u32::from_le_bytes(b.as_ref().try_into().unwrap_or([0; 4]))
                });
                Some((val + 1).to_le_bytes().to_vec())
            })
            .unwrap();

        let bytes = store.get(b"counter").unwrap();
        let val = u32::from_le_bytes(bytes.as_ref().try_into().unwrap());
        assert_eq!(val, 1);

        // Update to delete: returning None removes the key.
        store.update(b"counter", |_| None).unwrap();
        assert!(store.get(b"counter").is_none());
    }

    #[test]
    fn test_size_tracking() {
        let mut store = InMemoryStore::new();
        assert_eq!(store.size_bytes(), 0);

        store.put(b"key1", b"value1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6); // "key1" + "value1"

        store.put(b"key2", b"value2").unwrap();
        assert_eq!(store.size_bytes(), (4 + 6) * 2);

        // Overwrite with smaller value
        store.put(b"key1", b"v1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 2 + 4 + 6); // "key1" + "v1" + "key2" + "value2"

        store.delete(b"key1").unwrap();
        assert_eq!(store.size_bytes(), 4 + 6);

        store.clear();
        assert_eq!(store.size_bytes(), 0);
    }

    #[test]
    fn test_with_capacity() {
        let store = InMemoryStore::with_capacity(1000);
        // BTreeMap does not pre-allocate; capacity() returns len() which is 0
        assert_eq!(store.capacity(), 0);
        assert!(store.is_empty());
    }

    #[test]
    fn test_clear() {
        let mut store = InMemoryStore::new();
        store.put(b"key1", b"value1").unwrap();
        store.put(b"key2", b"value2").unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.size_bytes() > 0);

        store.clear();

        assert_eq!(store.len(), 0);
        assert_eq!(store.size_bytes(), 0);
        assert!(store.get(b"key1").is_none());
    }

    #[test]
    fn test_prefix_successor() {
        // Normal case
        assert_eq!(prefix_successor(b"abc").as_deref(), Some(b"abd".as_slice()));

        // Empty prefix
        assert!(prefix_successor(b"").is_none());

        // All 0xFF bytes — no successor
        assert!(prefix_successor(&[0xFF, 0xFF, 0xFF]).is_none());

        // Trailing 0xFF bytes are truncated and previous byte incremented
        assert_eq!(
            prefix_successor(&[0x01, 0xFF]).as_deref(),
            Some([0x02].as_slice())
        );
        assert_eq!(
            prefix_successor(&[0x01, 0x02, 0xFF]).as_deref(),
            Some([0x01, 0x03].as_slice())
        );

        // Single byte
        assert_eq!(
            prefix_successor(&[0x00]).as_deref(),
            Some([0x01].as_slice())
        );
        assert_eq!(
            prefix_successor(&[0xFE]).as_deref(),
            Some([0xFF].as_slice())
        );
        assert!(prefix_successor(&[0xFF]).is_none());
    }

    #[test]
    fn test_prefix_scan_binary_keys() {
        let mut store = InMemoryStore::new();

        // Simulate join state keys: partition_prefix + key_hash
        let prefix_a = [0x00, 0x01]; // partition 0, stream 1
        let prefix_b = [0x00, 0x02]; // partition 0, stream 2

        store.put(&[0x00, 0x01, 0xAA], b"val1").unwrap();
        store.put(&[0x00, 0x01, 0xBB], b"val2").unwrap();
        store.put(&[0x00, 0x02, 0xCC], b"val3").unwrap();
        store.put(&[0x00, 0x02, 0xDD], b"val4").unwrap();
        store.put(&[0x01, 0x01, 0xEE], b"val5").unwrap();

        // Prefix scan for partition_a
        let results_a: Vec<_> = store.prefix_scan(&prefix_a).collect();
        assert_eq!(results_a.len(), 2);
        for (key, _) in &results_a {
            assert!(key.starts_with(&prefix_a));
        }

        // Prefix scan for partition_b
        let results_b: Vec<_> = store.prefix_scan(&prefix_b).collect();
        assert_eq!(results_b.len(), 2);
        for (key, _) in &results_b {
            assert!(key.starts_with(&prefix_b));
        }

        // Prefix scan with all-0xFF prefix
        let results_ff: Vec<_> = store.prefix_scan(&[0xFF, 0xFF]).collect();
        assert_eq!(results_ff.len(), 0);
    }

    #[test]
    fn test_prefix_scan_returns_sorted() {
        let mut store = InMemoryStore::new();
        store.put(b"prefix:c", b"3").unwrap();
        store.put(b"prefix:a", b"1").unwrap();
        store.put(b"prefix:b", b"2").unwrap();

        // BTreeMap ranges iterate in key order, so results must be sorted.
        let results: Vec<_> = store.prefix_scan(b"prefix:").collect();
        let keys: Vec<_> = results.iter().map(|(k, _)| k.as_ref().to_vec()).collect();
        assert_eq!(
            keys,
            vec![
                b"prefix:a".to_vec(),
                b"prefix:b".to_vec(),
                b"prefix:c".to_vec()
            ]
        );
    }
}
1048}