Skip to main content

co_storage/storage/
change.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{BlockStorageContentMapping, ExtendedBlock, ExtendedBlockStorage};
5use async_trait::async_trait;
6use cid::Cid;
7use co_primitives::{
8	Block, BlockStat, BlockStorage, BlockStorageStoreParams, CloneWithBlockStorageSettings, MappedCid, StorageError,
9};
10use std::{
11	collections::{BTreeSet, HashSet},
12	mem::swap,
13	sync::{Arc, Mutex},
14};
15
/// Store all [`Cid`] of blocks that have been newly created or removed.
/// Additionally, `set` calls for blocks which already exist in `next` will be ignored.
///
/// The recorded changes live behind an [`Arc`], so clones of this storage share one change set,
/// while the `record` flag is copied into each clone.
#[derive(Debug, Clone)]
pub struct ChangeBlockStorage<S> {
	/// Wrapped storage that block operations are forwarded to.
	next: S,
	/// Changes recorded so far and not yet drained; shared between clones.
	changes: Arc<Mutex<HashSet<BlockStorageChange>>>,
	/// Whether `set` and `remove` calls on this instance are recorded into `changes`.
	record: bool,
}
24impl<S> ChangeBlockStorage<S> {
25	pub fn new(next: S) -> Self {
26		Self { next, changes: Default::default(), record: true }
27	}
28
29	pub fn set_record(&mut self, record: bool) {
30		self.record = record;
31	}
32
33	/// Drain all changes and return them as iterator.
34	pub async fn drain(&self) -> impl Iterator<Item = BlockStorageChange> + use<S> {
35		let mut created = self.changes.lock().unwrap();
36		let mut result = HashSet::new();
37		swap(&mut result, &mut created);
38		result.into_iter()
39	}
40}
41#[async_trait]
42impl<S> BlockStorage for ChangeBlockStorage<S>
43where
44	S: BlockStorage + 'static,
45{
46	async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
47		Ok(self.next.get(cid).await?)
48	}
49
50	async fn set(&self, block: Block) -> Result<Cid, StorageError> {
51		// already exists?
52		if (self.next.stat(block.cid()).await).is_ok() {
53			return Ok(*block.cid());
54		}
55
56		// create
57		let result = self.next.set(block).await?;
58
59		// record
60		if self.record {
61			let mut changes = self.changes.lock().unwrap();
62			changes.remove(&BlockStorageChange::Remove(result));
63			changes.insert(BlockStorageChange::Set(result));
64		}
65
66		// result
67		Ok(result)
68	}
69
70	async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
71		// remove
72		let result = self.next.remove(cid).await?;
73
74		// record (ignore when it just has been added)
75		if self.record {
76			let mut changes = self.changes.lock().unwrap();
77			if !changes.remove(&BlockStorageChange::Set(*cid)) {
78				changes.insert(BlockStorageChange::Remove(*cid));
79			}
80		}
81
82		// result
83		Ok(result)
84	}
85
86	async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
87		Ok(self.next.stat(cid).await?)
88	}
89
90	fn max_block_size(&self) -> usize {
91		self.next.max_block_size()
92	}
93}
// All methods forward directly to `next`; none of them touches the recorded change set.
#[async_trait]
impl<S> ExtendedBlockStorage for ChangeBlockStorage<S>
where
	S: ExtendedBlockStorage + 'static,
{
	/// Forward the extended block to `next`.
	///
	/// NOTE(review): unlike `set`, no change is recorded here. If `set_extended` can create new
	/// blocks they will not show up in the recorded changes — confirm this is intended.
	async fn set_extended(&self, block: ExtendedBlock) -> Result<Cid, StorageError> {
		self.next.set_extended(block).await
	}

	/// Forward the existence check to `next`.
	async fn exists(&self, cid: &Cid) -> Result<bool, StorageError> {
		self.next.exists(cid).await
	}

	/// Forward `clear` to `next`.
	///
	/// NOTE(review): no `Remove` changes are recorded and the pending change set is left as is —
	/// confirm this is intended.
	async fn clear(&self) -> Result<(), StorageError> {
		self.next.clear().await
	}
}
impl<S> CloneWithBlockStorageSettings for ChangeBlockStorage<S>
where
	S: BlockStorage + CloneWithBlockStorageSettings + 'static,
{
	/// Clone this storage with `settings` passed on to the wrapped storage.
	///
	/// The clone shares the change set with `self` (the [`Arc`] is cloned, not its contents) and
	/// keeps the current `record` flag.
	fn clone_with_settings(&self, settings: co_primitives::BlockStorageCloneSettings) -> Self {
		Self { next: self.next.clone_with_settings(settings), changes: self.changes.clone(), record: self.record }
	}
}
// Content mapping is not handled by this layer; every call is forwarded to `next` unchanged.
#[async_trait]
impl<S> BlockStorageContentMapping for ChangeBlockStorage<S>
where
	S: BlockStorage + BlockStorageContentMapping + 'static,
{
	/// Forward to `next`.
	async fn is_content_mapped(&self) -> bool {
		self.next.is_content_mapped().await
	}

	/// Forward the mapped-to-plain lookup to `next`.
	async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
		self.next.to_plain(mapped).await
	}

	/// Forward the plain-to-mapped lookup to `next`.
	async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
		self.next.to_mapped(plain).await
	}

	/// Forward the mappings to `next`.
	async fn insert_mappings(&self, mappings: BTreeSet<MappedCid>) {
		self.next.insert_mappings(mappings).await
	}
}
impl<S> BlockStorageStoreParams for ChangeBlockStorage<S>
where
	S: BlockStorageStoreParams,
{
	/// Same store params as the wrapped storage.
	type StoreParams = S::StoreParams;
}
146
/// A single change recorded by [`ChangeBlockStorage`], identified by the [`Cid`] of the block.
#[derive(Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum BlockStorageChange {
	/// The block has been stored.
	Set(Cid),
	/// The block has been removed.
	Remove(Cid),
}