// common/storage/in_memory.rs

1use std::collections::BTreeMap;
2use std::ops::RangeBounds;
3use std::sync::{Arc, RwLock};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8use super::{
9    MergeOperator, MergeRecordOp, PutRecordOp, Storage, StorageSnapshot, WriteOptions, WriteResult,
10};
11use crate::storage::RecordOp;
12use crate::{BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl};
13
/// Trait for providing the current time.
///
/// Abstracting the time source lets tests inject a deterministic clock
/// instead of depending on the wall clock.
pub trait Clock: Send + Sync {
    /// Returns the current time as milliseconds since the Unix epoch.
    fn now(&self) -> i64;
}

/// Clock implementation that returns the real system time.
pub struct WallClock;

impl Clock for WallClock {
    fn now(&self) -> i64 {
        // Milliseconds elapsed since the Unix epoch according to the
        // system clock; panics only if the clock is set before 1970.
        std::time::UNIX_EPOCH
            .elapsed()
            .expect("system time before Unix epoch")
            .as_millis() as i64
    }
}
31
32/// Internal wrapper that stores a value alongside its expiration timestamp.
33#[derive(Clone, Debug)]
34struct StoredValue {
35    value: Bytes,
36    /// `None` means the value never expires.
37    expire_ts: Option<i64>,
38}
39
40impl StoredValue {
41    fn is_expired(&self, now: i64) -> bool {
42        self.expire_ts.is_some_and(|ts| now >= ts)
43    }
44}
45
46/// Computes the absolute expiration timestamp from TTL options.
47fn compute_expire_ts(now: i64, ttl: Ttl, default_ttl: Option<u64>) -> Option<i64> {
48    let duration = match ttl {
49        Ttl::Default => default_ttl,
50        Ttl::NoExpiry => None,
51        Ttl::ExpireAfter(ms) => Some(ms),
52    };
53    duration.map(|ms| now + ms as i64)
54}
55
/// In-memory implementation of the Storage trait using a BTreeMap.
///
/// This implementation stores all data in memory and is useful for testing
/// or scenarios where durability is not required. Supports TTL-based
/// expiration via a configurable [`Clock`].
pub struct InMemoryStorage {
    /// Ordered key/value store guarded by a reader-writer lock.
    data: Arc<RwLock<BTreeMap<Bytes, StoredValue>>>,
    /// Optional operator used to combine existing and new values on merge;
    /// when `None`, merge operations return an error.
    merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
    /// Time source for TTL expiration checks (real or test-injected).
    clock: Arc<dyn Clock>,
    /// Default TTL in milliseconds applied to writes using `Ttl::Default`.
    default_ttl: Option<u64>,
    /// Highest sequence number handed out so far.
    written_seq: std::sync::atomic::AtomicU64,
    /// Highest sequence number considered durable (never exceeds `written_seq`).
    durable_seq: std::sync::atomic::AtomicU64,
    /// Broadcasts the durable watermark to `subscribe_durable` listeners.
    durable_tx: tokio::sync::watch::Sender<u64>,
    /// When true, writes become durable only on an explicit flush.
    defer_durability: bool,
}
71
72impl InMemoryStorage {
73    /// Creates a new InMemoryStorage instance with an empty store.
74    pub fn new() -> Self {
75        let (durable_tx, _) = tokio::sync::watch::channel(0);
76        Self {
77            data: Arc::new(RwLock::new(BTreeMap::new())),
78            merge_operator: None,
79            clock: Arc::new(WallClock),
80            default_ttl: None,
81            written_seq: std::sync::atomic::AtomicU64::new(0),
82            durable_seq: std::sync::atomic::AtomicU64::new(0),
83            durable_tx,
84            defer_durability: false,
85        }
86    }
87
88    /// Creates a new InMemoryStorage instance with an optional merge operator.
89    ///
90    /// If a merge operator is provided, the `merge` method will use it to combine
91    /// existing values with new values. If no merge operator is provided, the
92    /// `merge` method will return an error.
93    pub fn with_merge_operator(merge_operator: Arc<dyn MergeOperator + Send + Sync>) -> Self {
94        let (durable_tx, _) = tokio::sync::watch::channel(0);
95        Self {
96            data: Arc::new(RwLock::new(BTreeMap::new())),
97            merge_operator: Some(merge_operator),
98            clock: Arc::new(WallClock),
99            default_ttl: None,
100            written_seq: std::sync::atomic::AtomicU64::new(0),
101            durable_seq: std::sync::atomic::AtomicU64::new(0),
102            durable_tx,
103            defer_durability: false,
104        }
105    }
106
107    /// Sets a custom clock for TTL expiration checks.
108    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
109        self.clock = clock;
110        self
111    }
112
113    /// Sets the default TTL (in milliseconds) for records written with [`Ttl::Default`].
114    pub fn with_default_ttl(mut self, ttl: u64) -> Self {
115        self.default_ttl = Some(ttl);
116        self
117    }
118
119    /// Enables deferred durability mode.
120    ///
121    /// When enabled, writes are not marked as durable until [`flush()`](Storage::flush)
122    /// or [`flush_to()`](Self::flush_to) is called. This is useful for testing
123    /// `read_durable` behavior without depending on SlateDB internals.
124    pub fn with_deferred_durability(mut self) -> Self {
125        self.defer_durability = true;
126        self
127    }
128
129    /// Advances the durable watermark to a specific sequence number.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `seq` is greater than the current written sequence number
134    /// or less than the current durable sequence number.
135    pub fn flush_to(&self, seq: u64) {
136        let written = self.written_seq.load(std::sync::atomic::Ordering::Relaxed);
137        assert!(
138            seq <= written,
139            "cannot flush beyond written seqnum: flush_to({seq}) but written is {written}"
140        );
141        let durable = self.durable_seq.load(std::sync::atomic::Ordering::Relaxed);
142        assert!(
143            seq >= durable,
144            "cannot move durable seqnum backwards: flush_to({seq}) but durable is {durable}"
145        );
146        self.durable_seq
147            .store(seq, std::sync::atomic::Ordering::Relaxed);
148        let _ = self.durable_tx.send(seq);
149    }
150
151    /// Allocates the next sequence number.
152    ///
153    /// When `defer_durability` is false (default), the sequence number is
154    /// immediately marked as durable. When true, durability is deferred
155    /// until an explicit flush.
156    fn next_seqnum(&self) -> u64 {
157        let seq = self
158            .written_seq
159            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
160            + 1;
161        if !self.defer_durability {
162            self.durable_seq
163                .store(seq, std::sync::atomic::Ordering::Relaxed);
164            let _ = self.durable_tx.send(seq);
165        }
166        seq
167    }
168}
169
170impl Default for InMemoryStorage {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176#[async_trait]
177impl StorageRead for InMemoryStorage {
178    /// Retrieves a single record by key from the in-memory store.
179    ///
180    /// Returns `None` if the key does not exist or has expired.
181    #[tracing::instrument(level = "trace", skip_all)]
182    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
183        let data = self
184            .data
185            .read()
186            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
187
188        match data.get(&key) {
189            Some(stored) if !stored.is_expired(self.clock.now()) => {
190                Ok(Some(Record::new(key, stored.value.clone())))
191            }
192            _ => Ok(None),
193        }
194    }
195
196    #[tracing::instrument(level = "trace", skip_all)]
197    async fn scan_iter(
198        &self,
199        range: BytesRange,
200    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
201        let data = self
202            .data
203            .read()
204            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
205
206        let now = self.clock.now();
207        let records: Vec<Record> = data
208            .range((range.start_bound().cloned(), range.end_bound().cloned()))
209            .filter(|(_, stored)| !stored.is_expired(now))
210            .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
211            .collect();
212
213        Ok(Box::new(InMemoryIterator { records, index: 0 }))
214    }
215}
216
/// Iterator over a list of records captured at scan time.
struct InMemoryIterator {
    // Records already filtered for expiration, in key order.
    records: Vec<Record>,
    // Position of the next record to yield.
    index: usize,
}
221
222#[async_trait]
223impl StorageIterator for InMemoryIterator {
224    #[tracing::instrument(level = "trace", skip_all)]
225    async fn next(&mut self) -> StorageResult<Option<Record>> {
226        if self.index >= self.records.len() {
227            Ok(None)
228        } else {
229            let record = self.records[self.index].clone();
230            self.index += 1;
231            Ok(Some(record))
232        }
233    }
234}
235
/// In-memory snapshot that holds a copy of the data at the time of snapshot creation.
///
/// Provides a consistent read-only view of the database at the time the snapshot was created.
/// Expired entries are filtered out at read time using the shared clock.
pub struct InMemoryStorageSnapshot {
    /// Immutable copy of the store taken when the snapshot was created.
    data: Arc<BTreeMap<Bytes, StoredValue>>,
    /// Shared with the parent storage so expiration checks stay consistent.
    clock: Arc<dyn Clock>,
}
244
245#[async_trait]
246impl StorageRead for InMemoryStorageSnapshot {
247    #[tracing::instrument(level = "trace", skip_all)]
248    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
249        match self.data.get(&key) {
250            Some(stored) if !stored.is_expired(self.clock.now()) => {
251                Ok(Some(Record::new(key, stored.value.clone())))
252            }
253            _ => Ok(None),
254        }
255    }
256
257    #[tracing::instrument(level = "trace", skip_all)]
258    async fn scan_iter(
259        &self,
260        range: BytesRange,
261    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
262        let now = self.clock.now();
263        let records: Vec<Record> = self
264            .data
265            .range((range.start_bound().cloned(), range.end_bound().cloned()))
266            .filter(|(_, stored)| !stored.is_expired(now))
267            .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
268            .collect();
269
270        Ok(Box::new(InMemoryIterator { records, index: 0 }))
271    }
272}
273
// Marker impl: all snapshot read behavior comes from the `StorageRead` impl.
#[async_trait]
impl StorageSnapshot for InMemoryStorageSnapshot {}
276
277#[async_trait]
278impl Storage for InMemoryStorage {
279    async fn apply_with_options(
280        &self,
281        records: Vec<RecordOp>,
282        _options: WriteOptions,
283    ) -> StorageResult<WriteResult> {
284        let mut data = self
285            .data
286            .write()
287            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
288
289        let now = self.clock.now();
290        for record in records {
291            match record {
292                RecordOp::Put(op) => {
293                    let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
294                    data.insert(
295                        op.record.key,
296                        StoredValue {
297                            value: op.record.value,
298                            expire_ts,
299                        },
300                    );
301                }
302                RecordOp::Merge(op) => {
303                    let existing_value = data
304                        .get(&op.record.key)
305                        .filter(|s| !s.is_expired(now))
306                        .map(|s| s.value.clone());
307                    let merged_value = self.merge_operator.as_ref().unwrap().merge_batch(
308                        &op.record.key,
309                        existing_value,
310                        &[op.record.value],
311                    );
312                    let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
313                    data.insert(
314                        op.record.key,
315                        StoredValue {
316                            value: merged_value,
317                            expire_ts,
318                        },
319                    );
320                }
321                RecordOp::Delete(key) => {
322                    data.remove(&key);
323                }
324            }
325        }
326
327        Ok(WriteResult {
328            seqnum: self.next_seqnum(),
329        })
330    }
331
332    /// Writes a batch of records to the in-memory store.
333    ///
334    /// All records are written atomically within a single write lock acquisition.
335    /// For in-memory storage, write options are ignored since there is no
336    /// durable storage to await.
337    async fn put_with_options(
338        &self,
339        records: Vec<PutRecordOp>,
340        _options: WriteOptions,
341    ) -> StorageResult<WriteResult> {
342        let mut data = self
343            .data
344            .write()
345            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
346
347        let now = self.clock.now();
348        for op in records {
349            let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
350            data.insert(
351                op.record.key,
352                StoredValue {
353                    value: op.record.value,
354                    expire_ts,
355                },
356            );
357        }
358
359        Ok(WriteResult {
360            seqnum: self.next_seqnum(),
361        })
362    }
363
364    /// Merges values for the given keys using the configured merge operator.
365    ///
366    /// This method requires a merge operator to be configured during construction.
367    /// For each record, it will:
368    /// 1. Get the existing value (if any), excluding expired entries
369    /// 2. Call the merge operator to combine existing and new values
370    /// 3. Put the merged result back with the new TTL
371    ///
372    /// If no merge operator is configured, this method will return a
373    /// `StorageError::Storage` error.
374    async fn merge_with_options(
375        &self,
376        records: Vec<MergeRecordOp>,
377        _options: WriteOptions,
378    ) -> StorageResult<WriteResult> {
379        let merge_op = self
380            .merge_operator
381            .as_ref()
382            .ok_or_else(|| {
383                StorageError::Storage(
384                    "Merge operator not configured: in-memory storage requires a merge operator to be set during construction".to_string(),
385                )
386            })?;
387
388        let mut data = self
389            .data
390            .write()
391            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
392
393        let now = self.clock.now();
394        for op in records {
395            let existing_value = data
396                .get(&op.record.key)
397                .filter(|s| !s.is_expired(now))
398                .map(|s| s.value.clone());
399            let merged_value =
400                merge_op.merge_batch(&op.record.key, existing_value, &[op.record.value]);
401            let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
402            data.insert(
403                op.record.key,
404                StoredValue {
405                    value: merged_value,
406                    expire_ts,
407                },
408            );
409        }
410
411        Ok(WriteResult {
412            seqnum: self.next_seqnum(),
413        })
414    }
415
416    /// Creates a point-in-time snapshot of the in-memory storage.
417    ///
418    /// The snapshot provides a consistent read-only view of the database at the time
419    /// the snapshot was created. Reads from the snapshot will not see any subsequent
420    /// writes to the underlying storage. Expired entries are filtered at read time.
421    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
422        let data = self
423            .data
424            .read()
425            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
426
427        let snapshot_data = Arc::new(data.clone());
428
429        Ok(Arc::new(InMemoryStorageSnapshot {
430            data: snapshot_data,
431            clock: self.clock.clone(),
432        }))
433    }
434
435    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
436        self.durable_tx.subscribe()
437    }
438
439    async fn flush(&self) -> StorageResult<()> {
440        if self.defer_durability {
441            self.flush_to(self.written_seq.load(std::sync::atomic::Ordering::Relaxed));
442        }
443        Ok(())
444    }
445
446    async fn close(&self) -> StorageResult<()> {
447        // No-op for in-memory storage
448        Ok(())
449    }
450}
451
/// Injected failure that fires either once or on every call.
///
/// Used by [`FailingStorage`] to simulate storage errors in tests.
#[cfg(feature = "test-utils")]
#[derive(Clone)]
enum Failure {
    /// Error is returned once, then automatically cleared.
    Once(super::StorageError),
    /// Error is returned on every subsequent call until explicitly cleared.
    Persistent(super::StorageError),
}
461
/// Lock-free slot holding an optional injected failure for one method group.
#[cfg(feature = "test-utils")]
type FailSlot = arc_swap::ArcSwap<Option<Failure>>;
464
/// Checks a [`FailSlot`] and returns an error if one is set.
///
/// For [`Failure::Once`], the slot is atomically swapped to `None` so the
/// error fires exactly once. For [`Failure::Persistent`], the slot is left
/// unchanged.
#[cfg(feature = "test-utils")]
fn check_failure(slot: &FailSlot) -> super::StorageResult<()> {
    match slot.load().as_ref() {
        None => Ok(()),
        Some(Failure::Persistent(err)) => Err(err.clone()),
        Some(Failure::Once(_)) => {
            // Atomically take the failure out of the slot. If another thread
            // raced us here, exactly one caller observes the error and the
            // rest pass through — reasonable for tests.
            match slot.swap(Arc::new(None)).as_ref() {
                Some(Failure::Once(err)) => Err(err.clone()),
                _ => Ok(()),
            }
        }
    }
}
487
/// A storage wrapper that delegates to an inner [`Storage`] but can inject
/// failures into `apply`, `put`/`put_with_options`, `flush`, and `snapshot` on demand.
///
/// Each failure slot is controlled by a lock-free [`ArcSwap`](arc_swap::ArcSwap).
/// The read path avoids introducing artificial synchronisation that could mask
/// concurrency bugs in the code under test.
///
/// Failures can be *persistent* (returned on every call until cleared) or
/// *once* (returned on the next call, then automatically cleared).
///
/// Gated behind the `test-utils` feature.
///
/// # Example
///
/// ```ignore
/// let inner = Arc::new(InMemoryStorage::new());
/// let storage = FailingStorage::wrap(inner);
/// storage.fail_put(StorageError::Storage("disk full".into()));
/// // every put_with_options call now returns Err(...)
///
/// storage.fail_flush_once(StorageError::Storage("io error".into()));
/// // only the next flush call returns Err(...), then auto-clears
/// ```
#[cfg(feature = "test-utils")]
pub struct FailingStorage {
    /// The real storage every call ultimately delegates to.
    inner: Arc<dyn super::Storage>,
    /// Failure injected into `apply_with_options`.
    fail_apply: FailSlot,
    /// Failure injected into `put_with_options` (and `put` via it).
    fail_put: FailSlot,
    /// Failure injected into `flush`.
    fail_flush: FailSlot,
    /// Failure injected into `snapshot`.
    fail_snapshot: FailSlot,
}
519
#[cfg(feature = "test-utils")]
impl FailingStorage {
    /// Wraps an existing storage, with all failure injections initially `None`.
    pub fn wrap(inner: Arc<dyn super::Storage>) -> Arc<Self> {
        let empty = || arc_swap::ArcSwap::from_pointee(None);
        Arc::new(Self {
            inner,
            fail_apply: empty(),
            fail_put: empty(),
            fail_flush: empty(),
            fail_snapshot: empty(),
        })
    }

    /// Arms `slot` so subsequent calls through it observe `failure`.
    fn arm(slot: &FailSlot, failure: Failure) {
        slot.store(Arc::new(Some(failure)));
    }

    /// Makes `apply` return the given error on every subsequent call.
    pub fn fail_apply(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Persistent(err));
    }

    /// Makes `apply` return the given error on the next call only.
    pub fn fail_apply_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Once(err));
    }

    /// Makes `put` and `put_with_options` return the given error on every subsequent call.
    pub fn fail_put(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Persistent(err));
    }

    /// Makes `put` or `put_with_options` return the given error on the next call only.
    ///
    /// The failure fires on whichever of the two methods is called first, then
    /// automatically clears.
    pub fn fail_put_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Once(err));
    }

    /// Makes `flush` return the given error on every subsequent call.
    pub fn fail_flush(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Persistent(err));
    }

    /// Makes `flush` return the given error on the next call only.
    pub fn fail_flush_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Once(err));
    }

    /// Makes `snapshot` return the given error on every subsequent call.
    pub fn fail_snapshot(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Persistent(err));
    }

    /// Makes `snapshot` return the given error on the next call only.
    pub fn fail_snapshot_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Once(err));
    }
}
580
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::StorageRead for FailingStorage {
    /// Reads delegate straight to the inner storage; no failure injection.
    async fn get(&self, key: Bytes) -> super::StorageResult<Option<crate::Record>> {
        self.inner.get(key).await
    }

    /// Scans delegate straight to the inner storage; no failure injection.
    async fn scan_iter(
        &self,
        range: crate::BytesRange,
    ) -> super::StorageResult<Box<dyn super::StorageIterator + Send + 'static>> {
        self.inner.scan_iter(range).await
    }
}
595
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::Storage for FailingStorage {
    /// Checks the `apply` failure slot first, then delegates.
    async fn apply_with_options(
        &self,
        ops: Vec<super::RecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_apply)?;
        self.inner.apply_with_options(ops, options).await
    }

    /// Checks the `put` failure slot first, then delegates.
    async fn put_with_options(
        &self,
        records: Vec<super::PutRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_put)?;
        self.inner.put_with_options(records, options).await
    }

    /// NOTE(review): merge has no failure slot — the wrapper docs list only
    /// apply, put, flush, and snapshot as injectable; confirm this is intended.
    async fn merge_with_options(
        &self,
        records: Vec<super::MergeRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        self.inner.merge_with_options(records, options).await
    }

    /// Checks the `snapshot` failure slot first, then delegates.
    async fn snapshot(&self) -> super::StorageResult<Arc<dyn super::StorageSnapshot>> {
        check_failure(&self.fail_snapshot)?;
        self.inner.snapshot().await
    }

    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
        self.inner.subscribe_durable()
    }

    /// Checks the `flush` failure slot first, then delegates.
    async fn flush(&self) -> super::StorageResult<()> {
        check_failure(&self.fail_flush)?;
        self.inner.flush().await
    }

    // Close is always passed through; failures are not injected here.
    async fn close(&self) -> super::StorageResult<()> {
        self.inner.close().await
    }
}
643
644#[cfg(test)]
645mod tests {
646    use super::*;
647    use bytes::BytesMut;
648    use std::ops::Bound;
649
    /// Test merge operator that appends new value to existing value with a separator.
    struct AppendMergeOperator;

    impl MergeOperator for AppendMergeOperator {
        /// Folds each operand onto the accumulated value, inserting a `,`
        /// between non-empty segments.
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            operands
                .iter()
                .fold(existing_value.unwrap_or_default(), |acc, operand| {
                    let mut result = BytesMut::from(acc);
                    if !result.is_empty() {
                        result.extend_from_slice(b",");
                    }
                    result.extend_from_slice(operand);
                    result.freeze()
                })
        }
    }
672
    /// Getting an absent key returns `Ok(None)` rather than an error.
    #[tokio::test]
    async fn should_return_none_when_key_not_found() {
        // given
        let storage = InMemoryStorage::new();

        // when
        let result = storage.get(Bytes::from("missing_key")).await;

        // then
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    /// A value written with `put` can be read back with `get`.
    #[tokio::test]
    async fn should_store_and_retrieve_record() {
        // given
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let value = Bytes::from("test_value");

        // when
        storage
            .put(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        // then
        assert!(result.is_some());
        let record = result.unwrap();
        assert_eq!(record.key, Bytes::from("test_key"));
        assert_eq!(record.value, value);
    }

    /// A second `put` to the same key replaces the earlier value.
    #[tokio::test]
    async fn should_overwrite_existing_key() {
        // given
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let initial_value = Bytes::from("initial_value");
        let updated_value = Bytes::from("updated_value");

        // when
        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();
        storage
            .put(vec![Record::new(key.clone(), updated_value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        // then
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, updated_value);
    }

    /// Every record of a multi-record batch is individually readable.
    #[tokio::test]
    async fn should_store_multiple_records() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")),
            Record::new(Bytes::from("key2"), Bytes::from("value2")),
            Record::new(Bytes::from("key3"), Bytes::from("value3")),
        ];

        // when
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // then
        for record in records {
            let retrieved = storage.get(record.key.clone()).await.unwrap();
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().value, record.value);
        }
    }
754
    /// An unbounded scan returns every record in ascending key order.
    #[tokio::test]
    async fn should_scan_all_records_when_unbounded() {
        // given
        let storage = InMemoryStorage::new();
        let records = [
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
        ];
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        // then
        assert_eq!(scanned.len(), 3);
        assert_eq!(scanned[0].key, Bytes::from("a"));
        assert_eq!(scanned[1].key, Bytes::from("b"));
        assert_eq!(scanned[2].key, Bytes::from("c"));
    }

    /// A prefix scan returns only keys that start with the prefix.
    #[tokio::test]
    async fn should_scan_records_with_prefix() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("prefix_a"), Bytes::from("value1")),
            Record::new(Bytes::from("prefix_b"), Bytes::from("value2")),
            Record::new(Bytes::from("other_c"), Bytes::from("value3")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let scanned = storage
            .scan(BytesRange::prefix(Bytes::from("prefix_")))
            .await
            .unwrap();

        // then
        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("prefix_a"));
        assert_eq!(scanned[1].key, Bytes::from("prefix_b"));
    }

    /// A half-open range [b, d) includes "b" and "c" but excludes "d".
    #[tokio::test]
    async fn should_scan_records_in_bounded_range() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
            Record::new(Bytes::from("d"), Bytes::from("value_d")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let range = BytesRange::new(
            Bound::Included(Bytes::from("b")),
            Bound::Excluded(Bytes::from("d")),
        );
        let scanned = storage.scan(range).await.unwrap();

        // then
        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("b"));
        assert_eq!(scanned[1].key, Bytes::from("c"));
    }

    /// Scanning an empty store yields an empty vec, not an error.
    #[tokio::test]
    async fn should_return_empty_vec_when_scanning_empty_storage() {
        // given
        let storage = InMemoryStorage::new();

        // when
        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        // then
        assert!(scanned.is_empty());
    }
844
    /// The scan iterator yields records in order and then `None`.
    #[tokio::test]
    async fn should_iterate_over_records() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        // when
        let mut iter = storage.scan_iter(BytesRange::unbounded()).await.unwrap();
        let first = iter.next().await.unwrap();
        let second = iter.next().await.unwrap();
        let third = iter.next().await.unwrap();

        // then
        assert!(first.is_some());
        assert_eq!(first.unwrap().key, Bytes::from("key1"));
        assert!(second.is_some());
        assert_eq!(second.unwrap().key, Bytes::from("key2"));
        assert!(third.is_none());
    }

    /// A snapshot exposes data that existed at creation time.
    #[tokio::test]
    async fn should_create_snapshot_with_current_data() {
        // given
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        // when
        let snapshot = storage.snapshot().await.unwrap();

        // then
        let result = snapshot.get(Bytes::from("key1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1"));
    }

    /// Writes made after snapshot creation are invisible to the snapshot.
    #[tokio::test]
    async fn should_not_see_writes_after_snapshot() {
        // given
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        // when
        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();

        // then
        let snapshot_result = snapshot.get(Bytes::from("key2")).await.unwrap();
        assert!(snapshot_result.is_none());

        let storage_result = storage.get(Bytes::from("key2")).await.unwrap();
        assert!(storage_result.is_some());
    }
916
917    #[tokio::test]
918    async fn should_scan_snapshot_independently() {
919        // given
920        let storage = InMemoryStorage::new();
921        storage
922            .put(vec![
923                Record::new(Bytes::from("a"), Bytes::from("value_a")).into(),
924            ])
925            .await
926            .unwrap();
927
928        // when
929        let snapshot = storage.snapshot().await.unwrap();
930        storage
931            .put(vec![
932                Record::new(Bytes::from("b"), Bytes::from("value_b")).into(),
933            ])
934            .await
935            .unwrap();
936
937        // then
938        let snapshot_records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
939        assert_eq!(snapshot_records.len(), 1);
940        assert_eq!(snapshot_records[0].key, Bytes::from("a"));
941
942        let storage_records = storage.scan(BytesRange::unbounded()).await.unwrap();
943        assert_eq!(storage_records.len(), 2);
944    }
945
946    #[tokio::test]
947    async fn should_handle_empty_record() {
948        // given
949        let storage = InMemoryStorage::new();
950        let key = Bytes::from("empty_key");
951
952        // when
953        storage
954            .put(vec![Record::empty(key.clone()).into()])
955            .await
956            .unwrap();
957        let result = storage.get(key).await.unwrap();
958
959        // then
960        assert!(result.is_some());
961        assert_eq!(result.unwrap().value, Bytes::new());
962    }
963
964    #[tokio::test]
965    async fn should_return_error_when_merge_operator_not_configured() {
966        // given
967        let storage = InMemoryStorage::new();
968        let record = Record::new(Bytes::from("key1"), Bytes::from("value1"));
969
970        // when
971        let result = storage.merge(vec![record.into()]).await;
972
973        // then
974        assert!(result.is_err());
975        assert!(
976            result
977                .unwrap_err()
978                .to_string()
979                .contains("Merge operator not configured")
980        );
981    }
982
983    #[tokio::test]
984    async fn should_merge_when_key_does_not_exist() {
985        // given
986        let merge_op = Arc::new(AppendMergeOperator);
987        let storage = InMemoryStorage::with_merge_operator(merge_op);
988        let key = Bytes::from("new_key");
989        let value = Bytes::from("value1");
990
991        // when
992        storage
993            .merge(vec![Record::new(key.clone(), value.clone()).into()])
994            .await
995            .unwrap();
996        let result = storage.get(key).await.unwrap();
997
998        // then
999        assert!(result.is_some());
1000        assert_eq!(result.unwrap().value, value);
1001    }
1002
1003    #[tokio::test]
1004    async fn should_merge_when_key_exists() {
1005        // given
1006        let merge_op = Arc::new(AppendMergeOperator);
1007        let storage = InMemoryStorage::with_merge_operator(merge_op);
1008        let key = Bytes::from("key1");
1009        let initial_value = Bytes::from("value1");
1010        let new_value = Bytes::from("value2");
1011
1012        storage
1013            .put(vec![Record::new(key.clone(), initial_value).into()])
1014            .await
1015            .unwrap();
1016
1017        // when
1018        storage
1019            .merge(vec![Record::new(key.clone(), new_value).into()])
1020            .await
1021            .unwrap();
1022        let result = storage.get(key).await.unwrap();
1023
1024        // then
1025        assert!(result.is_some());
1026        assert_eq!(result.unwrap().value, Bytes::from("value1,value2"));
1027    }
1028
1029    #[tokio::test]
1030    async fn should_merge_multiple_keys() {
1031        // given
1032        let merge_op = Arc::new(AppendMergeOperator);
1033        let storage = InMemoryStorage::with_merge_operator(merge_op);
1034        let records = vec![
1035            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
1036            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
1037        ];
1038        storage.put(records).await.unwrap();
1039
1040        // when
1041        storage
1042            .merge(vec![
1043                Record::new(Bytes::from("key1"), Bytes::from("value1a")).into(),
1044                Record::new(Bytes::from("key2"), Bytes::from("value2a")).into(),
1045            ])
1046            .await
1047            .unwrap();
1048
1049        // then
1050        let result1 = storage.get(Bytes::from("key1")).await.unwrap();
1051        assert_eq!(result1.unwrap().value, Bytes::from("value1,value1a"));
1052
1053        let result2 = storage.get(Bytes::from("key2")).await.unwrap();
1054        assert_eq!(result2.unwrap().value, Bytes::from("value2,value2a"));
1055    }
1056
1057    #[tokio::test]
1058    async fn should_return_monotonically_increasing_seqnums_from_put() {
1059        let storage = InMemoryStorage::new();
1060
1061        let r1 = storage
1062            .put(vec![
1063                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1064            ])
1065            .await
1066            .unwrap();
1067        let r2 = storage
1068            .put(vec![
1069                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1070            ])
1071            .await
1072            .unwrap();
1073        let r3 = storage
1074            .put(vec![
1075                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1076            ])
1077            .await
1078            .unwrap();
1079
1080        assert_eq!(r1.seqnum, 1);
1081        assert_eq!(r2.seqnum, 2);
1082        assert_eq!(r3.seqnum, 3);
1083    }
1084
1085    #[tokio::test]
1086    async fn should_return_monotonically_increasing_seqnums_from_apply() {
1087        let storage = InMemoryStorage::new();
1088
1089        let r1 = storage
1090            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1091                Bytes::from("k1"),
1092                Bytes::from("v1"),
1093            )))])
1094            .await
1095            .unwrap();
1096        let r2 = storage
1097            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1098                Bytes::from("k2"),
1099                Bytes::from("v2"),
1100            )))])
1101            .await
1102            .unwrap();
1103
1104        assert_eq!(r1.seqnum, 1);
1105        assert_eq!(r2.seqnum, 2);
1106    }
1107
1108    #[tokio::test]
1109    async fn should_share_seqnum_counter_across_write_methods() {
1110        let merge_op = Arc::new(AppendMergeOperator);
1111        let storage = InMemoryStorage::with_merge_operator(merge_op);
1112
1113        let r1 = storage
1114            .put(vec![
1115                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1116            ])
1117            .await
1118            .unwrap();
1119        let r2 = storage
1120            .merge(vec![
1121                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
1122            ])
1123            .await
1124            .unwrap();
1125        let r3 = storage
1126            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1127                Bytes::from("k2"),
1128                Bytes::from("v3"),
1129            )))])
1130            .await
1131            .unwrap();
1132
1133        assert_eq!(r1.seqnum, 1);
1134        assert_eq!(r2.seqnum, 2);
1135        assert_eq!(r3.seqnum, 3);
1136    }
1137
1138    #[tokio::test]
1139    async fn should_start_durable_subscriber_at_zero() {
1140        let storage = InMemoryStorage::new();
1141        let rx = storage.subscribe_durable();
1142        assert_eq!(*rx.borrow(), 0);
1143    }
1144
1145    #[tokio::test]
1146    async fn should_advance_durable_watermark_on_each_write() {
1147        let storage = InMemoryStorage::new();
1148        let rx = storage.subscribe_durable();
1149
1150        storage
1151            .put(vec![
1152                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1153            ])
1154            .await
1155            .unwrap();
1156        assert_eq!(*rx.borrow(), 1);
1157
1158        storage
1159            .put(vec![
1160                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1161            ])
1162            .await
1163            .unwrap();
1164        assert_eq!(*rx.borrow(), 2);
1165    }
1166
1167    #[tokio::test]
1168    async fn should_merge_empty_values() {
1169        // given
1170        let merge_op = Arc::new(AppendMergeOperator);
1171        let storage = InMemoryStorage::with_merge_operator(merge_op);
1172        let key = Bytes::from("key1");
1173
1174        // when
1175        storage
1176            .merge(vec![Record::empty(key.clone()).into()])
1177            .await
1178            .unwrap();
1179        let result = storage.get(key).await.unwrap();
1180
1181        // then
1182        assert!(result.is_some());
1183        assert_eq!(result.unwrap().value, Bytes::new());
1184    }
1185
1186    #[tokio::test]
1187    async fn should_not_advance_durable_watermark_when_deferred() {
1188        let storage = InMemoryStorage::new().with_deferred_durability();
1189        let rx = storage.subscribe_durable();
1190
1191        storage
1192            .put(vec![
1193                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1194            ])
1195            .await
1196            .unwrap();
1197        assert_eq!(*rx.borrow(), 0, "durable watermark should not advance");
1198
1199        storage
1200            .put(vec![
1201                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1202            ])
1203            .await
1204            .unwrap();
1205        assert_eq!(*rx.borrow(), 0, "durable watermark should still be 0");
1206    }
1207
1208    #[tokio::test]
1209    async fn should_advance_durable_watermark_on_flush_when_deferred() {
1210        let storage = InMemoryStorage::new().with_deferred_durability();
1211        let rx = storage.subscribe_durable();
1212
1213        storage
1214            .put(vec![
1215                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1216            ])
1217            .await
1218            .unwrap();
1219        storage
1220            .put(vec![
1221                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1222            ])
1223            .await
1224            .unwrap();
1225        assert_eq!(*rx.borrow(), 0);
1226
1227        storage.flush().await.unwrap();
1228        assert_eq!(*rx.borrow(), 2, "flush should advance to current seqnum");
1229    }
1230
1231    #[tokio::test]
1232    async fn should_advance_durable_watermark_to_specific_seq() {
1233        let storage = InMemoryStorage::new().with_deferred_durability();
1234        let rx = storage.subscribe_durable();
1235
1236        // Write 3 records
1237        for i in 1..=3 {
1238            storage
1239                .put(vec![
1240                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
1241                ])
1242                .await
1243                .unwrap();
1244        }
1245        assert_eq!(*rx.borrow(), 0);
1246
1247        // Partially flush through seqnum 2
1248        storage.flush_to(2);
1249        assert_eq!(*rx.borrow(), 2, "should advance to requested seqnum");
1250
1251        // Flush the rest
1252        storage.flush_to(3);
1253        assert_eq!(*rx.borrow(), 3);
1254    }
1255
1256    #[tokio::test]
1257    #[should_panic(expected = "cannot move durable seqnum backwards")]
1258    async fn should_panic_when_flush_to_moves_backwards() {
1259        let storage = InMemoryStorage::new().with_deferred_durability();
1260        for i in 1..=3 {
1261            storage
1262                .put(vec![
1263                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
1264                ])
1265                .await
1266                .unwrap();
1267        }
1268
1269        storage.flush_to(2);
1270        storage.flush_to(1);
1271    }
1272
1273    #[tokio::test]
1274    #[should_panic(expected = "cannot flush beyond written seqnum")]
1275    async fn should_panic_when_flush_to_exceeds_written() {
1276        let storage = InMemoryStorage::new().with_deferred_durability();
1277        storage
1278            .put(vec![
1279                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1280            ])
1281            .await
1282            .unwrap();
1283
1284        storage.flush_to(5);
1285    }
1286
1287    #[tokio::test]
1288    async fn should_see_data_in_snapshot_before_flush_when_deferred() {
1289        let storage = InMemoryStorage::new().with_deferred_durability();
1290
1291        storage
1292            .put(vec![
1293                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1294            ])
1295            .await
1296            .unwrap();
1297
1298        // Data is written but not durable — snapshots should still see it
1299        let snapshot = storage.snapshot().await.unwrap();
1300        let result = snapshot.get(Bytes::from("k1")).await.unwrap();
1301        assert!(result.is_some());
1302        assert_eq!(result.unwrap().value, Bytes::from("v1"));
1303
1304        // But durable watermark is still 0
1305        let rx = storage.subscribe_durable();
1306        assert_eq!(*rx.borrow(), 0);
1307    }
1308
1309    #[tokio::test]
1310    async fn should_not_advance_durable_watermark_on_apply_when_deferred() {
1311        let storage = InMemoryStorage::new().with_deferred_durability();
1312        let rx = storage.subscribe_durable();
1313
1314        storage
1315            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1316                Bytes::from("k1"),
1317                Bytes::from("v1"),
1318            )))])
1319            .await
1320            .unwrap();
1321        assert_eq!(*rx.borrow(), 0);
1322
1323        storage
1324            .apply(vec![RecordOp::Delete(Bytes::from("k1"))])
1325            .await
1326            .unwrap();
1327        assert_eq!(*rx.borrow(), 0);
1328
1329        storage.flush().await.unwrap();
1330        assert_eq!(*rx.borrow(), 2);
1331    }
1332
1333    #[tokio::test]
1334    async fn should_not_advance_durable_watermark_on_merge_when_deferred() {
1335        let merge_op = Arc::new(AppendMergeOperator);
1336        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
1337        let rx = storage.subscribe_durable();
1338
1339        storage
1340            .merge(vec![
1341                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1342            ])
1343            .await
1344            .unwrap();
1345        assert_eq!(*rx.borrow(), 0);
1346
1347        storage
1348            .merge(vec![
1349                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
1350            ])
1351            .await
1352            .unwrap();
1353        assert_eq!(*rx.borrow(), 0);
1354
1355        storage.flush().await.unwrap();
1356        assert_eq!(*rx.borrow(), 2);
1357    }
1358
1359    #[tokio::test]
1360    async fn should_support_multiple_flush_cycles_when_deferred() {
1361        let storage = InMemoryStorage::new().with_deferred_durability();
1362        let rx = storage.subscribe_durable();
1363
1364        // First cycle: write and flush
1365        storage
1366            .put(vec![
1367                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1368            ])
1369            .await
1370            .unwrap();
1371        storage.flush().await.unwrap();
1372        assert_eq!(*rx.borrow(), 1);
1373
1374        // Second cycle: write more and flush again
1375        storage
1376            .put(vec![
1377                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1378            ])
1379            .await
1380            .unwrap();
1381        storage
1382            .put(vec![
1383                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1384            ])
1385            .await
1386            .unwrap();
1387        assert_eq!(*rx.borrow(), 1, "should not advance before second flush");
1388
1389        storage.flush().await.unwrap();
1390        assert_eq!(*rx.borrow(), 3);
1391    }
1392
1393    #[tokio::test]
1394    async fn should_flush_on_empty_storage_when_deferred() {
1395        let storage = InMemoryStorage::new().with_deferred_durability();
1396        let rx = storage.subscribe_durable();
1397
1398        // Flushing with no writes should be fine (sends 0 again)
1399        storage.flush().await.unwrap();
1400        assert_eq!(*rx.borrow(), 0);
1401
1402        // flush_to(0) should also be fine
1403        storage.flush_to(0);
1404        assert_eq!(*rx.borrow(), 0);
1405    }
1406
1407    #[tokio::test]
1408    async fn should_defer_durability_across_mixed_write_methods() {
1409        let merge_op = Arc::new(AppendMergeOperator);
1410        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
1411        let rx = storage.subscribe_durable();
1412
1413        // put, apply, merge — all should defer
1414        storage
1415            .put(vec![
1416                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1417            ])
1418            .await
1419            .unwrap();
1420        storage
1421            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1422                Bytes::from("k2"),
1423                Bytes::from("v2"),
1424            )))])
1425            .await
1426            .unwrap();
1427        storage
1428            .merge(vec![
1429                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1430            ])
1431            .await
1432            .unwrap();
1433        assert_eq!(*rx.borrow(), 0);
1434
1435        // Partially flush through seqnum 2
1436        storage.flush_to(2);
1437        assert_eq!(*rx.borrow(), 2);
1438
1439        // Flush the rest
1440        storage.flush().await.unwrap();
1441        assert_eq!(*rx.borrow(), 3);
1442    }
1443
1444    #[tokio::test]
1445    async fn should_read_data_written_before_flush_when_deferred() {
1446        let storage = InMemoryStorage::new().with_deferred_durability();
1447
1448        storage
1449            .put(vec![
1450                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1451                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1452            ])
1453            .await
1454            .unwrap();
1455
1456        // get and scan see data even though it is not yet durable
1457        let result = storage.get(Bytes::from("k1")).await.unwrap();
1458        assert_eq!(result.unwrap().value, Bytes::from("v1"));
1459
1460        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
1461        assert_eq!(scanned.len(), 2);
1462    }
1463}