//! caro 0.7.1
//!
//! caro: creation-addressed replicated objects
use std::{
    cell::{RefCell, RefMut},
    collections::{hash_map::Entry, HashMap},
    rc::Rc,
};

use audi::{Listener, ListenerSet, ListenerWithState};
use futures::{
    channel::mpsc::{channel, Receiver, Sender},
    future::join_all,
    stream, Future, Stream, StreamExt,
};
use litl::impl_debug_as_litl;
use log::LogID;
use mofo::Mofo;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use tracing::{debug, debug_span, trace, warn};
use tracing::{info_span, Instrument};

mod blob;
mod causal_set;
mod log;
mod object;
mod set;
mod storage;
mod telepathic;

use crate::log::LogState;
pub use blob::{BlobDiff, BlobID, BlobState};
pub use object::{Diff, ObjectID, ObjectState, WriteAccess};
use object::{ObjectError, StateInfo};
use set::{SetDiff, SetState};
pub use set::{SetItem, SetItemID};
pub use storage::{MemoryStorageBackend, ShardedStorageBackend, StorageBackend};
use telepathic::{ApplyDiffErrorFor, ApplyDiffSuccess, Telepathic, TelepathicDiff};

/// Error returned when a diff cannot be applied to an object, specialized to object-level
/// errors (`ApplyDiffErrorFor<ObjectError>`).
pub type ApplyDiffError = ApplyDiffErrorFor<ObjectError>;

/// Shared mutable state behind a [`Node`] handle.
struct NodeInner {
    /// Objects known to this node, keyed by ID; each one sits in its own `RefCell` so it can
    /// be borrowed independently of the node.
    objects: HashMap<ObjectID, Rc<RefCell<ObjectState>>>,
    /// Per-object listener sets that get diffs broadcast to them when the object changes.
    local_listeners: HashMap<ObjectID, ListenerSet<Diff>>,
    /// Connected remotes together with the per-remote sync bookkeeping (`RemoteState`).
    remotes: ListenerSet<UpdateMessage, RemoteState>,
    /// Runner for background tasks, e.g. the receive loop of each added remote.
    background: Mofo,
    /// Backend that object diffs are stored to and loaded from.
    storage: Box<dyn StorageBackend>,
}

impl NodeInner {
    /// Returns a future resolving to the object with the given `id`.
    ///
    /// If the object is not yet known, an empty object of the matching kind is inserted into
    /// `objects` synchronously (before this method returns). The returned future then replays
    /// every diff yielded by `ObjectState::load` for `id` into that placeholder. If the object
    /// was already present, the future resolves to the existing entry without touching storage.
    ///
    /// The future only captures a cloned storage handle and an `Rc` to the object.
    ///
    /// NOTE(review): a second caller arriving while the first load is still in flight gets
    /// the placeholder immediately, possibly before all stored diffs were replayed — confirm
    /// callers tolerate this.
    ///
    /// # Panics
    ///
    /// The returned future panics if a stored diff fails to apply.
    fn load_object(&mut self, id: ObjectID) -> impl Future<Output = Rc<RefCell<ObjectState>>> {
        let mut freshly_inserted = false;
        let slot = self.objects.entry(id).or_insert_with(|| {
            freshly_inserted = true;

            let empty_state = match id {
                ObjectID::Log(log_id) => ObjectState::Log(Box::new(LogState::new_empty(log_id))),
                ObjectID::Set(set_id) => ObjectState::Set(Box::new(SetState::new_empty(set_id))),
                ObjectID::Blob(blob_id) => {
                    ObjectState::Blob(Box::new(BlobState::new_empty(blob_id)))
                }
            };
            Rc::new(RefCell::new(empty_state))
        });
        let object = Rc::clone(slot);
        let storage = self.storage.clone_ref();

        async move {
            if freshly_inserted {
                // Replay the stored diffs, in stream order, into the placeholder created above.
                ObjectState::load(id, storage)
                    .for_each(|stored_diff| {
                        let target = Rc::clone(&object);
                        async move {
                            target.borrow_mut().try_apply_diff(stored_diff).unwrap();
                        }
                    })
                    .await;
            }

            object
        }
    }

    /// Returns a new `Rc` handle to the object with `id`, or `None` if it is not in `objects`.
    /// Never touches storage.
    fn get_object(&self, id: &ObjectID) -> Option<Rc<RefCell<ObjectState>>> {
        self.objects.get(id).cloned()
    }
}

/// A handle to a replication node that holds objects (logs, sets, blobs), local diff
/// listeners and connected remotes.
///
/// Cloning a `Node` only clones the `Rc`, so all clones share the same state. Because it is
/// built on `Rc<RefCell<..>>`, a `Node` is not `Send`.
#[derive(Clone)]
pub struct Node(Rc<RefCell<NodeInner>>);

impl Node {
    pub fn new(background: Mofo) -> Self {
        Self(Rc::new(RefCell::new(NodeInner {
            objects: HashMap::new(),
            local_listeners: HashMap::new(),
            remotes: ListenerSet::new(),
            background,
            storage: Box::new(MemoryStorageBackend::new()),
        })))
    }

    pub async fn create_log<M: Serialize>(&self, meta: Option<M>) -> (ObjectID, WriteAccess) {
        let (log_id, write_access, stored_fut) = self.create_log_sync(meta);

        stored_fut.await;

        (log_id, write_access)
    }

    pub fn create_log_sync<M: Serialize>(&self, meta: Option<M>) -> (ObjectID, WriteAccess, impl Future<Output = ()>) {
        let (log, write_access) = LogState::new(meta);
        let log_id = log.id;

        let as_initial_diff = log.state_as_initial_diff().unwrap();

        match (*self.0).borrow_mut().objects.entry(ObjectID::Log(log.id)) {
            Entry::Occupied(_) => unreachable!("Log for given ID already exists"),
            Entry::Vacant(entry) => {
                entry.insert(Rc::new(RefCell::new(ObjectState::Log(Box::new(log)))))
            }
        };

        let stored_fut = LogState::store(as_initial_diff, self.0.borrow().storage.clone_ref());

        (ObjectID::Log(log_id),  WriteAccess::Log(write_access), stored_fut)
    }

    pub async fn create_set<M: Serialize>(&self, meta: Option<M>) -> (ObjectID, WriteAccess) {
        let (set, write_access) = SetState::new(meta);
        let set_id = set.id;

        let as_initial_diff = set.state_as_initial_diff().unwrap();

        match (*self.0).borrow_mut().objects.entry(ObjectID::Set(set.id)) {
            Entry::Occupied(_) => unreachable!("Set for given ID already exists"),
            Entry::Vacant(entry) => {
                entry.insert(Rc::new(RefCell::new(ObjectState::Set(Box::new(set)))))
            }
        };

        SetState::store(as_initial_diff, self.0.borrow().storage.clone_ref()).await;

        (ObjectID::Set(set_id), WriteAccess::Set(write_access))
    }

    pub async fn create_blob<D: Serialize>(&self, data: D) -> ObjectID {
        let blob = BlobState::new(data);
        let id = blob.id();

        let as_initial_diff = blob.state_as_initial_diff().unwrap();

        (*self.0).borrow_mut().objects.insert(
            ObjectID::Blob(id),
            Rc::new(RefCell::new(ObjectState::Blob(Box::new(blob)))),
        );

        let storage = self.0.borrow().storage.clone_ref();

        BlobState::store(as_initial_diff, storage).await;

        ObjectID::Blob(id)
    }

    pub fn diff_for_log_append(
        &self,
        id: ObjectID,
        write_access: &WriteAccess,
        append: &[u8],
    ) -> Diff {
        if append.is_empty() {
            panic!("Empty append")
        }
        match write_access {
            WriteAccess::Log(log_access) => {
                let self_ref = (*self.0).borrow_mut();

                let log = self_ref
                    .get_object(&id)
                    .expect("Expected log to be created or loaded before appending");
                let log = RefMut::map((*log).borrow_mut(), |object| match object {
                    ObjectState::Log(log) => log,
                    _ => unreachable!("Expected log to be log"),
                });

                Diff::Log(log.diff_for_new_append(append, log_access))
            }
            _ => panic!("Tried to append to log with set write access"),
        }
    }
    pub async fn append_to_log(&self, id: ObjectID, write_access: &WriteAccess, append: &[u8]) {
        let diff = self.diff_for_log_append(id, write_access, append);
        self.apply_new_diff(diff).await;
    }

    pub async fn insert_into_set(
        &self,
        id: ObjectID,
        write_access: &WriteAccess,
        set_item: SetItem,
    ) -> Result<SetItemID, ApplyDiffError> {
        match write_access {
            WriteAccess::Set(set_access) => {
                let item_id = set_item.id();

                let diff = SetDiff {
                    id: id.expect_set(),
                    header: None,
                    new_items: vec![set_access.0.sign(set_item)],
                };

                self.apply_new_diff(Diff::Set(diff)).await;

                Ok(item_id)
            }
            _ => panic!("Tried to insert into set with log write access"),
        }
    }

    pub async fn apply_new_diff(&self, diff: Diff) {
        let span = info_span!(
            "Caro::apply_new_diff",
            object_id = format!("{:?}", diff.id())
        );
        let span_id = span.id();

        async {
            let object_id = diff.id();

            let listeners = {
                let self_ref = (*self.0).borrow_mut();
                let object = self_ref
                    .get_object(&diff.id())
                    .expect("Expected object to be created or loaded before applying diff");

                let result = (*object)
                    .borrow_mut()
                    .try_apply_diff(diff.clone())
                    .expect("New diffs should always be valid");

                ObjectState::store(result.effective_diff, self_ref.storage.clone_ref()).await;

                debug_assert!(match (&diff, (*object).borrow().state_as_initial_diff()) {
                    (Diff::Set(set_diff), Some(Diff::Set(all_items))) => set_diff.new_items.iter().all(|i| all_items.new_items.contains(i)),
                    _ => true
                }, "Expected new set items to be connected and show up in state - make sure they were inserted into the correct set.");

                self_ref.local_listeners.get(&diff.id()).cloned()
            };

            if let Some(listeners) = listeners {
                listeners.broadcast(diff).instrument(info_span!(parent: span_id.clone(), "Caro::local_listener_broadcast", object_id = format!("{:?}", object_id))).await;
            }

            self.sync_object_with_remotes(object_id, span_id.clone()).await;
        }.instrument(span).await;
    }

    pub async fn sync_object_with_remotes(&self, id: ObjectID, parent_span: Option<tracing::Id>) {
        let (object, remotes) = {
            let self_ref = (*self.0).borrow_mut();

            let object = self_ref
                .get_object(&id)
                .expect("Expected object to be created or loaded before syncing");

            let remotes = self_ref.remotes.clone();

            (object, remotes)
        };

        remotes
            .broadcast_filter_map(|remote| {
                let optimistic_state_info_entry = remote.optimistic_state_info.entry(id);

                match (optimistic_state_info_entry, remote.mode) {
                    (entry @ Entry::Occupied(_), _)
                    | (entry @ Entry::Vacant(_), RemoteMode::SyncEverything) => {
                        let optimistic_state = entry.or_default();

                        let update = (*object)
                            .borrow()
                            .diff_since(optimistic_state.as_ref())
                            .map(UpdateMessage::NewContent);

                        optimistic_state.replace(
                            (*object)
                                .borrow()
                                .state_info()
                                .expect("Should have state info of synced object"),
                        );

                        update
                    }
                    _ => None,
                }
            })
            .instrument(info_span!(
                parent: parent_span,
                "sync_object_with_remotes",
                id = format!("{:?}", id)
            ))
            .await
    }

    pub fn get_object(&self, id: &ObjectID) -> Option<Rc<RefCell<ObjectState>>> {
        (*self.0).borrow().get_object(id)
    }

    pub async fn load_object(&self, id: ObjectID) -> Rc<RefCell<ObjectState>> {
        let loaded = (*self.0).borrow_mut().load_object(id);
        loaded.await
    }

    async fn add_diff_listener(&self, id: ObjectID, listener: Listener<Diff>) {
        let (listeners, loading_object) = {
            let mut self_ref = (*self.0).borrow_mut();
            let listeners = self_ref
                .local_listeners
                .entry(id)
                .or_insert_with(ListenerSet::new)
                .clone();
            let loading_object = self_ref.load_object(id);

            (listeners, loading_object)
        };
        let object = loading_object.await;

        let initial_diff = object.borrow().state_as_initial_diff();
        let is_empty = object.borrow().state_info().is_none();

        listeners.add_with_initial_msg(listener, initial_diff).await;

        if is_empty {
            debug!(object_id = ?id, "Listened to object is empty, syncing with remotes");
            let remotes = (*self.0).borrow().remotes.clone();
            remotes
                .broadcast_filter_map(|remote| {
                    if remote.mode == RemoteMode::SyncEverything {
                        Some(UpdateMessage::StateInfo(StateInfoUpdate {
                            id: object.borrow().id(),
                            state_info: None,
                            needs_full_resync: true,
                        }))
                    } else {
                        None
                    }
                })
                .await;
        }
    }

    pub fn diffs(
        &self,
        id: ObjectID,
        listener_prefix: String,
    ) -> impl Stream<Item = Diff> + 'static {
        let (diffs_tx, diffs_rx) = channel(100);

        let self_clone = self.clone();

        stream::once(async move {
            self_clone
                .add_diff_listener(
                    id,
                    Listener::new(
                        &format!("{}_{:?}", listener_prefix, rand::random::<u64>()),
                        diffs_tx,
                    ),
                )
                .await;

            diffs_rx
        })
        .flatten()
        .boxed_local()
    }

    pub async fn add_remote(&self, remote: Remote) {
        (*self.0).borrow_mut().remotes.add(RemoteState {
            id: remote.id.clone(),
            mode: remote.mode,
            optimistic_state_info: HashMap::new(),
            outgoing: remote.outgoing.clone(),
        });

        let self_rc = self.clone();
        let remotes = (*self.0).borrow().remotes.clone();
        let remote_id = remote.id.clone();

        let receiving = remote.incoming.for_each(move |msg| {
            let self_rc = self_rc.clone();
            let remotes = remotes.clone();
            let remote_id = remote_id.clone();

            async move {
                trace!(remote_id = remote_id, msg = ?msg, "Receiving from remote");

                match msg {
                    UpdateMessage::NewContent(diff) => {
                        self_rc
                            .handle_incoming_diff(diff, &remote_id, remotes)
                            .await;
                    }
                    UpdateMessage::StateInfo(StateInfoUpdate {
                        id,
                        state_info: new_state_info,
                        needs_full_resync,
                    }) => {
                        self_rc
                            .handle_incoming_state_info(
                                id,
                                new_state_info,
                                needs_full_resync,
                                &remote_id,
                                remotes,
                            )
                            .await;
                    }
                }
            }
        });

        if remote.mode == RemoteMode::SyncEverything {
            let remotes = (*self.0).borrow().remotes.clone();
            let remote_id = remote.id.clone();
            #[allow(clippy::needless_collect)]
            let all_state_infoes = (*self.0)
                .borrow()
                .objects
                .iter()
                .map(|(id, obj)| (*id, (**obj).borrow().state_info()))
                .collect::<Vec<_>>();

            let span = debug_span!("Sending all state_infos", remote_id = remote_id);

            join_all(all_state_infoes.into_iter().map(|(id, state_info)| {
                remotes.send_or_remove(
                    &remote_id,
                    UpdateMessage::StateInfo(StateInfoUpdate {
                        id,
                        state_info,
                        needs_full_resync: true,
                    }),
                    span.id(),
                )
            }))
            .instrument(span)
            .await;
        }

        (*self.0)
            .borrow()
            .background
            .add_background_task(Box::pin(receiving));
    }

    async fn handle_incoming_diff(
        &self,
        diff: Diff,
        remote_id: &str,
        remotes: ListenerSet<UpdateMessage, RemoteState>,
    ) {
        let span = info_span!("Caro::handle_incoming_diff", remote_id = remote_id);
        let span_id = span.id();

        async {
            let (apply_result, id, current_state_info) = {
                let object_promise = (*self.0).borrow_mut().load_object(diff.id());
                let object_rc = object_promise.instrument(info_span!(parent: span_id.clone(), "NodeInner::load_object", id = format!("{:?}", diff.id()))).await;
                let mut object = (*object_rc).borrow_mut();

                (
                    info_span!(parent: span_id.clone(), "ObjectState::try_apply_diff", id = format!("{:?}", diff.id())).in_scope(||
                        object.try_apply_diff(diff.clone())
                    ),
                    object.id(),
                    info_span!(parent: span_id.clone(), "ObjectState::state_info", id = format!("{:?}", diff.id())).in_scope(||
                        object.state_info()
                    ),
                )
            };
            match apply_result {
                Ok(ApplyDiffSuccess{new_state_info, effective_diff}) => {
                    if let Some(remote) = remotes.state_of(remote_id) {
                        (*remote)
                            .borrow_mut()
                            .optimistic_state_info
                            .insert(id, Some(new_state_info));
                    }

                    ObjectState::store(effective_diff.clone(), self.0.borrow().storage.clone_ref()).await;

                    let listeners = (*self.0).borrow_mut().local_listeners.get(&id).cloned();

                    if let Some(listeners) = listeners {
                        listeners.broadcast(effective_diff).instrument(info_span!(parent: span_id.clone(), "Caro::local_listener_broadcast", object_id = format!("{:?}", id))).await;
                    }

                    self.sync_object_with_remotes(id, span_id.clone()).await;
                }
                Err(ApplyDiffError::InvalidKnownStateAssumption) => {
                    warn!(state_info = ?current_state_info, diff = ?diff, "Invalid known state assumption");
                    let span = info_span!(parent: span_id.clone(), "reply_invalid_state_assumption", id = format!("{:?}", id));
                    // remote must be mistaken about our state, let them know
                    remotes
                        .send_or_remove(
                            remote_id,
                            UpdateMessage::StateInfo(StateInfoUpdate {
                                id,
                                state_info: current_state_info,
                                needs_full_resync: true
                            }),
                            span.id(),
                        )
                        .instrument(span)
                        .await;
                }
                Err(err) => Err(err).unwrap(),
            }
        }
        .instrument(span)
        .await;
    }

    async fn handle_incoming_state_info(
        &self,
        id: ObjectID,
        new_state_info: Option<StateInfo>,
        needs_full_resync: bool,
        remote_id: &str,
        remotes: ListenerSet<UpdateMessage, RemoteState>,
    ) {
        let span = info_span!("Caro::handle_incoming_state_info", remote_id = remote_id);
        let span_id = span.id();

        async {
            if !needs_full_resync {
                return
            }
        let update_to_respond_with = if let Some(remote) = remotes.state_of(remote_id) {
            (*remote)
                .borrow_mut()
                .optimistic_state_info
                .insert(id, new_state_info.clone());

            if let Some(object) = (*self.0).borrow_mut().objects.get(&id).cloned() {
                let diff = info_span!(parent: span_id.clone(), "ObjectState::diff_since", id = format!("{:?}", id)).in_scope(||
                    (*object).borrow().diff_since(new_state_info.as_ref())
                );
                if let Some(update) = diff {
                    Some(UpdateMessage::NewContent(update))
                } else {
                    let own_state_info = (*object).borrow().state_info();

                    if own_state_info != new_state_info {
                        // TODO: we can still get stuck in an endless loop here if we can't reconcile
                        // compare if we made progress since the last remembered optimistic state_info, only then send again
                        Some(UpdateMessage::StateInfo(StateInfoUpdate {
                            id,
                            state_info: own_state_info,
                            needs_full_resync: true,
                        }))
                    } else {
                        None
                    }
                }
            } else {
                Some(UpdateMessage::StateInfo(StateInfoUpdate {
                    id,
                    state_info: None,
                    // if we have nothing, we only want a resync if the other party has something
                    needs_full_resync: new_state_info.is_some()
                }))
            }
        } else {
            None
        };
        if let Some(update) = update_to_respond_with {
            let remotes = remotes.clone();
            let span = info_span!(parent: span_id.clone(), "reply_with_update");
            remotes.send_or_remove(remote_id, update, span.id()).instrument(span).await;
        }
    }
    .instrument(span)
    .await;
    }
}

/// A message exchanged between a node and one of its remotes.
///
/// Variant names are serialized in camelCase (`newContent`, `stateInfo`).
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum UpdateMessage {
    /// New content for an object, as a diff for the receiver to apply.
    NewContent(Diff),
    /// The sender's view of its own state for an object.
    StateInfo(StateInfoUpdate),
}

/// Payload of [`UpdateMessage::StateInfo`]: the sender's state for a single object.
#[derive(Clone, Serialize, Deserialize)]
pub struct StateInfoUpdate {
    /// The object this update refers to.
    id: ObjectID,
    /// The sender's current state info for the object; `None` when it has no state for it.
    state_info: Option<StateInfo>,
    /// Asks the receiver to reply with whatever the sender is missing. Receivers ignore the
    /// whole update when this is `false`.
    needs_full_resync: bool,
}

// Provides the `Debug` impl for `UpdateMessage` (used e.g. by `?msg` in trace logging);
// presumably it renders the message in litl notation — see the `litl` crate.
impl_debug_as_litl!(UpdateMessage);

/// A connection to another node, handed to [`Node::add_remote`].
pub struct Remote {
    /// Identifier of the remote; used to address it within the node's remote set.
    pub id: String,
    /// Sync policy for this remote.
    pub mode: RemoteMode,
    /// Messages arriving from the remote.
    pub incoming: Receiver<UpdateMessage>,
    /// Channel used to send messages to the remote.
    pub outgoing: Sender<UpdateMessage>,
}

impl Remote {
    /// Builds two in-memory remotes wired to each other, for tests.
    ///
    /// The first remote (id `node1_name`) represents node 1 and is meant to be added on
    /// node 2. The second (id `node2_name`) represents node 2 and is meant to be added on
    /// node 1. Whatever is sent through one remote's `outgoing` arrives on the other's
    /// `incoming`; each direction buffers up to 100 messages.
    ///
    /// The first remote uses [`RemoteMode::ProvideWhatRemoteWants`], the second
    /// [`RemoteMode::SyncEverything`].
    pub fn new_connected_test_pair(node1_name: &str, node2_name: &str) -> (Remote, Remote) {
        // Direction node 2 -> node 1.
        let (node2_to_node1_tx, node2_to_node1_rx) = channel(100);
        // Direction node 1 -> node 2.
        let (node1_to_node2_tx, node1_to_node2_rx) = channel(100);

        // Node 1 as seen from node 2.
        let node1_remote = Remote {
            id: node1_name.to_string(),
            mode: RemoteMode::ProvideWhatRemoteWants,
            incoming: node1_to_node2_rx,
            outgoing: node2_to_node1_tx,
        };

        // Node 2 as seen from node 1.
        let node2_remote = Remote {
            id: node2_name.to_string(),
            mode: RemoteMode::SyncEverything,
            incoming: node2_to_node1_rx,
            outgoing: node1_to_node2_tx,
        };

        (node1_remote, node2_remote)
    }
}

/// Sync policy applied to a remote.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RemoteMode {
    /// Push everything:
    ///
    /// - on connect, the state info of every held object is sent;
    /// - objects the remote has no entry for are still synced to it;
    /// - the remote is asked for a resync when a locally listened-to object is empty.
    SyncEverything,
    /// Only sync objects for which the remote already has an entry in its optimistic state
    /// info, i.e. that it has shown interest in.
    ProvideWhatRemoteWants,
}

/// Per-remote bookkeeping stored in the node's remote [`ListenerSet`].
struct RemoteState {
    /// Copied from [`Remote::id`].
    id: String,
    /// Copied from [`Remote::mode`].
    mode: RemoteMode,
    /// Our assumed view of the remote's state per object, used to compute what to send next.
    /// A `None` value means the remote is assumed to have no state for that object.
    optimistic_state_info: HashMap<ObjectID, Option<StateInfo>>,
    /// Sender used to deliver messages to the remote.
    outgoing: Sender<UpdateMessage>,
}

/// Lets the remote [`ListenerSet`] address each remote by its ID and deliver messages
/// through its outgoing channel.
impl ListenerWithState<UpdateMessage> for RemoteState {
    /// The remote's identifier, as given in [`Remote::id`].
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// The channel on which messages for this remote are sent.
    fn sender(&self) -> &Sender<UpdateMessage> {
        &self.outgoing
    }
}