Skip to main content

p2panda_sync/protocols/
log_sync.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Two-party sync protocol over append-only logs.
4use std::collections::HashMap;
5use std::fmt::{Debug, Display};
6use std::marker::PhantomData;
7
8use futures::{Sink, SinkExt, Stream, StreamExt, stream};
9use p2panda_core::cbor::{DecodeError, decode_cbor};
10use p2panda_core::{Body, Extensions, Hash, Header, Operation, PublicKey};
11use p2panda_store::{LogId, LogStore, OperationStore};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::select;
15use tokio::sync::broadcast;
16use tracing::{debug, trace, warn};
17
18use crate::dedup::{DEFAULT_BUFFER_CAPACITY, Dedup};
19use crate::traits::Protocol;
20
/// A map of author logs.
///
/// Maps an author's public key to the list of log ids of theirs that are in scope for a
/// sync session.
pub type Logs<L> = HashMap<PublicKey, Vec<L>>;
23
/// Sync session life-cycle states.
///
/// A session starts in `Start` and advances linearly through these states until `End`
/// terminates it.
#[derive(Default)]
enum State<L> {
    /// Initialise session metrics and announce sync start on event stream.
    #[default]
    Start,

    /// Calculate local log heights and send Have message to remote.
    SendHave { metrics: LogSyncMetrics },

    /// Receive have message from remote and calculate operation diff.
    ReceiveHave { metrics: LogSyncMetrics },

    /// Send PreSync message to remote or Done if we have nothing to send.
    SendPreSyncOrDone {
        // Operations the remote is missing: (author, log id, seq num, operation hash).
        operations: Vec<(PublicKey, L, u64, Hash)>,
        metrics: LogSyncMetrics,
    },

    /// Receive PreSync message from remote or Done if they have nothing to send.
    ReceivePreSyncOrDone {
        operations: Vec<(PublicKey, L, u64, Hash)>,
        metrics: LogSyncMetrics,
    },

    /// Enter sync loop where we exchange operations with the remote, moves on to the next
    /// state when both peers have sent Done messages.
    Sync {
        operations: Vec<(PublicKey, L, u64, Hash)>,
        metrics: LogSyncMetrics,
    },

    /// Announce on the event stream that the sync session successfully completed.
    End { metrics: LogSyncMetrics },
}
59
/// Efficient sync protocol for append-only log data types.
pub struct LogSync<L, E, S, Evt> {
    /// Current position in the session life-cycle state machine.
    state: State<L>,
    /// Logs (per author) in scope for this sync session.
    logs: Logs<L>,
    /// Store holding local operations and log metadata.
    store: S,
    /// Broadcast channel on which status and data events are emitted.
    event_tx: broadcast::Sender<Evt>,
    /// Capacity of the deduplication buffer created for this session.
    buffer_capacity: usize,
    /// Marker for the header extensions type `E`, which is not stored directly.
    _marker: PhantomData<E>,
}
69
70impl<L, E, S, Evt> LogSync<L, E, S, Evt> {
71    pub fn new(store: S, logs: Logs<L>, event_tx: broadcast::Sender<Evt>) -> Self {
72        Self::new_with_capacity(store, logs, event_tx, DEFAULT_BUFFER_CAPACITY)
73    }
74
75    pub fn new_with_capacity(
76        store: S,
77        logs: Logs<L>,
78        event_tx: broadcast::Sender<Evt>,
79        buffer_capacity: usize,
80    ) -> Self {
81        Self {
82            state: Default::default(),
83            store,
84            event_tx,
85            logs,
86            buffer_capacity,
87            _marker: PhantomData,
88        }
89    }
90}
91
impl<L, E, S, Evt> Protocol for LogSync<L, E, S, Evt>
where
    L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
    E: Extensions + Send + 'static,
    S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
    Evt: Debug + From<LogSyncEvent<E>> + Send + 'static,
{
    type Error = LogSyncError;
    type Output = (Dedup<Hash>, LogSyncMetrics);
    type Message = LogSyncMessage<L>;

    /// Drive the sync session state machine to completion.
    ///
    /// Advances through the [`State`] variants in order: announce start, exchange `Have`
    /// messages, exchange `PreSync`/`Done` messages, run the concurrent operation exchange
    /// loop, and finally announce completion on the event stream. Returns the
    /// deduplication buffer (seeded with the hashes of all operations sent and received
    /// during the session) together with the final session metrics.
    ///
    /// Errors if either channel closes unexpectedly, a message arrives out of protocol
    /// order, the store fails, a received header fails to decode, or the event broadcast
    /// has no receivers.
    async fn run(
        mut self,
        sink: &mut (impl Sink<Self::Message, Error = impl Debug> + Unpin),
        stream: &mut (impl Stream<Item = Result<Self::Message, impl Debug>> + Unpin),
    ) -> Result<Self::Output, Self::Error> {
        // The sync phase ends once the remote's Done has been received AND we have sent
        // our own Done (or both streams are exhausted).
        let mut sync_done_received = false;
        let mut sync_done_sent = false;
        let mut dedup = Dedup::new(self.buffer_capacity);

        let metrics = loop {
            match self.state {
                State::Start => {
                    let metrics = LogSyncMetrics::default();
                    self.event_tx
                        .send(
                            LogSyncEvent::Status(LogSyncStatus::Started {
                                metrics: metrics.clone(),
                            })
                            .into(),
                        )
                        .map_err(|_| LogSyncError::BroadcastSend)?;
                    self.state = State::SendHave { metrics };
                }
                State::SendHave { metrics } => {
                    let local_log_heights = local_log_heights(&self.store, &self.logs).await?;
                    sink.send(LogSyncMessage::<L>::Have(local_log_heights.clone()))
                        .await
                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                    self.state = State::ReceiveHave { metrics };
                }
                State::ReceiveHave { mut metrics } => {
                    let Some(message) = stream.next().await else {
                        return Err(LogSyncError::UnexpectedStreamClosure);
                    };
                    let message =
                        message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
                    let LogSyncMessage::Have(remote_log_heights) = message else {
                        return Err(LogSyncError::UnexpectedMessage(message.to_string()));
                    };

                    let remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>> =
                        remote_log_heights.clone().into_iter().collect();

                    // We only fetch the hashes of the operations we should send to the remote in
                    // this step. This avoids keeping all headers and payloads in memory, we can
                    // fetch one at a time as they are needed within the sync phase later.
                    let (operations, total_size) = operations_needed_by_remote(
                        &self.store,
                        &self.logs,
                        remote_log_heights_map,
                    )
                    .await?;

                    metrics.total_operations_local = Some(operations.len() as u64);
                    metrics.total_bytes_local = Some(total_size);

                    self.state = State::SendPreSyncOrDone {
                        operations,
                        metrics,
                    };
                }
                State::SendPreSyncOrDone {
                    operations,
                    metrics,
                } => {
                    // Both values were set in ReceiveHave, so unwrapping here cannot fail.
                    let total_operations = metrics.total_operations_local.unwrap();
                    let total_bytes = metrics.total_bytes_local.unwrap();

                    if total_operations > 0 {
                        sink.send(LogSyncMessage::PreSync {
                            total_bytes,
                            total_operations,
                        })
                        .await
                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                    } else {
                        // Nothing to send: announce Done immediately instead of PreSync.
                        sink.send(LogSyncMessage::Done)
                            .await
                            .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                    }

                    self.event_tx
                        .send(
                            LogSyncEvent::Status(LogSyncStatus::Progress {
                                metrics: metrics.clone(),
                            })
                            .into(),
                        )
                        .map_err(|_| LogSyncError::BroadcastSend)?;

                    self.state = State::ReceivePreSyncOrDone {
                        operations,
                        metrics,
                    };
                }
                State::ReceivePreSyncOrDone {
                    operations,
                    mut metrics,
                } => {
                    let Some(message) = stream.next().await else {
                        return Err(LogSyncError::UnexpectedStreamClosure);
                    };
                    let message =
                        message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;

                    // Default the remote totals to zero; overwritten below if the remote
                    // announced pending operations via PreSync.
                    metrics.total_bytes_remote = Some(0);
                    metrics.total_operations_remote = Some(0);

                    match message {
                        LogSyncMessage::PreSync {
                            total_operations,
                            total_bytes,
                        } => {
                            metrics.total_bytes_remote = Some(total_bytes);
                            metrics.total_operations_remote = Some(total_operations);
                        }
                        // Remote has nothing to send; remember this so the sync loop can
                        // skip waiting on the receive side.
                        LogSyncMessage::Done => sync_done_received = true,
                        message => {
                            return Err(LogSyncError::UnexpectedMessage(message.to_string()));
                        }
                    }

                    debug!(
                        local_ops = metrics.total_operations_local.unwrap_or_default(),
                        remote_ops = metrics.total_operations_remote.unwrap_or_default(),
                        local_bytes = metrics.total_bytes_local.unwrap_or_default(),
                        remote_bytes = metrics.total_bytes_remote.unwrap_or_default(),
                        "sync metrics received",
                    );

                    self.event_tx
                        .send(
                            LogSyncEvent::Status(LogSyncStatus::Progress {
                                metrics: metrics.clone(),
                            })
                            .into(),
                        )
                        .map_err(|_| LogSyncError::BroadcastSend)?;

                    self.state = State::Sync {
                        operations,
                        metrics,
                    };
                }
                State::Sync {
                    operations,
                    mut metrics,
                } => {
                    let mut operation_stream = stream::iter(operations);
                    let mut sent_operations = 0;
                    let total_operations = metrics
                        .total_operations_local
                        .expect("total operations set");

                    // We perform a loop awaiting futures on both the receiving stream and the list
                    // of operations we have to send. This means that processing of both streams is
                    // done concurrently.
                    loop {
                        select! {
                            // Receive arm: disabled once the remote has sent Done, since no
                            // further messages are expected from them.
                            message = stream.next(), if !sync_done_received => {
                                let Some(message) = message else {
                                    break;
                                };
                                let message =
                                    message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
                                match message {
                                    LogSyncMessage::Operation(header, body) => {
                                        metrics.total_bytes_received += {
                                            header.len()
                                                + body.as_ref().map(|bytes| bytes.len()).unwrap_or_default()
                                        } as u64;
                                        metrics.total_operations_received += 1;

                                        // TODO: validate that the operations and bytes received
                                        // matches the total bytes the remote sent in their PreSync
                                        // message.
                                        let header: Header<E> = decode_cbor(&header[..])?;
                                        let body = body.map(|ref bytes| Body::new(bytes));

                                        // Insert message hash into deduplication buffer.
                                        //
                                        // NOTE: we don't deduplicate any received messages during
                                        // sync as for this session they have not been seen before.
                                        dedup.insert(header.hash());

                                        trace!(
                                            phase = "sync",
                                            id = ?header.hash().fmt_short(),
                                            received_ops = metrics.total_operations_received,
                                            received_bytes = metrics.total_bytes_received,
                                            "received operation"
                                        );

                                        // Forward data received from the remote to the app layer.
                                        self.event_tx
                                            .send(
                                                LogSyncEvent::Data(Box::new(Operation {
                                                    hash: header.hash(),
                                                    header,
                                                    body,
                                                }))
                                                .into(),
                                            )
                                            .map_err(|_| LogSyncError::BroadcastSend)?;
                                    }
                                    LogSyncMessage::Done => {
                                        sync_done_received = true;
                                    }
                                    message => {
                                        return Err(LogSyncError::UnexpectedMessage(message.to_string()));
                                    }
                                }
                            },
                            // Send arm: disabled automatically once the pending operation
                            // stream is exhausted (the pattern stops matching).
                            Some((public_key, log_id, seq_num, hash)) = operation_stream.next() => {
                                // Insert message hash into deduplication buffer.
                                dedup.insert(hash);

                                // Fetch raw message bytes and send to remote.
                                let result = self
                                    .store
                                    .get_raw_operation(hash)
                                    .await
                                    .map_err(|err| LogSyncError::OperationStore(format!("{err}")))?;

                                // For testing, where the store is an in memory implementation
                                // which may not yield work to other tasks, we force the task to
                                // yield in order to test concurrent database actions occurring
                                // during sync.
                                #[cfg(test)]
                                tokio::task::yield_now().await;

                                if let Some((header, body)) = result {
                                    metrics.total_bytes_sent += {
                                        header.len() + body.as_ref().map(|bytes| bytes.len()).unwrap_or_default()
                                    } as u64;
                                    metrics.total_operations_sent += 1;

                                    trace!(
                                        phase = "sync",
                                        public_key = %public_key.fmt_short(),
                                        log_id = ?log_id,
                                        seq_num,
                                        id = %hash.fmt_short(),
                                        sent_ops = metrics.total_operations_sent,
                                        sent_bytes = metrics.total_bytes_sent,
                                        "send operation",
                                    );

                                    sink.send(LogSyncMessage::Operation(header, body))
                                        .await
                                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                                } else {
                                    // An expected operation is missing; it is still counted
                                    // towards `sent_operations` below so the loop can finish.
                                    warn!(
                                        phase = "sync",
                                        log_id = ?log_id,
                                        seq_num,
                                        id = %hash.fmt_short(),
                                        "expected operation missing from store",
                                    );
                                };

                                sent_operations += 1;

                                if sent_operations >= total_operations {
                                    sink.send(LogSyncMessage::Done)
                                        .await
                                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                                    sync_done_sent = true;
                                }
                            },
                            else => {
                                // If both streams are empty (they return None), or we received a
                                // sync done message and we sent all our pending operations, exit
                                // the loop.
                                break;
                            }
                        }
                        if sync_done_received && sync_done_sent {
                            break;
                        }
                    }
                    self.state = State::End { metrics };
                }
                State::End { metrics } => {
                    self.event_tx
                        .send(
                            LogSyncEvent::Status(LogSyncStatus::Completed {
                                metrics: metrics.clone(),
                            })
                            .into(),
                        )
                        .map_err(|_| LogSyncError::BroadcastSend)?;
                    break metrics;
                }
            }
        };

        Ok((dedup, metrics))
    }
}
403
404/// Return the local log heights of all passed logs.
405async fn local_log_heights<L, E, S>(
406    store: &S,
407    logs: &Logs<L>,
408) -> Result<Vec<(PublicKey, Vec<(L, u64)>)>, LogSyncError>
409where
410    L: LogId,
411    S: LogStore<L, E> + OperationStore<L, E>,
412{
413    let mut local_log_heights = Vec::new();
414    for (public_key, log_ids) in logs {
415        let mut log_heights = Vec::new();
416        for log_id in log_ids {
417            let latest = store
418                .latest_operation(public_key, log_id)
419                .await
420                .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
421
422            if let Some((header, _)) = latest {
423                log_heights.push((log_id.clone(), header.seq_num));
424            };
425        }
426        local_log_heights.push((*public_key, log_heights));
427    }
428
429    Ok(local_log_heights)
430}
431
432/// Compare the local log heights with the remote log heights for all given logs and return the
433/// hashes of all operations the remote needs, as well as the total bytes.
434async fn operations_needed_by_remote<L, E, S>(
435    store: &S,
436    local_logs: &Logs<L>,
437    remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>>,
438) -> Result<(Vec<(PublicKey, L, u64, Hash)>, u64), LogSyncError>
439where
440    L: LogId,
441    E: Extensions,
442    S: LogStore<L, E> + OperationStore<L, E>,
443{
444    // Now that the topic query has been translated into a collection of logs we want to
445    // compare our own local log heights with what the remote sent for this topic query.
446    //
447    // If our logs are more advanced for any log we should collect the entries for sending.
448    let mut remote_needs = Vec::new();
449    let mut total_size = 0;
450
451    for (public_key, log_ids) in local_logs {
452        for log_id in log_ids {
453            // For all logs in this topic query scope get the local height.
454            let latest_operation = store
455                .latest_operation(public_key, log_id)
456                .await
457                .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
458
459            let log_height = match latest_operation {
460                Some((header, _)) => header.seq_num,
461                // If we don't have this log then continue onto the next without
462                // sending any messages.
463                None => continue,
464            };
465
466            // Calculate from which seq num in the log the remote needs operations.
467            let remote_needs_from = match remote_log_heights_map.get(public_key) {
468                Some(log_heights) => {
469                    match log_heights.iter().find(|(id, _)| *id == *log_id) {
470                        // The log is known by the remote, take their log height
471                        // and plus one.
472                        Some((_, log_height)) => log_height + 1,
473                        // The log is not known, they need from seq num 0
474                        None => 0,
475                    }
476                }
477                // The author is not known, they need from seq num 0.
478                None => 0,
479            };
480
481            if remote_needs_from <= log_height {
482                let log = store
483                    .get_log_hashes(public_key, log_id, Some(remote_needs_from))
484                    .await
485                    .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
486
487                if let Some(log) = log {
488                    // @TODO: we could avoid this extra iteration if we return the public key and
489                    // log id from the get_log_hashes method as well as the hash and seq num. We
490                    // only need the additional fields for logging, so an alternative approach
491                    // would be to place this iteration behind a debug config flag.
492                    let log: Vec<_> = log
493                        .into_iter()
494                        .map(|(seq_num, hash)| (*public_key, log_id.clone(), seq_num, hash))
495                        .collect();
496                    remote_needs.extend(log);
497                }
498
499                let size = store
500                    .get_log_size(public_key, log_id, Some(remote_needs_from))
501                    .await
502                    .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
503
504                if let Some(size) = size {
505                    total_size += size;
506                }
507            };
508        }
509    }
510
511    Ok((remote_needs, total_size))
512}
513
/// Protocol messages.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(tag = "type", content = "value")]
pub enum LogSyncMessage<L> {
    /// Per-author log heights, announcing which operations the sender already holds.
    Have(Vec<(PublicKey, Vec<(L, u64)>)>),
    /// Totals the sender is about to transmit, sent before the operation exchange.
    PreSync {
        total_operations: u64,
        total_bytes: u64,
    },
    /// A single operation: raw CBOR-encoded header bytes and optional payload bytes.
    // TODO: use Header and Body here.
    Operation(Vec<u8>, Option<Vec<u8>>),
    /// The sender has no (more) operations to transmit.
    Done,
}
527
528impl<L> Display for LogSyncMessage<L>
529where
530    L: Debug,
531{
532    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
533        let value = match self {
534            LogSyncMessage::Have(_) => "have",
535            LogSyncMessage::PreSync { .. } => "pre_sync",
536            LogSyncMessage::Operation(_, _) => "operation",
537            LogSyncMessage::Done => "done",
538        };
539
540        write!(f, "{value}")
541    }
542}
543
/// Events emitted from log sync sessions.
#[derive(Clone, Debug, PartialEq)]
pub enum LogSyncEvent<E> {
    /// Session life-cycle update carrying the current metrics.
    Status(LogSyncStatus),
    /// An operation received from the remote, forwarded to the application layer.
    Data(Box<Operation<E>>),
}
550
/// Sync metrics emitted in event messages.
///
/// `Option` fields are `None` until the corresponding value has been learned during the
/// session; plain counters are updated incrementally while operations are exchanged.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct LogSyncMetrics {
    /// Number of operations we determined the remote is missing.
    pub total_operations_local: Option<u64>,
    /// Number of operations the remote announced in its PreSync message.
    pub total_operations_remote: Option<u64>,
    /// Operations received from the remote so far.
    pub total_operations_received: u64,
    /// Operations sent to the remote so far.
    pub total_operations_sent: u64,
    /// Total byte size of the operations we will send.
    pub total_bytes_local: Option<u64>,
    /// Total byte size the remote announced in its PreSync message.
    pub total_bytes_remote: Option<u64>,
    /// Bytes received from the remote so far.
    pub total_bytes_received: u64,
    /// Bytes sent to the remote so far.
    pub total_bytes_sent: u64,
}
563
/// Sync status variants sent on log sync events.
#[derive(Clone, Debug, PartialEq)]
pub enum LogSyncStatus {
    /// Session has begun; metrics are still at their defaults.
    Started { metrics: LogSyncMetrics },
    /// Session is underway; metrics reflect progress so far.
    Progress { metrics: LogSyncMetrics },
    /// Session completed successfully with the final metrics.
    Completed { metrics: LogSyncMetrics },
}
571
/// Protocol error types.
#[derive(Debug, Error)]
pub enum LogSyncError {
    /// A received operation header could not be decoded from CBOR.
    #[error(transparent)]
    Decode(#[from] DecodeError),

    /// The underlying log store returned an error.
    #[error("log store error: {0}")]
    LogStore(String),

    /// The underlying operation store returned an error.
    #[error("operation store error: {0}")]
    OperationStore(String),

    /// The event broadcast channel had no active receivers.
    #[error("no active receivers when broadcasting")]
    BroadcastSend,

    /// Sending a protocol message to the remote failed.
    #[error("log sync error sending on message sink: {0}")]
    MessageSink(String),

    /// Receiving a protocol message from the remote failed.
    #[error("log sync error receiving from message stream: {0}")]
    MessageStream(String),

    /// The message stream ended before the protocol expected it to.
    #[error("remote unexpectedly closed stream during initial sync")]
    UnexpectedStreamClosure,

    /// A message arrived that is not valid in the current protocol state.
    #[error("log sync received unexpected protocol message: {0}")]
    UnexpectedMessage(String),
}
599
/// Returns a displayable string representing the underlying value in a short format, easy to read
/// during debugging and logging.
pub trait ShortFormat {
    /// Returns a shortened, human-readable representation of the value.
    ///
    /// The result is lossy and intended for logs only — do not use it as an identifier.
    fn fmt_short(&self) -> String;
}
605
606impl ShortFormat for PublicKey {
607    fn fmt_short(&self) -> String {
608        self.to_hex()[0..10].to_string()
609    }
610}
611
612impl ShortFormat for Hash {
613    fn fmt_short(&self) -> String {
614        self.to_hex()[0..5].to_string()
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use std::time::Duration;
621
622    use assert_matches::assert_matches;
623    use futures::StreamExt;
624    use p2panda_core::Body;
625    use p2panda_store::OperationStore;
626
627    use crate::protocols::log_sync::{
628        LogSyncError, LogSyncEvent, LogSyncMetrics, LogSyncStatus, Logs, Operation,
629    };
630    use crate::test_utils::{
631        Peer, TestLogSyncMessage, run_protocol, run_protocol_uni, setup_logging,
632    };
633
634    #[tokio::test]
635    async fn log_sync_no_operations() {
636        let mut peer: Peer = Peer::new(0);
637
638        let (session, mut event_rx) = peer.log_sync_protocol(&Logs::default());
639        let remote_message_rx = run_protocol_uni(
640            session,
641            &[TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done],
642        )
643        .await
644        .unwrap();
645
646        for index in 0..=3 {
647            let event = event_rx.recv().await.unwrap();
648            match index {
649                0 => {
650                    let (total_operations, total_bytes) = assert_matches!(
651                        event,
652                        LogSyncEvent::Status(LogSyncStatus::Started { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
653                         => (total_operations_remote, total_bytes_remote)
654                    );
655                    assert_eq!(total_operations, None);
656                    assert_eq!(total_bytes, None);
657                }
658                1 => {
659                    let (total_operations, total_bytes) = assert_matches!(
660                        event,
661                        LogSyncEvent::Status(LogSyncStatus::Progress { metrics: LogSyncMetrics { total_operations_local, total_bytes_local, .. } })
662                         => (total_operations_local, total_bytes_local)
663                    );
664                    assert_eq!(total_operations, Some(0));
665                    assert_eq!(total_bytes, Some(0));
666                }
667                2 => {
668                    let (total_operations, total_bytes) = assert_matches!(
669                        event,
670                        LogSyncEvent::Status(LogSyncStatus::Progress { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
671                         => (total_operations_remote, total_bytes_remote)
672                    );
673                    assert_eq!(total_operations, Some(0));
674                    assert_eq!(total_bytes, Some(0));
675                }
676                3 => {
677                    let (total_operations, total_bytes) = assert_matches!(
678                        event,
679                        LogSyncEvent::Status(LogSyncStatus::Completed { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
680                         => (total_operations_remote, total_bytes_remote)
681                    );
682                    assert_eq!(total_operations, Some(0));
683                    assert_eq!(total_bytes, Some(0));
684                }
685                _ => panic!(),
686            };
687        }
688
689        let messages = remote_message_rx.collect::<Vec<_>>().await;
690        assert_eq!(messages.len(), 2);
691        for (index, message) in messages.into_iter().enumerate() {
692            match index {
693                0 => assert_eq!(message, TestLogSyncMessage::Have(vec![])),
694                1 => {
695                    assert_eq!(message, TestLogSyncMessage::Done);
696                    break;
697                }
698                _ => panic!(),
699            };
700        }
701    }
702
    /// One-sided sync: the local peer holds three operations in a single log
    /// while the remote announces an empty `Have`.
    ///
    /// Verifies (a) the local event stream emits Started → Progress (local
    /// totals) → Progress (remote totals) → Completed with metrics matching
    /// the created operations, and (b) the outgoing message stream is exactly
    /// Have, PreSync, three Operations (in creation order) and Done.
    #[tokio::test]
    async fn log_sync_some_operations() {
        let mut peer = Peer::new(0);
        let log_id = 0;

        // Author three operations with identical bodies in the same log.
        let body = Body::new("Hello, Sloth!".as_bytes());
        let (header_0, header_bytes_0) = peer.create_operation(&body, log_id).await;
        let (header_1, header_bytes_1) = peer.create_operation(&body, log_id).await;
        let (header_2, header_bytes_2) = peer.create_operation(&body, log_id).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![log_id]);

        // Drive the session against a scripted remote which has nothing.
        let (session, mut event_rx) = peer.log_sync_protocol(&logs);
        let remote_message_rx = run_protocol_uni(
            session,
            &[TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done],
        )
        .await
        .unwrap();

        // Total payload + header bytes across all three operations; must
        // match the byte totals reported in metrics and PreSync.
        let expected_bytes = header_0.payload_size
            + header_bytes_0.len() as u64
            + header_1.payload_size
            + header_bytes_1.len() as u64
            + header_2.payload_size
            + header_bytes_2.len() as u64;

        // Events arrive in a fixed order; assert each one by position.
        for index in 0..=3 {
            let event = event_rx.recv().await.unwrap();
            match index {
                0 => {
                    assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. }));
                }
                1 => {
                    // First progress update carries the local totals.
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Progress {
                            metrics: LogSyncMetrics { total_operations_local, total_bytes_local, .. }
                        }) => (total_operations_local, total_bytes_local)
                    );
                    assert_eq!(total_operations, Some(3));

                    assert_eq!(total_bytes, Some(expected_bytes));
                }
                2 => {
                    // Second progress update carries the remote totals — zero,
                    // because the scripted remote announced nothing.
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Progress {
                            metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
                        }) => (total_operations_remote, total_bytes_remote)
                    );
                    assert_eq!(total_operations, Some(0));
                    assert_eq!(total_bytes, Some(0));
                }
                3 => {
                    // Completion metrics still report zero remote totals.
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Completed {
                            metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
                        }) => (total_operations_remote, total_bytes_remote)
                    );
                    assert_eq!(total_operations, Some(0));
                    assert_eq!(total_bytes, Some(0));
                }
                _ => panic!(),
            };
        }

        // Messages sent to the remote, in strict protocol order.
        let messages = remote_message_rx.collect::<Vec<_>>().await;
        assert_eq!(messages.len(), 6);
        for (index, message) in messages.into_iter().enumerate() {
            match index {
                // Log heights: three operations → latest seq number 2.
                0 => assert_eq!(
                    message,
                    TestLogSyncMessage::Have(vec![(peer.id(), vec![(0, 2)])])
                ),
                1 => assert_eq!(
                    message,
                    TestLogSyncMessage::PreSync {
                        total_operations: 3,
                        total_bytes: expected_bytes
                    }
                ),
                // Operations are transferred in creation order with bodies.
                2 => {
                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
                        header,
                        Some(body),
                    ) => (header, body));
                    assert_eq!(header, header_bytes_0);
                    assert_eq!(Body::new(&body_inner), body)
                }
                3 => {
                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
                        header,
                        Some(body),
                    ) => (header, body));
                    assert_eq!(header, header_bytes_1);
                    assert_eq!(Body::new(&body_inner), body)
                }
                4 => {
                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
                        header,
                        Some(body),
                    ) => (header, body));
                    assert_eq!(header, header_bytes_2);
                    assert_eq!(Body::new(&body_inner), body)
                }
                5 => {
                    assert_eq!(message, TestLogSyncMessage::Done);
                }
                _ => panic!(),
            };
        }
    }
818
    /// Full two-party sync: both peers hold two operations in their own logs
    /// and exchange them over a single session.
    ///
    /// Each peer must observe Started, two Progress updates, the other peer's
    /// two operations as Data events (in creation order) and finally a
    /// Completed status. Peer B's completion metrics are additionally checked
    /// for internal consistency (sent == local totals, received == remote
    /// totals).
    #[tokio::test]
    async fn log_sync_bidirectional_exchange() {
        const LOG_ID: u64 = 0;

        let mut peer_a = Peer::new(0);
        let mut peer_b = Peer::new(1);

        let body_a = Body::new("From Alice".as_bytes());
        let body_b = Body::new("From Bob".as_bytes());

        // Two operations per peer, all in the same log id but under each
        // peer's own public key.
        let (header_a0, _) = peer_a.create_operation(&body_a, LOG_ID).await;
        let (header_a1, _) = peer_a.create_operation(&body_a, LOG_ID).await;

        let (header_b0, _) = peer_b.create_operation(&body_b, LOG_ID).await;
        let (header_b1, _) = peer_b.create_operation(&body_b, LOG_ID).await;

        // Both peers are interested in both authors' logs.
        let mut logs = Logs::default();
        logs.insert(peer_a.id(), vec![LOG_ID]);
        logs.insert(peer_b.id(), vec![LOG_ID]);

        let (a_session, mut peer_a_event_rx) = peer_a.log_sync_protocol(&logs);
        let (b_session, mut peer_b_event_rx) = peer_b.log_sync_protocol(&logs);

        run_protocol(a_session, b_session).await.unwrap();

        // Peer A should receive peer B's operations.
        for index in 0..=5 {
            let event = peer_a_event_rx.recv().await.unwrap();
            match index {
                0 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. })),
                1 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
                2 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
                3 => {
                    let (header, body_inner) = assert_matches!(
                        event,
                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
                    );
                    assert_eq!(header, header_b0);
                    assert_eq!(body_inner.unwrap(), body_b);
                }
                4 => {
                    let (header, body_inner) = assert_matches!(
                        event,
                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
                    );
                    assert_eq!(header, header_b1);
                    assert_eq!(body_inner.unwrap(), body_b);
                }
                5 => {
                    assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Completed { .. }));
                    break;
                }
                _ => panic!(),
            }
        }

        // Peer B should receive peer A's operations, mirroring the above.
        for index in 0..=5 {
            let event = peer_b_event_rx.recv().await.unwrap();
            match index {
                0 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. })),
                1 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
                2 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
                3 => {
                    let (header, body_inner) = assert_matches!(
                        event,
                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
                    );
                    assert_eq!(header, header_a0);
                    assert_eq!(body_inner.unwrap(), body_a);
                }
                4 => {
                    let (header, body_inner) = assert_matches!(
                        event,
                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
                    );
                    assert_eq!(header, header_a1);
                    assert_eq!(body_inner.unwrap(), body_a);
                }
                5 => {
                    // Cross-check completion metrics: what B counted as sent
                    // and received must agree with the announced totals.
                    let metrics = assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Completed { metrics }) => metrics);
                    let LogSyncMetrics {
                        total_operations_local,
                        total_operations_remote,
                        total_operations_received,
                        total_operations_sent,
                        total_bytes_local,
                        total_bytes_remote,
                        total_bytes_received,
                        total_bytes_sent,
                    } = metrics;

                    assert_eq!(total_operations_remote.unwrap(), total_operations_received);
                    assert_eq!(total_bytes_remote.unwrap(), total_bytes_received);
                    assert_eq!(total_operations_local.unwrap(), total_operations_sent);
                    assert_eq!(total_bytes_local.unwrap(), total_bytes_sent);
                }
                _ => panic!(),
            }
        }
    }
918
919    #[tokio::test]
920    async fn log_sync_unexpected_operation_before_presend() {
921        let mut peer = Peer::new(0);
922        const LOG_ID: u64 = 1;
923
924        let body = Body::new(b"unexpected op before presend");
925        let (_, header_bytes) = peer.create_operation(&body, LOG_ID).await;
926
927        let mut logs = Logs::default();
928        logs.insert(peer.id(), vec![LOG_ID]);
929
930        let (session, _event_rx) = peer.log_sync_protocol(&logs);
931
932        // Remote sends Operation without PreSync first.
933        let messages = vec![
934            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
935            TestLogSyncMessage::Operation(header_bytes.clone(), Some(body.to_bytes())),
936            TestLogSyncMessage::PreSync {
937                total_operations: 1,
938                total_bytes: 100,
939            },
940            TestLogSyncMessage::Done,
941        ];
942
943        let result = run_protocol_uni(session, &messages).await;
944        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
945    }
946
947    #[tokio::test]
948    async fn log_sync_unexpected_presend_twice() {
949        let mut peer = Peer::new(0);
950        const LOG_ID: u64 = 1;
951
952        let body = Body::new(b"two presends");
953        peer.create_operation(&body, LOG_ID).await;
954
955        let mut logs = Logs::default();
956        logs.insert(peer.id(), vec![LOG_ID]);
957        let (session, _event_rx) = peer.log_sync_protocol(&logs);
958
959        let messages = vec![
960            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
961            TestLogSyncMessage::PreSync {
962                total_operations: 1,
963                total_bytes: 32,
964            },
965            TestLogSyncMessage::PreSync {
966                total_operations: 1,
967                total_bytes: 32,
968            },
969            TestLogSyncMessage::Done,
970        ];
971
972        let result = run_protocol_uni(session, &messages).await;
973        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
974    }
975
976    #[tokio::test]
977    async fn log_sync_unexpected_done_before_anything() {
978        let mut peer = Peer::new(0);
979        let logs = Logs::default();
980
981        let (session, _event_rx) = peer.log_sync_protocol(&logs);
982
983        let messages = vec![TestLogSyncMessage::Done];
984        let result = run_protocol_uni(session, &messages).await;
985
986        assert!(
987            matches!(result, Err(LogSyncError::UnexpectedMessage(_))),
988            "{:?}",
989            result
990        );
991    }
992
993    #[tokio::test]
994    async fn log_sync_unexpected_have_after_presend() {
995        let mut peer = Peer::new(0);
996        const LOG_ID: u64 = 1;
997        let body = Body::new(b"bad have order");
998        peer.create_operation(&body, LOG_ID).await;
999
1000        let mut logs = Logs::default();
1001        logs.insert(peer.id(), vec![LOG_ID]);
1002        let (session, _event_rx) = peer.log_sync_protocol(&logs);
1003
1004        let messages = vec![
1005            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
1006            TestLogSyncMessage::PreSync {
1007                total_operations: 1,
1008                total_bytes: 64,
1009            },
1010            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 99)])]), // invalid here
1011            TestLogSyncMessage::Done,
1012        ];
1013
1014        let result = run_protocol_uni(session, &messages).await;
1015        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
1016    }
1017
1018    #[tokio::test]
1019    async fn log_sync_with_concurrently_pruned_log() {
1020        setup_logging();
1021
1022        let mut peer_a = Peer::new(0);
1023        let mut peer_b = Peer::new(1);
1024
1025        let body = Body::new(&[0; 10000]);
1026
1027        // Load up the peer with three logs.
1028        for _ in 0..30 {
1029            let _ = peer_a.create_operation(&body, 0).await;
1030        }
1031        for _ in 0..30 {
1032            let _ = peer_a.create_operation(&body, 1).await;
1033        }
1034        let mut to_be_pruned_log = vec![];
1035        for _ in 0..30 {
1036            let (header, _) = peer_a.create_operation(&body, 2).await;
1037            to_be_pruned_log.push(header.hash());
1038        }
1039
1040        let mut logs = Logs::default();
1041        logs.insert(peer_a.id(), vec![0, 1, 2]);
1042
1043        let (a_session, _peer_b_event_rx) = peer_a.log_sync_protocol(&logs);
1044        let (b_session, mut peer_b_event_rx) = peer_b.log_sync_protocol(&logs);
1045
1046        let _peer_b_event_tx_clone = b_session.event_tx.clone();
1047
1048        // Spawn a task to run the sync session.
1049        tokio::spawn(async move {
1050            run_protocol(a_session, b_session).await.unwrap();
1051        });
1052
1053        // Concurrently delete the first operation from the last log.
1054        tokio::time::sleep(Duration::from_micros(1)).await;
1055        peer_a
1056            .store
1057            .delete_operation(to_be_pruned_log[0])
1058            .await
1059            .unwrap();
1060
1061        loop {
1062            let event = peer_b_event_rx.recv().await.unwrap();
1063            if let LogSyncEvent::Status(LogSyncStatus::Completed { metrics }) = event {
1064                let LogSyncMetrics {
1065                    total_operations_remote,
1066                    total_operations_received,
1067                    ..
1068                } = metrics;
1069
1070                // We expect all operations to be included in the total remote operations as these
1071                // were calculated before pruning occurred.
1072                assert_eq!(total_operations_remote.unwrap(), 90);
1073
1074                // One operation was not sent because it got deleted after the sync session
1075                // started.
1076                assert_eq!(total_operations_received, 89);
1077                break;
1078            }
1079        }
1080    }
1081}