1use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22use serde::{de::DeserializeOwned, Serialize};
23
24use crate::backend::{memory_backend, MemoryBackend, StorageBackend};
25use crate::codec::{Codec, JsonCodec};
26use crate::error::StorageError;
27use crate::tier::{
28 AppendCursor, AppendLoadResult, AppendLogMode, AppendLogStorageTier, BaseStorageTier,
29 KvStorageTier, LoadEntriesOpts, PrefixIter, SnapshotStorageTier,
30};
31
/// Predicate applied by `SnapshotStorage::save`; snapshots it rejects are dropped.
type FilterFn<T> = Box<dyn Fn(&T) -> bool + Sync + Send>;
/// Derives the backend key a value is written under.
type KeyOfFn<T> = Box<dyn Fn(&T) -> String + Sync + Send>;
/// Predicate over a `(key, value)` pair applied by `KvStorage::save`.
type KvFilterFn<T> = Box<dyn Fn(&str, &T) -> bool + Sync + Send>;
35
36pub struct SnapshotStorage<B, T, C = JsonCodec>
42where
43 B: StorageBackend + ?Sized,
44 T: Send + Sync + 'static,
45 C: Codec<T>,
46{
47 backend: Arc<B>,
48 codec: C,
49 name: String,
50 debounce_ms: Option<u32>,
51 compact_every: Option<u32>,
52 filter: Option<FilterFn<T>>,
53 key_of: KeyOfFn<T>,
54 pending: Mutex<Option<T>>,
57 write_count: Mutex<u64>,
59 last_saved_key: Mutex<Option<String>>,
62}
63
64pub struct SnapshotStorageOptions<T, C = JsonCodec>
66where
67 T: Send + Sync + 'static,
68 C: Codec<T>,
69{
70 pub name: Option<String>,
71 pub codec: C,
72 pub debounce_ms: Option<u32>,
73 pub compact_every: Option<u32>,
74 pub filter: Option<FilterFn<T>>,
75 pub key_of: Option<KeyOfFn<T>>,
76}
77
78impl<T> Default for SnapshotStorageOptions<T, JsonCodec>
79where
80 T: Serialize + DeserializeOwned + Send + Sync + 'static,
81{
82 fn default() -> Self {
83 Self {
84 name: None,
85 codec: JsonCodec,
86 debounce_ms: None,
87 compact_every: None,
88 filter: None,
89 key_of: None,
90 }
91 }
92}
93
94pub fn snapshot_storage<B, T, C>(
102 backend: Arc<B>,
103 opts: SnapshotStorageOptions<T, C>,
104) -> SnapshotStorage<B, T, C>
105where
106 B: StorageBackend + ?Sized,
107 T: Send + Sync + 'static,
108 C: Codec<T>,
109{
110 assert!(
111 opts.compact_every != Some(0),
112 "snapshot_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
113 );
114 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
115 let fallback_key = name.clone();
116 let key_of = opts
117 .key_of
118 .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
119 SnapshotStorage {
120 backend,
121 codec: opts.codec,
122 name,
123 debounce_ms: opts.debounce_ms,
124 compact_every: opts.compact_every,
125 filter: opts.filter,
126 key_of,
127 pending: Mutex::new(None),
128 write_count: Mutex::new(0),
129 last_saved_key: Mutex::new(None),
130 }
131}
132
133pub fn memory_snapshot<T, C>(
135 opts: SnapshotStorageOptions<T, C>,
136) -> SnapshotStorage<MemoryBackend, T, C>
137where
138 T: Send + Sync + 'static,
139 C: Codec<T>,
140{
141 snapshot_storage(memory_backend(), opts)
142}
143
144impl<B, T, C> SnapshotStorage<B, T, C>
145where
146 B: StorageBackend + ?Sized,
147 T: Send + Sync + 'static,
148 C: Codec<T>,
149{
150 fn try_flush(
154 backend: &B,
155 codec: &C,
156 key_of: &KeyOfFn<T>,
157 last_saved_key: &Mutex<Option<String>>,
158 snapshot: T,
159 ) -> Result<(), (T, StorageError)> {
160 let key = key_of(&snapshot);
161 let bytes = match codec.encode(&snapshot) {
162 Ok(b) => b,
163 Err(e) => return Err((snapshot, e.into())),
164 };
165 if let Err(e) = backend.write(&key, &bytes) {
166 return Err((snapshot, e));
167 }
168 *last_saved_key.lock() = Some(key);
169 Ok(())
170 }
171}
172
173impl<B, T, C> BaseStorageTier for SnapshotStorage<B, T, C>
174where
175 B: StorageBackend + ?Sized,
176 T: Send + Sync + 'static,
177 C: Codec<T>,
178{
179 fn name(&self) -> &str {
180 &self.name
181 }
182 fn debounce_ms(&self) -> Option<u32> {
183 self.debounce_ms
184 }
185 fn compact_every(&self) -> Option<u32> {
186 self.compact_every
187 }
188
189 fn flush(&self) -> Result<(), StorageError> {
192 let slot = self.pending.lock().take();
193 let Some(snapshot) = slot else {
194 return Ok(());
195 };
196 match Self::try_flush(
197 &*self.backend,
198 &self.codec,
199 &self.key_of,
200 &self.last_saved_key,
201 snapshot,
202 ) {
203 Ok(()) => Ok(()),
204 Err((snapshot, err)) => {
205 *self.pending.lock() = Some(snapshot);
207 Err(err)
208 }
209 }
210 }
211
212 fn rollback(&self) -> Result<(), StorageError> {
213 *self.pending.lock() = None;
214 Ok(())
215 }
216
217 fn list_by_prefix_bytes<'a>(
218 &'a self,
219 prefix: &str,
220 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
221 Box::new(PrefixIter::new(&*self.backend, prefix))
222 }
223
224 fn compact(&self) -> Result<(), StorageError> {
225 self.flush()
226 }
227}
228
229impl<B, T, C> SnapshotStorageTier<T> for SnapshotStorage<B, T, C>
230where
231 B: StorageBackend + ?Sized,
232 T: Send + Sync + 'static,
233 C: Codec<T>,
234{
235 fn save(&self, snapshot: T) -> Result<(), StorageError> {
236 if let Some(filter) = &self.filter {
237 if !filter(&snapshot) {
238 return Ok(());
239 }
240 }
241 let captured: Option<T> = {
249 let mut pending = self.pending.lock();
250 *pending = Some(snapshot);
251 let mut count = self.write_count.lock();
252 let prev = *count;
253 *count = count.saturating_add(1);
254 let new = *count;
255 let compact_trigger = matches!(
256 self.compact_every,
257 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
258 );
259 let trigger = compact_trigger || self.debounce_ms.is_none();
260 if trigger {
261 pending.take()
262 } else {
263 None
264 }
265 };
266 if let Some(snap) = captured {
267 if let Err((snap, err)) = Self::try_flush(
269 &self.backend,
270 &self.codec,
271 &self.key_of,
272 &self.last_saved_key,
273 snap,
274 ) {
275 *self.pending.lock() = Some(snap);
276 return Err(err);
277 }
278 }
279 Ok(())
280 }
281
282 fn load(&self) -> Result<Option<T>, StorageError> {
283 let key = self
284 .last_saved_key
285 .lock()
286 .clone()
287 .unwrap_or_else(|| self.name.clone());
288 match self.backend.read(&key)? {
289 Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
290 _ => Ok(None),
291 }
292 }
293}
294
295pub struct AppendLogStorage<B, T, C = JsonCodec>
301where
302 B: StorageBackend + ?Sized,
303 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
304 C: Codec<Vec<T>>,
305{
306 backend: Arc<B>,
307 codec: C,
308 name: String,
309 debounce_ms: Option<u32>,
310 compact_every: Option<u32>,
311 mode: AppendLogMode,
314 key_of: KeyOfFn<T>,
315 pending: Mutex<std::collections::HashMap<String, Vec<T>>>,
317 append_count: Mutex<u64>,
319 rollback_epoch: AtomicU64,
339}
340
341pub struct AppendLogStorageOptions<T, C = JsonCodec>
342where
343 T: Send + Sync + 'static,
344 C: Codec<Vec<T>>,
345{
346 pub name: Option<String>,
347 pub codec: C,
348 pub debounce_ms: Option<u32>,
349 pub compact_every: Option<u32>,
350 pub key_of: Option<KeyOfFn<T>>,
351 pub mode: AppendLogMode,
353}
354
355impl<T> Default for AppendLogStorageOptions<T, JsonCodec>
356where
357 T: Serialize + DeserializeOwned + Send + Sync + 'static,
358{
359 fn default() -> Self {
360 Self {
361 name: None,
362 codec: JsonCodec,
363 debounce_ms: None,
364 compact_every: None,
365 key_of: None,
366 mode: AppendLogMode::Append,
367 }
368 }
369}
370
371pub fn append_log_storage<B, T, C>(
378 backend: Arc<B>,
379 opts: AppendLogStorageOptions<T, C>,
380) -> AppendLogStorage<B, T, C>
381where
382 B: StorageBackend + ?Sized,
383 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
384 C: Codec<Vec<T>>,
385{
386 assert!(
387 opts.compact_every != Some(0),
388 "append_log_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
389 );
390 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
391 let fallback_key = name.clone();
392 let key_of = opts
393 .key_of
394 .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
395 AppendLogStorage {
396 backend,
397 codec: opts.codec,
398 name,
399 debounce_ms: opts.debounce_ms,
400 compact_every: opts.compact_every,
401 mode: opts.mode,
402 key_of,
403 pending: Mutex::new(std::collections::HashMap::new()),
404 append_count: Mutex::new(0),
405 rollback_epoch: AtomicU64::new(0),
406 }
407}
408
409pub fn memory_append_log<T, C>(
410 opts: AppendLogStorageOptions<T, C>,
411) -> AppendLogStorage<MemoryBackend, T, C>
412where
413 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
414 C: Codec<Vec<T>>,
415{
416 append_log_storage(memory_backend(), opts)
417}
418
419impl<B, T, C> BaseStorageTier for AppendLogStorage<B, T, C>
420where
421 B: StorageBackend + ?Sized,
422 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
423 C: Codec<Vec<T>>,
424{
425 fn name(&self) -> &str {
426 &self.name
427 }
428 fn debounce_ms(&self) -> Option<u32> {
429 self.debounce_ms
430 }
431 fn compact_every(&self) -> Option<u32> {
432 self.compact_every
433 }
434
435 fn flush(&self) -> Result<(), StorageError> {
445 let scheduled_epoch = self.rollback_epoch.load(Ordering::Acquire);
446 let mut buckets = std::mem::take(&mut *self.pending.lock());
447 let keys: Vec<String> = buckets.keys().cloned().collect();
448 for key in keys {
449 if self.rollback_epoch.load(Ordering::Acquire) != scheduled_epoch {
452 return Ok(());
453 }
454 let bucket = match buckets.remove(&key) {
455 Some(b) if !b.is_empty() => b,
456 _ => continue,
457 };
458 let restore_or_drop = |buckets: &mut std::collections::HashMap<String, Vec<T>>,
481 key: String,
482 payload: Vec<T>| {
483 if self.rollback_epoch.load(Ordering::Acquire) == scheduled_epoch {
484 buckets.insert(key, payload);
485 *self.pending.lock() = std::mem::take(buckets);
486 }
487 };
491 let (final_payload, restore_payload): (Vec<T>, Vec<T>) = match self.mode {
492 AppendLogMode::Overwrite => {
493 let snapshot = bucket.clone();
494 (bucket, snapshot)
495 }
496 AppendLogMode::Append => {
497 let existing = match self.backend.read(&key) {
498 Ok(e) => e,
499 Err(e) => {
500 restore_or_drop(&mut buckets, key, bucket);
501 return Err(e);
502 }
503 };
504 let mut merged = match existing {
505 Some(bytes) if !bytes.is_empty() => match self.codec.decode(&bytes) {
506 Ok(v) => v,
507 Err(e) => {
508 restore_or_drop(&mut buckets, key, bucket);
509 return Err(e.into());
510 }
511 },
512 _ => Vec::new(),
513 };
514 let new_entries_backup = bucket.clone();
515 merged.extend(bucket);
516 (merged, new_entries_backup)
517 }
518 };
519 let encoded = match self.codec.encode(&final_payload) {
520 Ok(b) => b,
521 Err(e) => {
522 restore_or_drop(&mut buckets, key, restore_payload);
523 return Err(e.into());
524 }
525 };
526 if let Err(e) = self.backend.write(&key, &encoded) {
527 restore_or_drop(&mut buckets, key, restore_payload);
528 return Err(e);
529 }
530 }
531 Ok(())
532 }
533
534 fn rollback(&self) -> Result<(), StorageError> {
541 self.rollback_epoch.fetch_add(1, Ordering::AcqRel);
542 self.pending.lock().clear();
543 Ok(())
544 }
545
546 fn list_by_prefix_bytes<'a>(
547 &'a self,
548 prefix: &str,
549 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
550 Box::new(PrefixIter::new(&*self.backend, prefix))
551 }
552}
553
554impl<B, T, C> AppendLogStorageTier<T> for AppendLogStorage<B, T, C>
555where
556 B: StorageBackend + ?Sized,
557 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
558 C: Codec<Vec<T>>,
559{
560 fn append_entries(&self, entries: &[T]) -> Result<(), StorageError> {
561 if entries.is_empty() {
562 return Ok(());
563 }
564 let trigger_now = {
570 let mut pending = self.pending.lock();
571 for entry in entries {
572 let k = (self.key_of)(entry);
573 pending.entry(k).or_default().push(entry.clone());
574 }
575 let mut count = self.append_count.lock();
576 let prev = *count;
577 *count = count.saturating_add(entries.len() as u64);
578 let new = *count;
579 let compact_trigger = matches!(
580 self.compact_every,
581 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
582 );
583 compact_trigger || self.debounce_ms.is_none()
584 };
585 if trigger_now {
586 self.flush()?;
587 }
588 Ok(())
589 }
590
591 fn mode(&self) -> AppendLogMode {
592 self.mode
593 }
594
595 fn load_entries(&self, opts: LoadEntriesOpts<'_>) -> Result<AppendLoadResult<T>, StorageError> {
606 let mut keys = match self.backend.list(opts.key_filter.unwrap_or("")) {
610 Ok(ks) => ks,
611 Err(StorageError::BackendNoListSupport { .. }) => match opts.key_filter {
612 Some(k) => vec![k.to_string()],
613 None => vec![self.name.clone()],
614 },
615 Err(e) => return Err(e),
616 };
617 keys.sort();
618
619 let start: u64 = opts.cursor.map_or(0, |c| c.position);
620 let page_size = opts.page_size.filter(|n| *n > 0);
622
623 let want_decoded_at_least = page_size.map(|n| start + u64::from(n) + 1);
626
627 let mut decoded: Vec<T> = Vec::new();
628 let mut total_seen: u64 = 0;
629
630 for k in keys {
631 if let Some(want) = want_decoded_at_least {
632 if total_seen >= want {
633 break;
634 }
635 }
636 if let Some(bytes) = self.backend.read(&k)? {
637 if !bytes.is_empty() {
638 let entries: Vec<T> = self.codec.decode(&bytes)?;
639 total_seen = total_seen.saturating_add(entries.len() as u64);
640 decoded.extend(entries);
641 }
642 }
643 }
644
645 let start_idx: usize = start.try_into().unwrap_or(usize::MAX).min(decoded.len());
648 let mut window: Vec<T> = decoded.split_off(start_idx);
649
650 let next_cursor: Option<AppendCursor> = match page_size {
651 Some(n) => {
652 let n_usize: usize = (n as usize).min(window.len());
653 let has_more = window.len() > n_usize;
654 window.truncate(n_usize);
655 if has_more {
656 Some(AppendCursor::from_position(start + u64::from(n)))
657 } else {
658 None
659 }
660 }
661 None => None,
662 };
663
664 Ok(AppendLoadResult {
665 entries: window,
666 cursor: next_cursor,
667 })
668 }
669}
670
671pub struct KvStorage<B, T, C = JsonCodec>
676where
677 B: StorageBackend + ?Sized,
678 T: Send + Sync + 'static,
679 C: Codec<T>,
680{
681 backend: Arc<B>,
682 codec: C,
683 name: String,
684 debounce_ms: Option<u32>,
685 compact_every: Option<u32>,
686 filter: Option<KvFilterFn<T>>,
687 pending: Mutex<std::collections::HashMap<String, T>>,
688 write_count: Mutex<u64>,
689}
690
691pub struct KvStorageOptions<T, C = JsonCodec>
692where
693 T: Send + Sync + 'static,
694 C: Codec<T>,
695{
696 pub name: Option<String>,
697 pub codec: C,
698 pub debounce_ms: Option<u32>,
699 pub compact_every: Option<u32>,
700 pub filter: Option<KvFilterFn<T>>,
701}
702
703impl<T> Default for KvStorageOptions<T, JsonCodec>
704where
705 T: Serialize + DeserializeOwned + Send + Sync + 'static,
706{
707 fn default() -> Self {
708 Self {
709 name: None,
710 codec: JsonCodec,
711 debounce_ms: None,
712 compact_every: None,
713 filter: None,
714 }
715 }
716}
717
718pub fn kv_storage<B, T, C>(backend: Arc<B>, opts: KvStorageOptions<T, C>) -> KvStorage<B, T, C>
725where
726 B: StorageBackend + ?Sized,
727 T: Send + Sync + 'static,
728 C: Codec<T>,
729{
730 assert!(
731 opts.compact_every != Some(0),
732 "kv_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
733 );
734 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
735 KvStorage {
736 backend,
737 codec: opts.codec,
738 name,
739 debounce_ms: opts.debounce_ms,
740 compact_every: opts.compact_every,
741 filter: opts.filter,
742 pending: Mutex::new(std::collections::HashMap::new()),
743 write_count: Mutex::new(0),
744 }
745}
746
747pub fn memory_kv<T, C>(opts: KvStorageOptions<T, C>) -> KvStorage<MemoryBackend, T, C>
748where
749 T: Send + Sync + 'static,
750 C: Codec<T>,
751{
752 kv_storage(memory_backend(), opts)
753}
754
755impl<B, T, C> BaseStorageTier for KvStorage<B, T, C>
756where
757 B: StorageBackend + ?Sized,
758 T: Send + Sync + 'static,
759 C: Codec<T>,
760{
761 fn name(&self) -> &str {
762 &self.name
763 }
764 fn debounce_ms(&self) -> Option<u32> {
765 self.debounce_ms
766 }
767 fn compact_every(&self) -> Option<u32> {
768 self.compact_every
769 }
770
771 fn flush(&self) -> Result<(), StorageError> {
774 let mut entries = std::mem::take(&mut *self.pending.lock());
775 let keys: Vec<String> = entries.keys().cloned().collect();
776 for key in keys {
777 let Some(value) = entries.remove(&key) else {
778 continue;
779 };
780 let bytes = match self.codec.encode(&value) {
781 Ok(b) => b,
782 Err(e) => {
783 entries.insert(key, value);
784 *self.pending.lock() = entries;
785 return Err(e.into());
786 }
787 };
788 if let Err(e) = self.backend.write(&key, &bytes) {
789 entries.insert(key, value);
790 *self.pending.lock() = entries;
791 return Err(e);
792 }
793 }
794 Ok(())
795 }
796
797 fn rollback(&self) -> Result<(), StorageError> {
798 self.pending.lock().clear();
799 Ok(())
800 }
801
802 fn list_by_prefix_bytes<'a>(
803 &'a self,
804 prefix: &str,
805 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
806 Box::new(PrefixIter::new(&*self.backend, prefix))
807 }
808}
809
810impl<B, T, C> KvStorageTier<T> for KvStorage<B, T, C>
811where
812 B: StorageBackend + ?Sized,
813 T: Send + Sync + 'static,
814 C: Codec<T>,
815{
816 fn save(&self, key: &str, value: T) -> Result<(), StorageError> {
817 if let Some(filter) = &self.filter {
818 if !filter(key, &value) {
819 return Ok(());
820 }
821 }
822 let trigger_now = {
826 self.pending.lock().insert(key.to_string(), value);
827 let mut count = self.write_count.lock();
828 let prev = *count;
829 *count = count.saturating_add(1);
830 let new = *count;
831 let compact_trigger = matches!(
832 self.compact_every,
833 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
834 );
835 compact_trigger || self.debounce_ms.is_none()
836 };
837 if trigger_now {
838 self.flush()?;
839 }
840 Ok(())
841 }
842
843 fn load(&self, key: &str) -> Result<Option<T>, StorageError> {
844 match self.backend.read(key)? {
845 Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
846 _ => Ok(None),
847 }
848 }
849
850 fn delete(&self, key: &str) -> Result<(), StorageError> {
851 self.backend.delete(key)?;
857 self.pending.lock().remove(key);
858 Ok(())
859 }
860
861 fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
862 self.backend.list(prefix)
863 }
864}