co_storage/storage/
join.rs1use crate::{BlockStorageContentMapping, ExtendedBlock, ExtendedBlockStorage};
5use async_trait::async_trait;
6use cid::Cid;
7use co_primitives::{
8 Block, BlockStat, BlockStorage, BlockStorageCloneSettings, BlockStorageStoreParams, CloneWithBlockStorageSettings,
9 MappedCid, StorageError,
10};
11use std::{collections::BTreeSet, sync::Arc};
12
/// Block storage that joins one storage `S` with a list of additional
/// storages `R`.
///
/// In this file, the read-style operations (`get`, `stat`, `exists`) probe the
/// `R` storages first and fall back to `S`; every other operation is forwarded
/// to `S` only.
#[derive(Debug)]
pub struct JoinBlockStorage<S, R> {
    /// Shared `(S, Vec<R>)` pair; cloning the wrapper only clones this `Arc`.
    next: Arc<(S, Vec<R>)>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add
// `S: Clone` and `R: Clone` bounds, although cloning never touches the inner
// storages and only increments the `Arc` reference count.
impl<S, R> Clone for JoinBlockStorage<S, R> {
    fn clone(&self) -> Self {
        Self { next: Arc::clone(&self.next) }
    }
}
20impl<S, R> JoinBlockStorage<S, R>
21where
22 S: BlockStorage + 'static,
23 R: ExtendedBlockStorage + 'static,
24{
25 pub fn new(next: S, read: Vec<R>) -> Self {
26 Self { next: Arc::new((next, read)) }
27 }
28}
29#[async_trait]
30impl<S, R> BlockStorage for JoinBlockStorage<S, R>
31where
32 S: BlockStorage + 'static,
33 R: ExtendedBlockStorage,
34{
35 async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
36 for read in self.next.1.iter() {
37 if read.exists(cid).await.unwrap_or(false) {
38 return read.get(cid).await;
39 }
40 }
41 self.next.0.get(cid).await
42 }
43
44 async fn set(&self, block: Block) -> Result<Cid, StorageError> {
45 self.next.0.set(block).await
46 }
47
48 async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
49 self.next.0.remove(cid).await
50 }
51
52 async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
53 for read in self.next.1.iter() {
54 if read.exists(cid).await.unwrap_or(false) {
55 return read.stat(cid).await;
56 }
57 }
58 self.next.0.stat(cid).await
59 }
60
61 fn max_block_size(&self) -> usize {
62 self.next.0.max_block_size()
63 }
64}
65#[async_trait]
66impl<S, R> ExtendedBlockStorage for JoinBlockStorage<S, R>
67where
68 S: ExtendedBlockStorage + 'static,
69 R: ExtendedBlockStorage + 'static,
70{
71 async fn set_extended(&self, block: ExtendedBlock) -> Result<Cid, StorageError> {
72 self.next.0.set_extended(block).await
73 }
74
75 async fn exists(&self, cid: &Cid) -> Result<bool, StorageError> {
76 for read in self.next.1.iter() {
77 if read.exists(cid).await.unwrap_or(false) {
78 return Ok(true);
79 }
80 }
81 self.next.0.exists(cid).await
82 }
83
84 async fn clear(&self) -> Result<(), StorageError> {
85 self.next.0.clear().await
86 }
87}
88#[async_trait]
89impl<S, R> BlockStorageContentMapping for JoinBlockStorage<S, R>
90where
91 S: BlockStorage + BlockStorageContentMapping + 'static,
92 R: ExtendedBlockStorage + 'static,
93{
94 async fn is_content_mapped(&self) -> bool {
95 self.next.0.is_content_mapped().await
96 }
97
98 async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
99 self.next.0.to_plain(mapped).await
100 }
101
102 async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
103 self.next.0.to_mapped(plain).await
104 }
105
106 async fn insert_mappings(&self, mappings: BTreeSet<MappedCid>) {
107 self.next.0.insert_mappings(mappings).await
108 }
109}
110#[async_trait]
111impl<S, R> CloneWithBlockStorageSettings for JoinBlockStorage<S, R>
112where
113 S: BlockStorage + CloneWithBlockStorageSettings + 'static,
114 R: ExtendedBlockStorage + Clone + 'static,
115{
116 fn clone_with_settings(&self, settings: BlockStorageCloneSettings) -> Self {
117 Self::new(self.next.0.clone_with_settings(settings), self.next.1.clone())
118 }
119}
120impl<S, R> BlockStorageStoreParams for JoinBlockStorage<S, R>
121where
122 S: BlockStorage + BlockStorageStoreParams + 'static,
123 R: ExtendedBlockStorage + 'static,
124{
125 type StoreParams = S::StoreParams;
126}