1use std::collections::HashMap;
5use std::fmt::{Debug, Display};
6use std::marker::PhantomData;
7
8use futures::{Sink, SinkExt, Stream, StreamExt, stream};
9use p2panda_core::cbor::{DecodeError, decode_cbor};
10use p2panda_core::{Body, Extensions, Hash, Header, Operation, PublicKey};
11use p2panda_store::{LogId, LogStore, OperationStore};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::select;
15use tokio::sync::broadcast;
16use tracing::{debug, trace, warn};
17
18use crate::dedup::{DEFAULT_BUFFER_CAPACITY, Dedup};
19use crate::traits::Protocol;
20
/// Map from an author's public key to the ids of that author's logs which
/// should be included in a sync session.
pub type Logs<L> = HashMap<PublicKey, Vec<L>>;
23
/// States of the log-sync session. Each variant carries the data that must
/// survive into the next state; `run` drives the machine from `Start` to
/// `End`.
#[derive(Default)]
enum State<L> {
    /// Session created, nothing exchanged yet.
    #[default]
    Start,

    /// Send our local log heights ("Have") to the remote.
    SendHave { metrics: LogSyncMetrics },

    /// Wait for the remote's "Have" message.
    ReceiveHave { metrics: LogSyncMetrics },

    /// Announce how many operations/bytes we will send, or "Done" if none.
    SendPreSyncOrDone {
        /// Operations the remote is missing: (author, log id, seq num, hash).
        operations: Vec<(PublicKey, L, u64, Hash)>,
        metrics: LogSyncMetrics,
    },

    /// Wait for the remote's "PreSync" (or "Done") announcement.
    ReceivePreSyncOrDone {
        /// Operations the remote is missing: (author, log id, seq num, hash).
        operations: Vec<(PublicKey, L, u64, Hash)>,
        metrics: LogSyncMetrics,
    },

    /// Bidirectional transfer: send the queued operations while concurrently
    /// receiving the remote's operations.
    Sync {
        /// Operations the remote is missing: (author, log id, seq num, hash).
        operations: Vec<(PublicKey, L, u64, Hash)>,
        metrics: LogSyncMetrics,
    },

    /// Both sides signalled "Done"; emit the final status event.
    End { metrics: LogSyncMetrics },
}
59
/// A two-party log-sync protocol session.
///
/// Generic over the log id type `L`, header extensions `E`, the backing
/// store `S` and the application event type `Evt`.
pub struct LogSync<L, E, S, Evt> {
    // Current protocol state; advanced by `run`.
    state: State<L>,
    // Which authors/logs this session should sync.
    logs: Logs<L>,
    // Store used to look up local log heights and raw operations.
    store: S,
    // Channel on which status and data events are broadcast to the application.
    event_tx: broadcast::Sender<Evt>,
    // Capacity of the de-duplication buffer returned from `run`.
    buffer_capacity: usize,
    // `E` only appears in trait bounds on the impl blocks, not in any field.
    _marker: PhantomData<E>,
}
69
70impl<L, E, S, Evt> LogSync<L, E, S, Evt> {
71 pub fn new(store: S, logs: Logs<L>, event_tx: broadcast::Sender<Evt>) -> Self {
72 Self::new_with_capacity(store, logs, event_tx, DEFAULT_BUFFER_CAPACITY)
73 }
74
75 pub fn new_with_capacity(
76 store: S,
77 logs: Logs<L>,
78 event_tx: broadcast::Sender<Evt>,
79 buffer_capacity: usize,
80 ) -> Self {
81 Self {
82 state: Default::default(),
83 store,
84 event_tx,
85 logs,
86 buffer_capacity,
87 _marker: PhantomData,
88 }
89 }
90}
91
impl<L, E, S, Evt> Protocol for LogSync<L, E, S, Evt>
where
    L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
    E: Extensions + Send + 'static,
    S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
    Evt: Debug + From<LogSyncEvent<E>> + Send + 'static,
{
    type Error = LogSyncError;
    type Output = (Dedup<Hash>, LogSyncMetrics);
    type Message = LogSyncMessage<L>;

    /// Drives the sync state machine to completion.
    ///
    /// Protocol flow: exchange `Have` (log heights) -> exchange
    /// `PreSync`/`Done` (announced totals) -> concurrently send and receive
    /// operations -> both sides `Done`.
    ///
    /// Returns the de-duplication buffer (containing the hashes of every
    /// operation sent or received during the session) together with the final
    /// metrics. Status and data events are broadcast on `event_tx` along the
    /// way; a failed broadcast (no receivers) aborts the session.
    async fn run(
        mut self,
        sink: &mut (impl Sink<Self::Message, Error = impl Debug> + Unpin),
        stream: &mut (impl Stream<Item = Result<Self::Message, impl Debug>> + Unpin),
    ) -> Result<Self::Output, Self::Error> {
        // Both flags must be true before the transfer phase may finish.
        let mut sync_done_received = false;
        let mut sync_done_sent = false;
        let mut dedup = Dedup::new(self.buffer_capacity);

        let metrics = loop {
            match self.state {
                State::Start => {
                    // Announce the session to the application with empty
                    // metrics.
                    let metrics = LogSyncMetrics::default();
                    self.event_tx
                        .send(
                            LogSyncEvent::Status(LogSyncStatus::Started {
                                metrics: metrics.clone(),
                            })
                            .into(),
                        )
                        .map_err(|_| LogSyncError::BroadcastSend)?;
                    self.state = State::SendHave { metrics };
                }
                State::SendHave { metrics } => {
                    // Tell the remote which log heights we already hold.
                    let local_log_heights = local_log_heights(&self.store, &self.logs).await?;
                    sink.send(LogSyncMessage::<L>::Have(local_log_heights.clone()))
                        .await
                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                    self.state = State::ReceiveHave { metrics };
                }
                State::ReceiveHave { mut metrics } => {
                    // The first message from the remote must be its `Have`.
                    let Some(message) = stream.next().await else {
                        return Err(LogSyncError::UnexpectedStreamClosure);
                    };
                    let message =
                        message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
                    let LogSyncMessage::Have(remote_log_heights) = message else {
                        return Err(LogSyncError::UnexpectedMessage(message.to_string()));
                    };

                    let remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>> =
                        remote_log_heights.clone().into_iter().collect();

                    // Determine which local operations the remote is missing
                    // and how many bytes that amounts to.
                    let (operations, total_size) = operations_needed_by_remote(
                        &self.store,
                        &self.logs,
                        remote_log_heights_map,
                    )
                    .await?;

                    metrics.total_operations_local = Some(operations.len() as u64);
                    metrics.total_bytes_local = Some(total_size);

                    self.state = State::SendPreSyncOrDone {
                        operations,
                        metrics,
                    };
                }
                State::SendPreSyncOrDone {
                    operations,
                    metrics,
                } => {
                    // Set in `ReceiveHave`, so the unwraps cannot fail here.
                    let total_operations = metrics.total_operations_local.unwrap();
                    let total_bytes = metrics.total_bytes_local.unwrap();

                    // Announce the transfer volume, or `Done` straight away
                    // if we have nothing the remote needs.
                    if total_operations > 0 {
                        sink.send(LogSyncMessage::PreSync {
                            total_bytes,
                            total_operations,
                        })
                        .await
                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                    } else {
                        sink.send(LogSyncMessage::Done)
                            .await
                            .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                    }

                    self.event_tx
                        .send(
                            LogSyncEvent::Status(LogSyncStatus::Progress {
                                metrics: metrics.clone(),
                            })
                            .into(),
                        )
                        .map_err(|_| LogSyncError::BroadcastSend)?;

                    self.state = State::ReceivePreSyncOrDone {
                        operations,
                        metrics,
                    };
                }
                State::ReceivePreSyncOrDone {
                    operations,
                    mut metrics,
                } => {
                    // Expect the remote's `PreSync` announcement or `Done`.
                    let Some(message) = stream.next().await else {
                        return Err(LogSyncError::UnexpectedStreamClosure);
                    };
                    let message =
                        message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;

                    // Default the remote totals to zero; overwritten below if
                    // the remote announced a transfer.
                    metrics.total_bytes_remote = Some(0);
                    metrics.total_operations_remote = Some(0);

                    match message {
                        LogSyncMessage::PreSync {
                            total_operations,
                            total_bytes,
                        } => {
                            metrics.total_bytes_remote = Some(total_bytes);
                            metrics.total_operations_remote = Some(total_operations);
                        }
                        // The remote has nothing to send to us.
                        LogSyncMessage::Done => sync_done_received = true,
                        message => {
                            return Err(LogSyncError::UnexpectedMessage(message.to_string()));
                        }
                    }

                    debug!(
                        local_ops = metrics.total_operations_local.unwrap_or_default(),
                        remote_ops = metrics.total_operations_remote.unwrap_or_default(),
                        local_bytes = metrics.total_bytes_local.unwrap_or_default(),
                        remote_bytes = metrics.total_bytes_remote.unwrap_or_default(),
                        "sync metrics received",
                    );

                    self.event_tx
                        .send(
                            LogSyncEvent::Status(LogSyncStatus::Progress {
                                metrics: metrics.clone(),
                            })
                            .into(),
                        )
                        .map_err(|_| LogSyncError::BroadcastSend)?;

                    self.state = State::Sync {
                        operations,
                        metrics,
                    };
                }
                State::Sync {
                    operations,
                    mut metrics,
                } => {
                    let mut operation_stream = stream::iter(operations);
                    let mut sent_operations = 0;
                    let total_operations = metrics
                        .total_operations_local
                        .expect("total operations set");

                    // Interleave receiving remote operations with sending our
                    // own until both directions have signalled `Done`.
                    loop {
                        select! {
                            // Receive branch; disabled once the remote said
                            // `Done`.
                            message = stream.next(), if !sync_done_received => {
                                let Some(message) = message else {
                                    break;
                                };
                                let message =
                                    message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
                                match message {
                                    LogSyncMessage::Operation(header, body) => {
                                        // Count raw (encoded) bytes before
                                        // decoding.
                                        metrics.total_bytes_received += {
                                            header.len()
                                                + body.as_ref().map(|bytes| bytes.len()).unwrap_or_default()
                                        } as u64;
                                        metrics.total_operations_received += 1;

                                        let header: Header<E> = decode_cbor(&header[..])?;
                                        let body = body.map(|ref bytes| Body::new(bytes));

                                        // Record the hash in the dedup buffer
                                        // handed back to the caller.
                                        dedup.insert(header.hash());

                                        trace!(
                                            phase = "sync",
                                            id = ?header.hash().fmt_short(),
                                            received_ops = metrics.total_operations_received,
                                            received_bytes = metrics.total_bytes_received,
                                            "received operation"
                                        );

                                        // Forward the decoded operation to
                                        // the application.
                                        self.event_tx
                                            .send(
                                                LogSyncEvent::Data(Box::new(Operation {
                                                    hash: header.hash(),
                                                    header,
                                                    body,
                                                }))
                                                .into(),
                                            )
                                            .map_err(|_| LogSyncError::BroadcastSend)?;
                                    }
                                    LogSyncMessage::Done => {
                                        sync_done_received = true;
                                    }
                                    message => {
                                        return Err(LogSyncError::UnexpectedMessage(message.to_string()));
                                    }
                                }
                            },
                            // Send branch; completes once the queued
                            // operations are exhausted.
                            Some((public_key, log_id, seq_num, hash)) = operation_stream.next() => {
                                dedup.insert(hash);

                                let result = self
                                    .store
                                    .get_raw_operation(hash)
                                    .await
                                    .map_err(|err| LogSyncError::OperationStore(format!("{err}")))?;

                                // Test-only yield so concurrent test code
                                // (e.g. pruning the store) can interleave.
                                #[cfg(test)]
                                tokio::task::yield_now().await;

                                if let Some((header, body)) = result {
                                    metrics.total_bytes_sent += {
                                        header.len() + body.as_ref().map(|bytes| bytes.len()).unwrap_or_default()
                                    } as u64;
                                    metrics.total_operations_sent += 1;

                                    trace!(
                                        phase = "sync",
                                        public_key = %public_key.fmt_short(),
                                        log_id = ?log_id,
                                        seq_num,
                                        id = %hash.fmt_short(),
                                        sent_ops = metrics.total_operations_sent,
                                        sent_bytes = metrics.total_bytes_sent,
                                        "send operation",
                                    );

                                    sink.send(LogSyncMessage::Operation(header, body))
                                        .await
                                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                                } else {
                                    // The operation was announced in `Have`
                                    // but has since disappeared from the
                                    // store (e.g. concurrently pruned); skip
                                    // it but keep counting so `Done` is still
                                    // sent after `total_operations` attempts.
                                    warn!(
                                        phase = "sync",
                                        log_id = ?log_id,
                                        seq_num,
                                        id = %hash.fmt_short(),
                                        "expected operation missing from store",
                                    );
                                };

                                sent_operations += 1;

                                if sent_operations >= total_operations {
                                    sink.send(LogSyncMessage::Done)
                                        .await
                                        .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
                                    sync_done_sent = true;
                                }
                            },
                            // Both branches disabled/exhausted.
                            else => {
                                break;
                            }
                        }
                        if sync_done_received && sync_done_sent {
                            break;
                        }
                    }
                    self.state = State::End { metrics };
                }
                State::End { metrics } => {
                    // Emit the final status event and leave the state loop
                    // with the completed metrics.
                    self.event_tx
                        .send(
                            LogSyncEvent::Status(LogSyncStatus::Completed {
                                metrics: metrics.clone(),
                            })
                            .into(),
                        )
                        .map_err(|_| LogSyncError::BroadcastSend)?;
                    break metrics;
                }
            }
        };

        Ok((dedup, metrics))
    }
}
403
404async fn local_log_heights<L, E, S>(
406 store: &S,
407 logs: &Logs<L>,
408) -> Result<Vec<(PublicKey, Vec<(L, u64)>)>, LogSyncError>
409where
410 L: LogId,
411 S: LogStore<L, E> + OperationStore<L, E>,
412{
413 let mut local_log_heights = Vec::new();
414 for (public_key, log_ids) in logs {
415 let mut log_heights = Vec::new();
416 for log_id in log_ids {
417 let latest = store
418 .latest_operation(public_key, log_id)
419 .await
420 .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
421
422 if let Some((header, _)) = latest {
423 log_heights.push((log_id.clone(), header.seq_num));
424 };
425 }
426 local_log_heights.push((*public_key, log_heights));
427 }
428
429 Ok(local_log_heights)
430}
431
432async fn operations_needed_by_remote<L, E, S>(
435 store: &S,
436 local_logs: &Logs<L>,
437 remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>>,
438) -> Result<(Vec<(PublicKey, L, u64, Hash)>, u64), LogSyncError>
439where
440 L: LogId,
441 E: Extensions,
442 S: LogStore<L, E> + OperationStore<L, E>,
443{
444 let mut remote_needs = Vec::new();
449 let mut total_size = 0;
450
451 for (public_key, log_ids) in local_logs {
452 for log_id in log_ids {
453 let latest_operation = store
455 .latest_operation(public_key, log_id)
456 .await
457 .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
458
459 let log_height = match latest_operation {
460 Some((header, _)) => header.seq_num,
461 None => continue,
464 };
465
466 let remote_needs_from = match remote_log_heights_map.get(public_key) {
468 Some(log_heights) => {
469 match log_heights.iter().find(|(id, _)| *id == *log_id) {
470 Some((_, log_height)) => log_height + 1,
473 None => 0,
475 }
476 }
477 None => 0,
479 };
480
481 if remote_needs_from <= log_height {
482 let log = store
483 .get_log_hashes(public_key, log_id, Some(remote_needs_from))
484 .await
485 .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
486
487 if let Some(log) = log {
488 let log: Vec<_> = log
493 .into_iter()
494 .map(|(seq_num, hash)| (*public_key, log_id.clone(), seq_num, hash))
495 .collect();
496 remote_needs.extend(log);
497 }
498
499 let size = store
500 .get_log_size(public_key, log_id, Some(remote_needs_from))
501 .await
502 .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
503
504 if let Some(size) = size {
505 total_size += size;
506 }
507 };
508 }
509 }
510
511 Ok((remote_needs, total_size))
512}
513
/// Wire messages exchanged between the two peers of a log-sync session.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(tag = "type", content = "value")]
pub enum LogSyncMessage<L> {
    /// The sender's log heights: per author, the latest sequence number the
    /// sender holds for each log.
    Have(Vec<(PublicKey, Vec<(L, u64)>)>),
    /// Announcement of how much data the sender is about to transfer.
    PreSync {
        total_operations: u64,
        total_bytes: u64,
    },
    /// A single operation as (encoded header bytes, optional body bytes).
    Operation(Vec<u8>, Option<Vec<u8>>),
    /// The sender has no (more) operations to transfer.
    Done,
}
527
528impl<L> Display for LogSyncMessage<L>
529where
530 L: Debug,
531{
532 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
533 let value = match self {
534 LogSyncMessage::Have(_) => "have",
535 LogSyncMessage::PreSync { .. } => "pre_sync",
536 LogSyncMessage::Operation(_, _) => "operation",
537 LogSyncMessage::Done => "done",
538 };
539
540 write!(f, "{value}")
541 }
542}
543
/// Events broadcast to the application while a sync session runs.
#[derive(Clone, Debug, PartialEq)]
pub enum LogSyncEvent<E> {
    /// Session status update carrying a snapshot of the current metrics.
    Status(LogSyncStatus),
    /// An operation received from the remote peer during sync.
    Data(Box<Operation<E>>),
}
550
/// Counters describing the progress and outcome of a sync session.
///
/// The `Option` fields stay `None` until the corresponding protocol exchange
/// has taken place.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct LogSyncMetrics {
    // Operations the local peer will send (known after the remote's "Have").
    pub total_operations_local: Option<u64>,
    // Operations the remote announced it will send (known after "PreSync";
    // zero when the remote immediately sent "Done").
    pub total_operations_remote: Option<u64>,
    // Operations actually received so far.
    pub total_operations_received: u64,
    // Operations actually sent so far.
    pub total_operations_sent: u64,
    // Bytes the local peer will send (known after the remote's "Have").
    pub total_bytes_local: Option<u64>,
    // Bytes the remote announced it will send.
    pub total_bytes_remote: Option<u64>,
    // Raw (encoded) bytes actually received so far.
    pub total_bytes_received: u64,
    // Raw (encoded) bytes actually sent so far.
    pub total_bytes_sent: u64,
}
563
/// Lifecycle status of a sync session, each carrying a metrics snapshot.
#[derive(Clone, Debug, PartialEq)]
pub enum LogSyncStatus {
    /// The session has begun; metrics are still empty.
    Started { metrics: LogSyncMetrics },
    /// An exchange step completed (emitted after sending and after receiving
    /// the pre-sync announcement).
    Progress { metrics: LogSyncMetrics },
    /// The session finished; metrics are final.
    Completed { metrics: LogSyncMetrics },
}
571
/// Errors which can terminate a log-sync session.
#[derive(Debug, Error)]
pub enum LogSyncError {
    /// A received operation header could not be CBOR-decoded.
    #[error(transparent)]
    Decode(#[from] DecodeError),

    /// The log store reported a failure (message stringified from the store's
    /// own error type).
    #[error("log store error: {0}")]
    LogStore(String),

    /// The operation store reported a failure.
    #[error("operation store error: {0}")]
    OperationStore(String),

    /// Broadcasting an event failed because no receivers were subscribed.
    #[error("no active receivers when broadcasting")]
    BroadcastSend,

    /// Sending a protocol message to the remote failed.
    #[error("log sync error sending on message sink: {0}")]
    MessageSink(String),

    /// Receiving a protocol message from the remote failed.
    #[error("log sync error receiving from message stream: {0}")]
    MessageStream(String),

    /// The remote closed the stream before the handshake completed.
    #[error("remote unexpectedly closed stream during initial sync")]
    UnexpectedStreamClosure,

    /// The remote sent a message that is invalid in the current state.
    #[error("log sync received unexpected protocol message: {0}")]
    UnexpectedMessage(String),
}
599
/// Short, human-readable identifier rendering for log output.
pub trait ShortFormat {
    /// Returns an abbreviated hex representation of the value.
    fn fmt_short(&self) -> String;
}
605
606impl ShortFormat for PublicKey {
607 fn fmt_short(&self) -> String {
608 self.to_hex()[0..10].to_string()
609 }
610}
611
612impl ShortFormat for Hash {
613 fn fmt_short(&self) -> String {
614 self.to_hex()[0..5].to_string()
615 }
616}
617
618#[cfg(test)]
619mod tests {
620 use std::time::Duration;
621
622 use assert_matches::assert_matches;
623 use futures::StreamExt;
624 use p2panda_core::Body;
625 use p2panda_store::OperationStore;
626
627 use crate::protocols::log_sync::{
628 LogSyncError, LogSyncEvent, LogSyncMetrics, LogSyncStatus, Logs, Operation,
629 };
630 use crate::test_utils::{
631 Peer, TestLogSyncMessage, run_protocol, run_protocol_uni, setup_logging,
632 };
633
    /// A peer with no logs, facing a remote that also has nothing, exchanges
    /// only `Have` + `Done` and emits four status events (Started, two
    /// Progress, Completed) with all-zero totals.
    #[tokio::test]
    async fn log_sync_no_operations() {
        let mut peer: Peer = Peer::new(0);

        let (session, mut event_rx) = peer.log_sync_protocol(&Logs::default());
        let remote_message_rx = run_protocol_uni(
            session,
            &[TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done],
        )
        .await
        .unwrap();

        // Expect exactly: Started, Progress (local totals), Progress (remote
        // totals), Completed.
        for index in 0..=3 {
            let event = event_rx.recv().await.unwrap();
            match index {
                0 => {
                    // Started: remote totals not yet known.
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Started { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
                        => (total_operations_remote, total_bytes_remote)
                    );
                    assert_eq!(total_operations, None);
                    assert_eq!(total_bytes, None);
                }
                1 => {
                    // Progress after sending our (empty) announcement.
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Progress { metrics: LogSyncMetrics { total_operations_local, total_bytes_local, .. } })
                        => (total_operations_local, total_bytes_local)
                    );
                    assert_eq!(total_operations, Some(0));
                    assert_eq!(total_bytes, Some(0));
                }
                2 => {
                    // Progress after receiving the remote's `Done`.
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Progress { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
                        => (total_operations_remote, total_bytes_remote)
                    );
                    assert_eq!(total_operations, Some(0));
                    assert_eq!(total_bytes, Some(0));
                }
                3 => {
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Completed { metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. } })
                        => (total_operations_remote, total_bytes_remote)
                    );
                    assert_eq!(total_operations, Some(0));
                    assert_eq!(total_bytes, Some(0));
                }
                _ => panic!(),
            };
        }

        // On the wire we sent only our empty `Have` followed by `Done`.
        let messages = remote_message_rx.collect::<Vec<_>>().await;
        assert_eq!(messages.len(), 2);
        for (index, message) in messages.into_iter().enumerate() {
            match index {
                0 => assert_eq!(message, TestLogSyncMessage::Have(vec![])),
                1 => {
                    assert_eq!(message, TestLogSyncMessage::Done);
                    break;
                }
                _ => panic!(),
            };
        }
    }
702
    /// A peer holding three operations, facing a remote that has nothing,
    /// announces and sends all three operations followed by `Done`.
    #[tokio::test]
    async fn log_sync_some_operations() {
        let mut peer = Peer::new(0);
        let log_id = 0;

        let body = Body::new("Hello, Sloth!".as_bytes());
        let (header_0, header_bytes_0) = peer.create_operation(&body, log_id).await;
        let (header_1, header_bytes_1) = peer.create_operation(&body, log_id).await;
        let (header_2, header_bytes_2) = peer.create_operation(&body, log_id).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![log_id]);

        let (session, mut event_rx) = peer.log_sync_protocol(&logs);
        // Remote announces an empty `Have`, so it needs everything we hold.
        let remote_message_rx = run_protocol_uni(
            session,
            &[TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done],
        )
        .await
        .unwrap();

        // Total payload + header bytes for the three operations.
        let expected_bytes = header_0.payload_size
            + header_bytes_0.len() as u64
            + header_1.payload_size
            + header_bytes_1.len() as u64
            + header_2.payload_size
            + header_bytes_2.len() as u64;

        for index in 0..=3 {
            let event = event_rx.recv().await.unwrap();
            match index {
                0 => {
                    assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. }));
                }
                1 => {
                    // Local totals: everything we hold is outstanding.
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Progress {
                            metrics: LogSyncMetrics { total_operations_local, total_bytes_local, .. }
                        }) => (total_operations_local, total_bytes_local)
                    );
                    assert_eq!(total_operations, Some(3));

                    assert_eq!(total_bytes, Some(expected_bytes));
                }
                2 => {
                    // Remote sent `Done`, so remote totals are zero.
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Progress {
                            metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
                        }) => (total_operations_remote, total_bytes_remote)
                    );
                    assert_eq!(total_operations, Some(0));
                    assert_eq!(total_bytes, Some(0));
                }
                3 => {
                    let (total_operations, total_bytes) = assert_matches!(
                        event,
                        LogSyncEvent::Status(LogSyncStatus::Completed {
                            metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
                        }) => (total_operations_remote, total_bytes_remote)
                    );
                    assert_eq!(total_operations, Some(0));
                    assert_eq!(total_bytes, Some(0));
                }
                _ => panic!(),
            };
        }

        // Wire order: Have, PreSync, the three operations, Done.
        let messages = remote_message_rx.collect::<Vec<_>>().await;
        assert_eq!(messages.len(), 6);
        for (index, message) in messages.into_iter().enumerate() {
            match index {
                0 => assert_eq!(
                    message,
                    TestLogSyncMessage::Have(vec![(peer.id(), vec![(0, 2)])])
                ),
                1 => assert_eq!(
                    message,
                    TestLogSyncMessage::PreSync {
                        total_operations: 3,
                        total_bytes: expected_bytes
                    }
                ),
                2 => {
                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
                        header,
                        Some(body),
                    ) => (header, body));
                    assert_eq!(header, header_bytes_0);
                    assert_eq!(Body::new(&body_inner), body)
                }
                3 => {
                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
                        header,
                        Some(body),
                    ) => (header, body));
                    assert_eq!(header, header_bytes_1);
                    assert_eq!(Body::new(&body_inner), body)
                }
                4 => {
                    let (header, body_inner) = assert_matches!(message, TestLogSyncMessage::Operation(
                        header,
                        Some(body),
                    ) => (header, body));
                    assert_eq!(header, header_bytes_2);
                    assert_eq!(Body::new(&body_inner), body)
                }
                5 => {
                    assert_eq!(message, TestLogSyncMessage::Done);
                }
                _ => panic!(),
            };
        }
    }
818
    /// Two peers, each holding two operations the other lacks, exchange both
    /// directions in one session; each side receives exactly the other's
    /// operations and the final metrics balance (sent == local announced,
    /// received == remote announced).
    #[tokio::test]
    async fn log_sync_bidirectional_exchange() {
        const LOG_ID: u64 = 0;

        let mut peer_a = Peer::new(0);
        let mut peer_b = Peer::new(1);

        let body_a = Body::new("From Alice".as_bytes());
        let body_b = Body::new("From Bob".as_bytes());

        let (header_a0, _) = peer_a.create_operation(&body_a, LOG_ID).await;
        let (header_a1, _) = peer_a.create_operation(&body_a, LOG_ID).await;

        let (header_b0, _) = peer_b.create_operation(&body_b, LOG_ID).await;
        let (header_b1, _) = peer_b.create_operation(&body_b, LOG_ID).await;

        // Both peers subscribe to both authors' logs.
        let mut logs = Logs::default();
        logs.insert(peer_a.id(), vec![LOG_ID]);
        logs.insert(peer_b.id(), vec![LOG_ID]);

        let (a_session, mut peer_a_event_rx) = peer_a.log_sync_protocol(&logs);
        let (b_session, mut peer_b_event_rx) = peer_b.log_sync_protocol(&logs);

        run_protocol(a_session, b_session).await.unwrap();

        // Peer A: three status events, then Bob's two operations, then
        // Completed.
        for index in 0..=5 {
            let event = peer_a_event_rx.recv().await.unwrap();
            match index {
                0 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. })),
                1 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
                2 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
                3 => {
                    let (header, body_inner) = assert_matches!(
                        event,
                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
                    );
                    assert_eq!(header, header_b0);
                    assert_eq!(body_inner.unwrap(), body_b);
                }
                4 => {
                    let (header, body_inner) = assert_matches!(
                        event,
                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
                    );
                    assert_eq!(header, header_b1);
                    assert_eq!(body_inner.unwrap(), body_b);
                }
                5 => {
                    assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Completed { .. }));
                    break;
                }
                _ => panic!(),
            }
        }

        // Peer B: mirror image, receiving Alice's operations.
        for index in 0..=5 {
            let event = peer_b_event_rx.recv().await.unwrap();
            match index {
                0 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. })),
                1 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
                2 => assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. })),
                3 => {
                    let (header, body_inner) = assert_matches!(
                        event,
                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
                    );
                    assert_eq!(header, header_a0);
                    assert_eq!(body_inner.unwrap(), body_a);
                }
                4 => {
                    let (header, body_inner) = assert_matches!(
                        event,
                        LogSyncEvent::Data(operation) => {let Operation {header, body, ..} = *operation; (header, body)}
                    );
                    assert_eq!(header, header_a1);
                    assert_eq!(body_inner.unwrap(), body_a);
                }
                5 => {
                    // The announced totals must match the actual transfer
                    // counters on completion.
                    let metrics = assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Completed { metrics }) => metrics);
                    let LogSyncMetrics {
                        total_operations_local,
                        total_operations_remote,
                        total_operations_received,
                        total_operations_sent,
                        total_bytes_local,
                        total_bytes_remote,
                        total_bytes_received,
                        total_bytes_sent,
                    } = metrics;

                    assert_eq!(total_operations_remote.unwrap(), total_operations_received);
                    assert_eq!(total_bytes_remote.unwrap(), total_bytes_received);
                    assert_eq!(total_operations_local.unwrap(), total_operations_sent);
                    assert_eq!(total_bytes_local.unwrap(), total_bytes_sent);
                }
                _ => panic!(),
            }
        }
    }
918
    /// An `Operation` message arriving before `PreSync` is a protocol
    /// violation and must abort the session with `UnexpectedMessage`.
    #[tokio::test]
    async fn log_sync_unexpected_operation_before_presend() {
        let mut peer = Peer::new(0);
        const LOG_ID: u64 = 1;

        let body = Body::new(b"unexpected op before presend");
        let (_, header_bytes) = peer.create_operation(&body, LOG_ID).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![LOG_ID]);

        let (session, _event_rx) = peer.log_sync_protocol(&logs);

        // `Operation` is injected where only `PreSync`/`Done` is valid.
        let messages = vec![
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
            TestLogSyncMessage::Operation(header_bytes.clone(), Some(body.to_bytes())),
            TestLogSyncMessage::PreSync {
                total_operations: 1,
                total_bytes: 100,
            },
            TestLogSyncMessage::Done,
        ];

        let result = run_protocol_uni(session, &messages).await;
        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
    }
946
    /// A second `PreSync` after the first (i.e. during the sync phase) is a
    /// protocol violation and must abort with `UnexpectedMessage`.
    #[tokio::test]
    async fn log_sync_unexpected_presend_twice() {
        let mut peer = Peer::new(0);
        const LOG_ID: u64 = 1;

        let body = Body::new(b"two presends");
        peer.create_operation(&body, LOG_ID).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![LOG_ID]);
        let (session, _event_rx) = peer.log_sync_protocol(&logs);

        let messages = vec![
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
            TestLogSyncMessage::PreSync {
                total_operations: 1,
                total_bytes: 32,
            },
            // Duplicate announcement — invalid in the sync phase.
            TestLogSyncMessage::PreSync {
                total_operations: 1,
                total_bytes: 32,
            },
            TestLogSyncMessage::Done,
        ];

        let result = run_protocol_uni(session, &messages).await;
        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
    }
975
    /// `Done` as the very first message (where only `Have` is valid) must
    /// abort with `UnexpectedMessage`.
    #[tokio::test]
    async fn log_sync_unexpected_done_before_anything() {
        let mut peer = Peer::new(0);
        let logs = Logs::default();

        let (session, _event_rx) = peer.log_sync_protocol(&logs);

        let messages = vec![TestLogSyncMessage::Done];
        let result = run_protocol_uni(session, &messages).await;

        assert!(
            matches!(result, Err(LogSyncError::UnexpectedMessage(_))),
            "{:?}",
            result
        );
    }
992
    /// A second `Have` after `PreSync` (i.e. during the sync phase) is a
    /// protocol violation and must abort with `UnexpectedMessage`.
    #[tokio::test]
    async fn log_sync_unexpected_have_after_presend() {
        let mut peer = Peer::new(0);
        const LOG_ID: u64 = 1;
        let body = Body::new(b"bad have order");
        peer.create_operation(&body, LOG_ID).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![LOG_ID]);
        let (session, _event_rx) = peer.log_sync_protocol(&logs);

        let messages = vec![
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
            TestLogSyncMessage::PreSync {
                total_operations: 1,
                total_bytes: 64,
            },
            // Out-of-order `Have` — invalid once the sync phase has begun.
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 99)])]), TestLogSyncMessage::Done,
        ];

        let result = run_protocol_uni(session, &messages).await;
        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
    }
1017
    /// If an operation is pruned from the sender's store while the session is
    /// already running, the sender skips it (with a warning) and still
    /// completes: the remote announced 90 operations but only 89 arrive.
    #[tokio::test]
    async fn log_sync_with_concurrently_pruned_log() {
        setup_logging();

        let mut peer_a = Peer::new(0);
        let mut peer_b = Peer::new(1);

        // Large bodies so the transfer takes long enough for the concurrent
        // delete below to land mid-session.
        let body = Body::new(&[0; 10000]);

        for _ in 0..30 {
            let _ = peer_a.create_operation(&body, 0).await;
        }
        for _ in 0..30 {
            let _ = peer_a.create_operation(&body, 1).await;
        }
        let mut to_be_pruned_log = vec![];
        for _ in 0..30 {
            let (header, _) = peer_a.create_operation(&body, 2).await;
            to_be_pruned_log.push(header.hash());
        }

        let mut logs = Logs::default();
        logs.insert(peer_a.id(), vec![0, 1, 2]);

        let (a_session, _peer_b_event_rx) = peer_a.log_sync_protocol(&logs);
        let (b_session, mut peer_b_event_rx) = peer_b.log_sync_protocol(&logs);

        // NOTE(review): presumably this clone keeps an extra sender handle on
        // peer B's event channel alive for the duration of the spawned
        // session — confirm against the broadcast channel semantics.
        let _peer_b_event_tx_clone = b_session.event_tx.clone();

        tokio::spawn(async move {
            run_protocol(a_session, b_session).await.unwrap();
        });

        // Give the session a moment to start, then prune one announced
        // operation from peer A's store while sending is in progress.
        tokio::time::sleep(Duration::from_micros(1)).await;
        peer_a
            .store
            .delete_operation(to_be_pruned_log[0])
            .await
            .unwrap();

        loop {
            let event = peer_b_event_rx.recv().await.unwrap();
            if let LogSyncEvent::Status(LogSyncStatus::Completed { metrics }) = event {
                let LogSyncMetrics {
                    total_operations_remote,
                    total_operations_received,
                    ..
                } = metrics;

                // Announced before pruning: all 90 operations ...
                assert_eq!(total_operations_remote.unwrap(), 90);

                // ... but one was pruned mid-sync, so only 89 arrived.
                assert_eq!(total_operations_received, 89);
                break;
            }
        }
    }
1081}