1use std::{
2 ops::{Deref, DerefMut},
3 pin::Pin,
4 sync::Arc,
5};
6
7use object_rainbow::{
8 Address, ExtraFor, FullHash, Hash, Inline, InlineOutput, ListHashes, MaybeHasNiche, Object,
9 ObjectHashes, OptionalHash, Parse, ParseInline, PointInput, PointVisitor, Resolve, Singular,
10 SingularFetch, Size, Tagged, ToOutput, Topological, Traversible, assert_impl,
11 derive_for_wrapped,
12};
13use object_rainbow_point::{Extras, Point};
14
15pub trait RainbowFuture: Send + Future<Output = object_rainbow::Result<Self::T>> {
16 type T;
17}
18
19impl<F: Send + Future<Output = object_rainbow::Result<T>>, T> RainbowFuture for F {
20 type T = T;
21}
22
23struct StoreVisitor<'a, 'x, S: ?Sized> {
24 store: &'a S,
25 futures: &'x mut Vec<Pin<Box<dyn 'a + Send + Future<Output = object_rainbow::Result<()>>>>>,
26}
27
28impl<'a, 'x, S: RainbowStore> PointVisitor for StoreVisitor<'a, 'x, S> {
29 fn visit<T: Traversible>(&mut self, point: &(impl 'static + SingularFetch<T = T> + Clone)) {
30 let point = point.clone();
31 let store = self.store;
32 self.futures.push(Box::pin(async move {
33 store.save_point(&point).await.map(|_| ())
34 }));
35 }
36}
37
/// Adapter that exposes a store through the `Resolve` interface.
struct StoreResolve<S> {
    /// Store whose `fetch` is called with the hash of each resolved address.
    store: S,
}
41
42impl<S: 'static + Send + RainbowStore> Resolve for StoreResolve<S> {
43 fn resolve<'a>(
44 &'a self,
45 address: Address,
46 this: &'a Arc<dyn Resolve>,
47 ) -> object_rainbow::FailFuture<'a, object_rainbow::ByteNode> {
48 Box::pin(async move {
49 let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
50 Ok((bytes, this.clone()))
51 })
52 }
53
54 fn resolve_data(&'_ self, address: Address) -> object_rainbow::FailFuture<'_, Vec<u8>> {
55 Box::pin(async move {
56 let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
57 Ok(bytes)
58 })
59 }
60}
61
62#[derive_for_wrapped]
63pub trait RainbowStore: 'static + Send + Sync + Clone {
64 fn saved_point<T: 'static + Traversible, Extra: 'static + Send + Sync + Clone + ExtraFor<T>>(
65 &self,
66 point: &Point<T>,
67 extra: Extra,
68 ) -> impl RainbowFuture<T = Point<T>> {
69 async {
70 self.save_point(point).await?;
71 Ok(point.with_resolve(self.resolve(), extra))
72 }
73 }
74 fn save_point(&self, point: &impl SingularFetch<T: Traversible>) -> impl RainbowFuture<T = ()> {
75 async {
76 if !self.contains(point.hash()).await? {
77 self.save_object(&point.fetch().await?).await?;
78 }
79 Ok(())
80 }
81 }
82 fn save_topology(&self, object: &impl Topological) -> impl RainbowFuture<T = ()> {
83 let mut futures = Vec::with_capacity(object.point_count());
84 object.traverse(&mut StoreVisitor {
85 store: self,
86 futures: &mut futures,
87 });
88 async {
89 for future in futures {
90 future.await?;
91 }
92 Ok(())
93 }
94 }
95 fn save_object(&self, object: &impl Traversible) -> impl RainbowFuture<T = ()> {
96 async {
97 self.save_topology(object).await?;
98 self.save_data(object.hashes(), &object.vec()).await?;
99 Ok(())
100 }
101 }
102 fn resolve(&self) -> Arc<dyn Resolve> {
103 Arc::new(StoreResolve {
104 store: self.clone(),
105 })
106 }
107 fn point_extra<T: 'static + FullHash, Extra: 'static + Send + Sync + Clone + ExtraFor<T>>(
108 &self,
109 hash: Hash,
110 extra: Extra,
111 ) -> Point<T> {
112 Point::from_address_extra(Address::from_hash(hash), self.resolve(), extra)
113 }
114 fn point<T: Object>(&self, hash: Hash) -> Point<T> {
115 self.point_extra(hash, ())
116 }
117 fn save_data(&self, hashes: ObjectHashes, data: &[u8]) -> impl RainbowFuture<T = ()>;
118 fn contains(&self, hash: Hash) -> impl RainbowFuture<T = bool>;
119 fn fetch(&self, hash: Hash)
120 -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<[u8]>>;
121}
122
123pub trait RainbowStoreMut: RainbowStore {
124 fn create_ref(
125 &self,
126 hash: Hash,
127 ) -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<str>> {
128 let _ = hash;
129 async { Err::<String, _>(object_rainbow::Error::Unimplemented) }
130 }
131 fn update_ref(
132 &self,
133 key: &str,
134 old: Option<OptionalHash>,
135 hash: Hash,
136 ) -> impl RainbowFuture<T = ()>;
137 fn fetch_ref(&self, key: &str) -> impl RainbowFuture<T = OptionalHash>;
138 fn ref_exists(&self, key: &str) -> impl RainbowFuture<T = bool>;
139 fn store_ref_raw<
140 T: Object<Extra>,
141 K: Send + Sync + AsRef<str>,
142 Extra: 'static + Send + Sync + Clone,
143 >(
144 &self,
145 key: K,
146 point: Point<T>,
147 extra: Extra,
148 ) -> StoreRef<Self, K, T, Extra> {
149 StoreRef {
150 store: self.clone(),
151 key,
152 old: point.hash().into(),
153 point,
154 extra,
155 }
156 }
157}
158
/// A store bundled with an `Extra` value.
///
/// The methods on this type clone `extra` into each point and `StoreRef` they build.
#[derive(Clone)]
pub struct StoreMut<S, Extra = ()> {
    /// Underlying store that all operations go through.
    store: S,
    /// Value cloned into every point and reference created from this wrapper.
    extra: Extra,
}
164
165impl<S> StoreMut<S> {
166 pub const fn new(store: S) -> Self {
167 Self::new_extra(store, ())
168 }
169}
170
171impl<S, Extra> StoreMut<S, Extra> {
172 pub const fn new_extra(store: S, extra: Extra) -> Self {
173 Self { store, extra }
174 }
175}
176
177impl<S: RainbowStoreMut, Extra: 'static + Send + Sync + Clone> StoreMut<S, Extra> {
178 pub async fn exists<K: Send + Sync + AsRef<str>>(
179 &self,
180 key: K,
181 ) -> object_rainbow::Result<bool> {
182 self.store.ref_exists(key.as_ref()).await
183 }
184
185 pub async fn create<T: Object<Extra>>(
186 &self,
187 point: Point<T>,
188 ) -> object_rainbow::Result<StoreRef<S, impl 'static + Send + Sync + AsRef<str>, T, Extra>>
189 {
190 let point = self.store.saved_point(&point, self.extra.clone()).await?;
191 let key = self.store.create_ref(point.hash()).await?;
192 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
193 }
194
195 pub async fn update<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
196 &self,
197 key: K,
198 point: Point<T>,
199 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
200 let point = self.store.saved_point(&point, self.extra.clone()).await?;
201 self.store
202 .update_ref(key.as_ref(), None, point.hash())
203 .await?;
204 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
205 }
206
207 pub async fn init<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
208 &self,
209 key: K,
210 point: Point<T>,
211 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
212 let point = self.store.saved_point(&point, self.extra.clone()).await?;
213 self.store
214 .update_ref(key.as_ref(), Some(OptionalHash::NONE), point.hash())
215 .await?;
216 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
217 }
218
219 pub async fn load<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
220 &self,
221 key: K,
222 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
223 let hash = self
224 .store
225 .fetch_ref(key.as_ref())
226 .await?
227 .get()
228 .ok_or(object_rainbow::Error::HashNotFound)?;
229 let point = self.store.point_extra(hash, self.extra.clone());
230 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
231 }
232
233 pub async fn load_or_init<T: Object<Extra> + Default + Clone, K: Send + Sync + AsRef<str>>(
234 &self,
235 key: K,
236 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
237 if let Some(hash) = self.store.fetch_ref(key.as_ref()).await?.get() {
238 let point = self.store.point_extra(hash, self.extra.clone());
239 Ok(self.store.store_ref_raw(key, point, self.extra.clone()))
240 } else {
241 self.init(key, Default::default()).await
242 }
243 }
244
245 pub async fn reference<T: Object<Extra>, K: Send + Sync + AsRef<str>>(
246 &self,
247 key: K,
248 point: Point<T>,
249 ) -> object_rainbow::Result<StoreRef<S, K, T, Extra>> {
250 Ok(StoreRef {
251 old: self.store.fetch_ref(key.as_ref()).await?,
252 ..self.store.store_ref_raw(key, point, self.extra.clone())
253 })
254 }
255}
256
257pub struct StoreRef<S, K, T, Extra> {
258 store: S,
259 key: K,
260 old: OptionalHash,
261 point: Point<T>,
262 extra: Extra,
263}
264
265impl<S, K, T, Extra> Deref for StoreRef<S, K, T, Extra> {
266 type Target = Point<T>;
267
268 fn deref(&self) -> &Self::Target {
269 &self.point
270 }
271}
272
273impl<S, K, T, Extra> DerefMut for StoreRef<S, K, T, Extra> {
274 fn deref_mut(&mut self) -> &mut Self::Target {
275 &mut self.point
276 }
277}
278
279impl<
280 S: RainbowStoreMut,
281 K: Send + Sync + AsRef<str>,
282 T: Object<Extra>,
283 Extra: 'static + Send + Sync + Clone,
284> StoreRef<S, K, T, Extra>
285{
286 pub fn is_modified(&self) -> bool {
287 self.point.hash() != self.old
288 }
289
290 pub fn is_new(&self) -> bool {
291 self.old.is_none()
292 }
293
294 pub async fn save_point(&mut self) -> object_rainbow::Result<()> {
295 self.point = self
296 .store
297 .saved_point(&self.point, self.extra.clone())
298 .await?;
299 Ok(())
300 }
301
302 pub async fn save(&mut self) -> object_rainbow::Result<()> {
303 if self.is_modified() {
304 self.save_point().await?;
305 self.store
306 .update_ref(self.key.as_ref(), Some(self.old), self.point.hash())
307 .await?;
308 self.old = self.point.hash().into();
309 }
310 Ok(())
311 }
312}
313
314#[derive(Parse, ParseInline)]
315struct StoredInner<S, E> {
316 hash: Hash,
317 extra: Extras<E>,
318 store: S,
319}
320
321#[derive(
322 ToOutput, InlineOutput, Tagged, Size, MaybeHasNiche, Clone, PartialEq, Eq, PartialOrd, Ord, Hash,
323)]
324pub struct Stored<S, T> {
325 point: Point<T>,
326 store: S,
327}
328
329impl<S: ListHashes, T> ListHashes for Stored<S, T> {
330 fn list_hashes(&self, f: &mut impl FnMut(Hash)) {
331 self.store.list_hashes(f);
332 }
333}
334
335impl<S: Topological, T> Topological for Stored<S, T> {
336 fn traverse(&self, visitor: &mut impl PointVisitor) {
337 self.store.traverse(visitor);
338 }
339}
340
341impl<S: RainbowStore, T: 'static + FullHash> Stored<S, T> {
342 fn from_inner<E: 'static + Send + Sync + Clone + ExtraFor<T>>(
343 StoredInner { hash, extra, store }: StoredInner<S, E>,
344 ) -> Self {
345 let point = store.point_extra(hash, extra.0);
346 Self { point, store }
347 }
348}
349
350impl<
351 S: RainbowStore + Parse<I>,
352 T: 'static + FullHash + Parse<I>,
353 I: PointInput<Extra: Send + Sync + ExtraFor<T>>,
354> Parse<I> for Stored<S, T>
355{
356 fn parse(input: I) -> object_rainbow::Result<Self> {
357 input.parse().map(Self::from_inner)
358 }
359}
360
361impl<
362 S: RainbowStore + ParseInline<I>,
363 T: 'static + FullHash + Parse<I>,
364 I: PointInput<Extra: Send + Sync + ExtraFor<T>>,
365> ParseInline<I> for Stored<S, T>
366{
367 fn parse_inline(input: &mut I) -> object_rainbow::Result<Self> {
368 input.parse_inline().map(Self::from_inner)
369 }
370}
371
// NOTE(review): presumably a compile-time assertion (via `object_rainbow::assert_impl`) that
// `Stored<S, T>` implements `Object<E>` under the listed bounds — confirm against the macro.
assert_impl!(
    impl<S, T, E> Object<E> for Stored<S, T>
    where
        S: RainbowStore + Object<E>,
        T: Object<E>,
        E: 'static + Send + Sync + Clone,
    {
    }
);
381
// NOTE(review): presumably a compile-time assertion (via `object_rainbow::assert_impl`) that
// `Stored<S, T>` implements `Inline<E>` under the listed bounds — confirm against the macro.
assert_impl!(
    impl<S, T, E> Inline<E> for Stored<S, T>
    where
        S: RainbowStore + Inline<E>,
        T: Object<E>,
        E: 'static + Send + Sync + Clone,
    {
    }
);
391
392impl<S, T> Stored<S, T> {
393 pub fn load(&self) -> &Point<T> {
394 &self.point
395 }
396}
397
398impl<S: RainbowStore, T: Traversible> Stored<S, T> {
399 pub async fn replace(&mut self, point: Point<T>) -> object_rainbow::Result<Point<T>> {
400 self.store.save_point(&point).await?;
401 Ok(std::mem::replace(&mut self.point, point))
402 }
403
404 pub async fn new(store: S, point: Point<T>) -> object_rainbow::Result<Self> {
405 store.save_point(&point).await?;
406 Ok(Self { point, store })
407 }
408}