1use std::collections::BTreeMap;
2use std::ops::RangeBounds;
3use std::sync::{Arc, RwLock};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8use super::{
9 MergeOperator, MergeRecordOp, PutRecordOp, Storage, StorageSnapshot, WriteOptions, WriteResult,
10};
11use crate::storage::RecordOp;
12use crate::{
13 BytesRange, CheckpointInfo, Record, StorageError, StorageIterator, StorageRead, StorageResult,
14 Ttl,
15};
16
/// Time source abstraction so callers (and tests) can inject a
/// deterministic clock instead of reading the wall clock.
pub trait Clock: Send + Sync {
    /// Current time in milliseconds since the Unix epoch.
    fn now(&self) -> i64;
}
22
23pub struct WallClock;
25
26impl Clock for WallClock {
27 fn now(&self) -> i64 {
28 std::time::SystemTime::now()
29 .duration_since(std::time::UNIX_EPOCH)
30 .expect("system time before Unix epoch")
31 .as_millis() as i64
32 }
33}
34
/// A value as stored in the map: the payload plus its optional expiry.
#[derive(Clone, Debug)]
struct StoredValue {
    // Raw value bytes as written by the caller.
    value: Bytes,
    // Absolute expiry timestamp in ms since epoch; `None` means no expiry.
    expire_ts: Option<i64>,
}
42
43impl StoredValue {
44 fn is_expired(&self, now: i64) -> bool {
45 self.expire_ts.is_some_and(|ts| now >= ts)
46 }
47}
48
49fn compute_expire_ts(now: i64, ttl: Ttl, default_ttl: Option<u64>) -> Option<i64> {
51 match ttl {
52 Ttl::Default => default_ttl.map(|ms| now + ms as i64),
53 Ttl::NoExpiry => None,
54 Ttl::ExpireAfter(ms) => Some(now + ms as i64),
55 Ttl::ExpireAt(ts) => Some(ts),
56 }
57}
58
/// In-memory storage backed by a `BTreeMap` behind an `RwLock`.
///
/// Data is lost when the instance is dropped; intended for tests and
/// ephemeral use. `BTreeMap` keeps keys ordered so range scans come back in
/// key order.
pub struct InMemoryStorage {
    // Ordered key/value map shared between the storage and its lock guards.
    data: Arc<RwLock<BTreeMap<Bytes, StoredValue>>>,
    // Operator applied by merge writes; merge operations fail without one.
    merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
    // Time source used both to compute expiry timestamps and to filter
    // expired entries on read.
    clock: Arc<dyn Clock>,
    // Default TTL in ms applied when a write uses `Ttl::Default`.
    default_ttl: Option<u64>,
    // Sequence number of the most recent completed write.
    written_seq: std::sync::atomic::AtomicU64,
    // Highest seqnum considered durable; equals `written_seq` unless
    // durability is deferred.
    durable_seq: std::sync::atomic::AtomicU64,
    // Broadcasts durable-watermark advances to `subscribe_durable` watchers.
    durable_tx: tokio::sync::watch::Sender<u64>,
    // When true, writes do not advance the durable watermark; callers must
    // flush explicitly.
    defer_durability: bool,
}
74
75impl InMemoryStorage {
76 pub fn new() -> Self {
78 let (durable_tx, _) = tokio::sync::watch::channel(0);
79 Self {
80 data: Arc::new(RwLock::new(BTreeMap::new())),
81 merge_operator: None,
82 clock: Arc::new(WallClock),
83 default_ttl: None,
84 written_seq: std::sync::atomic::AtomicU64::new(0),
85 durable_seq: std::sync::atomic::AtomicU64::new(0),
86 durable_tx,
87 defer_durability: false,
88 }
89 }
90
91 pub fn with_merge_operator(merge_operator: Arc<dyn MergeOperator + Send + Sync>) -> Self {
97 let (durable_tx, _) = tokio::sync::watch::channel(0);
98 Self {
99 data: Arc::new(RwLock::new(BTreeMap::new())),
100 merge_operator: Some(merge_operator),
101 clock: Arc::new(WallClock),
102 default_ttl: None,
103 written_seq: std::sync::atomic::AtomicU64::new(0),
104 durable_seq: std::sync::atomic::AtomicU64::new(0),
105 durable_tx,
106 defer_durability: false,
107 }
108 }
109
110 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
112 self.clock = clock;
113 self
114 }
115
116 pub fn with_default_ttl(mut self, ttl: u64) -> Self {
118 self.default_ttl = Some(ttl);
119 self
120 }
121
122 pub fn with_deferred_durability(mut self) -> Self {
128 self.defer_durability = true;
129 self
130 }
131
132 pub fn flush_to(&self, seq: u64) {
139 let written = self.written_seq.load(std::sync::atomic::Ordering::Relaxed);
140 assert!(
141 seq <= written,
142 "cannot flush beyond written seqnum: flush_to({seq}) but written is {written}"
143 );
144 let durable = self.durable_seq.load(std::sync::atomic::Ordering::Relaxed);
145 assert!(
146 seq >= durable,
147 "cannot move durable seqnum backwards: flush_to({seq}) but durable is {durable}"
148 );
149 self.durable_seq
150 .store(seq, std::sync::atomic::Ordering::Relaxed);
151 let _ = self.durable_tx.send(seq);
152 }
153
154 fn next_seqnum(&self) -> u64 {
160 let seq = self
161 .written_seq
162 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
163 + 1;
164 if !self.defer_durability {
165 self.durable_seq
166 .store(seq, std::sync::atomic::Ordering::Relaxed);
167 let _ = self.durable_tx.send(seq);
168 }
169 seq
170 }
171}
172
// `Default` mirrors `new()`: an empty, immediately-durable store.
impl Default for InMemoryStorage {
    fn default() -> Self {
        Self::new()
    }
}
178
179#[async_trait]
180impl StorageRead for InMemoryStorage {
181 #[tracing::instrument(level = "trace", skip_all)]
185 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
186 let data = self
187 .data
188 .read()
189 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
190
191 match data.get(&key) {
192 Some(stored) if !stored.is_expired(self.clock.now()) => {
193 Ok(Some(Record::new(key, stored.value.clone())))
194 }
195 _ => Ok(None),
196 }
197 }
198
199 #[tracing::instrument(level = "trace", skip_all)]
200 async fn scan_iter(
201 &self,
202 range: BytesRange,
203 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
204 let data = self
205 .data
206 .read()
207 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
208
209 let now = self.clock.now();
210 let records: Vec<Record> = data
211 .range((range.start_bound().cloned(), range.end_bound().cloned()))
212 .filter(|(_, stored)| !stored.is_expired(now))
213 .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
214 .collect();
215
216 Ok(Box::new(InMemoryIterator { records, index: 0 }))
217 }
218}
219
/// Iterator over records pre-collected by a scan; owns its data so it is
/// independent of the storage's lock once constructed.
struct InMemoryIterator {
    // Snapshot of the matching records, in key order.
    records: Vec<Record>,
    // Position of the next record to yield.
    index: usize,
}
224
225#[async_trait]
226impl StorageIterator for InMemoryIterator {
227 #[tracing::instrument(level = "trace", skip_all)]
228 async fn next(&mut self) -> StorageResult<Option<Record>> {
229 if self.index >= self.records.len() {
230 Ok(None)
231 } else {
232 let record = self.records[self.index].clone();
233 self.index += 1;
234 Ok(Some(record))
235 }
236 }
237}
238
/// Immutable point-in-time view of an [`InMemoryStorage`].
///
/// Holds its own copy of the map, so later writes to the source storage are
/// not visible through the snapshot.
pub struct InMemoryStorageSnapshot {
    // Frozen copy of the storage's map at snapshot time.
    data: Arc<BTreeMap<Bytes, StoredValue>>,
    // Shared clock; expiry is still evaluated at read time, not frozen.
    clock: Arc<dyn Clock>,
}
247
248#[async_trait]
249impl StorageRead for InMemoryStorageSnapshot {
250 #[tracing::instrument(level = "trace", skip_all)]
251 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
252 match self.data.get(&key) {
253 Some(stored) if !stored.is_expired(self.clock.now()) => {
254 Ok(Some(Record::new(key, stored.value.clone())))
255 }
256 _ => Ok(None),
257 }
258 }
259
260 #[tracing::instrument(level = "trace", skip_all)]
261 async fn scan_iter(
262 &self,
263 range: BytesRange,
264 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
265 let now = self.clock.now();
266 let records: Vec<Record> = self
267 .data
268 .range((range.start_bound().cloned(), range.end_bound().cloned()))
269 .filter(|(_, stored)| !stored.is_expired(now))
270 .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
271 .collect();
272
273 Ok(Box::new(InMemoryIterator { records, index: 0 }))
274 }
275}
276
// Marker impl: the snapshot's read behavior comes from `StorageRead` above.
#[async_trait]
impl StorageSnapshot for InMemoryStorageSnapshot {}
279
280#[async_trait]
281impl Storage for InMemoryStorage {
282 async fn apply_with_options(
283 &self,
284 records: Vec<RecordOp>,
285 _options: WriteOptions,
286 ) -> StorageResult<WriteResult> {
287 let mut data = self
288 .data
289 .write()
290 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
291
292 let now = self.clock.now();
293 for record in records {
294 match record {
295 RecordOp::Put(op) => {
296 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
297 data.insert(
298 op.record.key,
299 StoredValue {
300 value: op.record.value,
301 expire_ts,
302 },
303 );
304 }
305 RecordOp::Merge(op) => {
306 let existing_value = data
307 .get(&op.record.key)
308 .filter(|s| !s.is_expired(now))
309 .map(|s| s.value.clone());
310 let merged_value = self.merge_operator.as_ref().unwrap().merge_batch(
311 &op.record.key,
312 existing_value,
313 &[op.record.value],
314 );
315 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
316 data.insert(
317 op.record.key,
318 StoredValue {
319 value: merged_value,
320 expire_ts,
321 },
322 );
323 }
324 RecordOp::Delete(key) => {
325 data.remove(&key);
326 }
327 }
328 }
329
330 Ok(WriteResult {
331 seqnum: self.next_seqnum(),
332 })
333 }
334
335 async fn put_with_options(
341 &self,
342 records: Vec<PutRecordOp>,
343 _options: WriteOptions,
344 ) -> StorageResult<WriteResult> {
345 let mut data = self
346 .data
347 .write()
348 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
349
350 let now = self.clock.now();
351 for op in records {
352 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
353 data.insert(
354 op.record.key,
355 StoredValue {
356 value: op.record.value,
357 expire_ts,
358 },
359 );
360 }
361
362 Ok(WriteResult {
363 seqnum: self.next_seqnum(),
364 })
365 }
366
367 async fn merge_with_options(
378 &self,
379 records: Vec<MergeRecordOp>,
380 _options: WriteOptions,
381 ) -> StorageResult<WriteResult> {
382 let merge_op = self
383 .merge_operator
384 .as_ref()
385 .ok_or_else(|| {
386 StorageError::Storage(
387 "Merge operator not configured: in-memory storage requires a merge operator to be set during construction".to_string(),
388 )
389 })?;
390
391 let mut data = self
392 .data
393 .write()
394 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
395
396 let now = self.clock.now();
397 for op in records {
398 let existing_value = data
399 .get(&op.record.key)
400 .filter(|s| !s.is_expired(now))
401 .map(|s| s.value.clone());
402 let merged_value =
403 merge_op.merge_batch(&op.record.key, existing_value, &[op.record.value]);
404 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
405 data.insert(
406 op.record.key,
407 StoredValue {
408 value: merged_value,
409 expire_ts,
410 },
411 );
412 }
413
414 Ok(WriteResult {
415 seqnum: self.next_seqnum(),
416 })
417 }
418
419 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
425 let data = self
426 .data
427 .read()
428 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
429
430 let snapshot_data = Arc::new(data.clone());
431
432 Ok(Arc::new(InMemoryStorageSnapshot {
433 data: snapshot_data,
434 clock: self.clock.clone(),
435 }))
436 }
437
438 fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
439 self.durable_tx.subscribe()
440 }
441
442 async fn flush(&self) -> StorageResult<()> {
443 if self.defer_durability {
444 self.flush_to(self.written_seq.load(std::sync::atomic::Ordering::Relaxed));
445 }
446 Ok(())
447 }
448
449 async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo> {
450 Err(StorageError::Storage(
451 "checkpoints are not supported for in-memory storage".to_string(),
452 ))
453 }
454}
455
/// A failure armed on a `FailingStorage` slot.
#[cfg(feature = "test-utils")]
#[derive(Clone)]
enum Failure {
    // Fails exactly one call, then the slot clears itself.
    Once(super::StorageError),
    // Fails every call until the slot is overwritten.
    Persistent(super::StorageError),
}
465
/// Lock-free slot holding the currently armed failure (if any) for one
/// storage operation.
#[cfg(feature = "test-utils")]
type FailSlot = arc_swap::ArcSwap<Option<Failure>>;
468
/// Returns `Err` if a failure is armed in `slot`; a one-shot failure is
/// consumed (the slot is cleared) as it fires.
///
/// NOTE(review): the one-shot path is load-then-swap, not compare-and-swap,
/// so a failure armed concurrently between the two steps can be discarded.
/// Presumably acceptable for test-only code — confirm if tests arm slots
/// from multiple threads.
#[cfg(feature = "test-utils")]
fn check_failure(slot: &FailSlot) -> super::StorageResult<()> {
    let guard = slot.load();
    match guard.as_ref() {
        None => Ok(()),
        Some(Failure::Persistent(err)) => Err(err.clone()),
        Some(Failure::Once(_)) => {
            // Clear the slot, then re-match the swapped-out value in case it
            // changed between the load above and this swap.
            let prev = slot.swap(Arc::new(None));
            match prev.as_ref() {
                Some(Failure::Once(err)) => Err(err.clone()),
                _ => Ok(()),
            }
        }
    }
}
491
/// Test-only [`Storage`] decorator that injects failures into specific
/// operations while delegating everything else to `inner`.
#[cfg(feature = "test-utils")]
pub struct FailingStorage {
    // The real storage all calls are forwarded to.
    inner: Arc<dyn super::Storage>,
    // Per-operation failure slots; each is checked before delegation.
    fail_apply: FailSlot,
    fail_put: FailSlot,
    fail_flush: FailSlot,
    fail_snapshot: FailSlot,
}
523
#[cfg(feature = "test-utils")]
impl FailingStorage {
    /// Wraps `inner` with all failure slots initially clear, so every
    /// operation delegates until a `fail_*` method arms a slot.
    pub fn wrap(inner: Arc<dyn super::Storage>) -> Arc<Self> {
        Arc::new(Self {
            inner,
            fail_apply: arc_swap::ArcSwap::from_pointee(None),
            fail_put: arc_swap::ArcSwap::from_pointee(None),
            fail_flush: arc_swap::ArcSwap::from_pointee(None),
            fail_snapshot: arc_swap::ArcSwap::from_pointee(None),
        })
    }

    /// Installs `failure` into `slot`, replacing whatever was armed before.
    /// Shared by all the `fail_*` setters below.
    fn arm(slot: &FailSlot, failure: Failure) {
        slot.store(Arc::new(Some(failure)));
    }

    /// Fails every subsequent `apply_with_options` call with `err`.
    pub fn fail_apply(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Persistent(err));
    }

    /// Fails the next `apply_with_options` call with `err`, then recovers.
    pub fn fail_apply_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Once(err));
    }

    /// Fails every subsequent `put_with_options` call with `err`.
    pub fn fail_put(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Persistent(err));
    }

    /// Fails the next `put_with_options` call with `err`, then recovers.
    pub fn fail_put_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Once(err));
    }

    /// Fails every subsequent `flush` call with `err`.
    pub fn fail_flush(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Persistent(err));
    }

    /// Fails the next `flush` call with `err`, then recovers.
    pub fn fail_flush_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Once(err));
    }

    /// Fails every subsequent `snapshot` call with `err`.
    pub fn fail_snapshot(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Persistent(err));
    }

    /// Fails the next `snapshot` call with `err`, then recovers.
    pub fn fail_snapshot_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Once(err));
    }
}
584
// Reads are never failure-injected; they delegate unconditionally.
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::StorageRead for FailingStorage {
    async fn get(&self, key: Bytes) -> super::StorageResult<Option<crate::Record>> {
        self.inner.get(key).await
    }

    async fn scan_iter(
        &self,
        range: crate::BytesRange,
    ) -> super::StorageResult<Box<dyn super::StorageIterator + Send + 'static>> {
        self.inner.scan_iter(range).await
    }
}
599
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::Storage for FailingStorage {
    // Fails with any armed apply error, otherwise delegates.
    async fn apply_with_options(
        &self,
        ops: Vec<super::RecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_apply)?;
        self.inner.apply_with_options(ops, options).await
    }

    // Fails with any armed put error, otherwise delegates.
    async fn put_with_options(
        &self,
        records: Vec<super::PutRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_put)?;
        self.inner.put_with_options(records, options).await
    }

    // NOTE(review): merge has no failure slot, so merges always delegate —
    // presumably intentional (no test needs it yet); confirm before relying
    // on merge failure injection.
    async fn merge_with_options(
        &self,
        records: Vec<super::MergeRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        self.inner.merge_with_options(records, options).await
    }

    // Fails with any armed snapshot error, otherwise delegates.
    async fn snapshot(&self) -> super::StorageResult<Arc<dyn super::StorageSnapshot>> {
        check_failure(&self.fail_snapshot)?;
        self.inner.snapshot().await
    }

    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
        self.inner.subscribe_durable()
    }

    // Fails with any armed flush error, otherwise delegates.
    async fn flush(&self) -> super::StorageResult<()> {
        check_failure(&self.fail_flush)?;
        self.inner.flush().await
    }

    // No failure slot for checkpoints; always delegates.
    async fn create_checkpoint(&self) -> super::StorageResult<CheckpointInfo> {
        self.inner.create_checkpoint().await
    }
}
647
#[cfg(test)]
mod tests {
    //! Unit tests for `InMemoryStorage`: basic get/put/scan, iteration,
    //! snapshots, merge semantics, sequence numbers, and the deferred
    //! durability watermark.

    use super::*;
    use bytes::BytesMut;
    use std::ops::Bound;

    /// Test merge operator that concatenates operands onto the existing
    /// value, comma-separated.
    struct AppendMergeOperator;

    impl MergeOperator for AppendMergeOperator {
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            operands
                .iter()
                .fold(existing_value.unwrap_or_default(), |acc, operand| {
                    let mut result = BytesMut::from(acc);
                    if !result.is_empty() {
                        result.extend_from_slice(b",");
                    }
                    result.extend_from_slice(operand);
                    result.freeze()
                })
        }
    }

    // ---- basic get/put ----

    #[tokio::test]
    async fn should_return_none_when_key_not_found() {
        let storage = InMemoryStorage::new();

        let result = storage.get(Bytes::from("missing_key")).await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn should_store_and_retrieve_record() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let value = Bytes::from("test_value");

        storage
            .put(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        let record = result.unwrap();
        assert_eq!(record.key, Bytes::from("test_key"));
        assert_eq!(record.value, value);
    }

    #[tokio::test]
    async fn should_overwrite_existing_key() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let initial_value = Bytes::from("initial_value");
        let updated_value = Bytes::from("updated_value");

        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();
        storage
            .put(vec![Record::new(key.clone(), updated_value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, updated_value);
    }

    #[tokio::test]
    async fn should_store_multiple_records() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")),
            Record::new(Bytes::from("key2"), Bytes::from("value2")),
            Record::new(Bytes::from("key3"), Bytes::from("value3")),
        ];

        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        for record in records {
            let retrieved = storage.get(record.key.clone()).await.unwrap();
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().value, record.value);
        }
    }

    // ---- scans ----

    #[tokio::test]
    async fn should_scan_all_records_when_unbounded() {
        let storage = InMemoryStorage::new();
        let records = [
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
        ];
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        // BTreeMap backing guarantees key order.
        assert_eq!(scanned.len(), 3);
        assert_eq!(scanned[0].key, Bytes::from("a"));
        assert_eq!(scanned[1].key, Bytes::from("b"));
        assert_eq!(scanned[2].key, Bytes::from("c"));
    }

    #[tokio::test]
    async fn should_scan_records_with_prefix() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("prefix_a"), Bytes::from("value1")),
            Record::new(Bytes::from("prefix_b"), Bytes::from("value2")),
            Record::new(Bytes::from("other_c"), Bytes::from("value3")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let scanned = storage
            .scan(BytesRange::prefix(Bytes::from("prefix_")))
            .await
            .unwrap();

        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("prefix_a"));
        assert_eq!(scanned[1].key, Bytes::from("prefix_b"));
    }

    #[tokio::test]
    async fn should_scan_records_in_bounded_range() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
            Record::new(Bytes::from("d"), Bytes::from("value_d")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // ["b", "d") — includes b and c, excludes d.
        let range = BytesRange::new(
            Bound::Included(Bytes::from("b")),
            Bound::Excluded(Bytes::from("d")),
        );
        let scanned = storage.scan(range).await.unwrap();

        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("b"));
        assert_eq!(scanned[1].key, Bytes::from("c"));
    }

    #[tokio::test]
    async fn should_return_empty_vec_when_scanning_empty_storage() {
        let storage = InMemoryStorage::new();

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        assert!(scanned.is_empty());
    }

    #[tokio::test]
    async fn should_iterate_over_records() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        let mut iter = storage.scan_iter(BytesRange::unbounded()).await.unwrap();
        let first = iter.next().await.unwrap();
        let second = iter.next().await.unwrap();
        let third = iter.next().await.unwrap();

        assert!(first.is_some());
        assert_eq!(first.unwrap().key, Bytes::from("key1"));
        assert!(second.is_some());
        assert_eq!(second.unwrap().key, Bytes::from("key2"));
        assert!(third.is_none());
    }

    // ---- snapshots ----

    #[tokio::test]
    async fn should_create_snapshot_with_current_data() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();

        let result = snapshot.get(Bytes::from("key1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1"));
    }

    #[tokio::test]
    async fn should_not_see_writes_after_snapshot() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();

        let snapshot_result = snapshot.get(Bytes::from("key2")).await.unwrap();
        assert!(snapshot_result.is_none());

        let storage_result = storage.get(Bytes::from("key2")).await.unwrap();
        assert!(storage_result.is_some());
    }

    #[tokio::test]
    async fn should_scan_snapshot_independently() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("value_a")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("b"), Bytes::from("value_b")).into(),
            ])
            .await
            .unwrap();

        let snapshot_records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(snapshot_records.len(), 1);
        assert_eq!(snapshot_records[0].key, Bytes::from("a"));

        let storage_records = storage.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(storage_records.len(), 2);
    }

    #[tokio::test]
    async fn should_handle_empty_record() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("empty_key");

        storage
            .put(vec![Record::empty(key.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        // An empty value is a present record, not a missing key.
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::new());
    }

    // ---- merge ----

    #[tokio::test]
    async fn should_return_error_when_merge_operator_not_configured() {
        let storage = InMemoryStorage::new();
        let record = Record::new(Bytes::from("key1"), Bytes::from("value1"));

        let result = storage.merge(vec![record.into()]).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Merge operator not configured")
        );
    }

    #[tokio::test]
    async fn should_merge_when_key_does_not_exist() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("new_key");
        let value = Bytes::from("value1");

        storage
            .merge(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, value);
    }

    #[tokio::test]
    async fn should_merge_when_key_exists() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("key1");
        let initial_value = Bytes::from("value1");
        let new_value = Bytes::from("value2");

        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();

        storage
            .merge(vec![Record::new(key.clone(), new_value).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1,value2"));
    }

    #[tokio::test]
    async fn should_merge_multiple_keys() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        storage
            .merge(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1a")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2a")).into(),
            ])
            .await
            .unwrap();

        let result1 = storage.get(Bytes::from("key1")).await.unwrap();
        assert_eq!(result1.unwrap().value, Bytes::from("value1,value1a"));

        let result2 = storage.get(Bytes::from("key2")).await.unwrap();
        assert_eq!(result2.unwrap().value, Bytes::from("value2,value2a"));
    }

    // ---- sequence numbers ----

    #[tokio::test]
    async fn should_return_monotonically_increasing_seqnums_from_put() {
        let storage = InMemoryStorage::new();

        let r1 = storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let r2 = storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        let r3 = storage
            .put(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
        assert_eq!(r3.seqnum, 3);
    }

    #[tokio::test]
    async fn should_return_monotonically_increasing_seqnums_from_apply() {
        let storage = InMemoryStorage::new();

        let r1 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k1"),
                Bytes::from("v1"),
            )))])
            .await
            .unwrap();
        let r2 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v2"),
            )))])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
    }

    #[tokio::test]
    async fn should_share_seqnum_counter_across_write_methods() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);

        let r1 = storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let r2 = storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        let r3 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v3"),
            )))])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
        assert_eq!(r3.seqnum, 3);
    }

    // ---- durability watermark ----

    #[tokio::test]
    async fn should_start_durable_subscriber_at_zero() {
        let storage = InMemoryStorage::new();
        let rx = storage.subscribe_durable();
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_on_each_write() {
        let storage = InMemoryStorage::new();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 1);

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_merge_empty_values() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("key1");

        storage
            .merge(vec![Record::empty(key.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::new());
    }

    // ---- deferred durability ----

    #[tokio::test]
    async fn should_not_advance_durable_watermark_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0, "durable watermark should not advance");

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0, "durable watermark should still be 0");
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_on_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2, "flush should advance to current seqnum");
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_to_specific_seq() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        for i in 1..=3 {
            storage
                .put(vec![
                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
                ])
                .await
                .unwrap();
        }
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(2);
        assert_eq!(*rx.borrow(), 2, "should advance to requested seqnum");

        storage.flush_to(3);
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    #[should_panic(expected = "cannot move durable seqnum backwards")]
    async fn should_panic_when_flush_to_moves_backwards() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        for i in 1..=3 {
            storage
                .put(vec![
                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
                ])
                .await
                .unwrap();
        }

        storage.flush_to(2);
        storage.flush_to(1);
    }

    #[tokio::test]
    #[should_panic(expected = "cannot flush beyond written seqnum")]
    async fn should_panic_when_flush_to_exceeds_written() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        storage.flush_to(5);
    }

    #[tokio::test]
    async fn should_see_data_in_snapshot_before_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // Deferred durability affects the watermark only; data is readable
        // (and snapshottable) immediately after the write completes.
        let snapshot = storage.snapshot().await.unwrap();
        let result = snapshot.get(Bytes::from("k1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("v1"));

        let rx = storage.subscribe_durable();
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_not_advance_durable_watermark_on_apply_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k1"),
                Bytes::from("v1"),
            )))])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage
            .apply(vec![RecordOp::Delete(Bytes::from("k1"))])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_not_advance_durable_watermark_on_merge_when_deferred() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_support_multiple_flush_cycles_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 1);

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 1, "should not advance before second flush");

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    async fn should_flush_on_empty_storage_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(0);
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_defer_durability_across_mixed_write_methods() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v2"),
            )))])
            .await
            .unwrap();
        storage
            .merge(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(2);
        assert_eq!(*rx.borrow(), 2);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    async fn should_read_data_written_before_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();

        let result = storage.get(Bytes::from("k1")).await.unwrap();
        assert_eq!(result.unwrap().value, Bytes::from("v1"));

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(scanned.len(), 2);
    }
}