Skip to main content

object_rainbow_store/
lib.rs

1use std::{
2    ops::{Deref, DerefMut},
3    pin::Pin,
4    sync::Arc,
5};
6
7use object_rainbow::{
8    Address, ExtraFor, FullHash, Hash, Inline, InlineOutput, ListHashes, MaybeHasNiche, Object,
9    ObjectHashes, OptionalHash, Parse, ParseInline, PointInput, PointVisitor, Resolve, Singular,
10    SingularFetch, Size, Tagged, ToOutput, Topological, Traversible, assert_impl,
11    derive_for_wrapped,
12};
13use object_rainbow_point::{Extras, Point};
14
15pub trait RainbowFuture: Send + Future<Output = object_rainbow::Result<Self::T>> {
16    type T;
17}
18
19impl<F: Send + Future<Output = object_rainbow::Result<T>>, T> RainbowFuture for F {
20    type T = T;
21}
22
23struct StoreVisitor<'a, 'x, S: ?Sized> {
24    store: &'a S,
25    futures: &'x mut Vec<Pin<Box<dyn 'a + Send + Future<Output = object_rainbow::Result<()>>>>>,
26}
27
28impl<'a, 'x, S: RainbowStore> PointVisitor for StoreVisitor<'a, 'x, S> {
29    fn visit<T: Traversible>(&mut self, point: &(impl 'static + SingularFetch<T = T> + Clone)) {
30        let point = point.clone();
31        let store = self.store;
32        self.futures.push(Box::pin(async move {
33            store.save_point(&point).await.map(|_| ())
34        }));
35    }
36}
37
38struct StoreResolve<S> {
39    store: S,
40}
41
42impl<S: 'static + Send + RainbowStore> Resolve for StoreResolve<S> {
43    fn resolve<'a>(
44        &'a self,
45        address: Address,
46        this: &'a Arc<dyn Resolve>,
47    ) -> object_rainbow::FailFuture<'a, object_rainbow::ByteNode> {
48        Box::pin(async move {
49            let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
50            Ok((bytes, this.clone()))
51        })
52    }
53
54    fn resolve_data(&'_ self, address: Address) -> object_rainbow::FailFuture<'_, Vec<u8>> {
55        Box::pin(async move {
56            let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
57            Ok(bytes)
58        })
59    }
60}
61
62#[derive_for_wrapped]
63pub trait RainbowStore: 'static + Send + Sync + Clone {
64    fn saved_point<T: 'static + Traversible, Extra: 'static + Send + Sync + Clone + ExtraFor<T>>(
65        &self,
66        point: &Point<T>,
67        extra: Extra,
68    ) -> impl RainbowFuture<T = Point<T>> {
69        async {
70            self.save_point(point).await?;
71            Ok(point.with_resolve(self.resolve(), extra))
72        }
73    }
74    fn save_point(&self, point: &impl SingularFetch<T: Traversible>) -> impl RainbowFuture<T = ()> {
75        async {
76            if !self.contains(point.hash()).await? {
77                self.save_object(&point.fetch().await?).await?;
78            }
79            Ok(())
80        }
81    }
82    fn save_topology(&self, object: &impl Topological) -> impl RainbowFuture<T = ()> {
83        let mut futures = Vec::with_capacity(object.point_count());
84        object.traverse(&mut StoreVisitor {
85            store: self,
86            futures: &mut futures,
87        });
88        async {
89            for future in futures {
90                future.await?;
91            }
92            Ok(())
93        }
94    }
95    fn save_object(&self, object: &impl Traversible) -> impl RainbowFuture<T = ()> {
96        async {
97            self.save_topology(object).await?;
98            self.save_data(object.hashes(), &object.vec()).await?;
99            Ok(())
100        }
101    }
102    fn resolve(&self) -> Arc<dyn Resolve> {
103        Arc::new(StoreResolve {
104            store: self.clone(),
105        })
106    }
107    fn point_extra<T: 'static + FullHash, Extra: 'static + Send + Sync + Clone + ExtraFor<T>>(
108        &self,
109        hash: Hash,
110        extra: Extra,
111    ) -> Point<T> {
112        Point::from_address_extra(Address::from_hash(hash), self.resolve(), extra)
113    }
114    fn point<T: Object>(&self, hash: Hash) -> Point<T> {
115        self.point_extra(hash, ())
116    }
117    fn save_data(&self, hashes: ObjectHashes, data: &[u8]) -> impl RainbowFuture<T = ()>;
118    fn contains(&self, hash: Hash) -> impl RainbowFuture<T = bool>;
119    fn fetch(&self, hash: Hash)
120    -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<[u8]>>;
121}
122
123pub trait RainbowStoreMut: RainbowStore {
124    fn create_ref(
125        &self,
126        hash: Hash,
127    ) -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<str>> {
128        let _ = hash;
129        async { Err::<String, _>(object_rainbow::Error::Unimplemented) }
130    }
131    fn update_ref(
132        &self,
133        key: &str,
134        old: Option<OptionalHash>,
135        hash: Hash,
136    ) -> impl RainbowFuture<T = ()>;
137    fn fetch_ref(&self, key: &str) -> impl RainbowFuture<T = OptionalHash>;
138    fn ref_exists(&self, key: &str) -> impl RainbowFuture<T = bool>;
139    fn store_ref_raw<
140        T: Object<Extra>,
141        K: Send + Sync + AsRef<str>,
142        Extra: 'static + Send + Sync + Clone,
143    >(
144        &self,
145        key: K,
146        point: Point<T>,
147        extra: Extra,
148    ) -> StoreRef<Self, K, T, Extra> {
149        StoreRef {
150            store: self.clone(),
151            key,
152            old: point.hash().into(),
153            point,
154            extra,
155        }
156    }
157}
158
159#[derive(Clone)]
160pub struct StoreMut<S, Extra = ()> {
161    store: S,
162    extra: Extra,
163}
164
165impl<S> StoreMut<S> {
166    pub const fn new(store: S) -> Self {
167        Self::new_extra(store, ())
168    }
169}
170
171impl<S, Extra> StoreMut<S, Extra> {
172    pub const fn new_extra(store: S, extra: Extra) -> Self {
173        Self { store, extra }
174    }
175}
176
177impl<S: RainbowStoreMut, Extra: 'static + Send + Sync + Clone> StoreMut<S, Extra> {
178    pub async fn exists<K: Send + Sync + AsRef<str>>(
179        &self,
180        key: K,
181    ) -> object_rainbow::Result<bool> {
182        self.store.ref_exists(key.as_ref()).await
183    }
184
185    pub async fn create<T: Object<Extra>>(
186        &self,
187        point: Point<T>,
188    ) -> object_rainbow::Result<StoreRef<S, impl 'static + Send + Sync + AsRef<str>, T, Extra>>
189    {
190        let point = self.store.saved_point(&point, self.extra.clone()).await?;
191        let key = self.store.create_ref(point.hash()).await?;
192        Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
193    }
194
195    pub async fn update<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
196        &self,
197        key: K,
198        point: Point<T>,
199    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
200        let point = self.store.saved_point(&point, self.extra.clone()).await?;
201        self.store
202            .update_ref(key.as_ref(), None, point.hash())
203            .await?;
204        Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
205    }
206
207    pub async fn init<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
208        &self,
209        key: K,
210        point: Point<T>,
211    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
212        let point = self.store.saved_point(&point, self.extra.clone()).await?;
213        self.store
214            .update_ref(key.as_ref(), Some(OptionalHash::NONE), point.hash())
215            .await?;
216        Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
217    }
218
219    pub async fn load<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
220        &self,
221        key: K,
222    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
223        let hash = self
224            .store
225            .fetch_ref(key.as_ref())
226            .await?
227            .get()
228            .ok_or(object_rainbow::Error::HashNotFound)?;
229        let point = self.store.point_extra(hash, self.extra.clone());
230        Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
231    }
232
233    pub async fn load_or_init<T: Object<Extra> + Default + Clone, K: Send + Sync + AsRef<str>>(
234        &self,
235        key: K,
236    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
237        if let Some(hash) = self.store.fetch_ref(key.as_ref()).await?.get() {
238            let point = self.store.point_extra(hash, self.extra.clone());
239            Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
240        } else {
241            self.init(key, Default::default()).await
242        }
243    }
244
245    pub async fn reference<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
246        &self,
247        key: K,
248        point: Point<T>,
249    ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
250        Ok(StoreRef {
251            old: self.store.fetch_ref(key.as_ref()).await?,
252            ..self.store.store_ref_raw(key, point, self.extra.clone())
253        })
254    }
255}
256
257pub struct StoreRef<S, K, T, Extra> {
258    store: S,
259    key: K,
260    old: OptionalHash,
261    point: Point<T>,
262    extra: Extra,
263}
264
265impl<S, K, T, Extra> Deref for StoreRef<S, K, T, Extra> {
266    type Target = Point<T>;
267
268    fn deref(&self) -> &Self::Target {
269        &self.point
270    }
271}
272
273impl<S, K, T, Extra> DerefMut for StoreRef<S, K, T, Extra> {
274    fn deref_mut(&mut self) -> &mut Self::Target {
275        &mut self.point
276    }
277}
278
279impl<
280    S: RainbowStoreMut,
281    K: Send + Sync + AsRef<str>,
282    T: Object<Extra>,
283    Extra: 'static + Send + Sync + Clone,
284> StoreRef<S, K, T, Extra>
285{
286    pub fn is_modified(&self) -> bool {
287        self.point.hash() != self.old
288    }
289
290    pub fn is_new(&self) -> bool {
291        self.old.is_none()
292    }
293
294    pub async fn save_point(&mut self) -> object_rainbow::Result<()> {
295        self.point = self
296            .store
297            .saved_point(&self.point, self.extra.clone())
298            .await?;
299        Ok(())
300    }
301
302    pub async fn save(&mut self) -> object_rainbow::Result<()> {
303        if self.is_modified() {
304            self.save_point().await?;
305            self.store
306                .update_ref(self.key.as_ref(), Some(self.old), self.point.hash())
307                .await?;
308            self.old = self.point.hash().into();
309        }
310        Ok(())
311    }
312}
313
314#[derive(Parse, ParseInline)]
315struct StoredInner<S, E> {
316    hash: Hash,
317    extra: Extras<E>,
318    store: S,
319}
320
321#[derive(
322    ToOutput, InlineOutput, Tagged, Size, MaybeHasNiche, Clone, PartialEq, Eq, PartialOrd, Ord, Hash,
323)]
324pub struct Stored<S, T> {
325    point: Point<T>,
326    store: S,
327}
328
329impl<S: ListHashes, T> ListHashes for Stored<S, T> {
330    fn list_hashes(&self, f: &mut impl FnMut(Hash)) {
331        self.store.list_hashes(f);
332    }
333}
334
335impl<S: Topological, T> Topological for Stored<S, T> {
336    fn traverse(&self, visitor: &mut impl PointVisitor) {
337        self.store.traverse(visitor);
338    }
339}
340
341impl<S: RainbowStore, T: 'static + FullHash> Stored<S, T> {
342    fn from_inner<E: 'static + Send + Sync + Clone + ExtraFor<T>>(
343        StoredInner { hash, extra, store }: StoredInner<S, E>,
344    ) -> Self {
345        let point = store.point_extra(hash, extra.0);
346        Self { point, store }
347    }
348}
349
350impl<
351    S: RainbowStore + Parse<I>,
352    T: 'static + FullHash + Parse<I>,
353    I: PointInput<Extra: Send + Sync + ExtraFor<T>>,
354> Parse<I> for Stored<S, T>
355{
356    fn parse(input: I) -> object_rainbow::Result<Self> {
357        input.parse().map(Self::from_inner)
358    }
359}
360
361impl<
362    S: RainbowStore + ParseInline<I>,
363    T: 'static + FullHash + Parse<I>,
364    I: PointInput<Extra: Send + Sync + ExtraFor<T>>,
365> ParseInline<I> for Stored<S, T>
366{
367    fn parse_inline(input: &mut I) -> object_rainbow::Result<Self> {
368        input.parse_inline().map(Self::from_inner)
369    }
370}
371
372assert_impl!(
373    impl<S, T, E> Object<E> for Stored<S, T>
374    where
375        S: RainbowStore + Object<E>,
376        T: Object<E>,
377        E: 'static + Send + Sync + Clone,
378    {
379    }
380);
381
382assert_impl!(
383    impl<S, T, E> Inline<E> for Stored<S, T>
384    where
385        S: RainbowStore + Inline<E>,
386        T: Object<E>,
387        E: 'static + Send + Sync + Clone,
388    {
389    }
390);
391
392impl<S, T> Stored<S, T> {
393    pub fn load(&self) -> &Point<T> {
394        &self.point
395    }
396}
397
398impl<S: RainbowStore, T: Traversible> Stored<S, T> {
399    pub async fn replace(&mut self, point: Point<T>) -> object_rainbow::Result<Point<T>> {
400        self.store.save_point(&point).await?;
401        Ok(std::mem::replace(&mut self.point, point))
402    }
403
404    pub async fn new(store: S, point: Point<T>) -> object_rainbow::Result<Self> {
405        store.save_point(&point).await?;
406        Ok(Self { point, store })
407    }
408}