p2panda_sync/log_sync.rs

// SPDX-License-Identifier: MIT OR Apache-2.0

//! Efficient bidirectional sync protocol for append-only log data types.
//!
//! This implementation is generic over the actual data type: as long as it takes the form of a
//! numbered, linked list it is compatible with this sync protocol. p2panda provides its own log
//! implementation in `p2panda-core` which can easily be combined with this sync protocol.
//!
//! The protocol checks the current local "log heights", that is the index of the latest known
//! entry in each log, of the "initiating" peer and sends them in the form of a "Have" message to
//! the remote peer. The "accepting" remote peer matches the given log heights with the locally
//! present ones, calculates the delta of missing entries and sends them to the initiating peer as
//! part of "Data" messages. The accepting peer then sends a "Done" message to signal that data
//! transmission is complete. The protocol exchange is then repeated with the roles reversed: the
//! accepting peer sends their "Have" message and the initiating peer responds with the required
//! "Data" messages, followed by a final "Done" message.
//!
//! To determine which logs to send for a given "topic query", a `TopicLogMap` is provided. This
//! interface aids the sync protocol in deciding which logs to transfer for each topic.
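//!
//! ## Example
//!
//! A minimal sketch of how the protocol could be wired up; `MyTopicMap` stands in for an
//! application-defined `TopicLogMap` implementation and is an assumption made for illustration:
//!
//! ```ignore
//! use p2panda_store::MemoryStore;
//!
//! // Associates each topic query with the logs that should be synced for it (hypothetical
//! // application type).
//! let topic_map = MyTopicMap::new();
//!
//! // Store holding all locally known operations.
//! let store = MemoryStore::new();
//!
//! // The resulting protocol instance can then drive a session as either side, via
//! // `SyncProtocol::initiate` or `SyncProtocol::accept`.
//! let protocol = LogSyncProtocol::new(topic_map, store);
//! ```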
use std::collections::HashMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, Sink, SinkExt, StreamExt, stream};
use p2panda_core::{Extensions, PublicKey};
use p2panda_store::{LogId, LogStore};
use serde::{Deserialize, Serialize};

use crate::cbor::{into_cbor_sink, into_cbor_stream};
use crate::{FromSync, SyncError, SyncProtocol, TopicQuery};

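/// Sequence number of an entry in an append-only log.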
type SeqNum = u64;

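/// Log heights of a single author: the latest known sequence number for each of their logs.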
type LogHeights<T> = Vec<(T, SeqNum)>;

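/// All logs relevant for a topic, grouped by the public key of the author who writes to them.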
type Logs<T> = HashMap<PublicKey, Vec<T>>;

/// Maps a `TopicQuery` to the related logs being sent over the wire during sync.
///
/// Each `SyncProtocol` implementation defines the type of data it is expecting to sync and how
/// the scope for a particular session should be identified. `LogSyncProtocol` maps a generic
/// `TopicQuery` to a set of logs; users provide an implementation of the `TopicLogMap` trait in
/// order to define how this mapping occurs.
///
/// Since `TopicLogMap` is generic, the same mapping can be reused across different sync
/// implementations for the same data type when necessary.
///
/// ## Designing `TopicLogMap` for applications
///
/// Consider an example chat application based on append-only log data types: we probably want to
/// organise the messages an author writes to a certain chat group into one log per author and
/// group. A chat group can then be expressed as a collection of one to potentially many logs (one
/// per member of the group):
///
/// ```text
/// All authors: A, B and C
/// All chat groups: 1 and 2
///
/// "Chat group 1 with members A and B"
/// - Log A1
/// - Log B1
///
/// "Chat group 2 with members A, B and C"
/// - Log A2
/// - Log B2
/// - Log C2
/// ```
///
/// If we implement `TopicQuery` to express that we're interested in syncing a specific chat
/// group, for example "Chat group 2", we would implement `TopicLogMap` to give us all append-only
/// logs of all members inside this group, that is the entries inside logs `A2`, `B2` and `C2`.
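///
/// A minimal sketch (not part of this crate) of such a mapping, keeping the group-to-logs
/// association in an in-memory map; the `ChatGroup` topic type and the `u64` log id are
/// assumptions made for illustration:
///
/// ```ignore
/// #[derive(Debug)]
/// struct ChatGroupLogMap(HashMap<ChatGroup, Logs<u64>>);
///
/// #[async_trait]
/// impl TopicLogMap<ChatGroup, u64> for ChatGroupLogMap {
///     async fn get(&self, topic: &ChatGroup) -> Option<Logs<u64>> {
///         // Return all logs (one per group member) associated with this chat group.
///         self.0.get(topic).cloned()
///     }
/// }
/// ```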
#[async_trait]
pub trait TopicLogMap<T, L>: Debug + Send + Sync
where
    T: TopicQuery,
{
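    /// Returns all logs which should be synced for the given topic query, or `None` if the topic
    /// is unknown to this map.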
    async fn get(&self, topic: &T) -> Option<Logs<L>>;
}

/// Messages to be sent over the wire between the two peers.
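///
/// Messages are CBOR-encoded for transport. With the adjacently tagged serde representation used
/// here, a message travels roughly as a map of the shape
/// `{ "type": "Have", "value": [<topic query>, <log heights>] }`.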
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", content = "value")]
enum Message<T, L = String> {
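    /// Topic query of the session plus the sender's local log heights for all matching logs.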
    Have(T, Vec<(PublicKey, LogHeights<L>)>),
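    /// A single operation, sent as raw header bytes and an optional payload.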
    Data(Vec<u8>, Option<Vec<u8>>),
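    /// Signals that the sender has transmitted all data needed by the remote peer.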
    Done,
}

/// Efficient sync protocol for append-only log data types.
#[derive(Clone, Debug)]
pub struct LogSyncProtocol<TM, L, E, S: LogStore<L, E>> {
    topic_map: TM,
    store: S,
    _marker: PhantomData<(L, E)>,
}

impl<TM, L, E, S> LogSyncProtocol<TM, L, E, S>
where
    S: LogStore<L, E>,
{
    /// Returns a new sync protocol instance, configured with a store and a `TopicLogMap`
    /// implementation which associates the to-be-synced logs with a given topic.
    pub fn new(topic_map: TM, store: S) -> Self {
        Self {
            topic_map,
            store,
            _marker: PhantomData {},
        }
    }
}

// Bidirectional log sync protocol.
//
// Both peers send and receive data during the same session.
//
// [ Initiator ]        [ Acceptor ]
// -------------        ------------
//       have ->        -> have
//       data <-        <- data
//       done <-        <- done
//       have <-        <- have
//       data ->        -> data
//       done ->        -> done
//
#[async_trait]
impl<'a, T, TM, L, E, S> SyncProtocol<T, 'a> for LogSyncProtocol<TM, L, E, S>
where
    T: TopicQuery,
    TM: TopicLogMap<T, L>,
    L: LogId + Send + Sync + for<'de> Deserialize<'de> + Serialize + 'a,
    E: Extensions + Send + Sync + 'a,
    S: Debug + Sync + LogStore<L, E>,
{
    fn name(&self) -> &'static str {
        "p2panda-log-sync-v1"
    }

    async fn initiate(
        self: Arc<Self>,
        topic_query: T,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        mut app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError> {
        let mut sync_done_received = false;
        let mut sync_done_sent = false;

        let mut sink = into_cbor_sink(tx);
        let mut stream = into_cbor_stream(rx);

        // Retrieve the local log heights for all logs matching the topic query.
        let local_log_heights =
            local_log_heights(&self.store, &self.topic_map, &topic_query).await?;

        // Send our `Have` message to the remote peer.
        sink.send(Message::<T, L>::Have(
            topic_query.clone(),
            local_log_heights.clone(),
        ))
        .await?;

        // Announce the topic query of the sync session to the app layer.
        app_tx
            .send(FromSync::HandshakeSuccess(topic_query.clone()))
            .await?;

        // Consume messages arriving on the receive stream.
        while let Some(result) = stream.next().await {
            let message: Message<T, L> = result?;

            match message {
                Message::Data(header, payload) => {
                    // Forward data received from the remote to the app layer.
                    app_tx.send(FromSync::Data { header, payload }).await?;
                }
                Message::Done => {
                    sync_done_received = true;
                }
                Message::Have(remote_topic_query, remote_log_heights) => {
                    if !sync_done_received {
                        return Err(SyncError::UnexpectedBehaviour(
                            "unexpected \"have\" message received".to_string(),
                        ));
                    }

                    // Topic queries must match.
                    if remote_topic_query != topic_query {
                        return Err(SyncError::UnexpectedBehaviour(format!(
                            "incompatible topic query {remote_topic_query:?} requested from remote peer"
                        )));
                    }

                    // Get the log ids which are associated with this topic query.
                    let Some(logs) = self.topic_map.get(&topic_query).await else {
                        return Err(SyncError::UnexpectedBehaviour(format!(
                            "unsupported topic query {topic_query:?} requested from remote peer"
                        )));
                    };

                    let remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>> =
                        remote_log_heights.clone().into_iter().collect();

                    // Retrieve and send all messages needed by the remote peer.
                    let messages: Vec<Message<T, L>> =
                        messages_needed_by_remote(&self.store, &logs, remote_log_heights_map)
                            .await?;
                    sink.send_all(&mut stream::iter(messages.into_iter().map(Ok)))
                        .await?;

                    // Signal to the remote peer that we have finished sending data.
                    sink.send(Message::Done).await?;
                    sync_done_sent = true;
                }
            };

            if sync_done_received && sync_done_sent {
                break;
            }
        }

        // Flush all bytes so that no messages are lost.
        sink.flush().await?;
        app_tx.flush().await?;

        Ok(())
    }

    async fn accept(
        self: Arc<Self>,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        mut app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError> {
        let mut sync_done_sent = false;
        let mut sync_done_received = false;

        let mut sink = into_cbor_sink(tx);
        let mut stream = into_cbor_stream(rx);

        while let Some(result) = stream.next().await {
            let message: Message<T, L> = result?;
            match message {
                Message::Have(topic_query, remote_log_heights) => {
                    // Signal that the "handshake" phase of this protocol is complete as we
                    // received the topic query.
                    app_tx
                        .send(FromSync::HandshakeSuccess(topic_query.clone()))
                        .await?;

                    // Get the log ids which are associated with this topic query.
                    let Some(logs) = self.topic_map.get(&topic_query).await else {
                        return Err(SyncError::UnexpectedBehaviour(format!(
                            "unsupported topic query {topic_query:?} requested from remote peer"
                        )));
                    };

                    let remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>> =
                        remote_log_heights.clone().into_iter().collect();

                    // Retrieve and send all messages needed by the remote peer.
                    let messages: Vec<Message<T, L>> =
                        messages_needed_by_remote(&self.store, &logs, remote_log_heights_map)
                            .await?;
                    sink.send_all(&mut stream::iter(messages.into_iter().map(Ok)))
                        .await?;

                    // Signal to the remote peer that we have finished sending data.
                    sink.send(Message::Done).await?;
                    sync_done_sent = true;

                    // Retrieve the local log heights for all logs matching the topic query.
                    let local_log_heights =
                        local_log_heights(&self.store, &self.topic_map, &topic_query).await?;

                    // Send our `Have` message to the remote peer.
                    sink.send(Message::<T, L>::Have(
                        topic_query.clone(),
                        local_log_heights.clone(),
                    ))
                    .await?;
                }
                Message::Data(header, payload) => {
                    // Forward data received from the remote to the app layer.
                    app_tx.send(FromSync::Data { header, payload }).await?;
                }
                Message::Done => {
                    sync_done_received = true;
                }
            };

            if sync_done_received && sync_done_sent {
                break;
            }
        }

        // Flush all bytes so that no messages are lost.
        sink.flush().await?;
        app_tx.flush().await?;

        Ok(())
    }
}

/// Return the log heights and public keys for all authors who have published under log ids
/// which match the given topic query.
async fn local_log_heights<T, L, E>(
    store: &impl LogStore<L, E>,
    topic_map: &impl TopicLogMap<T, L>,
    topic_query: &T,
) -> Result<Vec<(PublicKey, Vec<(L, u64)>)>, SyncError>
where
    T: TopicQuery,
    L: LogId,
{
    // Get the log ids which are associated with this topic query.
    let Some(logs) = topic_map.get(topic_query).await else {
        return Err(SyncError::Critical(format!(
            "unknown {topic_query:?} topic query"
        )));
    };

    // Get local log heights for all authors who have published under the requested log ids.
    let mut local_log_heights = Vec::new();
    for (public_key, log_ids) in logs {
        let mut log_heights = Vec::new();
        for log_id in log_ids {
            let latest = store
                .latest_operation(&public_key, &log_id)
                .await
                .map_err(|err| {
                    SyncError::Critical(format!("can't retrieve log heights from store, {err}"))
                })?;

            if let Some((header, _)) = latest {
                log_heights.push((log_id.clone(), header.seq_num));
            };
        }
        local_log_heights.push((public_key, log_heights));
    }

    Ok(local_log_heights)
}

/// Return all messages needed by a remote peer for the given log id and format them as data
/// messages for transport over the wire.
async fn remote_needs<T, L, E>(
    store: &impl LogStore<L, E>,
    log_id: &L,
    public_key: &PublicKey,
    from: SeqNum,
) -> Result<Vec<Message<T, L>>, SyncError>
where
    E: Extensions + Send + Sync,
{
    let log = store
        .get_raw_log(public_key, log_id, Some(from))
        .await
        .map_err(|err| SyncError::Critical(format!("could not retrieve log from store, {err}")))?;

    let messages = log
        .unwrap_or_default()
        .into_iter()
        .map(|(header, payload)| Message::Data(header, payload))
        .collect();

    Ok(messages)
}

/// Compare the local log heights with the remote log heights for all given logs and return all
/// messages needed by the remote peer.
async fn messages_needed_by_remote<T, L, E>(
    store: &impl LogStore<L, E>,
    logs: &Logs<L>,
    remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>>,
) -> Result<Vec<Message<T, L>>, SyncError>
where
    L: LogId,
    E: Extensions + Send + Sync,
{
    // Now that the topic query has been translated into a collection of logs we want to
    // compare our own local log heights with what the remote sent for this topic query.
    //
    // If our logs are more advanced for any log we should collect the entries for sending.
    let mut messages_for_remote = Vec::new();

    for (public_key, log_ids) in logs {
        for log_id in log_ids {
            // For all logs in this topic query scope get the local height.
            let latest_operation =
                store
                    .latest_operation(public_key, log_id)
                    .await
                    .map_err(|err| {
                        SyncError::Critical(format!("can't retrieve log heights from store, {err}"))
                    })?;

            let log_height = match latest_operation {
                Some((header, _)) => header.seq_num,
                // If we don't have this log then continue onto the next without
                // sending any messages.
                None => continue,
            };

            // Calculate from which seq num in the log the remote needs operations.
            let remote_needs_from = match remote_log_heights_map.get(public_key) {
                Some(log_heights) => {
                    match log_heights.iter().find(|(id, _)| *id == *log_id) {
                        // The log is known by the remote, take their log height
                        // plus one.
                        Some((_, log_height)) => log_height + 1,
                        // The log is not known, they need everything from seq num 0.
                        None => 0,
                    }
                }
                // The author is not known, they need everything from seq num 0.
                None => 0,
            };

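            // For example: if the remote reported a log height of 4 for this log and our local
            // log height is 7, we collect and send the entries with sequence numbers 5, 6 and 7.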
            if remote_needs_from <= log_height {
                let messages: Vec<Message<T, L>> =
                    remote_needs(store, log_id, public_key, remote_needs_from).await?;
                for message in messages {
                    messages_for_remote.push(message);
                }
            };
        }
    }

    Ok(messages_for_remote)
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use async_trait::async_trait;
    use futures::SinkExt;
    use p2panda_core::{Body, Hash, Header, PrivateKey};
    use p2panda_store::{MemoryStore, OperationStore};
    use serde::{Deserialize, Serialize};
    use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, ReadHalf};
    use tokio::sync::mpsc;
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
    use tokio_util::sync::PollSender;

    use crate::{FromSync, SyncError, SyncProtocol, TopicQuery};

    use super::{LogSyncProtocol, Logs, Message, TopicLogMap};

    impl<T, L> Message<T, L>
    where
        T: Serialize,
        L: Serialize,
    {
        pub fn to_bytes(&self) -> Vec<u8> {
            p2panda_core::cbor::encode_cbor(&self).expect("type can be serialized")
        }
    }

    fn create_operation(
        private_key: &PrivateKey,
        body: &Body,
        seq_num: u64,
        timestamp: u64,
        backlink: Option<Hash>,
    ) -> (Hash, Header, Vec<u8>) {
        let mut header = Header {
            version: 1,
            public_key: private_key.public_key(),
            signature: None,
            payload_size: body.size(),
            payload_hash: Some(body.hash()),
            timestamp,
            seq_num,
            backlink,
            previous: vec![],
            extensions: None,
        };
        header.sign(private_key);
        let header_bytes = header.to_bytes();
        (header.hash(), header, header_bytes)
    }

    #[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
    pub struct LogHeightTopic(String, [u8; 32]);

    impl LogHeightTopic {
        pub fn new(name: &str) -> Self {
            Self(name.to_owned(), [0; 32])
        }
    }

    impl TopicQuery for LogHeightTopic {}

    #[derive(Clone, Debug)]
    struct LogHeightTopicMap<T>(HashMap<T, Logs<u64>>);

    impl<T> LogHeightTopicMap<T>
    where
        T: TopicQuery,
    {
        pub fn new() -> Self {
            LogHeightTopicMap(HashMap::new())
        }

        fn insert(&mut self, topic_query: &T, logs: Logs<u64>) -> Option<Logs<u64>> {
            self.0.insert(topic_query.clone(), logs)
        }
    }

    #[async_trait]
    impl<T> TopicLogMap<T, u64> for LogHeightTopicMap<T>
    where
        T: TopicQuery,
    {
        async fn get(&self, topic_query: &T) -> Option<Logs<u64>> {
            self.0.get(topic_query).cloned()
        }
    }

    async fn assert_message_bytes(
        mut rx: ReadHalf<DuplexStream>,
        messages: Vec<Message<LogHeightTopic, u8>>,
    ) {
        let mut buf = Vec::new();
        rx.read_to_end(&mut buf).await.unwrap();
        assert_eq!(
            buf,
            messages.iter().fold(Vec::new(), |mut acc, message| {
                acc.extend(message.to_bytes());
                acc
            })
        );
    }

    fn to_bytes(messages: Vec<Message<LogHeightTopic>>) -> Vec<u8> {
        messages.iter().fold(Vec::new(), |mut acc, message| {
            acc.extend(message.to_bytes());
            acc
        })
    }

    #[tokio::test]
    async fn sync_no_operations_accept() {
        let topic_query = LogHeightTopic::new("messages");
        let logs = HashMap::new();
        let store = MemoryStore::<u64>::new();

        // Duplex streams which simulate both ends of a bi-directional network connection
        let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
        let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
        let (peer_b_read, mut peer_b_write) = tokio::io::split(peer_b);

        // Channel for sending messages out of a running sync session
        let (app_tx, mut app_rx) = mpsc::channel(128);

        // Write some messages into peer_b's send buffer
        let message_bytes = to_bytes(vec![
            Message::Have(topic_query.clone(), vec![]),
            Message::Done,
        ]);
        peer_b_write.write_all(&message_bytes[..]).await.unwrap();

        // Accept a sync session on peer a (which consumes the above messages)
        let mut topic_map = LogHeightTopicMap::new();
        topic_map.insert(&topic_query, logs);
        let protocol = Arc::new(LogSyncProtocol::new(topic_map, store));
        let mut sink =
            PollSender::new(app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        protocol
            .accept(
                Box::new(&mut peer_a_write.compat_write()),
                Box::new(&mut peer_a_read.compat()),
                Box::new(&mut sink),
            )
            .await
            .unwrap();

        // Assert that peer a sent peer b the expected messages
        assert_message_bytes(
            peer_b_read,
            vec![Message::Done, Message::Have(topic_query.clone(), vec![])],
        )
        .await;

        // Assert that peer a sent the expected messages on its app channel
        let mut messages = Vec::new();
        app_rx.recv_many(&mut messages, 10).await;
        assert_eq!(messages, vec![FromSync::HandshakeSuccess(topic_query)])
    }

    #[tokio::test]
    async fn sync_no_operations_initiate() {
        let topic_query = LogHeightTopic::new("messages");
        let logs = HashMap::new();
        let store = MemoryStore::<u64>::new();

        // Duplex streams which simulate both ends of a bi-directional network connection
        let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
        let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
        let (peer_b_read, mut peer_b_write) = tokio::io::split(peer_b);

        // Channel for sending messages out of a running sync session
        let (app_tx, mut app_rx) = mpsc::channel(128);

        // Write some messages into peer_b's send buffer
        let messages = [
            Message::Done,
            Message::Have::<LogHeightTopic>(topic_query.clone(), vec![]),
        ];
        let message_bytes = messages.iter().fold(Vec::new(), |mut acc, message| {
            acc.extend(message.to_bytes());
            acc
        });
        peer_b_write.write_all(&message_bytes[..]).await.unwrap();

        // Initiate a sync session on peer a (which consumes the above messages)
        let mut topic_map = LogHeightTopicMap::new();
        topic_map.insert(&topic_query, logs);
        let protocol = Arc::new(LogSyncProtocol::new(topic_map, store));
        let mut sink =
            PollSender::new(app_tx).sink_map_err(|err| crate::SyncError::Critical(err.to_string()));
        protocol
            .initiate(
                topic_query.clone(),
                Box::new(&mut peer_a_write.compat_write()),
                Box::new(&mut peer_a_read.compat()),
                Box::new(&mut sink),
            )
            .await
            .unwrap();

        // Assert that peer a sent peer b the expected messages
        assert_message_bytes(
            peer_b_read,
            vec![Message::Have(topic_query.clone(), vec![]), Message::Done],
        )
        .await;

        // Assert that peer a sent the expected messages on its app channel
        let mut messages = Vec::new();
        app_rx.recv_many(&mut messages, 10).await;
        assert_eq!(messages, vec![FromSync::HandshakeSuccess(topic_query)])
    }

    #[tokio::test]
    async fn sync_operations_accept() {
        let private_key = PrivateKey::new();
        let log_id = 0;
        let topic_query = LogHeightTopic::new("messages");
        let logs = HashMap::from([(private_key.public_key(), vec![log_id])]);

        let mut store = MemoryStore::<u64>::new();

        let body = Body::new("Hello, Sloth!".as_bytes());
        let (hash_0, header_0, header_bytes_0) = create_operation(&private_key, &body, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body, 2, 200, Some(hash_1));

        store
            .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
            .await
            .unwrap();
        store
            .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
            .await
            .unwrap();
        store
            .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
            .await
            .unwrap();

        // Duplex streams which simulate both ends of a bi-directional network connection
        let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
        let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
        let (peer_b_read, mut peer_b_write) = tokio::io::split(peer_b);

        // Channel for sending messages out of a running sync session
        let (app_tx, mut app_rx) = mpsc::channel(128);

        // Write some messages into peer_b's send buffer
        let messages = [
            Message::Have::<LogHeightTopic>(topic_query.clone(), vec![]),
            Message::Done,
        ];
        let message_bytes = messages.iter().fold(Vec::new(), |mut acc, message| {
            acc.extend(message.to_bytes());
            acc
        });
        peer_b_write.write_all(&message_bytes[..]).await.unwrap();

        // Accept a sync session on peer a (which consumes the above messages)
        let mut topic_map = LogHeightTopicMap::new();
        topic_map.insert(&topic_query, logs);
        let protocol = Arc::new(LogSyncProtocol::new(topic_map, store));
        let mut sink =
            PollSender::new(app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        protocol
            .accept(
                Box::new(&mut peer_a_write.compat_write()),
                Box::new(&mut peer_a_read.compat()),
                Box::new(&mut sink),
            )
            .await
            .unwrap();

        // Assert that peer a sent peer b the expected messages
        let messages = vec![
            Message::Data(header_bytes_0, Some(body.to_bytes())),
            Message::Data(header_bytes_1, Some(body.to_bytes())),
            Message::Data(header_bytes_2, Some(body.to_bytes())),
            Message::Done,
            Message::Have(
                topic_query.clone(),
                vec![(private_key.public_key(), vec![(0, 2)])],
            ),
        ];
        assert_message_bytes(peer_b_read, messages).await;

        // Assert that peer a sent the expected messages on its app channel
        let mut messages = Vec::new();
        app_rx.recv_many(&mut messages, 10).await;
        assert_eq!(messages, [FromSync::HandshakeSuccess(topic_query)])
    }

    #[tokio::test]
    async fn sync_operations_initiate() {
        let private_key = PrivateKey::new();
        let log_id = 0;
        let topic_query = LogHeightTopic::new("messages");
        let logs = HashMap::from([(private_key.public_key(), vec![log_id])]);

        let store = MemoryStore::<u64>::new();

        // Duplex streams which simulate both ends of a bi-directional network connection
        let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
        let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
        let (peer_b_read, mut peer_b_write) = tokio::io::split(peer_b);

        // Channel for sending messages out of a running sync session
        let (app_tx, mut app_rx) = mpsc::channel(128);

        // Create operations which will be sent to peer a
        let body = Body::new("Hello, Sloth!".as_bytes());

        let (hash_0, _, header_bytes_0) = create_operation(&private_key, &body, 0, 0, None);
        let (hash_1, _, header_bytes_1) =
            create_operation(&private_key, &body, 1, 100, Some(hash_0));
        let (_, _, header_bytes_2) = create_operation(&private_key, &body, 2, 200, Some(hash_1));

        // Write some messages into peer_b's send buffer
        let messages = vec![
            Message::Data(header_bytes_0.clone(), Some(body.to_bytes())),
            Message::Data(header_bytes_1.clone(), Some(body.to_bytes())),
            Message::Data(header_bytes_2.clone(), Some(body.to_bytes())),
            Message::Done,
            Message::Have::<LogHeightTopic>(topic_query.clone(), vec![]),
        ];
        let message_bytes = messages.iter().fold(Vec::new(), |mut acc, message| {
            acc.extend(message.to_bytes());
            acc
        });
        peer_b_write.write_all(&message_bytes[..]).await.unwrap();

        // Initiate a sync session on peer a (which consumes the above messages)
        let mut topic_map = LogHeightTopicMap::new();
        topic_map.insert(&topic_query, logs);
        let protocol = Arc::new(LogSyncProtocol::new(topic_map, store));
        let mut sink =
            PollSender::new(app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        protocol
            .initiate(
                topic_query.clone(),
                Box::new(&mut peer_a_write.compat_write()),
                Box::new(&mut peer_a_read.compat()),
                Box::new(&mut sink),
            )
            .await
            .unwrap();

        // Assert that peer a sent peer b the expected messages
        assert_message_bytes(
            peer_b_read,
            vec![
                Message::Have(
                    topic_query.clone(),
                    vec![(private_key.public_key(), vec![])],
                ),
                Message::Done,
            ],
        )
        .await;

        // Assert that peer a sent the expected messages on its app channel
        let mut messages = Vec::new();
        app_rx.recv_many(&mut messages, 10).await;
        assert_eq!(
            messages,
            [
                FromSync::HandshakeSuccess(topic_query),
                FromSync::Data {
                    header: header_bytes_0,
                    payload: Some(body.to_bytes())
                },
                FromSync::Data {
                    header: header_bytes_1,
                    payload: Some(body.to_bytes())
                },
                FromSync::Data {
                    header: header_bytes_2,
                    payload: Some(body.to_bytes())
                },
            ]
        );
    }

    #[tokio::test]
    async fn e2e_sync_where_one_peer_has_data() {
        let private_key = PrivateKey::new();
        let log_id = 0;
        let topic_query = LogHeightTopic::new("messages");
        let logs = HashMap::from([(private_key.public_key(), vec![log_id])]);

        // Create an empty store for peer a
        let store_1 = MemoryStore::default();

        // Construct a log height protocol and engine for peer a
        let mut topic_map = LogHeightTopicMap::new();
        topic_map.insert(&topic_query, logs);
        let peer_a_protocol = Arc::new(LogSyncProtocol::new(topic_map.clone(), store_1));

        // Create a store for peer b and populate it with 3 operations
        let mut store_2 = MemoryStore::default();
        let body = Body::new("Hello, Sloth!".as_bytes());

        let (hash_0, header_0, header_bytes_0) = create_operation(&private_key, &body, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body, 2, 200, Some(hash_1));

        store_2
            .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
            .await
            .unwrap();
        store_2
            .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
            .await
            .unwrap();
        store_2
            .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
            .await
            .unwrap();

        // Construct a log height protocol and engine for peer b
        let peer_b_protocol = Arc::new(LogSyncProtocol::new(topic_map, store_2));

        // Duplex streams which simulate both ends of a bi-directional network connection
        let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
        let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
        let (peer_b_read, peer_b_write) = tokio::io::split(peer_b);

        // Spawn a task which opens a sync session from peer a and runs it to completion
        let peer_a_protocol_clone = peer_a_protocol.clone();
        let (peer_a_app_tx, mut peer_a_app_rx) = mpsc::channel(128);
        let mut sink =
            PollSender::new(peer_a_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        let topic_clone = topic_query.clone();
        let handle_1 = tokio::spawn(async move {
            peer_a_protocol_clone
                .initiate(
                    topic_clone,
                    Box::new(&mut peer_a_write.compat_write()),
                    Box::new(&mut peer_a_read.compat()),
                    Box::new(&mut sink),
                )
                .await
                .unwrap();
        });

        // Spawn a task which accepts a sync session on peer b and runs it to completion
        let peer_b_protocol_clone = peer_b_protocol.clone();
        let (peer_b_app_tx, mut peer_b_app_rx) = mpsc::channel(128);
        let mut sink =
            PollSender::new(peer_b_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        let handle_2 = tokio::spawn(async move {
            peer_b_protocol_clone
                .accept(
                    Box::new(&mut peer_b_write.compat_write()),
                    Box::new(&mut peer_b_read.compat()),
                    Box::new(&mut sink),
                )
                .await
                .unwrap();
        });

        // Wait for both to complete
        let (_, _) = tokio::join!(handle_1, handle_2);

        let peer_a_expected_messages = vec![
            FromSync::HandshakeSuccess(topic_query.clone()),
            FromSync::Data {
                header: header_bytes_0,
                payload: Some(body.to_bytes()),
            },
            FromSync::Data {
                header: header_bytes_1,
                payload: Some(body.to_bytes()),
            },
            FromSync::Data {
                header: header_bytes_2,
                payload: Some(body.to_bytes()),
            },
        ];

        let mut peer_a_messages = Vec::new();
        peer_a_app_rx.recv_many(&mut peer_a_messages, 10).await;
        assert_eq!(peer_a_messages, peer_a_expected_messages);

        let peer_b_expected_messages = vec![FromSync::HandshakeSuccess(topic_query.clone())];
        let mut peer_b_messages = Vec::new();
        peer_b_app_rx.recv_many(&mut peer_b_messages, 10).await;
        assert_eq!(peer_b_messages, peer_b_expected_messages);
    }

    #[tokio::test]
    async fn e2e_partial_sync() {
        let private_key = PrivateKey::new();
        let log_id = 0;
        let topic_query = LogHeightTopic::new("messages");
        let logs = HashMap::from([(private_key.public_key(), vec![log_id])]);

        let body = Body::new("Hello, Sloth!".as_bytes());

        let (hash_0, header_0, header_bytes_0) = create_operation(&private_key, &body, 0, 0, None);
        let (hash_1, header_1, header_bytes_1) =
            create_operation(&private_key, &body, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_2) =
            create_operation(&private_key, &body, 2, 200, Some(hash_1));

        let mut store_1 = MemoryStore::default();
        store_1
            .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
            .await
            .unwrap();

        // Construct a log height protocol and engine for peer a
        let mut topic_map = LogHeightTopicMap::new();
        topic_map.insert(&topic_query, logs);
        let peer_a_protocol = Arc::new(LogSyncProtocol::new(topic_map.clone(), store_1));

        // Create a store for peer b and populate it with 3 operations
        let mut store_2 = MemoryStore::default();

        // Insert these operations into the store
        store_2
            .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
            .await
            .unwrap();
        store_2
            .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
            .await
            .unwrap();
        store_2
            .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
            .await
            .unwrap();

        // Construct a log height protocol and engine for peer b
        let peer_b_protocol = Arc::new(LogSyncProtocol::new(topic_map, store_2));

        // Duplex streams which simulate both ends of a bi-directional network connection
        let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
        let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
        let (peer_b_read, peer_b_write) = tokio::io::split(peer_b);

        // Spawn a task which opens a sync session from peer a and runs it to completion
        let peer_a_protocol_clone = peer_a_protocol.clone();
        let (peer_a_app_tx, mut peer_a_app_rx) = mpsc::channel(128);
        let mut sink =
            PollSender::new(peer_a_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        let topic_clone = topic_query.clone();
        let handle_1 = tokio::spawn(async move {
            peer_a_protocol_clone
                .initiate(
                    topic_clone,
                    Box::new(&mut peer_a_write.compat_write()),
                    Box::new(&mut peer_a_read.compat()),
                    Box::new(&mut sink),
                )
                .await
                .unwrap();
        });

        // Spawn a task which accepts a sync session on peer b and runs it to completion
        let peer_b_protocol_clone = peer_b_protocol.clone();
        let (peer_b_app_tx, mut peer_b_app_rx) = mpsc::channel(128);
        let mut sink =
            PollSender::new(peer_b_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        let handle_2 = tokio::spawn(async move {
            peer_b_protocol_clone
                .accept(
                    Box::new(&mut peer_b_write.compat_write()),
                    Box::new(&mut peer_b_read.compat()),
                    Box::new(&mut sink),
                )
                .await
                .unwrap();
        });

        // Wait for both to complete
        let (_, _) = tokio::join!(handle_1, handle_2);

        let peer_a_expected_messages = vec![
            FromSync::HandshakeSuccess(topic_query.clone()),
            FromSync::Data {
                header: header_bytes_1,
                payload: Some(body.to_bytes()),
            },
            FromSync::Data {
                header: header_bytes_2,
                payload: Some(body.to_bytes()),
            },
        ];

        let mut peer_a_messages = Vec::new();
        peer_a_app_rx.recv_many(&mut peer_a_messages, 10).await;
        assert_eq!(peer_a_messages, peer_a_expected_messages);

        let peer_b_expected_messages = vec![FromSync::HandshakeSuccess(topic_query.clone())];
        let mut peer_b_messages = Vec::new();
        peer_b_app_rx.recv_many(&mut peer_b_messages, 10).await;
        assert_eq!(peer_b_messages, peer_b_expected_messages);
    }

    #[tokio::test]
    async fn e2e_sync_two_logs() {
        // Scenario: peer A holds three operations for log 0 while peer B holds three operations
        // for log 1. All operations are authored by the same keypair.
        //
        // Expectation: peer B receives log 0 operations from peer A and peer A receives log 1
        // operations from peer B, all in a single sync session.

        let private_key = PrivateKey::new();
        let log_id_1 = 0;
        let log_id_2 = 1;

        let body_1 = Body::new("Hello, Sloth!".as_bytes());
        let body_2 = Body::new("Hello, Panda!".as_bytes());

        // Create a sequence of three operations authored by the same private key.
        let (hash_0, header_0, header_bytes_1_0) =
            create_operation(&private_key, &body_1, 0, 0, None);
        let (hash_1, header_1, header_bytes_1_1) =
            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
        let (hash_2, header_2, header_bytes_1_2) =
            create_operation(&private_key, &body_1, 2, 200, Some(hash_1));

        // Create a store for peer a and insert the three operations with log_id_1.
        let mut store_1 = MemoryStore::default();
        store_1
            .insert_operation(
                hash_0,
                &header_0,
                Some(&body_1),
                &header_bytes_1_0,
                &log_id_1,
            )
            .await
            .unwrap();
        store_1
            .insert_operation(
                hash_1,
                &header_1,
                Some(&body_1),
                &header_bytes_1_1,
                &log_id_1,
            )
            .await
            .unwrap();
        store_1
            .insert_operation(
                hash_2,
                &header_2,
                Some(&body_1),
                &header_bytes_1_2,
                &log_id_1,
            )
            .await
            .unwrap();

        // Create a second sequence of three operations authored by the same private key.
        let (hash_0, header_0, header_bytes_2_0) =
            create_operation(&private_key, &body_2, 0, 300, None);
        let (hash_1, header_1, header_bytes_2_1) =
            create_operation(&private_key, &body_2, 1, 400, Some(hash_0));
        let (hash_2, header_2, header_bytes_2_2) =
            create_operation(&private_key, &body_2, 2, 500, Some(hash_1));

        // Create a store for peer b and insert the three operations with log_id_2.
        let mut store_2 = MemoryStore::default();
        store_2
            .insert_operation(
                hash_0,
                &header_0,
                Some(&body_2),
                &header_bytes_2_0,
                &log_id_2,
            )
            .await
            .unwrap();
        store_2
            .insert_operation(
                hash_1,
                &header_1,
                Some(&body_2),
                &header_bytes_2_1,
                &log_id_2,
            )
            .await
            .unwrap();
        store_2
            .insert_operation(
                hash_2,
                &header_2,
                Some(&body_2),
                &header_bytes_2_2,
                &log_id_2,
            )
            .await
            .unwrap();

        // Define the topic query, logs and topic map.
        let topic_query = LogHeightTopic::new("messages");
        let logs = HashMap::from([(private_key.public_key(), vec![log_id_1, log_id_2])]);
        let mut topic_map = LogHeightTopicMap::new();
        topic_map.insert(&topic_query, logs);

        // Instantiate the sync protocol for both peers.
        let peer_a_protocol = Arc::new(LogSyncProtocol::new(topic_map.clone(), store_1.clone()));
        let peer_b_protocol = Arc::new(LogSyncProtocol::new(topic_map, store_2.clone()));

        // Duplex streams which simulate both ends of a bi-directional network connection
        let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
        let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
        let (peer_b_read, peer_b_write) = tokio::io::split(peer_b);

        // Spawn a task which opens a sync session from peer a and runs it to completion
        let peer_a_protocol_clone = peer_a_protocol.clone();
        let (peer_a_app_tx, mut peer_a_app_rx) = mpsc::channel(128);
        let mut sink =
            PollSender::new(peer_a_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        let topic_clone = topic_query.clone();
        let handle_1 = tokio::spawn(async move {
            peer_a_protocol_clone
                .initiate(
                    topic_clone,
                    Box::new(&mut peer_a_write.compat_write()),
                    Box::new(&mut peer_a_read.compat()),
                    Box::new(&mut sink),
                )
                .await
                .unwrap();
        });

        // Spawn a task which accepts a sync session on peer b and runs it to completion
        let peer_b_protocol_clone = peer_b_protocol.clone();
        let (peer_b_app_tx, mut peer_b_app_rx) = mpsc::channel(128);
        let mut sink =
            PollSender::new(peer_b_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
        let handle_2 = tokio::spawn(async move {
            peer_b_protocol_clone
                .accept(
                    Box::new(&mut peer_b_write.compat_write()),
                    Box::new(&mut peer_b_read.compat()),
                    Box::new(&mut sink),
                )
                .await
                .unwrap();
        });

        // Wait for both to complete
        let (_, _) = tokio::join!(handle_1, handle_2);

        // Peer b should receive the log_id_1 operations from peer a.
        let peer_b_expected_messages = vec![
            FromSync::HandshakeSuccess(topic_query.clone()),
            FromSync::Data {
                header: header_bytes_1_0,
                payload: Some(body_1.to_bytes()),
            },
            FromSync::Data {
                header: header_bytes_1_1,
                payload: Some(body_1.to_bytes()),
            },
            FromSync::Data {
                header: header_bytes_1_2,
                payload: Some(body_1.to_bytes()),
            },
        ];

        let mut peer_b_messages = Vec::new();
        peer_b_app_rx.recv_many(&mut peer_b_messages, 10).await;
        assert_eq!(peer_b_messages, peer_b_expected_messages);

        // Peer a should receive the log_id_2 operations from peer b.
        let peer_a_expected_messages = vec![
            FromSync::HandshakeSuccess(topic_query.clone()),
            FromSync::Data {
                header: header_bytes_2_0,
                payload: Some(body_2.to_bytes()),
            },
            FromSync::Data {
                header: header_bytes_2_1,
                payload: Some(body_2.to_bytes()),
            },
            FromSync::Data {
                header: header_bytes_2_2,
                payload: Some(body_2.to_bytes()),
            },
        ];

        let mut peer_a_messages = Vec::new();
        peer_a_app_rx.recv_many(&mut peer_a_messages, 10).await;
        assert_eq!(peer_a_messages, peer_a_expected_messages);
    }
}