jazz_telepathy/
lib.rs

1use futures::{SinkExt, Future};
2use log::{info, error};
3use logs::{LogAppendMessage, LogID, LogKnownState, Logs};
4use rand::RngCore;
5use serde_derive::{Deserialize, Serialize};
6use sets::{AcceptInsertResult, SetID, SetInsertMessage, SetKnownState, Sets};
7use thiserror::Error;
8use std::{
9    cell::RefCell,
10    collections::HashMap,
11    rc::Rc
12};
13
14// TODO-V1(test): Test local log usage
15// TODO-V1(test): Test local set usage
16// TODO-V1(test): Test persistence
17// TODO-V1(test): Test log syncing
18// TODO-V1(test): Test set syncing
19// TODO-V1(test): Test auth
20
// NOTE(review): this constant is not referenced anywhere in the visible part of this file.
// As a private item at the crate root it is reachable from the child modules declared
// further down, so presumably one of them reads it — TODO confirm, otherwise remove.
21const KEEP_UNKNOWN: bool = true;
22
/// Tag describing where an update came from.
///
/// In this file the only variant that is constructed is `ReceivedFrom`, which is passed to
/// `accept_append` / `accept_insert` when a message from a remote is applied locally.
// NOTE(review): the other variants are presumably produced by the `logs` / `sets` modules —
// their exact meaning is not visible here; confirm before relying on it.
23#[derive(Copy, Clone, PartialEq, Eq, Debug)]
24pub enum UpdateSource {
25    CurrentState,
26    Loaded,
    /// The update arrived from the remote with the given node id.
27    ReceivedFrom(NodeID),
28    CreatedLocally,
29}
30
/// The local data of a node: one `Logs` collection and one `Sets` collection.
///
/// `Default` yields whatever `Logs::default()` and `Sets::default()` produce.
31#[derive(Default)]
32pub struct TelepathyState {
    /// Local logs; remote `LogAppend` messages are applied to this.
33    pub logs: Logs,
    /// Local sets; remote `SetInsert` messages are applied to this.
34    pub sets: Sets,
35}
36
37mod causal_set;
38pub mod logs;
39pub mod sets;
40
/// Known-state records keyed by log id and by set id.
///
/// Each `RemoteNode` keeps one of these as its `optimistic_remote_known_state`: it is
/// overwritten when the remote sends a `*UpdateKnownState` message and advanced
/// optimistically whenever an append/insert is received from or sent to that remote.
41#[derive(Clone, Default)]
42struct KnownState {
    /// Known state per log.
43    logs: HashMap<LogID, LogKnownState>,
    /// Known state per set.
44    sets: HashMap<SetID, SetKnownState>,
45}
46
47
48pub struct TelepathyNode {
49    pub local_state: TelepathyState,
50    remotes: HashMap<NodeID, RemoteNode>,
51    new_remote_mpsc: (futures::channel::mpsc::Sender<RemoteNode>, futures::channel::mpsc::Receiver<RemoteNode>),
52}
53
54impl TelepathyNode {
55    pub fn create() -> Self {
56        TelepathyNode {
57            local_state: TelepathyState::default(),
58            remotes: Default::default(),
59            new_remote_mpsc: futures::channel::mpsc::channel(10),
60        }
61    }
62
63    pub fn new_rc() -> Rc<RefCell<Self>> {
64        Rc::new(RefCell::new(Self::create()))
65    }
66
67    fn add_remote(
68        &mut self,
69        remote: RemoteNode
70    ) {
71        info!("Remote added {:?}", remote.id);
72        self.remotes.insert(
73            remote.id,
74            remote,
75        );
76    }
77
78    pub fn remotes(&self) -> RemoteManager {
79        RemoteManager{adder: self.new_remote_mpsc.0.clone()}
80    }
81
82    pub fn sync_with_remotes(&mut self) {
83        while let Ok(Some(new_remote)) = self.new_remote_mpsc.1.try_next() {
84            self.add_remote(new_remote);
85        }
86
87        // Receive from all
88        self.remotes.retain(|_, remote| {
89            loop {
90                match remote.receiver.try_receive() {
91                    Err(UpdateReceiverError::TryAgainLater) => {
92                        break true;
93                    },
94                    Err(UpdateReceiverError::Closed) => {
95                        info!("Remote {:?} disconnected", remote.id);
96                        break false
97                    },
98                    Ok(update) => match update {
99                        UpdateMessage::LogUpdateKnownState { id, known_state } => {
100                            remote
101                                .optimistic_remote_known_state
102                                .logs
103                                .insert(id, known_state);
104                        }
105                        UpdateMessage::LogAppend { id, new_append } => {
106                            match self
107                                .local_state
108                                .logs
109                                .accept_append(&new_append, UpdateSource::ReceivedFrom(remote.id))
110                            {
111                                Ok(()) => {}
112                                Err(logs::LogError::InvalidHash) => {
113                                    remote.logs_that_need_full_resync.insert(id, true);
114                                }
115                                Err(accept_err) => {
116                                    error!(
117                                        "Error accepting append: {} {:?}",
118                                        accept_err, new_append
119                                    );
120                                }
121                            }
122                            remote
123                                .optimistic_remote_known_state
124                                .logs
125                                .entry(id)
126                                .or_default()
127                                .update_optimistically(new_append);
128                        }
129                        UpdateMessage::SetUpdateKnownState { id, known_state } => {
130                            remote
131                                .optimistic_remote_known_state
132                                .sets
133                                .insert(id, known_state);
134                        }
135
136                        UpdateMessage::SetInsert { id, new_insert } => {
137                            match self
138                                .local_state
139                                .sets
140                                .accept_insert(&new_insert, UpdateSource::ReceivedFrom(remote.id))
141                            {
142                                Ok(AcceptInsertResult::AllConnected) => {}
143                                Ok(AcceptInsertResult::HasDisconnected) => {
144                                    remote.sets_that_need_full_resync.insert(id, true);
145                                }
146                                Err(accept_err) => {
147                                    error!(
148                                        "Error accepting insert: {} {:?}",
149                                        accept_err, new_insert
150                                    );
151                                }
152                            }
153                            remote
154                                .optimistic_remote_known_state
155                                .sets
156                                .entry(id)
157                                .or_default()
158                                .update_optimistically(
159                                    new_insert,
160                                    self.local_state.sets.current_items(&id),
161                                );
162                        }
163                    }
164                }
165            }
166        });
167
168        // Send to all
169        for remote in self.remotes.values_mut() {
170            let log_ids_to_send = match remote.sync_mode {
171                SyncMode::SyncAll => self
172                    .local_state
173                    .logs
174                    .all_log_ids()
175                    .cloned()
176                    .collect::<Vec<_>>(),
177                SyncMode::SyncWhatRemoteWants => remote
178                    .optimistic_remote_known_state
179                    .logs
180                    .keys()
181                    .cloned()
182                    .collect(),
183            };
184
185            for log_id in log_ids_to_send {
186                if let Some(append_since) = self.local_state.logs.get_append_since(
187                    &log_id,
188                    remote.optimistic_remote_known_state.logs.get(&log_id),
189                ) {
190                    remote
191                        .optimistic_remote_known_state
192                        .logs
193                        .entry(log_id)
194                        .or_default()
195                        .update_optimistically(append_since.clone());
196
197                    remote.sender.send(UpdateMessage::LogAppend {
198                        id: log_id,
199                        new_append: append_since,
200                    });
201
202                    if remote.logs_that_need_full_resync.get(&log_id).is_none() {
203                        remote.logs_that_need_full_resync.insert(log_id, false);
204                    }
205                };
206
207                if remote.logs_that_need_full_resync.get(&log_id).cloned().unwrap_or(true)
208                {
209                    remote.sender.send(UpdateMessage::LogUpdateKnownState {
210                        id: log_id,
211                        known_state: self
212                            .local_state
213                            .logs
214                            .known_state(&log_id)
215                            .unwrap_or_default(),
216                    });
217                    remote.logs_that_need_full_resync.insert(log_id, false);
218                }
219            }
220
221            let set_ids_to_send = match remote.sync_mode {
222                SyncMode::SyncAll => self
223                    .local_state
224                    .sets
225                    .all_set_ids()
226                    .cloned()
227                    .collect::<Vec<_>>(),
228                SyncMode::SyncWhatRemoteWants => remote
229                    .optimistic_remote_known_state
230                    .sets
231                    .keys()
232                    .cloned()
233                    .collect(),
234            };
235
236            for set_id in set_ids_to_send {
237                if let Some(inserts_since) = self.local_state.sets.get_inserts_since(
238                    &set_id,
239                    remote
240                        .optimistic_remote_known_state
241                        .sets
242                        .entry(set_id)
243                        .or_default(),
244                ) {
245                    remote
246                        .optimistic_remote_known_state
247                        .sets
248                        .entry(set_id)
249                        .or_default()
250                        .update_optimistically(
251                            inserts_since.clone(),
252                            self.local_state.sets.current_items(&set_id),
253                        );
254
255                    remote.sender.send(UpdateMessage::SetInsert {
256                        id: set_id,
257                        new_insert: inserts_since,
258                    });
259
260                    if remote.sets_that_need_full_resync.get(&set_id).is_none() {
261                        remote.sets_that_need_full_resync.insert(set_id, false);
262                    }
263                };
264
265                if remote.sets_that_need_full_resync.get(&set_id).cloned().unwrap_or(true)
266                {
267                    remote.sender.send(UpdateMessage::SetUpdateKnownState {
268                        id: set_id,
269                        known_state: self
270                            .local_state
271                            .sets
272                            .known_state(&set_id)
273                            .unwrap_or_default(),
274                    });
275                    remote.sets_that_need_full_resync.insert(set_id, false);
276                }
277            }
278        }
279    }
280}
281
282pub struct RemoteNode {
283    id: NodeID,
284    optimistic_remote_known_state: KnownState,
285    receiver: Box<dyn UpdateReceiver>,
286    sender: Box<dyn UpdateSender>,
287    sync_mode: SyncMode,
288    logs_that_need_full_resync: HashMap<LogID, bool>,
289    sets_that_need_full_resync: HashMap<SetID, bool>,
290}
291
292impl RemoteNode {
293    pub fn new<R: UpdateReceiver + 'static, S: UpdateSender + 'static>(
294        receiver: R,
295        sender: S,
296        sync_mode: SyncMode
297    ) -> Self {
298        RemoteNode {
299            id: NodeID::new_random(),
300            receiver: Box::new(receiver),
301            sender: Box::new(sender),
302            optimistic_remote_known_state: KnownState::default(),
303            sync_mode,
304            logs_that_need_full_resync: HashMap::new(),
305            sets_that_need_full_resync: HashMap::new(),
306        }
307    }
308
309    pub fn new_connected_test_pair() -> (Self, Self) {
310        let (sender1, receiver1) = futures::channel::mpsc::channel(100);
311        let (sender2, receiver2) = futures::channel::mpsc::channel(100);
312
313        (RemoteNode::new(
314            receiver1,
315            sender2,
316            SyncMode::SyncAll,
317        ), RemoteNode::new(
318            receiver2,
319            sender1,
320            SyncMode::SyncAll,
321        ))
322    }
323}
324
325#[derive(Clone)]
326pub struct RemoteManager {
327    adder: futures::channel::mpsc::Sender<RemoteNode>
328}
329
330impl RemoteManager {
331    pub fn add(&mut self, remote: RemoteNode) -> impl Future<Output=Result<(), futures::channel::mpsc::SendError>> + '_{
332        self.adder.send(remote)
333    }
334
335    pub fn try_add_sync(&mut self, remote: RemoteNode) -> Result<(), futures::channel::mpsc::TrySendError<RemoteNode>> {
336        self.adder.try_send(remote)
337    }
338}
339
/// Selects which logs and sets `sync_with_remotes` offers to a given remote.
340#[derive(Copy, Clone, PartialEq, Eq)]
341pub enum SyncMode {
    /// Consider every log and set that exists in the local state.
342    SyncAll,
    /// Consider only the logs and sets that already have an entry in the remote's
    /// optimistic known state.
343    SyncWhatRemoteWants,
344}
345
346pub trait UpdateReceiver: Send {
347    fn try_receive(&mut self) -> Result<UpdateMessage, UpdateReceiverError>;
348}
349
350#[derive(Error, Debug)]
351pub enum UpdateReceiverError {
352    #[error("Try getting updates again later (transient error)")]
353    TryAgainLater,
354    #[error("Update receiver closed")]
355    Closed
356}
357
358impl UpdateReceiver for futures::channel::mpsc::Receiver<UpdateMessage> {
359    fn try_receive(&mut self) ->  Result<UpdateMessage, UpdateReceiverError> {
360        let maybe_update = futures::channel::mpsc::Receiver::<UpdateMessage>::try_next(self);
361        match maybe_update {
362            Ok(Some(update)) => {
363                info!("Received: {}", update.short_summary());
364                Ok(update)
365            },
366            Ok(None) => Err(UpdateReceiverError::Closed),
367            Err(_) => Err(UpdateReceiverError::TryAgainLater)
368        }
369    }
370}
371
372pub trait UpdateSender: Send {
373    fn send(&mut self, update: UpdateMessage);
374}
375
376impl UpdateSender for futures::channel::mpsc::Sender<UpdateMessage> {
377    fn send(&mut self, update: UpdateMessage) {
378        info!("Sending: {}", update.short_summary());
379        futures::channel::mpsc::Sender::try_send(self, update).unwrap();
380    }
381}
382
383#[derive(Debug, Serialize, Deserialize)]
384pub enum UpdateMessage {
385    LogAppend {
386        id: LogID,
387        new_append: LogAppendMessage,
388    },
389    LogUpdateKnownState {
390        id: LogID,
391        known_state: LogKnownState,
392    },
393    SetInsert {
394        id: SetID,
395        new_insert: SetInsertMessage,
396    },
397    SetUpdateKnownState {
398        id: SetID,
399        known_state: SetKnownState,
400    },
401}
402
403impl UpdateMessage {
404    pub fn short_summary(&self) -> String {
405        match self {
406            UpdateMessage::LogAppend { id, new_append } => format!("{:?}: + {} bytes", id, new_append.append.len()),
407            UpdateMessage::LogUpdateKnownState { id, known_state } => format!("{:?}: new known state {} bytes)", id, known_state.log_len),
408            UpdateMessage::SetInsert { id, new_insert } => format!("{:?}: + {} items", id, new_insert.new_items.len()),
409            UpdateMessage::SetUpdateKnownState { id, known_state } => format!("{:?}: new known state {:?}", id, known_state.frontier),
410        }
411    }
412}
413
414#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
415pub struct NodeID([u8; 12]);
416
417impl NodeID {
418    pub fn new_random() -> Self {
419        let mut rng = rand::thread_rng();
420        let mut id = [0u8; 12];
421        rng.fill_bytes(&mut id);
422        NodeID(id)
423    }
424}