object-rainbow-store 0.0.0-a.0

storage abstraction for object-rainbow
Documentation
use std::{
    ops::{Deref, DerefMut},
    pin::Pin,
    sync::Arc,
};

use object_rainbow::{
    Address, Fetch, Hash, Object, ObjectHashes, OptionalHash, Point, PointVisitor, Resolve,
    Singular, Topological,
};

pub trait RainbowFuture: Send + Future<Output = object_rainbow::Result<Self::T>> {
    type T;
}

impl<F: Send + Future<Output = object_rainbow::Result<T>>, T> RainbowFuture for F {
    type T = T;
}

struct StoreVisitor<'a, 'x, S: ?Sized> {
    store: &'a S,
    futures: &'x mut Vec<Pin<Box<dyn 'a + Send + Future<Output = object_rainbow::Result<()>>>>>,
}

impl<'a, 'x, S: RainbowStore, Extra: 'static + Send + Sync + Clone> PointVisitor<Extra>
    for StoreVisitor<'a, 'x, S>
{
    fn visit<T: Object<Extra>>(&mut self, point: &Point<T, Extra>) {
        let point = point.clone();
        let store = self.store;
        self.futures.push(Box::pin(async move {
            store.save_point(&point).await.map(|_| ())
        }));
    }
}

struct StoreResolve<S> {
    store: S,
}

impl<S: 'static + Send + RainbowStore> Resolve for StoreResolve<S> {
    fn resolve(
        &'_ self,
        address: Address,
    ) -> object_rainbow::FailFuture<'_, object_rainbow::ByteNode> {
        Box::pin(async move {
            let bytes = self.store.fetch(address.hash).await?.as_ref().to_vec();
            Ok((
                bytes,
                Arc::new(Self {
                    store: self.store.clone(),
                }) as _,
            ))
        })
    }

    fn name(&self) -> &str {
        self.store.name()
    }
}

pub trait RainbowStore: 'static + Send + Sync + Clone {
    fn save_point<T: Object<Extra>, Extra: 'static + Send + Sync + Clone>(
        &self,
        point: &Point<T, Extra>,
    ) -> impl RainbowFuture<T = Point<T, Extra>> {
        async {
            if !self.contains(point.hash()).await? {
                self.save_object(&point.fetch().await?).await?;
            }
            Ok(self.point_extra(point.hash(), point.extra().clone()))
        }
    }
    fn save_topology<Extra: 'static + Send + Sync + Clone>(
        &self,
        object: &impl Topological<Extra>,
    ) -> impl RainbowFuture<T = ()> {
        let mut futures = Vec::with_capacity(object.point_count());
        object.accept_points(&mut StoreVisitor {
            store: self,
            futures: &mut futures,
        });
        async {
            for future in futures {
                future.await?;
            }
            Ok(())
        }
    }
    fn save_object<Extra: 'static + Send + Sync + Clone>(
        &self,
        object: &impl Object<Extra>,
    ) -> impl RainbowFuture<T = ()> {
        async {
            self.save_topology(object).await?;
            self.save_data(object.hashes(), &object.vec()).await?;
            Ok(())
        }
    }
    fn point_extra<T: Object<Extra>, Extra: 'static + Send + Sync + Clone>(
        &self,
        hash: Hash,
        extra: Extra,
    ) -> Point<T, Extra> {
        Point::from_address_extra(
            Address::from_hash(hash),
            Arc::new(StoreResolve {
                store: self.clone(),
            }),
            extra,
        )
    }
    fn point<T: Object>(&self, hash: Hash) -> Point<T> {
        self.point_extra(hash, ())
    }
    fn save_data(&self, hashes: ObjectHashes, data: &[u8]) -> impl RainbowFuture<T = ()>;
    fn contains(&self, hash: Hash) -> impl RainbowFuture<T = bool>;
    fn fetch(&self, hash: Hash)
    -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<[u8]>>;
    fn name(&self) -> &str;
}

pub trait RainbowStoreMut: RainbowStore {
    fn create_ref(
        &self,
        hash: Hash,
    ) -> impl RainbowFuture<T = impl 'static + Send + Sync + AsRef<str>> {
        let _ = hash;
        async { Err::<String, _>(object_rainbow::error_fetch!("not supported")) }
    }
    fn update_ref(
        &self,
        key: &str,
        old: Option<OptionalHash>,
        hash: Hash,
    ) -> impl RainbowFuture<T = ()>;
    fn fetch_ref(&self, key: &str) -> impl RainbowFuture<T = OptionalHash>;
    fn ref_exists(&self, key: &str) -> impl RainbowFuture<T = bool>;
    fn create<T: Object>(
        &self,
        point: Point<T>,
    ) -> impl RainbowFuture<T = StoreRef<Self, impl 'static + Send + Sync + AsRef<str>, T, ()>>
    {
        async move {
            let point = self.save_point(&point).await?;
            let key = self.create_ref(point.hash()).await?;
            Ok(self.store_ref_raw(key, point))
        }
    }
    fn update<T: Object, K: Send + Sync + AsRef<str>>(
        &self,
        key: K,
        point: Point<T>,
    ) -> impl RainbowFuture<T = StoreRef<Self, K, T, ()>> {
        async move {
            let point = self.save_point(&point).await?;
            self.update_ref(key.as_ref(), None, point.hash()).await?;
            Ok(self.store_ref_raw(key, point))
        }
    }
    fn load<T: Object, K: Send + Sync + AsRef<str>>(
        &self,
        key: K,
    ) -> impl RainbowFuture<T = StoreRef<Self, K, T, ()>> {
        async move {
            let hash = self
                .fetch_ref(key.as_ref())
                .await?
                .get()
                .ok_or_else(|| object_rainbow::error_fetch!("key not found"))?;
            let point = self.point(hash);
            Ok(self.store_ref_raw(key, point))
        }
    }
    fn reference<T: Object, K: Send + Sync + AsRef<str>>(
        &self,
        key: K,
        point: Point<T>,
    ) -> impl RainbowFuture<T = StoreRef<Self, K, T, ()>> {
        async move {
            Ok(StoreRef {
                old: self.fetch_ref(key.as_ref()).await?,
                ..self.store_ref_raw(key, point)
            })
        }
    }
    fn store_ref_raw<
        T: Object<Extra>,
        K: Send + Sync + AsRef<str>,
        Extra: 'static + Send + Sync + Clone,
    >(
        &self,
        key: K,
        point: Point<T, Extra>,
    ) -> StoreRef<Self, K, T, Extra> {
        StoreRef {
            store: self.clone(),
            key,
            old: point.hash().into(),
            point,
        }
    }
}

pub struct StoreRef<S, K, T, Extra> {
    store: S,
    key: K,
    old: OptionalHash,
    point: Point<T, Extra>,
}

impl<S, K, T, Extra> Deref for StoreRef<S, K, T, Extra> {
    type Target = Point<T, Extra>;

    fn deref(&self) -> &Self::Target {
        &self.point
    }
}

impl<S, K, T, Extra> DerefMut for StoreRef<S, K, T, Extra> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.point
    }
}

impl<
    S: RainbowStoreMut,
    K: Send + Sync + AsRef<str>,
    T: Object<Extra>,
    Extra: 'static + Send + Sync + Clone,
> StoreRef<S, K, T, Extra>
{
    pub fn is_modified(&self) -> bool {
        self.point.hash() != self.old
    }

    pub fn is_new(&self) -> bool {
        self.old.is_none()
    }

    pub async fn save_point(&mut self) -> object_rainbow::Result<()> {
        self.point = self.store.save_point(&self.point).await?;
        Ok(())
    }

    pub async fn save(&mut self) -> object_rainbow::Result<()> {
        if self.is_modified() {
            self.save_point().await?;
            self.store
                .update_ref(self.key.as_ref(), Some(self.old), self.point.hash())
                .await?;
            self.old = self.point.hash().into();
        }
        Ok(())
    }
}