jazz-telepathy 0.1.1

A framework for distributed logs and append-only sets
Documentation
use std::{collections::HashMap, fmt::Debug};

use audi::{Listeners, Listener};
use conundrum::{
    hashing::{RawHash, RawHasher},
    purpose,
    signing::{SignPublicKey, Signed, SignatureError, SignKeyPair},
};
use litl::{impl_debug_as_litl};
use serde_derive::{Serialize, Deserialize};

use crate::{UpdateSource, KEEP_UNKNOWN};

purpose!(LogAppend);

#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename = "Telepathy/LogID")]
pub struct LogID(SignPublicKey<LogAppend>);

impl_debug_as_litl!(LogID);

pub struct LogWriteAccess(SignKeyPair<LogAppend>);

impl LogWriteAccess {
    pub fn id(&self) -> LogID {
        LogID(self.0.public())
    }
}

#[derive(Default)]
pub struct Logs {
    logs: HashMap<LogID, LogState>,
}

#[derive(Default)]
struct LogState {
    priority: u8,
    data: Vec<u8>,
    last_hash: Option<Signed<RawHash, LogAppend>>,
    hasher: RawHasher,
    listeners: Listeners<(LogAppendMessage, UpdateSource)>,
}

#[derive(Clone, Serialize, Deserialize)]
pub struct LogAppendMessage {
    pub log_id: LogID,
    pub after: usize,
    #[serde(with="serde_bytes")]
    pub append: Vec<u8>,
    pub new_hash: Signed<RawHash, LogAppend>,
}

impl_debug_as_litl!(LogAppendMessage);

#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)]
pub struct LogKnownState {
    pub log_len: usize,
    pub priority: u8,
}

impl LogKnownState {
    pub fn update_optimistically(&mut self, append: LogAppendMessage) {
        if append.after == self.log_len {
            self.log_len += append.append.len();
        }
    }
}

use thiserror::Error;

#[derive(Error, Debug)]
pub enum LogError {
    #[error("Invalid hash after append")]
    InvalidHash,
    #[error(transparent)]
    InvalidSignature(#[from] SignatureError)
}

impl Logs {
    pub fn create_log(&mut self) -> LogWriteAccess {
        let keypair = SignKeyPair::new_random();

        match self.logs.entry(LogID(keypair.public())) {
            std::collections::hash_map::Entry::Occupied(_) => unreachable!("Should never have existing log for new keypair"),
            std::collections::hash_map::Entry::Vacant(entry) => entry.insert(LogState::default()),
        };

        LogWriteAccess(keypair)
    }

    // TODO(correctness): Somehow make sure only one consumer can append
    pub fn append(&mut self, access: &LogWriteAccess, data: &[u8]) -> Result<(), LogError> {
        let log_state = self.logs.get(&access.id()).expect("Log should be created or loaded when appending");

        let mut new_hasher = log_state.hasher.clone();
        new_hasher.update(data);

        let current_len = log_state.data.len();

        self.accept_append(&LogAppendMessage {
            log_id: access.id(),
            after: current_len,
            append: data.to_vec(),
            new_hash: access.0.sign(new_hasher.finalize())
        }, UpdateSource::CreatedLocally)
    }

    pub fn current_data(&self, log_id: &LogID) -> Option<&[u8]> {
        self.logs.get(log_id).map(|log_state| log_state.data.as_slice())
    }

    pub(crate) fn accept_append(
        &mut self,
        msg: &LogAppendMessage,
        source: UpdateSource,
    ) -> Result<(), LogError> {
        msg.new_hash.ensure_signed_by(&msg.log_id.0)?;
        let log = if KEEP_UNKNOWN {
            self.logs.entry(msg.log_id).or_default()
        } else if let Some(log) = self.logs.get_mut(&msg.log_id) {
            log
        } else {
            // ignore unknown log messages
            return Ok(());
        };
        let mut new_hasher = log.hasher.clone();
        new_hasher.update(&msg.append);
        if new_hasher.finalize() == msg.new_hash.verified {
            log.data.extend(&msg.append);
            log.last_hash = Some(msg.new_hash.clone());
            log.hasher = new_hasher;

            log.listeners.emit((msg.clone(), source));

            Ok(())
        } else {
            Err(LogError::InvalidHash)
        }
    }

    pub(crate) fn all_log_ids(&self) -> impl Iterator<Item = &LogID> {
        self.logs.keys()
    }

    pub(crate) fn known_state(&self, log_id: &LogID) -> Option<LogKnownState> {
        self.logs.get(log_id).map(|log_state| LogKnownState {
            log_len: log_state.data.len(),
            priority: log_state.priority,
        })
    }

    pub(crate) fn get_append_since(
        &self,
        log_id: &LogID,
        known_state: Option<&LogKnownState>,
    ) -> Option<LogAppendMessage> {
        let known_len = known_state.map(|state| state.log_len).unwrap_or(0);
        self.logs.get(log_id).and_then(|log| {
            if log.data.len() > known_len {
                Some(LogAppendMessage {
                    log_id: *log_id,
                    after: known_len,
                    append: log.data[known_len..].to_vec(),
                    new_hash: log
                        .last_hash
                        .as_ref()
                        .expect("Should have last_hash with data")
                        .clone(),
                })
            } else {
                None
            }
        })
    }

    pub fn add_listener(
        &mut self,
        log_id: LogID,
        listener: Box<dyn Listener<(LogAppendMessage, UpdateSource)>>,
    ) {
        let log = self.logs.entry(log_id).or_default();
        log.listeners.add_with_initial_msg(
            listener,
            if !log.data.is_empty() {
                Some((
                    LogAppendMessage {
                        append: log.data.clone(),
                        log_id,
                        after: 0,
                        new_hash: log
                            .last_hash
                            .as_ref()
                            .expect("Should have last_hash with data")
                            .clone(),
                    },
                    UpdateSource::CurrentState,
                ))
            } else {
                None
            },
        )
    }
}