jazz-telepathy 0.1.1

A framework for distributed logs and append-only sets
Documentation
use futures::{SinkExt, Future};
use log::{info, error};
use logs::{LogAppendMessage, LogID, LogKnownState, Logs};
use rand::RngCore;
use serde_derive::{Deserialize, Serialize};
use sets::{AcceptInsertResult, SetID, SetInsertMessage, SetKnownState, Sets};
use thiserror::Error;
use std::{
    cell::RefCell,
    collections::HashMap,
    rc::Rc
};

// TODO-V1(test): Test local log usage
// TODO-V1(test): Test local set usage
// TODO-V1(test): Test persistence
// TODO-V1(test): Test log syncing
// TODO-V1(test): Test set syncing
// TODO-V1(test): Test auth

// NOTE(review): this flag is not referenced anywhere in this file; presumably it is
// read by a submodule or is a leftover — confirm before relying on or removing it.
const KEEP_UNKNOWN: bool = true;

/// Origin of an update that is handed to the local log/set state.
///
/// In this file the value is passed to `accept_append` / `accept_insert` together
/// with incoming data, and only [`UpdateSource::ReceivedFrom`] is ever constructed here.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum UpdateSource {
    // NOTE(review): not constructed in this file; presumably marks a replay of the
    // already-present state — confirm against the `logs` / `sets` modules.
    CurrentState,
    // NOTE(review): not constructed in this file; presumably marks data restored
    // from persistence — confirm.
    Loaded,
    /// The update arrived over the connection to the remote with the given id.
    ReceivedFrom(NodeID),
    // NOTE(review): not constructed in this file; presumably marks updates made by
    // this node itself — confirm.
    CreatedLocally,
}

/// The complete local data of a node: all of its logs and all of its sets.
///
/// `Default` yields whatever `Logs::default()` and `Sets::default()` produce.
#[derive(Default)]
pub struct TelepathyState {
    /// Local log storage; receives remote appends during syncing.
    pub logs: Logs,
    /// Local set storage; receives remote inserts during syncing.
    pub sets: Sets,
}

mod causal_set;
pub mod logs;
pub mod sets;

/// Per-log and per-set "known state" records, keyed by id.
///
/// Used by `RemoteNode` as this node's optimistic picture of what a remote
/// already has.
#[derive(Clone, Default)]
struct KnownState {
    // Known state for each log id that has an entry.
    logs: HashMap<LogID, LogKnownState>,
    // Known state for each set id that has an entry.
    sets: HashMap<SetID, SetKnownState>,
}


/// A node that owns local logs/sets and exchanges updates with connected remotes.
pub struct TelepathyNode {
    /// The logs and sets stored on this node.
    pub local_state: TelepathyState,
    // Currently connected remotes, keyed by the id each `RemoteNode` carries.
    remotes: HashMap<NodeID, RemoteNode>,
    // Both halves of the channel used to hand over new remotes: the sender is
    // cloned into `RemoteManager` handles, the receiver is drained at the start
    // of `sync_with_remotes`.
    new_remote_mpsc: (futures::channel::mpsc::Sender<RemoteNode>, futures::channel::mpsc::Receiver<RemoteNode>),
}

impl TelepathyNode {
    /// Builds a node with default (empty) local state, no remotes, and a fresh
    /// bounded channel (`channel(10)`) through which new remotes are handed over.
    pub fn create() -> Self {
        TelepathyNode {
            local_state: TelepathyState::default(),
            remotes: Default::default(),
            new_remote_mpsc: futures::channel::mpsc::channel(10),
        }
    }

    /// Convenience constructor: the result of [`TelepathyNode::create`] wrapped in
    /// `Rc<RefCell<_>>`.
    pub fn new_rc() -> Rc<RefCell<Self>> {
        Rc::new(RefCell::new(Self::create()))
    }

    /// Registers `remote` under its own id and logs the addition.
    ///
    /// A remote that was already registered under the same id is replaced.
    fn add_remote(
        &mut self,
        remote: RemoteNode
    ) {
        info!("Remote added {:?}", remote.id);
        self.remotes.insert(
            remote.id,
            remote,
        );
    }

    /// Returns a handle that can queue new remotes for this node.
    ///
    /// The handle holds a clone of the sending half of the new-remote channel;
    /// queued remotes are only registered on the next call to
    /// [`TelepathyNode::sync_with_remotes`].
    pub fn remotes(&self) -> RemoteManager {
        RemoteManager{adder: self.new_remote_mpsc.0.clone()}
    }

    /// Runs one synchronisation round with every connected remote.
    ///
    /// The round has three steps, in this order:
    /// 1. register all remotes queued through a [`RemoteManager`];
    /// 2. drain every remote's receiver, applying incoming messages to the local
    ///    state and dropping remotes whose receiver reports `Closed`;
    /// 3. send each remaining remote the appends/inserts it is believed to be
    ///    missing, plus this node's known state where a full resync is pending.
    ///
    /// Nothing is returned; failures to apply incoming data are logged with
    /// `error!`. Sending goes through `UpdateSender::send`, which has no error
    /// return, so a failing sender can only signal failure by panicking.
    pub fn sync_with_remotes(&mut self) {
        // Step 1: pick up remotes queued since the last round. The loop stops as
        // soon as `try_next` yields anything other than `Ok(Some(_))`.
        while let Ok(Some(new_remote)) = self.new_remote_mpsc.1.try_next() {
            self.add_remote(new_remote);
        }

        // Receive from all
        // `retain` keeps a remote when the closure evaluates to `true`, so the
        // value passed to `break` decides whether the remote stays connected.
        self.remotes.retain(|_, remote| {
            loop {
                match remote.receiver.try_receive() {
                    // Nothing more to read right now: keep the remote.
                    Err(UpdateReceiverError::TryAgainLater) => {
                        break true;
                    },
                    // The receiver is closed: drop the remote.
                    Err(UpdateReceiverError::Closed) => {
                        info!("Remote {:?} disconnected", remote.id);
                        break false
                    },
                    Ok(update) => match update {
                        // The remote stated its known state for a log:
                        // overwrite our optimistic record with it.
                        UpdateMessage::LogUpdateKnownState { id, known_state } => {
                            remote
                                .optimistic_remote_known_state
                                .logs
                                .insert(id, known_state);
                        }
                        UpdateMessage::LogAppend { id, new_append } => {
                            match self
                                .local_state
                                .logs
                                .accept_append(&new_append, UpdateSource::ReceivedFrom(remote.id))
                            {
                                Ok(()) => {}
                                // A hash mismatch schedules sending our own known
                                // state for this log in the send phase below.
                                Err(logs::LogError::InvalidHash) => {
                                    remote.logs_that_need_full_resync.insert(id, true);
                                }
                                // Any other failure is only logged.
                                Err(accept_err) => {
                                    error!(
                                        "Error accepting append: {} {:?}",
                                        accept_err, new_append
                                    );
                                }
                            }
                            // Whatever the outcome above, the remote evidently has
                            // this append, so fold it into our record of its state.
                            remote
                                .optimistic_remote_known_state
                                .logs
                                .entry(id)
                                .or_default()
                                .update_optimistically(new_append);
                        }
                        // The remote stated its known state for a set:
                        // overwrite our optimistic record with it.
                        UpdateMessage::SetUpdateKnownState { id, known_state } => {
                            remote
                                .optimistic_remote_known_state
                                .sets
                                .insert(id, known_state);
                        }

                        UpdateMessage::SetInsert { id, new_insert } => {
                            match self
                                .local_state
                                .sets
                                .accept_insert(&new_insert, UpdateSource::ReceivedFrom(remote.id))
                            {
                                Ok(AcceptInsertResult::AllConnected) => {}
                                // The insert was accepted but reported as having
                                // disconnected items: schedule sending our known
                                // state for this set in the send phase below.
                                Ok(AcceptInsertResult::HasDisconnected) => {
                                    remote.sets_that_need_full_resync.insert(id, true);
                                }
                                // Failures are only logged.
                                Err(accept_err) => {
                                    error!(
                                        "Error accepting insert: {} {:?}",
                                        accept_err, new_insert
                                    );
                                }
                            }
                            // Whatever the outcome above, record that the remote
                            // has these items.
                            remote
                                .optimistic_remote_known_state
                                .sets
                                .entry(id)
                                .or_default()
                                .update_optimistically(
                                    new_insert,
                                    self.local_state.sets.current_items(&id),
                                );
                        }
                    }
                }
            }
        });

        // Send to all
        for remote in self.remotes.values_mut() {
            // `SyncAll` offers every local log; `SyncWhatRemoteWants` offers only
            // logs that already have an entry in our record of the remote's state.
            // The ids are collected up front so the maps can be mutated in the loop.
            let log_ids_to_send = match remote.sync_mode {
                SyncMode::SyncAll => self
                    .local_state
                    .logs
                    .all_log_ids()
                    .cloned()
                    .collect::<Vec<_>>(),
                SyncMode::SyncWhatRemoteWants => remote
                    .optimistic_remote_known_state
                    .logs
                    .keys()
                    .cloned()
                    .collect(),
            };

            for log_id in log_ids_to_send {
                // `Some` means there is local log content to send, given what the
                // remote is recorded as knowing (`None` if there is no record).
                if let Some(append_since) = self.local_state.logs.get_append_since(
                    &log_id,
                    remote.optimistic_remote_known_state.logs.get(&log_id),
                ) {
                    // Record the append as known by the remote before sending it,
                    // so the next round does not send it again.
                    remote
                        .optimistic_remote_known_state
                        .logs
                        .entry(log_id)
                        .or_default()
                        .update_optimistically(append_since.clone());

                    remote.sender.send(UpdateMessage::LogAppend {
                        id: log_id,
                        new_append: append_since,
                    });

                    // Having just sent data, a log without a resync flag is marked
                    // as not needing one; an existing flag is left untouched.
                    if remote.logs_that_need_full_resync.get(&log_id).is_none() {
                        remote.logs_that_need_full_resync.insert(log_id, false);
                    }
                };

                // A missing flag counts as "resync needed", so a log that had
                // nothing to append still gets our known state sent once.
                if remote.logs_that_need_full_resync.get(&log_id).cloned().unwrap_or(true)
                {
                    remote.sender.send(UpdateMessage::LogUpdateKnownState {
                        id: log_id,
                        known_state: self
                            .local_state
                            .logs
                            .known_state(&log_id)
                            .unwrap_or_default(),
                    });
                    remote.logs_that_need_full_resync.insert(log_id, false);
                }
            }

            // Same selection rule as for logs, applied to sets.
            let set_ids_to_send = match remote.sync_mode {
                SyncMode::SyncAll => self
                    .local_state
                    .sets
                    .all_set_ids()
                    .cloned()
                    .collect::<Vec<_>>(),
                SyncMode::SyncWhatRemoteWants => remote
                    .optimistic_remote_known_state
                    .sets
                    .keys()
                    .cloned()
                    .collect(),
            };

            for set_id in set_ids_to_send {
                // Unlike the log path, the lookup goes through `entry().or_default()`,
                // so a default record for this set is created if none exists yet.
                if let Some(inserts_since) = self.local_state.sets.get_inserts_since(
                    &set_id,
                    remote
                        .optimistic_remote_known_state
                        .sets
                        .entry(set_id)
                        .or_default(),
                ) {
                    // Record the inserts as known by the remote before sending them.
                    remote
                        .optimistic_remote_known_state
                        .sets
                        .entry(set_id)
                        .or_default()
                        .update_optimistically(
                            inserts_since.clone(),
                            self.local_state.sets.current_items(&set_id),
                        );

                    remote.sender.send(UpdateMessage::SetInsert {
                        id: set_id,
                        new_insert: inserts_since,
                    });

                    // Having just sent data, a set without a resync flag is marked
                    // as not needing one; an existing flag is left untouched.
                    if remote.sets_that_need_full_resync.get(&set_id).is_none() {
                        remote.sets_that_need_full_resync.insert(set_id, false);
                    }
                };

                // A missing flag counts as "resync needed" (see the log path above).
                if remote.sets_that_need_full_resync.get(&set_id).cloned().unwrap_or(true)
                {
                    remote.sender.send(UpdateMessage::SetUpdateKnownState {
                        id: set_id,
                        known_state: self
                            .local_state
                            .sets
                            .known_state(&set_id)
                            .unwrap_or_default(),
                    });
                    remote.sets_that_need_full_resync.insert(set_id, false);
                }
            }
        }
    }
}

/// One connection to another node, together with the bookkeeping needed to sync with it.
pub struct RemoteNode {
    // Key under which the owning node stores this remote; chosen at random in `new`.
    id: NodeID,
    // What the remote is believed to have: overwritten by its known-state messages
    // and advanced optimistically whenever data is received from or sent to it.
    optimistic_remote_known_state: KnownState,
    // Source of incoming `UpdateMessage`s.
    receiver: Box<dyn UpdateReceiver>,
    // Sink for outgoing `UpdateMessage`s.
    sender: Box<dyn UpdateSender>,
    // Decides which log/set ids are offered to this remote during a sync round.
    sync_mode: SyncMode,
    // Per log: `true` while our known state still has to be sent to the remote.
    // A missing entry is treated like `true` by the send phase.
    logs_that_need_full_resync: HashMap<LogID, bool>,
    // Per set: same meaning as `logs_that_need_full_resync`.
    sets_that_need_full_resync: HashMap<SetID, bool>,
}

impl RemoteNode {
    /// Wraps a receiver/sender pair into a remote with a freshly generated random id.
    ///
    /// The remote starts with an empty picture of the other side's known state
    /// and with no resync flags recorded.
    pub fn new<R, S>(receiver: R, sender: S, sync_mode: SyncMode) -> Self
    where
        R: UpdateReceiver + 'static,
        S: UpdateSender + 'static,
    {
        // Type-erase both halves so remotes with different transports can be
        // stored side by side.
        let receiver: Box<dyn UpdateReceiver> = Box::new(receiver);
        let sender: Box<dyn UpdateSender> = Box::new(sender);

        Self {
            id: NodeID::new_random(),
            optimistic_remote_known_state: KnownState::default(),
            receiver,
            sender,
            sync_mode,
            logs_that_need_full_resync: HashMap::new(),
            sets_that_need_full_resync: HashMap::new(),
        }
    }

    /// Creates two in-memory remotes wired to each other: whatever one sends,
    /// the other receives. Both use [`SyncMode::SyncAll`].
    pub fn new_connected_test_pair() -> (Self, Self) {
        // Each channel carries messages towards the remote named in its sender.
        let (towards_first, first_inbox) = futures::channel::mpsc::channel(100);
        let (towards_second, second_inbox) = futures::channel::mpsc::channel(100);

        let first = Self::new(first_inbox, towards_second, SyncMode::SyncAll);
        let second = Self::new(second_inbox, towards_first, SyncMode::SyncAll);
        (first, second)
    }
}

/// Cloneable handle for queueing new remotes on a `TelepathyNode`.
///
/// Obtained from `TelepathyNode::remotes`.
#[derive(Clone)]
pub struct RemoteManager {
    // Sending half of the node's new-remote channel.
    adder: futures::channel::mpsc::Sender<RemoteNode>
}

impl RemoteManager {
    pub fn add(&mut self, remote: RemoteNode) -> impl Future<Output=Result<(), futures::channel::mpsc::SendError>> + '_{
        self.adder.send(remote)
    }

    pub fn try_add_sync(&mut self, remote: RemoteNode) -> Result<(), futures::channel::mpsc::TrySendError<RemoteNode>> {
        self.adder.try_send(remote)
    }
}

/// Selects which logs and sets are offered to a remote during a sync round.
///
/// Derives `Debug` so the mode can be logged and used in assertions, like the
/// other public enums in this file.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum SyncMode {
    /// Offer every log and set that exists locally.
    SyncAll,
    /// Offer only logs and sets that already have an entry in the local record
    /// of the remote's known state.
    SyncWhatRemoteWants,
}

/// Incoming half of a connection to a remote.
pub trait UpdateReceiver: Send {
    /// Attempts to fetch the next incoming message.
    ///
    /// # Errors
    ///
    /// - [`UpdateReceiverError::TryAgainLater`] when no message can be produced
    ///   right now; the sync loop keeps the remote and polls again next round.
    /// - [`UpdateReceiverError::Closed`] when the connection is gone; the sync
    ///   loop drops the remote.
    fn try_receive(&mut self) -> Result<UpdateMessage, UpdateReceiverError>;
}

/// Reasons why `UpdateReceiver::try_receive` did not produce a message.
#[derive(Error, Debug)]
pub enum UpdateReceiverError {
    /// No message is available at the moment; the receiver is still usable.
    #[error("Try getting updates again later (transient error)")]
    TryAgainLater,
    /// The receiver is closed; the sync loop removes the remote on this error.
    #[error("Update receiver closed")]
    Closed
}

/// In-memory transport: receives updates from a `futures` mpsc channel.
impl UpdateReceiver for futures::channel::mpsc::Receiver<UpdateMessage> {
    /// Polls the channel once without blocking.
    ///
    /// A received message is logged at `info` level and returned. `Ok(None)`
    /// from `try_next` maps to `Closed`, an `Err` maps to `TryAgainLater`.
    fn try_receive(&mut self) -> Result<UpdateMessage, UpdateReceiverError> {
        let received = match self.try_next() {
            Err(_) => return Err(UpdateReceiverError::TryAgainLater),
            Ok(None) => return Err(UpdateReceiverError::Closed),
            Ok(Some(message)) => message,
        };

        info!("Received: {}", received.short_summary());
        Ok(received)
    }
}

/// Outgoing half of a connection to a remote.
pub trait UpdateSender: Send {
    /// Sends `update` to the remote.
    ///
    /// There is no error return, so an implementation cannot report a failed
    /// send to the caller through this method.
    fn send(&mut self, update: UpdateMessage);
}

/// In-memory transport: sends updates into a `futures` mpsc channel.
impl UpdateSender for futures::channel::mpsc::Sender<UpdateMessage> {
    /// Logs the update at `info` level and pushes it into the channel without waiting.
    ///
    /// # Panics
    ///
    /// Panics if `try_send` returns an error, because the result is unwrapped.
    fn send(&mut self, update: UpdateMessage) {
        info!("Sending: {}", update.short_summary());
        // Resolves to the channel's inherent `try_send`, not to this trait's `send`.
        self.try_send(update).unwrap();
    }
}

/// A message exchanged between nodes while syncing.
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`, so reordering or
/// renaming variants may change the serialized form — check before editing.
#[derive(Debug, Serialize, Deserialize)]
pub enum UpdateMessage {
    /// New content for the log `id`.
    LogAppend {
        id: LogID,
        new_append: LogAppendMessage,
    },
    /// The sender's known state of the log `id`; the receiver stores it as its
    /// record of what the sender has.
    LogUpdateKnownState {
        id: LogID,
        known_state: LogKnownState,
    },
    /// New items for the set `id`.
    SetInsert {
        id: SetID,
        new_insert: SetInsertMessage,
    },
    /// The sender's known state of the set `id`; the receiver stores it as its
    /// record of what the sender has.
    SetUpdateKnownState {
        id: SetID,
        known_state: SetKnownState,
    },
}

impl UpdateMessage {
    /// Returns a short, single-line description of the message for log output.
    ///
    /// Every summary starts with the `Debug` form of the log or set id. It is
    /// followed by the size of the payload (bytes appended, items inserted) or
    /// by the known state being announced (log length, set frontier).
    pub fn short_summary(&self) -> String {
        match self {
            UpdateMessage::LogAppend { id, new_append } => {
                format!("{:?}: + {} bytes", id, new_append.append.len())
            }
            UpdateMessage::LogUpdateKnownState { id, known_state } => {
                // This text used to end in a stray, unbalanced `)`.
                format!("{:?}: new known state {} bytes", id, known_state.log_len)
            }
            UpdateMessage::SetInsert { id, new_insert } => {
                format!("{:?}: + {} items", id, new_insert.new_items.len())
            }
            UpdateMessage::SetUpdateKnownState { id, known_state } => {
                format!("{:?}: new known state {:?}", id, known_state.frontier)
            }
        }
    }
}

/// Identifier of a remote connection: an opaque 12-byte value.
///
/// Used as the key of the node's remote table and inside `UpdateSource::ReceivedFrom`.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub struct NodeID([u8; 12]);

impl NodeID {
    pub fn new_random() -> Self {
        let mut rng = rand::thread_rng();
        let mut id = [0u8; 12];
        rng.fill_bytes(&mut id);
        NodeID(id)
    }
}