p2panda_sync/protocols/
log_sync.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Two-party sync protocol over append-only logs.
4use std::collections::HashMap;
5use std::fmt::{Debug, Display};
6use std::marker::PhantomData;
7
8use futures::{Sink, SinkExt, Stream, StreamExt, stream};
9use p2panda_core::cbor::{DecodeError, decode_cbor};
10use p2panda_core::{Body, Extensions, Hash, Header, Operation, PublicKey};
11use p2panda_store::{LogId, LogStore, OperationStore};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::select;
15use tokio::sync::broadcast;
16
17use crate::dedup::{DEFAULT_BUFFER_CAPACITY, Dedup};
18use crate::traits::Protocol;
19
/// A map of author logs.
///
/// Each key is an author's public key and each value is the list of log ids kept for that
/// author. A [`LogSync`] session iterates over this map to decide which logs to compare with
/// the remote peer.
pub type Logs<L> = HashMap<PublicKey, Vec<L>>;
22
/// Sync session life-cycle states.
///
/// The session starts in [`State::Start`] and `LogSync::run` advances through the variants in
/// the order they are declared here. Most states carry the session metrics forward so they can
/// be updated and re-emitted on the event stream.
#[derive(Default)]
enum State {
    /// Initialise session metrics and announce sync start on event stream.
    #[default]
    Start,

    /// Calculate local log heights and send Have message to remote.
    SendHave { metrics: LogSyncMetrics },

    /// Receive Have message from remote and calculate operation diff.
    ReceiveHave { metrics: LogSyncMetrics },

    /// Send PreSync message to remote or Done if we have nothing to send.
    SendPreSyncOrDone {
        /// Hashes of the operations we will send to the remote.
        operations: Vec<Hash>,
        metrics: LogSyncMetrics,
    },

    /// Receive PreSync message from remote or Done if they have nothing to send.
    ReceivePreSyncOrDone {
        /// Hashes of the operations we will send to the remote.
        operations: Vec<Hash>,
        metrics: LogSyncMetrics,
    },

    /// Enter sync loop where we exchange operations with the remote; moves on to the next state
    /// once both peers have sent Done messages.
    Sync {
        /// Hashes of the operations we will send to the remote.
        operations: Vec<Hash>,
        metrics: LogSyncMetrics,
    },

    /// Announce on the event stream that the sync session successfully completed.
    End { metrics: LogSyncMetrics },
}
58
/// Efficient sync protocol for append-only log data types.
///
/// Type parameters: `L` is the log id type, `E` the operation header extensions, `S` the store
/// the session reads from and `Evt` the event type broadcast to the application.
pub struct LogSync<L, E, S, Evt> {
    /// Current position in the session state machine.
    state: State,
    /// The author logs this session compares with the remote.
    logs: Logs<L>,
    /// Store queried for log heights, log hashes, log sizes and raw operations.
    store: S,
    /// Channel on which status and data events are broadcast.
    event_tx: broadcast::Sender<Evt>,
    /// Capacity handed to the deduplication buffer created when the session runs.
    buffer_capacity: usize,
    /// Ties the extensions type `E` to the session without storing a value of it.
    _marker: PhantomData<E>,
}
68
69impl<L, E, S, Evt> LogSync<L, E, S, Evt> {
70    pub fn new(store: S, logs: Logs<L>, event_tx: broadcast::Sender<Evt>) -> Self {
71        Self::new_with_capacity(store, logs, event_tx, DEFAULT_BUFFER_CAPACITY)
72    }
73
74    pub fn new_with_capacity(
75        store: S,
76        logs: Logs<L>,
77        event_tx: broadcast::Sender<Evt>,
78        buffer_capacity: usize,
79    ) -> Self {
80        Self {
81            state: Default::default(),
82            store,
83            event_tx,
84            logs,
85            buffer_capacity,
86            _marker: PhantomData,
87        }
88    }
89}
90
91impl<L, E, S, Evt> Protocol for LogSync<L, E, S, Evt>
92where
93    L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
94    E: Extensions + Send + 'static,
95    S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
96    Evt: Debug + From<LogSyncEvent<E>> + Send + 'static,
97{
98    type Error = LogSyncError;
99    type Output = Dedup<Hash>;
100    type Message = LogSyncMessage<L>;
101
102    async fn run(
103        mut self,
104        sink: &mut (impl Sink<Self::Message, Error = impl Debug> + Unpin),
105        stream: &mut (impl Stream<Item = Result<Self::Message, impl Debug>> + Unpin),
106    ) -> Result<Self::Output, Self::Error> {
107        let mut sync_done_received = false;
108        let mut sync_done_sent = false;
109        let mut dedup = Dedup::new(self.buffer_capacity);
110
111        loop {
112            match self.state {
113                State::Start => {
114                    let metrics = LogSyncMetrics::default();
115                    self.event_tx
116                        .send(
117                            LogSyncEvent::Status(LogSyncStatus::Started {
118                                metrics: metrics.clone(),
119                            })
120                            .into(),
121                        )
122                        .map_err(|_| LogSyncError::BroadcastSend)?;
123                    self.state = State::SendHave { metrics };
124                }
125                State::SendHave { metrics } => {
126                    let local_log_heights = local_log_heights(&self.store, &self.logs).await?;
127                    sink.send(LogSyncMessage::<L>::Have(local_log_heights.clone()))
128                        .await
129                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
130                    self.state = State::ReceiveHave { metrics };
131                }
132                State::ReceiveHave { mut metrics } => {
133                    let Some(message) = stream.next().await else {
134                        return Err(LogSyncError::UnexpectedStreamClosure);
135                    };
136                    let message =
137                        message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
138                    let LogSyncMessage::Have(remote_log_heights) = message else {
139                        return Err(LogSyncError::UnexpectedMessage(message.to_string()));
140                    };
141
142                    let remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>> =
143                        remote_log_heights.clone().into_iter().collect();
144
145                    // We only fetch the hashes of the operations we should send to the remote in
146                    // this step. This avoids keeping all headers and payloads in memory, we can
147                    // fetch one at a time as they are needed within the sync phase later.
148                    let (operations, total_size) = operations_needed_by_remote(
149                        &self.store,
150                        &self.logs,
151                        remote_log_heights_map,
152                    )
153                    .await?;
154
155                    metrics.total_operations_local = Some(operations.len() as u64);
156                    metrics.total_bytes_local = Some(total_size);
157
158                    self.state = State::SendPreSyncOrDone {
159                        operations,
160                        metrics,
161                    };
162                }
163                State::SendPreSyncOrDone {
164                    operations,
165                    metrics,
166                } => {
167                    let total_operations = metrics.total_operations_local.unwrap();
168                    let total_bytes = metrics.total_bytes_local.unwrap();
169
170                    if total_operations > 0 {
171                        sink.send(LogSyncMessage::PreSync {
172                            total_bytes,
173                            total_operations,
174                        })
175                        .await
176                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
177                    } else {
178                        sink.send(LogSyncMessage::Done)
179                            .await
180                            .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
181                    }
182
183                    self.event_tx
184                        .send(
185                            LogSyncEvent::Status(LogSyncStatus::Progress {
186                                metrics: metrics.clone(),
187                            })
188                            .into(),
189                        )
190                        .map_err(|_| LogSyncError::BroadcastSend)?;
191
192                    self.state = State::ReceivePreSyncOrDone {
193                        operations,
194                        metrics,
195                    };
196                }
197                State::ReceivePreSyncOrDone {
198                    operations,
199                    mut metrics,
200                } => {
201                    let Some(message) = stream.next().await else {
202                        return Err(LogSyncError::UnexpectedStreamClosure);
203                    };
204                    let message =
205                        message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
206
207                    metrics.total_bytes_remote = Some(0);
208                    metrics.total_operations_remote = Some(0);
209
210                    match message {
211                        LogSyncMessage::PreSync {
212                            total_operations,
213                            total_bytes,
214                        } => {
215                            metrics.total_bytes_remote = Some(total_bytes);
216                            metrics.total_operations_remote = Some(total_operations);
217                        }
218                        LogSyncMessage::Done => sync_done_received = true,
219                        message => {
220                            return Err(LogSyncError::UnexpectedMessage(message.to_string()));
221                        }
222                    }
223
224                    self.event_tx
225                        .send(
226                            LogSyncEvent::Status(LogSyncStatus::Progress {
227                                metrics: metrics.clone(),
228                            })
229                            .into(),
230                        )
231                        .map_err(|_| LogSyncError::BroadcastSend)?;
232
233                    self.state = State::Sync {
234                        operations,
235                        metrics,
236                    };
237                }
238                State::Sync {
239                    operations,
240                    mut metrics,
241                } => {
242                    let mut operation_stream = stream::iter(operations);
243                    let mut sent_operations = 0;
244                    let total_operations = metrics
245                        .total_operations_local
246                        .expect("total operations set");
247
248                    // We perform a loop awaiting futures on both the receiving stream and the list
249                    // of operations we have to send. This means that processing of both streams is
250                    // done concurrently.
251                    loop {
252                        select! {
253                            message = stream.next(), if !sync_done_received => {
254                                let Some(message) = message else {
255                                    break;
256                                };
257                                let message =
258                                    message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
259                                match message {
260                                    LogSyncMessage::Operation(header, body) => {
261                                        metrics.total_bytes_received += {
262                                            header.len()
263                                                + body.as_ref().map(|bytes| bytes.len()).unwrap_or_default()
264                                        } as u64;
265                                        metrics.total_operations_received += 1;
266
267                                        // TODO: validate that the operations and bytes received
268                                        // matches the total bytes the remote sent in their PreSync
269                                        // message.
270                                        let header: Header<E> = decode_cbor(&header[..])?;
271                                        let body = body.map(|ref bytes| Body::new(bytes));
272
273                                        // Insert message hash into deduplication buffer.
274                                        //
275                                        // NOTE: we don't deduplicate any received messages during
276                                        // sync as for this session they have not been seen before.
277                                        dedup.insert(header.hash());
278
279                                        // Forward data received from the remote to the app layer.
280                                        self.event_tx
281                                            .send(
282                                                LogSyncEvent::Data(Box::new(Operation {
283                                                    hash: header.hash(),
284                                                    header,
285                                                    body,
286                                                }))
287                                                .into(),
288                                            )
289                                            .map_err(|_| LogSyncError::BroadcastSend)?;
290                                    }
291                                    LogSyncMessage::Done => {
292                                        sync_done_received = true;
293                                    }
294                                    message => {
295                                        return Err(LogSyncError::UnexpectedMessage(message.to_string()));
296                                    }
297                                }
298                            },
299                            Some(hash) = operation_stream.next() => {
300                                // Insert message hash into deduplication buffer.
301                                dedup.insert(hash);
302
303                                // Fetch raw message bytes and send to remote.
304                                let (header, body) = self
305                                    .store
306                                    .get_raw_operation(hash)
307                                    .await
308                                    .map_err(|err| LogSyncError::OperationStore(format!("{err}")))?
309                                    .expect("operation to be in store");
310
311                                metrics.total_bytes_sent += {
312                                    header.len() + body.as_ref().map(|bytes| bytes.len()).unwrap_or_default()
313                                } as u64;
314                                metrics.total_operations_sent += 1;
315
316                                sink.send(LogSyncMessage::Operation(header, body))
317                                    .await
318                                    .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
319                                sent_operations += 1;
320
321                                if sent_operations >= total_operations {
322                                    sink.send(LogSyncMessage::Done)
323                                        .await
324                                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
325                                    sync_done_sent = true;
326                                }
327                            },
328                            else => {
329                                // If both streams are empty (they return None), or we received a
330                                // sync done message and we sent all our pending operations, exit
331                                // the loop.
332                                break;
333                            }
334                        }
335                        if sync_done_received && sync_done_sent {
336                            break;
337                        }
338                    }
339                    self.state = State::End { metrics };
340                }
341                State::End { metrics } => {
342                    self.event_tx
343                        .send(
344                            LogSyncEvent::Status(LogSyncStatus::Completed {
345                                metrics: metrics.clone(),
346                            })
347                            .into(),
348                        )
349                        .map_err(|_| LogSyncError::BroadcastSend)?;
350                    break;
351                }
352            }
353        }
354
355        Ok(dedup)
356    }
357}
358
359/// Return the local log heights of all passed logs.
360async fn local_log_heights<L, E, S>(
361    store: &S,
362    logs: &Logs<L>,
363) -> Result<Vec<(PublicKey, Vec<(L, u64)>)>, LogSyncError>
364where
365    L: LogId,
366    S: LogStore<L, E> + OperationStore<L, E>,
367{
368    let mut local_log_heights = Vec::new();
369    for (public_key, log_ids) in logs {
370        let mut log_heights = Vec::new();
371        for log_id in log_ids {
372            let latest = store
373                .latest_operation(public_key, log_id)
374                .await
375                .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
376
377            if let Some((header, _)) = latest {
378                log_heights.push((log_id.clone(), header.seq_num));
379            };
380        }
381        local_log_heights.push((*public_key, log_heights));
382    }
383
384    Ok(local_log_heights)
385}
386
387/// Compare the local log heights with the remote log heights for all given logs and return the
388/// hashes of all operations the remote needs, as well as the total bytes.
389async fn operations_needed_by_remote<L, E, S>(
390    store: &S,
391    logs: &Logs<L>,
392    remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>>,
393) -> Result<(Vec<Hash>, u64), LogSyncError>
394where
395    L: LogId,
396    E: Extensions,
397    S: LogStore<L, E> + OperationStore<L, E>,
398{
399    // Now that the topic query has been translated into a collection of logs we want to
400    // compare our own local log heights with what the remote sent for this topic query.
401    //
402    // If our logs are more advanced for any log we should collect the entries for sending.
403    let mut operations = Vec::new();
404    let mut total_size = 0;
405
406    for (public_key, log_ids) in logs {
407        for log_id in log_ids {
408            // For all logs in this topic query scope get the local height.
409            let latest_operation = store
410                .latest_operation(public_key, log_id)
411                .await
412                .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
413
414            let log_height = match latest_operation {
415                Some((header, _)) => header.seq_num,
416                // If we don't have this log then continue onto the next without
417                // sending any messages.
418                None => continue,
419            };
420
421            // Calculate from which seq num in the log the remote needs operations.
422            let remote_needs_from = match remote_log_heights_map.get(public_key) {
423                Some(log_heights) => {
424                    match log_heights.iter().find(|(id, _)| *id == *log_id) {
425                        // The log is known by the remote, take their log height
426                        // and plus one.
427                        Some((_, log_height)) => log_height + 1,
428                        // The log is not known, they need from seq num 0
429                        None => 0,
430                    }
431                }
432                // The author is not known, they need from seq num 0.
433                None => 0,
434            };
435
436            if remote_needs_from <= log_height {
437                let log = store
438                    .get_log_hashes(public_key, log_id, Some(remote_needs_from))
439                    .await
440                    .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
441
442                if let Some(log) = log {
443                    operations.extend(log);
444                }
445
446                let size = store
447                    .get_log_size(public_key, log_id, Some(remote_needs_from))
448                    .await
449                    .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
450
451                if let Some(size) = size {
452                    total_size += size;
453                }
454            };
455        }
456    }
457
458    Ok((operations, total_size))
459}
460
/// Protocol messages.
///
/// Serialized with serde as an adjacently tagged enum: the variant name is stored under the
/// `type` key and its payload under the `value` key.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(tag = "type", content = "value")]
pub enum LogSyncMessage<L> {
    /// The sender's log heights: for each author, a list of `(log id, sequence number of the
    /// latest operation)` pairs.
    Have(Vec<(PublicKey, Vec<(L, u64)>)>),
    /// Announces how many operations and bytes the sender is about to send.
    PreSync {
        total_operations: u64,
        total_bytes: u64,
    },
    // TODO: use Header and Body here.
    /// A single operation as raw bytes: the encoded header and, optionally, the body.
    Operation(Vec<u8>, Option<Vec<u8>>),
    /// The sender has no (more) operations to send.
    Done,
}
474
475impl<L> Display for LogSyncMessage<L>
476where
477    L: Debug,
478{
479    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
480        let value = match self {
481            LogSyncMessage::Have(_) => "have",
482            LogSyncMessage::PreSync { .. } => "pre_sync",
483            LogSyncMessage::Operation(_, _) => "operation",
484            LogSyncMessage::Done => "done",
485        };
486
487        write!(f, "{value}")
488    }
489}
490
/// Events emitted from log sync sessions.
#[derive(Clone, Debug, PartialEq)]
pub enum LogSyncEvent<E> {
    /// A life-cycle update carrying the current session metrics.
    Status(LogSyncStatus),
    /// An operation received from the remote peer during the sync phase.
    Data(Box<Operation<E>>),
}
497
/// Sync metrics emitted in event messages.
///
/// All counters start at zero and all totals at `None` (see the derived `Default`).
#[derive(Clone, Debug, PartialEq, Default)]
pub struct LogSyncMetrics {
    /// Number of operations we determined the remote is missing; `None` until the remote's
    /// Have message has been processed.
    pub total_operations_local: Option<u64>,
    /// Number of operations the remote announced in its PreSync message, or `Some(0)` if it
    /// answered with Done; `None` until that message has been received.
    pub total_operations_remote: Option<u64>,
    /// Number of operation messages received from the remote so far.
    pub total_operations_received: u64,
    /// Number of operation messages sent to the remote so far.
    pub total_operations_sent: u64,
    /// Total size the store reported for the operations we will send; `None` until the remote's
    /// Have message has been processed.
    pub total_bytes_local: Option<u64>,
    /// Total bytes the remote announced in its PreSync message, or `Some(0)` if it answered
    /// with Done; `None` until that message has been received.
    pub total_bytes_remote: Option<u64>,
    /// Header plus body bytes of all operations received so far.
    pub total_bytes_received: u64,
    /// Header plus body bytes of all operations sent so far.
    pub total_bytes_sent: u64,
}
510
/// Sync status variants sent on log sync events.
///
/// Every variant carries a snapshot of the session metrics taken when the event was emitted.
#[derive(Clone, Debug, PartialEq)]
pub enum LogSyncStatus {
    /// The session has started; emitted before any message is exchanged.
    Started { metrics: LogSyncMetrics },
    /// Emitted after we send our PreSync/Done message and again after we receive the remote's.
    Progress { metrics: LogSyncMetrics },
    /// The session ran to its end state.
    Completed { metrics: LogSyncMetrics },
}
518
/// Protocol error types.
#[derive(Debug, Error)]
pub enum LogSyncError {
    /// A received operation header could not be decoded.
    #[error(transparent)]
    Decode(#[from] DecodeError),

    /// A log store query failed; carries the store's error rendered as text.
    #[error("log store error: {0}")]
    LogStore(String),

    /// An operation store query failed; carries the store's error rendered as text.
    #[error("operation store error: {0}")]
    OperationStore(String),

    /// Sending on the event broadcast channel failed.
    #[error("no active receivers when broadcasting")]
    BroadcastSend,

    /// Sending a protocol message failed; carries the sink error's debug output.
    #[error("log sync error sending on message sink: {0}")]
    MessageSink(String),

    /// Receiving a protocol message failed; carries the stream error's debug output.
    #[error("log sync error receiving from message stream: {0}")]
    MessageStream(String),

    /// The message stream ended while a Have or PreSync/Done message was still expected.
    #[error("remote unexpectedly closed stream during initial sync")]
    UnexpectedStreamClosure,

    /// The remote sent a message that is not valid in the current phase; carries the message's
    /// short name.
    #[error("log sync received unexpected protocol message: {0}")]
    UnexpectedMessage(String),
}
546
547#[cfg(test)]
548mod tests {
549    use assert_matches::assert_matches;
550    use futures::StreamExt;
551    use p2panda_core::Body;
552
553    use crate::protocols::log_sync::{
554        LogSyncError, LogSyncEvent, LogSyncMetrics, LogSyncStatus, Logs, Operation,
555    };
556    use crate::test_utils::{Peer, TestLogSyncMessage, run_protocol, run_protocol_uni};
557
558    #[tokio::test]
559    async fn log_sync_no_operations() {
560        let mut peer: Peer = Peer::new(0);
561
562        let (session, mut event_rx) = peer.log_sync_protocol(&Logs::default());
563        let remote_message_rx = run_protocol_uni(
564            session,
565            &[TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done],
566        )
567        .await
568        .unwrap();
569
570        for index in 0..=3 {
571            let event = event_rx.recv().await.unwrap();
572            match index {
573                0 => {
574                    let (total_operations, total_bytes) = assert_matches!(
575                        event,
576                        LogSyncEvent::Status(LogSyncStatus::Started { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
577                         => (total_operations_remote, total_bytes_remote)
578                    );
579                    assert_eq!(total_operations, None);
580                    assert_eq!(total_bytes, None);
581                }
582                1 => {
583                    let (total_operations, total_bytes) = assert_matches!(
584                        event,
585                        LogSyncEvent::Status(LogSyncStatus::Progress { metrics: LogSyncMetrics { total_operations_local, total_bytes_local, .. } })
586                         => (total_operations_local, total_bytes_local)
587                    );
588                    assert_eq!(total_operations, Some(0));
589                    assert_eq!(total_bytes, Some(0));
590                }
591                2 => {
592                    let (total_operations, total_bytes) = assert_matches!(
593                        event,
594                        LogSyncEvent::Status(LogSyncStatus::Progress { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
595                         => (total_operations_remote, total_bytes_remote)
596                    );
597                    assert_eq!(total_operations, Some(0));
598                    assert_eq!(total_bytes, Some(0));
599                }
600                3 => {
601                    let (total_operations, total_bytes) = assert_matches!(
602                        event,
603                        LogSyncEvent::Status(LogSyncStatus::Completed { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
604                         => (total_operations_remote, total_bytes_remote)
605                    );
606                    assert_eq!(total_operations, Some(0));
607                    assert_eq!(total_bytes, Some(0));
608                }
609                _ => panic!(),
610            };
611        }
612
613        let messages = remote_message_rx.collect::<Vec<_>>().await;
614        assert_eq!(messages.len(), 2);
615        for (index, message) in messages.into_iter().enumerate() {
616            match index {
617                0 => assert_eq!(message, TestLogSyncMessage::Have(vec![])),
618                1 => {
619                    assert_eq!(message, TestLogSyncMessage::Done);
620                    break;
621                }
622                _ => panic!(),
623            };
624        }
625    }
626
627    #[tokio::test]
628    async fn log_sync_some_operations() {
629        let mut peer = Peer::new(0);
630        let log_id = 0;
631
632        let body = Body::new("Hello, Sloth!".as_bytes());
633        let (header_0, header_bytes_0) = peer.create_operation(&body, log_id).await;
634        let (header_1, header_bytes_1) = peer.create_operation(&body, log_id).await;
635        let (header_2, header_bytes_2) = peer.create_operation(&body, log_id).await;
636
637        let mut logs = Logs::default();
638        logs.insert(peer.id(), vec![log_id]);
639
640        let (session, mut event_rx) = peer.log_sync_protocol(&logs);
641        let remote_message_rx = run_protocol_uni(
642            session,
643            &[TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done],
644        )
645        .await
646        .unwrap();
647
648        let expected_bytes = header_0.payload_size
649            + header_bytes_0.len() as u64
650            + header_1.payload_size
651            + header_bytes_1.len() as u64
652            + header_2.payload_size
653            + header_bytes_2.len() as u64;
654
655        for index in 0..=3 {
656            let event = event_rx.recv().await.unwrap();
657            match index {
658                0 => {
659                    assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. }));
660                }
661                1 => {
662                    let (total_operations, total_bytes) = assert_matches!(
663                        event,
664                        LogSyncEvent::Status(LogSyncStatus::Progress {
665                            metrics: LogSyncMetrics { total_operations_local, total_bytes_local, .. }
666                        }) => (total_operations_local, total_bytes_local)
667                    );
668                    assert_eq!(total_operations, Some(3));
669
670                    assert_eq!(total_bytes, Some(expected_bytes));
671                }
672                2 => {
673                    let (total_operations, total_bytes) = assert_matches!(
674                        event,
675                        LogSyncEvent::Status(LogSyncStatus::Progress {
676                            metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
677                        }) => (total_operations_remote, total_bytes_remote)
678                    );
679                    assert_eq!(total_operations, Some(0));
680                    assert_eq!(total_bytes, Some(0));
681                }
682                3 => {
683                    let (total_operations, total_bytes) = assert_matches!(
684                        event,
685                        LogSyncEvent::Status(LogSyncStatus::Completed {
686                            metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
687                        }) => (total_operations_remote, total_bytes_remote)
688                    );
689                    assert_eq!(total_operations, Some(0));
690                    assert_eq!(total_bytes, Some(0));
691                }
692                _ => panic!(),
693            };
694        }
695
696        let messages = remote_message_rx.collect::<Vec<_>>().await;
697        assert_eq!(messages.len(), 6);
698        for (index, message) in messages.into_iter().enumerate() {
699            match index {
700                0 => assert_eq!(
701                    message,
702                    TestLogSyncMessage::Have(vec![(peer.id(), vec![(0, 2)])])
703                ),
704                1 => assert_eq!(
705                    message,
706                    TestLogSyncMessage::PreSync {
707                        total_operations: 3,
708                        total_bytes: expected_bytes
709                    }
710                ),
711                2 => {
712                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
713                        header,
714                        Some(body),
715                    ) => (header, body));
716                    assert_eq!(header, header_bytes_0);
717                    assert_eq!(Body::new(&body_inner), body)
718                }
719                3 => {
720                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
721                        header,
722                        Some(body),
723                    ) => (header, body));
724                    assert_eq!(header, header_bytes_1);
725                    assert_eq!(Body::new(&body_inner), body)
726                }
727                4 => {
728                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
729                        header,
730                        Some(body),
731                    ) => (header, body));
732                    assert_eq!(header, header_bytes_2);
733                    assert_eq!(Body::new(&body_inner), body)
734                }
735                5 => {
736                    assert_eq!(message, TestLogSyncMessage::Done);
737                }
738                _ => panic!(),
739            };
740        }
741    }
742
743    #[tokio::test]
744    async fn log_sync_bidirectional_exchange() {
745        const LOG_ID: u64 = 0;
746
747        let mut peer_a = Peer::new(0);
748        let mut peer_b = Peer::new(1);
749
750        let body_a = Body::new("From Alice".as_bytes());
751        let body_b = Body::new("From Bob".as_bytes());
752
753        let (header_a0, _) = peer_a.create_operation(&body_a, LOG_ID).await;
754        let (header_a1, _) = peer_a.create_operation(&body_a, LOG_ID).await;
755
756        let (header_b0, _) = peer_b.create_operation(&body_b, LOG_ID).await;
757        let (header_b1, _) = peer_b.create_operation(&body_b, LOG_ID).await;
758
759        let mut logs = Logs::default();
760        logs.insert(peer_a.id(), vec![LOG_ID]);
761        logs.insert(peer_b.id(), vec![LOG_ID]);
762
763        let (a_session, mut peer_a_event_rx) = peer_a.log_sync_protocol(&logs);
764        let (b_session, mut peer_b_event_rx) = peer_b.log_sync_protocol(&logs);
765
766        run_protocol(a_session, b_session).await.unwrap();
767
768        for index in 0..=5 {
769            let event = peer_a_event_rx.recv().await.unwrap();
770            match index {
771                0 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. })),
772                1 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
773                2 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
774                3 => {
775                    let (header, body_inner) = assert_matches!(
776                        event,
777                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
778                    );
779                    assert_eq!(header, header_b0);
780                    assert_eq!(body_inner.unwrap(), body_b);
781                }
782                4 => {
783                    let (header, body_inner) = assert_matches!(
784                        event,
785                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
786                    );
787                    assert_eq!(header, header_b1);
788                    assert_eq!(body_inner.unwrap(), body_b);
789                }
790                5 => {
791                    assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Completed { .. }));
792                    break;
793                }
794                _ => panic!(),
795            }
796        }
797
798        for index in 0..=5 {
799            let event = peer_b_event_rx.recv().await.unwrap();
800            match index {
801                0 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. })),
802                1 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
803                2 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
804                3 => {
805                    let (header, body_inner) = assert_matches!(
806                        event,
807                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
808                    );
809                    assert_eq!(header, header_a0);
810                    assert_eq!(body_inner.unwrap(), body_a);
811                }
812                4 => {
813                    let (header, body_inner) = assert_matches!(
814                        event,
815                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
816                    );
817                    assert_eq!(header, header_a1);
818                    assert_eq!(body_inner.unwrap(), body_a);
819                }
820                5 => {
821                    let metrics = assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Completed { metrics }) => metrics);
822                    let LogSyncMetrics {
823                        total_operations_local,
824                        total_operations_remote,
825                        total_operations_received,
826                        total_operations_sent,
827                        total_bytes_local,
828                        total_bytes_remote,
829                        total_bytes_received,
830                        total_bytes_sent,
831                    } = metrics;
832
833                    assert_eq!(total_operations_remote.unwrap(), total_operations_received);
834                    assert_eq!(total_bytes_remote.unwrap(), total_bytes_received);
835                    assert_eq!(total_operations_local.unwrap(), total_operations_sent);
836                    assert_eq!(total_bytes_local.unwrap(), total_bytes_sent);
837                }
838                _ => panic!(),
839            }
840        }
841    }
842
843    #[tokio::test]
844    async fn log_sync_unexpected_operation_before_presend() {
845        let mut peer = Peer::new(0);
846        const LOG_ID: u64 = 1;
847
848        let body = Body::new(b"unexpected op before presend");
849        let (_, header_bytes) = peer.create_operation(&body, LOG_ID).await;
850
851        let mut logs = Logs::default();
852        logs.insert(peer.id(), vec![LOG_ID]);
853
854        let (session, _event_rx) = peer.log_sync_protocol(&logs);
855
856        // Remote sends Operation without PreSync first.
857        let messages = vec![
858            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
859            TestLogSyncMessage::Operation(header_bytes.clone(), Some(body.to_bytes())),
860            TestLogSyncMessage::PreSync {
861                total_operations: 1,
862                total_bytes: 100,
863            },
864            TestLogSyncMessage::Done,
865        ];
866
867        let result = run_protocol_uni(session, &messages).await;
868        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
869    }
870
871    #[tokio::test]
872    async fn log_sync_unexpected_presend_twice() {
873        let mut peer = Peer::new(0);
874        const LOG_ID: u64 = 1;
875
876        let body = Body::new(b"two presends");
877        peer.create_operation(&body, LOG_ID).await;
878
879        let mut logs = Logs::default();
880        logs.insert(peer.id(), vec![LOG_ID]);
881        let (session, _event_rx) = peer.log_sync_protocol(&logs);
882
883        let messages = vec![
884            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
885            TestLogSyncMessage::PreSync {
886                total_operations: 1,
887                total_bytes: 32,
888            },
889            TestLogSyncMessage::PreSync {
890                total_operations: 1,
891                total_bytes: 32,
892            },
893            TestLogSyncMessage::Done,
894        ];
895
896        let result = run_protocol_uni(session, &messages).await;
897        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
898    }
899
900    #[tokio::test]
901    async fn log_sync_unexpected_done_before_anything() {
902        let mut peer = Peer::new(0);
903        let logs = Logs::default();
904
905        let (session, _event_rx) = peer.log_sync_protocol(&logs);
906
907        let messages = vec![TestLogSyncMessage::Done];
908        let result = run_protocol_uni(session, &messages).await;
909
910        assert!(
911            matches!(result, Err(LogSyncError::UnexpectedMessage(_))),
912            "{:?}",
913            result
914        );
915    }
916
917    #[tokio::test]
918    async fn log_sync_unexpected_have_after_presend() {
919        let mut peer = Peer::new(0);
920        const LOG_ID: u64 = 1;
921        let body = Body::new(b"bad have order");
922        peer.create_operation(&body, LOG_ID).await;
923
924        let mut logs = Logs::default();
925        logs.insert(peer.id(), vec![LOG_ID]);
926        let (session, _event_rx) = peer.log_sync_protocol(&logs);
927
928        let messages = vec![
929            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
930            TestLogSyncMessage::PreSync {
931                total_operations: 1,
932                total_bytes: 64,
933            },
934            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 99)])]), // invalid here
935            TestLogSyncMessage::Done,
936        ];
937
938        let result = run_protocol_uni(session, &messages).await;
939        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
940    }
941}