1use std::collections::BTreeMap;
2use std::ops::RangeBounds;
3use std::sync::{Arc, RwLock};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8use super::{
9 MergeOperator, MergeRecordOp, PutRecordOp, Storage, StorageSnapshot, WriteOptions, WriteResult,
10};
11use crate::storage::RecordOp;
12use crate::{BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl};
13
/// Time source abstraction so callers (and tests) can inject a deterministic clock.
pub trait Clock: Send + Sync {
    /// Current time in milliseconds since the Unix epoch.
    fn now(&self) -> i64;
}

/// Default [`Clock`] backed by the system wall clock.
pub struct WallClock;

impl Clock for WallClock {
    fn now(&self) -> i64 {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system time before Unix epoch");
        since_epoch.as_millis() as i64
    }
}
31
/// A value stored in the map together with its optional expiry deadline.
#[derive(Clone, Debug)]
struct StoredValue {
    // Raw record payload.
    value: Bytes,
    // Absolute expiry timestamp in ms since the Unix epoch; `None` = never expires.
    expire_ts: Option<i64>,
}
39
40impl StoredValue {
41 fn is_expired(&self, now: i64) -> bool {
42 self.expire_ts.is_some_and(|ts| now >= ts)
43 }
44}
45
46fn compute_expire_ts(now: i64, ttl: Ttl, default_ttl: Option<u64>) -> Option<i64> {
48 let duration = match ttl {
49 Ttl::Default => default_ttl,
50 Ttl::NoExpiry => None,
51 Ttl::ExpireAfter(ms) => Some(ms),
52 };
53 duration.map(|ms| now + ms as i64)
54}
55
/// A fully in-memory [`Storage`] implementation backed by an ordered map.
///
/// Supports TTL expiry (checked lazily on read), an optional merge operator,
/// point-in-time snapshots, and a durability watermark that either follows
/// every write or (with `defer_durability`) only advances on flush.
pub struct InMemoryStorage {
    // Ordered key/value store; `RwLock` allows concurrent readers.
    data: Arc<RwLock<BTreeMap<Bytes, StoredValue>>>,
    // Resolves merge operations; `None` makes merge calls return an error.
    merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
    // Time source for TTL computation; swappable for deterministic tests.
    clock: Arc<dyn Clock>,
    // Default TTL in milliseconds applied when a write uses `Ttl::Default`.
    default_ttl: Option<u64>,
    // Highest sequence number handed out to a completed write batch.
    written_seq: std::sync::atomic::AtomicU64,
    // Highest sequence number considered durable (always <= written_seq).
    durable_seq: std::sync::atomic::AtomicU64,
    // Broadcasts durable-watermark advances to `subscribe_durable` receivers.
    durable_tx: tokio::sync::watch::Sender<u64>,
    // When true, writes do not advance the durable watermark; `flush` does.
    defer_durability: bool,
}
71
72impl InMemoryStorage {
73 pub fn new() -> Self {
75 let (durable_tx, _) = tokio::sync::watch::channel(0);
76 Self {
77 data: Arc::new(RwLock::new(BTreeMap::new())),
78 merge_operator: None,
79 clock: Arc::new(WallClock),
80 default_ttl: None,
81 written_seq: std::sync::atomic::AtomicU64::new(0),
82 durable_seq: std::sync::atomic::AtomicU64::new(0),
83 durable_tx,
84 defer_durability: false,
85 }
86 }
87
88 pub fn with_merge_operator(merge_operator: Arc<dyn MergeOperator + Send + Sync>) -> Self {
94 let (durable_tx, _) = tokio::sync::watch::channel(0);
95 Self {
96 data: Arc::new(RwLock::new(BTreeMap::new())),
97 merge_operator: Some(merge_operator),
98 clock: Arc::new(WallClock),
99 default_ttl: None,
100 written_seq: std::sync::atomic::AtomicU64::new(0),
101 durable_seq: std::sync::atomic::AtomicU64::new(0),
102 durable_tx,
103 defer_durability: false,
104 }
105 }
106
107 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
109 self.clock = clock;
110 self
111 }
112
113 pub fn with_default_ttl(mut self, ttl: u64) -> Self {
115 self.default_ttl = Some(ttl);
116 self
117 }
118
119 pub fn with_deferred_durability(mut self) -> Self {
125 self.defer_durability = true;
126 self
127 }
128
129 pub fn flush_to(&self, seq: u64) {
136 let written = self.written_seq.load(std::sync::atomic::Ordering::Relaxed);
137 assert!(
138 seq <= written,
139 "cannot flush beyond written seqnum: flush_to({seq}) but written is {written}"
140 );
141 let durable = self.durable_seq.load(std::sync::atomic::Ordering::Relaxed);
142 assert!(
143 seq >= durable,
144 "cannot move durable seqnum backwards: flush_to({seq}) but durable is {durable}"
145 );
146 self.durable_seq
147 .store(seq, std::sync::atomic::Ordering::Relaxed);
148 let _ = self.durable_tx.send(seq);
149 }
150
151 fn next_seqnum(&self) -> u64 {
157 let seq = self
158 .written_seq
159 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
160 + 1;
161 if !self.defer_durability {
162 self.durable_seq
163 .store(seq, std::sync::atomic::Ordering::Relaxed);
164 let _ = self.durable_tx.send(seq);
165 }
166 seq
167 }
168}
169
170impl Default for InMemoryStorage {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176#[async_trait]
177impl StorageRead for InMemoryStorage {
178 #[tracing::instrument(level = "trace", skip_all)]
182 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
183 let data = self
184 .data
185 .read()
186 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
187
188 match data.get(&key) {
189 Some(stored) if !stored.is_expired(self.clock.now()) => {
190 Ok(Some(Record::new(key, stored.value.clone())))
191 }
192 _ => Ok(None),
193 }
194 }
195
196 #[tracing::instrument(level = "trace", skip_all)]
197 async fn scan_iter(
198 &self,
199 range: BytesRange,
200 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
201 let data = self
202 .data
203 .read()
204 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
205
206 let now = self.clock.now();
207 let records: Vec<Record> = data
208 .range((range.start_bound().cloned(), range.end_bound().cloned()))
209 .filter(|(_, stored)| !stored.is_expired(now))
210 .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
211 .collect();
212
213 Ok(Box::new(InMemoryIterator { records, index: 0 }))
214 }
215}
216
/// Iterator over an eagerly-collected, point-in-time list of records.
struct InMemoryIterator {
    // Records captured at scan time, in key order.
    records: Vec<Record>,
    // Position of the next record to yield.
    index: usize,
}
221
222#[async_trait]
223impl StorageIterator for InMemoryIterator {
224 #[tracing::instrument(level = "trace", skip_all)]
225 async fn next(&mut self) -> StorageResult<Option<Record>> {
226 if self.index >= self.records.len() {
227 Ok(None)
228 } else {
229 let record = self.records[self.index].clone();
230 self.index += 1;
231 Ok(Some(record))
232 }
233 }
234}
235
/// Immutable point-in-time copy of the storage contents.
///
/// Created by `InMemoryStorage::snapshot`; later writes to the storage are
/// not visible through it.
pub struct InMemoryStorageSnapshot {
    // Deep copy of the map taken at snapshot time; never mutated.
    data: Arc<BTreeMap<Bytes, StoredValue>>,
    // Shared with the parent storage so TTL expiry is evaluated consistently.
    clock: Arc<dyn Clock>,
}
244
245#[async_trait]
246impl StorageRead for InMemoryStorageSnapshot {
247 #[tracing::instrument(level = "trace", skip_all)]
248 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
249 match self.data.get(&key) {
250 Some(stored) if !stored.is_expired(self.clock.now()) => {
251 Ok(Some(Record::new(key, stored.value.clone())))
252 }
253 _ => Ok(None),
254 }
255 }
256
257 #[tracing::instrument(level = "trace", skip_all)]
258 async fn scan_iter(
259 &self,
260 range: BytesRange,
261 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
262 let now = self.clock.now();
263 let records: Vec<Record> = self
264 .data
265 .range((range.start_bound().cloned(), range.end_bound().cloned()))
266 .filter(|(_, stored)| !stored.is_expired(now))
267 .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
268 .collect();
269
270 Ok(Box::new(InMemoryIterator { records, index: 0 }))
271 }
272}
273
// Marker impl: `StorageSnapshot` adds no methods beyond `StorageRead` here.
#[async_trait]
impl StorageSnapshot for InMemoryStorageSnapshot {}
276
277#[async_trait]
278impl Storage for InMemoryStorage {
279 async fn apply_with_options(
280 &self,
281 records: Vec<RecordOp>,
282 _options: WriteOptions,
283 ) -> StorageResult<WriteResult> {
284 let mut data = self
285 .data
286 .write()
287 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
288
289 let now = self.clock.now();
290 for record in records {
291 match record {
292 RecordOp::Put(op) => {
293 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
294 data.insert(
295 op.record.key,
296 StoredValue {
297 value: op.record.value,
298 expire_ts,
299 },
300 );
301 }
302 RecordOp::Merge(op) => {
303 let existing_value = data
304 .get(&op.record.key)
305 .filter(|s| !s.is_expired(now))
306 .map(|s| s.value.clone());
307 let merged_value = self.merge_operator.as_ref().unwrap().merge(
308 &op.record.key,
309 existing_value,
310 op.record.value.clone(),
311 );
312 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
313 data.insert(
314 op.record.key,
315 StoredValue {
316 value: merged_value,
317 expire_ts,
318 },
319 );
320 }
321 RecordOp::Delete(key) => {
322 data.remove(&key);
323 }
324 }
325 }
326
327 Ok(WriteResult {
328 seqnum: self.next_seqnum(),
329 })
330 }
331
332 async fn put_with_options(
338 &self,
339 records: Vec<PutRecordOp>,
340 _options: WriteOptions,
341 ) -> StorageResult<WriteResult> {
342 let mut data = self
343 .data
344 .write()
345 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
346
347 let now = self.clock.now();
348 for op in records {
349 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
350 data.insert(
351 op.record.key,
352 StoredValue {
353 value: op.record.value,
354 expire_ts,
355 },
356 );
357 }
358
359 Ok(WriteResult {
360 seqnum: self.next_seqnum(),
361 })
362 }
363
364 async fn merge_with_options(
375 &self,
376 records: Vec<MergeRecordOp>,
377 _options: WriteOptions,
378 ) -> StorageResult<WriteResult> {
379 let merge_op = self
380 .merge_operator
381 .as_ref()
382 .ok_or_else(|| {
383 StorageError::Storage(
384 "Merge operator not configured: in-memory storage requires a merge operator to be set during construction".to_string(),
385 )
386 })?;
387
388 let mut data = self
389 .data
390 .write()
391 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
392
393 let now = self.clock.now();
394 for op in records {
395 let existing_value = data
396 .get(&op.record.key)
397 .filter(|s| !s.is_expired(now))
398 .map(|s| s.value.clone());
399 let merged_value =
400 merge_op.merge(&op.record.key, existing_value, op.record.value.clone());
401 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
402 data.insert(
403 op.record.key,
404 StoredValue {
405 value: merged_value,
406 expire_ts,
407 },
408 );
409 }
410
411 Ok(WriteResult {
412 seqnum: self.next_seqnum(),
413 })
414 }
415
416 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
422 let data = self
423 .data
424 .read()
425 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
426
427 let snapshot_data = Arc::new(data.clone());
428
429 Ok(Arc::new(InMemoryStorageSnapshot {
430 data: snapshot_data,
431 clock: self.clock.clone(),
432 }))
433 }
434
435 fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
436 self.durable_tx.subscribe()
437 }
438
439 async fn flush(&self) -> StorageResult<()> {
440 if self.defer_durability {
441 self.flush_to(self.written_seq.load(std::sync::atomic::Ordering::Relaxed));
442 }
443 Ok(())
444 }
445
446 async fn close(&self) -> StorageResult<()> {
447 Ok(())
449 }
450}
451
// An injected failure for fault-injection testing.
#[cfg(feature = "test-utils")]
#[derive(Clone)]
enum Failure {
    // Returned exactly once, then the slot is cleared.
    Once(super::StorageError),
    // Returned on every call until explicitly replaced.
    Persistent(super::StorageError),
}
461
// Lock-free, atomically swappable slot holding an optional injected failure.
#[cfg(feature = "test-utils")]
type FailSlot = arc_swap::ArcSwap<Option<Failure>>;
464
#[cfg(feature = "test-utils")]
/// Checks `slot` for an injected failure and consumes it if it is one-shot.
///
/// `Persistent` failures are returned on every call and left in place;
/// `Once` failures are returned a single time and the slot is cleared.
fn check_failure(slot: &FailSlot) -> super::StorageResult<()> {
    let guard = slot.load();
    match guard.as_ref() {
        None => Ok(()),
        Some(Failure::Persistent(err)) => Err(err.clone()),
        Some(Failure::Once(_)) => {
            // Swap the slot out and act on what was *actually* stored at swap
            // time, so two racing callers cannot both observe the same
            // one-shot failure.
            // NOTE(review): if another thread stores a Persistent failure
            // between the load above and this swap, that failure is silently
            // discarded. Likely acceptable for a test utility — confirm it is
            // never armed concurrently with in-flight operations.
            let prev = slot.swap(Arc::new(None));
            match prev.as_ref() {
                Some(Failure::Once(err)) => Err(err.clone()),
                _ => Ok(()),
            }
        }
    }
}
487
/// Test-utility wrapper that delegates to an inner [`super::Storage`] while
/// allowing callers to inject one-shot or persistent failures per operation.
///
/// NOTE(review): there is no `fail_merge` slot, so `merge_with_options` is
/// not fault-injectable — confirm whether that is intentional.
#[cfg(feature = "test-utils")]
pub struct FailingStorage {
    inner: Arc<dyn super::Storage>,
    // One slot per injectable operation; `None` inside means "no failure armed".
    fail_apply: FailSlot,
    fail_put: FailSlot,
    fail_flush: FailSlot,
    fail_snapshot: FailSlot,
}
519
#[cfg(feature = "test-utils")]
impl FailingStorage {
    /// Wraps `inner` with all failure slots disarmed.
    pub fn wrap(inner: Arc<dyn super::Storage>) -> Arc<Self> {
        let empty_slot = || arc_swap::ArcSwap::from_pointee(None);
        Arc::new(Self {
            inner,
            fail_apply: empty_slot(),
            fail_put: empty_slot(),
            fail_flush: empty_slot(),
            fail_snapshot: empty_slot(),
        })
    }

    // Arms `slot` with `failure`, replacing anything previously armed.
    fn arm(slot: &FailSlot, failure: Failure) {
        slot.store(Arc::new(Some(failure)));
    }

    /// Makes every subsequent `apply` call fail with `err`.
    pub fn fail_apply(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Persistent(err));
    }

    /// Makes only the next `apply` call fail with `err`.
    pub fn fail_apply_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Once(err));
    }

    /// Makes every subsequent `put` call fail with `err`.
    pub fn fail_put(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Persistent(err));
    }

    /// Makes only the next `put` call fail with `err`.
    pub fn fail_put_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Once(err));
    }

    /// Makes every subsequent `flush` call fail with `err`.
    pub fn fail_flush(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Persistent(err));
    }

    /// Makes only the next `flush` call fail with `err`.
    pub fn fail_flush_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Once(err));
    }

    /// Makes every subsequent `snapshot` call fail with `err`.
    pub fn fail_snapshot(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Persistent(err));
    }

    /// Makes only the next `snapshot` call fail with `err`.
    pub fn fail_snapshot_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Once(err));
    }
}
580
// Reads are never fault-injected; they delegate straight to the inner storage.
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::StorageRead for FailingStorage {
    async fn get(&self, key: Bytes) -> super::StorageResult<Option<crate::Record>> {
        self.inner.get(key).await
    }

    async fn scan_iter(
        &self,
        range: crate::BytesRange,
    ) -> super::StorageResult<Box<dyn super::StorageIterator + Send + 'static>> {
        self.inner.scan_iter(range).await
    }
}
595
// Each injectable operation first consults its failure slot, then delegates.
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::Storage for FailingStorage {
    async fn apply_with_options(
        &self,
        ops: Vec<super::RecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_apply)?;
        self.inner.apply_with_options(ops, options).await
    }

    async fn put_with_options(
        &self,
        records: Vec<super::PutRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_put)?;
        self.inner.put_with_options(records, options).await
    }

    // NOTE(review): no failure slot is consulted here — merge operations
    // cannot be fault-injected. Confirm whether a `fail_merge` slot is wanted.
    async fn merge_with_options(
        &self,
        records: Vec<super::MergeRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        self.inner.merge_with_options(records, options).await
    }

    async fn snapshot(&self) -> super::StorageResult<Arc<dyn super::StorageSnapshot>> {
        check_failure(&self.fail_snapshot)?;
        self.inner.snapshot().await
    }

    // Durability subscription is pure delegation; not fault-injectable.
    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
        self.inner.subscribe_durable()
    }

    async fn flush(&self) -> super::StorageResult<()> {
        check_failure(&self.fail_flush)?;
        self.inner.flush().await
    }

    // Close is pure delegation; not fault-injectable.
    async fn close(&self) -> super::StorageResult<()> {
        self.inner.close().await
    }
}
643
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use std::ops::Bound;

    // Merge operator used throughout the merge tests: concatenates the
    // existing and new values with a comma separator.
    struct AppendMergeOperator;

    impl MergeOperator for AppendMergeOperator {
        fn merge(&self, _key: &Bytes, existing_value: Option<Bytes>, new_value: Bytes) -> Bytes {
            match existing_value {
                Some(existing) => {
                    let mut result = BytesMut::from(existing);
                    result.extend_from_slice(b",");
                    result.extend_from_slice(&new_value);
                    result.freeze()
                }
                None => new_value,
            }
        }
    }

    // --- Basic get/put behavior ---

    #[tokio::test]
    async fn should_return_none_when_key_not_found() {
        let storage = InMemoryStorage::new();

        let result = storage.get(Bytes::from("missing_key")).await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn should_store_and_retrieve_record() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let value = Bytes::from("test_value");

        storage
            .put(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        let record = result.unwrap();
        assert_eq!(record.key, Bytes::from("test_key"));
        assert_eq!(record.value, value);
    }

    #[tokio::test]
    async fn should_overwrite_existing_key() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let initial_value = Bytes::from("initial_value");
        let updated_value = Bytes::from("updated_value");

        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();
        storage
            .put(vec![Record::new(key.clone(), updated_value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, updated_value);
    }

    #[tokio::test]
    async fn should_store_multiple_records() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")),
            Record::new(Bytes::from("key2"), Bytes::from("value2")),
            Record::new(Bytes::from("key3"), Bytes::from("value3")),
        ];

        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        for record in records {
            let retrieved = storage.get(record.key.clone()).await.unwrap();
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().value, record.value);
        }
    }

    // --- Scan behavior (BTreeMap guarantees key order) ---

    #[tokio::test]
    async fn should_scan_all_records_when_unbounded() {
        let storage = InMemoryStorage::new();
        let records = [
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
        ];
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        assert_eq!(scanned.len(), 3);
        assert_eq!(scanned[0].key, Bytes::from("a"));
        assert_eq!(scanned[1].key, Bytes::from("b"));
        assert_eq!(scanned[2].key, Bytes::from("c"));
    }

    #[tokio::test]
    async fn should_scan_records_with_prefix() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("prefix_a"), Bytes::from("value1")),
            Record::new(Bytes::from("prefix_b"), Bytes::from("value2")),
            Record::new(Bytes::from("other_c"), Bytes::from("value3")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let scanned = storage
            .scan(BytesRange::prefix(Bytes::from("prefix_")))
            .await
            .unwrap();

        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("prefix_a"));
        assert_eq!(scanned[1].key, Bytes::from("prefix_b"));
    }

    #[tokio::test]
    async fn should_scan_records_in_bounded_range() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
            Record::new(Bytes::from("d"), Bytes::from("value_d")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // [b, d): inclusive lower bound, exclusive upper bound.
        let range = BytesRange::new(
            Bound::Included(Bytes::from("b")),
            Bound::Excluded(Bytes::from("d")),
        );
        let scanned = storage.scan(range).await.unwrap();

        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("b"));
        assert_eq!(scanned[1].key, Bytes::from("c"));
    }

    #[tokio::test]
    async fn should_return_empty_vec_when_scanning_empty_storage() {
        let storage = InMemoryStorage::new();

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        assert!(scanned.is_empty());
    }

    #[tokio::test]
    async fn should_iterate_over_records() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        let mut iter = storage.scan_iter(BytesRange::unbounded()).await.unwrap();
        let first = iter.next().await.unwrap();
        let second = iter.next().await.unwrap();
        let third = iter.next().await.unwrap();

        assert!(first.is_some());
        assert_eq!(first.unwrap().key, Bytes::from("key1"));
        assert!(second.is_some());
        assert_eq!(second.unwrap().key, Bytes::from("key2"));
        assert!(third.is_none());
    }

    // --- Snapshot isolation ---

    #[tokio::test]
    async fn should_create_snapshot_with_current_data() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();

        let result = snapshot.get(Bytes::from("key1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1"));
    }

    #[tokio::test]
    async fn should_not_see_writes_after_snapshot() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();

        let snapshot_result = snapshot.get(Bytes::from("key2")).await.unwrap();
        assert!(snapshot_result.is_none());

        let storage_result = storage.get(Bytes::from("key2")).await.unwrap();
        assert!(storage_result.is_some());
    }

    #[tokio::test]
    async fn should_scan_snapshot_independently() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("value_a")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("b"), Bytes::from("value_b")).into(),
            ])
            .await
            .unwrap();

        let snapshot_records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(snapshot_records.len(), 1);
        assert_eq!(snapshot_records[0].key, Bytes::from("a"));

        let storage_records = storage.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(storage_records.len(), 2);
    }

    #[tokio::test]
    async fn should_handle_empty_record() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("empty_key");

        storage
            .put(vec![Record::empty(key.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::new());
    }

    // --- Merge behavior ---

    #[tokio::test]
    async fn should_return_error_when_merge_operator_not_configured() {
        let storage = InMemoryStorage::new();
        let record = Record::new(Bytes::from("key1"), Bytes::from("value1"));

        let result = storage.merge(vec![record.into()]).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Merge operator not configured")
        );
    }

    #[tokio::test]
    async fn should_merge_when_key_does_not_exist() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("new_key");
        let value = Bytes::from("value1");

        storage
            .merge(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, value);
    }

    #[tokio::test]
    async fn should_merge_when_key_exists() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("key1");
        let initial_value = Bytes::from("value1");
        let new_value = Bytes::from("value2");

        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();

        storage
            .merge(vec![Record::new(key.clone(), new_value).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1,value2"));
    }

    #[tokio::test]
    async fn should_merge_multiple_keys() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        storage
            .merge(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1a")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2a")).into(),
            ])
            .await
            .unwrap();

        let result1 = storage.get(Bytes::from("key1")).await.unwrap();
        assert_eq!(result1.unwrap().value, Bytes::from("value1,value1a"));

        let result2 = storage.get(Bytes::from("key2")).await.unwrap();
        assert_eq!(result2.unwrap().value, Bytes::from("value2,value2a"));
    }

    // --- Sequence-number allocation ---

    #[tokio::test]
    async fn should_return_monotonically_increasing_seqnums_from_put() {
        let storage = InMemoryStorage::new();

        let r1 = storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let r2 = storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        let r3 = storage
            .put(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
        assert_eq!(r3.seqnum, 3);
    }

    #[tokio::test]
    async fn should_return_monotonically_increasing_seqnums_from_apply() {
        let storage = InMemoryStorage::new();

        let r1 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k1"),
                Bytes::from("v1"),
            )))])
            .await
            .unwrap();
        let r2 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v2"),
            )))])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
    }

    #[tokio::test]
    async fn should_share_seqnum_counter_across_write_methods() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);

        let r1 = storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let r2 = storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        let r3 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v3"),
            )))])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
        assert_eq!(r3.seqnum, 3);
    }

    // --- Durability watermark: immediate mode ---

    #[tokio::test]
    async fn should_start_durable_subscriber_at_zero() {
        let storage = InMemoryStorage::new();
        let rx = storage.subscribe_durable();
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_on_each_write() {
        let storage = InMemoryStorage::new();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 1);

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_merge_empty_values() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("key1");

        storage
            .merge(vec![Record::empty(key.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::new());
    }

    // --- Durability watermark: deferred mode ---

    #[tokio::test]
    async fn should_not_advance_durable_watermark_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0, "durable watermark should not advance");

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0, "durable watermark should still be 0");
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_on_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2, "flush should advance to current seqnum");
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_to_specific_seq() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        for i in 1..=3 {
            storage
                .put(vec![
                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
                ])
                .await
                .unwrap();
        }
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(2);
        assert_eq!(*rx.borrow(), 2, "should advance to requested seqnum");

        storage.flush_to(3);
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    #[should_panic(expected = "cannot move durable seqnum backwards")]
    async fn should_panic_when_flush_to_moves_backwards() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        for i in 1..=3 {
            storage
                .put(vec![
                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
                ])
                .await
                .unwrap();
        }

        storage.flush_to(2);
        storage.flush_to(1);
    }

    #[tokio::test]
    #[should_panic(expected = "cannot flush beyond written seqnum")]
    async fn should_panic_when_flush_to_exceeds_written() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        storage.flush_to(5);
    }

    #[tokio::test]
    async fn should_see_data_in_snapshot_before_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // Snapshots expose written-but-not-yet-durable data.
        let snapshot = storage.snapshot().await.unwrap();
        let result = snapshot.get(Bytes::from("k1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("v1"));

        let rx = storage.subscribe_durable();
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_not_advance_durable_watermark_on_apply_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k1"),
                Bytes::from("v1"),
            )))])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage
            .apply(vec![RecordOp::Delete(Bytes::from("k1"))])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_not_advance_durable_watermark_on_merge_when_deferred() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_support_multiple_flush_cycles_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 1);

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 1, "should not advance before second flush");

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    async fn should_flush_on_empty_storage_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(0);
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_defer_durability_across_mixed_write_methods() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v2"),
            )))])
            .await
            .unwrap();
        storage
            .merge(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(2);
        assert_eq!(*rx.borrow(), 2);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    async fn should_read_data_written_before_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();

        let result = storage.get(Bytes::from("k1")).await.unwrap();
        assert_eq!(result.unwrap().value, Bytes::from("v1"));

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(scanned.len(), 2);
    }
}