p2panda_sync/protocols/
topic_log_sync.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Two-party sync protocol over a topic associated with a collection of append-only logs.
4use std::fmt::Debug;
5use std::future::ready;
6use std::hash::Hash as StdHash;
7use std::marker::PhantomData;
8
9use futures::channel::mpsc;
10use futures::{Sink, SinkExt, Stream, StreamExt};
11use p2panda_core::{Body, Extensions, Header, Operation};
12use p2panda_store::{LogId, LogStore, OperationStore};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tokio::sync::broadcast;
16use tracing::{debug, warn};
17
18use crate::ToSync;
19use crate::dedup::DEFAULT_BUFFER_CAPACITY;
20use crate::protocols::Logs;
21use crate::protocols::log_sync::{
22    LogSync, LogSyncError, LogSyncEvent, LogSyncMessage, LogSyncMetrics, LogSyncStatus,
23};
24use crate::traits::{Protocol, TopicMap};
25
26/// Protocol for synchronizing logs which are associated with a generic T topic.
27///
28/// The mapping of T to a set of logs is handled on the application layer using an implementation
29/// of the `TopicMap` trait.
30///
31/// After sync is complete peers optionally enter "live-mode" where concurrently received and
32/// future messages will be sent directly to the application layer and forwarded to any
33/// concurrently running sync sessions. As we may receive messages from many sync sessions
34/// concurrently, messages forwarded to a sync session in live-mode are de-duplicated in order to
35/// avoid flooding the network with redundant data.
36///
37/// It is assumed that the T topic has been negotiated between parties prior to initiating this
38/// sync protocol.
39#[derive(Debug)]
40pub struct TopicLogSync<T, S, M, L, E> {
41    pub store: S,
42    pub topic_map: M,
43    pub topic: T,
44    pub event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
45    pub live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
46    pub buffer_capacity: usize,
47    pub _phantom: PhantomData<L>,
48}
49
50impl<T, S, M, L, E> TopicLogSync<T, S, M, L, E>
51where
52    T: Eq + StdHash + Serialize + for<'a> Deserialize<'a>,
53    S: LogStore<L, E> + OperationStore<L, E>,
54    M: TopicMap<T, Logs<L>>,
55    L: LogId + for<'de> Deserialize<'de> + Serialize,
56    E: Extensions,
57{
58    /// Returns a new sync protocol instance, configured with a store and `TopicMap` implementation
59    /// which associates the to-be-synced logs with a given topic.
60    pub fn new(
61        topic: T,
62        store: S,
63        topic_map: M,
64        live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
65        event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
66    ) -> Self {
67        Self::new_with_capacity(
68            topic,
69            store,
70            topic_map,
71            live_mode_rx,
72            event_tx,
73            DEFAULT_BUFFER_CAPACITY,
74        )
75    }
76
77    /// Instantiates a sync protocol with custom buffer capacity.
78    pub fn new_with_capacity(
79        topic: T,
80        store: S,
81        topic_map: M,
82        live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
83        event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
84        buffer_capacity: usize,
85    ) -> Self {
86        Self {
87            topic,
88            topic_map,
89            store,
90            event_tx,
91            live_mode_rx,
92            buffer_capacity,
93            _phantom: PhantomData,
94        }
95    }
96}
97
98impl<T, S, M, L, E> Protocol for TopicLogSync<T, S, M, L, E>
99where
100    T: Debug + Eq + StdHash + Serialize + for<'a> Deserialize<'a> + Send + 'static,
101    S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
102    M: TopicMap<T, Logs<L>> + Send + 'static,
103    L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
104    E: Extensions + Send + 'static,
105{
106    type Error = TopicLogSyncError;
107    type Message = TopicLogSyncMessage<L, E>;
108    type Output = ();
109
110    async fn run(
111        self,
112        mut sink: &mut (impl Sink<Self::Message, Error = impl Debug> + Unpin),
113        mut stream: &mut (impl Stream<Item = Result<Self::Message, impl Debug>> + Unpin),
114    ) -> Result<Self::Output, Self::Error> {
115        // TODO: check there is overlap between the local and remote topic filters and end the
116        // session now if not.
117
118        // Get the log ids which are associated with this topic query.
119        let logs = self
120            .topic_map
121            .get(&self.topic)
122            .await
123            .map_err(|err| TopicLogSyncError::TopicMap(err.to_string()))?;
124
125        // Run the log sync protocol passing in our local topic logs.
126        let mut dedup = {
127            let (mut log_sync_sink, mut log_sync_stream) = sync_channels(&mut sink, &mut stream);
128            let protocol = LogSync::new_with_capacity(
129                self.store.clone(),
130                logs,
131                self.event_tx.clone(),
132                self.buffer_capacity,
133            );
134            let result = protocol.run(&mut log_sync_sink, &mut log_sync_stream).await;
135
136            // If the log sync session ended with an error, then send a "failed" event and return
137            // here with the error itself.
138            match result {
139                Ok(dedup) => dedup,
140                Err(err) => {
141                    self.event_tx
142                        .send(TopicLogSyncEvent::Failed {
143                            error: err.to_string(),
144                        })
145                        .map_err(|_| TopicLogSyncChannelError::EventSend)?;
146
147                    log_sync_sink
148                        .close()
149                        .await
150                        .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
151
152                    return Err(err.into());
153                }
154            }
155        };
156
157        let Some(mut live_mode_rx) = self.live_mode_rx else {
158            sink.close()
159                .await
160                .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
161
162            self.event_tx
163                .send(TopicLogSyncEvent::Success)
164                .map_err(|_| TopicLogSyncChannelError::EventSend)?;
165
166            return Ok(());
167        };
168
169        // Enter live-mode.
170        //
171        // In live-mode we process messages sent from the remote peer and received locally from a
172        // subscription or other concurrent sync sessions. In both cases we should deduplicate
173        // messages and also check they are part of our topic sub-set selection before forwarding
174        // them on the event stream, or to the remote peer.
175        let mut close_sent = false;
176        let mut metrics = TopicLogSyncLiveMetrics::default();
177        self.event_tx
178            .send(TopicLogSyncEvent::LiveModeStarted)
179            .map_err(|_| TopicLogSyncChannelError::EventSend)?;
180
181        let result = loop {
182            tokio::select! {
183                biased;
184                Some(message) = live_mode_rx.next() => {
185                    match message {
186                        ToSync::Payload(operation) => {
187                            if !dedup.insert(operation.hash) {
188                                continue;
189                            }
190
191                            metrics.bytes_sent +=
192                                operation.header.to_bytes().len() as u64 + operation.header.payload_size;
193                            metrics.operations_sent += 1;
194
195                            let result = sink
196                                .send(TopicLogSyncMessage::Live(
197                                    operation.header.clone(),
198                                    operation.body.clone(),
199                                ))
200                                .await
201                                .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")).into());
202
203                            if result.is_err() {
204                                break result;
205                            };
206                        }
207                        ToSync::Close => {
208                            // We send the close and wait for the remote to close the
209                            // connection.
210                            let result = sink
211                                .send(TopicLogSyncMessage::Close)
212                                .await
213                                .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")).into());
214                            if result.is_err() {
215                                break result;
216                            };
217                            close_sent = true;
218                        }
219                    };
220                }
221                message = stream.next() => {
222                    let Some(message) = message else {
223                        debug!("remote closed the stream unexpectedly");
224                        if close_sent {
225                            debug!("remote closed the stream after we sent a close message");
226                            break Ok(());
227                        }
228                        break Err(TopicLogSyncError::UnexpectedStreamClosure);
229                    };
230
231                    match message {
232                        Ok(message) => {
233                            if let TopicLogSyncMessage::Close = message {
234                                // We received the remotes close message and should close the
235                                // connection ourselves.
236                                debug!("received close message from remote");
237                                break Ok(());
238                            };
239
240                            let TopicLogSyncMessage::Live(header, body) = message else {
241                                break Err(TopicLogSyncError::UnexpectedProtocolMessage(
242                                    message.to_string(),
243                                ));
244                            };
245
246                            // TODO: check that this message is a part of our topic T set.
247
248                            // Insert operation hash into deduplication buffer and if it was
249                            // previously present do not forward the operation to the application
250                            // layer.
251                            if !dedup.insert(header.hash()) {
252                                continue;
253                            }
254
255                            metrics.bytes_received += header.to_bytes().len() as u64 + header.payload_size;
256                            metrics.operations_received += 1;
257                            self.event_tx
258                                .send(TopicLogSyncEvent::Operation(Box::new(Operation {
259                                    hash: header.hash(),
260                                    header,
261                                    body,
262                                })))
263                                .map_err(|_| TopicLogSyncChannelError::EventSend)?;
264                        }
265                        Err(err) => {
266                            if close_sent {
267                                debug!("remote closed the stream after we sent a close message");
268                                break Ok(());
269                            }
270                            warn!("error in live-mode: {err:#?}");
271                            break Err(TopicLogSyncError::DecodeMessage(format!("{err:?}")));
272                        }
273                    }
274                }
275            }
276        };
277
278        self.event_tx
279            .send(TopicLogSyncEvent::LiveModeFinished(metrics.clone()))
280            .map_err(|_| TopicLogSyncChannelError::EventSend)?;
281
282        sink.close()
283            .await
284            .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
285
286        let final_event = match result.as_ref() {
287            Ok(_) => TopicLogSyncEvent::Success,
288            Err(err) => TopicLogSyncEvent::Failed {
289                error: err.to_string(),
290            },
291        };
292
293        self.event_tx
294            .send(final_event)
295            .map_err(|_| TopicLogSyncChannelError::EventSend)?;
296
297        result
298    }
299}
300
301/// Map raw message sink and stream into log sync protocol specific channels.
302#[allow(clippy::complexity)]
303fn sync_channels<'a, L, E>(
304    sink: &mut (impl Sink<TopicLogSyncMessage<L, E>, Error = impl Debug> + Unpin),
305    stream: &mut (impl Stream<Item = Result<TopicLogSyncMessage<L, E>, impl Debug>> + Unpin),
306) -> (
307    impl Sink<LogSyncMessage<L>, Error = TopicLogSyncChannelError> + Unpin,
308    impl Stream<Item = Result<LogSyncMessage<L>, TopicLogSyncChannelError>> + Unpin,
309) {
310    let log_sync_sink = sink
311        .sink_map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
312        .with(|message| {
313            ready(Ok::<_, TopicLogSyncChannelError>(
314                TopicLogSyncMessage::<L, E>::Sync(message),
315            ))
316        });
317
318    let log_sync_stream = stream.by_ref().map(|message| match message {
319        Ok(TopicLogSyncMessage::Sync(message)) => Ok(message),
320        Ok(TopicLogSyncMessage::Live { .. }) | Ok(TopicLogSyncMessage::Close) => Err(
321            TopicLogSyncChannelError::MessageStream("non-protocol message received".to_string()),
322        ),
323        Err(err) => Err(TopicLogSyncChannelError::MessageStream(format!("{err:?}"))),
324    });
325
326    (log_sync_sink, log_sync_stream)
327}
328
329/// Error type occurring in topic log sync channels.
330#[derive(Debug, Error)]
331pub enum TopicLogSyncChannelError {
332    #[error("error sending on message sink: {0}")]
333    MessageSink(String),
334
335    #[error("error receiving from message stream: {0}")]
336    MessageStream(String),
337
338    #[error("no active receivers for broadcast")]
339    EventSend,
340}
341
342/// Error type occurring in topic log sync protocol.
343#[derive(Debug, Error)]
344pub enum TopicLogSyncError {
345    #[error(transparent)]
346    Sync(#[from] LogSyncError),
347
348    #[error("topic map error: {0}")]
349    TopicMap(String),
350
351    #[error("unexpected protocol message: {0}")]
352    UnexpectedProtocolMessage(String),
353
354    #[error(transparent)]
355    Channel(#[from] TopicLogSyncChannelError),
356
357    #[error("remote unexpectedly closed stream in live-mode")]
358    UnexpectedStreamClosure,
359
360    #[error("{0}")]
361    DecodeMessage(String),
362}
363
/// Live-mode metrics emitted in event messages.
///
/// All counters start at zero and only cover the live-mode phase of a session.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct TopicLogSyncLiveMetrics {
    /// Number of de-duplicated operations received from the remote peer.
    pub operations_received: u64,

    /// Number of de-duplicated operations sent to the remote peer.
    pub operations_sent: u64,

    /// Received bytes, counted as encoded header length plus declared payload size.
    pub bytes_received: u64,

    /// Sent bytes, counted as encoded header length plus declared payload size.
    pub bytes_sent: u64,
}
372
373/// Events emitted from topic log sync sessions.
374#[derive(Debug, Clone, PartialEq)]
375pub enum TopicLogSyncEvent<E> {
376    SyncStarted(LogSyncMetrics),
377    SyncStatus(LogSyncMetrics),
378    SyncFinished(LogSyncMetrics),
379    LiveModeStarted,
380    LiveModeFinished(TopicLogSyncLiveMetrics),
381    Operation(Box<Operation<E>>),
382    Success,
383    Failed { error: String },
384}
385
386impl<E> From<LogSyncEvent<E>> for TopicLogSyncEvent<E> {
387    fn from(event: LogSyncEvent<E>) -> Self {
388        match event {
389            LogSyncEvent::Status(status_event) => match status_event {
390                LogSyncStatus::Started { metrics } => TopicLogSyncEvent::SyncStarted(metrics),
391                LogSyncStatus::Progress { metrics } => TopicLogSyncEvent::SyncStatus(metrics),
392                LogSyncStatus::Completed { metrics } => TopicLogSyncEvent::SyncFinished(metrics),
393            },
394            LogSyncEvent::Data(operation) => TopicLogSyncEvent::Operation(operation),
395        }
396    }
397}
398
399/// Protocol message types.
400#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
401#[serde(tag = "type", content = "value")]
402#[allow(clippy::large_enum_variant)]
403pub enum TopicLogSyncMessage<L, E> {
404    Sync(LogSyncMessage<L>),
405    Live(Header<E>, Option<Body>),
406    Close,
407}
408
409impl<L, E> std::fmt::Display for TopicLogSyncMessage<L, E> {
410    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
411        let value = match self {
412            TopicLogSyncMessage::Sync(_) => "sync",
413            TopicLogSyncMessage::Live(_, _) => "live",
414            TopicLogSyncMessage::Close => "close",
415        };
416        write!(f, "{value}")
417    }
418}
419
420#[cfg(test)]
421pub mod tests {
422    use std::collections::HashMap;
423
424    use assert_matches::assert_matches;
425    use futures::channel::mpsc;
426    use futures::{SinkExt, StreamExt};
427    use p2panda_core::{Body, Operation};
428
429    use crate::ToSync;
430    use crate::protocols::{LogSyncError, LogSyncMessage};
431    use crate::test_utils::{
432        Peer, TestTopic, TestTopicSyncEvent, TestTopicSyncMessage, run_protocol, run_protocol_uni,
433    };
434    use crate::traits::Protocol;
435
436    use super::{TopicLogSyncError, TopicLogSyncEvent, TopicLogSyncLiveMetrics};
437
438    #[tokio::test]
439    async fn sync_session_no_operations() {
440        let topic = TestTopic::new("messages");
441        let mut peer = Peer::new(0);
442        peer.insert_topic(&topic, &HashMap::default());
443
444        let (session, mut events_rx, _) = peer.topic_sync_protocol(topic.clone(), false);
445
446        let remote_rx = run_protocol_uni(
447            session,
448            &[
449                TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
450                TestTopicSyncMessage::Sync(LogSyncMessage::Done),
451            ],
452        )
453        .await
454        .unwrap();
455
456        assert_matches!(
457            events_rx.recv().await.unwrap(),
458            TestTopicSyncEvent::SyncStarted(_)
459        );
460        assert_matches!(
461            events_rx.recv().await.unwrap(),
462            TestTopicSyncEvent::SyncStatus(_)
463        );
464        assert_matches!(
465            events_rx.recv().await.unwrap(),
466            TestTopicSyncEvent::SyncStatus(_)
467        );
468        assert_matches!(
469            events_rx.recv().await.unwrap(),
470            TestTopicSyncEvent::SyncFinished(_)
471        );
472        assert_matches!(events_rx.recv().await.unwrap(), TestTopicSyncEvent::Success);
473
474        let messages = remote_rx.collect::<Vec<_>>().await;
475        assert_eq!(messages.len(), 2);
476        for (index, message) in messages.into_iter().enumerate() {
477            match index {
478                0 => assert_eq!(
479                    message,
480                    TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![]))
481                ),
482                1 => {
483                    assert_eq!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done));
484                    break;
485                }
486                _ => panic!(),
487            };
488        }
489    }
490
491    #[tokio::test]
492    async fn sync_operations_accept() {
493        let log_id = 0;
494        let topic = TestTopic::new("messages");
495        let mut peer = Peer::new(0);
496
497        let body = Body::new("Hello, Sloth!".as_bytes());
498        let (header_0, header_bytes_0) = peer.create_operation(&body, log_id).await;
499        let (header_1, header_bytes_1) = peer.create_operation(&body, log_id).await;
500        let (header_2, header_bytes_2) = peer.create_operation(&body, log_id).await;
501
502        let logs = HashMap::from([(peer.id(), vec![log_id])]);
503        peer.insert_topic(&topic, &logs);
504
505        let (session, mut events_rx, _) = peer.topic_sync_protocol(topic.clone(), false);
506
507        let remote_rx = run_protocol_uni(
508            session,
509            &[
510                TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
511                TestTopicSyncMessage::Sync(LogSyncMessage::Done),
512            ],
513        )
514        .await
515        .unwrap();
516
517        assert_matches!(
518            events_rx.recv().await.unwrap(),
519            TestTopicSyncEvent::SyncStarted(_)
520        );
521        assert_matches!(
522            events_rx.recv().await.unwrap(),
523            TestTopicSyncEvent::SyncStatus(_)
524        );
525        assert_matches!(
526            events_rx.recv().await.unwrap(),
527            TestTopicSyncEvent::SyncStatus(_)
528        );
529        assert_matches!(
530            events_rx.recv().await.unwrap(),
531            TestTopicSyncEvent::SyncFinished(_)
532        );
533        assert_matches!(events_rx.recv().await.unwrap(), TestTopicSyncEvent::Success);
534
535        let messages = remote_rx.collect::<Vec<_>>().await;
536        assert_eq!(messages.len(), 6);
537        for (index, message) in messages.into_iter().enumerate() {
538            match index {
539                0 => assert_eq!(
540                    message,
541                    TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![(
542                        peer.id(),
543                        vec![(0, 2)]
544                    )]))
545                ),
546                1 => {
547                    let expected_bytes = header_0.payload_size
548                        + header_bytes_0.len() as u64
549                        + header_1.payload_size
550                        + header_bytes_1.len() as u64
551                        + header_2.payload_size
552                        + header_bytes_2.len() as u64;
553
554                    assert_eq!(
555                        message,
556                        TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
557                            total_operations: 3,
558                            total_bytes: expected_bytes
559                        })
560                    )
561                }
562                2 => {
563                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
564                    header,
565                    Some(body),
566                )) => (header, body));
567                    assert_eq!(header, header_bytes_0);
568                    assert_eq!(Body::new(&body_inner), body)
569                }
570                3 => {
571                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
572                    header,
573                    Some(body),
574                )) => (header, body));
575                    assert_eq!(header, header_bytes_1);
576                    assert_eq!(Body::new(&body_inner), body)
577                }
578                4 => {
579                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
580                    header,
581                    Some(body),
582                )) => (header, body));
583                    assert_eq!(header, header_bytes_2);
584                    assert_eq!(Body::new(&body_inner), body)
585                }
586                5 => {
587                    assert_eq!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done));
588                    break;
589                }
590                _ => panic!(),
591            };
592        }
593    }
594
595    #[tokio::test]
596    async fn topic_log_sync_full_duplex() {
597        let topic = TestTopic::new("messages");
598        let log_id = 0;
599
600        let mut peer_a = Peer::new(0);
601        let mut peer_b = Peer::new(1);
602
603        let body = Body::new("Hello, Sloth!".as_bytes());
604        let (header_0, _) = peer_a.create_operation(&body, 0).await;
605        let (header_1, _) = peer_a.create_operation(&body, 0).await;
606        let (header_2, _) = peer_a.create_operation(&body, 0).await;
607
608        let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
609        peer_a.insert_topic(&topic, &logs);
610
611        let (peer_a_session, mut peer_a_events_rx, _) =
612            peer_a.topic_sync_protocol(topic.clone(), false);
613
614        let (peer_b_session, mut peer_b_events_rx, _) =
615            peer_b.topic_sync_protocol(topic.clone(), false);
616
617        run_protocol(peer_a_session, peer_b_session).await.unwrap();
618
619        // Assert peer a events.
620        assert_matches!(
621            peer_a_events_rx.recv().await.unwrap(),
622            TestTopicSyncEvent::SyncStarted(_)
623        );
624        assert_matches!(
625            peer_a_events_rx.recv().await.unwrap(),
626            TestTopicSyncEvent::SyncStatus(_)
627        );
628        assert_matches!(
629            peer_a_events_rx.recv().await.unwrap(),
630            TestTopicSyncEvent::SyncStatus(_)
631        );
632        assert_matches!(
633            peer_a_events_rx.recv().await.unwrap(),
634            TestTopicSyncEvent::SyncFinished(_)
635        );
636        assert_matches!(
637            peer_a_events_rx.recv().await.unwrap(),
638            TestTopicSyncEvent::Success
639        );
640
641        // Assert peer b events.
642        assert_matches!(
643            peer_b_events_rx.recv().await.unwrap(),
644            TestTopicSyncEvent::SyncStarted(_)
645        );
646        assert_matches!(
647            peer_b_events_rx.recv().await.unwrap(),
648            TestTopicSyncEvent::SyncStatus(_)
649        );
650        assert_matches!(
651            peer_b_events_rx.recv().await.unwrap(),
652            TestTopicSyncEvent::SyncStatus(_)
653        );
654        let (header, body_inner) = assert_matches!(
655            peer_b_events_rx.recv().await.unwrap(),
656            TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
657        );
658        assert_eq!(header, header_0);
659        assert_eq!(body_inner.unwrap(), body);
660        let (header, body_inner) = assert_matches!(
661            peer_b_events_rx.recv().await.unwrap(),
662            TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
663        );
664        assert_eq!(header, header_1);
665        assert_eq!(body_inner.unwrap(), body);
666        let (header, body_inner) = assert_matches!(
667            peer_b_events_rx.recv().await.unwrap(),
668            TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
669        );
670        assert_eq!(header, header_2);
671        assert_eq!(body_inner.unwrap(), body);
672        assert_matches!(
673            peer_b_events_rx.recv().await.unwrap(),
674            TestTopicSyncEvent::SyncFinished(_)
675        );
676        assert_matches!(
677            peer_b_events_rx.recv().await.unwrap(),
678            TestTopicSyncEvent::Success
679        );
680    }
681
682    #[tokio::test]
683    async fn live_mode() {
684        let log_id = 0;
685        let topic = TestTopic::new("messages");
686        let mut peer_a = Peer::new(0);
687        let mut peer_b = Peer::new(1);
688
689        let body = Body::new("Hello, Sloth!".as_bytes());
690        let (_, header_bytes_0) = peer_b.create_operation(&body, log_id).await;
691
692        let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
693        peer_a.insert_topic(&topic, &logs);
694
695        let logs = HashMap::default();
696        peer_a.insert_topic(&topic, &logs);
697
698        let (header_1, _) = peer_b.create_operation_no_insert(&body, log_id).await;
699        let expected_live_mode_bytes_received =
700            header_1.payload_size + header_1.to_bytes().len() as u64;
701        let (header_2, _) = peer_a.create_operation_no_insert(&body, log_id).await;
702        let expected_live_mode_bytes_sent =
703            header_2.payload_size + header_2.to_bytes().len() as u64;
704
705        let (protocol, mut events_rx, mut live_mode_tx) =
706            peer_a.topic_sync_protocol(topic.clone(), true);
707
708        live_mode_tx
709            .send(ToSync::Payload(Operation {
710                hash: header_2.hash(),
711                header: header_2.clone(),
712                body: Some(body.clone()),
713            }))
714            .await
715            .unwrap();
716        live_mode_tx.send(ToSync::Close).await.unwrap();
717
718        let total_bytes = header_bytes_0.len() + body.to_bytes().len();
719        let remote_rx = run_protocol_uni(
720            protocol,
721            &[
722                TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
723                TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
724                    total_operations: 1,
725                    total_bytes: total_bytes as u64,
726                }),
727                TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
728                    header_bytes_0,
729                    Some(body.to_bytes()),
730                )),
731                TestTopicSyncMessage::Sync(LogSyncMessage::Done),
732                TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
733                TestTopicSyncMessage::Close,
734            ],
735        )
736        .await
737        .unwrap();
738
739        for index in 0..=8 {
740            let event = events_rx.recv().await.unwrap();
741            match index {
742                0 => {
743                    assert_matches!(event, TestTopicSyncEvent::SyncStarted(_));
744                }
745                1 => {
746                    assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
747                }
748                2 => {
749                    assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
750                }
751                3 => {
752                    assert_matches!(event, TestTopicSyncEvent::Operation(_));
753                }
754                4 => {
755                    assert_matches!(event, TestTopicSyncEvent::SyncFinished(_));
756                }
757                5 => {
758                    assert_matches!(event, TestTopicSyncEvent::LiveModeStarted);
759                }
760                6 => {
761                    assert_matches!(event, TestTopicSyncEvent::Operation(_));
762                }
763                7 => {
764                    let metrics = assert_matches!(event, TestTopicSyncEvent::LiveModeFinished(metrics) => metrics);
765                    let TopicLogSyncLiveMetrics {
766                        operations_received,
767                        operations_sent,
768                        bytes_received,
769                        bytes_sent,
770                    } = metrics;
771                    assert_eq!(operations_received, 1);
772                    assert_eq!(operations_sent, 1);
773                    assert_eq!(bytes_received, expected_live_mode_bytes_received);
774                    assert_eq!(bytes_sent, expected_live_mode_bytes_sent);
775                }
776                8 => {
777                    assert_matches!(event, TestTopicSyncEvent::Success)
778                }
779                _ => panic!(),
780            };
781        }
782
783        let messages = remote_rx.collect::<Vec<_>>().await;
784        assert_eq!(messages.len(), 4);
785        for (index, message) in messages.into_iter().enumerate() {
786            match index {
787                0 => assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Have(_))),
788                1 => {
789                    assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done))
790                }
791                2 => {
792                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Live(
793                    header,
794                    Some(body)
795                ) => (header, body));
796                    assert_eq!(header, header_2);
797                    assert_eq!(body_inner, body);
798                }
799                3 => {
800                    assert_matches!(message, TestTopicSyncMessage::Close)
801                }
802                _ => panic!(),
803            };
804        }
805    }
806
807    #[tokio::test]
808    async fn dedup_live_mode_messages() {
809        let log_id = 0;
810        let topic = TestTopic::new("messages");
811        let mut peer_a = Peer::new(0);
812        let mut peer_b = Peer::new(1);
813
814        let body = Body::new("Hello, Sloth!".as_bytes());
815        let (header_0, header_bytes_0) = peer_b.create_operation(&body, log_id).await;
816
817        let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
818        peer_a.insert_topic(&topic, &logs);
819
820        let logs = HashMap::default();
821        peer_a.insert_topic(&topic, &logs);
822
823        let (header_1, _) = peer_b.create_operation_no_insert(&body, log_id).await;
824        let expected_live_mode_bytes_received =
825            header_1.payload_size + header_1.to_bytes().len() as u64;
826        let (header_2, _) = peer_a.create_operation_no_insert(&body, log_id).await;
827        let expected_live_mode_bytes_sent =
828            header_2.payload_size + header_2.to_bytes().len() as u64;
829
830        let (protocol, mut events_rx, mut live_mode_tx) =
831            peer_a.topic_sync_protocol(topic.clone(), true);
832
833        live_mode_tx
834            .send(ToSync::Payload(Operation {
835                hash: header_2.hash(),
836                header: header_2.clone(),
837                body: Some(body.clone()),
838            }))
839            .await
840            .unwrap();
841
842        // Sending subscription message twice.
843        live_mode_tx
844            .send(ToSync::Payload(Operation {
845                hash: header_2.hash(),
846                header: header_2.clone(),
847                body: Some(body.clone()),
848            }))
849            .await
850            .unwrap();
851
852        live_mode_tx.send(ToSync::Close).await.unwrap();
853
854        let total_bytes = header_bytes_0.len() + body.to_bytes().len();
855        let remote_rx = run_protocol_uni(
856            protocol,
857            &[
858                TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
859                TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
860                    total_operations: 1,
861                    total_bytes: total_bytes as u64,
862                }),
863                TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
864                    header_bytes_0,
865                    Some(body.to_bytes()),
866                )),
867                TestTopicSyncMessage::Sync(LogSyncMessage::Done),
868                TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
869                // Duplicate of message sent during sync.
870                TestTopicSyncMessage::Live(header_0.clone(), Some(body.clone())),
871                // Duplicate of message sent earlier in live mode.
872                TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
873                TestTopicSyncMessage::Close,
874            ],
875        )
876        .await
877        .unwrap();
878
879        for index in 0..=8 {
880            let event = events_rx.recv().await.unwrap();
881            match index {
882                0 => {
883                    assert_matches!(event, TestTopicSyncEvent::SyncStarted(_));
884                }
885                1 => {
886                    assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
887                }
888                2 => {
889                    assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
890                }
891                3 => {
892                    assert_matches!(event, TestTopicSyncEvent::Operation(_));
893                }
894                4 => {
895                    assert_matches!(event, TestTopicSyncEvent::SyncFinished(_));
896                }
897                5 => {
898                    assert_matches!(event, TestTopicSyncEvent::LiveModeStarted);
899                }
900                6 => {
901                    assert_matches!(event, TestTopicSyncEvent::Operation(_));
902                }
903                7 => {
904                    let metrics = assert_matches!(event, TestTopicSyncEvent::LiveModeFinished(metrics) => metrics);
905                    let TopicLogSyncLiveMetrics {
906                        operations_received,
907                        operations_sent,
908                        bytes_received,
909                        bytes_sent,
910                    } = metrics;
911                    assert_eq!(operations_received, 1);
912                    assert_eq!(operations_sent, 1);
913                    assert_eq!(bytes_received, expected_live_mode_bytes_received);
914                    assert_eq!(bytes_sent, expected_live_mode_bytes_sent);
915                }
916                8 => {
917                    assert_matches!(event, TestTopicSyncEvent::Success)
918                }
919                _ => panic!(),
920            };
921        }
922
923        let messages = remote_rx.collect::<Vec<_>>().await;
924        assert_eq!(messages.len(), 4);
925        for (index, message) in messages.into_iter().enumerate() {
926            match index {
927                0 => assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Have(_))),
928                1 => {
929                    assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done))
930                }
931                2 => {
932                    let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Live(header, Some(body)) => (header, body));
933                    assert_eq!(header, header_2);
934                    assert_eq!(body_inner, body);
935                }
936                3 => {
937                    assert_matches!(message, TestTopicSyncMessage::Close)
938                }
939                _ => panic!(),
940            };
941        }
942    }
943
944    #[tokio::test]
945    async fn unexpected_stream_closure_sync() {
946        let topic = TestTopic::new("messages");
947        let mut peer = Peer::new(0);
948        peer.insert_topic(&topic, &Default::default());
949
950        let (session, mut events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
951
952        let messages = [TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![]))];
953
954        let (mut local_message_tx, _remote_message_rx) = mpsc::channel(128);
955        let (mut remote_message_tx, local_message_rx) = mpsc::channel(128);
956        let mut local_message_rx = local_message_rx.map(|message| Ok::<_, ()>(message));
957
958        for message in messages {
959            remote_message_tx.send(message.to_owned()).await.unwrap();
960        }
961
962        let handle = tokio::spawn(async move {
963            session
964                .run(&mut local_message_tx, &mut local_message_rx)
965                .await
966                .expect_err("expected unexpected stream closure")
967        });
968
969        drop(remote_message_tx);
970
971        let err = handle.await.unwrap();
972        assert_matches!(
973            err,
974            TopicLogSyncError::Sync(LogSyncError::UnexpectedStreamClosure)
975        );
976
977        while let Ok(event) = events_rx.recv().await {
978            if let TopicLogSyncEvent::Failed { error } = event {
979                assert_eq!(
980                    error,
981                    "remote unexpectedly closed stream during initial sync".to_string()
982                );
983                break;
984            }
985        }
986    }
987
988    #[tokio::test]
989    async fn unexpected_stream_closure_live_mode() {
990        let topic = TestTopic::new("messages");
991        let mut peer = Peer::new(0);
992        peer.insert_topic(&topic, &Default::default());
993
994        let (session, mut events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
995
996        let messages = [
997            TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
998            TestTopicSyncMessage::Sync(LogSyncMessage::Done),
999        ];
1000
1001        let (mut local_message_tx, _remote_message_rx) = mpsc::channel(128);
1002        let (mut remote_message_tx, local_message_rx) = mpsc::channel(128);
1003        let mut local_message_rx = local_message_rx.map(|message| Ok::<_, ()>(message));
1004
1005        for message in messages {
1006            remote_message_tx.send(message.to_owned()).await.unwrap();
1007        }
1008
1009        let handle = tokio::spawn(async move {
1010            session
1011                .run(&mut local_message_tx, &mut local_message_rx)
1012                .await
1013                .expect_err("expected unexpected stream closure")
1014        });
1015
1016        drop(remote_message_tx);
1017
1018        let err = handle.await.unwrap();
1019        assert_matches!(err, TopicLogSyncError::UnexpectedStreamClosure);
1020
1021        while let Ok(event) = events_rx.recv().await {
1022            if let TopicLogSyncEvent::Failed { error } = event {
1023                assert_eq!(
1024                    error,
1025                    "remote unexpectedly closed stream in live-mode".to_string()
1026                );
1027                break;
1028            }
1029        }
1030    }
1031}