1use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22use serde::{de::DeserializeOwned, Serialize};
23
24use crate::backend::{memory_backend, MemoryBackend, StorageBackend};
25use crate::codec::{Codec, JsonCodec};
26use crate::error::StorageError;
27use crate::tier::{
28 AppendCursor, AppendLoadResult, AppendLogMode, AppendLogStorageTier, BaseStorageTier,
29 KvStorageTier, LoadEntriesOpts, PrefixIter, SnapshotStorageTier,
30};
31
/// Boxed predicate over a value; usable from any thread (`Send + Sync`).
type FilterFn<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
/// Boxed function that derives the backend key (an owned `String`) from a value.
type KeyOfFn<T> = Box<dyn Fn(&T) -> String + Send + Sync>;
/// Boxed predicate over a `(key, value)` pair.
type KvFilterFn<T> = Box<dyn Fn(&str, &T) -> bool + Send + Sync>;
35
36pub struct SnapshotStorage<B, T, C = JsonCodec>
42where
43 B: StorageBackend + ?Sized,
44 T: Send + Sync + 'static,
45 C: Codec<T>,
46{
47 backend: Arc<B>,
48 codec: C,
49 name: String,
50 debounce_ms: Option<u32>,
51 compact_every: Option<u32>,
52 filter: Option<FilterFn<T>>,
53 key_of: KeyOfFn<T>,
54 pending: Mutex<Option<T>>,
57 write_count: Mutex<u64>,
59 last_saved_key: Mutex<Option<String>>,
62}
63
64pub struct SnapshotStorageOptions<T, C = JsonCodec>
66where
67 T: Send + Sync + 'static,
68 C: Codec<T>,
69{
70 pub name: Option<String>,
71 pub codec: C,
72 pub debounce_ms: Option<u32>,
73 pub compact_every: Option<u32>,
74 pub filter: Option<FilterFn<T>>,
75 pub key_of: Option<KeyOfFn<T>>,
76}
77
78impl<T> Default for SnapshotStorageOptions<T, JsonCodec>
79where
80 T: Serialize + DeserializeOwned + Send + Sync + 'static,
81{
82 fn default() -> Self {
83 Self {
84 name: None,
85 codec: JsonCodec,
86 debounce_ms: None,
87 compact_every: None,
88 filter: None,
89 key_of: None,
90 }
91 }
92}
93
94pub fn snapshot_storage<B, T, C>(
102 backend: Arc<B>,
103 opts: SnapshotStorageOptions<T, C>,
104) -> SnapshotStorage<B, T, C>
105where
106 B: StorageBackend + ?Sized,
107 T: Send + Sync + 'static,
108 C: Codec<T>,
109{
110 assert!(
111 opts.compact_every != Some(0),
112 "snapshot_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
113 );
114 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
115 let fallback_key = name.clone();
116 let key_of = opts
117 .key_of
118 .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
119 SnapshotStorage {
120 backend,
121 codec: opts.codec,
122 name,
123 debounce_ms: opts.debounce_ms,
124 compact_every: opts.compact_every,
125 filter: opts.filter,
126 key_of,
127 pending: Mutex::new(None),
128 write_count: Mutex::new(0),
129 last_saved_key: Mutex::new(None),
130 }
131}
132
133pub fn memory_snapshot<T, C>(
135 opts: SnapshotStorageOptions<T, C>,
136) -> SnapshotStorage<MemoryBackend, T, C>
137where
138 T: Send + Sync + 'static,
139 C: Codec<T>,
140{
141 snapshot_storage(memory_backend(), opts)
142}
143
144impl<B, T, C> SnapshotStorage<B, T, C>
145where
146 B: StorageBackend + ?Sized,
147 T: Send + Sync + 'static,
148 C: Codec<T>,
149{
150 fn try_flush(
154 backend: &B,
155 codec: &C,
156 key_of: &KeyOfFn<T>,
157 last_saved_key: &Mutex<Option<String>>,
158 snapshot: T,
159 ) -> Result<(), (T, StorageError)> {
160 let key = key_of(&snapshot);
161 let bytes = match codec.encode(&snapshot) {
162 Ok(b) => b,
163 Err(e) => return Err((snapshot, e.into())),
164 };
165 if let Err(e) = backend.write(&key, &bytes) {
166 return Err((snapshot, e));
167 }
168 *last_saved_key.lock() = Some(key);
169 Ok(())
170 }
171}
172
173impl<B, T, C> BaseStorageTier for SnapshotStorage<B, T, C>
174where
175 B: StorageBackend + ?Sized,
176 T: Send + Sync + 'static,
177 C: Codec<T>,
178{
179 fn name(&self) -> &str {
180 &self.name
181 }
182 fn debounce_ms(&self) -> Option<u32> {
183 self.debounce_ms
184 }
185 fn compact_every(&self) -> Option<u32> {
186 self.compact_every
187 }
188
189 fn flush(&self) -> Result<(), StorageError> {
192 let slot = self.pending.lock().take();
193 let Some(snapshot) = slot else {
194 return Ok(());
195 };
196 match Self::try_flush(
197 &*self.backend,
198 &self.codec,
199 &self.key_of,
200 &self.last_saved_key,
201 snapshot,
202 ) {
203 Ok(()) => Ok(()),
204 Err((snapshot, err)) => {
205 *self.pending.lock() = Some(snapshot);
207 Err(err)
208 }
209 }
210 }
211
212 fn rollback(&self) -> Result<(), StorageError> {
213 *self.pending.lock() = None;
214 Ok(())
215 }
216
217 fn list_by_prefix_bytes<'a>(
218 &'a self,
219 prefix: &str,
220 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
221 Box::new(PrefixIter::new(&*self.backend, prefix))
222 }
223
224 fn compact(&self) -> Result<(), StorageError> {
225 self.flush()
226 }
227}
228
229impl<B, T, C> SnapshotStorageTier<T> for SnapshotStorage<B, T, C>
230where
231 B: StorageBackend + ?Sized,
232 T: Send + Sync + 'static,
233 C: Codec<T>,
234{
235 fn save(&self, snapshot: T) -> Result<(), StorageError> {
236 if let Some(filter) = &self.filter {
237 if !filter(&snapshot) {
238 return Ok(());
239 }
240 }
241 let captured: Option<T> = {
249 let mut pending = self.pending.lock();
250 *pending = Some(snapshot);
251 let mut count = self.write_count.lock();
252 let prev = *count;
253 *count = count.saturating_add(1);
254 let new = *count;
255 let compact_trigger = matches!(
256 self.compact_every,
257 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
258 );
259 let trigger = compact_trigger || self.debounce_ms.is_none();
260 if trigger {
261 pending.take()
262 } else {
263 None
264 }
265 };
266 if let Some(snap) = captured {
267 if let Err((snap, err)) = Self::try_flush(
269 &self.backend,
270 &self.codec,
271 &self.key_of,
272 &self.last_saved_key,
273 snap,
274 ) {
275 *self.pending.lock() = Some(snap);
276 return Err(err);
277 }
278 }
279 Ok(())
280 }
281
282 fn load(&self) -> Result<Option<T>, StorageError> {
283 let key = self
284 .last_saved_key
285 .lock()
286 .clone()
287 .unwrap_or_else(|| self.name.clone());
288 match self.backend.read(&key)? {
289 Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
290 _ => Ok(None),
291 }
292 }
293}
294
295pub struct AppendLogStorage<B, T, C = JsonCodec>
301where
302 B: StorageBackend + ?Sized,
303 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
304 C: Codec<Vec<T>>,
305{
306 backend: Arc<B>,
307 codec: C,
308 name: String,
309 debounce_ms: Option<u32>,
310 compact_every: Option<u32>,
311 mode: AppendLogMode,
314 key_of: KeyOfFn<T>,
315 pending: Mutex<std::collections::HashMap<String, Vec<T>>>,
317 append_count: Mutex<u64>,
319 rollback_epoch: AtomicU64,
339}
340
341pub struct AppendLogStorageOptions<T, C = JsonCodec>
342where
343 T: Send + Sync + 'static,
344 C: Codec<Vec<T>>,
345{
346 pub name: Option<String>,
347 pub codec: C,
348 pub debounce_ms: Option<u32>,
349 pub compact_every: Option<u32>,
350 pub key_of: Option<KeyOfFn<T>>,
351 pub mode: AppendLogMode,
353}
354
355impl<T> Default for AppendLogStorageOptions<T, JsonCodec>
356where
357 T: Serialize + DeserializeOwned + Send + Sync + 'static,
358{
359 fn default() -> Self {
360 Self {
361 name: None,
362 codec: JsonCodec,
363 debounce_ms: None,
364 compact_every: None,
365 key_of: None,
366 mode: AppendLogMode::Append,
367 }
368 }
369}
370
371pub fn append_log_storage<B, T, C>(
378 backend: Arc<B>,
379 opts: AppendLogStorageOptions<T, C>,
380) -> AppendLogStorage<B, T, C>
381where
382 B: StorageBackend + ?Sized,
383 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
384 C: Codec<Vec<T>>,
385{
386 assert!(
387 opts.compact_every != Some(0),
388 "append_log_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
389 );
390 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
391 let fallback_key = name.clone();
392 let key_of = opts
393 .key_of
394 .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
395 AppendLogStorage {
396 backend,
397 codec: opts.codec,
398 name,
399 debounce_ms: opts.debounce_ms,
400 compact_every: opts.compact_every,
401 mode: opts.mode,
402 key_of,
403 pending: Mutex::new(std::collections::HashMap::new()),
404 append_count: Mutex::new(0),
405 rollback_epoch: AtomicU64::new(0),
406 }
407}
408
409pub fn memory_append_log<T, C>(
410 opts: AppendLogStorageOptions<T, C>,
411) -> AppendLogStorage<MemoryBackend, T, C>
412where
413 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
414 C: Codec<Vec<T>>,
415{
416 append_log_storage(memory_backend(), opts)
417}
418
419impl<B, T, C> BaseStorageTier for AppendLogStorage<B, T, C>
420where
421 B: StorageBackend + ?Sized,
422 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
423 C: Codec<Vec<T>>,
424{
425 fn name(&self) -> &str {
426 &self.name
427 }
428 fn debounce_ms(&self) -> Option<u32> {
429 self.debounce_ms
430 }
431 fn compact_every(&self) -> Option<u32> {
432 self.compact_every
433 }
434
435 fn flush(&self) -> Result<(), StorageError> {
445 let scheduled_epoch = self.rollback_epoch.load(Ordering::Acquire);
446 let mut buckets = std::mem::take(&mut *self.pending.lock());
447 let keys: Vec<String> = buckets.keys().cloned().collect();
448 for key in keys {
449 if self.rollback_epoch.load(Ordering::Acquire) != scheduled_epoch {
452 return Ok(());
453 }
454 let bucket = match buckets.remove(&key) {
455 Some(b) if !b.is_empty() => b,
456 _ => continue,
457 };
458 let (final_payload, restore_payload): (Vec<T>, Vec<T>) = match self.mode {
473 AppendLogMode::Overwrite => {
474 let snapshot = bucket.clone();
475 (bucket, snapshot)
476 }
477 AppendLogMode::Append => {
478 let existing = match self.backend.read(&key) {
479 Ok(e) => e,
480 Err(e) => {
481 buckets.insert(key, bucket);
482 *self.pending.lock() = buckets;
483 return Err(e);
484 }
485 };
486 let mut merged = match existing {
487 Some(bytes) if !bytes.is_empty() => match self.codec.decode(&bytes) {
488 Ok(v) => v,
489 Err(e) => {
490 buckets.insert(key, bucket);
491 *self.pending.lock() = buckets;
492 return Err(e.into());
493 }
494 },
495 _ => Vec::new(),
496 };
497 let new_entries_backup = bucket.clone();
498 merged.extend(bucket);
499 (merged, new_entries_backup)
500 }
501 };
502 let encoded = match self.codec.encode(&final_payload) {
503 Ok(b) => b,
504 Err(e) => {
505 buckets.insert(key, restore_payload);
506 *self.pending.lock() = buckets;
507 return Err(e.into());
508 }
509 };
510 if let Err(e) = self.backend.write(&key, &encoded) {
511 buckets.insert(key, restore_payload);
512 *self.pending.lock() = buckets;
513 return Err(e);
514 }
515 }
516 Ok(())
517 }
518
519 fn rollback(&self) -> Result<(), StorageError> {
526 self.rollback_epoch.fetch_add(1, Ordering::AcqRel);
527 self.pending.lock().clear();
528 Ok(())
529 }
530
531 fn list_by_prefix_bytes<'a>(
532 &'a self,
533 prefix: &str,
534 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
535 Box::new(PrefixIter::new(&*self.backend, prefix))
536 }
537}
538
539impl<B, T, C> AppendLogStorageTier<T> for AppendLogStorage<B, T, C>
540where
541 B: StorageBackend + ?Sized,
542 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
543 C: Codec<Vec<T>>,
544{
545 fn append_entries(&self, entries: &[T]) -> Result<(), StorageError> {
546 if entries.is_empty() {
547 return Ok(());
548 }
549 let trigger_now = {
555 let mut pending = self.pending.lock();
556 for entry in entries {
557 let k = (self.key_of)(entry);
558 pending.entry(k).or_default().push(entry.clone());
559 }
560 let mut count = self.append_count.lock();
561 let prev = *count;
562 *count = count.saturating_add(entries.len() as u64);
563 let new = *count;
564 let compact_trigger = matches!(
565 self.compact_every,
566 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
567 );
568 compact_trigger || self.debounce_ms.is_none()
569 };
570 if trigger_now {
571 self.flush()?;
572 }
573 Ok(())
574 }
575
576 fn mode(&self) -> AppendLogMode {
577 self.mode
578 }
579
580 fn load_entries(&self, opts: LoadEntriesOpts<'_>) -> Result<AppendLoadResult<T>, StorageError> {
591 let mut keys = match self.backend.list(opts.key_filter.unwrap_or("")) {
595 Ok(ks) => ks,
596 Err(StorageError::BackendNoListSupport { .. }) => match opts.key_filter {
597 Some(k) => vec![k.to_string()],
598 None => vec![self.name.clone()],
599 },
600 Err(e) => return Err(e),
601 };
602 keys.sort();
603
604 let start: u64 = opts.cursor.map_or(0, |c| c.position);
605 let page_size = opts.page_size.filter(|n| *n > 0);
607
608 let want_decoded_at_least = page_size.map(|n| start + u64::from(n) + 1);
611
612 let mut decoded: Vec<T> = Vec::new();
613 let mut total_seen: u64 = 0;
614
615 for k in keys {
616 if let Some(want) = want_decoded_at_least {
617 if total_seen >= want {
618 break;
619 }
620 }
621 if let Some(bytes) = self.backend.read(&k)? {
622 if !bytes.is_empty() {
623 let entries: Vec<T> = self.codec.decode(&bytes)?;
624 total_seen = total_seen.saturating_add(entries.len() as u64);
625 decoded.extend(entries);
626 }
627 }
628 }
629
630 let start_idx: usize = start.try_into().unwrap_or(usize::MAX).min(decoded.len());
633 let mut window: Vec<T> = decoded.split_off(start_idx);
634
635 let next_cursor: Option<AppendCursor> = match page_size {
636 Some(n) => {
637 let n_usize: usize = (n as usize).min(window.len());
638 let has_more = window.len() > n_usize;
639 window.truncate(n_usize);
640 if has_more {
641 Some(AppendCursor::from_position(start + u64::from(n)))
642 } else {
643 None
644 }
645 }
646 None => None,
647 };
648
649 Ok(AppendLoadResult {
650 entries: window,
651 cursor: next_cursor,
652 })
653 }
654}
655
656pub struct KvStorage<B, T, C = JsonCodec>
661where
662 B: StorageBackend + ?Sized,
663 T: Send + Sync + 'static,
664 C: Codec<T>,
665{
666 backend: Arc<B>,
667 codec: C,
668 name: String,
669 debounce_ms: Option<u32>,
670 compact_every: Option<u32>,
671 filter: Option<KvFilterFn<T>>,
672 pending: Mutex<std::collections::HashMap<String, T>>,
673 write_count: Mutex<u64>,
674}
675
676pub struct KvStorageOptions<T, C = JsonCodec>
677where
678 T: Send + Sync + 'static,
679 C: Codec<T>,
680{
681 pub name: Option<String>,
682 pub codec: C,
683 pub debounce_ms: Option<u32>,
684 pub compact_every: Option<u32>,
685 pub filter: Option<KvFilterFn<T>>,
686}
687
688impl<T> Default for KvStorageOptions<T, JsonCodec>
689where
690 T: Serialize + DeserializeOwned + Send + Sync + 'static,
691{
692 fn default() -> Self {
693 Self {
694 name: None,
695 codec: JsonCodec,
696 debounce_ms: None,
697 compact_every: None,
698 filter: None,
699 }
700 }
701}
702
703pub fn kv_storage<B, T, C>(backend: Arc<B>, opts: KvStorageOptions<T, C>) -> KvStorage<B, T, C>
710where
711 B: StorageBackend + ?Sized,
712 T: Send + Sync + 'static,
713 C: Codec<T>,
714{
715 assert!(
716 opts.compact_every != Some(0),
717 "kv_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
718 );
719 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
720 KvStorage {
721 backend,
722 codec: opts.codec,
723 name,
724 debounce_ms: opts.debounce_ms,
725 compact_every: opts.compact_every,
726 filter: opts.filter,
727 pending: Mutex::new(std::collections::HashMap::new()),
728 write_count: Mutex::new(0),
729 }
730}
731
732pub fn memory_kv<T, C>(opts: KvStorageOptions<T, C>) -> KvStorage<MemoryBackend, T, C>
733where
734 T: Send + Sync + 'static,
735 C: Codec<T>,
736{
737 kv_storage(memory_backend(), opts)
738}
739
740impl<B, T, C> BaseStorageTier for KvStorage<B, T, C>
741where
742 B: StorageBackend + ?Sized,
743 T: Send + Sync + 'static,
744 C: Codec<T>,
745{
746 fn name(&self) -> &str {
747 &self.name
748 }
749 fn debounce_ms(&self) -> Option<u32> {
750 self.debounce_ms
751 }
752 fn compact_every(&self) -> Option<u32> {
753 self.compact_every
754 }
755
756 fn flush(&self) -> Result<(), StorageError> {
759 let mut entries = std::mem::take(&mut *self.pending.lock());
760 let keys: Vec<String> = entries.keys().cloned().collect();
761 for key in keys {
762 let Some(value) = entries.remove(&key) else {
763 continue;
764 };
765 let bytes = match self.codec.encode(&value) {
766 Ok(b) => b,
767 Err(e) => {
768 entries.insert(key, value);
769 *self.pending.lock() = entries;
770 return Err(e.into());
771 }
772 };
773 if let Err(e) = self.backend.write(&key, &bytes) {
774 entries.insert(key, value);
775 *self.pending.lock() = entries;
776 return Err(e);
777 }
778 }
779 Ok(())
780 }
781
782 fn rollback(&self) -> Result<(), StorageError> {
783 self.pending.lock().clear();
784 Ok(())
785 }
786
787 fn list_by_prefix_bytes<'a>(
788 &'a self,
789 prefix: &str,
790 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
791 Box::new(PrefixIter::new(&*self.backend, prefix))
792 }
793}
794
795impl<B, T, C> KvStorageTier<T> for KvStorage<B, T, C>
796where
797 B: StorageBackend + ?Sized,
798 T: Send + Sync + 'static,
799 C: Codec<T>,
800{
801 fn save(&self, key: &str, value: T) -> Result<(), StorageError> {
802 if let Some(filter) = &self.filter {
803 if !filter(key, &value) {
804 return Ok(());
805 }
806 }
807 let trigger_now = {
811 self.pending.lock().insert(key.to_string(), value);
812 let mut count = self.write_count.lock();
813 let prev = *count;
814 *count = count.saturating_add(1);
815 let new = *count;
816 let compact_trigger = matches!(
817 self.compact_every,
818 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
819 );
820 compact_trigger || self.debounce_ms.is_none()
821 };
822 if trigger_now {
823 self.flush()?;
824 }
825 Ok(())
826 }
827
828 fn load(&self, key: &str) -> Result<Option<T>, StorageError> {
829 match self.backend.read(key)? {
830 Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
831 _ => Ok(None),
832 }
833 }
834
835 fn delete(&self, key: &str) -> Result<(), StorageError> {
836 self.backend.delete(key)?;
842 self.pending.lock().remove(key);
843 Ok(())
844 }
845
846 fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
847 self.backend.list(prefix)
848 }
849}