use std::{collections::BTreeMap, marker::PhantomData, ops::Deref, pin::Pin};
use anyhow::{anyhow, Result};
use cid::Cid;
use futures::Stream;
use libipld_cbor::DagCborCodec;
use libipld_core::{
codec::{Codec, Encode},
ipld::Ipld,
};
use tokio::sync::OnceCell;
use crate::data::{
ChangelogIpld, ContentIpld, DelegationIpld, IdentityIpld, Jwt, Link, MapOperation, MemoIpld,
RevocationIpld, VersionedMapIpld, VersionedMapKey, VersionedMapValue,
};
use noosphere_collections::hamt::Hamt;
use noosphere_storage::{block_serialize, BlockStore};
use super::VersionedMapMutation;
/// A [`VersionedMap`] with `String` keys and `Link<MemoIpld>` values.
pub type Content<S> = VersionedMap<String, Link<MemoIpld>, S>;
/// A [`VersionedMap`] with `String` keys and `IdentityIpld` values.
pub type Identities<S> = VersionedMap<String, IdentityIpld, S>;
/// A [`VersionedMap`] with `Link<Jwt>` keys and `DelegationIpld` values.
pub type Delegations<S> = VersionedMap<Link<Jwt>, DelegationIpld, S>;
/// A [`VersionedMap`] with `Link<Jwt>` keys and `RevocationIpld` values.
pub type Revocations<S> = VersionedMap<Link<Jwt>, RevocationIpld, S>;
/// A view over a key/value map whose body is stored as a block in a
/// [`BlockStore`] and addressed by [`Cid`].
///
/// The body, the HAMT and the changelog are each loaded from the store the
/// first time they are requested and then cached in a [`OnceCell`] for the
/// lifetime of this value.
#[derive(Debug)]
pub struct VersionedMap<K, V, S>
where
K: VersionedMapKey,
V: VersionedMapValue,
S: BlockStore,
{
/// CID of the map body; the body is loaded from `store` using this CID.
cid: Cid,
/// Block store that the body, HAMT and changelog are loaded from.
store: S,
/// Cached body, filled on the first call to `to_body`.
body: OnceCell<VersionedMapIpld<K, V>>,
/// Cached HAMT, filled on the first call to `get_hamt`.
hamt: OnceCell<Hamt<S, V, K>>,
/// Cached changelog, filled on the first call to `get_changelog`.
changelog: OnceCell<ChangelogIpld<MapOperation<K, V>>>,
}
impl<K, V, S> VersionedMap<K, V, S>
where
K: VersionedMapKey,
V: VersionedMapValue,
S: BlockStore,
{
/// Returns an owned copy of the map body.
///
/// The body is loaded from the store (by this map's CID, decoded as
/// DAG-CBOR) on the first call and cached; later calls clone the cached
/// value.
///
/// # Errors
///
/// Fails if the body cannot be loaded from the store.
pub async fn to_body(&self) -> Result<VersionedMapIpld<K, V>> {
    let load_body = || async { self.store.load::<DagCborCodec, _>(&self.cid).await };
    let cached = self.body.get_or_try_init(load_body).await?;
    Ok(cached.clone())
}
/// Returns a reference to this map's changelog.
///
/// The changelog is loaded via [`Self::load_changelog`] on first use and
/// cached for subsequent calls.
///
/// # Errors
///
/// Fails if the changelog cannot be loaded.
pub async fn get_changelog(&self) -> Result<&ChangelogIpld<MapOperation<K, V>>> {
    let cache = &self.changelog;
    cache.get_or_try_init(|| self.load_changelog()).await
}
/// Loads a fresh, owned changelog from the store, bypassing the changelog
/// cache (the body itself is still obtained through [`Self::to_body`]).
///
/// # Errors
///
/// Fails if either the body or the changelog block cannot be loaded.
pub async fn load_changelog(&self) -> Result<ChangelogIpld<MapOperation<K, V>>> {
    let body = self.to_body().await?;
    let changelog_cid = &body.changelog;
    self.store
        .load::<DagCborCodec, ChangelogIpld<MapOperation<K, V>>>(changelog_cid)
        .await
}
/// Returns a reference to this map's HAMT.
///
/// The HAMT is loaded on first use and cached for subsequent calls.
///
/// # Errors
///
/// Fails if the HAMT cannot be loaded.
pub async fn get_hamt(&self) -> Result<&Hamt<S, V, K>> {
    let cache = &self.hamt;
    cache.get_or_try_init(|| self.load_hamt()).await
}
/// Loads a fresh, owned HAMT for this map from the store, bypassing the
/// HAMT cache.
async fn load_hamt(&self) -> Result<Hamt<S, V, K>> {
    let body = self.to_body().await?;
    let hamt = body.load_hamt(&self.store).await;
    hamt
}
pub async fn at_or_empty<C>(cid: Option<C>, store: &mut S) -> Result<VersionedMap<K, V, S>>
where
C: Deref<Target = Cid>,
{
Ok(match cid {
Some(cid) => VersionedMap::<K, V, S>::at(&cid, store),
None => VersionedMap::<K, V, S>::empty(store).await?,
})
}
/// Returns the CID of this map's body.
pub fn cid(&self) -> &Cid {
&self.cid
}
pub fn at(cid: &Cid, store: &S) -> VersionedMap<K, V, S> {
VersionedMap {
cid: *cid,
store: store.clone(),
body: OnceCell::new(),
hamt: OnceCell::new(),
changelog: OnceCell::new(),
}
}
pub async fn empty(store: &mut S) -> Result<VersionedMap<K, V, S>> {
let ipld = VersionedMapIpld::<K, V>::empty(store).await?;
let cid = store.save::<DagCborCodec, _>(ipld).await?;
Ok(VersionedMap {
cid,
hamt: OnceCell::new(),
body: OnceCell::new(),
changelog: OnceCell::new(),
store: store.clone(),
})
}
/// Collects the key/value pairs of every `Add` operation in this map's
/// changelog.
///
/// `Remove` operations are skipped (they do not take a previously added
/// key back out of the result). When a key is added more than once, the
/// value from the last `Add` wins.
///
/// # Errors
///
/// Fails if the changelog cannot be loaded.
pub async fn get_added(&self) -> Result<BTreeMap<K, V>> {
    let operations = &self.get_changelog().await?.changes;
    let mut added = BTreeMap::new();

    for operation in operations.iter() {
        if let MapOperation::Add { key, value } = operation {
            added.insert(key.clone(), value.clone());
        }
    }

    Ok(added)
}
/// Looks up `key` in the (cached) HAMT, returning `None` when it is absent.
///
/// # Errors
///
/// Fails if the HAMT cannot be loaded or the lookup itself fails.
pub async fn get(&self, key: &K) -> Result<Option<&V>> {
    self.get_hamt().await?.get(key).await
}
/// Looks up `key` and, if present, returns the CID produced by serializing
/// its value with codec `C` (via `block_serialize`). Returns `None` when
/// the key is absent.
///
/// # Errors
///
/// Fails if the HAMT cannot be loaded, the lookup fails, or the value
/// cannot be serialized.
pub async fn get_as_cid<C>(&self, key: &K) -> Result<Option<Cid>>
where
    C: Codec + Default,
    Ipld: Encode<C>,
    u64: From<C>,
{
    let hamt = self.get_hamt().await?;
    let serialized = hamt
        .get(key)
        .await?
        .map(|value| block_serialize::<C, _>(value))
        .transpose()?;

    // Only the CID half of the serialized block is of interest here.
    Ok(serialized.map(|block| block.0))
}
/// Like [`Self::get`], but treats a missing key as an error.
///
/// # Errors
///
/// Fails if the lookup fails, or with `Key {key} not found!` when the key
/// is absent.
pub async fn require(&self, key: &K) -> Result<&V> {
    match self.get(key).await? {
        Some(value) => Ok(value),
        None => Err(anyhow!("Key {} not found!", key)),
    }
}
/// Like [`Self::get_as_cid`], but treats a missing key as an error.
///
/// # Errors
///
/// Fails if the lookup or serialization fails, or with
/// `Key {key} not found!` when the key is absent.
pub async fn require_as_cid<C>(&self, key: &K) -> Result<Cid>
where
    C: Codec + Default,
    Ipld: Encode<C>,
    u64: From<C>,
{
    // `C` appears only in the bounds of `get_as_cid` (not in its arguments
    // or return type), so forward it explicitly instead of relying on the
    // compiler to infer it from this function's where-clauses.
    self.get_as_cid::<C>(key)
        .await?
        .ok_or_else(|| anyhow!("Key {} not found!", key))
}
/// Applies `mutation` on top of the map at `cid` (or on top of a new empty
/// map when `cid` is `None`) and returns the CID of the resulting body.
///
/// Each operation in the mutation is applied to a freshly loaded HAMT and
/// appended to a changelog derived from the current one via
/// `mark(mutation.did())`. The changelog is saved first, then the HAMT is
/// flushed, then the new body referencing both is saved.
///
/// # Errors
///
/// Fails if the base map cannot be loaded or created, if any operation
/// cannot be applied or recorded, or if saving to `store` fails.
pub async fn apply_with_cid<C>(
    cid: Option<C>,
    mutation: &VersionedMapMutation<K, V>,
    store: &mut S,
) -> Result<Cid>
where
    C: Deref<Target = Cid>,
{
    let base = Self::at_or_empty(cid, store).await?;
    let mut next_changelog = base.get_changelog().await?.mark(mutation.did());
    // Work on an owned HAMT rather than the cached one so it can be mutated.
    let mut next_hamt = base.load_hamt().await?;

    for operation in mutation.changes() {
        match operation {
            MapOperation::Remove { key } => {
                next_hamt.delete(key).await?;
            }
            MapOperation::Add { key, value } => {
                next_hamt.set(key.clone(), value.clone()).await?;
            }
        }
        next_changelog.push(operation.clone())?;
    }

    let changelog = store.save::<DagCborCodec, _>(&next_changelog).await?;
    let hamt = next_hamt.flush().await?;

    // NOTE(review): the body is always built as `ContentIpld`, regardless of
    // `K`/`V` — presumably every versioned map body serializes identically;
    // confirm against the definition of `ContentIpld`.
    let next_body = ContentIpld {
        hamt,
        changelog,
        signature: PhantomData,
    };
    store.save::<DagCborCodec, _>(&next_body).await
}
/// Passes the `for_each` callback to the (cached) HAMT's own `for_each`,
/// which receives key/value references and may return an error.
///
/// # Errors
///
/// Fails if the HAMT cannot be loaded, or with whatever error the HAMT's
/// `for_each` reports.
pub async fn for_each<ForEach>(&self, for_each: ForEach) -> Result<()>
where
    ForEach: FnMut(&K, &V) -> Result<()>,
{
    let hamt = self.get_hamt().await?;
    hamt.for_each(for_each).await
}
/// Returns a stream of borrowed key/value pairs from the (cached) HAMT.
/// The stream borrows from `self` for `'a`.
///
/// # Errors
///
/// Fails if the HAMT cannot be loaded; individual stream items may also
/// carry errors.
pub async fn stream<'a>(
    &'a self,
) -> Result<Pin<Box<dyn Stream<Item = Result<(&'a K, &'a V)>> + 'a>>> {
    let hamt = self.get_hamt().await?;
    Ok(hamt.stream())
}
}
impl<K, V, S> VersionedMap<K, V, S>
where
    K: VersionedMapKey + 'static,
    V: VersionedMapValue + 'static,
    S: BlockStore + 'static,
{
    /// Consumes the map and returns a stream of owned key/value pairs.
    ///
    /// A fresh HAMT is loaded from the store (the HAMT cache is not used)
    /// and converted into the stream.
    ///
    /// # Errors
    ///
    /// Fails if the HAMT cannot be loaded; individual stream items may also
    /// carry errors.
    pub async fn into_stream(self) -> Result<impl Stream<Item = Result<(K, V)>>> {
        let owned_hamt = self.load_hamt().await?;
        Ok(owned_hamt.into_stream())
    }
}