//! In-memory implementation of the `Storage` trait (common/storage/in_memory.rs).
1use std::collections::BTreeMap;
2use std::ops::RangeBounds;
3use std::sync::{Arc, RwLock};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8use super::{
9    MergeOperator, MergeRecordOp, PutRecordOp, Storage, StorageSnapshot, WriteOptions, WriteResult,
10};
11use crate::storage::RecordOp;
12use crate::{BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl};
13
/// Abstraction over a time source.
///
/// Production code uses [`WallClock`]; tests can substitute a fake clock to
/// exercise TTL expiration deterministically.
pub trait Clock: Send + Sync {
    /// Returns the current time as milliseconds since the Unix epoch.
    fn now(&self) -> i64;
}
19
20/// Clock implementation that returns the real system time.
21pub struct WallClock;
22
23impl Clock for WallClock {
24    fn now(&self) -> i64 {
25        std::time::SystemTime::now()
26            .duration_since(std::time::UNIX_EPOCH)
27            .expect("system time before Unix epoch")
28            .as_millis() as i64
29    }
30}
31
32/// Internal wrapper that stores a value alongside its expiration timestamp.
33#[derive(Clone, Debug)]
34struct StoredValue {
35    value: Bytes,
36    /// `None` means the value never expires.
37    expire_ts: Option<i64>,
38}
39
40impl StoredValue {
41    fn is_expired(&self, now: i64) -> bool {
42        self.expire_ts.is_some_and(|ts| now >= ts)
43    }
44}
45
46/// Computes the absolute expiration timestamp from TTL options.
47fn compute_expire_ts(now: i64, ttl: Ttl, default_ttl: Option<u64>) -> Option<i64> {
48    let duration = match ttl {
49        Ttl::Default => default_ttl,
50        Ttl::NoExpiry => None,
51        Ttl::ExpireAfter(ms) => Some(ms),
52    };
53    duration.map(|ms| now + ms as i64)
54}
55
56/// In-memory implementation of the Storage trait using a BTreeMap.
57///
58/// This implementation stores all data in memory and is useful for testing
59/// or scenarios where durability is not required. Supports TTL-based
60/// expiration via a configurable [`Clock`].
61pub struct InMemoryStorage {
62    data: Arc<RwLock<BTreeMap<Bytes, StoredValue>>>,
63    merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
64    clock: Arc<dyn Clock>,
65    default_ttl: Option<u64>,
66    written_seq: std::sync::atomic::AtomicU64,
67    durable_seq: std::sync::atomic::AtomicU64,
68    durable_tx: tokio::sync::watch::Sender<u64>,
69    defer_durability: bool,
70}
71
72impl InMemoryStorage {
73    /// Creates a new InMemoryStorage instance with an empty store.
74    pub fn new() -> Self {
75        let (durable_tx, _) = tokio::sync::watch::channel(0);
76        Self {
77            data: Arc::new(RwLock::new(BTreeMap::new())),
78            merge_operator: None,
79            clock: Arc::new(WallClock),
80            default_ttl: None,
81            written_seq: std::sync::atomic::AtomicU64::new(0),
82            durable_seq: std::sync::atomic::AtomicU64::new(0),
83            durable_tx,
84            defer_durability: false,
85        }
86    }
87
88    /// Creates a new InMemoryStorage instance with an optional merge operator.
89    ///
90    /// If a merge operator is provided, the `merge` method will use it to combine
91    /// existing values with new values. If no merge operator is provided, the
92    /// `merge` method will return an error.
93    pub fn with_merge_operator(merge_operator: Arc<dyn MergeOperator + Send + Sync>) -> Self {
94        let (durable_tx, _) = tokio::sync::watch::channel(0);
95        Self {
96            data: Arc::new(RwLock::new(BTreeMap::new())),
97            merge_operator: Some(merge_operator),
98            clock: Arc::new(WallClock),
99            default_ttl: None,
100            written_seq: std::sync::atomic::AtomicU64::new(0),
101            durable_seq: std::sync::atomic::AtomicU64::new(0),
102            durable_tx,
103            defer_durability: false,
104        }
105    }
106
107    /// Sets a custom clock for TTL expiration checks.
108    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
109        self.clock = clock;
110        self
111    }
112
113    /// Sets the default TTL (in milliseconds) for records written with [`Ttl::Default`].
114    pub fn with_default_ttl(mut self, ttl: u64) -> Self {
115        self.default_ttl = Some(ttl);
116        self
117    }
118
119    /// Enables deferred durability mode.
120    ///
121    /// When enabled, writes are not marked as durable until [`flush()`](Storage::flush)
122    /// or [`flush_to()`](Self::flush_to) is called. This is useful for testing
123    /// `read_durable` behavior without depending on SlateDB internals.
124    pub fn with_deferred_durability(mut self) -> Self {
125        self.defer_durability = true;
126        self
127    }
128
129    /// Advances the durable watermark to a specific sequence number.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `seq` is greater than the current written sequence number
134    /// or less than the current durable sequence number.
135    pub fn flush_to(&self, seq: u64) {
136        let written = self.written_seq.load(std::sync::atomic::Ordering::Relaxed);
137        assert!(
138            seq <= written,
139            "cannot flush beyond written seqnum: flush_to({seq}) but written is {written}"
140        );
141        let durable = self.durable_seq.load(std::sync::atomic::Ordering::Relaxed);
142        assert!(
143            seq >= durable,
144            "cannot move durable seqnum backwards: flush_to({seq}) but durable is {durable}"
145        );
146        self.durable_seq
147            .store(seq, std::sync::atomic::Ordering::Relaxed);
148        let _ = self.durable_tx.send(seq);
149    }
150
151    /// Allocates the next sequence number.
152    ///
153    /// When `defer_durability` is false (default), the sequence number is
154    /// immediately marked as durable. When true, durability is deferred
155    /// until an explicit flush.
156    fn next_seqnum(&self) -> u64 {
157        let seq = self
158            .written_seq
159            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
160            + 1;
161        if !self.defer_durability {
162            self.durable_seq
163                .store(seq, std::sync::atomic::Ordering::Relaxed);
164            let _ = self.durable_tx.send(seq);
165        }
166        seq
167    }
168}
169
170impl Default for InMemoryStorage {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176#[async_trait]
177impl StorageRead for InMemoryStorage {
178    /// Retrieves a single record by key from the in-memory store.
179    ///
180    /// Returns `None` if the key does not exist or has expired.
181    #[tracing::instrument(level = "trace", skip_all)]
182    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
183        let data = self
184            .data
185            .read()
186            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
187
188        match data.get(&key) {
189            Some(stored) if !stored.is_expired(self.clock.now()) => {
190                Ok(Some(Record::new(key, stored.value.clone())))
191            }
192            _ => Ok(None),
193        }
194    }
195
196    #[tracing::instrument(level = "trace", skip_all)]
197    async fn scan_iter(
198        &self,
199        range: BytesRange,
200    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
201        let data = self
202            .data
203            .read()
204            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
205
206        let now = self.clock.now();
207        let records: Vec<Record> = data
208            .range((range.start_bound().cloned(), range.end_bound().cloned()))
209            .filter(|(_, stored)| !stored.is_expired(now))
210            .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
211            .collect();
212
213        Ok(Box::new(InMemoryIterator { records, index: 0 }))
214    }
215}
216
217struct InMemoryIterator {
218    records: Vec<Record>,
219    index: usize,
220}
221
222#[async_trait]
223impl StorageIterator for InMemoryIterator {
224    #[tracing::instrument(level = "trace", skip_all)]
225    async fn next(&mut self) -> StorageResult<Option<Record>> {
226        if self.index >= self.records.len() {
227            Ok(None)
228        } else {
229            let record = self.records[self.index].clone();
230            self.index += 1;
231            Ok(Some(record))
232        }
233    }
234}
235
236/// In-memory snapshot that holds a copy of the data at the time of snapshot creation.
237///
238/// Provides a consistent read-only view of the database at the time the snapshot was created.
239/// Expired entries are filtered out at read time using the shared clock.
240pub struct InMemoryStorageSnapshot {
241    data: Arc<BTreeMap<Bytes, StoredValue>>,
242    clock: Arc<dyn Clock>,
243}
244
245#[async_trait]
246impl StorageRead for InMemoryStorageSnapshot {
247    #[tracing::instrument(level = "trace", skip_all)]
248    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
249        match self.data.get(&key) {
250            Some(stored) if !stored.is_expired(self.clock.now()) => {
251                Ok(Some(Record::new(key, stored.value.clone())))
252            }
253            _ => Ok(None),
254        }
255    }
256
257    #[tracing::instrument(level = "trace", skip_all)]
258    async fn scan_iter(
259        &self,
260        range: BytesRange,
261    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
262        let now = self.clock.now();
263        let records: Vec<Record> = self
264            .data
265            .range((range.start_bound().cloned(), range.end_bound().cloned()))
266            .filter(|(_, stored)| !stored.is_expired(now))
267            .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
268            .collect();
269
270        Ok(Box::new(InMemoryIterator { records, index: 0 }))
271    }
272}
273
274#[async_trait]
275impl StorageSnapshot for InMemoryStorageSnapshot {}
276
277#[async_trait]
278impl Storage for InMemoryStorage {
279    async fn apply_with_options(
280        &self,
281        records: Vec<RecordOp>,
282        _options: WriteOptions,
283    ) -> StorageResult<WriteResult> {
284        let mut data = self
285            .data
286            .write()
287            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
288
289        let now = self.clock.now();
290        for record in records {
291            match record {
292                RecordOp::Put(op) => {
293                    let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
294                    data.insert(
295                        op.record.key,
296                        StoredValue {
297                            value: op.record.value,
298                            expire_ts,
299                        },
300                    );
301                }
302                RecordOp::Merge(op) => {
303                    let existing_value = data
304                        .get(&op.record.key)
305                        .filter(|s| !s.is_expired(now))
306                        .map(|s| s.value.clone());
307                    let merged_value = self.merge_operator.as_ref().unwrap().merge_batch(
308                        &op.record.key,
309                        existing_value,
310                        &[op.record.value],
311                    );
312                    let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
313                    data.insert(
314                        op.record.key,
315                        StoredValue {
316                            value: merged_value,
317                            expire_ts,
318                        },
319                    );
320                }
321                RecordOp::Delete(key) => {
322                    data.remove(&key);
323                }
324            }
325        }
326
327        Ok(WriteResult {
328            seqnum: self.next_seqnum(),
329        })
330    }
331
332    /// Writes a batch of records to the in-memory store.
333    ///
334    /// All records are written atomically within a single write lock acquisition.
335    /// For in-memory storage, write options are ignored since there is no
336    /// durable storage to await.
337    async fn put_with_options(
338        &self,
339        records: Vec<PutRecordOp>,
340        _options: WriteOptions,
341    ) -> StorageResult<WriteResult> {
342        let mut data = self
343            .data
344            .write()
345            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
346
347        let now = self.clock.now();
348        for op in records {
349            let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
350            data.insert(
351                op.record.key,
352                StoredValue {
353                    value: op.record.value,
354                    expire_ts,
355                },
356            );
357        }
358
359        Ok(WriteResult {
360            seqnum: self.next_seqnum(),
361        })
362    }
363
364    /// Merges values for the given keys using the configured merge operator.
365    ///
366    /// This method requires a merge operator to be configured during construction.
367    /// For each record, it will:
368    /// 1. Get the existing value (if any), excluding expired entries
369    /// 2. Call the merge operator to combine existing and new values
370    /// 3. Put the merged result back with the new TTL
371    ///
372    /// If no merge operator is configured, this method will return a
373    /// `StorageError::Storage` error.
374    async fn merge_with_options(
375        &self,
376        records: Vec<MergeRecordOp>,
377        _options: WriteOptions,
378    ) -> StorageResult<WriteResult> {
379        let merge_op = self
380            .merge_operator
381            .as_ref()
382            .ok_or_else(|| {
383                StorageError::Storage(
384                    "Merge operator not configured: in-memory storage requires a merge operator to be set during construction".to_string(),
385                )
386            })?;
387
388        let mut data = self
389            .data
390            .write()
391            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
392
393        let now = self.clock.now();
394        for op in records {
395            let existing_value = data
396                .get(&op.record.key)
397                .filter(|s| !s.is_expired(now))
398                .map(|s| s.value.clone());
399            let merged_value =
400                merge_op.merge_batch(&op.record.key, existing_value, &[op.record.value]);
401            let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
402            data.insert(
403                op.record.key,
404                StoredValue {
405                    value: merged_value,
406                    expire_ts,
407                },
408            );
409        }
410
411        Ok(WriteResult {
412            seqnum: self.next_seqnum(),
413        })
414    }
415
416    /// Creates a point-in-time snapshot of the in-memory storage.
417    ///
418    /// The snapshot provides a consistent read-only view of the database at the time
419    /// the snapshot was created. Reads from the snapshot will not see any subsequent
420    /// writes to the underlying storage. Expired entries are filtered at read time.
421    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
422        let data = self
423            .data
424            .read()
425            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
426
427        let snapshot_data = Arc::new(data.clone());
428
429        Ok(Arc::new(InMemoryStorageSnapshot {
430            data: snapshot_data,
431            clock: self.clock.clone(),
432        }))
433    }
434
435    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
436        self.durable_tx.subscribe()
437    }
438
439    async fn flush(&self) -> StorageResult<()> {
440        if self.defer_durability {
441            self.flush_to(self.written_seq.load(std::sync::atomic::Ordering::Relaxed));
442        }
443        Ok(())
444    }
445}
446
/// An injected failure, with either one-shot or persistent firing semantics.
#[cfg(feature = "test-utils")]
#[derive(Clone)]
enum Failure {
    /// Error is returned once, then automatically cleared.
    Once(super::StorageError),
    /// Error is returned on every subsequent call until explicitly cleared.
    Persistent(super::StorageError),
}
456
/// Lock-free slot holding an optional injected [`Failure`].
#[cfg(feature = "test-utils")]
type FailSlot = arc_swap::ArcSwap<Option<Failure>>;

/// Checks a [`FailSlot`] and returns an error if one is set.
///
/// For [`Failure::Once`], the slot is atomically swapped to `None` so the
/// error fires exactly once. For [`Failure::Persistent`], the slot is left
/// unchanged.
#[cfg(feature = "test-utils")]
fn check_failure(slot: &FailSlot) -> super::StorageResult<()> {
    match slot.load().as_ref() {
        None => Ok(()),
        Some(Failure::Persistent(err)) => Err(err.clone()),
        Some(Failure::Once(_)) => {
            // Atomically take the one-shot failure out of the slot. If
            // another thread raced us to the swap, exactly one caller gets
            // the error and the rest pass through — reasonable for tests.
            match slot.swap(Arc::new(None)).as_ref() {
                Some(Failure::Once(err)) => Err(err.clone()),
                _ => Ok(()),
            }
        }
    }
}
482
/// A storage wrapper that delegates to an inner [`Storage`] but can inject
/// failures into `apply`, `put`/`put_with_options`, `flush`, and `snapshot` on demand.
///
/// Each failure slot is controlled by a lock-free [`ArcSwap`](arc_swap::ArcSwap).
/// The read path avoids introducing artificial synchronisation that could mask
/// concurrency bugs in the code under test.
///
/// Failures can be *persistent* (returned on every call until cleared) or
/// *once* (returned on the next call, then automatically cleared).
///
/// Gated behind the `test-utils` feature.
///
/// # Example
///
/// ```ignore
/// let inner = Arc::new(InMemoryStorage::new());
/// let storage = FailingStorage::wrap(inner);
/// storage.fail_put(StorageError::Storage("disk full".into()));
/// // every put_with_options call now returns Err(...)
///
/// storage.fail_flush_once(StorageError::Storage("io error".into()));
/// // only the next flush call returns Err(...), then auto-clears
/// ```
#[cfg(feature = "test-utils")]
pub struct FailingStorage {
    /// The real storage all calls delegate to.
    inner: Arc<dyn super::Storage>,
    /// Failure injected into `apply_with_options`.
    fail_apply: FailSlot,
    /// Failure injected into `put`/`put_with_options`.
    fail_put: FailSlot,
    /// Failure injected into `flush`.
    fail_flush: FailSlot,
    /// Failure injected into `snapshot`.
    fail_snapshot: FailSlot,
}
514
#[cfg(feature = "test-utils")]
impl FailingStorage {
    /// Wraps an existing storage, with all failure injections initially `None`.
    pub fn wrap(inner: Arc<dyn super::Storage>) -> Arc<Self> {
        Arc::new(Self {
            inner,
            fail_apply: arc_swap::ArcSwap::from_pointee(None),
            fail_put: arc_swap::ArcSwap::from_pointee(None),
            fail_flush: arc_swap::ArcSwap::from_pointee(None),
            fail_snapshot: arc_swap::ArcSwap::from_pointee(None),
        })
    }

    /// Arms `slot` with a failure returned on every subsequent call.
    /// (Shared helper: the eight public setters previously repeated this body.)
    fn arm_persistent(slot: &FailSlot, err: super::StorageError) {
        slot.store(Arc::new(Some(Failure::Persistent(err))));
    }

    /// Arms `slot` with a failure returned on the next call only.
    fn arm_once(slot: &FailSlot, err: super::StorageError) {
        slot.store(Arc::new(Some(Failure::Once(err))));
    }

    /// Makes `apply` return the given error on every subsequent call.
    pub fn fail_apply(&self, err: super::StorageError) {
        Self::arm_persistent(&self.fail_apply, err);
    }

    /// Makes `apply` return the given error on the next call only.
    pub fn fail_apply_once(&self, err: super::StorageError) {
        Self::arm_once(&self.fail_apply, err);
    }

    /// Makes `put` and `put_with_options` return the given error on every subsequent call.
    pub fn fail_put(&self, err: super::StorageError) {
        Self::arm_persistent(&self.fail_put, err);
    }

    /// Makes `put` or `put_with_options` return the given error on the next call only.
    ///
    /// The failure fires on whichever of the two methods is called first, then
    /// automatically clears.
    pub fn fail_put_once(&self, err: super::StorageError) {
        Self::arm_once(&self.fail_put, err);
    }

    /// Makes `flush` return the given error on every subsequent call.
    pub fn fail_flush(&self, err: super::StorageError) {
        Self::arm_persistent(&self.fail_flush, err);
    }

    /// Makes `flush` return the given error on the next call only.
    pub fn fail_flush_once(&self, err: super::StorageError) {
        Self::arm_once(&self.fail_flush, err);
    }

    /// Makes `snapshot` return the given error on every subsequent call.
    pub fn fail_snapshot(&self, err: super::StorageError) {
        Self::arm_persistent(&self.fail_snapshot, err);
    }

    /// Makes `snapshot` return the given error on the next call only.
    pub fn fail_snapshot_once(&self, err: super::StorageError) {
        Self::arm_once(&self.fail_snapshot, err);
    }
}
575
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::StorageRead for FailingStorage {
    /// Reads have no failure slot; delegate straight to the inner store.
    async fn get(&self, key: Bytes) -> super::StorageResult<Option<crate::Record>> {
        self.inner.get(key).await
    }

    /// Scans have no failure slot; delegate straight to the inner store.
    async fn scan_iter(
        &self,
        range: crate::BytesRange,
    ) -> super::StorageResult<Box<dyn super::StorageIterator + Send + 'static>> {
        self.inner.scan_iter(range).await
    }
}
590
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::Storage for FailingStorage {
    /// Fails if `fail_apply` is armed, otherwise delegates.
    async fn apply_with_options(
        &self,
        ops: Vec<super::RecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_apply)?;
        self.inner.apply_with_options(ops, options).await
    }

    /// Fails if `fail_put` is armed, otherwise delegates.
    async fn put_with_options(
        &self,
        records: Vec<super::PutRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_put)?;
        self.inner.put_with_options(records, options).await
    }

    /// Merge has no dedicated failure slot (see struct docs); pure delegation.
    async fn merge_with_options(
        &self,
        records: Vec<super::MergeRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        self.inner.merge_with_options(records, options).await
    }

    /// Fails if `fail_snapshot` is armed, otherwise delegates.
    async fn snapshot(&self) -> super::StorageResult<Arc<dyn super::StorageSnapshot>> {
        check_failure(&self.fail_snapshot)?;
        self.inner.snapshot().await
    }

    /// Durability subscriptions are never failure-injected.
    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
        self.inner.subscribe_durable()
    }

    /// Fails if `fail_flush` is armed, otherwise delegates.
    async fn flush(&self) -> super::StorageResult<()> {
        check_failure(&self.fail_flush)?;
        self.inner.flush().await
    }
}
634
635#[cfg(test)]
636mod tests {
637    use super::*;
638    use bytes::BytesMut;
639    use std::ops::Bound;
640
641    /// Test merge operator that appends new value to existing value with a separator.
642    struct AppendMergeOperator;
643
644    impl MergeOperator for AppendMergeOperator {
645        fn merge_batch(
646            &self,
647            _key: &Bytes,
648            existing_value: Option<Bytes>,
649            operands: &[Bytes],
650        ) -> Bytes {
651            operands
652                .iter()
653                .fold(existing_value.unwrap_or_default(), |acc, operand| {
654                    let mut result = BytesMut::from(acc);
655                    if !result.is_empty() {
656                        result.extend_from_slice(b",");
657                    }
658                    result.extend_from_slice(operand);
659                    result.freeze()
660                })
661        }
662    }
663
664    #[tokio::test]
665    async fn should_return_none_when_key_not_found() {
666        // given
667        let storage = InMemoryStorage::new();
668
669        // when
670        let result = storage.get(Bytes::from("missing_key")).await;
671
672        // then
673        assert!(result.is_ok());
674        assert!(result.unwrap().is_none());
675    }
676
677    #[tokio::test]
678    async fn should_store_and_retrieve_record() {
679        // given
680        let storage = InMemoryStorage::new();
681        let key = Bytes::from("test_key");
682        let value = Bytes::from("test_value");
683
684        // when
685        storage
686            .put(vec![Record::new(key.clone(), value.clone()).into()])
687            .await
688            .unwrap();
689        let result = storage.get(key).await.unwrap();
690
691        // then
692        assert!(result.is_some());
693        let record = result.unwrap();
694        assert_eq!(record.key, Bytes::from("test_key"));
695        assert_eq!(record.value, value);
696    }
697
698    #[tokio::test]
699    async fn should_overwrite_existing_key() {
700        // given
701        let storage = InMemoryStorage::new();
702        let key = Bytes::from("test_key");
703        let initial_value = Bytes::from("initial_value");
704        let updated_value = Bytes::from("updated_value");
705
706        // when
707        storage
708            .put(vec![Record::new(key.clone(), initial_value).into()])
709            .await
710            .unwrap();
711        storage
712            .put(vec![Record::new(key.clone(), updated_value.clone()).into()])
713            .await
714            .unwrap();
715        let result = storage.get(key).await.unwrap();
716
717        // then
718        assert!(result.is_some());
719        assert_eq!(result.unwrap().value, updated_value);
720    }
721
722    #[tokio::test]
723    async fn should_store_multiple_records() {
724        // given
725        let storage = InMemoryStorage::new();
726        let records = vec![
727            Record::new(Bytes::from("key1"), Bytes::from("value1")),
728            Record::new(Bytes::from("key2"), Bytes::from("value2")),
729            Record::new(Bytes::from("key3"), Bytes::from("value3")),
730        ];
731
732        // when
733        storage
734            .put(records.iter().cloned().map(PutRecordOp::new).collect())
735            .await
736            .unwrap();
737
738        // then
739        for record in records {
740            let retrieved = storage.get(record.key.clone()).await.unwrap();
741            assert!(retrieved.is_some());
742            assert_eq!(retrieved.unwrap().value, record.value);
743        }
744    }
745
746    #[tokio::test]
747    async fn should_scan_all_records_when_unbounded() {
748        // given
749        let storage = InMemoryStorage::new();
750        let records = [
751            Record::new(Bytes::from("a"), Bytes::from("value_a")),
752            Record::new(Bytes::from("b"), Bytes::from("value_b")),
753            Record::new(Bytes::from("c"), Bytes::from("value_c")),
754        ];
755        storage
756            .put(records.iter().cloned().map(PutRecordOp::new).collect())
757            .await
758            .unwrap();
759
760        // when
761        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
762
763        // then
764        assert_eq!(scanned.len(), 3);
765        assert_eq!(scanned[0].key, Bytes::from("a"));
766        assert_eq!(scanned[1].key, Bytes::from("b"));
767        assert_eq!(scanned[2].key, Bytes::from("c"));
768    }
769
770    #[tokio::test]
771    async fn should_scan_records_with_prefix() {
772        // given
773        let storage = InMemoryStorage::new();
774        let records = vec![
775            Record::new(Bytes::from("prefix_a"), Bytes::from("value1")),
776            Record::new(Bytes::from("prefix_b"), Bytes::from("value2")),
777            Record::new(Bytes::from("other_c"), Bytes::from("value3")),
778        ];
779        storage
780            .put(records.into_iter().map(PutRecordOp::new).collect())
781            .await
782            .unwrap();
783
784        // when
785        let scanned = storage
786            .scan(BytesRange::prefix(Bytes::from("prefix_")))
787            .await
788            .unwrap();
789
790        // then
791        assert_eq!(scanned.len(), 2);
792        assert_eq!(scanned[0].key, Bytes::from("prefix_a"));
793        assert_eq!(scanned[1].key, Bytes::from("prefix_b"));
794    }
795
796    #[tokio::test]
797    async fn should_scan_records_in_bounded_range() {
798        // given
799        let storage = InMemoryStorage::new();
800        let records = vec![
801            Record::new(Bytes::from("a"), Bytes::from("value_a")),
802            Record::new(Bytes::from("b"), Bytes::from("value_b")),
803            Record::new(Bytes::from("c"), Bytes::from("value_c")),
804            Record::new(Bytes::from("d"), Bytes::from("value_d")),
805        ];
806        storage
807            .put(records.into_iter().map(PutRecordOp::new).collect())
808            .await
809            .unwrap();
810
811        // when
812        let range = BytesRange::new(
813            Bound::Included(Bytes::from("b")),
814            Bound::Excluded(Bytes::from("d")),
815        );
816        let scanned = storage.scan(range).await.unwrap();
817
818        // then
819        assert_eq!(scanned.len(), 2);
820        assert_eq!(scanned[0].key, Bytes::from("b"));
821        assert_eq!(scanned[1].key, Bytes::from("c"));
822    }
823
824    #[tokio::test]
825    async fn should_return_empty_vec_when_scanning_empty_storage() {
826        // given
827        let storage = InMemoryStorage::new();
828
829        // when
830        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
831
832        // then
833        assert!(scanned.is_empty());
834    }
835
836    #[tokio::test]
837    async fn should_iterate_over_records() {
838        // given
839        let storage = InMemoryStorage::new();
840        let records = vec![
841            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
842            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
843        ];
844        storage.put(records).await.unwrap();
845
846        // when
847        let mut iter = storage.scan_iter(BytesRange::unbounded()).await.unwrap();
848        let first = iter.next().await.unwrap();
849        let second = iter.next().await.unwrap();
850        let third = iter.next().await.unwrap();
851
852        // then
853        assert!(first.is_some());
854        assert_eq!(first.unwrap().key, Bytes::from("key1"));
855        assert!(second.is_some());
856        assert_eq!(second.unwrap().key, Bytes::from("key2"));
857        assert!(third.is_none());
858    }
859
860    #[tokio::test]
861    async fn should_create_snapshot_with_current_data() {
862        // given
863        let storage = InMemoryStorage::new();
864        storage
865            .put(vec![
866                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
867            ])
868            .await
869            .unwrap();
870
871        // when
872        let snapshot = storage.snapshot().await.unwrap();
873
874        // then
875        let result = snapshot.get(Bytes::from("key1")).await.unwrap();
876        assert!(result.is_some());
877        assert_eq!(result.unwrap().value, Bytes::from("value1"));
878    }
879
880    #[tokio::test]
881    async fn should_not_see_writes_after_snapshot() {
882        // given
883        let storage = InMemoryStorage::new();
884        storage
885            .put(vec![
886                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
887            ])
888            .await
889            .unwrap();
890
891        // when
892        let snapshot = storage.snapshot().await.unwrap();
893        storage
894            .put(vec![
895                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
896            ])
897            .await
898            .unwrap();
899
900        // then
901        let snapshot_result = snapshot.get(Bytes::from("key2")).await.unwrap();
902        assert!(snapshot_result.is_none());
903
904        let storage_result = storage.get(Bytes::from("key2")).await.unwrap();
905        assert!(storage_result.is_some());
906    }
907
908    #[tokio::test]
909    async fn should_scan_snapshot_independently() {
910        // given
911        let storage = InMemoryStorage::new();
912        storage
913            .put(vec![
914                Record::new(Bytes::from("a"), Bytes::from("value_a")).into(),
915            ])
916            .await
917            .unwrap();
918
919        // when
920        let snapshot = storage.snapshot().await.unwrap();
921        storage
922            .put(vec![
923                Record::new(Bytes::from("b"), Bytes::from("value_b")).into(),
924            ])
925            .await
926            .unwrap();
927
928        // then
929        let snapshot_records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
930        assert_eq!(snapshot_records.len(), 1);
931        assert_eq!(snapshot_records[0].key, Bytes::from("a"));
932
933        let storage_records = storage.scan(BytesRange::unbounded()).await.unwrap();
934        assert_eq!(storage_records.len(), 2);
935    }
936
937    #[tokio::test]
938    async fn should_handle_empty_record() {
939        // given
940        let storage = InMemoryStorage::new();
941        let key = Bytes::from("empty_key");
942
943        // when
944        storage
945            .put(vec![Record::empty(key.clone()).into()])
946            .await
947            .unwrap();
948        let result = storage.get(key).await.unwrap();
949
950        // then
951        assert!(result.is_some());
952        assert_eq!(result.unwrap().value, Bytes::new());
953    }
954
955    #[tokio::test]
956    async fn should_return_error_when_merge_operator_not_configured() {
957        // given
958        let storage = InMemoryStorage::new();
959        let record = Record::new(Bytes::from("key1"), Bytes::from("value1"));
960
961        // when
962        let result = storage.merge(vec![record.into()]).await;
963
964        // then
965        assert!(result.is_err());
966        assert!(
967            result
968                .unwrap_err()
969                .to_string()
970                .contains("Merge operator not configured")
971        );
972    }
973
974    #[tokio::test]
975    async fn should_merge_when_key_does_not_exist() {
976        // given
977        let merge_op = Arc::new(AppendMergeOperator);
978        let storage = InMemoryStorage::with_merge_operator(merge_op);
979        let key = Bytes::from("new_key");
980        let value = Bytes::from("value1");
981
982        // when
983        storage
984            .merge(vec![Record::new(key.clone(), value.clone()).into()])
985            .await
986            .unwrap();
987        let result = storage.get(key).await.unwrap();
988
989        // then
990        assert!(result.is_some());
991        assert_eq!(result.unwrap().value, value);
992    }
993
994    #[tokio::test]
995    async fn should_merge_when_key_exists() {
996        // given
997        let merge_op = Arc::new(AppendMergeOperator);
998        let storage = InMemoryStorage::with_merge_operator(merge_op);
999        let key = Bytes::from("key1");
1000        let initial_value = Bytes::from("value1");
1001        let new_value = Bytes::from("value2");
1002
1003        storage
1004            .put(vec![Record::new(key.clone(), initial_value).into()])
1005            .await
1006            .unwrap();
1007
1008        // when
1009        storage
1010            .merge(vec![Record::new(key.clone(), new_value).into()])
1011            .await
1012            .unwrap();
1013        let result = storage.get(key).await.unwrap();
1014
1015        // then
1016        assert!(result.is_some());
1017        assert_eq!(result.unwrap().value, Bytes::from("value1,value2"));
1018    }
1019
1020    #[tokio::test]
1021    async fn should_merge_multiple_keys() {
1022        // given
1023        let merge_op = Arc::new(AppendMergeOperator);
1024        let storage = InMemoryStorage::with_merge_operator(merge_op);
1025        let records = vec![
1026            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
1027            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
1028        ];
1029        storage.put(records).await.unwrap();
1030
1031        // when
1032        storage
1033            .merge(vec![
1034                Record::new(Bytes::from("key1"), Bytes::from("value1a")).into(),
1035                Record::new(Bytes::from("key2"), Bytes::from("value2a")).into(),
1036            ])
1037            .await
1038            .unwrap();
1039
1040        // then
1041        let result1 = storage.get(Bytes::from("key1")).await.unwrap();
1042        assert_eq!(result1.unwrap().value, Bytes::from("value1,value1a"));
1043
1044        let result2 = storage.get(Bytes::from("key2")).await.unwrap();
1045        assert_eq!(result2.unwrap().value, Bytes::from("value2,value2a"));
1046    }
1047
1048    #[tokio::test]
1049    async fn should_return_monotonically_increasing_seqnums_from_put() {
1050        let storage = InMemoryStorage::new();
1051
1052        let r1 = storage
1053            .put(vec![
1054                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1055            ])
1056            .await
1057            .unwrap();
1058        let r2 = storage
1059            .put(vec![
1060                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1061            ])
1062            .await
1063            .unwrap();
1064        let r3 = storage
1065            .put(vec![
1066                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1067            ])
1068            .await
1069            .unwrap();
1070
1071        assert_eq!(r1.seqnum, 1);
1072        assert_eq!(r2.seqnum, 2);
1073        assert_eq!(r3.seqnum, 3);
1074    }
1075
1076    #[tokio::test]
1077    async fn should_return_monotonically_increasing_seqnums_from_apply() {
1078        let storage = InMemoryStorage::new();
1079
1080        let r1 = storage
1081            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1082                Bytes::from("k1"),
1083                Bytes::from("v1"),
1084            )))])
1085            .await
1086            .unwrap();
1087        let r2 = storage
1088            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1089                Bytes::from("k2"),
1090                Bytes::from("v2"),
1091            )))])
1092            .await
1093            .unwrap();
1094
1095        assert_eq!(r1.seqnum, 1);
1096        assert_eq!(r2.seqnum, 2);
1097    }
1098
1099    #[tokio::test]
1100    async fn should_share_seqnum_counter_across_write_methods() {
1101        let merge_op = Arc::new(AppendMergeOperator);
1102        let storage = InMemoryStorage::with_merge_operator(merge_op);
1103
1104        let r1 = storage
1105            .put(vec![
1106                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1107            ])
1108            .await
1109            .unwrap();
1110        let r2 = storage
1111            .merge(vec![
1112                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
1113            ])
1114            .await
1115            .unwrap();
1116        let r3 = storage
1117            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1118                Bytes::from("k2"),
1119                Bytes::from("v3"),
1120            )))])
1121            .await
1122            .unwrap();
1123
1124        assert_eq!(r1.seqnum, 1);
1125        assert_eq!(r2.seqnum, 2);
1126        assert_eq!(r3.seqnum, 3);
1127    }
1128
1129    #[tokio::test]
1130    async fn should_start_durable_subscriber_at_zero() {
1131        let storage = InMemoryStorage::new();
1132        let rx = storage.subscribe_durable();
1133        assert_eq!(*rx.borrow(), 0);
1134    }
1135
1136    #[tokio::test]
1137    async fn should_advance_durable_watermark_on_each_write() {
1138        let storage = InMemoryStorage::new();
1139        let rx = storage.subscribe_durable();
1140
1141        storage
1142            .put(vec![
1143                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1144            ])
1145            .await
1146            .unwrap();
1147        assert_eq!(*rx.borrow(), 1);
1148
1149        storage
1150            .put(vec![
1151                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1152            ])
1153            .await
1154            .unwrap();
1155        assert_eq!(*rx.borrow(), 2);
1156    }
1157
1158    #[tokio::test]
1159    async fn should_merge_empty_values() {
1160        // given
1161        let merge_op = Arc::new(AppendMergeOperator);
1162        let storage = InMemoryStorage::with_merge_operator(merge_op);
1163        let key = Bytes::from("key1");
1164
1165        // when
1166        storage
1167            .merge(vec![Record::empty(key.clone()).into()])
1168            .await
1169            .unwrap();
1170        let result = storage.get(key).await.unwrap();
1171
1172        // then
1173        assert!(result.is_some());
1174        assert_eq!(result.unwrap().value, Bytes::new());
1175    }
1176
1177    #[tokio::test]
1178    async fn should_not_advance_durable_watermark_when_deferred() {
1179        let storage = InMemoryStorage::new().with_deferred_durability();
1180        let rx = storage.subscribe_durable();
1181
1182        storage
1183            .put(vec![
1184                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1185            ])
1186            .await
1187            .unwrap();
1188        assert_eq!(*rx.borrow(), 0, "durable watermark should not advance");
1189
1190        storage
1191            .put(vec![
1192                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1193            ])
1194            .await
1195            .unwrap();
1196        assert_eq!(*rx.borrow(), 0, "durable watermark should still be 0");
1197    }
1198
1199    #[tokio::test]
1200    async fn should_advance_durable_watermark_on_flush_when_deferred() {
1201        let storage = InMemoryStorage::new().with_deferred_durability();
1202        let rx = storage.subscribe_durable();
1203
1204        storage
1205            .put(vec![
1206                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1207            ])
1208            .await
1209            .unwrap();
1210        storage
1211            .put(vec![
1212                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1213            ])
1214            .await
1215            .unwrap();
1216        assert_eq!(*rx.borrow(), 0);
1217
1218        storage.flush().await.unwrap();
1219        assert_eq!(*rx.borrow(), 2, "flush should advance to current seqnum");
1220    }
1221
1222    #[tokio::test]
1223    async fn should_advance_durable_watermark_to_specific_seq() {
1224        let storage = InMemoryStorage::new().with_deferred_durability();
1225        let rx = storage.subscribe_durable();
1226
1227        // Write 3 records
1228        for i in 1..=3 {
1229            storage
1230                .put(vec![
1231                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
1232                ])
1233                .await
1234                .unwrap();
1235        }
1236        assert_eq!(*rx.borrow(), 0);
1237
1238        // Partially flush through seqnum 2
1239        storage.flush_to(2);
1240        assert_eq!(*rx.borrow(), 2, "should advance to requested seqnum");
1241
1242        // Flush the rest
1243        storage.flush_to(3);
1244        assert_eq!(*rx.borrow(), 3);
1245    }
1246
1247    #[tokio::test]
1248    #[should_panic(expected = "cannot move durable seqnum backwards")]
1249    async fn should_panic_when_flush_to_moves_backwards() {
1250        let storage = InMemoryStorage::new().with_deferred_durability();
1251        for i in 1..=3 {
1252            storage
1253                .put(vec![
1254                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
1255                ])
1256                .await
1257                .unwrap();
1258        }
1259
1260        storage.flush_to(2);
1261        storage.flush_to(1);
1262    }
1263
1264    #[tokio::test]
1265    #[should_panic(expected = "cannot flush beyond written seqnum")]
1266    async fn should_panic_when_flush_to_exceeds_written() {
1267        let storage = InMemoryStorage::new().with_deferred_durability();
1268        storage
1269            .put(vec![
1270                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1271            ])
1272            .await
1273            .unwrap();
1274
1275        storage.flush_to(5);
1276    }
1277
1278    #[tokio::test]
1279    async fn should_see_data_in_snapshot_before_flush_when_deferred() {
1280        let storage = InMemoryStorage::new().with_deferred_durability();
1281
1282        storage
1283            .put(vec![
1284                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1285            ])
1286            .await
1287            .unwrap();
1288
1289        // Data is written but not durable — snapshots should still see it
1290        let snapshot = storage.snapshot().await.unwrap();
1291        let result = snapshot.get(Bytes::from("k1")).await.unwrap();
1292        assert!(result.is_some());
1293        assert_eq!(result.unwrap().value, Bytes::from("v1"));
1294
1295        // But durable watermark is still 0
1296        let rx = storage.subscribe_durable();
1297        assert_eq!(*rx.borrow(), 0);
1298    }
1299
1300    #[tokio::test]
1301    async fn should_not_advance_durable_watermark_on_apply_when_deferred() {
1302        let storage = InMemoryStorage::new().with_deferred_durability();
1303        let rx = storage.subscribe_durable();
1304
1305        storage
1306            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1307                Bytes::from("k1"),
1308                Bytes::from("v1"),
1309            )))])
1310            .await
1311            .unwrap();
1312        assert_eq!(*rx.borrow(), 0);
1313
1314        storage
1315            .apply(vec![RecordOp::Delete(Bytes::from("k1"))])
1316            .await
1317            .unwrap();
1318        assert_eq!(*rx.borrow(), 0);
1319
1320        storage.flush().await.unwrap();
1321        assert_eq!(*rx.borrow(), 2);
1322    }
1323
1324    #[tokio::test]
1325    async fn should_not_advance_durable_watermark_on_merge_when_deferred() {
1326        let merge_op = Arc::new(AppendMergeOperator);
1327        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
1328        let rx = storage.subscribe_durable();
1329
1330        storage
1331            .merge(vec![
1332                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1333            ])
1334            .await
1335            .unwrap();
1336        assert_eq!(*rx.borrow(), 0);
1337
1338        storage
1339            .merge(vec![
1340                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
1341            ])
1342            .await
1343            .unwrap();
1344        assert_eq!(*rx.borrow(), 0);
1345
1346        storage.flush().await.unwrap();
1347        assert_eq!(*rx.borrow(), 2);
1348    }
1349
1350    #[tokio::test]
1351    async fn should_support_multiple_flush_cycles_when_deferred() {
1352        let storage = InMemoryStorage::new().with_deferred_durability();
1353        let rx = storage.subscribe_durable();
1354
1355        // First cycle: write and flush
1356        storage
1357            .put(vec![
1358                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1359            ])
1360            .await
1361            .unwrap();
1362        storage.flush().await.unwrap();
1363        assert_eq!(*rx.borrow(), 1);
1364
1365        // Second cycle: write more and flush again
1366        storage
1367            .put(vec![
1368                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1369            ])
1370            .await
1371            .unwrap();
1372        storage
1373            .put(vec![
1374                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1375            ])
1376            .await
1377            .unwrap();
1378        assert_eq!(*rx.borrow(), 1, "should not advance before second flush");
1379
1380        storage.flush().await.unwrap();
1381        assert_eq!(*rx.borrow(), 3);
1382    }
1383
1384    #[tokio::test]
1385    async fn should_flush_on_empty_storage_when_deferred() {
1386        let storage = InMemoryStorage::new().with_deferred_durability();
1387        let rx = storage.subscribe_durable();
1388
1389        // Flushing with no writes should be fine (sends 0 again)
1390        storage.flush().await.unwrap();
1391        assert_eq!(*rx.borrow(), 0);
1392
1393        // flush_to(0) should also be fine
1394        storage.flush_to(0);
1395        assert_eq!(*rx.borrow(), 0);
1396    }
1397
1398    #[tokio::test]
1399    async fn should_defer_durability_across_mixed_write_methods() {
1400        let merge_op = Arc::new(AppendMergeOperator);
1401        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
1402        let rx = storage.subscribe_durable();
1403
1404        // put, apply, merge — all should defer
1405        storage
1406            .put(vec![
1407                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1408            ])
1409            .await
1410            .unwrap();
1411        storage
1412            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1413                Bytes::from("k2"),
1414                Bytes::from("v2"),
1415            )))])
1416            .await
1417            .unwrap();
1418        storage
1419            .merge(vec![
1420                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1421            ])
1422            .await
1423            .unwrap();
1424        assert_eq!(*rx.borrow(), 0);
1425
1426        // Partially flush through seqnum 2
1427        storage.flush_to(2);
1428        assert_eq!(*rx.borrow(), 2);
1429
1430        // Flush the rest
1431        storage.flush().await.unwrap();
1432        assert_eq!(*rx.borrow(), 3);
1433    }
1434
1435    #[tokio::test]
1436    async fn should_read_data_written_before_flush_when_deferred() {
1437        let storage = InMemoryStorage::new().with_deferred_durability();
1438
1439        storage
1440            .put(vec![
1441                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1442                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1443            ])
1444            .await
1445            .unwrap();
1446
1447        // get and scan see data even though it is not yet durable
1448        let result = storage.get(Bytes::from("k1")).await.unwrap();
1449        assert_eq!(result.unwrap().value, Bytes::from("v1"));
1450
1451        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
1452        assert_eq!(scanned.len(), 2);
1453    }
1454}