1use std::collections::HashMap;
5use std::fmt::Debug;
6use std::hash::Hash as StdHash;
7use std::marker::PhantomData;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use futures::channel::mpsc;
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use p2panda_core::{Body, Extensions, Header, Operation};
14use p2panda_store::{LogId, LogStore, OperationStore};
15use pin_project_lite::pin_project;
16use serde::{Deserialize, Serialize};
17use thiserror::Error;
18use tokio::sync::broadcast;
19use tracing::{Level, debug, enabled, trace, warn};
20
21use crate::ToSync;
22use crate::dedup::DEFAULT_BUFFER_CAPACITY;
23use crate::protocols::log_sync::{
24 LogSync, LogSyncError, LogSyncEvent, LogSyncMessage, LogSyncMetrics, LogSyncStatus,
25};
26use crate::protocols::{Logs, ShortFormat};
27use crate::traits::{Protocol, TopicMap};
28
29#[derive(Debug)]
43pub struct TopicLogSync<T, S, M, L, E> {
44 pub store: S,
45 pub topic_map: M,
46 pub topic: T,
47 pub event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
48 pub live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
49 pub buffer_capacity: usize,
50 pub _phantom: PhantomData<L>,
51}
52
53impl<T, S, M, L, E> TopicLogSync<T, S, M, L, E>
54where
55 T: Eq + StdHash + Serialize + for<'a> Deserialize<'a>,
56 S: LogStore<L, E> + OperationStore<L, E>,
57 M: TopicMap<T, Logs<L>>,
58 L: LogId + for<'de> Deserialize<'de> + Serialize,
59 E: Extensions,
60{
61 pub fn new(
64 topic: T,
65 store: S,
66 topic_map: M,
67 live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
68 event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
69 ) -> Self {
70 Self::new_with_capacity(
71 topic,
72 store,
73 topic_map,
74 live_mode_rx,
75 event_tx,
76 DEFAULT_BUFFER_CAPACITY,
77 )
78 }
79
80 pub fn new_with_capacity(
82 topic: T,
83 store: S,
84 topic_map: M,
85 live_mode_rx: Option<mpsc::Receiver<ToSync<Operation<E>>>>,
86 event_tx: broadcast::Sender<TopicLogSyncEvent<E>>,
87 buffer_capacity: usize,
88 ) -> Self {
89 Self {
90 topic,
91 topic_map,
92 store,
93 event_tx,
94 live_mode_rx,
95 buffer_capacity,
96 _phantom: PhantomData,
97 }
98 }
99}
100
101impl<T, S, M, L, E> Protocol for TopicLogSync<T, S, M, L, E>
102where
103 T: Debug + Eq + StdHash + Serialize + for<'a> Deserialize<'a> + Send + 'static,
104 S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
105 M: TopicMap<T, Logs<L>> + Send + 'static,
106 L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
107 E: Extensions + Send + 'static,
108{
109 type Error = TopicLogSyncError;
110 type Message = TopicLogSyncMessage<L, E>;
111 type Output = ();
112
113 async fn run(
114 self,
115 mut sink: &mut (impl Sink<Self::Message, Error = impl Debug> + Unpin),
116 mut stream: &mut (impl Stream<Item = Result<Self::Message, impl Debug>> + Unpin),
117 ) -> Result<Self::Output, Self::Error> {
118 debug!(
121 live_mode = self.live_mode_rx.is_some(),
122 "start sync session"
123 );
124
125 let logs = self
127 .topic_map
128 .get(&self.topic)
129 .await
130 .map_err(|err| TopicLogSyncError::TopicMap(err.to_string()))?;
131
132 if enabled!(Level::DEBUG) {
133 let display_logs: HashMap<String, usize> =
134 logs.iter().map(|(k, v)| (k.fmt_short(), v.len())).collect();
135 debug!(logs = ?display_logs, "local topic logs retrieved");
136 }
137
138 if enabled!(Level::TRACE) {
139 let trace_logs: Vec<(String, &Vec<L>)> =
140 logs.iter().map(|(k, v)| (k.fmt_short(), v)).collect();
141 trace!(logs = ?trace_logs, "local topic logs retrieved");
142 }
143
144 let (mut dedup, sync_metrics) = {
146 let (mut log_sync_sink, mut log_sync_stream) = sync_channels(&mut sink, &mut stream);
147 let protocol = LogSync::new_with_capacity(
148 self.store.clone(),
149 logs,
150 self.event_tx.clone(),
151 self.buffer_capacity,
152 );
153 let result = protocol.run(&mut log_sync_sink, &mut log_sync_stream).await;
154
155 match result {
158 Ok(output) => output,
159 Err(err) => {
160 self.event_tx
161 .send(TopicLogSyncEvent::Failed {
162 error: err.to_string(),
163 })
164 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
165
166 log_sync_sink
167 .close()
168 .await
169 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
170
171 return Err(err.into());
172 }
173 }
174 };
175
176 let mut metrics: TopicLogSyncLiveMetrics = sync_metrics.into();
177 let result = match self.live_mode_rx {
178 None => Ok(()),
179 Some(mut live_mode_rx) => {
180 let mut close_sent = false;
187 self.event_tx
188 .send(TopicLogSyncEvent::LiveModeStarted)
189 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
190
191 let result = loop {
192 tokio::select! {
193 biased;
194 Some(message) = live_mode_rx.next() => {
195 match message {
196 ToSync::Payload(operation) => {
197 if !dedup.insert(operation.hash) {
198 trace!(id = ?operation.hash.fmt_short(), "ignore duplicate operation sent on live-mode channel");
199 continue;
200 }
201
202 metrics.bytes_sent +=
203 operation.header.to_bytes().len() as u64 + operation.header.payload_size;
204 metrics.operations_sent += 1;
205
206 trace!(
207 phase = "live",
208 id = ?operation.hash.fmt_short(),
209 sent_ops = ?metrics.operations_sent,
210 sent_bytes = ?metrics.bytes_sent,
211 "sent operation"
212 );
213
214 let result = sink
215 .send(TopicLogSyncMessage::Live(
216 operation.header,
217 operation.body,
218 ))
219 .await
220 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")).into());
221
222 if result.is_err() {
223 break result;
224 };
225 }
226 ToSync::Close => {
227 debug!("closing sync session");
231
232 let result = sink
233 .send(TopicLogSyncMessage::Close)
234 .await
235 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")).into());
236 if result.is_err() {
237 break result;
238 };
239 close_sent = true;
240 }
241 };
242 }
243 message = stream.next() => {
244 let Some(message) = message else {
245 if close_sent {
246 break Ok(());
247 }
248 break Err(TopicLogSyncError::UnexpectedStreamClosure);
249 };
250
251 match message {
252 Ok(message) => {
253 if let TopicLogSyncMessage::Close = message {
254 debug!("received close message from remote");
257 break Ok(());
258 };
259
260 let TopicLogSyncMessage::Live(header, body) = message else {
261 break Err(TopicLogSyncError::UnexpectedProtocolMessage(
262 message.to_string(),
263 ));
264 };
265
266 if !dedup.insert(header.hash()) {
272 trace!(phase = "live", operation_id = ?header.hash().fmt_short(), "ignore duplicate operation sent from remote");
273 continue;
274 }
275
276 metrics.bytes_received += header.to_bytes().len() as u64 + header.payload_size;
277 metrics.operations_received += 1;
278
279 trace!(
280 phase = "live",
281 operation_id = ?header.hash().fmt_short(),
282 received_ops = %metrics.operations_received,
283 received_bytes = %metrics.bytes_received,
284 "received operation"
285 );
286
287 self.event_tx
288 .send(TopicLogSyncEvent::Operation(Box::new(Operation {
289 hash: header.hash(),
290 header,
291 body,
292 })))
293 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
294 }
295 Err(err) => {
296 if close_sent {
297 break Ok(());
298 }
299 break Err(TopicLogSyncError::DecodeMessage(format!("{err:?}")));
300 }
301 }
302 }
303 }
304 };
305
306 self.event_tx
307 .send(TopicLogSyncEvent::LiveModeFinished(metrics.clone()))
308 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
309
310 result
311 }
312 };
313
314 sink.close()
315 .await
316 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))?;
317
318 let final_event = match result.as_ref() {
319 Ok(_) => {
320 debug!(
321 sent_ops = ?metrics.operations_sent,
322 sent_bytes = ?metrics.bytes_sent,
323 received_ops = %metrics.operations_received,
324 received_bytes = %metrics.bytes_received,
325 "sync session closed"
326 );
327 TopicLogSyncEvent::Success
328 }
329 Err(err) => {
330 warn!(error = ?err, "sync session closed with error");
331 TopicLogSyncEvent::Failed {
332 error: err.to_string(),
333 }
334 }
335 };
336
337 self.event_tx
338 .send(final_event)
339 .map_err(|_| TopicLogSyncChannelError::EventSend)?;
340
341 result
342 }
343}
344
345#[allow(clippy::complexity)]
347fn sync_channels<'a, L, E>(
348 sink: &mut (impl Sink<TopicLogSyncMessage<L, E>, Error = impl Debug> + Unpin),
349 stream: &mut (impl Stream<Item = Result<TopicLogSyncMessage<L, E>, impl Debug>> + Unpin),
350) -> (
351 impl Sink<LogSyncMessage<L>, Error = TopicLogSyncChannelError> + Unpin,
352 impl Stream<Item = Result<LogSyncMessage<L>, TopicLogSyncChannelError>> + Unpin,
353) {
354 let log_sync_sink = LogSyncSink::new(sink);
355
356 let log_sync_stream = stream.by_ref().map(|message| match message {
357 Ok(TopicLogSyncMessage::Sync(message)) => Ok(message),
358 Ok(TopicLogSyncMessage::Live { .. }) | Ok(TopicLogSyncMessage::Close) => Err(
359 TopicLogSyncChannelError::MessageStream("non-protocol message received".to_string()),
360 ),
361 Err(err) => Err(TopicLogSyncChannelError::MessageStream(format!("{err:?}"))),
362 });
363
364 (log_sync_sink, log_sync_stream)
365}
366
367#[derive(Debug, Error)]
369pub enum TopicLogSyncChannelError {
370 #[error("error sending on message sink: {0}")]
371 MessageSink(String),
372
373 #[error("error receiving from message stream: {0}")]
374 MessageStream(String),
375
376 #[error("no active receivers for broadcast")]
377 EventSend,
378}
379
380#[derive(Debug, Error)]
382pub enum TopicLogSyncError {
383 #[error(transparent)]
384 Sync(#[from] LogSyncError),
385
386 #[error("topic map error: {0}")]
387 TopicMap(String),
388
389 #[error("unexpected protocol message: {0}")]
390 UnexpectedProtocolMessage(String),
391
392 #[error(transparent)]
393 Channel(#[from] TopicLogSyncChannelError),
394
395 #[error("remote unexpectedly closed stream in live-mode")]
396 UnexpectedStreamClosure,
397
398 #[error("{0}")]
399 DecodeMessage(String),
400}
401
402#[derive(Clone, Debug, PartialEq, Default)]
404pub struct TopicLogSyncLiveMetrics {
405 pub operations_received: u64,
406 pub operations_sent: u64,
407 pub bytes_received: u64,
408 pub bytes_sent: u64,
409}
410
411impl From<LogSyncMetrics> for TopicLogSyncLiveMetrics {
412 fn from(value: LogSyncMetrics) -> TopicLogSyncLiveMetrics {
413 TopicLogSyncLiveMetrics {
414 operations_received: value.total_operations_received,
415 operations_sent: value.total_operations_sent,
416 bytes_received: value.total_bytes_received,
417 bytes_sent: value.total_bytes_sent,
418 }
419 }
420}
421
422#[derive(Debug, Clone, PartialEq)]
424pub enum TopicLogSyncEvent<E> {
425 SyncStarted(LogSyncMetrics),
426 SyncStatus(LogSyncMetrics),
427 SyncFinished(LogSyncMetrics),
428 LiveModeStarted,
429 LiveModeFinished(TopicLogSyncLiveMetrics),
430 Operation(Box<Operation<E>>),
431 Success,
432 Failed { error: String },
433}
434
435impl<E> From<LogSyncEvent<E>> for TopicLogSyncEvent<E> {
436 fn from(event: LogSyncEvent<E>) -> Self {
437 match event {
438 LogSyncEvent::Status(status_event) => match status_event {
439 LogSyncStatus::Started { metrics } => TopicLogSyncEvent::SyncStarted(metrics),
440 LogSyncStatus::Progress { metrics } => TopicLogSyncEvent::SyncStatus(metrics),
441 LogSyncStatus::Completed { metrics } => TopicLogSyncEvent::SyncFinished(metrics),
442 },
443 LogSyncEvent::Data(operation) => TopicLogSyncEvent::Operation(operation),
444 }
445 }
446}
447
448#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
450#[serde(tag = "type", content = "value")]
451#[allow(clippy::large_enum_variant)]
452pub enum TopicLogSyncMessage<L, E> {
453 Sync(LogSyncMessage<L>),
454 Live(Header<E>, Option<Body>),
455 Close,
456}
457
458impl<L, E> std::fmt::Display for TopicLogSyncMessage<L, E> {
459 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
460 let value = match self {
461 TopicLogSyncMessage::Sync(_) => "sync",
462 TopicLogSyncMessage::Live(_, _) => "live",
463 TopicLogSyncMessage::Close => "close",
464 };
465 write!(f, "{value}")
466 }
467}
468
469pin_project! {
470 pub struct LogSyncSink<S, L, E> {
472 #[pin]
473 inner: S,
474 _phantom: std::marker::PhantomData<(L, E)>,
475 }
476}
477
478impl<S, L, E> LogSyncSink<S, L, E> {
479 pub fn new(inner: S) -> Self {
480 Self {
481 inner,
482 _phantom: std::marker::PhantomData,
483 }
484 }
485}
486
487impl<S, L, E> Sink<LogSyncMessage<L>> for LogSyncSink<S, L, E>
488where
489 S: Sink<TopicLogSyncMessage<L, E>>,
490 S::Error: Debug,
491{
492 type Error = TopicLogSyncChannelError;
493
494 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
495 let this = self.project();
496 this.inner
497 .poll_ready(cx)
498 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
499 }
500
501 fn start_send(self: Pin<&mut Self>, item: LogSyncMessage<L>) -> Result<(), Self::Error> {
502 let this = self.project();
503 let msg = TopicLogSyncMessage::Sync(item);
504 this.inner
505 .start_send(msg)
506 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
507 }
508
509 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
510 let this = self.project();
511 this.inner
512 .poll_flush(cx)
513 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
514 }
515
516 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
517 let this = self.project();
518 this.inner
519 .poll_close(cx)
520 .map_err(|err| TopicLogSyncChannelError::MessageSink(format!("{err:?}")))
521 }
522}
523
524#[cfg(test)]
525pub mod tests {
526 use std::collections::HashMap;
527
528 use assert_matches::assert_matches;
529 use futures::channel::mpsc;
530 use futures::{SinkExt, StreamExt};
531 use p2panda_core::{Body, Operation};
532
533 use crate::ToSync;
534 use crate::protocols::{LogSyncError, LogSyncMessage};
535 use crate::test_utils::{
536 Peer, TestTopic, TestTopicSyncEvent, TestTopicSyncMessage, run_protocol, run_protocol_uni,
537 setup_logging,
538 };
539 use crate::traits::Protocol;
540
541 use super::{TopicLogSyncError, TopicLogSyncEvent, TopicLogSyncLiveMetrics};
542
543 #[tokio::test]
544 async fn sync_session_no_operations() {
545 let topic = TestTopic::new("messages");
546 let mut peer = Peer::new(0);
547 peer.insert_topic(&topic, &HashMap::default());
548
549 let (session, mut events_rx, _) = peer.topic_sync_protocol(topic.clone(), false);
550
551 let remote_rx = run_protocol_uni(
552 session,
553 &[
554 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
555 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
556 ],
557 )
558 .await
559 .unwrap();
560
561 assert_matches!(
562 events_rx.recv().await.unwrap(),
563 TestTopicSyncEvent::SyncStarted(_)
564 );
565 assert_matches!(
566 events_rx.recv().await.unwrap(),
567 TestTopicSyncEvent::SyncStatus(_)
568 );
569 assert_matches!(
570 events_rx.recv().await.unwrap(),
571 TestTopicSyncEvent::SyncStatus(_)
572 );
573 assert_matches!(
574 events_rx.recv().await.unwrap(),
575 TestTopicSyncEvent::SyncFinished(_)
576 );
577 assert_matches!(events_rx.recv().await.unwrap(), TestTopicSyncEvent::Success);
578
579 let messages = remote_rx.collect::<Vec<_>>().await;
580 assert_eq!(messages.len(), 2);
581 for (index, message) in messages.into_iter().enumerate() {
582 match index {
583 0 => assert_eq!(
584 message,
585 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![]))
586 ),
587 1 => {
588 assert_eq!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done));
589 break;
590 }
591 _ => panic!(),
592 };
593 }
594 }
595
596 #[tokio::test]
597 async fn sync_operations_accept() {
598 let log_id = 0;
599 let topic = TestTopic::new("messages");
600 let mut peer = Peer::new(0);
601
602 let body = Body::new("Hello, Sloth!".as_bytes());
603 let (header_0, header_bytes_0) = peer.create_operation(&body, log_id).await;
604 let (header_1, header_bytes_1) = peer.create_operation(&body, log_id).await;
605 let (header_2, header_bytes_2) = peer.create_operation(&body, log_id).await;
606
607 let logs = HashMap::from([(peer.id(), vec![log_id])]);
608 peer.insert_topic(&topic, &logs);
609
610 let (session, mut events_rx, _) = peer.topic_sync_protocol(topic.clone(), false);
611
612 let remote_rx = run_protocol_uni(
613 session,
614 &[
615 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
616 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
617 ],
618 )
619 .await
620 .unwrap();
621
622 assert_matches!(
623 events_rx.recv().await.unwrap(),
624 TestTopicSyncEvent::SyncStarted(_)
625 );
626 assert_matches!(
627 events_rx.recv().await.unwrap(),
628 TestTopicSyncEvent::SyncStatus(_)
629 );
630 assert_matches!(
631 events_rx.recv().await.unwrap(),
632 TestTopicSyncEvent::SyncStatus(_)
633 );
634 assert_matches!(
635 events_rx.recv().await.unwrap(),
636 TestTopicSyncEvent::SyncFinished(_)
637 );
638 assert_matches!(events_rx.recv().await.unwrap(), TestTopicSyncEvent::Success);
639
640 let messages = remote_rx.collect::<Vec<_>>().await;
641 assert_eq!(messages.len(), 6);
642 for (index, message) in messages.into_iter().enumerate() {
643 match index {
644 0 => assert_eq!(
645 message,
646 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![(
647 peer.id(),
648 vec![(0, 2)]
649 )]))
650 ),
651 1 => {
652 let expected_bytes = header_0.payload_size
653 + header_bytes_0.len() as u64
654 + header_1.payload_size
655 + header_bytes_1.len() as u64
656 + header_2.payload_size
657 + header_bytes_2.len() as u64;
658
659 assert_eq!(
660 message,
661 TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
662 total_operations: 3,
663 total_bytes: expected_bytes
664 })
665 )
666 }
667 2 => {
668 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
669 header,
670 Some(body),
671 )) => (header, body));
672 assert_eq!(header, header_bytes_0);
673 assert_eq!(Body::new(&body_inner), body)
674 }
675 3 => {
676 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
677 header,
678 Some(body),
679 )) => (header, body));
680 assert_eq!(header, header_bytes_1);
681 assert_eq!(Body::new(&body_inner), body)
682 }
683 4 => {
684 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
685 header,
686 Some(body),
687 )) => (header, body));
688 assert_eq!(header, header_bytes_2);
689 assert_eq!(Body::new(&body_inner), body)
690 }
691 5 => {
692 assert_eq!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done));
693 break;
694 }
695 _ => panic!(),
696 };
697 }
698 }
699
700 #[tokio::test]
701 async fn topic_log_sync_full_duplex() {
702 setup_logging();
703 let topic = TestTopic::new("messages");
704 let log_id = 0;
705
706 let mut peer_a = Peer::new(0);
707 let mut peer_b = Peer::new(1);
708
709 let body = Body::new("Hello, Sloth!".as_bytes());
710 let (header_0, _) = peer_a.create_operation(&body, 0).await;
711 let (header_1, _) = peer_a.create_operation(&body, 0).await;
712 let (header_2, _) = peer_a.create_operation(&body, 0).await;
713
714 let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
715 peer_a.insert_topic(&topic, &logs);
716
717 let (peer_a_session, mut peer_a_events_rx, _) =
718 peer_a.topic_sync_protocol(topic.clone(), false);
719
720 let (peer_b_session, mut peer_b_events_rx, _) =
721 peer_b.topic_sync_protocol(topic.clone(), false);
722
723 run_protocol(peer_a_session, peer_b_session).await.unwrap();
724
725 assert_matches!(
727 peer_a_events_rx.recv().await.unwrap(),
728 TestTopicSyncEvent::SyncStarted(_)
729 );
730 assert_matches!(
731 peer_a_events_rx.recv().await.unwrap(),
732 TestTopicSyncEvent::SyncStatus(_)
733 );
734 assert_matches!(
735 peer_a_events_rx.recv().await.unwrap(),
736 TestTopicSyncEvent::SyncStatus(_)
737 );
738 assert_matches!(
739 peer_a_events_rx.recv().await.unwrap(),
740 TestTopicSyncEvent::SyncFinished(_)
741 );
742 assert_matches!(
743 peer_a_events_rx.recv().await.unwrap(),
744 TestTopicSyncEvent::Success
745 );
746
747 assert_matches!(
749 peer_b_events_rx.recv().await.unwrap(),
750 TestTopicSyncEvent::SyncStarted(_)
751 );
752 assert_matches!(
753 peer_b_events_rx.recv().await.unwrap(),
754 TestTopicSyncEvent::SyncStatus(_)
755 );
756 assert_matches!(
757 peer_b_events_rx.recv().await.unwrap(),
758 TestTopicSyncEvent::SyncStatus(_)
759 );
760 let (header, body_inner) = assert_matches!(
761 peer_b_events_rx.recv().await.unwrap(),
762 TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
763 );
764 assert_eq!(header, header_0);
765 assert_eq!(body_inner.unwrap(), body);
766 let (header, body_inner) = assert_matches!(
767 peer_b_events_rx.recv().await.unwrap(),
768 TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
769 );
770 assert_eq!(header, header_1);
771 assert_eq!(body_inner.unwrap(), body);
772 let (header, body_inner) = assert_matches!(
773 peer_b_events_rx.recv().await.unwrap(),
774 TestTopicSyncEvent::Operation (operation) => {let Operation {header, body, ..} = *operation; (header, body)}
775 );
776 assert_eq!(header, header_2);
777 assert_eq!(body_inner.unwrap(), body);
778 assert_matches!(
779 peer_b_events_rx.recv().await.unwrap(),
780 TestTopicSyncEvent::SyncFinished(_)
781 );
782 assert_matches!(
783 peer_b_events_rx.recv().await.unwrap(),
784 TestTopicSyncEvent::Success
785 );
786 }
787
788 #[tokio::test]
789 async fn live_mode() {
790 let log_id = 0;
791 let topic = TestTopic::new("messages");
792 let mut peer_a = Peer::new(0);
793 let mut peer_b = Peer::new(1);
794
795 let body = Body::new("Hello, Sloth!".as_bytes());
796 let (header_0, header_bytes_0) = peer_b.create_operation(&body, log_id).await;
797
798 let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
799 peer_a.insert_topic(&topic, &logs);
800
801 let logs = HashMap::default();
802 peer_a.insert_topic(&topic, &logs);
803
804 let (header_1, _) = peer_b.create_operation_no_insert(&body, log_id).await;
805 let expected_bytes_received = header_0.payload_size
806 + header_0.to_bytes().len() as u64
807 + header_1.payload_size
808 + header_1.to_bytes().len() as u64;
809 let (header_2, _) = peer_a.create_operation_no_insert(&body, log_id).await;
810 let expected_bytes_sent = header_2.payload_size + header_2.to_bytes().len() as u64;
811
812 let (protocol, mut events_rx, mut live_mode_tx) =
813 peer_a.topic_sync_protocol(topic.clone(), true);
814
815 live_mode_tx
816 .send(ToSync::Payload(Operation {
817 hash: header_2.hash(),
818 header: header_2.clone(),
819 body: Some(body.clone()),
820 }))
821 .await
822 .unwrap();
823 live_mode_tx.send(ToSync::Close).await.unwrap();
824
825 let total_bytes = header_bytes_0.len() + body.to_bytes().len();
826 let remote_rx = run_protocol_uni(
827 protocol,
828 &[
829 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
830 TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
831 total_operations: 1,
832 total_bytes: total_bytes as u64,
833 }),
834 TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
835 header_bytes_0,
836 Some(body.to_bytes()),
837 )),
838 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
839 TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
840 TestTopicSyncMessage::Close,
841 ],
842 )
843 .await
844 .unwrap();
845
846 for index in 0..=8 {
847 let event = events_rx.recv().await.unwrap();
848 match index {
849 0 => {
850 assert_matches!(event, TestTopicSyncEvent::SyncStarted(_));
851 }
852 1 => {
853 assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
854 }
855 2 => {
856 assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
857 }
858 3 => {
859 assert_matches!(event, TestTopicSyncEvent::Operation(_));
860 }
861 4 => {
862 assert_matches!(event, TestTopicSyncEvent::SyncFinished(_));
863 }
864 5 => {
865 assert_matches!(event, TestTopicSyncEvent::LiveModeStarted);
866 }
867 6 => {
868 assert_matches!(event, TestTopicSyncEvent::Operation(_));
869 }
870 7 => {
871 let metrics = assert_matches!(event, TestTopicSyncEvent::LiveModeFinished(metrics) => metrics);
872 let TopicLogSyncLiveMetrics {
873 operations_received,
874 operations_sent,
875 bytes_received,
876 bytes_sent,
877 } = metrics;
878 assert_eq!(operations_received, 2);
879 assert_eq!(operations_sent, 1);
880 assert_eq!(bytes_received, expected_bytes_received);
881 assert_eq!(bytes_sent, expected_bytes_sent);
882 }
883 8 => {
884 assert_matches!(event, TestTopicSyncEvent::Success)
885 }
886 _ => panic!(),
887 };
888 }
889
890 let messages = remote_rx.collect::<Vec<_>>().await;
891 assert_eq!(messages.len(), 4);
892 for (index, message) in messages.into_iter().enumerate() {
893 match index {
894 0 => assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Have(_))),
895 1 => {
896 assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done))
897 }
898 2 => {
899 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Live(
900 header,
901 Some(body)
902 ) => (header, body));
903 assert_eq!(header, header_2);
904 assert_eq!(body_inner, body);
905 }
906 3 => {
907 assert_matches!(message, TestTopicSyncMessage::Close)
908 }
909 _ => panic!(),
910 };
911 }
912 }
913
914 #[tokio::test]
915 async fn dedup_live_mode_messages() {
916 let log_id = 0;
917 let topic = TestTopic::new("messages");
918 let mut peer_a = Peer::new(0);
919 let mut peer_b = Peer::new(1);
920
921 let body = Body::new("Hello, Sloth!".as_bytes());
922 let (header_0, header_bytes_0) = peer_b.create_operation(&body, log_id).await;
923
924 let logs = HashMap::from([(peer_a.id(), vec![log_id])]);
925 peer_a.insert_topic(&topic, &logs);
926
927 let logs = HashMap::default();
928 peer_a.insert_topic(&topic, &logs);
929
930 let (header_1, _) = peer_b.create_operation_no_insert(&body, log_id).await;
931 let expected_bytes_received = header_0.payload_size
932 + header_0.to_bytes().len() as u64
933 + header_1.payload_size
934 + header_1.to_bytes().len() as u64;
935 let (header_2, _) = peer_a.create_operation_no_insert(&body, log_id).await;
936 let expected_bytes_sent = header_2.payload_size + header_2.to_bytes().len() as u64;
937
938 let (protocol, mut events_rx, mut live_mode_tx) =
939 peer_a.topic_sync_protocol(topic.clone(), true);
940
941 live_mode_tx
942 .send(ToSync::Payload(Operation {
943 hash: header_2.hash(),
944 header: header_2.clone(),
945 body: Some(body.clone()),
946 }))
947 .await
948 .unwrap();
949
950 live_mode_tx
952 .send(ToSync::Payload(Operation {
953 hash: header_2.hash(),
954 header: header_2.clone(),
955 body: Some(body.clone()),
956 }))
957 .await
958 .unwrap();
959
960 live_mode_tx.send(ToSync::Close).await.unwrap();
961
962 let total_bytes = header_bytes_0.len() + body.to_bytes().len();
963 let remote_rx = run_protocol_uni(
964 protocol,
965 &[
966 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
967 TestTopicSyncMessage::Sync(LogSyncMessage::PreSync {
968 total_operations: 1,
969 total_bytes: total_bytes as u64,
970 }),
971 TestTopicSyncMessage::Sync(LogSyncMessage::Operation(
972 header_bytes_0,
973 Some(body.to_bytes()),
974 )),
975 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
976 TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
977 TestTopicSyncMessage::Live(header_0.clone(), Some(body.clone())),
979 TestTopicSyncMessage::Live(header_1.clone(), Some(body.clone())),
981 TestTopicSyncMessage::Close,
982 ],
983 )
984 .await
985 .unwrap();
986
987 for index in 0..=8 {
988 let event = events_rx.recv().await.unwrap();
989 match index {
990 0 => {
991 assert_matches!(event, TestTopicSyncEvent::SyncStarted(_));
992 }
993 1 => {
994 assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
995 }
996 2 => {
997 assert_matches!(event, TestTopicSyncEvent::SyncStatus(_));
998 }
999 3 => {
1000 assert_matches!(event, TestTopicSyncEvent::Operation(_));
1001 }
1002 4 => {
1003 assert_matches!(event, TestTopicSyncEvent::SyncFinished(_));
1004 }
1005 5 => {
1006 assert_matches!(event, TestTopicSyncEvent::LiveModeStarted);
1007 }
1008 6 => {
1009 assert_matches!(event, TestTopicSyncEvent::Operation(_));
1010 }
1011 7 => {
1012 let metrics = assert_matches!(event, TestTopicSyncEvent::LiveModeFinished(metrics) => metrics);
1013 let TopicLogSyncLiveMetrics {
1014 operations_received,
1015 operations_sent,
1016 bytes_received,
1017 bytes_sent,
1018 } = metrics;
1019 assert_eq!(operations_received, 2);
1020 assert_eq!(operations_sent, 1);
1021 assert_eq!(bytes_received, expected_bytes_received);
1022 assert_eq!(bytes_sent, expected_bytes_sent);
1023 }
1024 8 => {
1025 assert_matches!(event, TestTopicSyncEvent::Success)
1026 }
1027 _ => panic!(),
1028 };
1029 }
1030
1031 let messages = remote_rx.collect::<Vec<_>>().await;
1032 assert_eq!(messages.len(), 4);
1033 for (index, message) in messages.into_iter().enumerate() {
1034 match index {
1035 0 => assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Have(_))),
1036 1 => {
1037 assert_matches!(message, TestTopicSyncMessage::Sync(LogSyncMessage::Done))
1038 }
1039 2 => {
1040 let (header, body_inner) = assert_matches!(message, TestTopicSyncMessage::Live(header, Some(body)) => (header, body));
1041 assert_eq!(header, header_2);
1042 assert_eq!(body_inner, body);
1043 }
1044 3 => {
1045 assert_matches!(message, TestTopicSyncMessage::Close)
1046 }
1047 _ => panic!(),
1048 };
1049 }
1050 }
1051
1052 #[tokio::test]
1053 async fn unexpected_stream_closure_sync() {
1054 let topic = TestTopic::new("messages");
1055 let mut peer = Peer::new(0);
1056 peer.insert_topic(&topic, &Default::default());
1057
1058 let (session, mut events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
1059
1060 let messages = [TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![]))];
1061
1062 let (mut local_message_tx, _remote_message_rx) = mpsc::channel(128);
1063 let (mut remote_message_tx, local_message_rx) = mpsc::channel(128);
1064 let mut local_message_rx = local_message_rx.map(|message| Ok::<_, ()>(message));
1065
1066 for message in messages {
1067 remote_message_tx.send(message.to_owned()).await.unwrap();
1068 }
1069
1070 let handle = tokio::spawn(async move {
1071 session
1072 .run(&mut local_message_tx, &mut local_message_rx)
1073 .await
1074 .expect_err("expected unexpected stream closure")
1075 });
1076
1077 drop(remote_message_tx);
1078
1079 let err = handle.await.unwrap();
1080 assert_matches!(
1081 err,
1082 TopicLogSyncError::Sync(LogSyncError::UnexpectedStreamClosure)
1083 );
1084
1085 while let Ok(event) = events_rx.recv().await {
1086 if let TopicLogSyncEvent::Failed { error } = event {
1087 assert_eq!(
1088 error,
1089 "remote unexpectedly closed stream during initial sync".to_string()
1090 );
1091 break;
1092 }
1093 }
1094 }
1095
1096 #[tokio::test]
1097 async fn unexpected_stream_closure_live_mode() {
1098 let topic = TestTopic::new("messages");
1099 let mut peer = Peer::new(0);
1100 peer.insert_topic(&topic, &Default::default());
1101
1102 let (session, mut events_rx, _live_tx) = peer.topic_sync_protocol(topic.clone(), true);
1103
1104 let messages = [
1105 TestTopicSyncMessage::Sync(LogSyncMessage::Have(vec![])),
1106 TestTopicSyncMessage::Sync(LogSyncMessage::Done),
1107 ];
1108
1109 let (mut local_message_tx, _remote_message_rx) = mpsc::channel(128);
1110 let (mut remote_message_tx, local_message_rx) = mpsc::channel(128);
1111 let mut local_message_rx = local_message_rx.map(|message| Ok::<_, ()>(message));
1112
1113 for message in messages {
1114 remote_message_tx.send(message.to_owned()).await.unwrap();
1115 }
1116
1117 let handle = tokio::spawn(async move {
1118 session
1119 .run(&mut local_message_tx, &mut local_message_rx)
1120 .await
1121 .expect_err("expected unexpected stream closure")
1122 });
1123
1124 drop(remote_message_tx);
1125
1126 let err = handle.await.unwrap();
1127 assert_matches!(err, TopicLogSyncError::UnexpectedStreamClosure);
1128
1129 while let Ok(event) = events_rx.recv().await {
1130 if let TopicLogSyncEvent::Failed { error } = event {
1131 assert_eq!(
1132 error,
1133 "remote unexpectedly closed stream in live-mode".to_string()
1134 );
1135 break;
1136 }
1137 }
1138 }
1139}