// common/storage/in_memory.rs

1use std::collections::BTreeMap;
2use std::ops::RangeBounds;
3use std::sync::{Arc, RwLock};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8use super::{
9    MergeOperator, MergeRecordOp, PutRecordOp, Storage, StorageSnapshot, WriteOptions, WriteResult,
10};
11use crate::storage::RecordOp;
12use crate::{BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl};
13
/// Trait for providing the current time.
///
/// Abstracting the clock lets tests drive TTL expiration deterministically
/// instead of depending on the real system time.
pub trait Clock: Send + Sync {
    /// Returns the current time as milliseconds since the Unix epoch.
    fn now(&self) -> i64;
}
19
20/// Clock implementation that returns the real system time.
21pub struct WallClock;
22
23impl Clock for WallClock {
24    fn now(&self) -> i64 {
25        std::time::SystemTime::now()
26            .duration_since(std::time::UNIX_EPOCH)
27            .expect("system time before Unix epoch")
28            .as_millis() as i64
29    }
30}
31
32/// Internal wrapper that stores a value alongside its expiration timestamp.
33#[derive(Clone, Debug)]
34struct StoredValue {
35    value: Bytes,
36    /// `None` means the value never expires.
37    expire_ts: Option<i64>,
38}
39
40impl StoredValue {
41    fn is_expired(&self, now: i64) -> bool {
42        self.expire_ts.is_some_and(|ts| now >= ts)
43    }
44}
45
46/// Computes the absolute expiration timestamp from TTL options.
47fn compute_expire_ts(now: i64, ttl: Ttl, default_ttl: Option<u64>) -> Option<i64> {
48    let duration = match ttl {
49        Ttl::Default => default_ttl,
50        Ttl::NoExpiry => None,
51        Ttl::ExpireAfter(ms) => Some(ms),
52    };
53    duration.map(|ms| now + ms as i64)
54}
55
/// In-memory implementation of the Storage trait using a BTreeMap.
///
/// This implementation stores all data in memory and is useful for testing
/// or scenarios where durability is not required. Supports TTL-based
/// expiration via a configurable [`Clock`].
pub struct InMemoryStorage {
    // Ordered map so range scans come out in key order for free.
    data: Arc<RwLock<BTreeMap<Bytes, StoredValue>>>,
    // Optional operator used by merge operations; merges error without one.
    merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
    // Time source used for TTL expiration checks.
    clock: Arc<dyn Clock>,
    // Default TTL (milliseconds) applied to writes using `Ttl::Default`.
    default_ttl: Option<u64>,
    // Highest sequence number handed out to a completed write.
    written_seq: std::sync::atomic::AtomicU64,
    // Highest sequence number currently considered durable.
    durable_seq: std::sync::atomic::AtomicU64,
    // Broadcasts advances of the durable watermark to subscribers.
    durable_tx: tokio::sync::watch::Sender<u64>,
    // When true, writes become durable only on an explicit flush.
    defer_durability: bool,
}
71
72impl InMemoryStorage {
73    /// Creates a new InMemoryStorage instance with an empty store.
74    pub fn new() -> Self {
75        let (durable_tx, _) = tokio::sync::watch::channel(0);
76        Self {
77            data: Arc::new(RwLock::new(BTreeMap::new())),
78            merge_operator: None,
79            clock: Arc::new(WallClock),
80            default_ttl: None,
81            written_seq: std::sync::atomic::AtomicU64::new(0),
82            durable_seq: std::sync::atomic::AtomicU64::new(0),
83            durable_tx,
84            defer_durability: false,
85        }
86    }
87
88    /// Creates a new InMemoryStorage instance with an optional merge operator.
89    ///
90    /// If a merge operator is provided, the `merge` method will use it to combine
91    /// existing values with new values. If no merge operator is provided, the
92    /// `merge` method will return an error.
93    pub fn with_merge_operator(merge_operator: Arc<dyn MergeOperator + Send + Sync>) -> Self {
94        let (durable_tx, _) = tokio::sync::watch::channel(0);
95        Self {
96            data: Arc::new(RwLock::new(BTreeMap::new())),
97            merge_operator: Some(merge_operator),
98            clock: Arc::new(WallClock),
99            default_ttl: None,
100            written_seq: std::sync::atomic::AtomicU64::new(0),
101            durable_seq: std::sync::atomic::AtomicU64::new(0),
102            durable_tx,
103            defer_durability: false,
104        }
105    }
106
107    /// Sets a custom clock for TTL expiration checks.
108    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
109        self.clock = clock;
110        self
111    }
112
113    /// Sets the default TTL (in milliseconds) for records written with [`Ttl::Default`].
114    pub fn with_default_ttl(mut self, ttl: u64) -> Self {
115        self.default_ttl = Some(ttl);
116        self
117    }
118
119    /// Enables deferred durability mode.
120    ///
121    /// When enabled, writes are not marked as durable until [`flush()`](Storage::flush)
122    /// or [`flush_to()`](Self::flush_to) is called. This is useful for testing
123    /// `read_durable` behavior without depending on SlateDB internals.
124    pub fn with_deferred_durability(mut self) -> Self {
125        self.defer_durability = true;
126        self
127    }
128
129    /// Advances the durable watermark to a specific sequence number.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `seq` is greater than the current written sequence number
134    /// or less than the current durable sequence number.
135    pub fn flush_to(&self, seq: u64) {
136        let written = self.written_seq.load(std::sync::atomic::Ordering::Relaxed);
137        assert!(
138            seq <= written,
139            "cannot flush beyond written seqnum: flush_to({seq}) but written is {written}"
140        );
141        let durable = self.durable_seq.load(std::sync::atomic::Ordering::Relaxed);
142        assert!(
143            seq >= durable,
144            "cannot move durable seqnum backwards: flush_to({seq}) but durable is {durable}"
145        );
146        self.durable_seq
147            .store(seq, std::sync::atomic::Ordering::Relaxed);
148        let _ = self.durable_tx.send(seq);
149    }
150
151    /// Allocates the next sequence number.
152    ///
153    /// When `defer_durability` is false (default), the sequence number is
154    /// immediately marked as durable. When true, durability is deferred
155    /// until an explicit flush.
156    fn next_seqnum(&self) -> u64 {
157        let seq = self
158            .written_seq
159            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
160            + 1;
161        if !self.defer_durability {
162            self.durable_seq
163                .store(seq, std::sync::atomic::Ordering::Relaxed);
164            let _ = self.durable_tx.send(seq);
165        }
166        seq
167    }
168}
169
impl Default for InMemoryStorage {
    /// Equivalent to [`InMemoryStorage::new`].
    fn default() -> Self {
        Self::new()
    }
}
175
176#[async_trait]
177impl StorageRead for InMemoryStorage {
178    /// Retrieves a single record by key from the in-memory store.
179    ///
180    /// Returns `None` if the key does not exist or has expired.
181    #[tracing::instrument(level = "trace", skip_all)]
182    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
183        let data = self
184            .data
185            .read()
186            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
187
188        match data.get(&key) {
189            Some(stored) if !stored.is_expired(self.clock.now()) => {
190                Ok(Some(Record::new(key, stored.value.clone())))
191            }
192            _ => Ok(None),
193        }
194    }
195
196    #[tracing::instrument(level = "trace", skip_all)]
197    async fn scan_iter(
198        &self,
199        range: BytesRange,
200    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
201        let data = self
202            .data
203            .read()
204            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
205
206        let now = self.clock.now();
207        let records: Vec<Record> = data
208            .range((range.start_bound().cloned(), range.end_bound().cloned()))
209            .filter(|(_, stored)| !stored.is_expired(now))
210            .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
211            .collect();
212
213        Ok(Box::new(InMemoryIterator { records, index: 0 }))
214    }
215}
216
217struct InMemoryIterator {
218    records: Vec<Record>,
219    index: usize,
220}
221
222#[async_trait]
223impl StorageIterator for InMemoryIterator {
224    #[tracing::instrument(level = "trace", skip_all)]
225    async fn next(&mut self) -> StorageResult<Option<Record>> {
226        if self.index >= self.records.len() {
227            Ok(None)
228        } else {
229            let record = self.records[self.index].clone();
230            self.index += 1;
231            Ok(Some(record))
232        }
233    }
234}
235
/// In-memory snapshot that holds a copy of the data at the time of snapshot creation.
///
/// Provides a consistent read-only view of the database at the time the snapshot was created.
/// Expired entries are filtered out at read time using the shared clock.
pub struct InMemoryStorageSnapshot {
    // Immutable copy of the store taken when the snapshot was created.
    data: Arc<BTreeMap<Bytes, StoredValue>>,
    // Shared time source so expiry checks agree with the parent storage.
    clock: Arc<dyn Clock>,
}
244
245#[async_trait]
246impl StorageRead for InMemoryStorageSnapshot {
247    #[tracing::instrument(level = "trace", skip_all)]
248    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
249        match self.data.get(&key) {
250            Some(stored) if !stored.is_expired(self.clock.now()) => {
251                Ok(Some(Record::new(key, stored.value.clone())))
252            }
253            _ => Ok(None),
254        }
255    }
256
257    #[tracing::instrument(level = "trace", skip_all)]
258    async fn scan_iter(
259        &self,
260        range: BytesRange,
261    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
262        let now = self.clock.now();
263        let records: Vec<Record> = self
264            .data
265            .range((range.start_bound().cloned(), range.end_bound().cloned()))
266            .filter(|(_, stored)| !stored.is_expired(now))
267            .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
268            .collect();
269
270        Ok(Box::new(InMemoryIterator { records, index: 0 }))
271    }
272}
273
// Marker impl: the snapshot exposes only the read API inherited above.
#[async_trait]
impl StorageSnapshot for InMemoryStorageSnapshot {}
276
277#[async_trait]
278impl Storage for InMemoryStorage {
279    async fn apply_with_options(
280        &self,
281        records: Vec<RecordOp>,
282        _options: WriteOptions,
283    ) -> StorageResult<WriteResult> {
284        let mut data = self
285            .data
286            .write()
287            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
288
289        let now = self.clock.now();
290        for record in records {
291            match record {
292                RecordOp::Put(op) => {
293                    let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
294                    data.insert(
295                        op.record.key,
296                        StoredValue {
297                            value: op.record.value,
298                            expire_ts,
299                        },
300                    );
301                }
302                RecordOp::Merge(op) => {
303                    let existing_value = data
304                        .get(&op.record.key)
305                        .filter(|s| !s.is_expired(now))
306                        .map(|s| s.value.clone());
307                    let merged_value = self.merge_operator.as_ref().unwrap().merge(
308                        &op.record.key,
309                        existing_value,
310                        op.record.value.clone(),
311                    );
312                    let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
313                    data.insert(
314                        op.record.key,
315                        StoredValue {
316                            value: merged_value,
317                            expire_ts,
318                        },
319                    );
320                }
321                RecordOp::Delete(key) => {
322                    data.remove(&key);
323                }
324            }
325        }
326
327        Ok(WriteResult {
328            seqnum: self.next_seqnum(),
329        })
330    }
331
332    /// Writes a batch of records to the in-memory store.
333    ///
334    /// All records are written atomically within a single write lock acquisition.
335    /// For in-memory storage, write options are ignored since there is no
336    /// durable storage to await.
337    async fn put_with_options(
338        &self,
339        records: Vec<PutRecordOp>,
340        _options: WriteOptions,
341    ) -> StorageResult<WriteResult> {
342        let mut data = self
343            .data
344            .write()
345            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
346
347        let now = self.clock.now();
348        for op in records {
349            let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
350            data.insert(
351                op.record.key,
352                StoredValue {
353                    value: op.record.value,
354                    expire_ts,
355                },
356            );
357        }
358
359        Ok(WriteResult {
360            seqnum: self.next_seqnum(),
361        })
362    }
363
364    /// Merges values for the given keys using the configured merge operator.
365    ///
366    /// This method requires a merge operator to be configured during construction.
367    /// For each record, it will:
368    /// 1. Get the existing value (if any), excluding expired entries
369    /// 2. Call the merge operator to combine existing and new values
370    /// 3. Put the merged result back with the new TTL
371    ///
372    /// If no merge operator is configured, this method will return a
373    /// `StorageError::Storage` error.
374    async fn merge_with_options(
375        &self,
376        records: Vec<MergeRecordOp>,
377        _options: WriteOptions,
378    ) -> StorageResult<WriteResult> {
379        let merge_op = self
380            .merge_operator
381            .as_ref()
382            .ok_or_else(|| {
383                StorageError::Storage(
384                    "Merge operator not configured: in-memory storage requires a merge operator to be set during construction".to_string(),
385                )
386            })?;
387
388        let mut data = self
389            .data
390            .write()
391            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
392
393        let now = self.clock.now();
394        for op in records {
395            let existing_value = data
396                .get(&op.record.key)
397                .filter(|s| !s.is_expired(now))
398                .map(|s| s.value.clone());
399            let merged_value =
400                merge_op.merge(&op.record.key, existing_value, op.record.value.clone());
401            let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
402            data.insert(
403                op.record.key,
404                StoredValue {
405                    value: merged_value,
406                    expire_ts,
407                },
408            );
409        }
410
411        Ok(WriteResult {
412            seqnum: self.next_seqnum(),
413        })
414    }
415
416    /// Creates a point-in-time snapshot of the in-memory storage.
417    ///
418    /// The snapshot provides a consistent read-only view of the database at the time
419    /// the snapshot was created. Reads from the snapshot will not see any subsequent
420    /// writes to the underlying storage. Expired entries are filtered at read time.
421    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
422        let data = self
423            .data
424            .read()
425            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
426
427        let snapshot_data = Arc::new(data.clone());
428
429        Ok(Arc::new(InMemoryStorageSnapshot {
430            data: snapshot_data,
431            clock: self.clock.clone(),
432        }))
433    }
434
435    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
436        self.durable_tx.subscribe()
437    }
438
439    async fn flush(&self) -> StorageResult<()> {
440        if self.defer_durability {
441            self.flush_to(self.written_seq.load(std::sync::atomic::Ordering::Relaxed));
442        }
443        Ok(())
444    }
445
446    async fn close(&self) -> StorageResult<()> {
447        // No-op for in-memory storage
448        Ok(())
449    }
450}
451
/// Injected failure that fires either once or on every call.
///
/// Stored inside a [`FailSlot`]; `None` in the slot means no failure is armed.
#[cfg(feature = "test-utils")]
#[derive(Clone)]
enum Failure {
    /// Error is returned once, then automatically cleared.
    Once(super::StorageError),
    /// Error is returned on every subsequent call until explicitly cleared.
    Persistent(super::StorageError),
}
461
/// Lock-free slot holding an optionally armed [`Failure`].
#[cfg(feature = "test-utils")]
type FailSlot = arc_swap::ArcSwap<Option<Failure>>;
464
/// Checks a [`FailSlot`] and returns an error if one is set.
///
/// For [`Failure::Once`], the slot is atomically swapped to `None` so the
/// error fires exactly once. For [`Failure::Persistent`], the slot is left
/// unchanged.
#[cfg(feature = "test-utils")]
fn check_failure(slot: &FailSlot) -> super::StorageResult<()> {
    match slot.load().as_ref() {
        None => Ok(()),
        Some(Failure::Persistent(err)) => Err(err.clone()),
        Some(Failure::Once(_)) => {
            // Swap to None; if another thread raced us, one of them gets the
            // error and the others pass through — reasonable for tests.
            let taken = slot.swap(Arc::new(None));
            if let Some(Failure::Once(err)) = taken.as_ref() {
                Err(err.clone())
            } else {
                Ok(())
            }
        }
    }
}
487
/// A storage wrapper that delegates to an inner [`Storage`] but can inject
/// failures into `apply`, `put`/`put_with_options`, `flush`, and `snapshot` on demand.
///
/// Each failure slot is controlled by a lock-free [`ArcSwap`](arc_swap::ArcSwap).
/// The read path avoids introducing artificial synchronisation that could mask
/// concurrency bugs in the code under test.
///
/// Failures can be *persistent* (returned on every call until cleared) or
/// *once* (returned on the next call, then automatically cleared).
///
/// Gated behind the `test-utils` feature.
///
/// # Example
///
/// ```ignore
/// let inner = Arc::new(InMemoryStorage::new());
/// let storage = FailingStorage::wrap(inner);
/// storage.fail_put(StorageError::Storage("disk full".into()));
/// // every put_with_options call now returns Err(...)
///
/// storage.fail_flush_once(StorageError::Storage("io error".into()));
/// // only the next flush call returns Err(...), then auto-clears
/// ```
#[cfg(feature = "test-utils")]
pub struct FailingStorage {
    // Delegate that performs the real storage work.
    inner: Arc<dyn super::Storage>,
    // One independently armable failure slot per instrumented method.
    fail_apply: FailSlot,
    fail_put: FailSlot,
    fail_flush: FailSlot,
    fail_snapshot: FailSlot,
}
519
#[cfg(feature = "test-utils")]
impl FailingStorage {
    /// Wraps an existing storage, with all failure injections initially `None`.
    pub fn wrap(inner: Arc<dyn super::Storage>) -> Arc<Self> {
        Arc::new(Self {
            inner,
            fail_apply: arc_swap::ArcSwap::from_pointee(None),
            fail_put: arc_swap::ArcSwap::from_pointee(None),
            fail_flush: arc_swap::ArcSwap::from_pointee(None),
            fail_snapshot: arc_swap::ArcSwap::from_pointee(None),
        })
    }

    /// Arms a slot with the given failure, replacing any previous one.
    fn arm(slot: &FailSlot, failure: Failure) {
        slot.store(Arc::new(Some(failure)));
    }

    /// Makes `apply` return the given error on every subsequent call.
    pub fn fail_apply(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Persistent(err));
    }

    /// Makes `apply` return the given error on the next call only.
    pub fn fail_apply_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Once(err));
    }

    /// Makes `put` and `put_with_options` return the given error on every subsequent call.
    pub fn fail_put(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Persistent(err));
    }

    /// Makes `put` or `put_with_options` return the given error on the next call only.
    ///
    /// The failure fires on whichever of the two methods is called first, then
    /// automatically clears.
    pub fn fail_put_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Once(err));
    }

    /// Makes `flush` return the given error on every subsequent call.
    pub fn fail_flush(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Persistent(err));
    }

    /// Makes `flush` return the given error on the next call only.
    pub fn fail_flush_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Once(err));
    }

    /// Makes `snapshot` return the given error on every subsequent call.
    pub fn fail_snapshot(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Persistent(err));
    }

    /// Makes `snapshot` return the given error on the next call only.
    pub fn fail_snapshot_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Once(err));
    }
}
580
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::StorageRead for FailingStorage {
    /// Reads are never failure-injected; delegate directly to the inner storage.
    async fn get(&self, key: Bytes) -> super::StorageResult<Option<crate::Record>> {
        self.inner.get(key).await
    }

    /// Range scans delegate to the inner storage unchanged.
    async fn scan_iter(
        &self,
        range: crate::BytesRange,
    ) -> super::StorageResult<Box<dyn super::StorageIterator + Send + 'static>> {
        self.inner.scan_iter(range).await
    }
}
595
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::Storage for FailingStorage {
    /// Fails if `fail_apply` is armed, otherwise delegates.
    async fn apply_with_options(
        &self,
        ops: Vec<super::RecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_apply)?;
        self.inner.apply_with_options(ops, options).await
    }

    /// Fails if `fail_put` is armed, otherwise delegates.
    async fn put_with_options(
        &self,
        records: Vec<super::PutRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_put)?;
        self.inner.put_with_options(records, options).await
    }

    // NOTE(review): merge has no failure slot — presumably intentional, since
    // the struct docs list only apply/put/flush/snapshot as injectable; confirm.
    async fn merge_with_options(
        &self,
        records: Vec<super::MergeRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        self.inner.merge_with_options(records, options).await
    }

    /// Fails if `fail_snapshot` is armed, otherwise delegates.
    async fn snapshot(&self) -> super::StorageResult<Arc<dyn super::StorageSnapshot>> {
        check_failure(&self.fail_snapshot)?;
        self.inner.snapshot().await
    }

    /// Durability subscriptions pass straight through to the inner storage.
    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
        self.inner.subscribe_durable()
    }

    /// Fails if `fail_flush` is armed, otherwise delegates.
    async fn flush(&self) -> super::StorageResult<()> {
        check_failure(&self.fail_flush)?;
        self.inner.flush().await
    }

    async fn close(&self) -> super::StorageResult<()> {
        self.inner.close().await
    }
}
643
644#[cfg(test)]
645mod tests {
646    use super::*;
647    use bytes::BytesMut;
648    use std::ops::Bound;
649
650    /// Test merge operator that appends new value to existing value with a separator.
651    struct AppendMergeOperator;
652
653    impl MergeOperator for AppendMergeOperator {
654        fn merge(&self, _key: &Bytes, existing_value: Option<Bytes>, new_value: Bytes) -> Bytes {
655            match existing_value {
656                Some(existing) => {
657                    let mut result = BytesMut::from(existing);
658                    result.extend_from_slice(b",");
659                    result.extend_from_slice(&new_value);
660                    result.freeze()
661                }
662                None => new_value,
663            }
664        }
665    }
666
    /// `get` on an empty store returns `Ok(None)` rather than an error.
    #[tokio::test]
    async fn should_return_none_when_key_not_found() {
        // given
        let storage = InMemoryStorage::new();

        // when
        let result = storage.get(Bytes::from("missing_key")).await;

        // then
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    /// A record written with `put` can be read back with the same key and value.
    #[tokio::test]
    async fn should_store_and_retrieve_record() {
        // given
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let value = Bytes::from("test_value");

        // when
        storage
            .put(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        // then
        assert!(result.is_some());
        let record = result.unwrap();
        assert_eq!(record.key, Bytes::from("test_key"));
        assert_eq!(record.value, value);
    }
700
    /// A second `put` for the same key replaces the earlier value.
    #[tokio::test]
    async fn should_overwrite_existing_key() {
        // given
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let initial_value = Bytes::from("initial_value");
        let updated_value = Bytes::from("updated_value");

        // when
        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();
        storage
            .put(vec![Record::new(key.clone(), updated_value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        // then
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, updated_value);
    }

    /// A single `put` batch stores every record in the batch.
    #[tokio::test]
    async fn should_store_multiple_records() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")),
            Record::new(Bytes::from("key2"), Bytes::from("value2")),
            Record::new(Bytes::from("key3"), Bytes::from("value3")),
        ];

        // when
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // then
        for record in records {
            let retrieved = storage.get(record.key.clone()).await.unwrap();
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().value, record.value);
        }
    }
748
    /// An unbounded scan returns every record in ascending key order.
    #[tokio::test]
    async fn should_scan_all_records_when_unbounded() {
        // given
        let storage = InMemoryStorage::new();
        let records = [
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
        ];
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        // then
        assert_eq!(scanned.len(), 3);
        assert_eq!(scanned[0].key, Bytes::from("a"));
        assert_eq!(scanned[1].key, Bytes::from("b"));
        assert_eq!(scanned[2].key, Bytes::from("c"));
    }

    /// A prefix scan returns only keys sharing the prefix, in order.
    #[tokio::test]
    async fn should_scan_records_with_prefix() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("prefix_a"), Bytes::from("value1")),
            Record::new(Bytes::from("prefix_b"), Bytes::from("value2")),
            Record::new(Bytes::from("other_c"), Bytes::from("value3")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let scanned = storage
            .scan(BytesRange::prefix(Bytes::from("prefix_")))
            .await
            .unwrap();

        // then
        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("prefix_a"));
        assert_eq!(scanned[1].key, Bytes::from("prefix_b"));
    }
798
    /// A scan with inclusive start / exclusive end honors both bounds.
    #[tokio::test]
    async fn should_scan_records_in_bounded_range() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
            Record::new(Bytes::from("d"), Bytes::from("value_d")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let range = BytesRange::new(
            Bound::Included(Bytes::from("b")),
            Bound::Excluded(Bytes::from("d")),
        );
        let scanned = storage.scan(range).await.unwrap();

        // then
        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("b"));
        assert_eq!(scanned[1].key, Bytes::from("c"));
    }

    /// Scanning an empty store yields an empty vector, not an error.
    #[tokio::test]
    async fn should_return_empty_vec_when_scanning_empty_storage() {
        // given
        let storage = InMemoryStorage::new();

        // when
        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        // then
        assert!(scanned.is_empty());
    }
838
    /// `scan_iter` yields records in key order and then `None` when exhausted.
    #[tokio::test]
    async fn should_iterate_over_records() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        // when
        let mut iter = storage.scan_iter(BytesRange::unbounded()).await.unwrap();
        let first = iter.next().await.unwrap();
        let second = iter.next().await.unwrap();
        let third = iter.next().await.unwrap();

        // then
        assert!(first.is_some());
        assert_eq!(first.unwrap().key, Bytes::from("key1"));
        assert!(second.is_some());
        assert_eq!(second.unwrap().key, Bytes::from("key2"));
        assert!(third.is_none());
    }
862
    /// A snapshot sees data that existed at the time it was taken.
    #[tokio::test]
    async fn should_create_snapshot_with_current_data() {
        // given
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        // when
        let snapshot = storage.snapshot().await.unwrap();

        // then
        let result = snapshot.get(Bytes::from("key1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1"));
    }

    /// Writes made after a snapshot are invisible to that snapshot but
    /// visible through the live storage.
    #[tokio::test]
    async fn should_not_see_writes_after_snapshot() {
        // given
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        // when
        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();

        // then
        let snapshot_result = snapshot.get(Bytes::from("key2")).await.unwrap();
        assert!(snapshot_result.is_none());

        let storage_result = storage.get(Bytes::from("key2")).await.unwrap();
        assert!(storage_result.is_some());
    }
910
911    #[tokio::test]
912    async fn should_scan_snapshot_independently() {
913        // given
914        let storage = InMemoryStorage::new();
915        storage
916            .put(vec![
917                Record::new(Bytes::from("a"), Bytes::from("value_a")).into(),
918            ])
919            .await
920            .unwrap();
921
922        // when
923        let snapshot = storage.snapshot().await.unwrap();
924        storage
925            .put(vec![
926                Record::new(Bytes::from("b"), Bytes::from("value_b")).into(),
927            ])
928            .await
929            .unwrap();
930
931        // then
932        let snapshot_records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
933        assert_eq!(snapshot_records.len(), 1);
934        assert_eq!(snapshot_records[0].key, Bytes::from("a"));
935
936        let storage_records = storage.scan(BytesRange::unbounded()).await.unwrap();
937        assert_eq!(storage_records.len(), 2);
938    }
939
940    #[tokio::test]
941    async fn should_handle_empty_record() {
942        // given
943        let storage = InMemoryStorage::new();
944        let key = Bytes::from("empty_key");
945
946        // when
947        storage
948            .put(vec![Record::empty(key.clone()).into()])
949            .await
950            .unwrap();
951        let result = storage.get(key).await.unwrap();
952
953        // then
954        assert!(result.is_some());
955        assert_eq!(result.unwrap().value, Bytes::new());
956    }
957
958    #[tokio::test]
959    async fn should_return_error_when_merge_operator_not_configured() {
960        // given
961        let storage = InMemoryStorage::new();
962        let record = Record::new(Bytes::from("key1"), Bytes::from("value1"));
963
964        // when
965        let result = storage.merge(vec![record.into()]).await;
966
967        // then
968        assert!(result.is_err());
969        assert!(
970            result
971                .unwrap_err()
972                .to_string()
973                .contains("Merge operator not configured")
974        );
975    }
976
977    #[tokio::test]
978    async fn should_merge_when_key_does_not_exist() {
979        // given
980        let merge_op = Arc::new(AppendMergeOperator);
981        let storage = InMemoryStorage::with_merge_operator(merge_op);
982        let key = Bytes::from("new_key");
983        let value = Bytes::from("value1");
984
985        // when
986        storage
987            .merge(vec![Record::new(key.clone(), value.clone()).into()])
988            .await
989            .unwrap();
990        let result = storage.get(key).await.unwrap();
991
992        // then
993        assert!(result.is_some());
994        assert_eq!(result.unwrap().value, value);
995    }
996
997    #[tokio::test]
998    async fn should_merge_when_key_exists() {
999        // given
1000        let merge_op = Arc::new(AppendMergeOperator);
1001        let storage = InMemoryStorage::with_merge_operator(merge_op);
1002        let key = Bytes::from("key1");
1003        let initial_value = Bytes::from("value1");
1004        let new_value = Bytes::from("value2");
1005
1006        storage
1007            .put(vec![Record::new(key.clone(), initial_value).into()])
1008            .await
1009            .unwrap();
1010
1011        // when
1012        storage
1013            .merge(vec![Record::new(key.clone(), new_value).into()])
1014            .await
1015            .unwrap();
1016        let result = storage.get(key).await.unwrap();
1017
1018        // then
1019        assert!(result.is_some());
1020        assert_eq!(result.unwrap().value, Bytes::from("value1,value2"));
1021    }
1022
1023    #[tokio::test]
1024    async fn should_merge_multiple_keys() {
1025        // given
1026        let merge_op = Arc::new(AppendMergeOperator);
1027        let storage = InMemoryStorage::with_merge_operator(merge_op);
1028        let records = vec![
1029            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
1030            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
1031        ];
1032        storage.put(records).await.unwrap();
1033
1034        // when
1035        storage
1036            .merge(vec![
1037                Record::new(Bytes::from("key1"), Bytes::from("value1a")).into(),
1038                Record::new(Bytes::from("key2"), Bytes::from("value2a")).into(),
1039            ])
1040            .await
1041            .unwrap();
1042
1043        // then
1044        let result1 = storage.get(Bytes::from("key1")).await.unwrap();
1045        assert_eq!(result1.unwrap().value, Bytes::from("value1,value1a"));
1046
1047        let result2 = storage.get(Bytes::from("key2")).await.unwrap();
1048        assert_eq!(result2.unwrap().value, Bytes::from("value2,value2a"));
1049    }
1050
1051    #[tokio::test]
1052    async fn should_return_monotonically_increasing_seqnums_from_put() {
1053        let storage = InMemoryStorage::new();
1054
1055        let r1 = storage
1056            .put(vec![
1057                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1058            ])
1059            .await
1060            .unwrap();
1061        let r2 = storage
1062            .put(vec![
1063                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1064            ])
1065            .await
1066            .unwrap();
1067        let r3 = storage
1068            .put(vec![
1069                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1070            ])
1071            .await
1072            .unwrap();
1073
1074        assert_eq!(r1.seqnum, 1);
1075        assert_eq!(r2.seqnum, 2);
1076        assert_eq!(r3.seqnum, 3);
1077    }
1078
1079    #[tokio::test]
1080    async fn should_return_monotonically_increasing_seqnums_from_apply() {
1081        let storage = InMemoryStorage::new();
1082
1083        let r1 = storage
1084            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1085                Bytes::from("k1"),
1086                Bytes::from("v1"),
1087            )))])
1088            .await
1089            .unwrap();
1090        let r2 = storage
1091            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1092                Bytes::from("k2"),
1093                Bytes::from("v2"),
1094            )))])
1095            .await
1096            .unwrap();
1097
1098        assert_eq!(r1.seqnum, 1);
1099        assert_eq!(r2.seqnum, 2);
1100    }
1101
1102    #[tokio::test]
1103    async fn should_share_seqnum_counter_across_write_methods() {
1104        let merge_op = Arc::new(AppendMergeOperator);
1105        let storage = InMemoryStorage::with_merge_operator(merge_op);
1106
1107        let r1 = storage
1108            .put(vec![
1109                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1110            ])
1111            .await
1112            .unwrap();
1113        let r2 = storage
1114            .merge(vec![
1115                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
1116            ])
1117            .await
1118            .unwrap();
1119        let r3 = storage
1120            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1121                Bytes::from("k2"),
1122                Bytes::from("v3"),
1123            )))])
1124            .await
1125            .unwrap();
1126
1127        assert_eq!(r1.seqnum, 1);
1128        assert_eq!(r2.seqnum, 2);
1129        assert_eq!(r3.seqnum, 3);
1130    }
1131
1132    #[tokio::test]
1133    async fn should_start_durable_subscriber_at_zero() {
1134        let storage = InMemoryStorage::new();
1135        let rx = storage.subscribe_durable();
1136        assert_eq!(*rx.borrow(), 0);
1137    }
1138
1139    #[tokio::test]
1140    async fn should_advance_durable_watermark_on_each_write() {
1141        let storage = InMemoryStorage::new();
1142        let rx = storage.subscribe_durable();
1143
1144        storage
1145            .put(vec![
1146                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1147            ])
1148            .await
1149            .unwrap();
1150        assert_eq!(*rx.borrow(), 1);
1151
1152        storage
1153            .put(vec![
1154                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1155            ])
1156            .await
1157            .unwrap();
1158        assert_eq!(*rx.borrow(), 2);
1159    }
1160
1161    #[tokio::test]
1162    async fn should_merge_empty_values() {
1163        // given
1164        let merge_op = Arc::new(AppendMergeOperator);
1165        let storage = InMemoryStorage::with_merge_operator(merge_op);
1166        let key = Bytes::from("key1");
1167
1168        // when
1169        storage
1170            .merge(vec![Record::empty(key.clone()).into()])
1171            .await
1172            .unwrap();
1173        let result = storage.get(key).await.unwrap();
1174
1175        // then
1176        assert!(result.is_some());
1177        assert_eq!(result.unwrap().value, Bytes::new());
1178    }
1179
1180    #[tokio::test]
1181    async fn should_not_advance_durable_watermark_when_deferred() {
1182        let storage = InMemoryStorage::new().with_deferred_durability();
1183        let rx = storage.subscribe_durable();
1184
1185        storage
1186            .put(vec![
1187                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1188            ])
1189            .await
1190            .unwrap();
1191        assert_eq!(*rx.borrow(), 0, "durable watermark should not advance");
1192
1193        storage
1194            .put(vec![
1195                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1196            ])
1197            .await
1198            .unwrap();
1199        assert_eq!(*rx.borrow(), 0, "durable watermark should still be 0");
1200    }
1201
1202    #[tokio::test]
1203    async fn should_advance_durable_watermark_on_flush_when_deferred() {
1204        let storage = InMemoryStorage::new().with_deferred_durability();
1205        let rx = storage.subscribe_durable();
1206
1207        storage
1208            .put(vec![
1209                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1210            ])
1211            .await
1212            .unwrap();
1213        storage
1214            .put(vec![
1215                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1216            ])
1217            .await
1218            .unwrap();
1219        assert_eq!(*rx.borrow(), 0);
1220
1221        storage.flush().await.unwrap();
1222        assert_eq!(*rx.borrow(), 2, "flush should advance to current seqnum");
1223    }
1224
1225    #[tokio::test]
1226    async fn should_advance_durable_watermark_to_specific_seq() {
1227        let storage = InMemoryStorage::new().with_deferred_durability();
1228        let rx = storage.subscribe_durable();
1229
1230        // Write 3 records
1231        for i in 1..=3 {
1232            storage
1233                .put(vec![
1234                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
1235                ])
1236                .await
1237                .unwrap();
1238        }
1239        assert_eq!(*rx.borrow(), 0);
1240
1241        // Partially flush through seqnum 2
1242        storage.flush_to(2);
1243        assert_eq!(*rx.borrow(), 2, "should advance to requested seqnum");
1244
1245        // Flush the rest
1246        storage.flush_to(3);
1247        assert_eq!(*rx.borrow(), 3);
1248    }
1249
1250    #[tokio::test]
1251    #[should_panic(expected = "cannot move durable seqnum backwards")]
1252    async fn should_panic_when_flush_to_moves_backwards() {
1253        let storage = InMemoryStorage::new().with_deferred_durability();
1254        for i in 1..=3 {
1255            storage
1256                .put(vec![
1257                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
1258                ])
1259                .await
1260                .unwrap();
1261        }
1262
1263        storage.flush_to(2);
1264        storage.flush_to(1);
1265    }
1266
1267    #[tokio::test]
1268    #[should_panic(expected = "cannot flush beyond written seqnum")]
1269    async fn should_panic_when_flush_to_exceeds_written() {
1270        let storage = InMemoryStorage::new().with_deferred_durability();
1271        storage
1272            .put(vec![
1273                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1274            ])
1275            .await
1276            .unwrap();
1277
1278        storage.flush_to(5);
1279    }
1280
1281    #[tokio::test]
1282    async fn should_see_data_in_snapshot_before_flush_when_deferred() {
1283        let storage = InMemoryStorage::new().with_deferred_durability();
1284
1285        storage
1286            .put(vec![
1287                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1288            ])
1289            .await
1290            .unwrap();
1291
1292        // Data is written but not durable — snapshots should still see it
1293        let snapshot = storage.snapshot().await.unwrap();
1294        let result = snapshot.get(Bytes::from("k1")).await.unwrap();
1295        assert!(result.is_some());
1296        assert_eq!(result.unwrap().value, Bytes::from("v1"));
1297
1298        // But durable watermark is still 0
1299        let rx = storage.subscribe_durable();
1300        assert_eq!(*rx.borrow(), 0);
1301    }
1302
1303    #[tokio::test]
1304    async fn should_not_advance_durable_watermark_on_apply_when_deferred() {
1305        let storage = InMemoryStorage::new().with_deferred_durability();
1306        let rx = storage.subscribe_durable();
1307
1308        storage
1309            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1310                Bytes::from("k1"),
1311                Bytes::from("v1"),
1312            )))])
1313            .await
1314            .unwrap();
1315        assert_eq!(*rx.borrow(), 0);
1316
1317        storage
1318            .apply(vec![RecordOp::Delete(Bytes::from("k1"))])
1319            .await
1320            .unwrap();
1321        assert_eq!(*rx.borrow(), 0);
1322
1323        storage.flush().await.unwrap();
1324        assert_eq!(*rx.borrow(), 2);
1325    }
1326
1327    #[tokio::test]
1328    async fn should_not_advance_durable_watermark_on_merge_when_deferred() {
1329        let merge_op = Arc::new(AppendMergeOperator);
1330        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
1331        let rx = storage.subscribe_durable();
1332
1333        storage
1334            .merge(vec![
1335                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1336            ])
1337            .await
1338            .unwrap();
1339        assert_eq!(*rx.borrow(), 0);
1340
1341        storage
1342            .merge(vec![
1343                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
1344            ])
1345            .await
1346            .unwrap();
1347        assert_eq!(*rx.borrow(), 0);
1348
1349        storage.flush().await.unwrap();
1350        assert_eq!(*rx.borrow(), 2);
1351    }
1352
1353    #[tokio::test]
1354    async fn should_support_multiple_flush_cycles_when_deferred() {
1355        let storage = InMemoryStorage::new().with_deferred_durability();
1356        let rx = storage.subscribe_durable();
1357
1358        // First cycle: write and flush
1359        storage
1360            .put(vec![
1361                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1362            ])
1363            .await
1364            .unwrap();
1365        storage.flush().await.unwrap();
1366        assert_eq!(*rx.borrow(), 1);
1367
1368        // Second cycle: write more and flush again
1369        storage
1370            .put(vec![
1371                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1372            ])
1373            .await
1374            .unwrap();
1375        storage
1376            .put(vec![
1377                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1378            ])
1379            .await
1380            .unwrap();
1381        assert_eq!(*rx.borrow(), 1, "should not advance before second flush");
1382
1383        storage.flush().await.unwrap();
1384        assert_eq!(*rx.borrow(), 3);
1385    }
1386
1387    #[tokio::test]
1388    async fn should_flush_on_empty_storage_when_deferred() {
1389        let storage = InMemoryStorage::new().with_deferred_durability();
1390        let rx = storage.subscribe_durable();
1391
1392        // Flushing with no writes should be fine (sends 0 again)
1393        storage.flush().await.unwrap();
1394        assert_eq!(*rx.borrow(), 0);
1395
1396        // flush_to(0) should also be fine
1397        storage.flush_to(0);
1398        assert_eq!(*rx.borrow(), 0);
1399    }
1400
1401    #[tokio::test]
1402    async fn should_defer_durability_across_mixed_write_methods() {
1403        let merge_op = Arc::new(AppendMergeOperator);
1404        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
1405        let rx = storage.subscribe_durable();
1406
1407        // put, apply, merge — all should defer
1408        storage
1409            .put(vec![
1410                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1411            ])
1412            .await
1413            .unwrap();
1414        storage
1415            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1416                Bytes::from("k2"),
1417                Bytes::from("v2"),
1418            )))])
1419            .await
1420            .unwrap();
1421        storage
1422            .merge(vec![
1423                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1424            ])
1425            .await
1426            .unwrap();
1427        assert_eq!(*rx.borrow(), 0);
1428
1429        // Partially flush through seqnum 2
1430        storage.flush_to(2);
1431        assert_eq!(*rx.borrow(), 2);
1432
1433        // Flush the rest
1434        storage.flush().await.unwrap();
1435        assert_eq!(*rx.borrow(), 3);
1436    }
1437
1438    #[tokio::test]
1439    async fn should_read_data_written_before_flush_when_deferred() {
1440        let storage = InMemoryStorage::new().with_deferred_durability();
1441
1442        storage
1443            .put(vec![
1444                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1445                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1446            ])
1447            .await
1448            .unwrap();
1449
1450        // get and scan see data even though it is not yet durable
1451        let result = storage.get(Bytes::from("k1")).await.unwrap();
1452        assert_eq!(result.unwrap().value, Bytes::from("v1"));
1453
1454        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
1455        assert_eq!(scanned.len(), 2);
1456    }
1457}