Skip to main content

co_storage/storage/
join.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{BlockStorageContentMapping, ExtendedBlock, ExtendedBlockStorage};
5use async_trait::async_trait;
6use cid::Cid;
7use co_primitives::{
8	Block, BlockStat, BlockStorage, BlockStorageCloneSettings, BlockStorageStoreParams, CloneWithBlockStorageSettings,
9	MappedCid, StorageError,
10};
11use std::{collections::BTreeSet, sync::Arc};
12
13/// Joins multiple block storages together.
14/// Write operations will be always delegated to the first storage (which passed to `new`).
15/// Read operations starts with the last up to the first.
16#[derive(Debug, Clone)]
17pub struct JoinBlockStorage<S, R> {
18	next: Arc<(S, Vec<R>)>,
19}
20impl<S, R> JoinBlockStorage<S, R>
21where
22	S: BlockStorage + 'static,
23	R: ExtendedBlockStorage + 'static,
24{
25	pub fn new(next: S, read: Vec<R>) -> Self {
26		Self { next: Arc::new((next, read)) }
27	}
28}
29#[async_trait]
30impl<S, R> BlockStorage for JoinBlockStorage<S, R>
31where
32	S: BlockStorage + 'static,
33	R: ExtendedBlockStorage,
34{
35	async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
36		for read in self.next.1.iter() {
37			if read.exists(cid).await.unwrap_or(false) {
38				return read.get(cid).await;
39			}
40		}
41		self.next.0.get(cid).await
42	}
43
44	async fn set(&self, block: Block) -> Result<Cid, StorageError> {
45		self.next.0.set(block).await
46	}
47
48	async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
49		self.next.0.remove(cid).await
50	}
51
52	async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
53		for read in self.next.1.iter() {
54			if read.exists(cid).await.unwrap_or(false) {
55				return read.stat(cid).await;
56			}
57		}
58		self.next.0.stat(cid).await
59	}
60
61	fn max_block_size(&self) -> usize {
62		self.next.0.max_block_size()
63	}
64}
65#[async_trait]
66impl<S, R> ExtendedBlockStorage for JoinBlockStorage<S, R>
67where
68	S: ExtendedBlockStorage + 'static,
69	R: ExtendedBlockStorage + 'static,
70{
71	async fn set_extended(&self, block: ExtendedBlock) -> Result<Cid, StorageError> {
72		self.next.0.set_extended(block).await
73	}
74
75	async fn exists(&self, cid: &Cid) -> Result<bool, StorageError> {
76		for read in self.next.1.iter() {
77			if read.exists(cid).await.unwrap_or(false) {
78				return Ok(true);
79			}
80		}
81		self.next.0.exists(cid).await
82	}
83
84	async fn clear(&self) -> Result<(), StorageError> {
85		self.next.0.clear().await
86	}
87}
88#[async_trait]
89impl<S, R> BlockStorageContentMapping for JoinBlockStorage<S, R>
90where
91	S: BlockStorage + BlockStorageContentMapping + 'static,
92	R: ExtendedBlockStorage + 'static,
93{
94	async fn is_content_mapped(&self) -> bool {
95		self.next.0.is_content_mapped().await
96	}
97
98	async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
99		self.next.0.to_plain(mapped).await
100	}
101
102	async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
103		self.next.0.to_mapped(plain).await
104	}
105
106	async fn insert_mappings(&self, mappings: BTreeSet<MappedCid>) {
107		self.next.0.insert_mappings(mappings).await
108	}
109}
110#[async_trait]
111impl<S, R> CloneWithBlockStorageSettings for JoinBlockStorage<S, R>
112where
113	S: BlockStorage + CloneWithBlockStorageSettings + 'static,
114	R: ExtendedBlockStorage + Clone + 'static,
115{
116	fn clone_with_settings(&self, settings: BlockStorageCloneSettings) -> Self {
117		Self::new(self.next.0.clone_with_settings(settings), self.next.1.clone())
118	}
119}
120impl<S, R> BlockStorageStoreParams for JoinBlockStorage<S, R>
121where
122	S: BlockStorage + BlockStorageStoreParams + 'static,
123	R: ExtendedBlockStorage + 'static,
124{
125	type StoreParams = S::StoreParams;
126}