1use std::{
2 ops::{Deref, DerefMut},
3 pin::Pin,
4 sync::Arc,
5};
6
7use object_rainbow::{
8 Address, ExtraFor, FullHash, Hash, Inline, InlineOutput, ListHashes, MaybeHasNiche, Object,
9 OptionalHash, Parse, ParseInline, ParseSlice, ParseSliceExtra, PointInput, PointVisitor,
10 ReflessInline, Resolve, Singular, SingularFetch, Size, Tagged, ToOutput, Topological,
11 Traversible, WithHash, assert_impl, derive_for_wrapped,
12};
13use object_rainbow_point::{ExtractResolve, Extras, Point};
14
15mod externally_stored;
16
17pub trait RainbowFuture: Send + Future<Output = object_rainbow::Result<Self::T>> {
18 type T;
19}
20
21impl<F: Send + Future<Output = object_rainbow::Result<T>>, T> RainbowFuture for F {
22 type T = T;
23}
24
25struct StoreVisitor<'a, 'x, S: ?Sized> {
26 store: &'a S,
27 futures: &'x mut Vec<Pin<Box<dyn 'a + Send + Future<Output = object_rainbow::Result<()>>>>>,
28}
29
30impl<'a, 'x, S: RainbowStore> PointVisitor for StoreVisitor<'a, 'x, S> {
31 fn visit(&mut self, point: &(impl 'static + SingularFetch<T: Traversible> + Clone)) {
32 let point = point.clone();
33 let store = self.store;
34 self.futures.push(Box::pin(async move {
35 store.save_point(&point).await.map(|_| ())
36 }));
37 }
38}
39
40struct StoreResolve<S> {
41 store: S,
42}
43
44impl<S: 'static + Send + RainbowStore> Resolve for StoreResolve<S> {
45 fn resolve<'a>(
46 &'a self,
47 address: Address,
48 this: &'a Arc<dyn Resolve>,
49 ) -> object_rainbow::FailFuture<'a, object_rainbow::ByteNode> {
50 Box::pin(async move {
51 let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
52 Ok((bytes, this.clone()))
53 })
54 }
55
56 fn resolve_data(&'_ self, address: Address) -> object_rainbow::FailFuture<'_, Vec<u8>> {
57 Box::pin(async move {
58 let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
59 Ok(bytes)
60 })
61 }
62}
63
64#[derive_for_wrapped]
65pub trait RainbowStore: 'static + Send + Sync + Clone + PartialEq {
66 fn saved_point<T: 'static + Traversible, Extra: 'static + Send + Sync + Clone + ExtraFor<T>>(
67 &self,
68 point: &Point<T>,
69 extra: Extra,
70 ) -> impl RainbowFuture<T = Point<T>> {
71 async {
72 self.save_point(point).await?;
73 Ok(point.with_resolve(self.resolve(), extra))
74 }
75 }
76 fn save_point(&self, point: &impl SingularFetch<T: Traversible>) -> impl RainbowFuture<T = ()> {
77 async {
78 let already_stored = point
79 .extract_resolve::<StoreResolve<Self>>()
80 .is_some_and(|(_, resolve)| resolve.store == *self);
81 if !already_stored && !self.contains(point.hash()).await? {
82 self.save_object(&point.fetch().await?).await?;
83 }
84 Ok(())
85 }
86 }
87 fn save_topology(&self, object: &impl Topological) -> impl RainbowFuture<T = ()> {
88 let mut futures = Vec::with_capacity(object.point_count());
89 object.traverse(&mut StoreVisitor {
90 store: self,
91 futures: &mut futures,
92 });
93 async {
94 futures_util::future::try_join_all(futures).await?;
95 Ok(())
96 }
97 }
98 fn save_object(&self, object: &impl Traversible) -> impl RainbowFuture<T = ()> {
99 async {
100 self.save_topology(object).await?;
101 self.save_data(object.with_hash()).await?;
102 Ok(())
103 }
104 }
105 fn resolve(&self) -> Arc<dyn Resolve> {
106 Arc::new(StoreResolve {
107 store: self.clone(),
108 })
109 }
110 fn point_extra<T: 'static + FullHash, Extra: 'static + Send + Sync + Clone + ExtraFor<T>>(
111 &self,
112 hash: Hash,
113 extra: Extra,
114 ) -> Point<T> {
115 Point::from_address_extra(Address::from_hash(hash), self.resolve(), extra)
116 }
117 fn point<T: Object>(&self, hash: Hash) -> Point<T> {
118 self.point_extra(hash, ())
119 }
120 fn save_data(
121 &self,
122 wh: WithHash<'_, impl Send + Sync + ToOutput>,
123 ) -> impl RainbowFuture<T = ()>;
124 fn contains(&self, hash: Hash) -> impl RainbowFuture<T = bool>;
125 fn fetch(&self, hash: Hash)
126 -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<[u8]>>;
127}
128
129pub trait RainbowStoreMut: RainbowStore {
130 fn create_ref(
131 &self,
132 hash: Hash,
133 ) -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<str>> {
134 let _ = hash;
135 async { Err::<String, _>(object_rainbow::Error::Unimplemented) }
136 }
137 fn update_ref(
138 &self,
139 key: &str,
140 old: Option<OptionalHash>,
141 hash: Hash,
142 ) -> impl RainbowFuture<T = ()>;
143 fn fetch_ref(&self, key: &str) -> impl RainbowFuture<T = OptionalHash>;
144 fn ref_exists(&self, key: &str) -> impl RainbowFuture<T = bool>;
145 fn store_ref_raw<
146 T: Object<Extra>,
147 K: Send + Sync + AsRef<str>,
148 Extra: 'static + Send + Sync + Clone,
149 >(
150 &self,
151 key: K,
152 point: Point<T>,
153 extra: Extra,
154 ) -> StoreRef<Self, K, T, Extra> {
155 StoreRef {
156 store: self.clone(),
157 key,
158 old: point.hash().into(),
159 point,
160 extra,
161 }
162 }
163}
164
165#[derive(Clone)]
166pub struct StoreMut<S, Extra = ()> {
167 store: S,
168 extra: Extra,
169}
170
171impl<S> StoreMut<S> {
172 pub const fn new(store: S) -> Self {
173 Self::new_extra(store, ())
174 }
175}
176
177impl<S, Extra> StoreMut<S, Extra> {
178 pub const fn new_extra(store: S, extra: Extra) -> Self {
179 Self { store, extra }
180 }
181}
182
183impl<S: RainbowStoreMut, Extra: 'static + Send + Sync + Clone> StoreMut<S, Extra> {
184 pub async fn exists<K: Send + Sync + AsRef<str>>(
185 &self,
186 key: K,
187 ) -> object_rainbow::Result<bool> {
188 self.store.ref_exists(key.as_ref()).await
189 }
190
191 pub async fn create<T: Object<Extra>>(
192 &self,
193 point: Point<T>,
194 ) -> object_rainbow::Result<StoreRef<S, impl 'static + Send + Sync + AsRef<str>, T, Extra>>
195 {
196 let point = self.store.saved_point(&point, self.extra.clone()).await?;
197 let key = self.store.create_ref(point.hash()).await?;
198 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
199 }
200
201 pub async fn update<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
202 &self,
203 key: K,
204 point: Point<T>,
205 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
206 let point = self.store.saved_point(&point, self.extra.clone()).await?;
207 self.store
208 .update_ref(key.as_ref(), None, point.hash())
209 .await?;
210 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
211 }
212
213 pub async fn init<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
214 &self,
215 key: K,
216 point: Point<T>,
217 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
218 let point = self.store.saved_point(&point, self.extra.clone()).await?;
219 self.store
220 .update_ref(key.as_ref(), Some(OptionalHash::NONE), point.hash())
221 .await?;
222 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
223 }
224
225 pub async fn load<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
226 &self,
227 key: K,
228 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
229 let hash = self
230 .store
231 .fetch_ref(key.as_ref())
232 .await?
233 .get()
234 .ok_or(object_rainbow::Error::HashNotFound)?;
235 let point = self.store.point_extra(hash, self.extra.clone());
236 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
237 }
238
239 pub async fn load_or_init<T: Object<Extra> + Default + Clone, K: Send + Sync + AsRef<str>>(
240 &self,
241 key: K,
242 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
243 if let Some(hash) = self.store.fetch_ref(key.as_ref()).await?.get() {
244 let point = self.store.point_extra(hash, self.extra.clone());
245 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
246 } else {
247 self.init(key, Default::default()).await
248 }
249 }
250
251 pub async fn reference<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
252 &self,
253 key: K,
254 point: Point<T>,
255 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
256 Ok(StoreRef {
257 old: self.store.fetch_ref(key.as_ref()).await?,
258 ..self.store.store_ref_raw(key, point, self.extra.clone())
259 })
260 }
261}
262
263pub struct StoreRef<S, K, T, Extra> {
264 store: S,
265 key: K,
266 old: OptionalHash,
267 point: Point<T>,
268 extra: Extra,
269}
270
271impl<S, K, T, Extra> Deref for StoreRef<S, K, T, Extra> {
272 type Target = Point<T>;
273
274 fn deref(&self) -> &Self::Target {
275 &self.point
276 }
277}
278
279impl<S, K, T, Extra> DerefMut for StoreRef<S, K, T, Extra> {
280 fn deref_mut(&mut self) -> &mut Self::Target {
281 &mut self.point
282 }
283}
284
285impl<
286 S: RainbowStoreMut,
287 K: Send + Sync + AsRef<str>,
288 T: Object<Extra>,
289 Extra: 'static + Send + Sync + Clone,
290> StoreRef<S, K, T, Extra>
291{
292 pub fn is_modified(&self) -> bool {
293 self.point.hash() != self.old
294 }
295
296 pub fn is_new(&self) -> bool {
297 self.old.is_none()
298 }
299
300 pub async fn save_point(&mut self) -> object_rainbow::Result<()> {
301 self.point = self
302 .store
303 .saved_point(&self.point, self.extra.clone())
304 .await?;
305 Ok(())
306 }
307
308 pub async fn save(&mut self) -> object_rainbow::Result<()> {
309 if self.is_modified() {
310 self.save_point().await?;
311 self.store
312 .update_ref(self.key.as_ref(), Some(self.old), self.point.hash())
313 .await?;
314 self.old = self.point.hash().into();
315 }
316 Ok(())
317 }
318}
319
320#[derive(Parse, ParseInline)]
321struct StoredInner<S, E> {
322 hash: Hash,
323 extra: Extras<E>,
324 store: S,
325}
326
327#[derive(
328 ToOutput, InlineOutput, Tagged, Size, MaybeHasNiche, Clone, PartialEq, Eq, PartialOrd, Ord, Hash,
329)]
330pub struct Stored<S, T> {
331 point: Point<T>,
332 store: S,
333}
334
335impl<S: ListHashes, T> ListHashes for Stored<S, T> {
336 fn list_hashes(&self, f: &mut impl FnMut(Hash)) {
337 self.store.list_hashes(f);
338 }
339}
340
341impl<S: Topological, T> Topological for Stored<S, T> {
342 fn traverse(&self, visitor: &mut impl PointVisitor) {
343 self.store.traverse(visitor);
344 }
345}
346
347impl<S: RainbowStore, T: 'static + FullHash> Stored<S, T> {
348 fn from_inner<E: 'static + Send + Sync + Clone + ExtraFor<T>>(
349 StoredInner { hash, extra, store }: StoredInner<S, E>,
350 ) -> Self {
351 let point = store.point_extra(hash, extra.0);
352 Self { point, store }
353 }
354}
355
356impl<
357 S: RainbowStore + Parse<I>,
358 T: 'static + FullHash + Parse<I>,
359 I: PointInput<Extra: Send + Sync + ExtraFor<T>>,
360> Parse<I> for Stored<S, T>
361{
362 fn parse(input: I) -> object_rainbow::Result<Self> {
363 input.parse().map(Self::from_inner)
364 }
365}
366
367impl<
368 S: RainbowStore + ParseInline<I>,
369 T: 'static + FullHash + Parse<I>,
370 I: PointInput<Extra: Send + Sync + ExtraFor<T>>,
371> ParseInline<I> for Stored<S, T>
372{
373 fn parse_inline(input: &mut I) -> object_rainbow::Result<Self> {
374 input.parse_inline().map(Self::from_inner)
375 }
376}
377
378assert_impl!(
379 impl<S, T, E> Object<E> for Stored<S, T>
380 where
381 S: RainbowStore + Object<E>,
382 T: Object<E>,
383 E: 'static + Send + Sync + Clone,
384 {
385 }
386);
387
388assert_impl!(
389 impl<S, T, E> Inline<E> for Stored<S, T>
390 where
391 S: RainbowStore + Inline<E>,
392 T: Object<E>,
393 E: 'static + Send + Sync + Clone,
394 {
395 }
396);
397
398impl<S, T> Stored<S, T> {
399 pub fn load(&self) -> &Point<T> {
400 &self.point
401 }
402}
403
404impl<S: RainbowStore, T: Traversible> Stored<S, T> {
405 pub async fn replace(&mut self, point: Point<T>) -> object_rainbow::Result<Point<T>> {
406 self.store.save_point(&point).await?;
407 Ok(std::mem::replace(&mut self.point, point))
408 }
409
410 pub async fn new(store: S, point: Point<T>) -> object_rainbow::Result<Self> {
411 store.save_point(&point).await?;
412 Ok(Self { point, store })
413 }
414}
415
416pub trait ExternalStore: 'static + Send + Sync + Clone + PartialEq {
417 type Id: ReflessInline + Clone + Eq;
418 fn save_data(
419 &self,
420 data: &[u8],
421 refs: &[Self::Id],
422 wh: WithHash<'_, impl Send + Sync + ToOutput>,
423 ) -> impl RainbowFuture<T = Self::Id>;
424 fn contains_data(
425 &self,
426 data: &[u8],
427 refs: &[Self::Id],
428 wh: WithHash<'_, impl Send + Sync + ToOutput>,
429 ) -> impl RainbowFuture<T = bool>;
430 fn contains(&self, id: &Self::Id) -> impl RainbowFuture<T = bool>;
431 fn fetch(
432 &self,
433 id: &Self::Id,
434 ) -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<[u8]>>;
435 fn store_point<T: Traversible>(
436 &self,
437 fetch: impl 'static + SingularFetch<T = T>,
438 ) -> impl RainbowFuture<T = Self::Id>
439 where
440 Self: PartialEq,
441 {
442 externally_stored::store_point(self, fetch)
443 }
444 fn store_object<T: Traversible>(&self, object: T) -> impl RainbowFuture<T = Self::Id>
445 where
446 Self: PartialEq,
447 {
448 externally_stored::store_object(self, object)
449 }
450 fn load_extra<T: ParseSliceExtra<E> + Tagged, E: 'static + Send + Sync + Clone>(
451 &self,
452 id: &Self::Id,
453 extra: E,
454 ) -> impl RainbowFuture<T = T> {
455 externally_stored::load_extra::<_, T, _>(self, id, extra)
456 }
457 fn load<T: ParseSlice + Tagged>(&self, id: &Self::Id) -> impl RainbowFuture<T = T> {
458 externally_stored::load::<_, T>(self, id)
459 }
460}