1use std::sync::Arc;
19
20use parking_lot::Mutex;
21use serde::{de::DeserializeOwned, Serialize};
22
23use crate::backend::{memory_backend, MemoryBackend, StorageBackend};
24use crate::codec::{Codec, JsonCodec};
25use crate::error::StorageError;
26use crate::tier::{
27 AppendLogStorageTier, BaseStorageTier, KvStorageTier, PrefixIter, SnapshotStorageTier,
28};
29
/// Predicate over a snapshot; `SnapshotStorage::save` drops values for which it returns `false`.
type FilterFn<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
/// Derives the backend key under which a value is stored.
type KeyOfFn<T> = Box<dyn Fn(&T) -> String + Send + Sync>;
/// Predicate over a `(key, value)` pair; `KvStorage::save` drops writes for which it returns
/// `false`.
type KvFilterFn<T> = Box<dyn Fn(&str, &T) -> bool + Send + Sync>;
33
34pub struct SnapshotStorage<B, T, C = JsonCodec>
40where
41 B: StorageBackend + ?Sized,
42 T: Send + Sync + 'static,
43 C: Codec<T>,
44{
45 backend: Arc<B>,
46 codec: C,
47 name: String,
48 debounce_ms: Option<u32>,
49 compact_every: Option<u32>,
50 filter: Option<FilterFn<T>>,
51 key_of: KeyOfFn<T>,
52 pending: Mutex<Option<T>>,
55 write_count: Mutex<u64>,
57 last_saved_key: Mutex<Option<String>>,
60}
61
62pub struct SnapshotStorageOptions<T, C = JsonCodec>
64where
65 T: Send + Sync + 'static,
66 C: Codec<T>,
67{
68 pub name: Option<String>,
69 pub codec: C,
70 pub debounce_ms: Option<u32>,
71 pub compact_every: Option<u32>,
72 pub filter: Option<FilterFn<T>>,
73 pub key_of: Option<KeyOfFn<T>>,
74}
75
76impl<T> Default for SnapshotStorageOptions<T, JsonCodec>
77where
78 T: Serialize + DeserializeOwned + Send + Sync + 'static,
79{
80 fn default() -> Self {
81 Self {
82 name: None,
83 codec: JsonCodec,
84 debounce_ms: None,
85 compact_every: None,
86 filter: None,
87 key_of: None,
88 }
89 }
90}
91
92pub fn snapshot_storage<B, T, C>(
100 backend: Arc<B>,
101 opts: SnapshotStorageOptions<T, C>,
102) -> SnapshotStorage<B, T, C>
103where
104 B: StorageBackend + ?Sized,
105 T: Send + Sync + 'static,
106 C: Codec<T>,
107{
108 assert!(
109 opts.compact_every != Some(0),
110 "snapshot_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
111 );
112 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
113 let fallback_key = name.clone();
114 let key_of = opts
115 .key_of
116 .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
117 SnapshotStorage {
118 backend,
119 codec: opts.codec,
120 name,
121 debounce_ms: opts.debounce_ms,
122 compact_every: opts.compact_every,
123 filter: opts.filter,
124 key_of,
125 pending: Mutex::new(None),
126 write_count: Mutex::new(0),
127 last_saved_key: Mutex::new(None),
128 }
129}
130
131pub fn memory_snapshot<T, C>(
133 opts: SnapshotStorageOptions<T, C>,
134) -> SnapshotStorage<MemoryBackend, T, C>
135where
136 T: Send + Sync + 'static,
137 C: Codec<T>,
138{
139 snapshot_storage(memory_backend(), opts)
140}
141
142impl<B, T, C> SnapshotStorage<B, T, C>
143where
144 B: StorageBackend + ?Sized,
145 T: Send + Sync + 'static,
146 C: Codec<T>,
147{
148 fn try_flush(
152 backend: &B,
153 codec: &C,
154 key_of: &KeyOfFn<T>,
155 last_saved_key: &Mutex<Option<String>>,
156 snapshot: T,
157 ) -> Result<(), (T, StorageError)> {
158 let key = key_of(&snapshot);
159 let bytes = match codec.encode(&snapshot) {
160 Ok(b) => b,
161 Err(e) => return Err((snapshot, e.into())),
162 };
163 if let Err(e) = backend.write(&key, &bytes) {
164 return Err((snapshot, e));
165 }
166 *last_saved_key.lock() = Some(key);
167 Ok(())
168 }
169}
170
171impl<B, T, C> BaseStorageTier for SnapshotStorage<B, T, C>
172where
173 B: StorageBackend + ?Sized,
174 T: Send + Sync + 'static,
175 C: Codec<T>,
176{
177 fn name(&self) -> &str {
178 &self.name
179 }
180 fn debounce_ms(&self) -> Option<u32> {
181 self.debounce_ms
182 }
183 fn compact_every(&self) -> Option<u32> {
184 self.compact_every
185 }
186
187 fn flush(&self) -> Result<(), StorageError> {
190 let slot = self.pending.lock().take();
191 let Some(snapshot) = slot else {
192 return Ok(());
193 };
194 match Self::try_flush(
195 &*self.backend,
196 &self.codec,
197 &self.key_of,
198 &self.last_saved_key,
199 snapshot,
200 ) {
201 Ok(()) => Ok(()),
202 Err((snapshot, err)) => {
203 *self.pending.lock() = Some(snapshot);
205 Err(err)
206 }
207 }
208 }
209
210 fn rollback(&self) -> Result<(), StorageError> {
211 *self.pending.lock() = None;
212 Ok(())
213 }
214
215 fn list_by_prefix_bytes<'a>(
216 &'a self,
217 prefix: &str,
218 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
219 Box::new(PrefixIter::new(&*self.backend, prefix))
220 }
221
222 fn compact(&self) -> Result<(), StorageError> {
223 self.flush()
224 }
225}
226
227impl<B, T, C> SnapshotStorageTier<T> for SnapshotStorage<B, T, C>
228where
229 B: StorageBackend + ?Sized,
230 T: Send + Sync + 'static,
231 C: Codec<T>,
232{
233 fn save(&self, snapshot: T) -> Result<(), StorageError> {
234 if let Some(filter) = &self.filter {
235 if !filter(&snapshot) {
236 return Ok(());
237 }
238 }
239 let captured: Option<T> = {
247 let mut pending = self.pending.lock();
248 *pending = Some(snapshot);
249 let mut count = self.write_count.lock();
250 let prev = *count;
251 *count = count.saturating_add(1);
252 let new = *count;
253 let compact_trigger = matches!(
254 self.compact_every,
255 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
256 );
257 let trigger = compact_trigger || self.debounce_ms.is_none();
258 if trigger {
259 pending.take()
260 } else {
261 None
262 }
263 };
264 if let Some(snap) = captured {
265 if let Err((snap, err)) = Self::try_flush(
267 &self.backend,
268 &self.codec,
269 &self.key_of,
270 &self.last_saved_key,
271 snap,
272 ) {
273 *self.pending.lock() = Some(snap);
274 return Err(err);
275 }
276 }
277 Ok(())
278 }
279
280 fn load(&self) -> Result<Option<T>, StorageError> {
281 let key = self
282 .last_saved_key
283 .lock()
284 .clone()
285 .unwrap_or_else(|| self.name.clone());
286 match self.backend.read(&key)? {
287 Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
288 _ => Ok(None),
289 }
290 }
291}
292
293pub struct AppendLogStorage<B, T, C = JsonCodec>
299where
300 B: StorageBackend + ?Sized,
301 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
302 C: Codec<Vec<T>>,
303{
304 backend: Arc<B>,
305 codec: C,
306 name: String,
307 debounce_ms: Option<u32>,
308 compact_every: Option<u32>,
309 key_of: KeyOfFn<T>,
310 pending: Mutex<std::collections::HashMap<String, Vec<T>>>,
312 append_count: Mutex<u64>,
314}
315
316pub struct AppendLogStorageOptions<T, C = JsonCodec>
317where
318 T: Send + Sync + 'static,
319 C: Codec<Vec<T>>,
320{
321 pub name: Option<String>,
322 pub codec: C,
323 pub debounce_ms: Option<u32>,
324 pub compact_every: Option<u32>,
325 pub key_of: Option<KeyOfFn<T>>,
326}
327
328impl<T> Default for AppendLogStorageOptions<T, JsonCodec>
329where
330 T: Serialize + DeserializeOwned + Send + Sync + 'static,
331{
332 fn default() -> Self {
333 Self {
334 name: None,
335 codec: JsonCodec,
336 debounce_ms: None,
337 compact_every: None,
338 key_of: None,
339 }
340 }
341}
342
343pub fn append_log_storage<B, T, C>(
350 backend: Arc<B>,
351 opts: AppendLogStorageOptions<T, C>,
352) -> AppendLogStorage<B, T, C>
353where
354 B: StorageBackend + ?Sized,
355 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
356 C: Codec<Vec<T>>,
357{
358 assert!(
359 opts.compact_every != Some(0),
360 "append_log_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
361 );
362 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
363 let fallback_key = name.clone();
364 let key_of = opts
365 .key_of
366 .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
367 AppendLogStorage {
368 backend,
369 codec: opts.codec,
370 name,
371 debounce_ms: opts.debounce_ms,
372 compact_every: opts.compact_every,
373 key_of,
374 pending: Mutex::new(std::collections::HashMap::new()),
375 append_count: Mutex::new(0),
376 }
377}
378
379pub fn memory_append_log<T, C>(
380 opts: AppendLogStorageOptions<T, C>,
381) -> AppendLogStorage<MemoryBackend, T, C>
382where
383 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
384 C: Codec<Vec<T>>,
385{
386 append_log_storage(memory_backend(), opts)
387}
388
389impl<B, T, C> BaseStorageTier for AppendLogStorage<B, T, C>
390where
391 B: StorageBackend + ?Sized,
392 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
393 C: Codec<Vec<T>>,
394{
395 fn name(&self) -> &str {
396 &self.name
397 }
398 fn debounce_ms(&self) -> Option<u32> {
399 self.debounce_ms
400 }
401 fn compact_every(&self) -> Option<u32> {
402 self.compact_every
403 }
404
405 fn flush(&self) -> Result<(), StorageError> {
408 let mut buckets = std::mem::take(&mut *self.pending.lock());
409 let keys: Vec<String> = buckets.keys().cloned().collect();
410 for key in keys {
411 let bucket = match buckets.remove(&key) {
412 Some(b) if !b.is_empty() => b,
413 _ => continue,
414 };
415 let existing = match self.backend.read(&key) {
417 Ok(e) => e,
418 Err(e) => {
419 buckets.insert(key, bucket);
420 *self.pending.lock() = buckets;
421 return Err(e);
422 }
423 };
424 let mut merged = match existing {
425 Some(bytes) if !bytes.is_empty() => match self.codec.decode(&bytes) {
426 Ok(v) => v,
427 Err(e) => {
428 buckets.insert(key, bucket);
429 *self.pending.lock() = buckets;
430 return Err(e.into());
431 }
432 },
433 _ => Vec::new(),
434 };
435 let bucket_backup = bucket.clone();
436 merged.extend(bucket);
437 let encoded = match self.codec.encode(&merged) {
438 Ok(b) => b,
439 Err(e) => {
440 buckets.insert(key, bucket_backup);
441 *self.pending.lock() = buckets;
442 return Err(e.into());
443 }
444 };
445 if let Err(e) = self.backend.write(&key, &encoded) {
446 *self.pending.lock() = buckets;
447 return Err(e);
448 }
449 }
450 Ok(())
451 }
452
453 fn rollback(&self) -> Result<(), StorageError> {
454 self.pending.lock().clear();
455 Ok(())
456 }
457
458 fn list_by_prefix_bytes<'a>(
459 &'a self,
460 prefix: &str,
461 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
462 Box::new(PrefixIter::new(&*self.backend, prefix))
463 }
464}
465
466impl<B, T, C> AppendLogStorageTier<T> for AppendLogStorage<B, T, C>
467where
468 B: StorageBackend + ?Sized,
469 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
470 C: Codec<Vec<T>>,
471{
472 fn append_entries(&self, entries: &[T]) -> Result<(), StorageError> {
473 if entries.is_empty() {
474 return Ok(());
475 }
476 let trigger_now = {
482 let mut pending = self.pending.lock();
483 for entry in entries {
484 let k = (self.key_of)(entry);
485 pending.entry(k).or_default().push(entry.clone());
486 }
487 let mut count = self.append_count.lock();
488 let prev = *count;
489 *count = count.saturating_add(entries.len() as u64);
490 let new = *count;
491 let compact_trigger = matches!(
492 self.compact_every,
493 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
494 );
495 compact_trigger || self.debounce_ms.is_none()
496 };
497 if trigger_now {
498 self.flush()?;
499 }
500 Ok(())
501 }
502
503 fn load_entries(&self, key_filter: Option<&str>) -> Result<Vec<T>, StorageError> {
504 let keys = match self.backend.list(key_filter.unwrap_or("")) {
507 Ok(ks) => ks,
508 Err(StorageError::BackendNoListSupport { .. }) => match key_filter {
509 Some(k) => vec![k.to_string()],
510 None => vec![self.name.clone()],
511 },
512 Err(e) => return Err(e),
513 };
514 let mut all = Vec::new();
515 for k in keys {
516 if let Some(bytes) = self.backend.read(&k)? {
517 if !bytes.is_empty() {
518 let entries: Vec<T> = self.codec.decode(&bytes)?;
519 all.extend(entries);
520 }
521 }
522 }
523 Ok(all)
524 }
525}
526
527pub struct KvStorage<B, T, C = JsonCodec>
532where
533 B: StorageBackend + ?Sized,
534 T: Send + Sync + 'static,
535 C: Codec<T>,
536{
537 backend: Arc<B>,
538 codec: C,
539 name: String,
540 debounce_ms: Option<u32>,
541 compact_every: Option<u32>,
542 filter: Option<KvFilterFn<T>>,
543 pending: Mutex<std::collections::HashMap<String, T>>,
544 write_count: Mutex<u64>,
545}
546
547pub struct KvStorageOptions<T, C = JsonCodec>
548where
549 T: Send + Sync + 'static,
550 C: Codec<T>,
551{
552 pub name: Option<String>,
553 pub codec: C,
554 pub debounce_ms: Option<u32>,
555 pub compact_every: Option<u32>,
556 pub filter: Option<KvFilterFn<T>>,
557}
558
559impl<T> Default for KvStorageOptions<T, JsonCodec>
560where
561 T: Serialize + DeserializeOwned + Send + Sync + 'static,
562{
563 fn default() -> Self {
564 Self {
565 name: None,
566 codec: JsonCodec,
567 debounce_ms: None,
568 compact_every: None,
569 filter: None,
570 }
571 }
572}
573
574pub fn kv_storage<B, T, C>(backend: Arc<B>, opts: KvStorageOptions<T, C>) -> KvStorage<B, T, C>
581where
582 B: StorageBackend + ?Sized,
583 T: Send + Sync + 'static,
584 C: Codec<T>,
585{
586 assert!(
587 opts.compact_every != Some(0),
588 "kv_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
589 );
590 let name = opts.name.unwrap_or_else(|| backend.name().to_string());
591 KvStorage {
592 backend,
593 codec: opts.codec,
594 name,
595 debounce_ms: opts.debounce_ms,
596 compact_every: opts.compact_every,
597 filter: opts.filter,
598 pending: Mutex::new(std::collections::HashMap::new()),
599 write_count: Mutex::new(0),
600 }
601}
602
603pub fn memory_kv<T, C>(opts: KvStorageOptions<T, C>) -> KvStorage<MemoryBackend, T, C>
604where
605 T: Send + Sync + 'static,
606 C: Codec<T>,
607{
608 kv_storage(memory_backend(), opts)
609}
610
611impl<B, T, C> BaseStorageTier for KvStorage<B, T, C>
612where
613 B: StorageBackend + ?Sized,
614 T: Send + Sync + 'static,
615 C: Codec<T>,
616{
617 fn name(&self) -> &str {
618 &self.name
619 }
620 fn debounce_ms(&self) -> Option<u32> {
621 self.debounce_ms
622 }
623 fn compact_every(&self) -> Option<u32> {
624 self.compact_every
625 }
626
627 fn flush(&self) -> Result<(), StorageError> {
630 let mut entries = std::mem::take(&mut *self.pending.lock());
631 let keys: Vec<String> = entries.keys().cloned().collect();
632 for key in keys {
633 let Some(value) = entries.remove(&key) else {
634 continue;
635 };
636 let bytes = match self.codec.encode(&value) {
637 Ok(b) => b,
638 Err(e) => {
639 entries.insert(key, value);
640 *self.pending.lock() = entries;
641 return Err(e.into());
642 }
643 };
644 if let Err(e) = self.backend.write(&key, &bytes) {
645 entries.insert(key, value);
646 *self.pending.lock() = entries;
647 return Err(e);
648 }
649 }
650 Ok(())
651 }
652
653 fn rollback(&self) -> Result<(), StorageError> {
654 self.pending.lock().clear();
655 Ok(())
656 }
657
658 fn list_by_prefix_bytes<'a>(
659 &'a self,
660 prefix: &str,
661 ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
662 Box::new(PrefixIter::new(&*self.backend, prefix))
663 }
664}
665
666impl<B, T, C> KvStorageTier<T> for KvStorage<B, T, C>
667where
668 B: StorageBackend + ?Sized,
669 T: Send + Sync + 'static,
670 C: Codec<T>,
671{
672 fn save(&self, key: &str, value: T) -> Result<(), StorageError> {
673 if let Some(filter) = &self.filter {
674 if !filter(key, &value) {
675 return Ok(());
676 }
677 }
678 let trigger_now = {
682 self.pending.lock().insert(key.to_string(), value);
683 let mut count = self.write_count.lock();
684 let prev = *count;
685 *count = count.saturating_add(1);
686 let new = *count;
687 let compact_trigger = matches!(
688 self.compact_every,
689 Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
690 );
691 compact_trigger || self.debounce_ms.is_none()
692 };
693 if trigger_now {
694 self.flush()?;
695 }
696 Ok(())
697 }
698
699 fn load(&self, key: &str) -> Result<Option<T>, StorageError> {
700 match self.backend.read(key)? {
701 Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
702 _ => Ok(None),
703 }
704 }
705
706 fn delete(&self, key: &str) -> Result<(), StorageError> {
707 self.backend.delete(key)?;
713 self.pending.lock().remove(key);
714 Ok(())
715 }
716
717 fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
718 self.backend.list(prefix)
719 }
720}