monoutils_store/implementations/stores/
dualstore.rs

1use std::{collections::HashSet, pin::Pin};
2
3use bytes::Bytes;
4use libipld::Cid;
5use serde::{de::DeserializeOwned, Serialize};
6use tokio::io::AsyncRead;
7
8use crate::{Codec, IpldReferences, IpldStore, StoreError, StoreResult};
9
10//--------------------------------------------------------------------------------------------------
11// Types
12//--------------------------------------------------------------------------------------------------
13
14/// A dual store that stores blocks on two different stores.
15#[derive(Debug, Clone)]
16pub struct DualStore<A, B>
17where
18    A: IpldStore,
19    B: IpldStore,
20{
21    store_a: A,
22    store_b: B,
23    config: DualStoreConfig,
24}
25
26/// Choices for selecting which store to use for a given operation.
27#[derive(Clone, Copy, Debug, PartialEq)]
28pub enum Choice {
29    /// Use the first store.
30    A,
31    /// Use the second store.
32    B,
33}
34
35/// Configuration for a dual store.
36#[derive(Debug, Clone)]
37pub struct DualStoreConfig {
38    /// The default store to use.
39    pub default: Choice,
40}
41
42//--------------------------------------------------------------------------------------------------
43// Methods
44//--------------------------------------------------------------------------------------------------
45
46impl<A, B> DualStore<A, B>
47where
48    A: IpldStore,
49    B: IpldStore,
50{
51    /// Creates a new dual store from two stores.
52    pub fn new(store_a: A, store_b: B, config: DualStoreConfig) -> Self {
53        Self {
54            store_a,
55            store_b,
56            config,
57        }
58    }
59
60    /// Gets the type stored as an IPLD data from a chosen store by its `Cid`.
61    pub async fn get_node_from<D>(&self, cid: &Cid, choice: Choice) -> StoreResult<D>
62    where
63        D: DeserializeOwned + Send,
64    {
65        match choice {
66            Choice::A => self.store_a.get_node(cid).await,
67            Choice::B => self.store_b.get_node(cid).await,
68        }
69    }
70
71    /// Gets the bytes stored in a chosen store as raw bytes by its `Cid`.
72    pub async fn get_bytes_from<'a>(
73        &'a self,
74        cid: &'a Cid,
75        choice: Choice,
76    ) -> StoreResult<Pin<Box<dyn AsyncRead + Send + Sync + 'a>>> {
77        match choice {
78            Choice::A => self.store_a.get_bytes(cid).await,
79            Choice::B => self.store_b.get_bytes(cid).await,
80        }
81    }
82
83    /// Gets raw bytes from a chosen store as a single block by its `Cid`.
84    pub async fn get_raw_block_from(&self, cid: &Cid, choice: Choice) -> StoreResult<Bytes> {
85        match choice {
86            Choice::A => self.store_a.get_raw_block(cid).await,
87            Choice::B => self.store_b.get_raw_block(cid).await,
88        }
89    }
90
91    /// Saves a serializable type to a chosen store and returns the `Cid` to it.
92    pub async fn put_node_into<T>(&self, data: &T, choice: Choice) -> StoreResult<Cid>
93    where
94        T: Serialize + IpldReferences + Sync,
95    {
96        match choice {
97            Choice::A => self.store_a.put_node(data).await,
98            Choice::B => self.store_b.put_node(data).await,
99        }
100    }
101
102    /// Saves raw bytes to a chosen store and returns the `Cid` to it.
103    pub async fn put_bytes_into(
104        &self,
105        bytes: impl AsyncRead + Send + Sync,
106        choice: Choice,
107    ) -> StoreResult<Cid> {
108        match choice {
109            Choice::A => self.store_a.put_bytes(bytes).await,
110            Choice::B => self.store_b.put_bytes(bytes).await,
111        }
112    }
113
114    /// Saves raw bytes as a single block to a chosen store and returns the `Cid` to it.
115    pub async fn put_raw_block_into(
116        &self,
117        bytes: impl Into<Bytes> + Send,
118        choice: Choice,
119    ) -> StoreResult<Cid> {
120        match choice {
121            Choice::A => self.store_a.put_raw_block(bytes).await,
122            Choice::B => self.store_b.put_raw_block(bytes).await,
123        }
124    }
125
126    /// Checks if a block exists in a chosen store by its `Cid`.
127    pub async fn has_from(&self, cid: &Cid, choice: Choice) -> bool {
128        match choice {
129            Choice::A => self.store_a.has(cid).await,
130            Choice::B => self.store_b.has(cid).await,
131        }
132    }
133}
134
135impl Choice {
136    /// Returns the other choice.
137    pub fn other(&self) -> Self {
138        match self {
139            Choice::A => Choice::B,
140            Choice::B => Choice::A,
141        }
142    }
143}
144
145//--------------------------------------------------------------------------------------------------
146// Trait Implementations
147//--------------------------------------------------------------------------------------------------
148
149impl<A, B> IpldStore for DualStore<A, B>
150where
151    A: IpldStore + Sync,
152    B: IpldStore + Sync,
153{
154    async fn put_node<T>(&self, data: &T) -> StoreResult<Cid>
155    where
156        T: Serialize + IpldReferences + Sync,
157    {
158        self.put_node_into(data, self.config.default).await
159    }
160
161    async fn put_bytes<'a>(&'a self, bytes: impl AsyncRead + Send + Sync + 'a) -> StoreResult<Cid> {
162        self.put_bytes_into(bytes, self.config.default).await
163    }
164
165    async fn put_raw_block(&self, bytes: impl Into<Bytes> + Send) -> StoreResult<Cid> {
166        self.put_raw_block_into(bytes, self.config.default).await
167    }
168
169    async fn get_node<D>(&self, cid: &Cid) -> StoreResult<D>
170    where
171        D: DeserializeOwned + Send,
172    {
173        match self.get_node_from(cid, self.config.default).await {
174            Ok(data) => Ok(data),
175            Err(StoreError::BlockNotFound(_)) => {
176                let choice = self.config.default.other();
177                self.get_node_from(cid, choice).await
178            }
179            Err(err) => Err(err),
180        }
181    }
182
183    async fn get_bytes<'a>(
184        &'a self,
185        cid: &'a Cid,
186    ) -> StoreResult<Pin<Box<dyn AsyncRead + Send + Sync + 'a>>> {
187        match self.get_bytes_from(cid, self.config.default).await {
188            Ok(bytes) => Ok(bytes),
189            Err(StoreError::BlockNotFound(_)) => {
190                let choice = self.config.default.other();
191                self.get_bytes_from(cid, choice).await
192            }
193            Err(err) => Err(err),
194        }
195    }
196
197    async fn get_raw_block(&self, cid: &Cid) -> StoreResult<Bytes> {
198        match self.get_raw_block_from(cid, self.config.default).await {
199            Ok(bytes) => Ok(bytes),
200            Err(StoreError::BlockNotFound(_)) => {
201                let choice = self.config.default.other();
202                self.get_raw_block_from(cid, choice).await
203            }
204            Err(err) => Err(err),
205        }
206    }
207
208    async fn has(&self, cid: &Cid) -> bool {
209        match self.has_from(cid, self.config.default).await {
210            true => true,
211            false => self.has_from(cid, self.config.default.other()).await,
212        }
213    }
214
215    fn get_supported_codecs(&self) -> HashSet<Codec> {
216        self.store_a
217            .get_supported_codecs()
218            .into_iter()
219            .chain(self.store_b.get_supported_codecs())
220            .collect()
221    }
222
223    fn get_node_block_max_size(&self) -> Option<u64> {
224        self.store_a
225            .get_node_block_max_size()
226            .max(self.store_b.get_node_block_max_size())
227    }
228
229    fn get_raw_block_max_size(&self) -> Option<u64> {
230        self.store_a
231            .get_raw_block_max_size()
232            .max(self.store_b.get_raw_block_max_size())
233    }
234
235    async fn is_empty(&self) -> StoreResult<bool> {
236        Ok(self.store_a.is_empty().await? && self.store_b.is_empty().await?)
237    }
238
239    async fn get_size(&self) -> StoreResult<u64> {
240        Ok(self.store_a.get_size().await? + self.store_b.get_size().await?)
241    }
242}
243
244impl Default for DualStoreConfig {
245    fn default() -> Self {
246        Self { default: Choice::A }
247    }
248}
249
250//--------------------------------------------------------------------------------------------------
251// Tests
252//--------------------------------------------------------------------------------------------------
253
254#[cfg(test)]
255mod tests {
256    use crate::MemoryStore;
257
258    use super::*;
259
260    #[tokio::test]
261    async fn test_dual_store_put_and_get() -> anyhow::Result<()> {
262        let store_a = MemoryStore::default();
263        let store_b = MemoryStore::default();
264        let dual_store = DualStore::new(store_a, store_b, Default::default());
265
266        let cid_0 = dual_store.put_node_into(&"hello", Choice::A).await?;
267        let cid_1 = dual_store.put_node_into(&250, Choice::B).await?;
268        let cid_2 = dual_store.put_node_into(&"world", Choice::A).await?;
269        let cid_3 = dual_store.put_node_into(&500, Choice::B).await?;
270
271        assert_eq!(dual_store.get_node::<String>(&cid_0).await?, "hello");
272        assert_eq!(dual_store.get_node::<usize>(&cid_1).await?, 250);
273        assert_eq!(dual_store.get_node::<String>(&cid_2).await?, "world");
274        assert_eq!(dual_store.get_node::<usize>(&cid_3).await?, 500);
275
276        Ok(())
277    }
278}