1use futures::{SinkExt, Future};
2use log::{info, error};
3use logs::{LogAppendMessage, LogID, LogKnownState, Logs};
4use rand::RngCore;
5use serde_derive::{Deserialize, Serialize};
6use sets::{AcceptInsertResult, SetID, SetInsertMessage, SetKnownState, Sets};
7use thiserror::Error;
8use std::{
9 cell::RefCell,
10 collections::HashMap,
11 rc::Rc
12};
13
// NOTE(review): this flag is not referenced anywhere in the visible part of
// this file, so its intended meaning cannot be confirmed from here —
// TODO document it where it is consumed, or remove it if it is dead.
const KEEP_UNKNOWN: bool = true;
22
23#[derive(Copy, Clone, PartialEq, Eq, Debug)]
24pub enum UpdateSource {
25 CurrentState,
26 Loaded,
27 ReceivedFrom(NodeID),
28 CreatedLocally,
29}
30
31#[derive(Default)]
32pub struct TelepathyState {
33 pub logs: Logs,
34 pub sets: Sets,
35}
36
37mod causal_set;
38pub mod logs;
39pub mod sets;
40
41#[derive(Clone, Default)]
42struct KnownState {
43 logs: HashMap<LogID, LogKnownState>,
44 sets: HashMap<SetID, SetKnownState>,
45}
46
47
48pub struct TelepathyNode {
49 pub local_state: TelepathyState,
50 remotes: HashMap<NodeID, RemoteNode>,
51 new_remote_mpsc: (futures::channel::mpsc::Sender<RemoteNode>, futures::channel::mpsc::Receiver<RemoteNode>),
52}
53
54impl TelepathyNode {
55 pub fn create() -> Self {
56 TelepathyNode {
57 local_state: TelepathyState::default(),
58 remotes: Default::default(),
59 new_remote_mpsc: futures::channel::mpsc::channel(10),
60 }
61 }
62
63 pub fn new_rc() -> Rc<RefCell<Self>> {
64 Rc::new(RefCell::new(Self::create()))
65 }
66
67 fn add_remote(
68 &mut self,
69 remote: RemoteNode
70 ) {
71 info!("Remote added {:?}", remote.id);
72 self.remotes.insert(
73 remote.id,
74 remote,
75 );
76 }
77
78 pub fn remotes(&self) -> RemoteManager {
79 RemoteManager{adder: self.new_remote_mpsc.0.clone()}
80 }
81
82 pub fn sync_with_remotes(&mut self) {
83 while let Ok(Some(new_remote)) = self.new_remote_mpsc.1.try_next() {
84 self.add_remote(new_remote);
85 }
86
87 self.remotes.retain(|_, remote| {
89 loop {
90 match remote.receiver.try_receive() {
91 Err(UpdateReceiverError::TryAgainLater) => {
92 break true;
93 },
94 Err(UpdateReceiverError::Closed) => {
95 info!("Remote {:?} disconnected", remote.id);
96 break false
97 },
98 Ok(update) => match update {
99 UpdateMessage::LogUpdateKnownState { id, known_state } => {
100 remote
101 .optimistic_remote_known_state
102 .logs
103 .insert(id, known_state);
104 }
105 UpdateMessage::LogAppend { id, new_append } => {
106 match self
107 .local_state
108 .logs
109 .accept_append(&new_append, UpdateSource::ReceivedFrom(remote.id))
110 {
111 Ok(()) => {}
112 Err(logs::LogError::InvalidHash) => {
113 remote.logs_that_need_full_resync.insert(id, true);
114 }
115 Err(accept_err) => {
116 error!(
117 "Error accepting append: {} {:?}",
118 accept_err, new_append
119 );
120 }
121 }
122 remote
123 .optimistic_remote_known_state
124 .logs
125 .entry(id)
126 .or_default()
127 .update_optimistically(new_append);
128 }
129 UpdateMessage::SetUpdateKnownState { id, known_state } => {
130 remote
131 .optimistic_remote_known_state
132 .sets
133 .insert(id, known_state);
134 }
135
136 UpdateMessage::SetInsert { id, new_insert } => {
137 match self
138 .local_state
139 .sets
140 .accept_insert(&new_insert, UpdateSource::ReceivedFrom(remote.id))
141 {
142 Ok(AcceptInsertResult::AllConnected) => {}
143 Ok(AcceptInsertResult::HasDisconnected) => {
144 remote.sets_that_need_full_resync.insert(id, true);
145 }
146 Err(accept_err) => {
147 error!(
148 "Error accepting insert: {} {:?}",
149 accept_err, new_insert
150 );
151 }
152 }
153 remote
154 .optimistic_remote_known_state
155 .sets
156 .entry(id)
157 .or_default()
158 .update_optimistically(
159 new_insert,
160 self.local_state.sets.current_items(&id),
161 );
162 }
163 }
164 }
165 }
166 });
167
168 for remote in self.remotes.values_mut() {
170 let log_ids_to_send = match remote.sync_mode {
171 SyncMode::SyncAll => self
172 .local_state
173 .logs
174 .all_log_ids()
175 .cloned()
176 .collect::<Vec<_>>(),
177 SyncMode::SyncWhatRemoteWants => remote
178 .optimistic_remote_known_state
179 .logs
180 .keys()
181 .cloned()
182 .collect(),
183 };
184
185 for log_id in log_ids_to_send {
186 if let Some(append_since) = self.local_state.logs.get_append_since(
187 &log_id,
188 remote.optimistic_remote_known_state.logs.get(&log_id),
189 ) {
190 remote
191 .optimistic_remote_known_state
192 .logs
193 .entry(log_id)
194 .or_default()
195 .update_optimistically(append_since.clone());
196
197 remote.sender.send(UpdateMessage::LogAppend {
198 id: log_id,
199 new_append: append_since,
200 });
201
202 if remote.logs_that_need_full_resync.get(&log_id).is_none() {
203 remote.logs_that_need_full_resync.insert(log_id, false);
204 }
205 };
206
207 if remote.logs_that_need_full_resync.get(&log_id).cloned().unwrap_or(true)
208 {
209 remote.sender.send(UpdateMessage::LogUpdateKnownState {
210 id: log_id,
211 known_state: self
212 .local_state
213 .logs
214 .known_state(&log_id)
215 .unwrap_or_default(),
216 });
217 remote.logs_that_need_full_resync.insert(log_id, false);
218 }
219 }
220
221 let set_ids_to_send = match remote.sync_mode {
222 SyncMode::SyncAll => self
223 .local_state
224 .sets
225 .all_set_ids()
226 .cloned()
227 .collect::<Vec<_>>(),
228 SyncMode::SyncWhatRemoteWants => remote
229 .optimistic_remote_known_state
230 .sets
231 .keys()
232 .cloned()
233 .collect(),
234 };
235
236 for set_id in set_ids_to_send {
237 if let Some(inserts_since) = self.local_state.sets.get_inserts_since(
238 &set_id,
239 remote
240 .optimistic_remote_known_state
241 .sets
242 .entry(set_id)
243 .or_default(),
244 ) {
245 remote
246 .optimistic_remote_known_state
247 .sets
248 .entry(set_id)
249 .or_default()
250 .update_optimistically(
251 inserts_since.clone(),
252 self.local_state.sets.current_items(&set_id),
253 );
254
255 remote.sender.send(UpdateMessage::SetInsert {
256 id: set_id,
257 new_insert: inserts_since,
258 });
259
260 if remote.sets_that_need_full_resync.get(&set_id).is_none() {
261 remote.sets_that_need_full_resync.insert(set_id, false);
262 }
263 };
264
265 if remote.sets_that_need_full_resync.get(&set_id).cloned().unwrap_or(true)
266 {
267 remote.sender.send(UpdateMessage::SetUpdateKnownState {
268 id: set_id,
269 known_state: self
270 .local_state
271 .sets
272 .known_state(&set_id)
273 .unwrap_or_default(),
274 });
275 remote.sets_that_need_full_resync.insert(set_id, false);
276 }
277 }
278 }
279 }
280}
281
282pub struct RemoteNode {
283 id: NodeID,
284 optimistic_remote_known_state: KnownState,
285 receiver: Box<dyn UpdateReceiver>,
286 sender: Box<dyn UpdateSender>,
287 sync_mode: SyncMode,
288 logs_that_need_full_resync: HashMap<LogID, bool>,
289 sets_that_need_full_resync: HashMap<SetID, bool>,
290}
291
292impl RemoteNode {
293 pub fn new<R: UpdateReceiver + 'static, S: UpdateSender + 'static>(
294 receiver: R,
295 sender: S,
296 sync_mode: SyncMode
297 ) -> Self {
298 RemoteNode {
299 id: NodeID::new_random(),
300 receiver: Box::new(receiver),
301 sender: Box::new(sender),
302 optimistic_remote_known_state: KnownState::default(),
303 sync_mode,
304 logs_that_need_full_resync: HashMap::new(),
305 sets_that_need_full_resync: HashMap::new(),
306 }
307 }
308
309 pub fn new_connected_test_pair() -> (Self, Self) {
310 let (sender1, receiver1) = futures::channel::mpsc::channel(100);
311 let (sender2, receiver2) = futures::channel::mpsc::channel(100);
312
313 (RemoteNode::new(
314 receiver1,
315 sender2,
316 SyncMode::SyncAll,
317 ), RemoteNode::new(
318 receiver2,
319 sender1,
320 SyncMode::SyncAll,
321 ))
322 }
323}
324
325#[derive(Clone)]
326pub struct RemoteManager {
327 adder: futures::channel::mpsc::Sender<RemoteNode>
328}
329
330impl RemoteManager {
331 pub fn add(&mut self, remote: RemoteNode) -> impl Future<Output=Result<(), futures::channel::mpsc::SendError>> + '_{
332 self.adder.send(remote)
333 }
334
335 pub fn try_add_sync(&mut self, remote: RemoteNode) -> Result<(), futures::channel::mpsc::TrySendError<RemoteNode>> {
336 self.adder.try_send(remote)
337 }
338}
339
/// Selects which logs and sets are considered when sending to a remote.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Consider every log and set id known locally.
    SyncAll,
    /// Consider only the ids that appear in our picture of the remote's
    /// known state.
    SyncWhatRemoteWants,
}
345
346pub trait UpdateReceiver: Send {
347 fn try_receive(&mut self) -> Result<UpdateMessage, UpdateReceiverError>;
348}
349
350#[derive(Error, Debug)]
351pub enum UpdateReceiverError {
352 #[error("Try getting updates again later (transient error)")]
353 TryAgainLater,
354 #[error("Update receiver closed")]
355 Closed
356}
357
358impl UpdateReceiver for futures::channel::mpsc::Receiver<UpdateMessage> {
359 fn try_receive(&mut self) -> Result<UpdateMessage, UpdateReceiverError> {
360 let maybe_update = futures::channel::mpsc::Receiver::<UpdateMessage>::try_next(self);
361 match maybe_update {
362 Ok(Some(update)) => {
363 info!("Received: {}", update.short_summary());
364 Ok(update)
365 },
366 Ok(None) => Err(UpdateReceiverError::Closed),
367 Err(_) => Err(UpdateReceiverError::TryAgainLater)
368 }
369 }
370}
371
372pub trait UpdateSender: Send {
373 fn send(&mut self, update: UpdateMessage);
374}
375
376impl UpdateSender for futures::channel::mpsc::Sender<UpdateMessage> {
377 fn send(&mut self, update: UpdateMessage) {
378 info!("Sending: {}", update.short_summary());
379 futures::channel::mpsc::Sender::try_send(self, update).unwrap();
380 }
381}
382
383#[derive(Debug, Serialize, Deserialize)]
384pub enum UpdateMessage {
385 LogAppend {
386 id: LogID,
387 new_append: LogAppendMessage,
388 },
389 LogUpdateKnownState {
390 id: LogID,
391 known_state: LogKnownState,
392 },
393 SetInsert {
394 id: SetID,
395 new_insert: SetInsertMessage,
396 },
397 SetUpdateKnownState {
398 id: SetID,
399 known_state: SetKnownState,
400 },
401}
402
403impl UpdateMessage {
404 pub fn short_summary(&self) -> String {
405 match self {
406 UpdateMessage::LogAppend { id, new_append } => format!("{:?}: + {} bytes", id, new_append.append.len()),
407 UpdateMessage::LogUpdateKnownState { id, known_state } => format!("{:?}: new known state {} bytes)", id, known_state.log_len),
408 UpdateMessage::SetInsert { id, new_insert } => format!("{:?}: + {} items", id, new_insert.new_items.len()),
409 UpdateMessage::SetUpdateKnownState { id, known_state } => format!("{:?}: new known state {:?}", id, known_state.frontier),
410 }
411 }
412}
413
/// Opaque 12-byte identifier of a node; cheap to copy and usable as a hash
/// map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct NodeID([u8; 12]);
416
417impl NodeID {
418 pub fn new_random() -> Self {
419 let mut rng = rand::thread_rng();
420 let mut id = [0u8; 12];
421 rng.fill_bytes(&mut id);
422 NodeID(id)
423 }
424}