use std::{
cell::{RefCell, RefMut},
collections::{hash_map::Entry, HashMap},
rc::Rc,
};
use audi::{Listener, ListenerSet, ListenerWithState};
use futures::{
channel::mpsc::{channel, Receiver, Sender},
future::join_all,
stream, Future, Stream, StreamExt,
};
use litl::impl_debug_as_litl;
use mofo::Mofo;
use serde::Serialize;
use serde_derive::{Deserialize, Serialize};
use tracing::{debug, debug_span, trace, warn};
use tracing::{info_span, Instrument};
mod blob;
mod causal_set;
mod log;
mod object;
mod set;
mod storage;
mod telepathic;
use crate::log::LogState;
pub use blob::{BlobDiff, BlobID, BlobState};
pub use object::{Diff, ObjectID, ObjectState, WriteAccess};
use object::{ObjectError, StateInfo};
use set::{SetDiff, SetState};
pub use set::{SetItem, SetItemID};
pub use storage::{MemoryStorageBackend, ShardedStorageBackend, StorageBackend};
use telepathic::{ApplyDiffErrorFor, ApplyDiffSuccess, Telepathic, TelepathicDiff};
/// Error produced when applying a [`Diff`] to an object fails: the generic
/// [`ApplyDiffErrorFor`] instantiated with this crate's [`ObjectError`].
pub type ApplyDiffError = ApplyDiffErrorFor<ObjectError>;
/// State shared by every clone of a [`Node`] (kept behind `Rc<RefCell<..>>`).
struct NodeInner {
/// Object states currently held in memory, keyed by object ID.
objects: HashMap<ObjectID, Rc<RefCell<ObjectState>>>,
/// Per-object sets of local listeners that are sent the diffs applied to
/// that object.
local_listeners: HashMap<ObjectID, ListenerSet<Diff>>,
/// Connected remotes, each with its own `RemoteState`.
remotes: ListenerSet<UpdateMessage, RemoteState>,
/// Handle used to register background tasks (the per-remote receive loop).
background: Mofo,
/// Backend that object diffs are stored to and loaded from.
storage: Box<dyn StorageBackend>,
}
impl NodeInner {
    /// Returns a future resolving to the in-memory state of object `id`.
    ///
    /// If the object is not yet in `objects`, an empty state of the matching
    /// kind is registered right away and the returned future replays every
    /// diff found in storage into it. If it is already registered, the future
    /// resolves immediately with the existing state.
    ///
    /// The returned future does not borrow `self`, so the caller can release
    /// its borrow of the node before awaiting.
    ///
    /// NOTE: a second call for the same `id` made while the first load is
    /// still replaying diffs takes the "already registered" path and may
    /// therefore observe a partially loaded object.
    ///
    /// # Panics
    ///
    /// The future panics if a stored diff cannot be applied.
    fn load_object(&mut self, id: ObjectID) -> impl Future<Output = Rc<RefCell<ObjectState>>> {
        let (object, is_fresh) = match self.objects.entry(id) {
            Entry::Occupied(slot) => (Rc::clone(slot.get()), false),
            Entry::Vacant(slot) => {
                let empty_state = match id {
                    ObjectID::Log(id) => ObjectState::Log(Box::new(LogState::new_empty(id))),
                    ObjectID::Set(id) => ObjectState::Set(Box::new(SetState::new_empty(id))),
                    ObjectID::Blob(id) => ObjectState::Blob(Box::new(BlobState::new_empty(id))),
                };
                let inserted = slot.insert(Rc::new(RefCell::new(empty_state)));
                (Rc::clone(inserted), true)
            }
        };
        let storage = self.storage.clone_ref();

        async move {
            if is_fresh {
                ObjectState::load(id, storage)
                    .for_each(|stored_diff| {
                        let target = Rc::clone(&object);
                        async move {
                            target.borrow_mut().try_apply_diff(stored_diff).unwrap();
                        }
                    })
                    .await;
            }
            object
        }
    }

    /// Looks up the in-memory state of `id` without touching storage.
    fn get_object(&self, id: &ObjectID) -> Option<Rc<RefCell<ObjectState>>> {
        self.objects.get(id).map(Rc::clone)
    }
}
/// Handle to a node. Cloning is cheap: every clone shares the same
/// `NodeInner` through an `Rc<RefCell<..>>`.
#[derive(Clone)]
pub struct Node(Rc<RefCell<NodeInner>>);
impl Node {
/// Creates a node with no objects, listeners or remotes, backed by a fresh
/// [`MemoryStorageBackend`]. `background` is kept for registering background
/// tasks later on.
pub fn new(background: Mofo) -> Self {
    let inner = NodeInner {
        objects: HashMap::new(),
        local_listeners: HashMap::new(),
        remotes: ListenerSet::new(),
        background,
        storage: Box::new(MemoryStorageBackend::new()),
    };
    Self(Rc::new(RefCell::new(inner)))
}
/// Creates a new log with optional metadata `meta`, registers it in memory
/// and stores its initial state.
///
/// Returns the new object's ID together with the write access needed to
/// append to it.
///
/// # Panics
///
/// Panics if the new log has no initial diff, or if an object with the
/// freshly generated ID is already registered.
pub async fn create_log<M: Serialize>(&self, meta: Option<M>) -> (ObjectID, WriteAccess) {
    let (log, write_access) = LogState::new(meta);
    let log_id = log.id;
    let as_initial_diff = log.state_as_initial_diff().unwrap();

    // Register the log and fetch the storage handle under one short borrow.
    // The borrow must end before the `.await` below: a `Ref` kept alive
    // across the await would make any concurrent `borrow_mut()` on the node
    // panic while the store is pending.
    let storage = {
        let mut inner = (*self.0).borrow_mut();
        match inner.objects.entry(ObjectID::Log(log_id)) {
            Entry::Occupied(_) => unreachable!("Log for given ID already exists"),
            Entry::Vacant(entry) => {
                entry.insert(Rc::new(RefCell::new(ObjectState::Log(Box::new(log)))));
            }
        }
        inner.storage.clone_ref()
    };

    LogState::store(as_initial_diff, storage).await;
    (ObjectID::Log(log_id), WriteAccess::Log(write_access))
}
/// Creates a new set with optional metadata `meta`, registers it in memory
/// and stores its initial state.
///
/// Returns the new object's ID together with the write access needed to
/// insert into it.
///
/// # Panics
///
/// Panics if the new set has no initial diff, or if an object with the
/// freshly generated ID is already registered.
pub async fn create_set<M: Serialize>(&self, meta: Option<M>) -> (ObjectID, WriteAccess) {
    let (set, write_access) = SetState::new(meta);
    let set_id = set.id;
    let as_initial_diff = set.state_as_initial_diff().unwrap();

    // Register the set and fetch the storage handle under one short borrow.
    // The borrow must end before the `.await` below: a `Ref` kept alive
    // across the await would make any concurrent `borrow_mut()` on the node
    // panic while the store is pending.
    let storage = {
        let mut inner = (*self.0).borrow_mut();
        match inner.objects.entry(ObjectID::Set(set_id)) {
            Entry::Occupied(_) => unreachable!("Set for given ID already exists"),
            Entry::Vacant(entry) => {
                entry.insert(Rc::new(RefCell::new(ObjectState::Set(Box::new(set)))));
            }
        }
        inner.storage.clone_ref()
    };

    SetState::store(as_initial_diff, storage).await;
    (ObjectID::Set(set_id), WriteAccess::Set(write_access))
}
/// Creates a blob from `data`, registers it in memory (replacing any entry
/// already present under the same ID) and stores its initial state.
///
/// Returns the ID of the blob.
///
/// # Panics
///
/// Panics if the new blob has no initial diff.
pub async fn create_blob<D: Serialize>(&self, data: D) -> ObjectID {
    let blob_state = BlobState::new(data);
    let object_id = ObjectID::Blob(blob_state.id());
    let initial_diff = blob_state.state_as_initial_diff().unwrap();

    // Borrow the node only long enough to register the blob and grab the
    // storage handle; nothing stays borrowed across the await.
    let storage = {
        let mut inner = (*self.0).borrow_mut();
        inner.objects.insert(
            object_id,
            Rc::new(RefCell::new(ObjectState::Blob(Box::new(blob_state)))),
        );
        inner.storage.clone_ref()
    };

    BlobState::store(initial_diff, storage).await;
    object_id
}
/// Builds the diff that appends `append` to the log `id`, without applying
/// it.
///
/// # Panics
///
/// Panics if `append` is empty, if `write_access` is not log write access,
/// if the log has not been created or loaded on this node, or if the log's
/// state is already borrowed.
pub fn diff_for_log_append(
    &self,
    id: ObjectID,
    write_access: &WriteAccess,
    append: &[u8],
) -> Diff {
    if append.is_empty() {
        panic!("Empty append")
    }
    match write_access {
        WriteAccess::Log(log_access) => {
            // A shared borrow is enough for the lookup and is released at the
            // end of this statement. An exclusive borrow held for the whole
            // arm would panic whenever another borrow of the node is alive.
            let log = (*self.0)
                .borrow()
                .get_object(&id)
                .expect("Expected log to be created or loaded before appending");
            let log = RefMut::map((*log).borrow_mut(), |object| match object {
                ObjectState::Log(log) => log,
                _ => unreachable!("Expected log to be log"),
            });
            Diff::Log(log.diff_for_new_append(append, log_access))
        }
        _ => panic!("Tried to append to log with set write access"),
    }
}
/// Appends `append` to the log `id`: builds the append diff and applies it
/// as a new local diff.
///
/// # Panics
///
/// Panics under the same conditions as `diff_for_log_append` and
/// `apply_new_diff`.
pub async fn append_to_log(&self, id: ObjectID, write_access: &WriteAccess, append: &[u8]) {
    self.apply_new_diff(self.diff_for_log_append(id, write_access, append))
        .await;
}
/// Signs `set_item` with the given set write access and applies it to the
/// set `id` as a new local diff.
///
/// Returns the ID of the inserted item. The current implementation never
/// returns `Err`; failures surface as panics.
///
/// # Panics
///
/// Panics if `write_access` is not set write access, or under the same
/// conditions as `apply_new_diff`.
pub async fn insert_into_set(
    &self,
    id: ObjectID,
    write_access: &WriteAccess,
    set_item: SetItem,
) -> Result<SetItemID, ApplyDiffError> {
    let set_access = match write_access {
        WriteAccess::Set(access) => access,
        _ => panic!("Tried to insert into set with log write access"),
    };

    let item_id = set_item.id();
    let set_id = id.expect_set();
    let signed_item = set_access.0.sign(set_item);

    let insertion = Diff::Set(SetDiff {
        id: set_id,
        header: None,
        new_items: vec![signed_item],
    });
    self.apply_new_diff(insertion).await;
    Ok(item_id)
}
/// Applies a locally produced `diff` to its object, stores the effective
/// diff, sends the diff to the object's local listeners and then syncs the
/// object with remotes.
///
/// # Panics
///
/// Panics if the object has not been created or loaded on this node, or if
/// the diff cannot be applied.
pub async fn apply_new_diff(&self, diff: Diff) {
    let span = info_span!(
        "Tlpt::apply_new_diff",
        object_id = format!("{:?}", diff.id())
    );
    let span_id = span.id();
    async {
        let object_id = diff.id();

        // Take what is needed from the node under a short shared borrow.
        // No borrow of the node may be alive across the store `.await`
        // below, otherwise any other task touching the node meanwhile
        // panics on its own borrow.
        let (object, storage) = {
            let inner = (*self.0).borrow();
            let object = inner
                .get_object(&object_id)
                .expect("Expected object to be created or loaded before applying diff");
            (object, inner.storage.clone_ref())
        };

        let result = (*object)
            .borrow_mut()
            .try_apply_diff(diff.clone())
            .expect("New diffs should always be valid");
        ObjectState::store(result.effective_diff, storage).await;

        debug_assert!(
            match (&diff, (*object).borrow().state_as_initial_diff()) {
                (Diff::Set(set_diff), Some(Diff::Set(all_items))) => set_diff
                    .new_items
                    .iter()
                    .all(|i| all_items.new_items.contains(i)),
                _ => true,
            },
            "Expected new set items to be connected and show up in state - make sure they were inserted into the correct set."
        );

        // Looked up after the store so listeners registered while it was
        // pending are included.
        let listeners = (*self.0).borrow().local_listeners.get(&object_id).cloned();
        if let Some(listeners) = listeners {
            listeners
                .broadcast(diff)
                .instrument(info_span!(parent: span_id.clone(), "Tlpt::local_listener_broadcast", object_id = format!("{:?}", object_id)))
                .await;
        }
        self.sync_object_with_remotes(object_id, span_id.clone()).await;
    }
    .instrument(span)
    .await;
}
/// Sends each eligible remote the content of object `id` that it is
/// assumed not to have yet.
///
/// A remote is eligible if it already has an optimistic state entry for
/// `id`, or if it is in [`RemoteMode::SyncEverything`]. For each eligible
/// remote the diff since its optimistic state is sent (if there is one) and
/// the optimistic state is advanced to the object's current state info.
///
/// `parent_span` becomes the parent of the tracing span for this call.
///
/// # Panics
///
/// Panics if the object has not been created or loaded on this node, or if
/// an eligible remote exists and the object has no state info.
pub async fn sync_object_with_remotes(&self, id: ObjectID, parent_span: Option<tracing::Id>) {
    let (object, remotes) = {
        // Read-only access: a shared borrow suffices and does not panic when
        // another shared borrow of the node is alive.
        let inner = (*self.0).borrow();
        let object = inner
            .get_object(&id)
            .expect("Expected object to be created or loaded before syncing");
        (object, inner.remotes.clone())
    };
    remotes
        .broadcast_filter_map(|remote| {
            let optimistic_state_info_entry = remote.optimistic_state_info.entry(id);
            match (optimistic_state_info_entry, remote.mode) {
                (entry @ Entry::Occupied(_), _)
                | (entry @ Entry::Vacant(_), RemoteMode::SyncEverything) => {
                    let optimistic_state = entry.or_default();
                    let update = (*object)
                        .borrow()
                        .diff_since(optimistic_state.as_ref())
                        .map(UpdateMessage::NewContent);
                    // Assume the remote will receive the update.
                    optimistic_state.replace(
                        (*object)
                            .borrow()
                            .state_info()
                            .expect("Should have state info of synced object"),
                    );
                    update
                }
                _ => None,
            }
        })
        .instrument(info_span!(
            parent: parent_span,
            "sync_object_with_remotes",
            id = format!("{:?}", id)
        ))
        .await
}
/// Returns the in-memory state of object `id`, or `None` if it has not been
/// created or loaded on this node. Storage is not consulted.
pub fn get_object(&self, id: &ObjectID) -> Option<Rc<RefCell<ObjectState>>> {
    let inner = (*self.0).borrow();
    inner.get_object(id)
}
/// Returns the state of object `id`, replaying it from storage first if it
/// is not yet registered in memory.
pub async fn load_object(&self, id: ObjectID) -> Rc<RefCell<ObjectState>> {
    // Create the load future under a short exclusive borrow and release the
    // borrow before awaiting.
    let pending_load = {
        let mut inner = (*self.0).borrow_mut();
        inner.load_object(id)
    };
    pending_load.await
}
/// Registers `listener` for diffs of object `id`.
///
/// The object is loaded first; the listener is then added with the object's
/// current state (as an initial diff, if any) as its first message. If the
/// object has no state info, every remote in
/// [`RemoteMode::SyncEverything`] is sent a state-info update with
/// `state_info: None` and `needs_full_resync: true`.
async fn add_diff_listener(&self, id: ObjectID, listener: Listener<Diff>) {
    let (listener_set, pending_load) = {
        let mut inner = (*self.0).borrow_mut();
        let listener_set = inner
            .local_listeners
            .entry(id)
            .or_insert_with(ListenerSet::new)
            .clone();
        let pending_load = inner.load_object(id);
        (listener_set, pending_load)
    };

    let object = pending_load.await;
    let initial_diff = object.borrow().state_as_initial_diff();
    let is_empty = object.borrow().state_info().is_none();
    listener_set
        .add_with_initial_msg(listener, initial_diff)
        .await;

    if !is_empty {
        return;
    }

    debug!(object_id = ?id, "Listened to object is empty, syncing with remotes");
    let remotes = (*self.0).borrow().remotes.clone();
    remotes
        .broadcast_filter_map(|remote| match remote.mode {
            RemoteMode::SyncEverything => Some(UpdateMessage::StateInfo(StateInfoUpdate {
                id: object.borrow().id(),
                state_info: None,
                needs_full_resync: true,
            })),
            RemoteMode::ProvideWhatRemoteWants => None,
        })
        .await;
}
/// Returns a stream of diffs for object `id`.
///
/// Nothing happens until the stream is first polled. At that point a
/// listener named `"{listener_prefix}_{random u64}"` is registered for the
/// object, and the stream then yields whatever that listener receives
/// through a channel created with `channel(100)`.
pub fn diffs(
    &self,
    id: ObjectID,
    listener_prefix: String,
) -> impl Stream<Item = Diff> + 'static {
    let (diffs_tx, diffs_rx) = channel(100);
    let node = self.clone();

    let subscribe_then_receive = async move {
        let listener_name = format!("{}_{:?}", listener_prefix, rand::random::<u64>());
        node.add_diff_listener(id, Listener::new(&listener_name, diffs_tx))
            .await;
        diffs_rx
    };

    stream::once(subscribe_then_receive)
        .flatten()
        .boxed_local()
}
/// Connects `remote` to this node.
///
/// The remote is registered with an empty optimistic state. If its mode is
/// [`RemoteMode::SyncEverything`], the state info of every object currently
/// in memory is sent to it with `needs_full_resync: true`. Finally, a
/// background task is registered that handles every message arriving on
/// the remote's incoming channel.
pub async fn add_remote(&self, remote: Remote) {
    (*self.0).borrow_mut().remotes.add(RemoteState {
        id: remote.id.clone(),
        mode: remote.mode,
        optimistic_state_info: HashMap::new(),
        outgoing: remote.outgoing.clone(),
    });

    // Receive loop; it only starts running once handed to `background` at
    // the end of this function.
    let receiving = {
        let node = self.clone();
        let remotes = (*self.0).borrow().remotes.clone();
        let remote_id = remote.id.clone();
        remote.incoming.for_each(move |msg| {
            let node = node.clone();
            let remotes = remotes.clone();
            let remote_id = remote_id.clone();
            async move {
                trace!(remote_id = remote_id, msg = ?msg, "Receiving from remote");
                match msg {
                    UpdateMessage::NewContent(diff) => {
                        node.handle_incoming_diff(diff, &remote_id, remotes).await
                    }
                    UpdateMessage::StateInfo(update) => {
                        node.handle_incoming_state_info(
                            update.id,
                            update.state_info,
                            update.needs_full_resync,
                            &remote_id,
                            remotes,
                        )
                        .await
                    }
                }
            }
        })
    };

    if remote.mode == RemoteMode::SyncEverything {
        let remotes = (*self.0).borrow().remotes.clone();
        let remote_id = remote.id.clone();
        // The snapshot is collected so that the borrow of the node ends
        // before the sends below are awaited.
        #[allow(clippy::needless_collect)]
        let state_info_snapshot = (*self.0)
            .borrow()
            .objects
            .iter()
            .map(|(id, obj)| (*id, (**obj).borrow().state_info()))
            .collect::<Vec<_>>();
        let span = debug_span!("Sending all state_infos", remote_id = remote_id);
        let span_id = span.id();
        let sends = state_info_snapshot.into_iter().map(|(id, state_info)| {
            let announcement = UpdateMessage::StateInfo(StateInfoUpdate {
                id,
                state_info,
                needs_full_resync: true,
            });
            remotes.send_or_remove(&remote_id, announcement, span_id.clone())
        });
        join_all(sends).instrument(span).await;
    }

    (*self.0)
        .borrow()
        .background
        .add_background_task(Box::pin(receiving));
}
async fn handle_incoming_diff(
&self,
diff: Diff,
remote_id: &str,
remotes: ListenerSet<UpdateMessage, RemoteState>,
) {
let span = info_span!("Tlpt::handle_incoming_diff", remote_id = remote_id);
let span_id = span.id();
async {
let (apply_result, id, current_state_info) = {
let object_promise = (*self.0).borrow_mut().load_object(diff.id());
let object_rc = object_promise.instrument(info_span!(parent: span_id.clone(), "NodeInner::load_object", id = format!("{:?}", diff.id()))).await;
let mut object = (*object_rc).borrow_mut();
(
info_span!(parent: span_id.clone(), "ObjectState::try_apply_diff", id = format!("{:?}", diff.id())).in_scope(||
object.try_apply_diff(diff.clone())
),
object.id(),
info_span!(parent: span_id.clone(), "ObjectState::state_info", id = format!("{:?}", diff.id())).in_scope(||
object.state_info()
),
)
};
match apply_result {
Ok(ApplyDiffSuccess{new_state_info, effective_diff}) => {
if let Some(remote) = remotes.state_of(remote_id) {
(*remote)
.borrow_mut()
.optimistic_state_info
.insert(id, Some(new_state_info));
}
ObjectState::store(effective_diff.clone(), self.0.borrow().storage.clone_ref()).await;
let listeners = (*self.0).borrow_mut().local_listeners.get(&id).cloned();
if let Some(listeners) = listeners {
listeners.broadcast(effective_diff).instrument(info_span!(parent: span_id.clone(), "Tlpt::local_listener_broadcast", object_id = format!("{:?}", id))).await;
}
self.sync_object_with_remotes(id, span_id.clone()).await;
}
Err(ApplyDiffError::InvalidKnownStateAssumption) => {
warn!(state_info = ?current_state_info, diff = ?diff, "Invalid known state assumption");
let span = info_span!(parent: span_id.clone(), "reply_invalid_state_assumption", id = format!("{:?}", id));
remotes
.send_or_remove(
remote_id,
UpdateMessage::StateInfo(StateInfoUpdate {
id,
state_info: current_state_info,
needs_full_resync: true
}),
span.id(),
)
.instrument(span)
.await;
}
Err(err) => Err(err).unwrap(),
}
}
.instrument(span)
.await;
}
/// Handles a state-info update for object `id` received from `remote_id`.
///
/// Updates without `needs_full_resync` are ignored. Otherwise, if the
/// remote is still registered, its optimistic state is set to
/// `new_state_info` and a reply is chosen:
///
/// * object held locally and a diff since `new_state_info` exists: that
///   diff is sent as new content;
/// * object held locally, no diff, but the local state info differs: the
///   local state info is sent with `needs_full_resync: true`;
/// * object held locally and the state infos are equal: no reply;
/// * object not held locally: a state info of `None` is sent, with
///   `needs_full_resync` set iff the remote reported some state.
async fn handle_incoming_state_info(
    &self,
    id: ObjectID,
    new_state_info: Option<StateInfo>,
    needs_full_resync: bool,
    remote_id: &str,
    remotes: ListenerSet<UpdateMessage, RemoteState>,
) {
    let span = info_span!("Tlpt::handle_incoming_state_info", remote_id = remote_id);
    let span_id = span.id();
    async {
        if !needs_full_resync {
            return;
        }
        let update_to_respond_with = if let Some(remote) = remotes.state_of(remote_id) {
            (*remote)
                .borrow_mut()
                .optimistic_state_info
                .insert(id, new_state_info.clone());
            // Read-only lookup: take a shared borrow and release it right
            // away, instead of holding an exclusive borrow of the node for
            // the whole branch.
            let local_object = (*self.0).borrow().get_object(&id);
            match local_object {
                Some(object) => {
                    let diff = info_span!(parent: span_id.clone(), "ObjectState::diff_since", id = format!("{:?}", id))
                        .in_scope(|| (*object).borrow().diff_since(new_state_info.as_ref()));
                    match diff {
                        Some(update) => Some(UpdateMessage::NewContent(update)),
                        None => {
                            let own_state_info = (*object).borrow().state_info();
                            if own_state_info != new_state_info {
                                Some(UpdateMessage::StateInfo(StateInfoUpdate {
                                    id,
                                    state_info: own_state_info,
                                    needs_full_resync: true,
                                }))
                            } else {
                                None
                            }
                        }
                    }
                }
                None => Some(UpdateMessage::StateInfo(StateInfoUpdate {
                    id,
                    state_info: None,
                    needs_full_resync: new_state_info.is_some(),
                })),
            }
        } else {
            None
        };
        if let Some(update) = update_to_respond_with {
            let span = info_span!(parent: span_id.clone(), "reply_with_update");
            remotes
                .send_or_remove(remote_id, update, span.id())
                .instrument(span)
                .await;
        }
    }
    .instrument(span)
    .await;
}
}
/// Message exchanged between a node and a remote over the remote's
/// incoming/outgoing channels.
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum UpdateMessage {
/// Object content, as a diff to be applied by the receiver.
NewContent(Diff),
/// The sender's state info for an object, see [`StateInfoUpdate`].
StateInfo(StateInfoUpdate),
}
/// Payload of [`UpdateMessage::StateInfo`].
#[derive(Clone, Serialize, Deserialize)]
pub struct StateInfoUpdate {
// Object the update is about.
id: ObjectID,
// The sender's state info for the object; `None` is sent when the sender
// does not hold the object or the object has no state info yet.
state_info: Option<StateInfo>,
// When `false`, `Node::handle_incoming_state_info` ignores the update.
needs_full_resync: bool,
}
// `UpdateMessage` has no `#[derive(Debug)]` but is logged with `?msg` in this
// file, so this macro is what supplies its `Debug` impl — presumably by
// formatting the value via `litl`; confirm against the `litl` crate.
impl_debug_as_litl!(UpdateMessage);
/// Description of a remote peer, consumed by `Node::add_remote`.
pub struct Remote {
/// Identifier under which the remote is registered and addressed.
pub id: String,
/// How eagerly objects are synced to this remote.
pub mode: RemoteMode,
/// Messages arriving from the remote.
pub incoming: Receiver<UpdateMessage>,
/// Messages to be sent to the remote.
pub outgoing: Sender<UpdateMessage>,
}
impl Remote {
    /// Builds two remotes wired to each other through two in-process
    /// channels, each created with `channel(100)`.
    ///
    /// The first remote is named `node1_name` and uses
    /// [`RemoteMode::ProvideWhatRemoteWants`]; the second is named
    /// `node2_name` and uses [`RemoteMode::SyncEverything`]. Whatever is
    /// sent on one remote's `outgoing` arrives on the other's `incoming`.
    pub fn new_connected_test_pair(node1_name: &str, node2_name: &str) -> (Remote, Remote) {
        let (to_node1, node1_rx) = channel(100);
        let (to_node2, node2_rx) = channel(100);

        let first = Remote {
            id: node1_name.to_string(),
            mode: RemoteMode::ProvideWhatRemoteWants,
            incoming: node2_rx,
            outgoing: to_node1,
        };
        let second = Remote {
            id: node2_name.to_string(),
            mode: RemoteMode::SyncEverything,
            incoming: node1_rx,
            outgoing: to_node2,
        };
        (first, second)
    }
}
/// Policy deciding which objects are pushed to a remote.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RemoteMode {
/// The remote is sent the state info of all in-memory objects when it is
/// added, is synced for every object (even ones it has never mentioned),
/// and is asked for content when a listened-to object is empty locally.
SyncEverything,
/// The remote is only synced for objects that already have an entry in its
/// optimistic state.
ProvideWhatRemoteWants,
}
/// Per-remote bookkeeping stored in the node's set of remotes.
struct RemoteState {
/// Identifier of the remote.
id: String,
/// Sync policy for this remote.
mode: RemoteMode,
/// What this node assumes the remote knows about each object. Updated
/// optimistically when content is sent, and from what the remote reports.
optimistic_state_info: HashMap<ObjectID, Option<StateInfo>>,
/// Channel used to send messages to the remote.
outgoing: Sender<UpdateMessage>,
}
impl ListenerWithState<UpdateMessage> for RemoteState {
    /// Identifier of the remote.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Sender for messages going to the remote.
    fn sender(&self) -> &Sender<UpdateMessage> {
        let Self { outgoing, .. } = self;
        outgoing
    }
}