use futures::{future, stream, FutureExt, StreamExt};
use litl::{impl_debug_as_litl, impl_nested_tagged_data_serde, NestedTaggedData};
use ridl::hashing::HashOf;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use thiserror::Error;
use tracing::error;
use crate::{
telepathic::{ApplyDiffErrorFor, ApplyDiffSuccess, Telepathic, TelepathicDiff},
StorageBackend,
};
/// State of a single blob: its ID together with the blob's value, if known.
#[derive(Debug)]
pub struct BlobState {
/// Identifier of the blob. `BlobState::new` derives it by hashing the
/// serialized data; `try_apply_diff` checks incoming data against it.
pub id: BlobID,
/// The blob's value, or `None` while the content has not been supplied yet.
pub data: Option<litl::Val>,
}
impl BlobState {
pub(crate) fn new_empty(id: BlobID) -> Self {
Self { id, data: None }
}
pub(crate) fn new<D: Serialize>(data: D) -> Self {
let value = litl::to_val(data).unwrap();
Self {
id: BlobID(HashOf::hash(&value)),
data: Some(value),
}
}
}
/// Identifier of a blob: a newtype around the hash of a `litl::Val`.
///
/// `BlobState::new` builds it from the hash of the blob's own value.
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct BlobID(HashOf<litl::Val>);
impl BlobID {
    /// Returns a `BlobID` computed by hashing the `litl` value of a random `u32`.
    ///
    /// Judging by the name, this is meant for tests. With only 32 bits of
    /// randomness the IDs are not suitable as collision-resistant identifiers.
    ///
    /// # Panics
    ///
    /// Panics if `litl::to_val` fails to convert the random number.
    pub fn test_random() -> Self {
        let seed: u32 = rand::random();
        let value = litl::to_val(&seed).unwrap();
        BlobID(HashOf::hash(&value))
    }
}
impl NestedTaggedData for BlobID {
const TAG: &'static str = "blob";
type Inner = HashOf<litl::Val>;
fn as_inner(&self) -> &Self::Inner {
&self.0
}
fn from_inner(inner: Self::Inner) -> Self
where
Self: Sized,
{
BlobID(inner)
}
}
// Presumably derives the serde (de)serialization impls for `BlobID` from its
// `NestedTaggedData` impl — confirm against the `litl` macro definition.
impl_nested_tagged_data_serde!(BlobID);
// Presumably implements `Debug` for `BlobID` via its `litl` representation —
// confirm against the `litl` macro definition.
impl_debug_as_litl!(BlobID);
/// Diff for a blob: the blob's ID and, optionally, its value.
///
/// A diff whose `data` is `None` carries no content: `try_apply_diff` ignores
/// it and `store` writes nothing for it.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct BlobDiff {
/// ID of the blob this diff refers to.
pub id: BlobID,
/// The blob's value, if this diff carries it.
pub data: Option<litl::Val>,
}
/// Lets a `BlobDiff` report which blob it belongs to.
impl TelepathicDiff for BlobDiff {
type ID = BlobID;
/// Returns the ID of the blob this diff refers to (`BlobID` is `Copy`).
fn id(&self) -> Self::ID {
self.id
}
}
/// State summary for a blob. `BlobState::state_info` returns `Some(true)` when
/// the data is present and `None` otherwise; `false` is never produced there.
pub type BlobStateInfo = bool;
/// `Telepathic` implementation for blobs.
///
/// Through this impl a blob's data is only ever set while it is still `None`,
/// and only when the incoming data hashes to the blob's ID.
impl Telepathic for BlobState {
    type ID = BlobID;
    type WriteAccess = ();
    type StateInfo = BlobStateInfo;
    type Diff = BlobDiff;
    type Error = BlobError;

    /// Returns this blob's ID.
    fn id(&self) -> Self::ID {
        self.id
    }

    /// Fills in the blob's data from `diff` if it is still missing.
    ///
    /// Returns `Ok(None)` when there is nothing to do: either this state
    /// already has data, or the diff carries none. Otherwise the data is
    /// stored and the unchanged `diff` is reported as the effective diff,
    /// with `true` as the new state info.
    ///
    /// # Errors
    ///
    /// Returns `ApplyDiffErrorFor::Other(BlobError::InvalidHash)` when the
    /// hash of the diff's data differs from the hash in this blob's ID.
    fn try_apply_diff(
        &mut self,
        diff: Self::Diff,
    ) -> crate::telepathic::ApplyDiffResult<Self::StateInfo, Self::ID, Self::Diff, Self::Error>
    {
        // Only the "we have nothing yet, the diff has content" case does work.
        let incoming = match (&self.data, &diff.data) {
            (None, Some(incoming)) => incoming,
            _ => return Ok(None),
        };

        // NOTE(review): only the content hash is validated here; `diff.id` is
        // not compared with `self.id` — confirm callers route diffs by ID.
        if HashOf::hash(incoming) != self.id.0 {
            return Err(ApplyDiffErrorFor::Other(BlobError::InvalidHash));
        }

        self.data = Some(incoming.clone());
        Ok(Some(ApplyDiffSuccess {
            new_state_info: true,
            effective_diff: diff,
        }))
    }

    /// Returns `Some(true)` once the blob's data is present, `None` before that.
    fn state_info(&self) -> Option<Self::StateInfo> {
        self.data.as_ref().map(|_| true)
    }

    /// Produces the diff needed by a peer described by `state_info`.
    ///
    /// A diff carrying the full data is returned only when the peer reports
    /// no state info (`None`) and this state has data. In every other case
    /// the result is `None`.
    fn diff_since(&self, state_info: Option<&Self::StateInfo>) -> Option<Self::Diff> {
        match (state_info, &self.data) {
            (None, Some(data)) => Some(BlobDiff {
                id: self.id,
                data: Some(data.clone()),
            }),
            _ => None,
        }
    }

    /// Builds a stream yielding at most one diff, read from `storage`.
    ///
    /// The value is looked up under the key `id.to_string()`. Nothing is
    /// yielded when the key is absent. When the stored bytes cannot be parsed
    /// by `litl::from_slice`, the failure is logged and nothing is yielded.
    fn load(
        id: BlobID,
        storage: Box<dyn StorageBackend>,
    ) -> std::pin::Pin<Box<dyn futures::Stream<Item = Self::Diff>>> {
        stream::once(storage.get_key(&id.to_string()))
            .filter_map(move |maybe_bytes| {
                let loaded = maybe_bytes.and_then(|bytes| match litl::from_slice(&bytes) {
                    Ok(data) => Some(BlobDiff {
                        id,
                        data: Some(data),
                    }),
                    Err(err) => {
                        error!(err = ?err, id = ?id, "Failed to read blob data");
                        None
                    }
                });
                future::ready(loaded)
            })
            .boxed_local()
    }

    /// Persists the data of `effective_diff` under the key `id.to_string()`.
    ///
    /// A diff without data results in no write.
    ///
    /// # Panics
    ///
    /// The returned future panics if `litl::to_vec` fails to serialize the data.
    fn store(
        effective_diff: BlobDiff,
        storage: Box<dyn StorageBackend>,
    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()>>> {
        async move {
            let BlobDiff { id, data } = effective_diff;
            if let Some(data) = data {
                let key = id.to_string();
                let bytes = litl::to_vec(&data).unwrap();
                storage.set_key(&key, bytes).await;
            }
        }
        .boxed_local()
    }
}
/// Errors produced when applying diffs to a `BlobState`.
#[derive(Error, Debug)]
pub enum BlobError {
/// The hash of the data in a diff did not match the hash in the blob's ID.
#[error("Invalid blob hash")]
InvalidHash,
}