//! In-memory storage implementation (common/storage/in_memory.rs).

1use std::collections::BTreeMap;
2use std::ops::RangeBounds;
3use std::sync::{Arc, RwLock};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8use super::{
9    MergeOperator, MergeRecordOp, PutRecordOp, Storage, StorageSnapshot, WriteOptions, WriteResult,
10};
11use crate::storage::RecordOp;
12use crate::{
13    BytesRange, CheckpointInfo, Record, StorageError, StorageIterator, StorageRead, StorageResult,
14    Ttl,
15};
16
/// Trait for providing the current time.
///
/// The `Send + Sync` bound lets a single clock be shared across storage
/// handles and snapshots (see `InMemoryStorage::with_clock`).
pub trait Clock: Send + Sync {
    /// Returns the current time as milliseconds since the Unix epoch.
    fn now(&self) -> i64;
}
22
23/// Clock implementation that returns the real system time.
24pub struct WallClock;
25
26impl Clock for WallClock {
27    fn now(&self) -> i64 {
28        std::time::SystemTime::now()
29            .duration_since(std::time::UNIX_EPOCH)
30            .expect("system time before Unix epoch")
31            .as_millis() as i64
32    }
33}
34
/// Internal wrapper that stores a value alongside its expiration timestamp.
#[derive(Clone, Debug)]
struct StoredValue {
    /// Raw value bytes as written by the caller.
    value: Bytes,
    /// `None` means the value never expires.
    expire_ts: Option<i64>,
}
42
43impl StoredValue {
44    fn is_expired(&self, now: i64) -> bool {
45        self.expire_ts.is_some_and(|ts| now >= ts)
46    }
47}
48
49/// Computes the absolute expiration timestamp from TTL options.
50fn compute_expire_ts(now: i64, ttl: Ttl, default_ttl: Option<u64>) -> Option<i64> {
51    match ttl {
52        Ttl::Default => default_ttl.map(|ms| now + ms as i64),
53        Ttl::NoExpiry => None,
54        Ttl::ExpireAfter(ms) => Some(now + ms as i64),
55        Ttl::ExpireAt(ts) => Some(ts),
56    }
57}
58
/// In-memory implementation of the Storage trait using a BTreeMap.
///
/// This implementation stores all data in memory and is useful for testing
/// or scenarios where durability is not required. Supports TTL-based
/// expiration via a configurable [`Clock`].
pub struct InMemoryStorage {
    /// Ordered key/value map; `Arc` lets snapshots share-then-clone the data.
    data: Arc<RwLock<BTreeMap<Bytes, StoredValue>>>,
    /// Optional operator used by merge operations; merges fail without one.
    merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
    /// Time source for computing and checking TTL expiration.
    clock: Arc<dyn Clock>,
    /// Default TTL (ms) applied when a write uses `Ttl::Default`.
    default_ttl: Option<u64>,
    /// Highest sequence number handed out to a completed write.
    written_seq: std::sync::atomic::AtomicU64,
    /// Highest sequence number considered durable (<= written_seq).
    durable_seq: std::sync::atomic::AtomicU64,
    /// Broadcasts durable watermark advances to `subscribe_durable` receivers.
    durable_tx: tokio::sync::watch::Sender<u64>,
    /// When true, writes only become durable on an explicit flush.
    defer_durability: bool,
}
74
75impl InMemoryStorage {
76    /// Creates a new InMemoryStorage instance with an empty store.
77    pub fn new() -> Self {
78        let (durable_tx, _) = tokio::sync::watch::channel(0);
79        Self {
80            data: Arc::new(RwLock::new(BTreeMap::new())),
81            merge_operator: None,
82            clock: Arc::new(WallClock),
83            default_ttl: None,
84            written_seq: std::sync::atomic::AtomicU64::new(0),
85            durable_seq: std::sync::atomic::AtomicU64::new(0),
86            durable_tx,
87            defer_durability: false,
88        }
89    }
90
91    /// Creates a new InMemoryStorage instance with an optional merge operator.
92    ///
93    /// If a merge operator is provided, the `merge` method will use it to combine
94    /// existing values with new values. If no merge operator is provided, the
95    /// `merge` method will return an error.
96    pub fn with_merge_operator(merge_operator: Arc<dyn MergeOperator + Send + Sync>) -> Self {
97        let (durable_tx, _) = tokio::sync::watch::channel(0);
98        Self {
99            data: Arc::new(RwLock::new(BTreeMap::new())),
100            merge_operator: Some(merge_operator),
101            clock: Arc::new(WallClock),
102            default_ttl: None,
103            written_seq: std::sync::atomic::AtomicU64::new(0),
104            durable_seq: std::sync::atomic::AtomicU64::new(0),
105            durable_tx,
106            defer_durability: false,
107        }
108    }
109
110    /// Sets a custom clock for TTL expiration checks.
111    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
112        self.clock = clock;
113        self
114    }
115
116    /// Sets the default TTL (in milliseconds) for records written with [`Ttl::Default`].
117    pub fn with_default_ttl(mut self, ttl: u64) -> Self {
118        self.default_ttl = Some(ttl);
119        self
120    }
121
122    /// Enables deferred durability mode.
123    ///
124    /// When enabled, writes are not marked as durable until [`flush()`](Storage::flush)
125    /// or [`flush_to()`](Self::flush_to) is called. This is useful for testing
126    /// `read_durable` behavior without depending on SlateDB internals.
127    pub fn with_deferred_durability(mut self) -> Self {
128        self.defer_durability = true;
129        self
130    }
131
132    /// Advances the durable watermark to a specific sequence number.
133    ///
134    /// # Panics
135    ///
136    /// Panics if `seq` is greater than the current written sequence number
137    /// or less than the current durable sequence number.
138    pub fn flush_to(&self, seq: u64) {
139        let written = self.written_seq.load(std::sync::atomic::Ordering::Relaxed);
140        assert!(
141            seq <= written,
142            "cannot flush beyond written seqnum: flush_to({seq}) but written is {written}"
143        );
144        let durable = self.durable_seq.load(std::sync::atomic::Ordering::Relaxed);
145        assert!(
146            seq >= durable,
147            "cannot move durable seqnum backwards: flush_to({seq}) but durable is {durable}"
148        );
149        self.durable_seq
150            .store(seq, std::sync::atomic::Ordering::Relaxed);
151        let _ = self.durable_tx.send(seq);
152    }
153
154    /// Allocates the next sequence number.
155    ///
156    /// When `defer_durability` is false (default), the sequence number is
157    /// immediately marked as durable. When true, durability is deferred
158    /// until an explicit flush.
159    fn next_seqnum(&self) -> u64 {
160        let seq = self
161            .written_seq
162            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
163            + 1;
164        if !self.defer_durability {
165            self.durable_seq
166                .store(seq, std::sync::atomic::Ordering::Relaxed);
167            let _ = self.durable_tx.send(seq);
168        }
169        seq
170    }
171}
172
173impl Default for InMemoryStorage {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179#[async_trait]
180impl StorageRead for InMemoryStorage {
181    /// Retrieves a single record by key from the in-memory store.
182    ///
183    /// Returns `None` if the key does not exist or has expired.
184    #[tracing::instrument(level = "trace", skip_all)]
185    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
186        let data = self
187            .data
188            .read()
189            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
190
191        match data.get(&key) {
192            Some(stored) if !stored.is_expired(self.clock.now()) => {
193                Ok(Some(Record::new(key, stored.value.clone())))
194            }
195            _ => Ok(None),
196        }
197    }
198
199    #[tracing::instrument(level = "trace", skip_all)]
200    async fn scan_iter(
201        &self,
202        range: BytesRange,
203    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
204        let data = self
205            .data
206            .read()
207            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
208
209        let now = self.clock.now();
210        let records: Vec<Record> = data
211            .range((range.start_bound().cloned(), range.end_bound().cloned()))
212            .filter(|(_, stored)| !stored.is_expired(now))
213            .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
214            .collect();
215
216        Ok(Box::new(InMemoryIterator { records, index: 0 }))
217    }
218}
219
/// Iterator over records materialized at scan time by `scan_iter`.
struct InMemoryIterator {
    /// Records captured (and TTL-filtered) when the scan was performed.
    records: Vec<Record>,
    /// Position of the next record to yield.
    index: usize,
}
224
225#[async_trait]
226impl StorageIterator for InMemoryIterator {
227    #[tracing::instrument(level = "trace", skip_all)]
228    async fn next(&mut self) -> StorageResult<Option<Record>> {
229        if self.index >= self.records.len() {
230            Ok(None)
231        } else {
232            let record = self.records[self.index].clone();
233            self.index += 1;
234            Ok(Some(record))
235        }
236    }
237}
238
/// In-memory snapshot that holds a copy of the data at the time of snapshot creation.
///
/// Provides a consistent read-only view of the database at the time the snapshot was created.
/// Expired entries are filtered out at read time using the shared clock.
pub struct InMemoryStorageSnapshot {
    /// Frozen copy of the store's map taken at snapshot time.
    data: Arc<BTreeMap<Bytes, StoredValue>>,
    /// Shared with the parent store so TTL checks use the same time source.
    clock: Arc<dyn Clock>,
}
247
248#[async_trait]
249impl StorageRead for InMemoryStorageSnapshot {
250    #[tracing::instrument(level = "trace", skip_all)]
251    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
252        match self.data.get(&key) {
253            Some(stored) if !stored.is_expired(self.clock.now()) => {
254                Ok(Some(Record::new(key, stored.value.clone())))
255            }
256            _ => Ok(None),
257        }
258    }
259
260    #[tracing::instrument(level = "trace", skip_all)]
261    async fn scan_iter(
262        &self,
263        range: BytesRange,
264    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
265        let now = self.clock.now();
266        let records: Vec<Record> = self
267            .data
268            .range((range.start_bound().cloned(), range.end_bound().cloned()))
269            .filter(|(_, stored)| !stored.is_expired(now))
270            .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
271            .collect();
272
273        Ok(Box::new(InMemoryIterator { records, index: 0 }))
274    }
275}
276
// Marker impl: the snapshot's read behavior comes entirely from its
// `StorageRead` implementation.
#[async_trait]
impl StorageSnapshot for InMemoryStorageSnapshot {}
279
280#[async_trait]
281impl Storage for InMemoryStorage {
282    async fn apply_with_options(
283        &self,
284        records: Vec<RecordOp>,
285        _options: WriteOptions,
286    ) -> StorageResult<WriteResult> {
287        let mut data = self
288            .data
289            .write()
290            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
291
292        let now = self.clock.now();
293        for record in records {
294            match record {
295                RecordOp::Put(op) => {
296                    let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
297                    data.insert(
298                        op.record.key,
299                        StoredValue {
300                            value: op.record.value,
301                            expire_ts,
302                        },
303                    );
304                }
305                RecordOp::Merge(op) => {
306                    let existing_value = data
307                        .get(&op.record.key)
308                        .filter(|s| !s.is_expired(now))
309                        .map(|s| s.value.clone());
310                    let merged_value = self.merge_operator.as_ref().unwrap().merge_batch(
311                        &op.record.key,
312                        existing_value,
313                        &[op.record.value],
314                    );
315                    let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
316                    data.insert(
317                        op.record.key,
318                        StoredValue {
319                            value: merged_value,
320                            expire_ts,
321                        },
322                    );
323                }
324                RecordOp::Delete(key) => {
325                    data.remove(&key);
326                }
327            }
328        }
329
330        Ok(WriteResult {
331            seqnum: self.next_seqnum(),
332        })
333    }
334
335    /// Writes a batch of records to the in-memory store.
336    ///
337    /// All records are written atomically within a single write lock acquisition.
338    /// For in-memory storage, write options are ignored since there is no
339    /// durable storage to await.
340    async fn put_with_options(
341        &self,
342        records: Vec<PutRecordOp>,
343        _options: WriteOptions,
344    ) -> StorageResult<WriteResult> {
345        let mut data = self
346            .data
347            .write()
348            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
349
350        let now = self.clock.now();
351        for op in records {
352            let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
353            data.insert(
354                op.record.key,
355                StoredValue {
356                    value: op.record.value,
357                    expire_ts,
358                },
359            );
360        }
361
362        Ok(WriteResult {
363            seqnum: self.next_seqnum(),
364        })
365    }
366
367    /// Merges values for the given keys using the configured merge operator.
368    ///
369    /// This method requires a merge operator to be configured during construction.
370    /// For each record, it will:
371    /// 1. Get the existing value (if any), excluding expired entries
372    /// 2. Call the merge operator to combine existing and new values
373    /// 3. Put the merged result back with the new TTL
374    ///
375    /// If no merge operator is configured, this method will return a
376    /// `StorageError::Storage` error.
377    async fn merge_with_options(
378        &self,
379        records: Vec<MergeRecordOp>,
380        _options: WriteOptions,
381    ) -> StorageResult<WriteResult> {
382        let merge_op = self
383            .merge_operator
384            .as_ref()
385            .ok_or_else(|| {
386                StorageError::Storage(
387                    "Merge operator not configured: in-memory storage requires a merge operator to be set during construction".to_string(),
388                )
389            })?;
390
391        let mut data = self
392            .data
393            .write()
394            .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
395
396        let now = self.clock.now();
397        for op in records {
398            let existing_value = data
399                .get(&op.record.key)
400                .filter(|s| !s.is_expired(now))
401                .map(|s| s.value.clone());
402            let merged_value =
403                merge_op.merge_batch(&op.record.key, existing_value, &[op.record.value]);
404            let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
405            data.insert(
406                op.record.key,
407                StoredValue {
408                    value: merged_value,
409                    expire_ts,
410                },
411            );
412        }
413
414        Ok(WriteResult {
415            seqnum: self.next_seqnum(),
416        })
417    }
418
419    /// Creates a point-in-time snapshot of the in-memory storage.
420    ///
421    /// The snapshot provides a consistent read-only view of the database at the time
422    /// the snapshot was created. Reads from the snapshot will not see any subsequent
423    /// writes to the underlying storage. Expired entries are filtered at read time.
424    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
425        let data = self
426            .data
427            .read()
428            .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
429
430        let snapshot_data = Arc::new(data.clone());
431
432        Ok(Arc::new(InMemoryStorageSnapshot {
433            data: snapshot_data,
434            clock: self.clock.clone(),
435        }))
436    }
437
438    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
439        self.durable_tx.subscribe()
440    }
441
442    async fn flush(&self) -> StorageResult<()> {
443        if self.defer_durability {
444            self.flush_to(self.written_seq.load(std::sync::atomic::Ordering::Relaxed));
445        }
446        Ok(())
447    }
448
449    async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo> {
450        Err(StorageError::Storage(
451            "checkpoints are not supported for in-memory storage".to_string(),
452        ))
453    }
454}
455
/// Injected failure that fires either once or on every call.
///
/// Only compiled with the `test-utils` feature; consumed by [`FailingStorage`].
#[cfg(feature = "test-utils")]
#[derive(Clone)]
enum Failure {
    /// Error is returned once, then automatically cleared.
    Once(super::StorageError),
    /// Error is returned on every subsequent call until explicitly cleared.
    Persistent(super::StorageError),
}
465
/// Lock-free slot holding an optional injected [`Failure`].
#[cfg(feature = "test-utils")]
type FailSlot = arc_swap::ArcSwap<Option<Failure>>;
468
/// Checks a [`FailSlot`] and returns an error if one is set.
///
/// For [`Failure::Once`], the slot is atomically swapped to `None` so the
/// error fires exactly once. For [`Failure::Persistent`], the slot is left
/// unchanged.
#[cfg(feature = "test-utils")]
fn check_failure(slot: &FailSlot) -> super::StorageResult<()> {
    let current = slot.load();
    let Some(failure) = current.as_ref() else {
        return Ok(());
    };
    match failure {
        Failure::Persistent(err) => Err(err.clone()),
        Failure::Once(_) => {
            // Swap to None; if another thread raced us, one of them gets the
            // error and the others pass through — reasonable for tests.
            let previous = slot.swap(Arc::new(None));
            if let Some(Failure::Once(err)) = previous.as_ref() {
                Err(err.clone())
            } else {
                Ok(())
            }
        }
    }
}
491
/// A storage wrapper that delegates to an inner [`Storage`] but can inject
/// failures into `apply`, `put`/`put_with_options`, `flush`, and `snapshot` on demand.
///
/// Each failure slot is controlled by a lock-free [`ArcSwap`](arc_swap::ArcSwap).
/// The read path avoids introducing artificial synchronisation that could mask
/// concurrency bugs in the code under test.
///
/// Failures can be *persistent* (returned on every call until cleared) or
/// *once* (returned on the next call, then automatically cleared).
///
/// Gated behind the `test-utils` feature.
///
/// # Example
///
/// ```ignore
/// let inner = Arc::new(InMemoryStorage::new());
/// let storage = FailingStorage::wrap(inner);
/// storage.fail_put(StorageError::Storage("disk full".into()));
/// // every put_with_options call now returns Err(...)
///
/// storage.fail_flush_once(StorageError::Storage("io error".into()));
/// // only the next flush call returns Err(...), then auto-clears
/// ```
#[cfg(feature = "test-utils")]
pub struct FailingStorage {
    /// Real storage that receives all calls that are not failed.
    inner: Arc<dyn super::Storage>,
    /// Failure injected into `apply_with_options`.
    fail_apply: FailSlot,
    /// Failure injected into `put_with_options` (and thus `put`).
    fail_put: FailSlot,
    /// Failure injected into `flush`.
    fail_flush: FailSlot,
    /// Failure injected into `snapshot`.
    fail_snapshot: FailSlot,
}
523
#[cfg(feature = "test-utils")]
impl FailingStorage {
    /// Wraps an existing storage, with all failure injections initially `None`.
    pub fn wrap(inner: Arc<dyn super::Storage>) -> Arc<Self> {
        Arc::new(Self {
            inner,
            fail_apply: Self::clear_slot(),
            fail_put: Self::clear_slot(),
            fail_flush: Self::clear_slot(),
            fail_snapshot: Self::clear_slot(),
        })
    }

    /// Builds an empty failure slot.
    fn clear_slot() -> FailSlot {
        arc_swap::ArcSwap::from_pointee(None)
    }

    /// Arms `slot` with `failure`, replacing any previous injection.
    fn arm(slot: &FailSlot, failure: Failure) {
        slot.store(Arc::new(Some(failure)));
    }

    /// Makes `apply` return the given error on every subsequent call.
    pub fn fail_apply(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Persistent(err));
    }

    /// Makes `apply` return the given error on the next call only.
    pub fn fail_apply_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Once(err));
    }

    /// Makes `put` and `put_with_options` return the given error on every subsequent call.
    pub fn fail_put(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Persistent(err));
    }

    /// Makes `put` or `put_with_options` return the given error on the next call only.
    ///
    /// The failure fires on whichever of the two methods is called first, then
    /// automatically clears.
    pub fn fail_put_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Once(err));
    }

    /// Makes `flush` return the given error on every subsequent call.
    pub fn fail_flush(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Persistent(err));
    }

    /// Makes `flush` return the given error on the next call only.
    pub fn fail_flush_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Once(err));
    }

    /// Makes `snapshot` return the given error on every subsequent call.
    pub fn fail_snapshot(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Persistent(err));
    }

    /// Makes `snapshot` return the given error on the next call only.
    pub fn fail_snapshot_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Once(err));
    }
}
584
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::StorageRead for FailingStorage {
    /// Reads pass straight through to the wrapped storage; no failure injection.
    async fn get(&self, key: Bytes) -> super::StorageResult<Option<crate::Record>> {
        let inner = self.inner.as_ref();
        inner.get(key).await
    }

    /// Scans pass straight through to the wrapped storage; no failure injection.
    async fn scan_iter(
        &self,
        range: crate::BytesRange,
    ) -> super::StorageResult<Box<dyn super::StorageIterator + Send + 'static>> {
        let inner = self.inner.as_ref();
        inner.scan_iter(range).await
    }
}
599
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::Storage for FailingStorage {
    /// Consults the `apply` failure slot, then delegates to the inner storage.
    async fn apply_with_options(
        &self,
        ops: Vec<super::RecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_apply)?;
        let inner = self.inner.as_ref();
        inner.apply_with_options(ops, options).await
    }

    /// Consults the `put` failure slot, then delegates to the inner storage.
    async fn put_with_options(
        &self,
        records: Vec<super::PutRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_put)?;
        let inner = self.inner.as_ref();
        inner.put_with_options(records, options).await
    }

    /// Merges are never failure-injected; delegates directly.
    async fn merge_with_options(
        &self,
        records: Vec<super::MergeRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        let inner = self.inner.as_ref();
        inner.merge_with_options(records, options).await
    }

    /// Consults the `snapshot` failure slot, then delegates to the inner storage.
    async fn snapshot(&self) -> super::StorageResult<Arc<dyn super::StorageSnapshot>> {
        check_failure(&self.fail_snapshot)?;
        let inner = self.inner.as_ref();
        inner.snapshot().await
    }

    /// Durability subscriptions are never failure-injected; delegates directly.
    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
        self.inner.subscribe_durable()
    }

    /// Consults the `flush` failure slot, then delegates to the inner storage.
    async fn flush(&self) -> super::StorageResult<()> {
        check_failure(&self.fail_flush)?;
        let inner = self.inner.as_ref();
        inner.flush().await
    }

    /// Checkpoints are never failure-injected; delegates directly.
    async fn create_checkpoint(&self) -> super::StorageResult<CheckpointInfo> {
        let inner = self.inner.as_ref();
        inner.create_checkpoint().await
    }
}
647
648#[cfg(test)]
649mod tests {
650    use super::*;
651    use bytes::BytesMut;
652    use std::ops::Bound;
653
654    /// Test merge operator that appends new value to existing value with a separator.
655    struct AppendMergeOperator;
656
657    impl MergeOperator for AppendMergeOperator {
658        fn merge_batch(
659            &self,
660            _key: &Bytes,
661            existing_value: Option<Bytes>,
662            operands: &[Bytes],
663        ) -> Bytes {
664            operands
665                .iter()
666                .fold(existing_value.unwrap_or_default(), |acc, operand| {
667                    let mut result = BytesMut::from(acc);
668                    if !result.is_empty() {
669                        result.extend_from_slice(b",");
670                    }
671                    result.extend_from_slice(operand);
672                    result.freeze()
673                })
674        }
675    }
676
    // Lookups for absent keys must return Ok(None), not an error.
    #[tokio::test]
    async fn should_return_none_when_key_not_found() {
        // given
        let storage = InMemoryStorage::new();

        // when
        let result = storage.get(Bytes::from("missing_key")).await;

        // then
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    // Round-trip: a record written via `put` is readable via `get`.
    #[tokio::test]
    async fn should_store_and_retrieve_record() {
        // given
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let value = Bytes::from("test_value");

        // when
        storage
            .put(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        // then
        assert!(result.is_some());
        let record = result.unwrap();
        assert_eq!(record.key, Bytes::from("test_key"));
        assert_eq!(record.value, value);
    }

    // A second put to the same key replaces the stored value.
    #[tokio::test]
    async fn should_overwrite_existing_key() {
        // given
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let initial_value = Bytes::from("initial_value");
        let updated_value = Bytes::from("updated_value");

        // when
        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();
        storage
            .put(vec![Record::new(key.clone(), updated_value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        // then
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, updated_value);
    }

    // One batched put makes every record in the batch readable.
    #[tokio::test]
    async fn should_store_multiple_records() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")),
            Record::new(Bytes::from("key2"), Bytes::from("value2")),
            Record::new(Bytes::from("key3"), Bytes::from("value3")),
        ];

        // when
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // then
        for record in records {
            let retrieved = storage.get(record.key.clone()).await.unwrap();
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().value, record.value);
        }
    }
758
    // Unbounded scans return every record in ascending key order.
    #[tokio::test]
    async fn should_scan_all_records_when_unbounded() {
        // given
        let storage = InMemoryStorage::new();
        let records = [
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
        ];
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        // then
        assert_eq!(scanned.len(), 3);
        assert_eq!(scanned[0].key, Bytes::from("a"));
        assert_eq!(scanned[1].key, Bytes::from("b"));
        assert_eq!(scanned[2].key, Bytes::from("c"));
    }

    // Prefix scans only return keys sharing the prefix.
    #[tokio::test]
    async fn should_scan_records_with_prefix() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("prefix_a"), Bytes::from("value1")),
            Record::new(Bytes::from("prefix_b"), Bytes::from("value2")),
            Record::new(Bytes::from("other_c"), Bytes::from("value3")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let scanned = storage
            .scan(BytesRange::prefix(Bytes::from("prefix_")))
            .await
            .unwrap();

        // then
        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("prefix_a"));
        assert_eq!(scanned[1].key, Bytes::from("prefix_b"));
    }

    // Bounded scans honor inclusive/exclusive bounds.
    #[tokio::test]
    async fn should_scan_records_in_bounded_range() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
            Record::new(Bytes::from("d"), Bytes::from("value_d")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // when
        let range = BytesRange::new(
            Bound::Included(Bytes::from("b")),
            Bound::Excluded(Bytes::from("d")),
        );
        let scanned = storage.scan(range).await.unwrap();

        // then
        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("b"));
        assert_eq!(scanned[1].key, Bytes::from("c"));
    }

    // Scanning an empty store yields an empty vec, not an error.
    #[tokio::test]
    async fn should_return_empty_vec_when_scanning_empty_storage() {
        // given
        let storage = InMemoryStorage::new();

        // when
        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        // then
        assert!(scanned.is_empty());
    }
848
    // scan_iter yields records one by one and None when exhausted.
    #[tokio::test]
    async fn should_iterate_over_records() {
        // given
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        // when
        let mut iter = storage.scan_iter(BytesRange::unbounded()).await.unwrap();
        let first = iter.next().await.unwrap();
        let second = iter.next().await.unwrap();
        let third = iter.next().await.unwrap();

        // then
        assert!(first.is_some());
        assert_eq!(first.unwrap().key, Bytes::from("key1"));
        assert!(second.is_some());
        assert_eq!(second.unwrap().key, Bytes::from("key2"));
        assert!(third.is_none());
    }

    // A snapshot sees data that existed when it was created.
    #[tokio::test]
    async fn should_create_snapshot_with_current_data() {
        // given
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        // when
        let snapshot = storage.snapshot().await.unwrap();

        // then
        let result = snapshot.get(Bytes::from("key1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1"));
    }

    // Writes performed after snapshot creation are invisible to the snapshot.
    #[tokio::test]
    async fn should_not_see_writes_after_snapshot() {
        // given
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        // when
        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();

        // then
        let snapshot_result = snapshot.get(Bytes::from("key2")).await.unwrap();
        assert!(snapshot_result.is_none());

        let storage_result = storage.get(Bytes::from("key2")).await.unwrap();
        assert!(storage_result.is_some());
    }
920
921    #[tokio::test]
922    async fn should_scan_snapshot_independently() {
923        // given
924        let storage = InMemoryStorage::new();
925        storage
926            .put(vec![
927                Record::new(Bytes::from("a"), Bytes::from("value_a")).into(),
928            ])
929            .await
930            .unwrap();
931
932        // when
933        let snapshot = storage.snapshot().await.unwrap();
934        storage
935            .put(vec![
936                Record::new(Bytes::from("b"), Bytes::from("value_b")).into(),
937            ])
938            .await
939            .unwrap();
940
941        // then
942        let snapshot_records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
943        assert_eq!(snapshot_records.len(), 1);
944        assert_eq!(snapshot_records[0].key, Bytes::from("a"));
945
946        let storage_records = storage.scan(BytesRange::unbounded()).await.unwrap();
947        assert_eq!(storage_records.len(), 2);
948    }
949
950    #[tokio::test]
951    async fn should_handle_empty_record() {
952        // given
953        let storage = InMemoryStorage::new();
954        let key = Bytes::from("empty_key");
955
956        // when
957        storage
958            .put(vec![Record::empty(key.clone()).into()])
959            .await
960            .unwrap();
961        let result = storage.get(key).await.unwrap();
962
963        // then
964        assert!(result.is_some());
965        assert_eq!(result.unwrap().value, Bytes::new());
966    }
967
968    #[tokio::test]
969    async fn should_return_error_when_merge_operator_not_configured() {
970        // given
971        let storage = InMemoryStorage::new();
972        let record = Record::new(Bytes::from("key1"), Bytes::from("value1"));
973
974        // when
975        let result = storage.merge(vec![record.into()]).await;
976
977        // then
978        assert!(result.is_err());
979        assert!(
980            result
981                .unwrap_err()
982                .to_string()
983                .contains("Merge operator not configured")
984        );
985    }
986
987    #[tokio::test]
988    async fn should_merge_when_key_does_not_exist() {
989        // given
990        let merge_op = Arc::new(AppendMergeOperator);
991        let storage = InMemoryStorage::with_merge_operator(merge_op);
992        let key = Bytes::from("new_key");
993        let value = Bytes::from("value1");
994
995        // when
996        storage
997            .merge(vec![Record::new(key.clone(), value.clone()).into()])
998            .await
999            .unwrap();
1000        let result = storage.get(key).await.unwrap();
1001
1002        // then
1003        assert!(result.is_some());
1004        assert_eq!(result.unwrap().value, value);
1005    }
1006
1007    #[tokio::test]
1008    async fn should_merge_when_key_exists() {
1009        // given
1010        let merge_op = Arc::new(AppendMergeOperator);
1011        let storage = InMemoryStorage::with_merge_operator(merge_op);
1012        let key = Bytes::from("key1");
1013        let initial_value = Bytes::from("value1");
1014        let new_value = Bytes::from("value2");
1015
1016        storage
1017            .put(vec![Record::new(key.clone(), initial_value).into()])
1018            .await
1019            .unwrap();
1020
1021        // when
1022        storage
1023            .merge(vec![Record::new(key.clone(), new_value).into()])
1024            .await
1025            .unwrap();
1026        let result = storage.get(key).await.unwrap();
1027
1028        // then
1029        assert!(result.is_some());
1030        assert_eq!(result.unwrap().value, Bytes::from("value1,value2"));
1031    }
1032
1033    #[tokio::test]
1034    async fn should_merge_multiple_keys() {
1035        // given
1036        let merge_op = Arc::new(AppendMergeOperator);
1037        let storage = InMemoryStorage::with_merge_operator(merge_op);
1038        let records = vec![
1039            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
1040            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
1041        ];
1042        storage.put(records).await.unwrap();
1043
1044        // when
1045        storage
1046            .merge(vec![
1047                Record::new(Bytes::from("key1"), Bytes::from("value1a")).into(),
1048                Record::new(Bytes::from("key2"), Bytes::from("value2a")).into(),
1049            ])
1050            .await
1051            .unwrap();
1052
1053        // then
1054        let result1 = storage.get(Bytes::from("key1")).await.unwrap();
1055        assert_eq!(result1.unwrap().value, Bytes::from("value1,value1a"));
1056
1057        let result2 = storage.get(Bytes::from("key2")).await.unwrap();
1058        assert_eq!(result2.unwrap().value, Bytes::from("value2,value2a"));
1059    }
1060
1061    #[tokio::test]
1062    async fn should_return_monotonically_increasing_seqnums_from_put() {
1063        let storage = InMemoryStorage::new();
1064
1065        let r1 = storage
1066            .put(vec![
1067                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1068            ])
1069            .await
1070            .unwrap();
1071        let r2 = storage
1072            .put(vec![
1073                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1074            ])
1075            .await
1076            .unwrap();
1077        let r3 = storage
1078            .put(vec![
1079                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1080            ])
1081            .await
1082            .unwrap();
1083
1084        assert_eq!(r1.seqnum, 1);
1085        assert_eq!(r2.seqnum, 2);
1086        assert_eq!(r3.seqnum, 3);
1087    }
1088
1089    #[tokio::test]
1090    async fn should_return_monotonically_increasing_seqnums_from_apply() {
1091        let storage = InMemoryStorage::new();
1092
1093        let r1 = storage
1094            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1095                Bytes::from("k1"),
1096                Bytes::from("v1"),
1097            )))])
1098            .await
1099            .unwrap();
1100        let r2 = storage
1101            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1102                Bytes::from("k2"),
1103                Bytes::from("v2"),
1104            )))])
1105            .await
1106            .unwrap();
1107
1108        assert_eq!(r1.seqnum, 1);
1109        assert_eq!(r2.seqnum, 2);
1110    }
1111
1112    #[tokio::test]
1113    async fn should_share_seqnum_counter_across_write_methods() {
1114        let merge_op = Arc::new(AppendMergeOperator);
1115        let storage = InMemoryStorage::with_merge_operator(merge_op);
1116
1117        let r1 = storage
1118            .put(vec![
1119                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1120            ])
1121            .await
1122            .unwrap();
1123        let r2 = storage
1124            .merge(vec![
1125                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
1126            ])
1127            .await
1128            .unwrap();
1129        let r3 = storage
1130            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1131                Bytes::from("k2"),
1132                Bytes::from("v3"),
1133            )))])
1134            .await
1135            .unwrap();
1136
1137        assert_eq!(r1.seqnum, 1);
1138        assert_eq!(r2.seqnum, 2);
1139        assert_eq!(r3.seqnum, 3);
1140    }
1141
1142    #[tokio::test]
1143    async fn should_start_durable_subscriber_at_zero() {
1144        let storage = InMemoryStorage::new();
1145        let rx = storage.subscribe_durable();
1146        assert_eq!(*rx.borrow(), 0);
1147    }
1148
1149    #[tokio::test]
1150    async fn should_advance_durable_watermark_on_each_write() {
1151        let storage = InMemoryStorage::new();
1152        let rx = storage.subscribe_durable();
1153
1154        storage
1155            .put(vec![
1156                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1157            ])
1158            .await
1159            .unwrap();
1160        assert_eq!(*rx.borrow(), 1);
1161
1162        storage
1163            .put(vec![
1164                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1165            ])
1166            .await
1167            .unwrap();
1168        assert_eq!(*rx.borrow(), 2);
1169    }
1170
1171    #[tokio::test]
1172    async fn should_merge_empty_values() {
1173        // given
1174        let merge_op = Arc::new(AppendMergeOperator);
1175        let storage = InMemoryStorage::with_merge_operator(merge_op);
1176        let key = Bytes::from("key1");
1177
1178        // when
1179        storage
1180            .merge(vec![Record::empty(key.clone()).into()])
1181            .await
1182            .unwrap();
1183        let result = storage.get(key).await.unwrap();
1184
1185        // then
1186        assert!(result.is_some());
1187        assert_eq!(result.unwrap().value, Bytes::new());
1188    }
1189
1190    #[tokio::test]
1191    async fn should_not_advance_durable_watermark_when_deferred() {
1192        let storage = InMemoryStorage::new().with_deferred_durability();
1193        let rx = storage.subscribe_durable();
1194
1195        storage
1196            .put(vec![
1197                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1198            ])
1199            .await
1200            .unwrap();
1201        assert_eq!(*rx.borrow(), 0, "durable watermark should not advance");
1202
1203        storage
1204            .put(vec![
1205                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1206            ])
1207            .await
1208            .unwrap();
1209        assert_eq!(*rx.borrow(), 0, "durable watermark should still be 0");
1210    }
1211
1212    #[tokio::test]
1213    async fn should_advance_durable_watermark_on_flush_when_deferred() {
1214        let storage = InMemoryStorage::new().with_deferred_durability();
1215        let rx = storage.subscribe_durable();
1216
1217        storage
1218            .put(vec![
1219                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1220            ])
1221            .await
1222            .unwrap();
1223        storage
1224            .put(vec![
1225                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1226            ])
1227            .await
1228            .unwrap();
1229        assert_eq!(*rx.borrow(), 0);
1230
1231        storage.flush().await.unwrap();
1232        assert_eq!(*rx.borrow(), 2, "flush should advance to current seqnum");
1233    }
1234
1235    #[tokio::test]
1236    async fn should_advance_durable_watermark_to_specific_seq() {
1237        let storage = InMemoryStorage::new().with_deferred_durability();
1238        let rx = storage.subscribe_durable();
1239
1240        // Write 3 records
1241        for i in 1..=3 {
1242            storage
1243                .put(vec![
1244                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
1245                ])
1246                .await
1247                .unwrap();
1248        }
1249        assert_eq!(*rx.borrow(), 0);
1250
1251        // Partially flush through seqnum 2
1252        storage.flush_to(2);
1253        assert_eq!(*rx.borrow(), 2, "should advance to requested seqnum");
1254
1255        // Flush the rest
1256        storage.flush_to(3);
1257        assert_eq!(*rx.borrow(), 3);
1258    }
1259
1260    #[tokio::test]
1261    #[should_panic(expected = "cannot move durable seqnum backwards")]
1262    async fn should_panic_when_flush_to_moves_backwards() {
1263        let storage = InMemoryStorage::new().with_deferred_durability();
1264        for i in 1..=3 {
1265            storage
1266                .put(vec![
1267                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
1268                ])
1269                .await
1270                .unwrap();
1271        }
1272
1273        storage.flush_to(2);
1274        storage.flush_to(1);
1275    }
1276
1277    #[tokio::test]
1278    #[should_panic(expected = "cannot flush beyond written seqnum")]
1279    async fn should_panic_when_flush_to_exceeds_written() {
1280        let storage = InMemoryStorage::new().with_deferred_durability();
1281        storage
1282            .put(vec![
1283                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1284            ])
1285            .await
1286            .unwrap();
1287
1288        storage.flush_to(5);
1289    }
1290
1291    #[tokio::test]
1292    async fn should_see_data_in_snapshot_before_flush_when_deferred() {
1293        let storage = InMemoryStorage::new().with_deferred_durability();
1294
1295        storage
1296            .put(vec![
1297                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1298            ])
1299            .await
1300            .unwrap();
1301
1302        // Data is written but not durable — snapshots should still see it
1303        let snapshot = storage.snapshot().await.unwrap();
1304        let result = snapshot.get(Bytes::from("k1")).await.unwrap();
1305        assert!(result.is_some());
1306        assert_eq!(result.unwrap().value, Bytes::from("v1"));
1307
1308        // But durable watermark is still 0
1309        let rx = storage.subscribe_durable();
1310        assert_eq!(*rx.borrow(), 0);
1311    }
1312
1313    #[tokio::test]
1314    async fn should_not_advance_durable_watermark_on_apply_when_deferred() {
1315        let storage = InMemoryStorage::new().with_deferred_durability();
1316        let rx = storage.subscribe_durable();
1317
1318        storage
1319            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1320                Bytes::from("k1"),
1321                Bytes::from("v1"),
1322            )))])
1323            .await
1324            .unwrap();
1325        assert_eq!(*rx.borrow(), 0);
1326
1327        storage
1328            .apply(vec![RecordOp::Delete(Bytes::from("k1"))])
1329            .await
1330            .unwrap();
1331        assert_eq!(*rx.borrow(), 0);
1332
1333        storage.flush().await.unwrap();
1334        assert_eq!(*rx.borrow(), 2);
1335    }
1336
1337    #[tokio::test]
1338    async fn should_not_advance_durable_watermark_on_merge_when_deferred() {
1339        let merge_op = Arc::new(AppendMergeOperator);
1340        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
1341        let rx = storage.subscribe_durable();
1342
1343        storage
1344            .merge(vec![
1345                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1346            ])
1347            .await
1348            .unwrap();
1349        assert_eq!(*rx.borrow(), 0);
1350
1351        storage
1352            .merge(vec![
1353                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
1354            ])
1355            .await
1356            .unwrap();
1357        assert_eq!(*rx.borrow(), 0);
1358
1359        storage.flush().await.unwrap();
1360        assert_eq!(*rx.borrow(), 2);
1361    }
1362
1363    #[tokio::test]
1364    async fn should_support_multiple_flush_cycles_when_deferred() {
1365        let storage = InMemoryStorage::new().with_deferred_durability();
1366        let rx = storage.subscribe_durable();
1367
1368        // First cycle: write and flush
1369        storage
1370            .put(vec![
1371                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1372            ])
1373            .await
1374            .unwrap();
1375        storage.flush().await.unwrap();
1376        assert_eq!(*rx.borrow(), 1);
1377
1378        // Second cycle: write more and flush again
1379        storage
1380            .put(vec![
1381                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1382            ])
1383            .await
1384            .unwrap();
1385        storage
1386            .put(vec![
1387                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1388            ])
1389            .await
1390            .unwrap();
1391        assert_eq!(*rx.borrow(), 1, "should not advance before second flush");
1392
1393        storage.flush().await.unwrap();
1394        assert_eq!(*rx.borrow(), 3);
1395    }
1396
1397    #[tokio::test]
1398    async fn should_flush_on_empty_storage_when_deferred() {
1399        let storage = InMemoryStorage::new().with_deferred_durability();
1400        let rx = storage.subscribe_durable();
1401
1402        // Flushing with no writes should be fine (sends 0 again)
1403        storage.flush().await.unwrap();
1404        assert_eq!(*rx.borrow(), 0);
1405
1406        // flush_to(0) should also be fine
1407        storage.flush_to(0);
1408        assert_eq!(*rx.borrow(), 0);
1409    }
1410
1411    #[tokio::test]
1412    async fn should_defer_durability_across_mixed_write_methods() {
1413        let merge_op = Arc::new(AppendMergeOperator);
1414        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
1415        let rx = storage.subscribe_durable();
1416
1417        // put, apply, merge — all should defer
1418        storage
1419            .put(vec![
1420                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1421            ])
1422            .await
1423            .unwrap();
1424        storage
1425            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
1426                Bytes::from("k2"),
1427                Bytes::from("v2"),
1428            )))])
1429            .await
1430            .unwrap();
1431        storage
1432            .merge(vec![
1433                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
1434            ])
1435            .await
1436            .unwrap();
1437        assert_eq!(*rx.borrow(), 0);
1438
1439        // Partially flush through seqnum 2
1440        storage.flush_to(2);
1441        assert_eq!(*rx.borrow(), 2);
1442
1443        // Flush the rest
1444        storage.flush().await.unwrap();
1445        assert_eq!(*rx.borrow(), 3);
1446    }
1447
1448    #[tokio::test]
1449    async fn should_read_data_written_before_flush_when_deferred() {
1450        let storage = InMemoryStorage::new().with_deferred_durability();
1451
1452        storage
1453            .put(vec![
1454                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
1455                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
1456            ])
1457            .await
1458            .unwrap();
1459
1460        // get and scan see data even though it is not yet durable
1461        let result = storage.get(Bytes::from("k1")).await.unwrap();
1462        assert_eq!(result.unwrap().value, Bytes::from("v1"));
1463
1464        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
1465        assert_eq!(scanned.len(), 2);
1466    }
1467}