1use std::collections::BTreeMap;
2use std::ops::RangeBounds;
3use std::sync::{Arc, RwLock};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8use super::{
9 MergeOperator, MergeRecordOp, PutRecordOp, Storage, StorageSnapshot, WriteOptions, WriteResult,
10};
11use crate::storage::RecordOp;
12use crate::{BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl};
13
/// A source of current time in milliseconds, used for TTL expiry.
///
/// Abstracted behind a trait so tests can substitute a deterministic clock
/// (see `with_clock`) instead of the real wall clock.
pub trait Clock: Send + Sync {
    /// Returns the current time in milliseconds (epoch-relative for [`WallClock`]).
    fn now(&self) -> i64;
}
19
20pub struct WallClock;
22
23impl Clock for WallClock {
24 fn now(&self) -> i64 {
25 std::time::SystemTime::now()
26 .duration_since(std::time::UNIX_EPOCH)
27 .expect("system time before Unix epoch")
28 .as_millis() as i64
29 }
30}
31
/// A map entry: the record payload plus its optional absolute expiry time.
#[derive(Clone, Debug)]
struct StoredValue {
    // The record's value bytes.
    value: Bytes,
    // Absolute expiry timestamp (same time base as `Clock::now`); `None`
    // means the entry never expires.
    expire_ts: Option<i64>,
}
39
40impl StoredValue {
41 fn is_expired(&self, now: i64) -> bool {
42 self.expire_ts.is_some_and(|ts| now >= ts)
43 }
44}
45
46fn compute_expire_ts(now: i64, ttl: Ttl, default_ttl: Option<u64>) -> Option<i64> {
48 let duration = match ttl {
49 Ttl::Default => default_ttl,
50 Ttl::NoExpiry => None,
51 Ttl::ExpireAfter(ms) => Some(ms),
52 };
53 duration.map(|ms| now + ms as i64)
54}
55
/// A thread-safe, in-memory [`Storage`] implementation backed by an ordered map.
///
/// Supports lazy TTL expiry (expired entries are filtered at read time, not
/// purged), optional merge semantics, and a durable-seqnum watermark that can
/// be deferred from the written seqnum for testing flush behavior.
pub struct InMemoryStorage {
    // Ordered key/value map guarded by a reader-writer lock.
    data: Arc<RwLock<BTreeMap<Bytes, StoredValue>>>,
    // Combines an existing value with incoming operands; required for merge ops.
    merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
    // Time source used both to stamp and to check expiry.
    clock: Arc<dyn Clock>,
    // TTL (ms) applied to records written with `Ttl::Default`.
    default_ttl: Option<u64>,
    // Highest seqnum handed out to any write.
    written_seq: std::sync::atomic::AtomicU64,
    // Highest seqnum considered durable; trails `written_seq` when deferred.
    durable_seq: std::sync::atomic::AtomicU64,
    // Broadcasts durable-watermark advances to `subscribe_durable` receivers.
    durable_tx: tokio::sync::watch::Sender<u64>,
    // When true, writes do not advance the durable watermark automatically;
    // callers must invoke `flush`/`flush_to`.
    defer_durability: bool,
}
71
72impl InMemoryStorage {
73 pub fn new() -> Self {
75 let (durable_tx, _) = tokio::sync::watch::channel(0);
76 Self {
77 data: Arc::new(RwLock::new(BTreeMap::new())),
78 merge_operator: None,
79 clock: Arc::new(WallClock),
80 default_ttl: None,
81 written_seq: std::sync::atomic::AtomicU64::new(0),
82 durable_seq: std::sync::atomic::AtomicU64::new(0),
83 durable_tx,
84 defer_durability: false,
85 }
86 }
87
88 pub fn with_merge_operator(merge_operator: Arc<dyn MergeOperator + Send + Sync>) -> Self {
94 let (durable_tx, _) = tokio::sync::watch::channel(0);
95 Self {
96 data: Arc::new(RwLock::new(BTreeMap::new())),
97 merge_operator: Some(merge_operator),
98 clock: Arc::new(WallClock),
99 default_ttl: None,
100 written_seq: std::sync::atomic::AtomicU64::new(0),
101 durable_seq: std::sync::atomic::AtomicU64::new(0),
102 durable_tx,
103 defer_durability: false,
104 }
105 }
106
107 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
109 self.clock = clock;
110 self
111 }
112
113 pub fn with_default_ttl(mut self, ttl: u64) -> Self {
115 self.default_ttl = Some(ttl);
116 self
117 }
118
119 pub fn with_deferred_durability(mut self) -> Self {
125 self.defer_durability = true;
126 self
127 }
128
129 pub fn flush_to(&self, seq: u64) {
136 let written = self.written_seq.load(std::sync::atomic::Ordering::Relaxed);
137 assert!(
138 seq <= written,
139 "cannot flush beyond written seqnum: flush_to({seq}) but written is {written}"
140 );
141 let durable = self.durable_seq.load(std::sync::atomic::Ordering::Relaxed);
142 assert!(
143 seq >= durable,
144 "cannot move durable seqnum backwards: flush_to({seq}) but durable is {durable}"
145 );
146 self.durable_seq
147 .store(seq, std::sync::atomic::Ordering::Relaxed);
148 let _ = self.durable_tx.send(seq);
149 }
150
151 fn next_seqnum(&self) -> u64 {
157 let seq = self
158 .written_seq
159 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
160 + 1;
161 if !self.defer_durability {
162 self.durable_seq
163 .store(seq, std::sync::atomic::Ordering::Relaxed);
164 let _ = self.durable_tx.send(seq);
165 }
166 seq
167 }
168}
169
impl Default for InMemoryStorage {
    /// Equivalent to [`InMemoryStorage::new`].
    fn default() -> Self {
        Self::new()
    }
}
175
176#[async_trait]
177impl StorageRead for InMemoryStorage {
178 #[tracing::instrument(level = "trace", skip_all)]
182 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
183 let data = self
184 .data
185 .read()
186 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
187
188 match data.get(&key) {
189 Some(stored) if !stored.is_expired(self.clock.now()) => {
190 Ok(Some(Record::new(key, stored.value.clone())))
191 }
192 _ => Ok(None),
193 }
194 }
195
196 #[tracing::instrument(level = "trace", skip_all)]
197 async fn scan_iter(
198 &self,
199 range: BytesRange,
200 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
201 let data = self
202 .data
203 .read()
204 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
205
206 let now = self.clock.now();
207 let records: Vec<Record> = data
208 .range((range.start_bound().cloned(), range.end_bound().cloned()))
209 .filter(|(_, stored)| !stored.is_expired(now))
210 .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
211 .collect();
212
213 Ok(Box::new(InMemoryIterator { records, index: 0 }))
214 }
215}
216
/// An iterator over records materialized at scan time; yields them in order.
struct InMemoryIterator {
    // Records captured by the scan, already filtered for expiry.
    records: Vec<Record>,
    // Position of the next record to yield.
    index: usize,
}
221
222#[async_trait]
223impl StorageIterator for InMemoryIterator {
224 #[tracing::instrument(level = "trace", skip_all)]
225 async fn next(&mut self) -> StorageResult<Option<Record>> {
226 if self.index >= self.records.len() {
227 Ok(None)
228 } else {
229 let record = self.records[self.index].clone();
230 self.index += 1;
231 Ok(Some(record))
232 }
233 }
234}
235
/// An immutable point-in-time view of [`InMemoryStorage`] data.
///
/// Holds its own copy of the map, so writes made to the storage after the
/// snapshot was taken are not visible here. TTL expiry is still evaluated
/// at read time against the clock shared with the parent storage.
pub struct InMemoryStorageSnapshot {
    // Frozen copy of the storage map, taken at snapshot time.
    data: Arc<BTreeMap<Bytes, StoredValue>>,
    // Clock shared with the parent storage, used for expiry checks.
    clock: Arc<dyn Clock>,
}
244
245#[async_trait]
246impl StorageRead for InMemoryStorageSnapshot {
247 #[tracing::instrument(level = "trace", skip_all)]
248 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
249 match self.data.get(&key) {
250 Some(stored) if !stored.is_expired(self.clock.now()) => {
251 Ok(Some(Record::new(key, stored.value.clone())))
252 }
253 _ => Ok(None),
254 }
255 }
256
257 #[tracing::instrument(level = "trace", skip_all)]
258 async fn scan_iter(
259 &self,
260 range: BytesRange,
261 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
262 let now = self.clock.now();
263 let records: Vec<Record> = self
264 .data
265 .range((range.start_bound().cloned(), range.end_bound().cloned()))
266 .filter(|(_, stored)| !stored.is_expired(now))
267 .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
268 .collect();
269
270 Ok(Box::new(InMemoryIterator { records, index: 0 }))
271 }
272}
273
// Marker impl: no methods of `StorageSnapshot` are overridden here.
#[async_trait]
impl StorageSnapshot for InMemoryStorageSnapshot {}
276
277#[async_trait]
278impl Storage for InMemoryStorage {
279 async fn apply_with_options(
280 &self,
281 records: Vec<RecordOp>,
282 _options: WriteOptions,
283 ) -> StorageResult<WriteResult> {
284 let mut data = self
285 .data
286 .write()
287 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
288
289 let now = self.clock.now();
290 for record in records {
291 match record {
292 RecordOp::Put(op) => {
293 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
294 data.insert(
295 op.record.key,
296 StoredValue {
297 value: op.record.value,
298 expire_ts,
299 },
300 );
301 }
302 RecordOp::Merge(op) => {
303 let existing_value = data
304 .get(&op.record.key)
305 .filter(|s| !s.is_expired(now))
306 .map(|s| s.value.clone());
307 let merged_value = self.merge_operator.as_ref().unwrap().merge_batch(
308 &op.record.key,
309 existing_value,
310 &[op.record.value],
311 );
312 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
313 data.insert(
314 op.record.key,
315 StoredValue {
316 value: merged_value,
317 expire_ts,
318 },
319 );
320 }
321 RecordOp::Delete(key) => {
322 data.remove(&key);
323 }
324 }
325 }
326
327 Ok(WriteResult {
328 seqnum: self.next_seqnum(),
329 })
330 }
331
332 async fn put_with_options(
338 &self,
339 records: Vec<PutRecordOp>,
340 _options: WriteOptions,
341 ) -> StorageResult<WriteResult> {
342 let mut data = self
343 .data
344 .write()
345 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
346
347 let now = self.clock.now();
348 for op in records {
349 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
350 data.insert(
351 op.record.key,
352 StoredValue {
353 value: op.record.value,
354 expire_ts,
355 },
356 );
357 }
358
359 Ok(WriteResult {
360 seqnum: self.next_seqnum(),
361 })
362 }
363
364 async fn merge_with_options(
375 &self,
376 records: Vec<MergeRecordOp>,
377 _options: WriteOptions,
378 ) -> StorageResult<WriteResult> {
379 let merge_op = self
380 .merge_operator
381 .as_ref()
382 .ok_or_else(|| {
383 StorageError::Storage(
384 "Merge operator not configured: in-memory storage requires a merge operator to be set during construction".to_string(),
385 )
386 })?;
387
388 let mut data = self
389 .data
390 .write()
391 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
392
393 let now = self.clock.now();
394 for op in records {
395 let existing_value = data
396 .get(&op.record.key)
397 .filter(|s| !s.is_expired(now))
398 .map(|s| s.value.clone());
399 let merged_value =
400 merge_op.merge_batch(&op.record.key, existing_value, &[op.record.value]);
401 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
402 data.insert(
403 op.record.key,
404 StoredValue {
405 value: merged_value,
406 expire_ts,
407 },
408 );
409 }
410
411 Ok(WriteResult {
412 seqnum: self.next_seqnum(),
413 })
414 }
415
416 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
422 let data = self
423 .data
424 .read()
425 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
426
427 let snapshot_data = Arc::new(data.clone());
428
429 Ok(Arc::new(InMemoryStorageSnapshot {
430 data: snapshot_data,
431 clock: self.clock.clone(),
432 }))
433 }
434
435 fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
436 self.durable_tx.subscribe()
437 }
438
439 async fn flush(&self) -> StorageResult<()> {
440 if self.defer_durability {
441 self.flush_to(self.written_seq.load(std::sync::atomic::Ordering::Relaxed));
442 }
443 Ok(())
444 }
445
446 async fn close(&self) -> StorageResult<()> {
447 Ok(())
449 }
450}
451
/// An injected failure for [`FailingStorage`]: fires once, or on every call.
#[cfg(feature = "test-utils")]
#[derive(Clone)]
enum Failure {
    // Fires for a single call; the slot is cleared on consumption.
    Once(super::StorageError),
    // Fires for every call until the slot is overwritten.
    Persistent(super::StorageError),
}
461
/// A lock-free slot holding an optional injected [`Failure`].
#[cfg(feature = "test-utils")]
type FailSlot = arc_swap::ArcSwap<Option<Failure>>;
464
/// Returns the injected error from `slot`, if one is armed.
///
/// A `Persistent` failure fires on every call; a `Once` failure is consumed so
/// that only a single call observes it.
#[cfg(feature = "test-utils")]
fn check_failure(slot: &FailSlot) -> super::StorageResult<()> {
    let guard = slot.load();
    match guard.as_ref() {
        None => Ok(()),
        Some(Failure::Persistent(err)) => Err(err.clone()),
        Some(Failure::Once(_)) => {
            // Consume via `swap` and re-check the previous value: if another
            // caller raced us and already consumed (or replaced) the `Once`
            // failure between `load` and here, we must not report it again.
            let prev = slot.swap(Arc::new(None));
            match prev.as_ref() {
                Some(Failure::Once(err)) => Err(err.clone()),
                _ => Ok(()),
            }
        }
    }
}
487
/// A test-only [`super::Storage`] decorator that can inject failures into
/// apply, put, flush, and snapshot calls while delegating everything else.
#[cfg(feature = "test-utils")]
pub struct FailingStorage {
    // The real storage that handles calls when no failure is armed.
    inner: Arc<dyn super::Storage>,
    // Per-operation failure slots, checked before delegating.
    fail_apply: FailSlot,
    fail_put: FailSlot,
    fail_flush: FailSlot,
    fail_snapshot: FailSlot,
}
519
#[cfg(feature = "test-utils")]
impl FailingStorage {
    /// Wraps `inner` with no failures armed; all calls pass through.
    pub fn wrap(inner: Arc<dyn super::Storage>) -> Arc<Self> {
        Arc::new(Self {
            inner,
            fail_apply: arc_swap::ArcSwap::from_pointee(None),
            fail_put: arc_swap::ArcSwap::from_pointee(None),
            fail_flush: arc_swap::ArcSwap::from_pointee(None),
            fail_snapshot: arc_swap::ArcSwap::from_pointee(None),
        })
    }

    /// Makes every subsequent `apply_with_options` call fail with `err`.
    pub fn fail_apply(&self, err: super::StorageError) {
        self.fail_apply
            .store(Arc::new(Some(Failure::Persistent(err))));
    }

    /// Makes only the next `apply_with_options` call fail with `err`.
    pub fn fail_apply_once(&self, err: super::StorageError) {
        self.fail_apply.store(Arc::new(Some(Failure::Once(err))));
    }

    /// Makes every subsequent `put_with_options` call fail with `err`.
    pub fn fail_put(&self, err: super::StorageError) {
        self.fail_put
            .store(Arc::new(Some(Failure::Persistent(err))));
    }

    /// Makes only the next `put_with_options` call fail with `err`.
    pub fn fail_put_once(&self, err: super::StorageError) {
        self.fail_put.store(Arc::new(Some(Failure::Once(err))));
    }

    /// Makes every subsequent `flush` call fail with `err`.
    pub fn fail_flush(&self, err: super::StorageError) {
        self.fail_flush
            .store(Arc::new(Some(Failure::Persistent(err))));
    }

    /// Makes only the next `flush` call fail with `err`.
    pub fn fail_flush_once(&self, err: super::StorageError) {
        self.fail_flush.store(Arc::new(Some(Failure::Once(err))));
    }

    /// Makes every subsequent `snapshot` call fail with `err`.
    pub fn fail_snapshot(&self, err: super::StorageError) {
        self.fail_snapshot
            .store(Arc::new(Some(Failure::Persistent(err))));
    }

    /// Makes only the next `snapshot` call fail with `err`.
    pub fn fail_snapshot_once(&self, err: super::StorageError) {
        self.fail_snapshot.store(Arc::new(Some(Failure::Once(err))));
    }
}
580
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::StorageRead for FailingStorage {
    // Reads have no failure-injection slot; they always delegate to `inner`.
    async fn get(&self, key: Bytes) -> super::StorageResult<Option<crate::Record>> {
        self.inner.get(key).await
    }

    async fn scan_iter(
        &self,
        range: crate::BytesRange,
    ) -> super::StorageResult<Box<dyn super::StorageIterator + Send + 'static>> {
        self.inner.scan_iter(range).await
    }
}
595
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::Storage for FailingStorage {
    // Checks the apply failure slot, then delegates.
    async fn apply_with_options(
        &self,
        ops: Vec<super::RecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_apply)?;
        self.inner.apply_with_options(ops, options).await
    }

    // Checks the put failure slot, then delegates.
    async fn put_with_options(
        &self,
        records: Vec<super::PutRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_put)?;
        self.inner.put_with_options(records, options).await
    }

    // NOTE(review): merges have no failure-injection slot and always delegate
    // to `inner` — confirm this is intentional rather than an omission.
    async fn merge_with_options(
        &self,
        records: Vec<super::MergeRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        self.inner.merge_with_options(records, options).await
    }

    // Checks the snapshot failure slot, then delegates.
    async fn snapshot(&self) -> super::StorageResult<Arc<dyn super::StorageSnapshot>> {
        check_failure(&self.fail_snapshot)?;
        self.inner.snapshot().await
    }

    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
        self.inner.subscribe_durable()
    }

    // Checks the flush failure slot, then delegates.
    async fn flush(&self) -> super::StorageResult<()> {
        check_failure(&self.fail_flush)?;
        self.inner.flush().await
    }

    async fn close(&self) -> super::StorageResult<()> {
        self.inner.close().await
    }
}
643
#[cfg(test)]
mod tests {
    // Unit tests covering basic CRUD, range scans, snapshots, merge
    // semantics, seqnum assignment, and the deferred-durability watermark.
    use super::*;
    use bytes::BytesMut;
    use std::ops::Bound;

    // A test merge operator that joins the existing value and each operand
    // with commas, e.g. merging "b" into "a" yields "a,b".
    struct AppendMergeOperator;

    impl MergeOperator for AppendMergeOperator {
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            operands
                .iter()
                .fold(existing_value.unwrap_or_default(), |acc, operand| {
                    let mut result = BytesMut::from(acc);
                    if !result.is_empty() {
                        result.extend_from_slice(b",");
                    }
                    result.extend_from_slice(operand);
                    result.freeze()
                })
        }
    }

    // --- basic get/put behavior ---

    #[tokio::test]
    async fn should_return_none_when_key_not_found() {
        let storage = InMemoryStorage::new();

        let result = storage.get(Bytes::from("missing_key")).await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn should_store_and_retrieve_record() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let value = Bytes::from("test_value");

        storage
            .put(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        let record = result.unwrap();
        assert_eq!(record.key, Bytes::from("test_key"));
        assert_eq!(record.value, value);
    }

    #[tokio::test]
    async fn should_overwrite_existing_key() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let initial_value = Bytes::from("initial_value");
        let updated_value = Bytes::from("updated_value");

        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();
        storage
            .put(vec![Record::new(key.clone(), updated_value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, updated_value);
    }

    #[tokio::test]
    async fn should_store_multiple_records() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")),
            Record::new(Bytes::from("key2"), Bytes::from("value2")),
            Record::new(Bytes::from("key3"), Bytes::from("value3")),
        ];

        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        for record in records {
            let retrieved = storage.get(record.key.clone()).await.unwrap();
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().value, record.value);
        }
    }

    // --- range scans ---

    #[tokio::test]
    async fn should_scan_all_records_when_unbounded() {
        let storage = InMemoryStorage::new();
        let records = [
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
        ];
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        assert_eq!(scanned.len(), 3);
        assert_eq!(scanned[0].key, Bytes::from("a"));
        assert_eq!(scanned[1].key, Bytes::from("b"));
        assert_eq!(scanned[2].key, Bytes::from("c"));
    }

    #[tokio::test]
    async fn should_scan_records_with_prefix() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("prefix_a"), Bytes::from("value1")),
            Record::new(Bytes::from("prefix_b"), Bytes::from("value2")),
            Record::new(Bytes::from("other_c"), Bytes::from("value3")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let scanned = storage
            .scan(BytesRange::prefix(Bytes::from("prefix_")))
            .await
            .unwrap();

        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("prefix_a"));
        assert_eq!(scanned[1].key, Bytes::from("prefix_b"));
    }

    #[tokio::test]
    async fn should_scan_records_in_bounded_range() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
            Record::new(Bytes::from("d"), Bytes::from("value_d")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let range = BytesRange::new(
            Bound::Included(Bytes::from("b")),
            Bound::Excluded(Bytes::from("d")),
        );
        let scanned = storage.scan(range).await.unwrap();

        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("b"));
        assert_eq!(scanned[1].key, Bytes::from("c"));
    }

    #[tokio::test]
    async fn should_return_empty_vec_when_scanning_empty_storage() {
        let storage = InMemoryStorage::new();

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        assert!(scanned.is_empty());
    }

    #[tokio::test]
    async fn should_iterate_over_records() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        let mut iter = storage.scan_iter(BytesRange::unbounded()).await.unwrap();
        let first = iter.next().await.unwrap();
        let second = iter.next().await.unwrap();
        let third = iter.next().await.unwrap();

        assert!(first.is_some());
        assert_eq!(first.unwrap().key, Bytes::from("key1"));
        assert!(second.is_some());
        assert_eq!(second.unwrap().key, Bytes::from("key2"));
        assert!(third.is_none());
    }

    // --- snapshot isolation ---

    #[tokio::test]
    async fn should_create_snapshot_with_current_data() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();

        let result = snapshot.get(Bytes::from("key1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1"));
    }

    #[tokio::test]
    async fn should_not_see_writes_after_snapshot() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();

        let snapshot_result = snapshot.get(Bytes::from("key2")).await.unwrap();
        assert!(snapshot_result.is_none());

        let storage_result = storage.get(Bytes::from("key2")).await.unwrap();
        assert!(storage_result.is_some());
    }

    #[tokio::test]
    async fn should_scan_snapshot_independently() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("value_a")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("b"), Bytes::from("value_b")).into(),
            ])
            .await
            .unwrap();

        let snapshot_records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(snapshot_records.len(), 1);
        assert_eq!(snapshot_records[0].key, Bytes::from("a"));

        let storage_records = storage.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(storage_records.len(), 2);
    }

    #[tokio::test]
    async fn should_handle_empty_record() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("empty_key");

        storage
            .put(vec![Record::empty(key.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::new());
    }

    // --- merge semantics ---

    #[tokio::test]
    async fn should_return_error_when_merge_operator_not_configured() {
        let storage = InMemoryStorage::new();
        let record = Record::new(Bytes::from("key1"), Bytes::from("value1"));

        let result = storage.merge(vec![record.into()]).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Merge operator not configured")
        );
    }

    #[tokio::test]
    async fn should_merge_when_key_does_not_exist() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("new_key");
        let value = Bytes::from("value1");

        storage
            .merge(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, value);
    }

    #[tokio::test]
    async fn should_merge_when_key_exists() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("key1");
        let initial_value = Bytes::from("value1");
        let new_value = Bytes::from("value2");

        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();

        storage
            .merge(vec![Record::new(key.clone(), new_value).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1,value2"));
    }

    #[tokio::test]
    async fn should_merge_multiple_keys() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        storage
            .merge(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1a")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2a")).into(),
            ])
            .await
            .unwrap();

        let result1 = storage.get(Bytes::from("key1")).await.unwrap();
        assert_eq!(result1.unwrap().value, Bytes::from("value1,value1a"));

        let result2 = storage.get(Bytes::from("key2")).await.unwrap();
        assert_eq!(result2.unwrap().value, Bytes::from("value2,value2a"));
    }

    // --- seqnum assignment ---

    #[tokio::test]
    async fn should_return_monotonically_increasing_seqnums_from_put() {
        let storage = InMemoryStorage::new();

        let r1 = storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let r2 = storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        let r3 = storage
            .put(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
        assert_eq!(r3.seqnum, 3);
    }

    #[tokio::test]
    async fn should_return_monotonically_increasing_seqnums_from_apply() {
        let storage = InMemoryStorage::new();

        let r1 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k1"),
                Bytes::from("v1"),
            )))])
            .await
            .unwrap();
        let r2 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v2"),
            )))])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
    }

    #[tokio::test]
    async fn should_share_seqnum_counter_across_write_methods() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);

        let r1 = storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let r2 = storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        let r3 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v3"),
            )))])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
        assert_eq!(r3.seqnum, 3);
    }

    // --- durability watermark ---

    #[tokio::test]
    async fn should_start_durable_subscriber_at_zero() {
        let storage = InMemoryStorage::new();
        let rx = storage.subscribe_durable();
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_on_each_write() {
        let storage = InMemoryStorage::new();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 1);

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_merge_empty_values() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("key1");

        storage
            .merge(vec![Record::empty(key.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::new());
    }

    // --- deferred durability ---

    #[tokio::test]
    async fn should_not_advance_durable_watermark_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0, "durable watermark should not advance");

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0, "durable watermark should still be 0");
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_on_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2, "flush should advance to current seqnum");
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_to_specific_seq() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        for i in 1..=3 {
            storage
                .put(vec![
                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
                ])
                .await
                .unwrap();
        }
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(2);
        assert_eq!(*rx.borrow(), 2, "should advance to requested seqnum");

        storage.flush_to(3);
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    #[should_panic(expected = "cannot move durable seqnum backwards")]
    async fn should_panic_when_flush_to_moves_backwards() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        for i in 1..=3 {
            storage
                .put(vec![
                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
                ])
                .await
                .unwrap();
        }

        storage.flush_to(2);
        storage.flush_to(1);
    }

    #[tokio::test]
    #[should_panic(expected = "cannot flush beyond written seqnum")]
    async fn should_panic_when_flush_to_exceeds_written() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        storage.flush_to(5);
    }

    #[tokio::test]
    async fn should_see_data_in_snapshot_before_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        let result = snapshot.get(Bytes::from("k1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("v1"));

        let rx = storage.subscribe_durable();
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_not_advance_durable_watermark_on_apply_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k1"),
                Bytes::from("v1"),
            )))])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage
            .apply(vec![RecordOp::Delete(Bytes::from("k1"))])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_not_advance_durable_watermark_on_merge_when_deferred() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_support_multiple_flush_cycles_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 1);

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 1, "should not advance before second flush");

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    async fn should_flush_on_empty_storage_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(0);
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_defer_durability_across_mixed_write_methods() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v2"),
            )))])
            .await
            .unwrap();
        storage
            .merge(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(2);
        assert_eq!(*rx.borrow(), 2);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    async fn should_read_data_written_before_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();

        let result = storage.get(Bytes::from("k1")).await.unwrap();
        assert_eq!(result.unwrap().value, Bytes::from("v1"));

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(scanned.len(), 2);
    }
}