use futures::{SinkExt, Future};
use log::{info, error};
use logs::{LogAppendMessage, LogID, LogKnownState, Logs};
use rand::RngCore;
use serde_derive::{Deserialize, Serialize};
use sets::{AcceptInsertResult, SetID, SetInsertMessage, SetKnownState, Sets};
use thiserror::Error;
use std::{
cell::RefCell,
collections::HashMap,
rc::Rc
};
const KEEP_UNKNOWN: bool = true;
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum UpdateSource {
CurrentState,
Loaded,
ReceivedFrom(NodeID),
CreatedLocally,
}
#[derive(Default)]
pub struct TelepathyState {
pub logs: Logs,
pub sets: Sets,
}
mod causal_set;
pub mod logs;
pub mod sets;
#[derive(Clone, Default)]
struct KnownState {
logs: HashMap<LogID, LogKnownState>,
sets: HashMap<SetID, SetKnownState>,
}
pub struct TelepathyNode {
pub local_state: TelepathyState,
remotes: HashMap<NodeID, RemoteNode>,
new_remote_mpsc: (futures::channel::mpsc::Sender<RemoteNode>, futures::channel::mpsc::Receiver<RemoteNode>),
}
impl TelepathyNode {
pub fn create() -> Self {
TelepathyNode {
local_state: TelepathyState::default(),
remotes: Default::default(),
new_remote_mpsc: futures::channel::mpsc::channel(10),
}
}
pub fn new_rc() -> Rc<RefCell<Self>> {
Rc::new(RefCell::new(Self::create()))
}
fn add_remote(
&mut self,
remote: RemoteNode
) {
info!("Remote added {:?}", remote.id);
self.remotes.insert(
remote.id,
remote,
);
}
pub fn remotes(&self) -> RemoteManager {
RemoteManager{adder: self.new_remote_mpsc.0.clone()}
}
pub fn sync_with_remotes(&mut self) {
while let Ok(Some(new_remote)) = self.new_remote_mpsc.1.try_next() {
self.add_remote(new_remote);
}
self.remotes.retain(|_, remote| {
loop {
match remote.receiver.try_receive() {
Err(UpdateReceiverError::TryAgainLater) => {
break true;
},
Err(UpdateReceiverError::Closed) => {
info!("Remote {:?} disconnected", remote.id);
break false
},
Ok(update) => match update {
UpdateMessage::LogUpdateKnownState { id, known_state } => {
remote
.optimistic_remote_known_state
.logs
.insert(id, known_state);
}
UpdateMessage::LogAppend { id, new_append } => {
match self
.local_state
.logs
.accept_append(&new_append, UpdateSource::ReceivedFrom(remote.id))
{
Ok(()) => {}
Err(logs::LogError::InvalidHash) => {
remote.logs_that_need_full_resync.insert(id, true);
}
Err(accept_err) => {
error!(
"Error accepting append: {} {:?}",
accept_err, new_append
);
}
}
remote
.optimistic_remote_known_state
.logs
.entry(id)
.or_default()
.update_optimistically(new_append);
}
UpdateMessage::SetUpdateKnownState { id, known_state } => {
remote
.optimistic_remote_known_state
.sets
.insert(id, known_state);
}
UpdateMessage::SetInsert { id, new_insert } => {
match self
.local_state
.sets
.accept_insert(&new_insert, UpdateSource::ReceivedFrom(remote.id))
{
Ok(AcceptInsertResult::AllConnected) => {}
Ok(AcceptInsertResult::HasDisconnected) => {
remote.sets_that_need_full_resync.insert(id, true);
}
Err(accept_err) => {
error!(
"Error accepting insert: {} {:?}",
accept_err, new_insert
);
}
}
remote
.optimistic_remote_known_state
.sets
.entry(id)
.or_default()
.update_optimistically(
new_insert,
self.local_state.sets.current_items(&id),
);
}
}
}
}
});
for remote in self.remotes.values_mut() {
let log_ids_to_send = match remote.sync_mode {
SyncMode::SyncAll => self
.local_state
.logs
.all_log_ids()
.cloned()
.collect::<Vec<_>>(),
SyncMode::SyncWhatRemoteWants => remote
.optimistic_remote_known_state
.logs
.keys()
.cloned()
.collect(),
};
for log_id in log_ids_to_send {
if let Some(append_since) = self.local_state.logs.get_append_since(
&log_id,
remote.optimistic_remote_known_state.logs.get(&log_id),
) {
remote
.optimistic_remote_known_state
.logs
.entry(log_id)
.or_default()
.update_optimistically(append_since.clone());
remote.sender.send(UpdateMessage::LogAppend {
id: log_id,
new_append: append_since,
});
if remote.logs_that_need_full_resync.get(&log_id).is_none() {
remote.logs_that_need_full_resync.insert(log_id, false);
}
};
if remote.logs_that_need_full_resync.get(&log_id).cloned().unwrap_or(true)
{
remote.sender.send(UpdateMessage::LogUpdateKnownState {
id: log_id,
known_state: self
.local_state
.logs
.known_state(&log_id)
.unwrap_or_default(),
});
remote.logs_that_need_full_resync.insert(log_id, false);
}
}
let set_ids_to_send = match remote.sync_mode {
SyncMode::SyncAll => self
.local_state
.sets
.all_set_ids()
.cloned()
.collect::<Vec<_>>(),
SyncMode::SyncWhatRemoteWants => remote
.optimistic_remote_known_state
.sets
.keys()
.cloned()
.collect(),
};
for set_id in set_ids_to_send {
if let Some(inserts_since) = self.local_state.sets.get_inserts_since(
&set_id,
remote
.optimistic_remote_known_state
.sets
.entry(set_id)
.or_default(),
) {
remote
.optimistic_remote_known_state
.sets
.entry(set_id)
.or_default()
.update_optimistically(
inserts_since.clone(),
self.local_state.sets.current_items(&set_id),
);
remote.sender.send(UpdateMessage::SetInsert {
id: set_id,
new_insert: inserts_since,
});
if remote.sets_that_need_full_resync.get(&set_id).is_none() {
remote.sets_that_need_full_resync.insert(set_id, false);
}
};
if remote.sets_that_need_full_resync.get(&set_id).cloned().unwrap_or(true)
{
remote.sender.send(UpdateMessage::SetUpdateKnownState {
id: set_id,
known_state: self
.local_state
.sets
.known_state(&set_id)
.unwrap_or_default(),
});
remote.sets_that_need_full_resync.insert(set_id, false);
}
}
}
}
}
pub struct RemoteNode {
id: NodeID,
optimistic_remote_known_state: KnownState,
receiver: Box<dyn UpdateReceiver>,
sender: Box<dyn UpdateSender>,
sync_mode: SyncMode,
logs_that_need_full_resync: HashMap<LogID, bool>,
sets_that_need_full_resync: HashMap<SetID, bool>,
}
impl RemoteNode {
pub fn new<R: UpdateReceiver + 'static, S: UpdateSender + 'static>(
receiver: R,
sender: S,
sync_mode: SyncMode
) -> Self {
RemoteNode {
id: NodeID::new_random(),
receiver: Box::new(receiver),
sender: Box::new(sender),
optimistic_remote_known_state: KnownState::default(),
sync_mode,
logs_that_need_full_resync: HashMap::new(),
sets_that_need_full_resync: HashMap::new(),
}
}
pub fn new_connected_test_pair() -> (Self, Self) {
let (sender1, receiver1) = futures::channel::mpsc::channel(100);
let (sender2, receiver2) = futures::channel::mpsc::channel(100);
(RemoteNode::new(
receiver1,
sender2,
SyncMode::SyncAll,
), RemoteNode::new(
receiver2,
sender1,
SyncMode::SyncAll,
))
}
}
#[derive(Clone)]
pub struct RemoteManager {
adder: futures::channel::mpsc::Sender<RemoteNode>
}
impl RemoteManager {
pub fn add(&mut self, remote: RemoteNode) -> impl Future<Output=Result<(), futures::channel::mpsc::SendError>> + '_{
self.adder.send(remote)
}
pub fn try_add_sync(&mut self, remote: RemoteNode) -> Result<(), futures::channel::mpsc::TrySendError<RemoteNode>> {
self.adder.try_send(remote)
}
}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum SyncMode {
SyncAll,
SyncWhatRemoteWants,
}
pub trait UpdateReceiver: Send {
fn try_receive(&mut self) -> Result<UpdateMessage, UpdateReceiverError>;
}
#[derive(Error, Debug)]
pub enum UpdateReceiverError {
#[error("Try getting updates again later (transient error)")]
TryAgainLater,
#[error("Update receiver closed")]
Closed
}
impl UpdateReceiver for futures::channel::mpsc::Receiver<UpdateMessage> {
fn try_receive(&mut self) -> Result<UpdateMessage, UpdateReceiverError> {
let maybe_update = futures::channel::mpsc::Receiver::<UpdateMessage>::try_next(self);
match maybe_update {
Ok(Some(update)) => {
info!("Received: {}", update.short_summary());
Ok(update)
},
Ok(None) => Err(UpdateReceiverError::Closed),
Err(_) => Err(UpdateReceiverError::TryAgainLater)
}
}
}
pub trait UpdateSender: Send {
fn send(&mut self, update: UpdateMessage);
}
impl UpdateSender for futures::channel::mpsc::Sender<UpdateMessage> {
fn send(&mut self, update: UpdateMessage) {
info!("Sending: {}", update.short_summary());
futures::channel::mpsc::Sender::try_send(self, update).unwrap();
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum UpdateMessage {
LogAppend {
id: LogID,
new_append: LogAppendMessage,
},
LogUpdateKnownState {
id: LogID,
known_state: LogKnownState,
},
SetInsert {
id: SetID,
new_insert: SetInsertMessage,
},
SetUpdateKnownState {
id: SetID,
known_state: SetKnownState,
},
}
impl UpdateMessage {
pub fn short_summary(&self) -> String {
match self {
UpdateMessage::LogAppend { id, new_append } => format!("{:?}: + {} bytes", id, new_append.append.len()),
UpdateMessage::LogUpdateKnownState { id, known_state } => format!("{:?}: new known state {} bytes)", id, known_state.log_len),
UpdateMessage::SetInsert { id, new_insert } => format!("{:?}: + {} items", id, new_insert.new_items.len()),
UpdateMessage::SetUpdateKnownState { id, known_state } => format!("{:?}: new known state {:?}", id, known_state.frontier),
}
}
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub struct NodeID([u8; 12]);
impl NodeID {
pub fn new_random() -> Self {
let mut rng = rand::thread_rng();
let mut id = [0u8; 12];
rng.fill_bytes(&mut id);
NodeID(id)
}
}