co_storage/storage/
change.rs1use crate::{BlockStorageContentMapping, ExtendedBlock, ExtendedBlockStorage};
5use async_trait::async_trait;
6use cid::Cid;
7use co_primitives::{
8 Block, BlockStat, BlockStorage, BlockStorageStoreParams, CloneWithBlockStorageSettings, MappedCid, StorageError,
9};
10use std::{
11 collections::{BTreeSet, HashSet},
12 mem::swap,
13 sync::{Arc, Mutex},
14};
15
16#[derive(Debug, Clone)]
19pub struct ChangeBlockStorage<S> {
20 next: S,
21 changes: Arc<Mutex<HashSet<BlockStorageChange>>>,
22 record: bool,
23}
24impl<S> ChangeBlockStorage<S> {
25 pub fn new(next: S) -> Self {
26 Self { next, changes: Default::default(), record: true }
27 }
28
29 pub fn set_record(&mut self, record: bool) {
30 self.record = record;
31 }
32
33 pub async fn drain(&self) -> impl Iterator<Item = BlockStorageChange> + use<S> {
35 let mut created = self.changes.lock().unwrap();
36 let mut result = HashSet::new();
37 swap(&mut result, &mut created);
38 result.into_iter()
39 }
40}
41#[async_trait]
42impl<S> BlockStorage for ChangeBlockStorage<S>
43where
44 S: BlockStorage + 'static,
45{
46 async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
47 Ok(self.next.get(cid).await?)
48 }
49
50 async fn set(&self, block: Block) -> Result<Cid, StorageError> {
51 if (self.next.stat(block.cid()).await).is_ok() {
53 return Ok(*block.cid());
54 }
55
56 let result = self.next.set(block).await?;
58
59 if self.record {
61 let mut changes = self.changes.lock().unwrap();
62 changes.remove(&BlockStorageChange::Remove(result));
63 changes.insert(BlockStorageChange::Set(result));
64 }
65
66 Ok(result)
68 }
69
70 async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
71 let result = self.next.remove(cid).await?;
73
74 if self.record {
76 let mut changes = self.changes.lock().unwrap();
77 if !changes.remove(&BlockStorageChange::Set(*cid)) {
78 changes.insert(BlockStorageChange::Remove(*cid));
79 }
80 }
81
82 Ok(result)
84 }
85
86 async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
87 Ok(self.next.stat(cid).await?)
88 }
89
90 fn max_block_size(&self) -> usize {
91 self.next.max_block_size()
92 }
93}
94#[async_trait]
95impl<S> ExtendedBlockStorage for ChangeBlockStorage<S>
96where
97 S: ExtendedBlockStorage + 'static,
98{
99 async fn set_extended(&self, block: ExtendedBlock) -> Result<Cid, StorageError> {
100 self.next.set_extended(block).await
101 }
102
103 async fn exists(&self, cid: &Cid) -> Result<bool, StorageError> {
104 self.next.exists(cid).await
105 }
106
107 async fn clear(&self) -> Result<(), StorageError> {
108 self.next.clear().await
109 }
110}
111impl<S> CloneWithBlockStorageSettings for ChangeBlockStorage<S>
112where
113 S: BlockStorage + CloneWithBlockStorageSettings + 'static,
114{
115 fn clone_with_settings(&self, settings: co_primitives::BlockStorageCloneSettings) -> Self {
116 Self { next: self.next.clone_with_settings(settings), changes: self.changes.clone(), record: self.record }
117 }
118}
119#[async_trait]
120impl<S> BlockStorageContentMapping for ChangeBlockStorage<S>
121where
122 S: BlockStorage + BlockStorageContentMapping + 'static,
123{
124 async fn is_content_mapped(&self) -> bool {
125 self.next.is_content_mapped().await
126 }
127
128 async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
129 self.next.to_plain(mapped).await
130 }
131
132 async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
133 self.next.to_mapped(plain).await
134 }
135
136 async fn insert_mappings(&self, mappings: BTreeSet<MappedCid>) {
137 self.next.insert_mappings(mappings).await
138 }
139}
140impl<S> BlockStorageStoreParams for ChangeBlockStorage<S>
141where
142 S: BlockStorageStoreParams,
143{
144 type StoreParams = S::StoreParams;
145}
146
147#[derive(Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
148pub enum BlockStorageChange {
149 Set(Cid),
150 Remove(Cid),
151}