tlpt 0.7.0

Telepathy: syncable, append-only logs and sets
Documentation
use futures::{future, stream, FutureExt, StreamExt};
use litl::{impl_debug_as_litl, impl_nested_tagged_data_serde, NestedTaggedData};
use ridl::hashing::HashOf;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use thiserror::Error;
use tracing::error;

use crate::{
    telepathic::{ApplyDiffErrorFor, ApplyDiffSuccess, Telepathic, TelepathicDiff},
    StorageBackend,
};

#[derive(Debug)]
pub struct BlobState {
    pub id: BlobID,
    pub data: Option<litl::Val>,
}

impl BlobState {
    pub(crate) fn new_empty(id: BlobID) -> Self {
        Self { id, data: None }
    }

    pub(crate) fn new<D: Serialize>(data: D) -> Self {
        let value = litl::to_val(data).unwrap();
        Self {
            id: BlobID(HashOf::hash(&value)),
            data: Some(value),
        }
    }
}

#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct BlobID(HashOf<litl::Val>);

impl BlobID {
    pub fn test_random() -> Self {
        BlobID(HashOf::hash(&litl::to_val(&rand::random::<u32>()).unwrap()))
    }
}

impl NestedTaggedData for BlobID {
    const TAG: &'static str = "blob";

    type Inner = HashOf<litl::Val>;

    fn as_inner(&self) -> &Self::Inner {
        &self.0
    }

    fn from_inner(inner: Self::Inner) -> Self
    where
        Self: Sized,
    {
        BlobID(inner)
    }
}

impl_nested_tagged_data_serde!(BlobID);
impl_debug_as_litl!(BlobID);

#[derive(Clone, Serialize, Deserialize)]
pub struct BlobDiff {
    pub id: BlobID,
    pub data: Option<litl::Val>,
}

impl TelepathicDiff for BlobDiff {
    type ID = BlobID;

    fn id(&self) -> Self::ID {
        self.id
    }
}

pub type BlobStateInfo = bool;

impl Telepathic for BlobState {
    type ID = BlobID;
    type WriteAccess = ();
    type StateInfo = BlobStateInfo;
    type Diff = BlobDiff;
    type Error = BlobError;

    fn id(&self) -> Self::ID {
        self.id
    }

    fn try_apply_diff(
        &mut self,
        diff: Self::Diff,
    ) -> crate::telepathic::ApplyDiffResult<Self::StateInfo, Self::ID, Self::Diff, Self::Error>
    {
        if self.data.is_none() && diff.data.is_some() {
            if HashOf::hash(diff.data.as_ref().unwrap()) == self.id.0 {
                self.data = diff.data.clone();

                Ok(ApplyDiffSuccess {
                    new_state_info: true,
                    effective_diff: diff,
                })
            } else {
                Err(ApplyDiffErrorFor::Other(BlobError::InvalidHash))
            }
        } else {
            Ok(ApplyDiffSuccess {
                new_state_info: self.data.is_some(),
                effective_diff: BlobDiff {
                    id: self.id,
                    data: None,
                },
            })
        }
    }

    fn state_info(&self) -> Option<Self::StateInfo> {
        if self.data.is_some() {
            Some(true)
        } else {
            None
        }
    }

    fn diff_since(&self, state_info: Option<&Self::StateInfo>) -> Option<Self::Diff> {
        if state_info.is_some() {
            None
        } else if self.data.is_some() {
            Some(BlobDiff {
                id: self.id,
                data: self.data.clone(),
            })
        } else {
            None
        }
    }

    fn load(
        id: BlobID,
        storage: Box<dyn StorageBackend>,
    ) -> std::pin::Pin<Box<dyn futures::Stream<Item = Self::Diff>>> {
        stream::once(storage.get_key(&id.to_string()))
            .filter_map(move |maybe_bytes| {
                future::ready(
                    maybe_bytes
                        .and_then(|bytes| {
                            litl::from_slice(&bytes)
                                .map_err(|err| {
                                    error!(err = ?err, id = ?id, "Failed to read blob data");
                                    err
                                })
                                .ok()
                        })
                        .map(|data| BlobDiff {
                            id,
                            data: Some(data),
                        }),
                )
            })
            .boxed_local()
    }

    fn store(
        effective_diff: BlobDiff,
        storage: Box<dyn StorageBackend>,
    ) -> std::pin::Pin<Box<dyn futures::Future<Output = ()>>> {
        async move {
            if let Some(data) = effective_diff.data {
                storage
                    .set_key(&effective_diff.id.to_string(), litl::to_vec(&data).unwrap())
                    .await;
            }
        }
        .boxed_local()
    }
}

#[derive(Error, Debug)]
pub enum BlobError {
    #[error("Invalid blob hash")]
    InvalidHash,
}