Skip to main content

object_rainbow_store/
lib.rs

1use std::{
2    ops::{Deref, DerefMut},
3    pin::Pin,
4    sync::Arc,
5};
6
7use object_rainbow::{
8    Address, ExtraFor, FullHash, Hash, Inline, InlineOutput, ListHashes, MaybeHasNiche, Object,
9    OptionalHash, Parse, ParseInline, ParseSlice, ParseSliceExtra, PointInput, PointVisitor,
10    ReflessInline, Resolve, Singular, SingularFetch, Size, Tagged, ToOutput, Topological,
11    Traversible, WithHash, assert_impl, derive_for_wrapped,
12};
13use object_rainbow_point::{ExtractResolve, Extras, Point};
14
15mod externally_stored;
16
17pub trait RainbowFuture: Send + Future<Output = object_rainbow::Result<Self::T>> {
18    type T;
19}
20
21impl<F: Send + Future<Output = object_rainbow::Result<T>>, T> RainbowFuture for F {
22    type T = T;
23}
24
25struct StoreVisitor<'a, 'x, S: ?Sized> {
26    store: &'a S,
27    futures: &'x mut Vec<Pin<Box<dyn 'a + Send + Future<Output = object_rainbow::Result<()>>>>>,
28}
29
30impl<'a, 'x, S: RainbowStore> PointVisitor for StoreVisitor<'a, 'x, S> {
31    fn visit(&mut self, point: &(impl 'static + SingularFetch<T: Traversible> + Clone)) {
32        let point = point.clone();
33        let store = self.store;
34        self.futures.push(Box::pin(async move {
35            store.save_point(&point).await.map(|_| ())
36        }));
37    }
38}
39
40struct StoreResolve<S> {
41    store: S,
42}
43
44impl<S: 'static + Send + RainbowStore> Resolve for StoreResolve<S> {
45    fn resolve<'a>(
46        &'a self,
47        address: Address,
48        this: &'a Arc<dyn Resolve>,
49    ) -> object_rainbow::FailFuture<'a, object_rainbow::ByteNode> {
50        Box::pin(async move {
51            let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
52            Ok((bytes, this.clone()))
53        })
54    }
55
56    fn resolve_data(&'_ self, address: Address) -> object_rainbow::FailFuture<'_, Vec<u8>> {
57        Box::pin(async move {
58            let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
59            Ok(bytes)
60        })
61    }
62}
63
64#[derive_for_wrapped]
65pub trait RainbowStore: 'static + Send + Sync + Clone + PartialEq {
66    fn saved_point<T: 'static + Traversible, Extra: 'static + Send + Sync + Clone + ExtraFor<T>>(
67        &self,
68        point: &Point<T>,
69        extra: Extra,
70    ) -> impl RainbowFuture<T = Point<T>> {
71        async {
72            self.save_point(point).await?;
73            Ok(point.with_resolve(self.resolve(), extra))
74        }
75    }
76    fn save_point(&self, point: &impl SingularFetch<T: Traversible>) -> impl RainbowFuture<T = ()> {
77        async {
78            let already_stored = point
79                .extract_resolve::<StoreResolve<Self>>()
80                .is_some_and(|(_, resolve)| resolve.store == *self);
81            if !already_stored && !self.contains(point.hash()).await? {
82                self.save_object(&point.fetch().await?).await?;
83            }
84            Ok(())
85        }
86    }
87    fn save_topology(&self, object: &impl Topological) -> impl RainbowFuture<T = ()> {
88        let mut futures = Vec::with_capacity(object.point_count());
89        object.traverse(&mut StoreVisitor {
90            store: self,
91            futures: &mut futures,
92        });
93        async {
94            futures_util::future::try_join_all(futures).await?;
95            Ok(())
96        }
97    }
98    fn save_object(&self, object: &impl Traversible) -> impl RainbowFuture<T = ()> {
99        async {
100            self.save_topology(object).await?;
101            self.save_data(object.with_hash()).await?;
102            Ok(())
103        }
104    }
105    fn resolve(&self) -> Arc<dyn Resolve> {
106        Arc::new(StoreResolve {
107            store: self.clone(),
108        })
109    }
110    fn point_extra<T: 'static + FullHash, Extra: 'static + Send + Sync + Clone + ExtraFor<T>>(
111        &self,
112        hash: Hash,
113        extra: Extra,
114    ) -> Point<T> {
115        Point::from_address_extra(Address::from_hash(hash), self.resolve(), extra)
116    }
117    fn point<T: Object>(&self, hash: Hash) -> Point<T> {
118        self.point_extra(hash, ())
119    }
120    fn save_data(
121        &self,
122        wh: WithHash<'_, impl Send + Sync + ToOutput>,
123    ) -> impl RainbowFuture<T = ()>;
124    fn contains(&self, hash: Hash) -> impl RainbowFuture<T = bool>;
125    fn fetch(&self, hash: Hash)
126    -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<[u8]>>;
127}
128
129pub trait RainbowStoreMut: RainbowStore {
130    fn create_ref(
131        &self,
132        hash: Hash,
133    ) -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<str>> {
134        let _ = hash;
135        async { Err::<String, _>(object_rainbow::Error::Unimplemented) }
136    }
137    fn update_ref(
138        &self,
139        key: &str,
140        old: Option<OptionalHash>,
141        hash: Hash,
142    ) -> impl RainbowFuture<T = ()>;
143    fn fetch_ref(&self, key: &str) -> impl RainbowFuture<T = OptionalHash>;
144    fn ref_exists(&self, key: &str) -> impl RainbowFuture<T = bool>;
145    fn store_ref_raw<
146        T: Object<Extra>,
147        K: Send + Sync + AsRef<str>,
148        Extra: 'static + Send + Sync + Clone,
149    >(
150        &self,
151        key: K,
152        point: Point<T>,
153        extra: Extra,
154    ) -> StoreRef<Self, K, T, Extra> {
155        StoreRef {
156            store: self.clone(),
157            key,
158            old: point.hash().into(),
159            point,
160            extra,
161        }
162    }
163}
164
/// Front-end over a store that bundles it with the `Extra` value handed to every point it
/// creates or saves.
#[derive(Clone)]
pub struct StoreMut<S, Extra = ()> {
    /// Underlying store.
    store: S,
    /// Extra value cloned into each point/handle produced through this wrapper.
    extra: Extra,
}

impl<S> StoreMut<S> {
    /// Wraps `store` with the unit extra.
    pub const fn new(store: S) -> Self {
        Self { store, extra: () }
    }
}

impl<S, Extra> StoreMut<S, Extra> {
    /// Wraps `store` together with a caller-supplied `extra`.
    pub const fn new_extra(store: S, extra: Extra) -> Self {
        StoreMut { store, extra }
    }
}
182
183impl<S: RainbowStoreMut, Extra: 'static + Send + Sync + Clone> StoreMut<S, Extra> {
184    pub async fn exists<K: Send + Sync + AsRef<str>>(
185        &self,
186        key: K,
187    ) -> object_rainbow::Result<bool> {
188        self.store.ref_exists(key.as_ref()).await
189    }
190
191    pub async fn create<T: Object<Extra>>(
192        &self,
193        point: Point<T>,
194    ) -> object_rainbow::Result<StoreRef<S, impl 'static + Send + Sync + AsRef<str>, T, Extra>>
195    {
196        let point = self.store.saved_point(&point, self.extra.clone()).await?;
197        let key = self.store.create_ref(point.hash()).await?;
198        Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
199    }
200
201    pub async fn update<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
202        &self,
203        key: K,
204        point: Point<T>,
205    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
206        let point = self.store.saved_point(&point, self.extra.clone()).await?;
207        self.store
208            .update_ref(key.as_ref(), None, point.hash())
209            .await?;
210        Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
211    }
212
213    pub async fn init<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
214        &self,
215        key: K,
216        point: Point<T>,
217    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
218        let point = self.store.saved_point(&point, self.extra.clone()).await?;
219        self.store
220            .update_ref(key.as_ref(), Some(OptionalHash::NONE), point.hash())
221            .await?;
222        Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
223    }
224
225    pub async fn load<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
226        &self,
227        key: K,
228    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
229        let hash = self
230            .store
231            .fetch_ref(key.as_ref())
232            .await?
233            .get()
234            .ok_or(object_rainbow::Error::HashNotFound)?;
235        let point = self.store.point_extra(hash, self.extra.clone());
236        Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
237    }
238
239    pub async fn load_or_init<T: Object<Extra> + Default + Clone, K: Send + Sync + AsRef<str>>(
240        &self,
241        key: K,
242    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
243        if let Some(hash) = self.store.fetch_ref(key.as_ref()).await?.get() {
244            let point = self.store.point_extra(hash, self.extra.clone());
245            Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
246        } else {
247            self.init(key, Default::default()).await
248        }
249    }
250
251    pub async fn reference<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
252        &self,
253        key: K,
254        point: Point<T>,
255    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
256        Ok(StoreRef {
257            old: self.store.fetch_ref(key.as_ref()).await?,
258            ..self.store.store_ref_raw(key, point, self.extra.clone())
259        })
260    }
261}
262
263pub struct StoreRef<S, K, T, Extra> {
264    store: S,
265    key: K,
266    old: OptionalHash,
267    point: Point<T>,
268    extra: Extra,
269}
270
271impl<S, K, T, Extra> Deref for StoreRef<S, K, T, Extra> {
272    type Target = Point<T>;
273
274    fn deref(&self) -> &Self::Target {
275        &self.point
276    }
277}
278
279impl<S, K, T, Extra> DerefMut for StoreRef<S, K, T, Extra> {
280    fn deref_mut(&mut self) -> &mut Self::Target {
281        &mut self.point
282    }
283}
284
285impl<
286    S: RainbowStoreMut,
287    K: Send + Sync + AsRef<str>,
288    T: Object<Extra>,
289    Extra: 'static + Send + Sync + Clone,
290> StoreRef<S, K, T, Extra>
291{
292    pub fn is_modified(&self) -> bool {
293        self.point.hash() != self.old
294    }
295
296    pub fn is_new(&self) -> bool {
297        self.old.is_none()
298    }
299
300    pub async fn save_point(&mut self) -> object_rainbow::Result<()> {
301        self.point = self
302            .store
303            .saved_point(&self.point, self.extra.clone())
304            .await?;
305        Ok(())
306    }
307
308    pub async fn save(&mut self) -> object_rainbow::Result<()> {
309        if self.is_modified() {
310            self.save_point().await?;
311            self.store
312                .update_ref(self.key.as_ref(), Some(self.old), self.point.hash())
313                .await?;
314            self.old = self.point.hash().into();
315        }
316        Ok(())
317    }
318}
319
320#[derive(Parse, ParseInline)]
321struct StoredInner<S, E> {
322    hash: Hash,
323    extra: Extras<E>,
324    store: S,
325}
326
327#[derive(
328    ToOutput, InlineOutput, Tagged, Size, MaybeHasNiche, Clone, PartialEq, Eq, PartialOrd, Ord, Hash,
329)]
330pub struct Stored<S, T> {
331    point: Point<T>,
332    store: S,
333}
334
335impl<S: ListHashes, T> ListHashes for Stored<S, T> {
336    fn list_hashes(&self, f: &mut impl FnMut(Hash)) {
337        self.store.list_hashes(f);
338    }
339}
340
341impl<S: Topological, T> Topological for Stored<S, T> {
342    fn traverse(&self, visitor: &mut impl PointVisitor) {
343        self.store.traverse(visitor);
344    }
345}
346
347impl<S: RainbowStore, T: 'static + FullHash> Stored<S, T> {
348    fn from_inner<E: 'static + Send + Sync + Clone + ExtraFor<T>>(
349        StoredInner { hash, extra, store }: StoredInner<S, E>,
350    ) -> Self {
351        let point = store.point_extra(hash, extra.0);
352        Self { point, store }
353    }
354}
355
356impl<
357    S: RainbowStore + Parse<I>,
358    T: 'static + FullHash + Parse<I>,
359    I: PointInput<Extra: Send + Sync + ExtraFor<T>>,
360> Parse<I> for Stored<S, T>
361{
362    fn parse(input: I) -> object_rainbow::Result<Self> {
363        input.parse().map(Self::from_inner)
364    }
365}
366
367impl<
368    S: RainbowStore + ParseInline<I>,
369    T: 'static + FullHash + Parse<I>,
370    I: PointInput<Extra: Send + Sync + ExtraFor<T>>,
371> ParseInline<I> for Stored<S, T>
372{
373    fn parse_inline(input: &mut I) -> object_rainbow::Result<Self> {
374        input.parse_inline().map(Self::from_inner)
375    }
376}
377
378assert_impl!(
379    impl<S, T, E> Object<E> for Stored<S, T>
380    where
381        S: RainbowStore + Object<E>,
382        T: Object<E>,
383        E: 'static + Send + Sync + Clone,
384    {
385    }
386);
387
388assert_impl!(
389    impl<S, T, E> Inline<E> for Stored<S, T>
390    where
391        S: RainbowStore + Inline<E>,
392        T: Object<E>,
393        E: 'static + Send + Sync + Clone,
394    {
395    }
396);
397
398impl<S, T> Stored<S, T> {
399    pub fn load(&self) -> &Point<T> {
400        &self.point
401    }
402}
403
404impl<S: RainbowStore, T: Traversible> Stored<S, T> {
405    pub async fn replace(&mut self, point: Point<T>) -> object_rainbow::Result<Point<T>> {
406        self.store.save_point(&point).await?;
407        Ok(std::mem::replace(&mut self.point, point))
408    }
409
410    pub async fn new(store: S, point: Point<T>) -> object_rainbow::Result<Self> {
411        store.save_point(&point).await?;
412        Ok(Self { point, store })
413    }
414}
415
416pub trait ExternalStore: 'static + Send + Sync + Clone + PartialEq {
417    type Id: ReflessInline + Clone + Eq;
418    fn save_data(
419        &self,
420        data: &[u8],
421        refs: &[Self::Id],
422        wh: WithHash<'_, impl Send + Sync + ToOutput>,
423    ) -> impl RainbowFuture<T = Self::Id>;
424    fn contains_data(
425        &self,
426        data: &[u8],
427        refs: &[Self::Id],
428        wh: WithHash<'_, impl Send + Sync + ToOutput>,
429    ) -> impl RainbowFuture<T = bool>;
430    fn contains(&self, id: &Self::Id) -> impl RainbowFuture<T = bool>;
431    fn fetch(
432        &self,
433        id: &Self::Id,
434    ) -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<[u8]>>;
435    fn store_point<T: Traversible>(
436        &self,
437        fetch: impl 'static + SingularFetch<T = T>,
438    ) -> impl RainbowFuture<T = Self::Id>
439    where
440        Self: PartialEq,
441    {
442        externally_stored::store_point(self, fetch)
443    }
444    fn store_object<T: Traversible>(&self, object: T) -> impl RainbowFuture<T = Self::Id>
445    where
446        Self: PartialEq,
447    {
448        externally_stored::store_object(self, object)
449    }
450    fn load_extra<T: ParseSliceExtra<E> + Tagged, E: 'static + Send + Sync + Clone>(
451        &self,
452        id: &Self::Id,
453        extra: E,
454    ) -> impl RainbowFuture<T = T> {
455        externally_stored::load_extra::<_, T, _>(self, id, extra)
456    }
457    fn load<T: ParseSlice + Tagged>(&self, id: &Self::Id) -> impl RainbowFuture<T = T> {
458        externally_stored::load::<_, T>(self, id)
459    }
460}