1use std::collections::BTreeMap;
2use std::ops::RangeBounds;
3use std::sync::{Arc, RwLock};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8use super::{
9 MergeOperator, MergeRecordOp, PutRecordOp, Storage, StorageSnapshot, WriteOptions, WriteResult,
10};
11use crate::storage::RecordOp;
12use crate::{BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl};
13
/// Source of the current time, expressed as milliseconds since the Unix epoch.
///
/// Abstracted behind a trait so tests can substitute a deterministic clock.
pub trait Clock: Send + Sync {
    /// Returns the current time in milliseconds since the Unix epoch.
    fn now(&self) -> i64;
}

/// A [`Clock`] backed by the real system time.
pub struct WallClock;

impl Clock for WallClock {
    fn now(&self) -> i64 {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system time before Unix epoch");
        since_epoch.as_millis() as i64
    }
}
31
32#[derive(Clone, Debug)]
34struct StoredValue {
35 value: Bytes,
36 expire_ts: Option<i64>,
38}
39
40impl StoredValue {
41 fn is_expired(&self, now: i64) -> bool {
42 self.expire_ts.is_some_and(|ts| now >= ts)
43 }
44}
45
46fn compute_expire_ts(now: i64, ttl: Ttl, default_ttl: Option<u64>) -> Option<i64> {
48 let duration = match ttl {
49 Ttl::Default => default_ttl,
50 Ttl::NoExpiry => None,
51 Ttl::ExpireAfter(ms) => Some(ms),
52 };
53 duration.map(|ms| now + ms as i64)
54}
55
/// A [`Storage`] implementation backed by an in-memory [`BTreeMap`].
///
/// Nothing is persisted; snapshots are full copies of the map. Writes are
/// assigned monotonically increasing sequence numbers, and a watch channel
/// broadcasts the highest seqnum considered durable.
pub struct InMemoryStorage {
    // Ordered key/value map guarded by a reader-writer lock.
    data: Arc<RwLock<BTreeMap<Bytes, StoredValue>>>,
    // Operator backing merge operations; `merge` returns an error without it.
    merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
    // Time source for TTL expiry checks; swappable for deterministic tests.
    clock: Arc<dyn Clock>,
    // TTL in milliseconds applied to writes that use `Ttl::Default`.
    default_ttl: Option<u64>,
    // Sequence number of the most recent write.
    written_seq: std::sync::atomic::AtomicU64,
    // Highest sequence number currently considered durable.
    durable_seq: std::sync::atomic::AtomicU64,
    // Broadcasts the durable watermark to `subscribe_durable` receivers.
    durable_tx: tokio::sync::watch::Sender<u64>,
    // When true, writes do not advance the durable watermark until an explicit
    // `flush` / `flush_to`.
    defer_durability: bool,
}
71
72impl InMemoryStorage {
73 pub fn new() -> Self {
75 let (durable_tx, _) = tokio::sync::watch::channel(0);
76 Self {
77 data: Arc::new(RwLock::new(BTreeMap::new())),
78 merge_operator: None,
79 clock: Arc::new(WallClock),
80 default_ttl: None,
81 written_seq: std::sync::atomic::AtomicU64::new(0),
82 durable_seq: std::sync::atomic::AtomicU64::new(0),
83 durable_tx,
84 defer_durability: false,
85 }
86 }
87
88 pub fn with_merge_operator(merge_operator: Arc<dyn MergeOperator + Send + Sync>) -> Self {
94 let (durable_tx, _) = tokio::sync::watch::channel(0);
95 Self {
96 data: Arc::new(RwLock::new(BTreeMap::new())),
97 merge_operator: Some(merge_operator),
98 clock: Arc::new(WallClock),
99 default_ttl: None,
100 written_seq: std::sync::atomic::AtomicU64::new(0),
101 durable_seq: std::sync::atomic::AtomicU64::new(0),
102 durable_tx,
103 defer_durability: false,
104 }
105 }
106
107 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
109 self.clock = clock;
110 self
111 }
112
113 pub fn with_default_ttl(mut self, ttl: u64) -> Self {
115 self.default_ttl = Some(ttl);
116 self
117 }
118
119 pub fn with_deferred_durability(mut self) -> Self {
125 self.defer_durability = true;
126 self
127 }
128
129 pub fn flush_to(&self, seq: u64) {
136 let written = self.written_seq.load(std::sync::atomic::Ordering::Relaxed);
137 assert!(
138 seq <= written,
139 "cannot flush beyond written seqnum: flush_to({seq}) but written is {written}"
140 );
141 let durable = self.durable_seq.load(std::sync::atomic::Ordering::Relaxed);
142 assert!(
143 seq >= durable,
144 "cannot move durable seqnum backwards: flush_to({seq}) but durable is {durable}"
145 );
146 self.durable_seq
147 .store(seq, std::sync::atomic::Ordering::Relaxed);
148 let _ = self.durable_tx.send(seq);
149 }
150
151 fn next_seqnum(&self) -> u64 {
157 let seq = self
158 .written_seq
159 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
160 + 1;
161 if !self.defer_durability {
162 self.durable_seq
163 .store(seq, std::sync::atomic::Ordering::Relaxed);
164 let _ = self.durable_tx.send(seq);
165 }
166 seq
167 }
168}
169
170impl Default for InMemoryStorage {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176#[async_trait]
177impl StorageRead for InMemoryStorage {
178 #[tracing::instrument(level = "trace", skip_all)]
182 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
183 let data = self
184 .data
185 .read()
186 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
187
188 match data.get(&key) {
189 Some(stored) if !stored.is_expired(self.clock.now()) => {
190 Ok(Some(Record::new(key, stored.value.clone())))
191 }
192 _ => Ok(None),
193 }
194 }
195
196 #[tracing::instrument(level = "trace", skip_all)]
197 async fn scan_iter(
198 &self,
199 range: BytesRange,
200 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
201 let data = self
202 .data
203 .read()
204 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
205
206 let now = self.clock.now();
207 let records: Vec<Record> = data
208 .range((range.start_bound().cloned(), range.end_bound().cloned()))
209 .filter(|(_, stored)| !stored.is_expired(now))
210 .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
211 .collect();
212
213 Ok(Box::new(InMemoryIterator { records, index: 0 }))
214 }
215}
216
/// Iterator over records copied out of the map at scan time; yields them in
/// the order they were collected (key order).
struct InMemoryIterator {
    // Records captured when the scan was issued.
    records: Vec<Record>,
    // Position of the next record to yield.
    index: usize,
}
221
222#[async_trait]
223impl StorageIterator for InMemoryIterator {
224 #[tracing::instrument(level = "trace", skip_all)]
225 async fn next(&mut self) -> StorageResult<Option<Record>> {
226 if self.index >= self.records.len() {
227 Ok(None)
228 } else {
229 let record = self.records[self.index].clone();
230 self.index += 1;
231 Ok(Some(record))
232 }
233 }
234}
235
/// An immutable point-in-time copy of an [`InMemoryStorage`]'s data.
pub struct InMemoryStorageSnapshot {
    // Full copy of the map taken when the snapshot was created.
    data: Arc<BTreeMap<Bytes, StoredValue>>,
    // Shared with the owning storage so TTL expiry checks stay consistent.
    clock: Arc<dyn Clock>,
}
244
245#[async_trait]
246impl StorageRead for InMemoryStorageSnapshot {
247 #[tracing::instrument(level = "trace", skip_all)]
248 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
249 match self.data.get(&key) {
250 Some(stored) if !stored.is_expired(self.clock.now()) => {
251 Ok(Some(Record::new(key, stored.value.clone())))
252 }
253 _ => Ok(None),
254 }
255 }
256
257 #[tracing::instrument(level = "trace", skip_all)]
258 async fn scan_iter(
259 &self,
260 range: BytesRange,
261 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
262 let now = self.clock.now();
263 let records: Vec<Record> = self
264 .data
265 .range((range.start_bound().cloned(), range.end_bound().cloned()))
266 .filter(|(_, stored)| !stored.is_expired(now))
267 .map(|(k, stored)| Record::new(k.clone(), stored.value.clone()))
268 .collect();
269
270 Ok(Box::new(InMemoryIterator { records, index: 0 }))
271 }
272}
273
// The snapshot exposes only the read operations inherited from `StorageRead`;
// `StorageSnapshot` adds no snapshot-specific methods here.
#[async_trait]
impl StorageSnapshot for InMemoryStorageSnapshot {}
276
277#[async_trait]
278impl Storage for InMemoryStorage {
279 async fn apply_with_options(
280 &self,
281 records: Vec<RecordOp>,
282 _options: WriteOptions,
283 ) -> StorageResult<WriteResult> {
284 let mut data = self
285 .data
286 .write()
287 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
288
289 let now = self.clock.now();
290 for record in records {
291 match record {
292 RecordOp::Put(op) => {
293 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
294 data.insert(
295 op.record.key,
296 StoredValue {
297 value: op.record.value,
298 expire_ts,
299 },
300 );
301 }
302 RecordOp::Merge(op) => {
303 let existing_value = data
304 .get(&op.record.key)
305 .filter(|s| !s.is_expired(now))
306 .map(|s| s.value.clone());
307 let merged_value = self.merge_operator.as_ref().unwrap().merge_batch(
308 &op.record.key,
309 existing_value,
310 &[op.record.value],
311 );
312 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
313 data.insert(
314 op.record.key,
315 StoredValue {
316 value: merged_value,
317 expire_ts,
318 },
319 );
320 }
321 RecordOp::Delete(key) => {
322 data.remove(&key);
323 }
324 }
325 }
326
327 Ok(WriteResult {
328 seqnum: self.next_seqnum(),
329 })
330 }
331
332 async fn put_with_options(
338 &self,
339 records: Vec<PutRecordOp>,
340 _options: WriteOptions,
341 ) -> StorageResult<WriteResult> {
342 let mut data = self
343 .data
344 .write()
345 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
346
347 let now = self.clock.now();
348 for op in records {
349 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
350 data.insert(
351 op.record.key,
352 StoredValue {
353 value: op.record.value,
354 expire_ts,
355 },
356 );
357 }
358
359 Ok(WriteResult {
360 seqnum: self.next_seqnum(),
361 })
362 }
363
364 async fn merge_with_options(
375 &self,
376 records: Vec<MergeRecordOp>,
377 _options: WriteOptions,
378 ) -> StorageResult<WriteResult> {
379 let merge_op = self
380 .merge_operator
381 .as_ref()
382 .ok_or_else(|| {
383 StorageError::Storage(
384 "Merge operator not configured: in-memory storage requires a merge operator to be set during construction".to_string(),
385 )
386 })?;
387
388 let mut data = self
389 .data
390 .write()
391 .map_err(|e| StorageError::Internal(format!("Failed to acquire write lock: {}", e)))?;
392
393 let now = self.clock.now();
394 for op in records {
395 let existing_value = data
396 .get(&op.record.key)
397 .filter(|s| !s.is_expired(now))
398 .map(|s| s.value.clone());
399 let merged_value =
400 merge_op.merge_batch(&op.record.key, existing_value, &[op.record.value]);
401 let expire_ts = compute_expire_ts(now, op.options.ttl, self.default_ttl);
402 data.insert(
403 op.record.key,
404 StoredValue {
405 value: merged_value,
406 expire_ts,
407 },
408 );
409 }
410
411 Ok(WriteResult {
412 seqnum: self.next_seqnum(),
413 })
414 }
415
416 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
422 let data = self
423 .data
424 .read()
425 .map_err(|e| StorageError::Internal(format!("Failed to acquire read lock: {}", e)))?;
426
427 let snapshot_data = Arc::new(data.clone());
428
429 Ok(Arc::new(InMemoryStorageSnapshot {
430 data: snapshot_data,
431 clock: self.clock.clone(),
432 }))
433 }
434
435 fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
436 self.durable_tx.subscribe()
437 }
438
439 async fn flush(&self) -> StorageResult<()> {
440 if self.defer_durability {
441 self.flush_to(self.written_seq.load(std::sync::atomic::Ordering::Relaxed));
442 }
443 Ok(())
444 }
445}
446
/// A failure programmed into a [`FailSlot`] for fault-injection tests.
#[cfg(feature = "test-utils")]
#[derive(Clone)]
enum Failure {
    /// Returned for exactly one call, then cleared.
    Once(super::StorageError),
    /// Returned for every call until replaced.
    Persistent(super::StorageError),
}
456
/// A swappable slot holding an optionally programmed [`Failure`].
#[cfg(feature = "test-utils")]
type FailSlot = arc_swap::ArcSwap<Option<Failure>>;

/// Consumes the failure programmed into `slot`, if any.
///
/// A `Persistent` failure is returned on every call; a `Once` failure is
/// cleared before being returned so only a single call observes it.
#[cfg(feature = "test-utils")]
fn check_failure(slot: &FailSlot) -> super::StorageResult<()> {
    match slot.load().as_ref() {
        None => Ok(()),
        Some(Failure::Persistent(err)) => Err(err.clone()),
        // Clear the slot, then report whatever was actually stored there; a
        // concurrent store between the load and the swap takes precedence.
        Some(Failure::Once(_)) => match slot.swap(Arc::new(None)).as_ref() {
            Some(Failure::Once(err)) => Err(err.clone()),
            _ => Ok(()),
        },
    }
}
482
/// Test-only [`super::Storage`] wrapper that can be programmed to fail
/// specific operations, for exercising error-handling paths.
#[cfg(feature = "test-utils")]
pub struct FailingStorage {
    // Storage that all calls are forwarded to when no failure is programmed.
    inner: Arc<dyn super::Storage>,
    // Failure slots for the corresponding operations. Note there is no slot
    // for reads or for merge — those always pass through.
    fail_apply: FailSlot,
    fail_put: FailSlot,
    fail_flush: FailSlot,
    fail_snapshot: FailSlot,
}
514
#[cfg(feature = "test-utils")]
impl FailingStorage {
    /// Wraps `inner` with no failures programmed; all calls pass through.
    pub fn wrap(inner: Arc<dyn super::Storage>) -> Arc<Self> {
        Arc::new(Self {
            inner,
            fail_apply: arc_swap::ArcSwap::from_pointee(None),
            fail_put: arc_swap::ArcSwap::from_pointee(None),
            fail_flush: arc_swap::ArcSwap::from_pointee(None),
            fail_snapshot: arc_swap::ArcSwap::from_pointee(None),
        })
    }

    /// Programs `failure` into `slot`, replacing any previous failure.
    fn arm(slot: &FailSlot, failure: Failure) {
        slot.store(Arc::new(Some(failure)));
    }

    /// Makes every subsequent `apply` call fail with `err`.
    pub fn fail_apply(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Persistent(err));
    }

    /// Makes only the next `apply` call fail with `err`.
    pub fn fail_apply_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_apply, Failure::Once(err));
    }

    /// Makes every subsequent `put` call fail with `err`.
    pub fn fail_put(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Persistent(err));
    }

    /// Makes only the next `put` call fail with `err`.
    pub fn fail_put_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_put, Failure::Once(err));
    }

    /// Makes every subsequent `flush` call fail with `err`.
    pub fn fail_flush(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Persistent(err));
    }

    /// Makes only the next `flush` call fail with `err`.
    pub fn fail_flush_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_flush, Failure::Once(err));
    }

    /// Makes every subsequent `snapshot` call fail with `err`.
    pub fn fail_snapshot(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Persistent(err));
    }

    /// Makes only the next `snapshot` call fail with `err`.
    pub fn fail_snapshot_once(&self, err: super::StorageError) {
        Self::arm(&self.fail_snapshot, Failure::Once(err));
    }
}
575
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::StorageRead for FailingStorage {
    // Reads have no failure slot: they always pass through to the inner
    // storage unconditionally.
    async fn get(&self, key: Bytes) -> super::StorageResult<Option<crate::Record>> {
        self.inner.get(key).await
    }

    async fn scan_iter(
        &self,
        range: crate::BytesRange,
    ) -> super::StorageResult<Box<dyn super::StorageIterator + Send + 'static>> {
        self.inner.scan_iter(range).await
    }
}
590
#[cfg(feature = "test-utils")]
#[async_trait]
impl super::Storage for FailingStorage {
    // Each write path first consults its failure slot, then delegates.
    async fn apply_with_options(
        &self,
        ops: Vec<super::RecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_apply)?;
        self.inner.apply_with_options(ops, options).await
    }

    async fn put_with_options(
        &self,
        records: Vec<super::PutRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        check_failure(&self.fail_put)?;
        self.inner.put_with_options(records, options).await
    }

    // NOTE(review): merge has no failure slot, so merge calls can never be
    // failure-injected — confirm this asymmetry with the other write paths is
    // intentional.
    async fn merge_with_options(
        &self,
        records: Vec<super::MergeRecordOp>,
        options: super::WriteOptions,
    ) -> super::StorageResult<super::WriteResult> {
        self.inner.merge_with_options(records, options).await
    }

    async fn snapshot(&self) -> super::StorageResult<Arc<dyn super::StorageSnapshot>> {
        check_failure(&self.fail_snapshot)?;
        self.inner.snapshot().await
    }

    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64> {
        self.inner.subscribe_durable()
    }

    async fn flush(&self) -> super::StorageResult<()> {
        check_failure(&self.fail_flush)?;
        self.inner.flush().await
    }
}
634
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use std::ops::Bound;

    /// Merge operator for tests: appends each operand onto the existing value,
    /// separated by commas.
    struct AppendMergeOperator;

    impl MergeOperator for AppendMergeOperator {
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            operands
                .iter()
                .fold(existing_value.unwrap_or_default(), |acc, operand| {
                    // Rebuild as mutable, add a separator unless empty, append.
                    let mut result = BytesMut::from(acc);
                    if !result.is_empty() {
                        result.extend_from_slice(b",");
                    }
                    result.extend_from_slice(operand);
                    result.freeze()
                })
        }
    }
663
    // --- basic get / put / scan / iterator behavior ---

    #[tokio::test]
    async fn should_return_none_when_key_not_found() {
        let storage = InMemoryStorage::new();

        let result = storage.get(Bytes::from("missing_key")).await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn should_store_and_retrieve_record() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let value = Bytes::from("test_value");

        storage
            .put(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        let record = result.unwrap();
        assert_eq!(record.key, Bytes::from("test_key"));
        assert_eq!(record.value, value);
    }

    #[tokio::test]
    async fn should_overwrite_existing_key() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("test_key");
        let initial_value = Bytes::from("initial_value");
        let updated_value = Bytes::from("updated_value");

        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();
        storage
            .put(vec![Record::new(key.clone(), updated_value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, updated_value);
    }

    #[tokio::test]
    async fn should_store_multiple_records() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")),
            Record::new(Bytes::from("key2"), Bytes::from("value2")),
            Record::new(Bytes::from("key3"), Bytes::from("value3")),
        ];

        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        for record in records {
            let retrieved = storage.get(record.key.clone()).await.unwrap();
            assert!(retrieved.is_some());
            assert_eq!(retrieved.unwrap().value, record.value);
        }
    }

    #[tokio::test]
    async fn should_scan_all_records_when_unbounded() {
        let storage = InMemoryStorage::new();
        let records = [
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
        ];
        storage
            .put(records.iter().cloned().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        // BTreeMap backing guarantees key order.
        assert_eq!(scanned.len(), 3);
        assert_eq!(scanned[0].key, Bytes::from("a"));
        assert_eq!(scanned[1].key, Bytes::from("b"));
        assert_eq!(scanned[2].key, Bytes::from("c"));
    }

    #[tokio::test]
    async fn should_scan_records_with_prefix() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("prefix_a"), Bytes::from("value1")),
            Record::new(Bytes::from("prefix_b"), Bytes::from("value2")),
            Record::new(Bytes::from("other_c"), Bytes::from("value3")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        let scanned = storage
            .scan(BytesRange::prefix(Bytes::from("prefix_")))
            .await
            .unwrap();

        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("prefix_a"));
        assert_eq!(scanned[1].key, Bytes::from("prefix_b"));
    }

    #[tokio::test]
    async fn should_scan_records_in_bounded_range() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("a"), Bytes::from("value_a")),
            Record::new(Bytes::from("b"), Bytes::from("value_b")),
            Record::new(Bytes::from("c"), Bytes::from("value_c")),
            Record::new(Bytes::from("d"), Bytes::from("value_d")),
        ];
        storage
            .put(records.into_iter().map(PutRecordOp::new).collect())
            .await
            .unwrap();

        // Inclusive "b" through exclusive "d" should yield exactly b and c.
        let range = BytesRange::new(
            Bound::Included(Bytes::from("b")),
            Bound::Excluded(Bytes::from("d")),
        );
        let scanned = storage.scan(range).await.unwrap();

        assert_eq!(scanned.len(), 2);
        assert_eq!(scanned[0].key, Bytes::from("b"));
        assert_eq!(scanned[1].key, Bytes::from("c"));
    }

    #[tokio::test]
    async fn should_return_empty_vec_when_scanning_empty_storage() {
        let storage = InMemoryStorage::new();

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();

        assert!(scanned.is_empty());
    }

    #[tokio::test]
    async fn should_iterate_over_records() {
        let storage = InMemoryStorage::new();
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        let mut iter = storage.scan_iter(BytesRange::unbounded()).await.unwrap();
        let first = iter.next().await.unwrap();
        let second = iter.next().await.unwrap();
        let third = iter.next().await.unwrap();

        assert!(first.is_some());
        assert_eq!(first.unwrap().key, Bytes::from("key1"));
        assert!(second.is_some());
        assert_eq!(second.unwrap().key, Bytes::from("key2"));
        assert!(third.is_none());
    }
859
    // --- snapshot isolation ---

    #[tokio::test]
    async fn should_create_snapshot_with_current_data() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();

        let result = snapshot.get(Bytes::from("key1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1"));
    }

    #[tokio::test]
    async fn should_not_see_writes_after_snapshot() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();

        // The snapshot is a deep copy: key2 was written after it was taken.
        let snapshot_result = snapshot.get(Bytes::from("key2")).await.unwrap();
        assert!(snapshot_result.is_none());

        let storage_result = storage.get(Bytes::from("key2")).await.unwrap();
        assert!(storage_result.is_some());
    }

    #[tokio::test]
    async fn should_scan_snapshot_independently() {
        let storage = InMemoryStorage::new();
        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("value_a")).into(),
            ])
            .await
            .unwrap();

        let snapshot = storage.snapshot().await.unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("b"), Bytes::from("value_b")).into(),
            ])
            .await
            .unwrap();

        let snapshot_records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(snapshot_records.len(), 1);
        assert_eq!(snapshot_records[0].key, Bytes::from("a"));

        let storage_records = storage.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(storage_records.len(), 2);
    }

    #[tokio::test]
    async fn should_handle_empty_record() {
        let storage = InMemoryStorage::new();
        let key = Bytes::from("empty_key");

        storage
            .put(vec![Record::empty(key.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::new());
    }
954
    // --- merge operator behavior ---

    #[tokio::test]
    async fn should_return_error_when_merge_operator_not_configured() {
        let storage = InMemoryStorage::new();
        let record = Record::new(Bytes::from("key1"), Bytes::from("value1"));

        let result = storage.merge(vec![record.into()]).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Merge operator not configured")
        );
    }

    #[tokio::test]
    async fn should_merge_when_key_does_not_exist() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("new_key");
        let value = Bytes::from("value1");

        storage
            .merge(vec![Record::new(key.clone(), value.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        // Merging into an absent key behaves like an insert of the operand.
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, value);
    }

    #[tokio::test]
    async fn should_merge_when_key_exists() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("key1");
        let initial_value = Bytes::from("value1");
        let new_value = Bytes::from("value2");

        storage
            .put(vec![Record::new(key.clone(), initial_value).into()])
            .await
            .unwrap();

        storage
            .merge(vec![Record::new(key.clone(), new_value).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("value1,value2"));
    }

    #[tokio::test]
    async fn should_merge_multiple_keys() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let records = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();

        storage
            .merge(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1a")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2a")).into(),
            ])
            .await
            .unwrap();

        let result1 = storage.get(Bytes::from("key1")).await.unwrap();
        assert_eq!(result1.unwrap().value, Bytes::from("value1,value1a"));

        let result2 = storage.get(Bytes::from("key2")).await.unwrap();
        assert_eq!(result2.unwrap().value, Bytes::from("value2,value2a"));
    }
1047
    // --- sequence numbers and durable watermark ---

    #[tokio::test]
    async fn should_return_monotonically_increasing_seqnums_from_put() {
        let storage = InMemoryStorage::new();

        let r1 = storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let r2 = storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        let r3 = storage
            .put(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
        assert_eq!(r3.seqnum, 3);
    }

    #[tokio::test]
    async fn should_return_monotonically_increasing_seqnums_from_apply() {
        let storage = InMemoryStorage::new();

        let r1 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k1"),
                Bytes::from("v1"),
            )))])
            .await
            .unwrap();
        let r2 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v2"),
            )))])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
    }

    #[tokio::test]
    async fn should_share_seqnum_counter_across_write_methods() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);

        // put, merge, and apply must all draw from the same counter.
        let r1 = storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let r2 = storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        let r3 = storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v3"),
            )))])
            .await
            .unwrap();

        assert_eq!(r1.seqnum, 1);
        assert_eq!(r2.seqnum, 2);
        assert_eq!(r3.seqnum, 3);
    }

    #[tokio::test]
    async fn should_start_durable_subscriber_at_zero() {
        let storage = InMemoryStorage::new();
        let rx = storage.subscribe_durable();
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_on_each_write() {
        let storage = InMemoryStorage::new();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 1);

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 2);
    }
1157
    #[tokio::test]
    async fn should_merge_empty_values() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op);
        let key = Bytes::from("key1");

        storage
            .merge(vec![Record::empty(key.clone()).into()])
            .await
            .unwrap();
        let result = storage.get(key).await.unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::new());
    }

    // --- deferred durability: writes advance the watermark only on flush ---

    #[tokio::test]
    async fn should_not_advance_durable_watermark_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0, "durable watermark should not advance");

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0, "durable watermark should still be 0");
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_on_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2, "flush should advance to current seqnum");
    }

    #[tokio::test]
    async fn should_advance_durable_watermark_to_specific_seq() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        for i in 1..=3 {
            storage
                .put(vec![
                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
                ])
                .await
                .unwrap();
        }
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(2);
        assert_eq!(*rx.borrow(), 2, "should advance to requested seqnum");

        storage.flush_to(3);
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    #[should_panic(expected = "cannot move durable seqnum backwards")]
    async fn should_panic_when_flush_to_moves_backwards() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        for i in 1..=3 {
            storage
                .put(vec![
                    Record::new(Bytes::from(format!("k{i}")), Bytes::from(format!("v{i}"))).into(),
                ])
                .await
                .unwrap();
        }

        storage.flush_to(2);
        storage.flush_to(1);
    }

    #[tokio::test]
    #[should_panic(expected = "cannot flush beyond written seqnum")]
    async fn should_panic_when_flush_to_exceeds_written() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        storage.flush_to(5);
    }
1277
    #[tokio::test]
    async fn should_see_data_in_snapshot_before_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // Visibility is independent of durability: unflushed writes appear in
        // snapshots even though the watermark is still 0.
        let snapshot = storage.snapshot().await.unwrap();
        let result = snapshot.get(Bytes::from("k1")).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().value, Bytes::from("v1"));

        let rx = storage.subscribe_durable();
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_not_advance_durable_watermark_on_apply_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k1"),
                Bytes::from("v1"),
            )))])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage
            .apply(vec![RecordOp::Delete(Bytes::from("k1"))])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_not_advance_durable_watermark_on_merge_when_deferred() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }

    #[tokio::test]
    async fn should_support_multiple_flush_cycles_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 1);

        storage
            .put(vec![
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();
        storage
            .put(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 1, "should not advance before second flush");

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    async fn should_flush_on_empty_storage_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(0);
        assert_eq!(*rx.borrow(), 0);
    }

    #[tokio::test]
    async fn should_defer_durability_across_mixed_write_methods() {
        let merge_op = Arc::new(AppendMergeOperator);
        let storage = InMemoryStorage::with_merge_operator(merge_op).with_deferred_durability();
        let rx = storage.subscribe_durable();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        storage
            .apply(vec![RecordOp::Put(PutRecordOp::new(Record::new(
                Bytes::from("k2"),
                Bytes::from("v2"),
            )))])
            .await
            .unwrap();
        storage
            .merge(vec![
                Record::new(Bytes::from("k3"), Bytes::from("v3")).into(),
            ])
            .await
            .unwrap();
        assert_eq!(*rx.borrow(), 0);

        storage.flush_to(2);
        assert_eq!(*rx.borrow(), 2);

        storage.flush().await.unwrap();
        assert_eq!(*rx.borrow(), 3);
    }

    #[tokio::test]
    async fn should_read_data_written_before_flush_when_deferred() {
        let storage = InMemoryStorage::new().with_deferred_durability();

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                Record::new(Bytes::from("k2"), Bytes::from("v2")).into(),
            ])
            .await
            .unwrap();

        let result = storage.get(Bytes::from("k1")).await.unwrap();
        assert_eq!(result.unwrap().value, Bytes::from("v1"));

        let scanned = storage.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(scanned.len(), 2);
    }
}