Skip to main content

p2panda_sync/protocols/
topic_log_sync.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Two-party sync protocol over a topic associated with a collection of append-only logs.
4use std::collections::HashMap;
5use std::fmt::Debug;
6use std::hash::Hash as StdHash;
7use std::marker::PhantomData;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use futures::channel::mpsc;
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use p2panda_core::{Body, Extensions, Header, Operation};
14use p2panda_store::{LogId, LogStore, OperationStore};
15use pin_project_lite::pin_project;
16use serde::{Deserialize, Serialize};
17use thiserror::Error;
18use tokio::sync::broadcast;
19use tracing::{Level, debug, enabled, trace, warn};
20
21use crate::ToSync;
22use crate::dedup::DEFAULT_BUFFER_CAPACITY;
23use crate::protocols::log_sync::{
24    LogSync, LogSyncError, LogSyncEvent, LogSyncMessage, LogSyncMetrics, LogSyncStatus,
25};
26use crate::protocols::{Logs, ShortFormat};
27use crate::traits::{Protocol, TopicMap};
28
29/// Protocol for synchronizing logs which are associated with a generic T topic.
30///
31/// The mapping of T to a set of logs is handled on the application layer using an implementation
32/// of the `TopicMap` trait.
33///
34/// After sync is complete peers optionally enter "live-mode" where concurrently received and
35/// future messages will be sent directly to the application layer and forwarded to any
36/// concurrently running sync sessions. As we may receive messages from many sync sessions
37/// concurrently, messages forwarded to a sync session in live-mode are de-duplicated in order to
38/// avoid flooding the network with redundant data.
39///
40/// It is assumed that the T topic has been negotiated between parties prior to initiating this
41/// sync protocol.
42#[derive(Debug)]
43pub struct TopicLogSync<T, S, M, L, E> {
44    pub store: S,
45    pub topic_map: M,
46    pub topic: T,
47    pub event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
48    pub live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
49    pub buffer_capacity: usize,
50    pub _phantom: PhantomData<L>,
51}
52
53impl<T, S, M, L, E> TopicLogSync<T, S, M, L, E>
54where
55    T: Eq + StdHash + Serialize + for<'a> Deserialize<'a>,
56    S: LogStore<L, E> + OperationStore<L, E>,
57    M: TopicMap<T, Logs<L>>,
58    L: LogId + for<'de> Deserialize<'de> + Serialize,
59    E: Extensions,
60{
61    /// Returns a new sync protocol instance, configured with a store and `TopicMap` implementation
62    /// which associates the to-be-synced logs with a given topic.
63    pub fn new(
64        topic: T,
65        store: S,
66        topic_map: M,
67        live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
68        event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
69    ) -> Self {
70        Self::new_with_capacity(
71            topic,
72            store,
73            topic_map,
74            live_mode_rx,
75            event_tx,
76            DEFAULT_BUFFER_CAPACITY,
77        )
78    }
79
80    /// Instantiates a sync protocol with custom buffer capacity.
81    pub fn new_with_capacity(
82        topic: T,
83        store: S,
84        topic_map: M,
85        live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
86        event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
87        buffer_capacity: usize,
88    ) -> Self {
89        Self {
90            topic,
91            topic_map,
92            store,
93            event_tx,
94            live_mode_rx,
95            buffer_capacity,
96            _phantom: PhantomData,
97        }
98    }
99}
100
101impl<T, S, M, L, E> Protocol for TopicLogSync<T, S, M, L, E>
102where
103    T: Debug + Eq + StdHash + Serialize + for<'a> Deserialize<'a> + Send + 'static,
104    S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
105    M: TopicMap<T, Logs<L>> + Send + 'static,
106    L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
107    E: Extensions + Send + 'static,
108{
109    type Error = TopicLogSyncError;
110    type Message = TopicLogSyncMessage<L, E>;
111    type Output = ();
112
113    async fn run(
114        self,
115        mut sink: &mut (impl Sink<Self::Message, Error = impl Debug> + Unpin),
116        mut stream: &mut (impl Stream<Item = Result<Self::Message, impl Debug>> + Unpin),
117    ) -> Result<Self::Output, Self::Error> {
118        // TODO: check there is overlap between the local and remote topic filters and end the
119        // session now if not.
120        debug!(
121            live_mode = self.live_mode_rx.is_some(),
122            "start sync session"
123        );
124
125        // Get the log ids which are associated with this topic query.
126        let logs = self
127            .topic_map
128            .get(&self.topic)
129            .await
130            .map_err(|err| TopicLogSyncError::TopicMap(err.to_string()))?;
131
132        if enabled!(Level::DEBUG) {
133            let display_logs: HashMap<String, usize> =
134                logs.iter().map(|(k, v)| (k.fmt_short(), v.len())).collect();
135            debug!(logs = ?display_logs, "local topic logs retrieved");
136        }
137
138        if enabled!(Level::TRACE) {
139            let trace_logs: Vec<(String, &Vec<L>)> =
140                logs.iter().map(|(k, v)| (k.fmt_short(), v)).collect();
141            trace!(logs = ?trace_logs, "local topic logs retrieved");
142        }
143
144        // Run the log sync protocol passing in our local topic logs.
145        let (mut dedup, sync_metrics) = {
146            let (mut log_sync_sink, mut log_sync_stream) = sync_channels(&mut sink, &mut stream);
147            let protocol = LogSync::new_with_capacity(
148                self.store.clone(),
149                logs,
150                self.event_tx.clone(),
151                self.buffer_capacity,
152            );
153            let result = protocol.run(&mut log_sync_sink, &mut log_sync_stream).await;
154
155            // If the log sync session ended with an error, then send a "failed" event and return
156            // here with the error itself.
157            match result {
158                Ok(output) => output,
159                Err(err) => {
160                    self.event_tx
161                        .send(TopicLogSyncEvent::Failed {
162                            error: err.to_string(),
163                        })
164                        .map_err(|_| TopicLogSyncChannelError::EventSend)?;
165
166                    log_sync_sink
167                        .close()
168                        .await
169                        .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
170
171                    return Err(err.into());
172                }
173            }
174        };
175
176        let mut metrics: TopicLogSyncLiveMetrics = sync_metrics.into();
177        let result = match self.live_mode_rx {
178            None => Ok(()),
179            Some(mut live_mode_rx) => {
180                // Enter live-mode.
181                //
182                // In live-mode we process messages sent from the remote peer and received locally from a
183                // subscription or other concurrent sync sessions. In both cases we should deduplicate
184                // messages and also check they are part of our topic sub-set selection before forwarding
185                // them on the event stream, or to the remote peer.
186                let mut close_sent = false;
187                self.event_tx
188                    .send(TopicLogSyncEvent::LiveModeStarted)
189                    .map_err(|_| TopicLogSyncChannelError::EventSend)?;
190
191                let result = loop {
192                    tokio::select! {
193                        biased;
194                        Some(message) = live_mode_rx.next() => {
195                            match message {
196                                ToSync::Payload(operation) => {
197                                    if !dedup.insert(operation.hash) {
198                                        trace!(id = ?operation.hash.fmt_short(), "ignore duplicate operation sent on live-mode channel");
199                                        continue;
200                                    }
201
202                                    metrics.bytes_sent +=
203                                        operation.header.to_bytes().len() as u64 + operation.header.payload_size;
204                                    metrics.operations_sent += 1;
205
206                                    trace!(
207                                        phase = "live",
208                                        id = ?operation.hash.fmt_short(),
209                                        sent_ops = ?metrics.operations_sent,
210                                        sent_bytes = ?metrics.bytes_sent,
211                                        "sent operation"
212                                    );
213
214                                    let result = sink
215                                        .send(TopicLogSyncMessage::Live(
216                                            operation.header,
217                                            operation.body,
218                                        ))
219                                        .await
220                                        .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")).into());
221
222                                    if result.is_err() {
223                                        break result;
224                                    };
225                                }
226                                ToSync::Close => {
227                                    // We send the close and wait for the remote to close the
228                                    // connection.
229
230                                    debug!("closing sync session");
231
232                                    let result = sink
233                                        .send(TopicLogSyncMessage::Close)
234                                        .await
235                                        .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")).into());
236                                    if result.is_err() {
237                                        break result;
238                                    };
239                                    close_sent = true;
240                                }
241                            };
242                        }
243                        message = stream.next() => {
244                            let Some(message) = message else {
245                                if close_sent {
246                                    break Ok(());
247                                }
248                                break Err(TopicLogSyncError::UnexpectedStreamClosure);
249                            };
250
251                            match message {
252                                Ok(message) => {
253                                    if let TopicLogSyncMessage::Close = message {
254                                        // We received the remotes close message and should close the
255                                        // connection ourselves.
256                                        debug!("received close message from remote");
257                                        break Ok(());
258                                    };
259
260                                    let TopicLogSyncMessage::Live(header, body) = message else {
261                                        break Err(TopicLogSyncError::UnexpectedProtocolMessage(
262                                            message.to_string(),
263                                        ));
264                                    };
265
266                                    // TODO: check that this message is a part of our topic T set.
267
268                                    // Insert operation hash into deduplication buffer and if it was
269                                    // previously present do not forward the operation to the application
270                                    // layer.
271                                    if !dedup.insert(header.hash()) {
272                                        trace!(phase = "live", operation_id = ?header.hash().fmt_short(), "ignore duplicate operation sent from remote");
273                                        continue;
274                                    }
275
276                                    metrics.bytes_received += header.to_bytes().len() as u64 + header.payload_size;
277                                    metrics.operations_received += 1;
278
279                                    trace!(
280                                        phase = "live",
281                                        operation_id = ?header.hash().fmt_short(),
282                                        received_ops = %metrics.operations_received,
283                                        received_bytes = %metrics.bytes_received,
284                                        "received operation"
285                                    );
286
287                                    self.event_tx
288                                        .send(TopicLogSyncEvent::Operation(Box::new(Operation {
289                                            hash: header.hash(),
290                                            header,
291                                            body,
292                                        })))
293                                        .map_err(|_| TopicLogSyncChannelError::EventSend)?;
294                                }
295                                Err(err) => {
296                                    if close_sent {
297                                        break Ok(());
298                                    }
299                                    break Err(TopicLogSyncError::DecodeMessage(format!("{err:?}")));
300                                }
301                            }
302                        }
303                    }
304                };
305
306                self.event_tx
307                    .send(TopicLogSyncEvent::LiveModeFinished(metrics.clone()))
308                    .map_err(|_| TopicLogSyncChannelError::EventSend)?;
309
310                result
311            }
312        };
313
314        sink.close()
315            .await
316            .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
317
318        let final_event = match result.as_ref() {
319            Ok(_) => {
320                debug!(
321                    sent_ops = ?metrics.operations_sent,
322                    sent_bytes = ?metrics.bytes_sent,
323                    received_ops = %metrics.operations_received,
324                    received_bytes = %metrics.bytes_received,
325                    "sync session closed"
326                );
327                TopicLogSyncEvent::Success
328            }
329            Err(err) => {
330                warn!(error = ?err, "sync session closed with error");
331                TopicLogSyncEvent::Failed {
332                    error: err.to_string(),
333                }
334            }
335        };
336
337        self.event_tx
338            .send(final_event)
339            .map_err(|_| TopicLogSyncChannelError::EventSend)?;
340
341        result
342    }
343}
344
345/// Map raw message sink and stream into log sync protocol specific channels.
346#[allow(clippy::complexity)]
347fn sync_channels<'a, L, E>(
348    sink: &mut (impl Sink<TopicLogSyncMessage<L, E>, Error = impl Debug> + Unpin),
349    stream: &mut (impl Stream<Item = Result<TopicLogSyncMessage<L, E>, impl Debug>> + Unpin),
350) -> (
351    impl Sink<LogSyncMessage<L>, Error = TopicLogSyncChannelError> + Unpin,
352    impl Stream<Item = Result<LogSyncMessage<L>, TopicLogSyncChannelError>> + Unpin,
353) {
354    let log_sync_sink = LogSyncSink::new(sink);
355
356    let log_sync_stream = stream.by_ref().map(|message| match message {
357        Ok(TopicLogSyncMessage::Sync(message)) => Ok(message),
358        Ok(TopicLogSyncMessage::Live { .. }) | Ok(TopicLogSyncMessage::Close) => Err(
359            TopicLogSyncChannelError::MessageStream("non-protocol message received".to_string()),
360        ),
361        Err(err) => Err(TopicLogSyncChannelError::MessageStream(format!("{err:?}"))),
362    });
363
364    (log_sync_sink, log_sync_stream)
365}
366
367/// Error type occurring in topic log sync channels.
368#[derive(Debug, Error)]
369pub enum TopicLogSyncChannelError {
370    #[error("error sending on message sink: {0}")]
371    MessageSink(String),
372
373    #[error("error receiving from message stream: {0}")]
374    MessageStream(String),
375
376    #[error("no active receivers for broadcast")]
377    EventSend,
378}
379
380/// Error type occurring in topic log sync protocol.
381#[derive(Debug, Error)]
382pub enum TopicLogSyncError {
383    #[error(transparent)]
384    Sync(#[from] LogSyncError),
385
386    #[error("topic map error: {0}")]
387    TopicMap(String),
388
389    #[error("unexpected protocol message: {0}")]
390    UnexpectedProtocolMessage(String),
391
392    #[error(transparent)]
393    Channel(#[from] TopicLogSyncChannelError),
394
395    #[error("remote unexpectedly closed stream in live-mode")]
396    UnexpectedStreamClosure,
397
398    #[error("{0}")]
399    DecodeMessage(String),
400}
401
/// Running totals for a sync session, reported with the `LiveModeFinished` event.
///
/// The counters start from the totals of the preceding log sync phase and are increased for
/// every operation exchanged in live-mode.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct TopicLogSyncLiveMetrics {
    /// Number of operations received from the remote.
    pub operations_received: u64,

    /// Number of operations sent to the remote.
    pub operations_sent: u64,

    /// Received bytes, counted as encoded header length plus the header's `payload_size`.
    pub bytes_received: u64,

    /// Sent bytes, counted as encoded header length plus the header's `payload_size`.
    pub bytes_sent: u64,
}
410
411impl From<LogSyncMetrics> for TopicLogSyncLiveMetrics {
412    fn from(value: LogSyncMetrics) -> TopicLogSyncLiveMetrics {
413        TopicLogSyncLiveMetrics {
414            operations_received: value.total_operations_received,
415            operations_sent: value.total_operations_sent,
416            bytes_received: value.total_bytes_received,
417            bytes_sent: value.total_bytes_sent,
418        }
419    }
420}
421
422/// Events emitted from topic log sync sessions.
423#[derive(Debug, Clone, PartialEq)]
424pub enum TopicLogSyncEvent<E> {
425    SyncStarted(LogSyncMetrics),
426    SyncStatus(LogSyncMetrics),
427    SyncFinished(LogSyncMetrics),
428    LiveModeStarted,
429    LiveModeFinished(TopicLogSyncLiveMetrics),
430    Operation(Box<Operation<E>>),
431    Success,
432    Failed { error: String },
433}
434
435impl<E> From<LogSyncEvent<E>> for TopicLogSyncEvent<E> {
436    fn from(event: LogSyncEvent<E>) -> Self {
437        match event {
438            LogSyncEvent::Status(status_event) => match status_event {
439                LogSyncStatus::Started { metrics } => TopicLogSyncEvent::SyncStarted(metrics),
440                LogSyncStatus::Progress { metrics } => TopicLogSyncEvent::SyncStatus(metrics),
441                LogSyncStatus::Completed { metrics } => TopicLogSyncEvent::SyncFinished(metrics),
442            },
443            LogSyncEvent::Data(operation) => TopicLogSyncEvent::Operation(operation),
444        }
445    }
446}
447
448/// Protocol message types.
449#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
450#[serde(tag = "type", content = "value")]
451#[allow(clippy::large_enum_variant)]
452pub enum TopicLogSyncMessage<L, E> {
453    Sync(LogSyncMessage<L>),
454    Live(Header<E>, Option<Body>),
455    Close,
456}
457
458impl<L, E> std::fmt::Display for TopicLogSyncMessage<L, E> {
459    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
460        let value = match self {
461            TopicLogSyncMessage::Sync(_) => "sync",
462            TopicLogSyncMessage::Live(_, _) => "live",
463            TopicLogSyncMessage::Close => "close",
464        };
465        write!(f, "{value}")
466    }
467}
468
469pin_project! {
470    /// Sink wrapper which converts messages and errors into the expected types.
471    pub struct LogSyncSink<S, L, E> {
472        #[pin]
473        inner: S,
474        _phantom: std::marker::PhantomData<(L, E)>,
475    }
476}
477
478impl<S, L, E> LogSyncSink<S, L, E> {
479    pub fn new(inner: S) -> Self {
480        Self {
481            inner,
482            _phantom: std::marker::PhantomData,
483        }
484    }
485}
486
487impl<S, L, E> Sink<LogSyncMessage<L>> for LogSyncSink<S, L, E>
488where
489    S: Sink<TopicLogSyncMessage<L, E>>,
490    S::Error: Debug,
491{
492    type Error = TopicLogSyncChannelError;
493
494    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
495        let this = self.project();
496        this.inner
497            .poll_ready(cx)
498            .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
499    }
500
501    fn start_send(self: Pin<&mut Self>, item: LogSyncMessage<L>) -> Result<(), Self::Error> {
502        let this = self.project();
503        let msg = TopicLogSyncMessage::Sync(item);
504        this.inner
505            .start_send(msg)
506            .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
507    }
508
509    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
510        let this = self.project();
511        this.inner
512            .poll_flush(cx)
513            .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
514    }
515
516    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
517        let this = self.project();
518        this.inner
519            .poll_close(cx)
520            .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
521    }
522}
523
524#[cfg(test)]
525pub mod tests {
526    use std::collections::HashMap;
527
528    use assert_matches::assert_matches;
529    use futures::channel::mpsc;
530    use futures::{SinkExt, StreamExt};
531    use p2panda_core::{Body, Operation};
532
533    use crate::ToSync;
534    use crate::protocols::{LogSyncError, LogSyncMessage};
535    use crate::test_utils::{
536        Peer, TestTopic, TestTopicSyncEvent, TestTopicSyncMessage, run_protocol, run_protocol_uni,
537        setup_logging,
538    };
539    use crate::traits::Protocol;
540
541    use super::{TopicLogSyncError, TopicLogSyncEvent, TopicLogSyncLiveMetrics};
542
543    #[tokio::test]
544    async fn sync_session_no_operations() {
545        let topic = TestTopic::new("messages");
546        let mut peer = Peer::new(0);
547        peer.insert_topic(&topic, &HashMap::default());
548
549        let (session, mut events_rx, _) = peer.topic_sync_protocol(topic.clone(), false);
550
551        let remote_rx = run_protocol_uni(
552            session,
553            &[
554                TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
555                TestTopicSyncMessage::Sync(LogSyncMessage::Done),
556            ],
557        )
558        .await
559        .unwrap();
560
561        assert_matches!(
562            events_rx.recv().await.unwrap(),
563            TestTopicSyncEvent::SyncStarted(_)
564        );
565        assert_matches!(
566            events_rx.recv().await.unwrap(),
567            TestTopicSyncEvent::SyncStatus(_)
568        );
569        assert_matches!(
570            events_rx.recv().await.unwrap(),
571            TestTopicSyncEvent::SyncStatus(_)
572        );
573        assert_matches!(
574            events_rx.recv().await.unwrap(),
575            TestTopicSyncEvent::SyncFinished(_)
576        );
577        assert_matches!(events_rx.recv().await.unwrap(), TestTopicSyncEvent::Success);
578
579        let messages = remote_rx.collect::<Vec<_>>().await;
580        assert_eq!(messages.len(), 2);
581        for (index, message) in messages.into_iter().enumerate() {
582            match index {
583                0 => assert_eq!(
584                    message,
585                    TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![]))
586                ),
587                1 => {
588                    assert_eq!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done));
589                    break;
590                }
591                _ => panic!(),
592            };
593        }
594    }
595
596    #[tokio::test]
597    async fn sync_operations_accept() {
598        let log_id = 0;
599        let topic = TestTopic::new("messages");
600        let mut peer = Peer::new(0);
601
602        let body = Body::new("Hello, Sloth!".as_bytes());
603        let (header_0, header_bytes_0) = peer.create_operation(&body, log_id).await;
604        let (header_1, header_bytes_1) = peer.create_operation(&body, log_id).await;
605        let (header_2, header_bytes_2) = peer.create_operation(&body, log_id).await;
606
607        let logs = HashMap::from([(peer.id(), vec![log_id])]);
608        peer.insert_topic(&topic, &logs);
609
610        let (session, mut events_rx, _) = peer.topic_sync_protocol(topic.clone(), false);
611
612        let remote_rx = run_protocol_uni(
613            session,
614            &[
615                TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
616                TestTopicSyncMessage::Sync(LogSyncMessage::Done),
617            ],
618        )
619        .await
620        .unwrap();
621
622        assert_matches!(
623            events_rx.recv().await.unwrap(),
624            TestTopicSyncEvent::SyncStarted(_)
625        );
626        assert_matches!(
627            events_rx.recv().await.unwrap(),
628            TestTopicSyncEvent::SyncStatus(_)
629        );
630        assert_matches!(
631            events_rx.recv().await.unwrap(),
632            TestTopicSyncEvent::SyncStatus(_)
633        );
634        assert_matches!(
635            events_rx.recv().await.unwrap(),
636            TestTopicSyncEvent::SyncFinished(_)
637        );
638        assert_matches!(events_rx.recv().await.unwrap(), TestTopicSyncEvent::Success);
639
640        let messages = remote_rx.collect::<Vec<_>>().await;
641        assert_eq!(messages.len(), 6);
642        for (index, message) in messages.into_iter().enumerate() {
643            match index {
644                0 => assert_eq!(
645                    message,
646                    TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![(
647                        peer.id(),
648                        vec![(0, 2)]
649                    )]))
650                ),
651                1 => {
652                    let expected_bytes = header_0.payload_size
653                        + header_bytes_0.len() as u64
654                        + header_1.payload_size
655                        + header_bytes_1.len() as u64
656                        + header_2.payload_size
657                        + header_bytes_2.len() as u64;
658
659                    assert_eq!(
660                        message,
661                        TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
662                            total_operations: 3,
663                            total_bytes: expected_bytes
664                        })
665                    )
666                }
667                2 => {
668                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
669                    header,
670                    Some(body),
671                )) => (header, body));
672                    assert_eq!(header, header_bytes_0);
673                    assert_eq!(Body::new(&body_inner), body)
674                }
675                3 => {
676                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
677                    header,
678                    Some(body),
679                )) => (header, body));
680                    assert_eq!(header, header_bytes_1);
681                    assert_eq!(Body::new(&body_inner), body)
682                }
683                4 => {
684                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
685                    header,
686                    Some(body),
687                )) => (header, body));
688                    assert_eq!(header, header_bytes_2);
689                    assert_eq!(Body::new(&body_inner), body)
690                }
691                5 => {
692                    assert_eq!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done));
693                    break;
694                }
695                _ => panic!(),
696            };
697        }
698    }
699
700    #[tokio::test]
701    async fn topic_log_sync_full_duplex() {
702        setup_logging();
703        let topic = TestTopic::new("messages");
704        let log_id = 0;
705
706        let mut peer_a = Peer::new(0);
707        let mut peer_b = Peer::new(1);
708
709        let body = Body::new("Hello, Sloth!".as_bytes());
710        let (header_0, _) = peer_a.create_operation(&body, 0).await;
711        let (header_1, _) = peer_a.create_operation(&body, 0).await;
712        let (header_2, _) = peer_a.create_operation(&body, 0).await;
713
714        let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
715        peer_a.insert_topic(&topic, &logs);
716
717        let (peer_a_session, mut peer_a_events_rx, _) =
718            peer_a.topic_sync_protocol(topic.clone(), false);
719
720        let (peer_b_session, mut peer_b_events_rx, _) =
721            peer_b.topic_sync_protocol(topic.clone(), false);
722
723        run_protocol(peer_a_session, peer_b_session).await.unwrap();
724
725        // Assert peer a events.
726        assert_matches!(
727            peer_a_events_rx.recv().await.unwrap(),
728            TestTopicSyncEvent::SyncStarted(_)
729        );
730        assert_matches!(
731            peer_a_events_rx.recv().await.unwrap(),
732            TestTopicSyncEvent::SyncStatus(_)
733        );
734        assert_matches!(
735            peer_a_events_rx.recv().await.unwrap(),
736            TestTopicSyncEvent::SyncStatus(_)
737        );
738        assert_matches!(
739            peer_a_events_rx.recv().await.unwrap(),
740            TestTopicSyncEvent::SyncFinished(_)
741        );
742        assert_matches!(
743            peer_a_events_rx.recv().await.unwrap(),
744            TestTopicSyncEvent::Success
745        );
746
747        // Assert peer b events.
748        assert_matches!(
749            peer_b_events_rx.recv().await.unwrap(),
750            TestTopicSyncEvent::SyncStarted(_)
751        );
752        assert_matches!(
753            peer_b_events_rx.recv().await.unwrap(),
754            TestTopicSyncEvent::SyncStatus(_)
755        );
756        assert_matches!(
757            peer_b_events_rx.recv().await.unwrap(),
758            TestTopicSyncEvent::SyncStatus(_)
759        );
760        let (header, body_inner) = assert_matches!(
761            peer_b_events_rx.recv().await.unwrap(),
762            TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
763        );
764        assert_eq!(header, header_0);
765        assert_eq!(body_inner.unwrap(), body);
766        let (header, body_inner) = assert_matches!(
767            peer_b_events_rx.recv().await.unwrap(),
768            TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
769        );
770        assert_eq!(header, header_1);
771        assert_eq!(body_inner.unwrap(), body);
772        let (header, body_inner) = assert_matches!(
773            peer_b_events_rx.recv().await.unwrap(),
774            TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
775        );
776        assert_eq!(header, header_2);
777        assert_eq!(body_inner.unwrap(), body);
778        assert_matches!(
779            peer_b_events_rx.recv().await.unwrap(),
780            TestTopicSyncEvent::SyncFinished(_)
781        );
782        assert_matches!(
783            peer_b_events_rx.recv().await.unwrap(),
784            TestTopicSyncEvent::Success
785        );
786    }
787
788    #[tokio::test]
789    async fn live_mode() {
790        let log_id = 0;
791        let topic = TestTopic::new("messages");
792        let mut peer_a = Peer::new(0);
793        let mut peer_b = Peer::new(1);
794
795        let body = Body::new("Hello, Sloth!".as_bytes());
796        let (header_0, header_bytes_0) = peer_b.create_operation(&body, log_id).await;
797
798        let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
799        peer_a.insert_topic(&topic, &logs);
800
801        let logs = HashMap::default();
802        peer_a.insert_topic(&topic, &logs);
803
804        let (header_1, _) = peer_b.create_operation_no_insert(&body, log_id).await;
805        let expected_bytes_received = header_0.payload_size
806            + header_0.to_bytes().len() as u64
807            + header_1.payload_size
808            + header_1.to_bytes().len() as u64;
809        let (header_2, _) = peer_a.create_operation_no_insert(&body, log_id).await;
810        let expected_bytes_sent = header_2.payload_size + header_2.to_bytes().len() as u64;
811
812        let (protocol, mut events_rx, mut live_mode_tx) =
813            peer_a.topic_sync_protocol(topic.clone(), true);
814
815        live_mode_tx
816            .send(ToSync::Payload(Operation {
817                hash: header_2.hash(),
818                header: header_2.clone(),
819                body: Some(body.clone()),
820            }))
821            .await
822            .unwrap();
823        live_mode_tx.send(ToSync::Close).await.unwrap();
824
825        let total_bytes = header_bytes_0.len() + body.to_bytes().len();
826        let remote_rx = run_protocol_uni(
827            protocol,
828            &[
829                TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
830                TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
831                    total_operations: 1,
832                    total_bytes: total_bytes as u64,
833                }),
834                TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
835                    header_bytes_0,
836                    Some(body.to_bytes()),
837                )),
838                TestTopicSyncMessage::Sync(LogSyncMessage::Done),
839                TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
840                TestTopicSyncMessage::Close,
841            ],
842        )
843        .await
844        .unwrap();
845
846        for index in 0..=8 {
847            let event = events_rx.recv().await.unwrap();
848            match index {
849                0 => {
850                    assert_matches!(event, TestTopicSyncEvent::SyncStarted(_));
851                }
852                1 => {
853                    assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
854                }
855                2 => {
856                    assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
857                }
858                3 => {
859                    assert_matches!(event, TestTopicSyncEvent::Operation(_));
860                }
861                4 => {
862                    assert_matches!(event, TestTopicSyncEvent::SyncFinished(_));
863                }
864                5 => {
865                    assert_matches!(event, TestTopicSyncEvent::LiveModeStarted);
866                }
867                6 => {
868                    assert_matches!(event, TestTopicSyncEvent::Operation(_));
869                }
870                7 => {
871                    let metrics = assert_matches!(event, TestTopicSyncEvent::LiveModeFinished(metrics) => metrics);
872                    let TopicLogSyncLiveMetrics {
873                        operations_received,
874                        operations_sent,
875                        bytes_received,
876                        bytes_sent,
877                    } = metrics;
878                    assert_eq!(operations_received, 2);
879                    assert_eq!(operations_sent, 1);
880                    assert_eq!(bytes_received, expected_bytes_received);
881                    assert_eq!(bytes_sent, expected_bytes_sent);
882                }
883                8 => {
884                    assert_matches!(event, TestTopicSyncEvent::Success)
885                }
886                _ => panic!(),
887            };
888        }
889
890        let messages = remote_rx.collect::<Vec<_>>().await;
891        assert_eq!(messages.len(), 4);
892        for (index, message) in messages.into_iter().enumerate() {
893            match index {
894                0 => assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Have(_))),
895                1 => {
896                    assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done))
897                }
898                2 => {
899                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Live(
900                    header,
901                    Some(body)
902                ) => (header, body));
903                    assert_eq!(header, header_2);
904                    assert_eq!(body_inner, body);
905                }
906                3 => {
907                    assert_matches!(message, TestTopicSyncMessage::Close)
908                }
909                _ => panic!(),
910            };
911        }
912    }
913
914    #[tokio::test]
915    async fn dedup_live_mode_messages() {
916        let log_id = 0;
917        let topic = TestTopic::new("messages");
918        let mut peer_a = Peer::new(0);
919        let mut peer_b = Peer::new(1);
920
921        let body = Body::new("Hello, Sloth!".as_bytes());
922        let (header_0, header_bytes_0) = peer_b.create_operation(&body, log_id).await;
923
924        let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
925        peer_a.insert_topic(&topic, &logs);
926
927        let logs = HashMap::default();
928        peer_a.insert_topic(&topic, &logs);
929
930        let (header_1, _) = peer_b.create_operation_no_insert(&body, log_id).await;
931        let expected_bytes_received = header_0.payload_size
932            + header_0.to_bytes().len() as u64
933            + header_1.payload_size
934            + header_1.to_bytes().len() as u64;
935        let (header_2, _) = peer_a.create_operation_no_insert(&body, log_id).await;
936        let expected_bytes_sent = header_2.payload_size + header_2.to_bytes().len() as u64;
937
938        let (protocol, mut events_rx, mut live_mode_tx) =
939            peer_a.topic_sync_protocol(topic.clone(), true);
940
941        live_mode_tx
942            .send(ToSync::Payload(Operation {
943                hash: header_2.hash(),
944                header: header_2.clone(),
945                body: Some(body.clone()),
946            }))
947            .await
948            .unwrap();
949
950        // Sending subscription message twice.
951        live_mode_tx
952            .send(ToSync::Payload(Operation {
953                hash: header_2.hash(),
954                header: header_2.clone(),
955                body: Some(body.clone()),
956            }))
957            .await
958            .unwrap();
959
960        live_mode_tx.send(ToSync::Close).await.unwrap();
961
962        let total_bytes = header_bytes_0.len() + body.to_bytes().len();
963        let remote_rx = run_protocol_uni(
964            protocol,
965            &[
966                TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
967                TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
968                    total_operations: 1,
969                    total_bytes: total_bytes as u64,
970                }),
971                TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
972                    header_bytes_0,
973                    Some(body.to_bytes()),
974                )),
975                TestTopicSyncMessage::Sync(LogSyncMessage::Done),
976                TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
977                // Duplicate of message sent during sync.
978                TestTopicSyncMessage::Live(header_0.clone(), Some(body.clone())),
979                // Duplicate of message sent earlier in live-mode.
980                TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
981                TestTopicSyncMessage::Close,
982            ],
983        )
984        .await
985        .unwrap();
986
987        for index in 0..=8 {
988            let event = events_rx.recv().await.unwrap();
989            match index {
990                0 => {
991                    assert_matches!(event, TestTopicSyncEvent::SyncStarted(_));
992                }
993                1 => {
994                    assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
995                }
996                2 => {
997                    assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
998                }
999                3 => {
1000                    assert_matches!(event, TestTopicSyncEvent::Operation(_));
1001                }
1002                4 => {
1003                    assert_matches!(event, TestTopicSyncEvent::SyncFinished(_));
1004                }
1005                5 => {
1006                    assert_matches!(event, TestTopicSyncEvent::LiveModeStarted);
1007                }
1008                6 => {
1009                    assert_matches!(event, TestTopicSyncEvent::Operation(_));
1010                }
1011                7 => {
1012                    let metrics = assert_matches!(event, TestTopicSyncEvent::LiveModeFinished(metrics) => metrics);
1013                    let TopicLogSyncLiveMetrics {
1014                        operations_received,
1015                        operations_sent,
1016                        bytes_received,
1017                        bytes_sent,
1018                    } = metrics;
1019                    assert_eq!(operations_received, 2);
1020                    assert_eq!(operations_sent, 1);
1021                    assert_eq!(bytes_received, expected_bytes_received);
1022                    assert_eq!(bytes_sent, expected_bytes_sent);
1023                }
1024                8 => {
1025                    assert_matches!(event, TestTopicSyncEvent::Success)
1026                }
1027                _ => panic!(),
1028            };
1029        }
1030
1031        let messages = remote_rx.collect::<Vec<_>>().await;
1032        assert_eq!(messages.len(), 4);
1033        for (index, message) in messages.into_iter().enumerate() {
1034            match index {
1035                0 => assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Have(_))),
1036                1 => {
1037                    assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done))
1038                }
1039                2 => {
1040                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Live(header, Some(body)) => (header, body));
1041                    assert_eq!(header, header_2);
1042                    assert_eq!(body_inner, body);
1043                }
1044                3 => {
1045                    assert_matches!(message, TestTopicSyncMessage::Close)
1046                }
1047                _ => panic!(),
1048            };
1049        }
1050    }
1051
1052    #[tokio::test]
1053    async fn unexpected_stream_closure_sync() {
1054        let topic = TestTopic::new("messages");
1055        let mut peer = Peer::new(0);
1056        peer.insert_topic(&topic, &Default::default());
1057
1058        let (session, mut events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
1059
1060        let messages = [TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![]))];
1061
1062        let (mut local_message_tx, _remote_message_rx) = mpsc::channel(128);
1063        let (mut remote_message_tx, local_message_rx) = mpsc::channel(128);
1064        let mut local_message_rx = local_message_rx.map(|message| Ok::<_, ()>(message));
1065
1066        for message in messages {
1067            remote_message_tx.send(message.to_owned()).await.unwrap();
1068        }
1069
1070        let handle = tokio::spawn(async move {
1071            session
1072                .run(&mut local_message_tx, &mut local_message_rx)
1073                .await
1074                .expect_err("expected unexpected stream closure")
1075        });
1076
1077        drop(remote_message_tx);
1078
1079        let err = handle.await.unwrap();
1080        assert_matches!(
1081            err,
1082            TopicLogSyncError::Sync(LogSyncError::UnexpectedStreamClosure)
1083        );
1084
1085        while let Ok(event) = events_rx.recv().await {
1086            if let TopicLogSyncEvent::Failed { error } = event {
1087                assert_eq!(
1088                    error,
1089                    "remote unexpectedly closed stream during initial sync".to_string()
1090                );
1091                break;
1092            }
1093        }
1094    }
1095
1096    #[tokio::test]
1097    async fn unexpected_stream_closure_live_mode() {
1098        let topic = TestTopic::new("messages");
1099        let mut peer = Peer::new(0);
1100        peer.insert_topic(&topic, &Default::default());
1101
1102        let (session, mut events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
1103
1104        let messages = [
1105            TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
1106            TestTopicSyncMessage::Sync(LogSyncMessage::Done),
1107        ];
1108
1109        let (mut local_message_tx, _remote_message_rx) = mpsc::channel(128);
1110        let (mut remote_message_tx, local_message_rx) = mpsc::channel(128);
1111        let mut local_message_rx = local_message_rx.map(|message| Ok::<_, ()>(message));
1112
1113        for message in messages {
1114            remote_message_tx.send(message.to_owned()).await.unwrap();
1115        }
1116
1117        let handle = tokio::spawn(async move {
1118            session
1119                .run(&mut local_message_tx, &mut local_message_rx)
1120                .await
1121                .expect_err("expected unexpected stream closure")
1122        });
1123
1124        drop(remote_message_tx);
1125
1126        let err = handle.await.unwrap();
1127        assert_matches!(err, TopicLogSyncError::UnexpectedStreamClosure);
1128
1129        while let Ok(event) = events_rx.recv().await {
1130            if let TopicLogSyncEvent::Failed { error } = event {
1131                assert_eq!(
1132                    error,
1133                    "remote unexpectedly closed stream in live-mode".to_string()
1134                );
1135                break;
1136            }
1137        }
1138    }
1139}