monoutils_store/implementations/stores/
dualstore.rs1use std::{collections::HashSet, pin::Pin};
2
3use bytes::Bytes;
4use libipld::Cid;
5use serde::{de::DeserializeOwned, Serialize};
6use tokio::io::AsyncRead;
7
8use crate::{Codec, IpldReferences, IpldStore, StoreError, StoreResult};
9
10#[derive(Debug, Clone)]
16pub struct DualStore<A, B>
17where
18 A: IpldStore,
19 B: IpldStore,
20{
21 store_a: A,
22 store_b: B,
23 config: DualStoreConfig,
24}
25
26#[derive(Clone, Copy, Debug, PartialEq)]
28pub enum Choice {
29 A,
31 B,
33}
34
35#[derive(Debug, Clone)]
37pub struct DualStoreConfig {
38 pub default: Choice,
40}
41
42impl<A, B> DualStore<A, B>
47where
48 A: IpldStore,
49 B: IpldStore,
50{
51 pub fn new(store_a: A, store_b: B, config: DualStoreConfig) -> Self {
53 Self {
54 store_a,
55 store_b,
56 config,
57 }
58 }
59
60 pub async fn get_node_from<D>(&self, cid: &Cid, choice: Choice) -> StoreResult<D>
62 where
63 D: DeserializeOwned + Send,
64 {
65 match choice {
66 Choice::A => self.store_a.get_node(cid).await,
67 Choice::B => self.store_b.get_node(cid).await,
68 }
69 }
70
71 pub async fn get_bytes_from<'a>(
73 &'a self,
74 cid: &'a Cid,
75 choice: Choice,
76 ) -> StoreResult<Pin<Box<dyn AsyncRead + Send + Sync + 'a>>> {
77 match choice {
78 Choice::A => self.store_a.get_bytes(cid).await,
79 Choice::B => self.store_b.get_bytes(cid).await,
80 }
81 }
82
83 pub async fn get_raw_block_from(&self, cid: &Cid, choice: Choice) -> StoreResult<Bytes> {
85 match choice {
86 Choice::A => self.store_a.get_raw_block(cid).await,
87 Choice::B => self.store_b.get_raw_block(cid).await,
88 }
89 }
90
91 pub async fn put_node_into<T>(&self, data: &T, choice: Choice) -> StoreResult<Cid>
93 where
94 T: Serialize + IpldReferences + Sync,
95 {
96 match choice {
97 Choice::A => self.store_a.put_node(data).await,
98 Choice::B => self.store_b.put_node(data).await,
99 }
100 }
101
102 pub async fn put_bytes_into(
104 &self,
105 bytes: impl AsyncRead + Send + Sync,
106 choice: Choice,
107 ) -> StoreResult<Cid> {
108 match choice {
109 Choice::A => self.store_a.put_bytes(bytes).await,
110 Choice::B => self.store_b.put_bytes(bytes).await,
111 }
112 }
113
114 pub async fn put_raw_block_into(
116 &self,
117 bytes: impl Into<Bytes> + Send,
118 choice: Choice,
119 ) -> StoreResult<Cid> {
120 match choice {
121 Choice::A => self.store_a.put_raw_block(bytes).await,
122 Choice::B => self.store_b.put_raw_block(bytes).await,
123 }
124 }
125
126 pub async fn has_from(&self, cid: &Cid, choice: Choice) -> bool {
128 match choice {
129 Choice::A => self.store_a.has(cid).await,
130 Choice::B => self.store_b.has(cid).await,
131 }
132 }
133}
134
135impl Choice {
136 pub fn other(&self) -> Self {
138 match self {
139 Choice::A => Choice::B,
140 Choice::B => Choice::A,
141 }
142 }
143}
144
145impl<A, B> IpldStore for DualStore<A, B>
150where
151 A: IpldStore + Sync,
152 B: IpldStore + Sync,
153{
154 async fn put_node<T>(&self, data: &T) -> StoreResult<Cid>
155 where
156 T: Serialize + IpldReferences + Sync,
157 {
158 self.put_node_into(data, self.config.default).await
159 }
160
161 async fn put_bytes<'a>(&'a self, bytes: impl AsyncRead + Send + Sync + 'a) -> StoreResult<Cid> {
162 self.put_bytes_into(bytes, self.config.default).await
163 }
164
165 async fn put_raw_block(&self, bytes: impl Into<Bytes> + Send) -> StoreResult<Cid> {
166 self.put_raw_block_into(bytes, self.config.default).await
167 }
168
169 async fn get_node<D>(&self, cid: &Cid) -> StoreResult<D>
170 where
171 D: DeserializeOwned + Send,
172 {
173 match self.get_node_from(cid, self.config.default).await {
174 Ok(data) => Ok(data),
175 Err(StoreError::BlockNotFound(_)) => {
176 let choice = self.config.default.other();
177 self.get_node_from(cid, choice).await
178 }
179 Err(err) => Err(err),
180 }
181 }
182
183 async fn get_bytes<'a>(
184 &'a self,
185 cid: &'a Cid,
186 ) -> StoreResult<Pin<Box<dyn AsyncRead + Send + Sync + 'a>>> {
187 match self.get_bytes_from(cid, self.config.default).await {
188 Ok(bytes) => Ok(bytes),
189 Err(StoreError::BlockNotFound(_)) => {
190 let choice = self.config.default.other();
191 self.get_bytes_from(cid, choice).await
192 }
193 Err(err) => Err(err),
194 }
195 }
196
197 async fn get_raw_block(&self, cid: &Cid) -> StoreResult<Bytes> {
198 match self.get_raw_block_from(cid, self.config.default).await {
199 Ok(bytes) => Ok(bytes),
200 Err(StoreError::BlockNotFound(_)) => {
201 let choice = self.config.default.other();
202 self.get_raw_block_from(cid, choice).await
203 }
204 Err(err) => Err(err),
205 }
206 }
207
208 async fn has(&self, cid: &Cid) -> bool {
209 match self.has_from(cid, self.config.default).await {
210 true => true,
211 false => self.has_from(cid, self.config.default.other()).await,
212 }
213 }
214
215 fn get_supported_codecs(&self) -> HashSet<Codec> {
216 self.store_a
217 .get_supported_codecs()
218 .into_iter()
219 .chain(self.store_b.get_supported_codecs())
220 .collect()
221 }
222
223 fn get_node_block_max_size(&self) -> Option<u64> {
224 self.store_a
225 .get_node_block_max_size()
226 .max(self.store_b.get_node_block_max_size())
227 }
228
229 fn get_raw_block_max_size(&self) -> Option<u64> {
230 self.store_a
231 .get_raw_block_max_size()
232 .max(self.store_b.get_raw_block_max_size())
233 }
234
235 async fn is_empty(&self) -> StoreResult<bool> {
236 Ok(self.store_a.is_empty().await? && self.store_b.is_empty().await?)
237 }
238
239 async fn get_size(&self) -> StoreResult<u64> {
240 Ok(self.store_a.get_size().await? + self.store_b.get_size().await?)
241 }
242}
243
244impl Default for DualStoreConfig {
245 fn default() -> Self {
246 Self { default: Choice::A }
247 }
248}
249
250#[cfg(test)]
255mod tests {
256 use crate::MemoryStore;
257
258 use super::*;
259
260 #[tokio::test]
261 async fn test_dual_store_put_and_get() -> anyhow::Result<()> {
262 let store_a = MemoryStore::default();
263 let store_b = MemoryStore::default();
264 let dual_store = DualStore::new(store_a, store_b, Default::default());
265
266 let cid_0 = dual_store.put_node_into(&"hello", Choice::A).await?;
267 let cid_1 = dual_store.put_node_into(&250, Choice::B).await?;
268 let cid_2 = dual_store.put_node_into(&"world", Choice::A).await?;
269 let cid_3 = dual_store.put_node_into(&500, Choice::B).await?;
270
271 assert_eq!(dual_store.get_node::<String>(&cid_0).await?, "hello");
272 assert_eq!(dual_store.get_node::<usize>(&cid_1).await?, 250);
273 assert_eq!(dual_store.get_node::<String>(&cid_2).await?, "world");
274 assert_eq!(dual_store.get_node::<usize>(&cid_3).await?, 500);
275
276 Ok(())
277 }
278}