1use std::fmt::Debug;
5use std::future::ready;
6use std::hash::Hash as StdHash;
7use std::marker::PhantomData;
8
9use futures::channel::mpsc;
10use futures::{Sink, SinkExt, Stream, StreamExt};
11use p2panda_core::{Body, Extensions, Header, Operation};
12use p2panda_store::{LogId, LogStore, OperationStore};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tokio::sync::broadcast;
16use tracing::{debug, warn};
17
18use crate::ToSync;
19use crate::dedup::DEFAULT_BUFFER_CAPACITY;
20use crate::protocols::Logs;
21use crate::protocols::log_sync::{
22 LogSync, LogSyncError, LogSyncEvent, LogSyncMessage, LogSyncMetrics, LogSyncStatus,
23};
24use crate::traits::{Protocol, TopicMap};
25
/// State for one topic sync session between the local node and a remote peer.
///
/// The session first runs an initial log sync for the logs the topic map
/// associates with `topic` and, when `live_mode_rx` is set, continues in
/// live-mode afterwards.
#[derive(Debug)]
pub struct TopicLogSync<T, S, M, L, E> {
    /// Store handed to the inner `LogSync` protocol.
    pub store: S,
    /// Lookup from a topic to the logs which should be synced for it.
    pub topic_map: M,
    /// Topic this session syncs.
    pub topic: T,
    /// Broadcast channel on which status events and received operations are
    /// published.
    pub event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
    /// Source of locally produced messages for live-mode. When `None` the
    /// session ends right after the initial sync.
    pub live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
    /// Capacity forwarded to `LogSync::new_with_capacity`.
    // NOTE(review): presumably sizes the deduplication buffer (the default
    // comes from `crate::dedup`) — confirm.
    pub buffer_capacity: usize,
    /// Marker tying the log id type `L` to the struct without storing a value.
    pub _phantom: PhantomData<L>,
}
49
50impl<T, S, M, L, E> TopicLogSync<T, S, M, L, E>
51where
52 T: Eq + StdHash + Serialize + for<'a> Deserialize<'a>,
53 S: LogStore<L, E> + OperationStore<L, E>,
54 M: TopicMap<T, Logs<L>>,
55 L: LogId + for<'de> Deserialize<'de> + Serialize,
56 E: Extensions,
57{
58 pub fn new(
61 topic: T,
62 store: S,
63 topic_map: M,
64 live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
65 event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
66 ) -> Self {
67 Self::new_with_capacity(
68 topic,
69 store,
70 topic_map,
71 live_mode_rx,
72 event_tx,
73 DEFAULT_BUFFER_CAPACITY,
74 )
75 }
76
77 pub fn new_with_capacity(
79 topic: T,
80 store: S,
81 topic_map: M,
82 live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
83 event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
84 buffer_capacity: usize,
85 ) -> Self {
86 Self {
87 topic,
88 topic_map,
89 store,
90 event_tx,
91 live_mode_rx,
92 buffer_capacity,
93 _phantom: PhantomData,
94 }
95 }
96}
97
98impl<T, S, M, L, E> Protocol for TopicLogSync<T, S, M, L, E>
99where
100 T: Debug + Eq + StdHash + Serialize + for<'a> Deserialize<'a> + Send + 'static,
101 S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
102 M: TopicMap<T, Logs<L>> + Send + 'static,
103 L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
104 E: Extensions + Send + 'static,
105{
106 type Error = TopicLogSyncError;
107 type Message = TopicLogSyncMessage<L, E>;
108 type Output = ();
109
110 async fn run(
111 self,
112 mut sink: &mut (impl Sink<Self::Message, Error = impl Debug> + Unpin),
113 mut stream: &mut (impl Stream<Item = Result<Self::Message, impl Debug>> + Unpin),
114 ) -> Result<Self::Output, Self::Error> {
115 let logs = self
120 .topic_map
121 .get(&self.topic)
122 .await
123 .map_err(|err| TopicLogSyncError::TopicMap(err.to_string()))?;
124
125 let mut dedup = {
127 let (mut log_sync_sink, mut log_sync_stream) = sync_channels(&mut sink, &mut stream);
128 let protocol = LogSync::new_with_capacity(
129 self.store.clone(),
130 logs,
131 self.event_tx.clone(),
132 self.buffer_capacity,
133 );
134 let result = protocol.run(&mut log_sync_sink, &mut log_sync_stream).await;
135
136 match result {
139 Ok(dedup) => dedup,
140 Err(err) => {
141 self.event_tx
142 .send(TopicLogSyncEvent::Failed {
143 error: err.to_string(),
144 })
145 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
146
147 log_sync_sink
148 .close()
149 .await
150 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
151
152 return Err(err.into());
153 }
154 }
155 };
156
157 let Some(mut live_mode_rx) = self.live_mode_rx else {
158 sink.close()
159 .await
160 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
161
162 self.event_tx
163 .send(TopicLogSyncEvent::Success)
164 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
165
166 return Ok(());
167 };
168
169 let mut close_sent = false;
176 let mut metrics = TopicLogSyncLiveMetrics::default();
177 self.event_tx
178 .send(TopicLogSyncEvent::LiveModeStarted)
179 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
180
181 let result = loop {
182 tokio::select! {
183 biased;
184 Some(message) = live_mode_rx.next() => {
185 match message {
186 ToSync::Payload(operation) => {
187 if !dedup.insert(operation.hash) {
188 continue;
189 }
190
191 metrics.bytes_sent +=
192 operation.header.to_bytes().len() as u64 + operation.header.payload_size;
193 metrics.operations_sent += 1;
194
195 let result = sink
196 .send(TopicLogSyncMessage::Live(
197 operation.header.clone(),
198 operation.body.clone(),
199 ))
200 .await
201 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")).into());
202
203 if result.is_err() {
204 break result;
205 };
206 }
207 ToSync::Close => {
208 let result = sink
211 .send(TopicLogSyncMessage::Close)
212 .await
213 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")).into());
214 if result.is_err() {
215 break result;
216 };
217 close_sent = true;
218 }
219 };
220 }
221 message = stream.next() => {
222 let Some(message) = message else {
223 debug!("remote closed the stream unexpectedly");
224 if close_sent {
225 debug!("remote closed the stream after we sent a close message");
226 break Ok(());
227 }
228 break Err(TopicLogSyncError::UnexpectedStreamClosure);
229 };
230
231 match message {
232 Ok(message) => {
233 if let TopicLogSyncMessage::Close = message {
234 debug!("received close message from remote");
237 break Ok(());
238 };
239
240 let TopicLogSyncMessage::Live(header, body) = message else {
241 break Err(TopicLogSyncError::UnexpectedProtocolMessage(
242 message.to_string(),
243 ));
244 };
245
246 if !dedup.insert(header.hash()) {
252 continue;
253 }
254
255 metrics.bytes_received += header.to_bytes().len() as u64 + header.payload_size;
256 metrics.operations_received += 1;
257 self.event_tx
258 .send(TopicLogSyncEvent::Operation(Box::new(Operation {
259 hash: header.hash(),
260 header,
261 body,
262 })))
263 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
264 }
265 Err(err) => {
266 if close_sent {
267 debug!("remote closed the stream after we sent a close message");
268 break Ok(());
269 }
270 warn!("error in live-mode: {err:#?}");
271 break Err(TopicLogSyncError::DecodeMessage(format!("{err:?}")));
272 }
273 }
274 }
275 }
276 };
277
278 self.event_tx
279 .send(TopicLogSyncEvent::LiveModeFinished(metrics.clone()))
280 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
281
282 sink.close()
283 .await
284 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
285
286 let final_event = match result.as_ref() {
287 Ok(_) => TopicLogSyncEvent::Success,
288 Err(err) => TopicLogSyncEvent::Failed {
289 error: err.to_string(),
290 },
291 };
292
293 self.event_tx
294 .send(final_event)
295 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
296
297 result
298 }
299}
300
301#[allow(clippy::complexity)]
303fn sync_channels<'a, L, E>(
304 sink: &mut (impl Sink<TopicLogSyncMessage<L, E>, Error = impl Debug> + Unpin),
305 stream: &mut (impl Stream<Item = Result<TopicLogSyncMessage<L, E>, impl Debug>> + Unpin),
306) -> (
307 impl Sink<LogSyncMessage<L>, Error = TopicLogSyncChannelError> + Unpin,
308 impl Stream<Item = Result<LogSyncMessage<L>, TopicLogSyncChannelError>> + Unpin,
309) {
310 let log_sync_sink = sink
311 .sink_map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
312 .with(|message| {
313 ready(Ok::<_, TopicLogSyncChannelError>(
314 TopicLogSyncMessage::<L, E>::Sync(message),
315 ))
316 });
317
318 let log_sync_stream = stream.by_ref().map(|message| match message {
319 Ok(TopicLogSyncMessage::Sync(message)) => Ok(message),
320 Ok(TopicLogSyncMessage::Live { .. }) | Ok(TopicLogSyncMessage::Close) => Err(
321 TopicLogSyncChannelError::MessageStream("non-protocol message received".to_string()),
322 ),
323 Err(err) => Err(TopicLogSyncChannelError::MessageStream(format!("{err:?}"))),
324 });
325
326 (log_sync_sink, log_sync_stream)
327}
328
/// Errors raised by the channels a topic sync session communicates over.
#[derive(Debug, Error)]
pub enum TopicLogSyncChannelError {
    /// Sending on or closing the message sink failed; carries the `Debug`
    /// rendering of the underlying sink error.
    #[error("error sending on message sink: {0}")]
    MessageSink(String),

    /// The message stream yielded an error, or a non-sync message arrived
    /// during the initial sync phase.
    #[error("error receiving from message stream: {0}")]
    MessageStream(String),

    /// Publishing on the broadcast event channel failed.
    #[error("no active receivers for broadcast")]
    EventSend,
}
341
/// Errors a topic sync session can end with.
#[derive(Debug, Error)]
pub enum TopicLogSyncError {
    /// The initial log sync protocol failed.
    #[error(transparent)]
    Sync(#[from] LogSyncError),

    /// Looking up the logs for the topic failed; carries the stringified
    /// topic map error.
    #[error("topic map error: {0}")]
    TopicMap(String),

    /// A message which is neither `Live` nor `Close` was received in
    /// live-mode; carries the message's display name.
    #[error("unexpected protocol message: {0}")]
    UnexpectedProtocolMessage(String),

    /// A sink, stream or event channel error occurred.
    #[error(transparent)]
    Channel(#[from] TopicLogSyncChannelError),

    /// The stream ended in live-mode although no close message had been sent
    /// locally.
    #[error("remote unexpectedly closed stream in live-mode")]
    UnexpectedStreamClosure,

    /// The stream yielded an error in live-mode; carries the `Debug`
    /// rendering of that error.
    #[error("{0}")]
    DecodeMessage(String),
}
363
/// Counters collected while a session is in live-mode.
///
/// Byte counts are the encoded header length plus the header's declared
/// `payload_size` of every counted operation. All fields are plain integers,
/// so the type is `Eq` in addition to `PartialEq`.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct TopicLogSyncLiveMetrics {
    /// Number of non-duplicate operations received from the remote.
    pub operations_received: u64,
    /// Number of non-duplicate operations sent to the remote.
    pub operations_sent: u64,
    /// Bytes accounted for received operations.
    pub bytes_received: u64,
    /// Bytes accounted for sent operations.
    pub bytes_sent: u64,
}
372
/// Events published on the session's broadcast channel.
#[derive(Debug, Clone, PartialEq)]
pub enum TopicLogSyncEvent<E> {
    /// The initial sync started (mapped from `LogSyncStatus::Started`).
    SyncStarted(LogSyncMetrics),
    /// Progress update of the initial sync (mapped from
    /// `LogSyncStatus::Progress`).
    SyncStatus(LogSyncMetrics),
    /// The initial sync completed (mapped from `LogSyncStatus::Completed`).
    SyncFinished(LogSyncMetrics),
    /// The session entered live-mode.
    LiveModeStarted,
    /// Live-mode ended; carries the counters collected during it.
    LiveModeFinished(TopicLogSyncLiveMetrics),
    /// An operation was received from the remote, either as sync data or as
    /// a live-mode message.
    Operation(Box<Operation<E>>),
    /// Final event of a session which ended without error.
    Success,
    /// Final event of a failed session; `error` is the stringified error.
    Failed { error: String },
}
385
386impl<E> From<LogSyncEvent<E>> for TopicLogSyncEvent<E> {
387 fn from(event: LogSyncEvent<E>) -> Self {
388 match event {
389 LogSyncEvent::Status(status_event) => match status_event {
390 LogSyncStatus::Started { metrics } => TopicLogSyncEvent::SyncStarted(metrics),
391 LogSyncStatus::Progress { metrics } => TopicLogSyncEvent::SyncStatus(metrics),
392 LogSyncStatus::Completed { metrics } => TopicLogSyncEvent::SyncFinished(metrics),
393 },
394 LogSyncEvent::Data(operation) => TopicLogSyncEvent::Operation(operation),
395 }
396 }
397}
398
/// Messages exchanged between two peers during a topic sync session.
///
/// Serialized with an adjacent tag: the variant name is stored under `type`
/// and its content under `value`.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(tag = "type", content = "value")]
// The variants differ considerably in size; the enum is kept unboxed on
// purpose.
#[allow(clippy::large_enum_variant)]
pub enum TopicLogSyncMessage<L, E> {
    /// Message of the initial log sync protocol.
    Sync(LogSyncMessage<L>),
    /// Operation (header and optional body) forwarded in live-mode.
    Live(Header<E>, Option<Body>),
    /// Request to end the live-mode session.
    Close,
}
408
409impl<L, E> std::fmt::Display for TopicLogSyncMessage<L, E> {
410 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
411 let value = match self {
412 TopicLogSyncMessage::Sync(_) => "sync",
413 TopicLogSyncMessage::Live(_, _) => "live",
414 TopicLogSyncMessage::Close => "close",
415 };
416 write!(f, "{value}")
417 }
418}
419
420#[cfg(test)]
421pub mod tests {
422 use std::collections::HashMap;
423
424 use assert_matches::assert_matches;
425 use futures::channel::mpsc;
426 use futures::{SinkExt, StreamExt};
427 use p2panda_core::{Body, Operation};
428
429 use crate::ToSync;
430 use crate::protocols::{LogSyncError, LogSyncMessage};
431 use crate::test_utils::{
432 Peer, TestTopic, TestTopicSyncEvent, TestTopicSyncMessage, run_protocol, run_protocol_uni,
433 };
434 use crate::traits::Protocol;
435
436 use super::{TopicLogSyncError, TopicLogSyncEvent, TopicLogSyncLiveMetrics};
437
438 #[tokio::test]
439 async fn sync_session_no_operations() {
440 let topic = TestTopic::new("messages");
441 let mut peer = Peer::new(0);
442 peer.insert_topic(&topic, &HashMap::default());
443
444 let (session, mut events_rx, _) = peer.topic_sync_protocol(topic.clone(), false);
445
446 let remote_rx = run_protocol_uni(
447 session,
448 &[
449 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
450 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
451 ],
452 )
453 .await
454 .unwrap();
455
456 assert_matches!(
457 events_rx.recv().await.unwrap(),
458 TestTopicSyncEvent::SyncStarted(_)
459 );
460 assert_matches!(
461 events_rx.recv().await.unwrap(),
462 TestTopicSyncEvent::SyncStatus(_)
463 );
464 assert_matches!(
465 events_rx.recv().await.unwrap(),
466 TestTopicSyncEvent::SyncStatus(_)
467 );
468 assert_matches!(
469 events_rx.recv().await.unwrap(),
470 TestTopicSyncEvent::SyncFinished(_)
471 );
472 assert_matches!(events_rx.recv().await.unwrap(), TestTopicSyncEvent::Success);
473
474 let messages = remote_rx.collect::<Vec<_>>().await;
475 assert_eq!(messages.len(), 2);
476 for (index, message) in messages.into_iter().enumerate() {
477 match index {
478 0 => assert_eq!(
479 message,
480 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![]))
481 ),
482 1 => {
483 assert_eq!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done));
484 break;
485 }
486 _ => panic!(),
487 };
488 }
489 }
490
491 #[tokio::test]
492 async fn sync_operations_accept() {
493 let log_id = 0;
494 let topic = TestTopic::new("messages");
495 let mut peer = Peer::new(0);
496
497 let body = Body::new("Hello, Sloth!".as_bytes());
498 let (header_0, header_bytes_0) = peer.create_operation(&body, log_id).await;
499 let (header_1, header_bytes_1) = peer.create_operation(&body, log_id).await;
500 let (header_2, header_bytes_2) = peer.create_operation(&body, log_id).await;
501
502 let logs = HashMap::from([(peer.id(), vec![log_id])]);
503 peer.insert_topic(&topic, &logs);
504
505 let (session, mut events_rx, _) = peer.topic_sync_protocol(topic.clone(), false);
506
507 let remote_rx = run_protocol_uni(
508 session,
509 &[
510 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
511 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
512 ],
513 )
514 .await
515 .unwrap();
516
517 assert_matches!(
518 events_rx.recv().await.unwrap(),
519 TestTopicSyncEvent::SyncStarted(_)
520 );
521 assert_matches!(
522 events_rx.recv().await.unwrap(),
523 TestTopicSyncEvent::SyncStatus(_)
524 );
525 assert_matches!(
526 events_rx.recv().await.unwrap(),
527 TestTopicSyncEvent::SyncStatus(_)
528 );
529 assert_matches!(
530 events_rx.recv().await.unwrap(),
531 TestTopicSyncEvent::SyncFinished(_)
532 );
533 assert_matches!(events_rx.recv().await.unwrap(), TestTopicSyncEvent::Success);
534
535 let messages = remote_rx.collect::<Vec<_>>().await;
536 assert_eq!(messages.len(), 6);
537 for (index, message) in messages.into_iter().enumerate() {
538 match index {
539 0 => assert_eq!(
540 message,
541 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![(
542 peer.id(),
543 vec![(0, 2)]
544 )]))
545 ),
546 1 => {
547 let expected_bytes = header_0.payload_size
548 + header_bytes_0.len() as u64
549 + header_1.payload_size
550 + header_bytes_1.len() as u64
551 + header_2.payload_size
552 + header_bytes_2.len() as u64;
553
554 assert_eq!(
555 message,
556 TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
557 total_operations: 3,
558 total_bytes: expected_bytes
559 })
560 )
561 }
562 2 => {
563 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
564 header,
565 Some(body),
566 )) => (header, body));
567 assert_eq!(header, header_bytes_0);
568 assert_eq!(Body::new(&body_inner), body)
569 }
570 3 => {
571 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
572 header,
573 Some(body),
574 )) => (header, body));
575 assert_eq!(header, header_bytes_1);
576 assert_eq!(Body::new(&body_inner), body)
577 }
578 4 => {
579 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
580 header,
581 Some(body),
582 )) => (header, body));
583 assert_eq!(header, header_bytes_2);
584 assert_eq!(Body::new(&body_inner), body)
585 }
586 5 => {
587 assert_eq!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done));
588 break;
589 }
590 _ => panic!(),
591 };
592 }
593 }
594
595 #[tokio::test]
596 async fn topic_log_sync_full_duplex() {
597 let topic = TestTopic::new("messages");
598 let log_id = 0;
599
600 let mut peer_a = Peer::new(0);
601 let mut peer_b = Peer::new(1);
602
603 let body = Body::new("Hello, Sloth!".as_bytes());
604 let (header_0, _) = peer_a.create_operation(&body, 0).await;
605 let (header_1, _) = peer_a.create_operation(&body, 0).await;
606 let (header_2, _) = peer_a.create_operation(&body, 0).await;
607
608 let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
609 peer_a.insert_topic(&topic, &logs);
610
611 let (peer_a_session, mut peer_a_events_rx, _) =
612 peer_a.topic_sync_protocol(topic.clone(), false);
613
614 let (peer_b_session, mut peer_b_events_rx, _) =
615 peer_b.topic_sync_protocol(topic.clone(), false);
616
617 run_protocol(peer_a_session, peer_b_session).await.unwrap();
618
619 assert_matches!(
621 peer_a_events_rx.recv().await.unwrap(),
622 TestTopicSyncEvent::SyncStarted(_)
623 );
624 assert_matches!(
625 peer_a_events_rx.recv().await.unwrap(),
626 TestTopicSyncEvent::SyncStatus(_)
627 );
628 assert_matches!(
629 peer_a_events_rx.recv().await.unwrap(),
630 TestTopicSyncEvent::SyncStatus(_)
631 );
632 assert_matches!(
633 peer_a_events_rx.recv().await.unwrap(),
634 TestTopicSyncEvent::SyncFinished(_)
635 );
636 assert_matches!(
637 peer_a_events_rx.recv().await.unwrap(),
638 TestTopicSyncEvent::Success
639 );
640
641 assert_matches!(
643 peer_b_events_rx.recv().await.unwrap(),
644 TestTopicSyncEvent::SyncStarted(_)
645 );
646 assert_matches!(
647 peer_b_events_rx.recv().await.unwrap(),
648 TestTopicSyncEvent::SyncStatus(_)
649 );
650 assert_matches!(
651 peer_b_events_rx.recv().await.unwrap(),
652 TestTopicSyncEvent::SyncStatus(_)
653 );
654 let (header, body_inner) = assert_matches!(
655 peer_b_events_rx.recv().await.unwrap(),
656 TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
657 );
658 assert_eq!(header, header_0);
659 assert_eq!(body_inner.unwrap(), body);
660 let (header, body_inner) = assert_matches!(
661 peer_b_events_rx.recv().await.unwrap(),
662 TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
663 );
664 assert_eq!(header, header_1);
665 assert_eq!(body_inner.unwrap(), body);
666 let (header, body_inner) = assert_matches!(
667 peer_b_events_rx.recv().await.unwrap(),
668 TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
669 );
670 assert_eq!(header, header_2);
671 assert_eq!(body_inner.unwrap(), body);
672 assert_matches!(
673 peer_b_events_rx.recv().await.unwrap(),
674 TestTopicSyncEvent::SyncFinished(_)
675 );
676 assert_matches!(
677 peer_b_events_rx.recv().await.unwrap(),
678 TestTopicSyncEvent::Success
679 );
680 }
681
682 #[tokio::test]
683 async fn live_mode() {
684 let log_id = 0;
685 let topic = TestTopic::new("messages");
686 let mut peer_a = Peer::new(0);
687 let mut peer_b = Peer::new(1);
688
689 let body = Body::new("Hello, Sloth!".as_bytes());
690 let (_, header_bytes_0) = peer_b.create_operation(&body, log_id).await;
691
692 let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
693 peer_a.insert_topic(&topic, &logs);
694
695 let logs = HashMap::default();
696 peer_a.insert_topic(&topic, &logs);
697
698 let (header_1, _) = peer_b.create_operation_no_insert(&body, log_id).await;
699 let expected_live_mode_bytes_received =
700 header_1.payload_size + header_1.to_bytes().len() as u64;
701 let (header_2, _) = peer_a.create_operation_no_insert(&body, log_id).await;
702 let expected_live_mode_bytes_sent =
703 header_2.payload_size + header_2.to_bytes().len() as u64;
704
705 let (protocol, mut events_rx, mut live_mode_tx) =
706 peer_a.topic_sync_protocol(topic.clone(), true);
707
708 live_mode_tx
709 .send(ToSync::Payload(Operation {
710 hash: header_2.hash(),
711 header: header_2.clone(),
712 body: Some(body.clone()),
713 }))
714 .await
715 .unwrap();
716 live_mode_tx.send(ToSync::Close).await.unwrap();
717
718 let total_bytes = header_bytes_0.len() + body.to_bytes().len();
719 let remote_rx = run_protocol_uni(
720 protocol,
721 &[
722 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
723 TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
724 total_operations: 1,
725 total_bytes: total_bytes as u64,
726 }),
727 TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
728 header_bytes_0,
729 Some(body.to_bytes()),
730 )),
731 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
732 TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
733 TestTopicSyncMessage::Close,
734 ],
735 )
736 .await
737 .unwrap();
738
739 for index in 0..=8 {
740 let event = events_rx.recv().await.unwrap();
741 match index {
742 0 => {
743 assert_matches!(event, TestTopicSyncEvent::SyncStarted(_));
744 }
745 1 => {
746 assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
747 }
748 2 => {
749 assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
750 }
751 3 => {
752 assert_matches!(event, TestTopicSyncEvent::Operation(_));
753 }
754 4 => {
755 assert_matches!(event, TestTopicSyncEvent::SyncFinished(_));
756 }
757 5 => {
758 assert_matches!(event, TestTopicSyncEvent::LiveModeStarted);
759 }
760 6 => {
761 assert_matches!(event, TestTopicSyncEvent::Operation(_));
762 }
763 7 => {
764 let metrics = assert_matches!(event, TestTopicSyncEvent::LiveModeFinished(metrics) => metrics);
765 let TopicLogSyncLiveMetrics {
766 operations_received,
767 operations_sent,
768 bytes_received,
769 bytes_sent,
770 } = metrics;
771 assert_eq!(operations_received, 1);
772 assert_eq!(operations_sent, 1);
773 assert_eq!(bytes_received, expected_live_mode_bytes_received);
774 assert_eq!(bytes_sent, expected_live_mode_bytes_sent);
775 }
776 8 => {
777 assert_matches!(event, TestTopicSyncEvent::Success)
778 }
779 _ => panic!(),
780 };
781 }
782
783 let messages = remote_rx.collect::<Vec<_>>().await;
784 assert_eq!(messages.len(), 4);
785 for (index, message) in messages.into_iter().enumerate() {
786 match index {
787 0 => assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Have(_))),
788 1 => {
789 assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done))
790 }
791 2 => {
792 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Live(
793 header,
794 Some(body)
795 ) => (header, body));
796 assert_eq!(header, header_2);
797 assert_eq!(body_inner, body);
798 }
799 3 => {
800 assert_matches!(message, TestTopicSyncMessage::Close)
801 }
802 _ => panic!(),
803 };
804 }
805 }
806
807 #[tokio::test]
808 async fn dedup_live_mode_messages() {
809 let log_id = 0;
810 let topic = TestTopic::new("messages");
811 let mut peer_a = Peer::new(0);
812 let mut peer_b = Peer::new(1);
813
814 let body = Body::new("Hello, Sloth!".as_bytes());
815 let (header_0, header_bytes_0) = peer_b.create_operation(&body, log_id).await;
816
817 let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
818 peer_a.insert_topic(&topic, &logs);
819
820 let logs = HashMap::default();
821 peer_a.insert_topic(&topic, &logs);
822
823 let (header_1, _) = peer_b.create_operation_no_insert(&body, log_id).await;
824 let expected_live_mode_bytes_received =
825 header_1.payload_size + header_1.to_bytes().len() as u64;
826 let (header_2, _) = peer_a.create_operation_no_insert(&body, log_id).await;
827 let expected_live_mode_bytes_sent =
828 header_2.payload_size + header_2.to_bytes().len() as u64;
829
830 let (protocol, mut events_rx, mut live_mode_tx) =
831 peer_a.topic_sync_protocol(topic.clone(), true);
832
833 live_mode_tx
834 .send(ToSync::Payload(Operation {
835 hash: header_2.hash(),
836 header: header_2.clone(),
837 body: Some(body.clone()),
838 }))
839 .await
840 .unwrap();
841
842 live_mode_tx
844 .send(ToSync::Payload(Operation {
845 hash: header_2.hash(),
846 header: header_2.clone(),
847 body: Some(body.clone()),
848 }))
849 .await
850 .unwrap();
851
852 live_mode_tx.send(ToSync::Close).await.unwrap();
853
854 let total_bytes = header_bytes_0.len() + body.to_bytes().len();
855 let remote_rx = run_protocol_uni(
856 protocol,
857 &[
858 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
859 TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
860 total_operations: 1,
861 total_bytes: total_bytes as u64,
862 }),
863 TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
864 header_bytes_0,
865 Some(body.to_bytes()),
866 )),
867 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
868 TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
869 TestTopicSyncMessage::Live(header_0.clone(), Some(body.clone())),
871 TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
873 TestTopicSyncMessage::Close,
874 ],
875 )
876 .await
877 .unwrap();
878
879 for index in 0..=8 {
880 let event = events_rx.recv().await.unwrap();
881 match index {
882 0 => {
883 assert_matches!(event, TestTopicSyncEvent::SyncStarted(_));
884 }
885 1 => {
886 assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
887 }
888 2 => {
889 assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
890 }
891 3 => {
892 assert_matches!(event, TestTopicSyncEvent::Operation(_));
893 }
894 4 => {
895 assert_matches!(event, TestTopicSyncEvent::SyncFinished(_));
896 }
897 5 => {
898 assert_matches!(event, TestTopicSyncEvent::LiveModeStarted);
899 }
900 6 => {
901 assert_matches!(event, TestTopicSyncEvent::Operation(_));
902 }
903 7 => {
904 let metrics = assert_matches!(event, TestTopicSyncEvent::LiveModeFinished(metrics) => metrics);
905 let TopicLogSyncLiveMetrics {
906 operations_received,
907 operations_sent,
908 bytes_received,
909 bytes_sent,
910 } = metrics;
911 assert_eq!(operations_received, 1);
912 assert_eq!(operations_sent, 1);
913 assert_eq!(bytes_received, expected_live_mode_bytes_received);
914 assert_eq!(bytes_sent, expected_live_mode_bytes_sent);
915 }
916 8 => {
917 assert_matches!(event, TestTopicSyncEvent::Success)
918 }
919 _ => panic!(),
920 };
921 }
922
923 let messages = remote_rx.collect::<Vec<_>>().await;
924 assert_eq!(messages.len(), 4);
925 for (index, message) in messages.into_iter().enumerate() {
926 match index {
927 0 => assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Have(_))),
928 1 => {
929 assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done))
930 }
931 2 => {
932 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Live(header, Some(body)) => (header, body));
933 assert_eq!(header, header_2);
934 assert_eq!(body_inner, body);
935 }
936 3 => {
937 assert_matches!(message, TestTopicSyncMessage::Close)
938 }
939 _ => panic!(),
940 };
941 }
942 }
943
944 #[tokio::test]
945 async fn unexpected_stream_closure_sync() {
946 let topic = TestTopic::new("messages");
947 let mut peer = Peer::new(0);
948 peer.insert_topic(&topic, &Default::default());
949
950 let (session, mut events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
951
952 let messages = [TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![]))];
953
954 let (mut local_message_tx, _remote_message_rx) = mpsc::channel(128);
955 let (mut remote_message_tx, local_message_rx) = mpsc::channel(128);
956 let mut local_message_rx = local_message_rx.map(|message| Ok::<_, ()>(message));
957
958 for message in messages {
959 remote_message_tx.send(message.to_owned()).await.unwrap();
960 }
961
962 let handle = tokio::spawn(async move {
963 session
964 .run(&mut local_message_tx, &mut local_message_rx)
965 .await
966 .expect_err("expected unexpected stream closure")
967 });
968
969 drop(remote_message_tx);
970
971 let err = handle.await.unwrap();
972 assert_matches!(
973 err,
974 TopicLogSyncError::Sync(LogSyncError::UnexpectedStreamClosure)
975 );
976
977 while let Ok(event) = events_rx.recv().await {
978 if let TopicLogSyncEvent::Failed { error } = event {
979 assert_eq!(
980 error,
981 "remote unexpectedly closed stream during initial sync".to_string()
982 );
983 break;
984 }
985 }
986 }
987
988 #[tokio::test]
989 async fn unexpected_stream_closure_live_mode() {
990 let topic = TestTopic::new("messages");
991 let mut peer = Peer::new(0);
992 peer.insert_topic(&topic, &Default::default());
993
994 let (session, mut events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
995
996 let messages = [
997 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
998 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
999 ];
1000
1001 let (mut local_message_tx, _remote_message_rx) = mpsc::channel(128);
1002 let (mut remote_message_tx, local_message_rx) = mpsc::channel(128);
1003 let mut local_message_rx = local_message_rx.map(|message| Ok::<_, ()>(message));
1004
1005 for message in messages {
1006 remote_message_tx.send(message.to_owned()).await.unwrap();
1007 }
1008
1009 let handle = tokio::spawn(async move {
1010 session
1011 .run(&mut local_message_tx, &mut local_message_rx)
1012 .await
1013 .expect_err("expected unexpected stream closure")
1014 });
1015
1016 drop(remote_message_tx);
1017
1018 let err = handle.await.unwrap();
1019 assert_matches!(err, TopicLogSyncError::UnexpectedStreamClosure);
1020
1021 while let Ok(event) = events_rx.recv().await {
1022 if let TopicLogSyncEvent::Failed { error } = event {
1023 assert_eq!(
1024 error,
1025 "remote unexpectedly closed stream in live-mode".to_string()
1026 );
1027 break;
1028 }
1029 }
1030 }
1031}