1use std::collections::HashMap;
5use std::fmt::{Debug, Display};
6use std::marker::PhantomData;
7
8use futures::{Sink, SinkExt, Stream, StreamExt, stream};
9use p2panda_core::cbor::{DecodeError, decode_cbor};
10use p2panda_core::{Body, Extensions, Hash, Header, Operation, PublicKey};
11use p2panda_store::{LogId, LogStore, OperationStore};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::select;
15use tokio::sync::broadcast;
16
17use crate::dedup::{DEFAULT_BUFFER_CAPACITY, Dedup};
18use crate::traits::Protocol;
19
20pub type Logs<L> = HashMap<PublicKey, Vec<L>>;
22
23#[derive(Default)]
25enum State {
26 #[default]
28 Start,
29
30 SendHave { metrics: LogSyncMetrics },
32
33 ReceiveHave { metrics: LogSyncMetrics },
35
36 SendPreSyncOrDone {
38 operations: Vec<Hash>,
39 metrics: LogSyncMetrics,
40 },
41
42 ReceivePreSyncOrDone {
44 operations: Vec<Hash>,
45 metrics: LogSyncMetrics,
46 },
47
48 Sync {
51 operations: Vec<Hash>,
52 metrics: LogSyncMetrics,
53 },
54
55 End { metrics: LogSyncMetrics },
57}
58
59pub struct LogSync<L, E, S, Evt> {
61 state: State,
62 logs: Logs<L>,
63 store: S,
64 event_tx: broadcast::Sender<Evt>,
65 buffer_capacity: usize,
66 _marker: PhantomData<E>,
67}
68
69impl<L, E, S, Evt> LogSync<L, E, S, Evt> {
70 pub fn new(store: S, logs: Logs<L>, event_tx: broadcast::Sender<Evt>) -> Self {
71 Self::new_with_capacity(store, logs, event_tx, DEFAULT_BUFFER_CAPACITY)
72 }
73
74 pub fn new_with_capacity(
75 store: S,
76 logs: Logs<L>,
77 event_tx: broadcast::Sender<Evt>,
78 buffer_capacity: usize,
79 ) -> Self {
80 Self {
81 state: Default::default(),
82 store,
83 event_tx,
84 logs,
85 buffer_capacity,
86 _marker: PhantomData,
87 }
88 }
89}
90
91impl<L, E, S, Evt> Protocol for LogSync<L, E, S, Evt>
92where
93 L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
94 E: Extensions + Send + 'static,
95 S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
96 Evt: Debug + From<LogSyncEvent<E>> + Send + 'static,
97{
98 type Error = LogSyncError;
99 type Output = Dedup<Hash>;
100 type Message = LogSyncMessage<L>;
101
102 async fn run(
103 mut self,
104 sink: &mut (impl Sink<Self::Message, Error = impl Debug> + Unpin),
105 stream: &mut (impl Stream<Item = Result<Self::Message, impl Debug>> + Unpin),
106 ) -> Result<Self::Output, Self::Error> {
107 let mut sync_done_received = false;
108 let mut sync_done_sent = false;
109 let mut dedup = Dedup::new(self.buffer_capacity);
110
111 loop {
112 match self.state {
113 State::Start => {
114 let metrics = LogSyncMetrics::default();
115 self.event_tx
116 .send(
117 LogSyncEvent::Status(LogSyncStatus::Started {
118 metrics: metrics.clone(),
119 })
120 .into(),
121 )
122 .map_err(|_| LogSyncError::BroadcastSend)?;
123 self.state = State::SendHave { metrics };
124 }
125 State::SendHave { metrics } => {
126 let local_log_heights = local_log_heights(&self.store, &self.logs).await?;
127 sink.send(LogSyncMessage::<L>::Have(local_log_heights.clone()))
128 .await
129 .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
130 self.state = State::ReceiveHave { metrics };
131 }
132 State::ReceiveHave { mut metrics } => {
133 let Some(message) = stream.next().await else {
134 return Err(LogSyncError::UnexpectedStreamClosure);
135 };
136 let message =
137 message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
138 let LogSyncMessage::Have(remote_log_heights) = message else {
139 return Err(LogSyncError::UnexpectedMessage(message.to_string()));
140 };
141
142 let remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>> =
143 remote_log_heights.clone().into_iter().collect();
144
145 let (operations, total_size) = operations_needed_by_remote(
149 &self.store,
150 &self.logs,
151 remote_log_heights_map,
152 )
153 .await?;
154
155 metrics.total_operations_local = Some(operations.len() as u64);
156 metrics.total_bytes_local = Some(total_size);
157
158 self.state = State::SendPreSyncOrDone {
159 operations,
160 metrics,
161 };
162 }
163 State::SendPreSyncOrDone {
164 operations,
165 metrics,
166 } => {
167 let total_operations = metrics.total_operations_local.unwrap();
168 let total_bytes = metrics.total_bytes_local.unwrap();
169
170 if total_operations > 0 {
171 sink.send(LogSyncMessage::PreSync {
172 total_bytes,
173 total_operations,
174 })
175 .await
176 .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
177 } else {
178 sink.send(LogSyncMessage::Done)
179 .await
180 .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
181 }
182
183 self.event_tx
184 .send(
185 LogSyncEvent::Status(LogSyncStatus::Progress {
186 metrics: metrics.clone(),
187 })
188 .into(),
189 )
190 .map_err(|_| LogSyncError::BroadcastSend)?;
191
192 self.state = State::ReceivePreSyncOrDone {
193 operations,
194 metrics,
195 };
196 }
197 State::ReceivePreSyncOrDone {
198 operations,
199 mut metrics,
200 } => {
201 let Some(message) = stream.next().await else {
202 return Err(LogSyncError::UnexpectedStreamClosure);
203 };
204 let message =
205 message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
206
207 metrics.total_bytes_remote = Some(0);
208 metrics.total_operations_remote = Some(0);
209
210 match message {
211 LogSyncMessage::PreSync {
212 total_operations,
213 total_bytes,
214 } => {
215 metrics.total_bytes_remote = Some(total_bytes);
216 metrics.total_operations_remote = Some(total_operations);
217 }
218 LogSyncMessage::Done => sync_done_received = true,
219 message => {
220 return Err(LogSyncError::UnexpectedMessage(message.to_string()));
221 }
222 }
223
224 self.event_tx
225 .send(
226 LogSyncEvent::Status(LogSyncStatus::Progress {
227 metrics: metrics.clone(),
228 })
229 .into(),
230 )
231 .map_err(|_| LogSyncError::BroadcastSend)?;
232
233 self.state = State::Sync {
234 operations,
235 metrics,
236 };
237 }
238 State::Sync {
239 operations,
240 mut metrics,
241 } => {
242 let mut operation_stream = stream::iter(operations);
243 let mut sent_operations = 0;
244 let total_operations = metrics
245 .total_operations_local
246 .expect("total operations set");
247
248 loop {
252 select! {
253 message = stream.next(), if !sync_done_received => {
254 let Some(message) = message else {
255 break;
256 };
257 let message =
258 message.map_err(|err| LogSyncError::MessageStream(format!("{err:?}")))?;
259 match message {
260 LogSyncMessage::Operation(header, body) => {
261 metrics.total_bytes_received += {
262 header.len()
263 + body.as_ref().map(|bytes| bytes.len()).unwrap_or_default()
264 } as u64;
265 metrics.total_operations_received += 1;
266
267 let header: Header<E> = decode_cbor(&header[..])?;
271 let body = body.map(|ref bytes| Body::new(bytes));
272
273 dedup.insert(header.hash());
278
279 self.event_tx
281 .send(
282 LogSyncEvent::Data(Box::new(Operation {
283 hash: header.hash(),
284 header,
285 body,
286 }))
287 .into(),
288 )
289 .map_err(|_| LogSyncError::BroadcastSend)?;
290 }
291 LogSyncMessage::Done => {
292 sync_done_received = true;
293 }
294 message => {
295 return Err(LogSyncError::UnexpectedMessage(message.to_string()));
296 }
297 }
298 },
299 Some(hash) = operation_stream.next() => {
300 dedup.insert(hash);
302
303 let (header, body) = self
305 .store
306 .get_raw_operation(hash)
307 .await
308 .map_err(|err| LogSyncError::OperationStore(format!("{err}")))?
309 .expect("operation to be in store");
310
311 metrics.total_bytes_sent += {
312 header.len() + body.as_ref().map(|bytes| bytes.len()).unwrap_or_default()
313 } as u64;
314 metrics.total_operations_sent += 1;
315
316 sink.send(LogSyncMessage::Operation(header, body))
317 .await
318 .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
319 sent_operations += 1;
320
321 if sent_operations >= total_operations {
322 sink.send(LogSyncMessage::Done)
323 .await
324 .map_err(|err| LogSyncError::MessageSink(format!("{err:?}")))?;
325 sync_done_sent = true;
326 }
327 },
328 else => {
329 break;
333 }
334 }
335 if sync_done_received && sync_done_sent {
336 break;
337 }
338 }
339 self.state = State::End { metrics };
340 }
341 State::End { metrics } => {
342 self.event_tx
343 .send(
344 LogSyncEvent::Status(LogSyncStatus::Completed {
345 metrics: metrics.clone(),
346 })
347 .into(),
348 )
349 .map_err(|_| LogSyncError::BroadcastSend)?;
350 break;
351 }
352 }
353 }
354
355 Ok(dedup)
356 }
357}
358
359async fn local_log_heights<L, E, S>(
361 store: &S,
362 logs: &Logs<L>,
363) -> Result<Vec<(PublicKey, Vec<(L, u64)>)>, LogSyncError>
364where
365 L: LogId,
366 S: LogStore<L, E> + OperationStore<L, E>,
367{
368 let mut local_log_heights = Vec::new();
369 for (public_key, log_ids) in logs {
370 let mut log_heights = Vec::new();
371 for log_id in log_ids {
372 let latest = store
373 .latest_operation(public_key, log_id)
374 .await
375 .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
376
377 if let Some((header, _)) = latest {
378 log_heights.push((log_id.clone(), header.seq_num));
379 };
380 }
381 local_log_heights.push((*public_key, log_heights));
382 }
383
384 Ok(local_log_heights)
385}
386
387async fn operations_needed_by_remote<L, E, S>(
390 store: &S,
391 logs: &Logs<L>,
392 remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>>,
393) -> Result<(Vec<Hash>, u64), LogSyncError>
394where
395 L: LogId,
396 E: Extensions,
397 S: LogStore<L, E> + OperationStore<L, E>,
398{
399 let mut operations = Vec::new();
404 let mut total_size = 0;
405
406 for (public_key, log_ids) in logs {
407 for log_id in log_ids {
408 let latest_operation = store
410 .latest_operation(public_key, log_id)
411 .await
412 .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
413
414 let log_height = match latest_operation {
415 Some((header, _)) => header.seq_num,
416 None => continue,
419 };
420
421 let remote_needs_from = match remote_log_heights_map.get(public_key) {
423 Some(log_heights) => {
424 match log_heights.iter().find(|(id, _)| *id == *log_id) {
425 Some((_, log_height)) => log_height + 1,
428 None => 0,
430 }
431 }
432 None => 0,
434 };
435
436 if remote_needs_from <= log_height {
437 let log = store
438 .get_log_hashes(public_key, log_id, Some(remote_needs_from))
439 .await
440 .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
441
442 if let Some(log) = log {
443 operations.extend(log);
444 }
445
446 let size = store
447 .get_log_size(public_key, log_id, Some(remote_needs_from))
448 .await
449 .map_err(|err| LogSyncError::LogStore(format!("{err}")))?;
450
451 if let Some(size) = size {
452 total_size += size;
453 }
454 };
455 }
456 }
457
458 Ok((operations, total_size))
459}
460
461#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
463#[serde(tag = "type", content = "value")]
464pub enum LogSyncMessage<L> {
465 Have(Vec<(PublicKey, Vec<(L, u64)>)>),
466 PreSync {
467 total_operations: u64,
468 total_bytes: u64,
469 },
470 Operation(Vec<u8>, Option<Vec<u8>>),
472 Done,
473}
474
475impl<L> Display for LogSyncMessage<L>
476where
477 L: Debug,
478{
479 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
480 let value = match self {
481 LogSyncMessage::Have(_) => "have",
482 LogSyncMessage::PreSync { .. } => "pre_sync",
483 LogSyncMessage::Operation(_, _) => "operation",
484 LogSyncMessage::Done => "done",
485 };
486
487 write!(f, "{value}")
488 }
489}
490
491#[derive(Clone, Debug, PartialEq)]
493pub enum LogSyncEvent<E> {
494 Status(LogSyncStatus),
495 Data(Box<Operation<E>>),
496}
497
/// Counters describing the progress of a sync session.
///
/// The `Option` totals start as `None` and are filled in once the corresponding protocol step
/// has run; the sent/received counters start at zero and grow during the sync phase.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct LogSyncMetrics {
    /// Number of operations we will send to the remote.
    pub total_operations_local: Option<u64>,
    /// Number of operations the remote announced it will send.
    pub total_operations_remote: Option<u64>,
    /// Number of operations received so far.
    pub total_operations_received: u64,
    /// Number of operations sent so far.
    pub total_operations_sent: u64,
    /// Number of bytes we will send to the remote.
    pub total_bytes_local: Option<u64>,
    /// Number of bytes the remote announced it will send.
    pub total_bytes_remote: Option<u64>,
    /// Number of bytes (header plus body) received so far.
    pub total_bytes_received: u64,
    /// Number of bytes (header plus body) sent so far.
    pub total_bytes_sent: u64,
}
510
511#[derive(Clone, Debug, PartialEq)]
513pub enum LogSyncStatus {
514 Started { metrics: LogSyncMetrics },
515 Progress { metrics: LogSyncMetrics },
516 Completed { metrics: LogSyncMetrics },
517}
518
519#[derive(Debug, Error)]
521pub enum LogSyncError {
522 #[error(transparent)]
523 Decode(#[from] DecodeError),
524
525 #[error("log store error: {0}")]
526 LogStore(String),
527
528 #[error("operation store error: {0}")]
529 OperationStore(String),
530
531 #[error("no active receivers when broadcasting")]
532 BroadcastSend,
533
534 #[error("log sync error sending on message sink: {0}")]
535 MessageSink(String),
536
537 #[error("log sync error receiving from message stream: {0}")]
538 MessageStream(String),
539
540 #[error("remote unexpectedly closed stream during initial sync")]
541 UnexpectedStreamClosure,
542
543 #[error("log sync received unexpected protocol message: {0}")]
544 UnexpectedMessage(String),
545}
546
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use p2panda_core::Body;

    use crate::protocols::log_sync::{
        LogSyncError, LogSyncEvent, LogSyncMetrics, LogSyncStatus, Logs, Operation,
    };
    use crate::test_utils::{Peer, TestLogSyncMessage, run_protocol, run_protocol_uni};

    /// With no logs on either side the session only exchanges `Have` and `Done`.
    #[tokio::test]
    async fn log_sync_no_operations() {
        let mut peer: Peer = Peer::new(0);

        let (session, mut event_rx) = peer.log_sync_protocol(&Logs::default());
        let remote_message_rx = run_protocol_uni(
            session,
            &[TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done],
        )
        .await
        .unwrap();

        // Started: nothing is known about the remote yet.
        let event = event_rx.recv().await.unwrap();
        let (total_operations, total_bytes) = assert_matches!(
            event,
            LogSyncEvent::Status(LogSyncStatus::Started {
                metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
            }) => (total_operations_remote, total_bytes_remote)
        );
        assert_eq!(total_operations, None);
        assert_eq!(total_bytes, None);

        // First progress update: local totals are known (and empty).
        let event = event_rx.recv().await.unwrap();
        let (total_operations, total_bytes) = assert_matches!(
            event,
            LogSyncEvent::Status(LogSyncStatus::Progress {
                metrics: LogSyncMetrics { total_operations_local, total_bytes_local, .. }
            }) => (total_operations_local, total_bytes_local)
        );
        assert_eq!(total_operations, Some(0));
        assert_eq!(total_bytes, Some(0));

        // Second progress update: remote totals are known (and empty).
        let event = event_rx.recv().await.unwrap();
        let (total_operations, total_bytes) = assert_matches!(
            event,
            LogSyncEvent::Status(LogSyncStatus::Progress {
                metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
            }) => (total_operations_remote, total_bytes_remote)
        );
        assert_eq!(total_operations, Some(0));
        assert_eq!(total_bytes, Some(0));

        // Completed.
        let event = event_rx.recv().await.unwrap();
        let (total_operations, total_bytes) = assert_matches!(
            event,
            LogSyncEvent::Status(LogSyncStatus::Completed {
                metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
            }) => (total_operations_remote, total_bytes_remote)
        );
        assert_eq!(total_operations, Some(0));
        assert_eq!(total_bytes, Some(0));

        // We sent exactly an empty `Have` followed by `Done`.
        let messages = remote_message_rx.collect::<Vec<_>>().await;
        assert_eq!(
            messages,
            vec![TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done]
        );
    }

    /// A peer with three operations sends all of them to a remote that has nothing.
    #[tokio::test]
    async fn log_sync_some_operations() {
        let mut peer = Peer::new(0);
        let log_id = 0;

        let body = Body::new("Hello, Sloth!".as_bytes());
        let (header_0, header_bytes_0) = peer.create_operation(&body, log_id).await;
        let (header_1, header_bytes_1) = peer.create_operation(&body, log_id).await;
        let (header_2, header_bytes_2) = peer.create_operation(&body, log_id).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![log_id]);

        let (session, mut event_rx) = peer.log_sync_protocol(&logs);
        let remote_message_rx = run_protocol_uni(
            session,
            &[TestLogSyncMessage::Have(vec![]), TestLogSyncMessage::Done],
        )
        .await
        .unwrap();

        let expected_bytes = header_0.payload_size
            + header_bytes_0.len() as u64
            + header_1.payload_size
            + header_bytes_1.len() as u64
            + header_2.payload_size
            + header_bytes_2.len() as u64;

        // Started.
        let event = event_rx.recv().await.unwrap();
        assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. }));

        // First progress update: local totals cover all three operations.
        let event = event_rx.recv().await.unwrap();
        let (total_operations, total_bytes) = assert_matches!(
            event,
            LogSyncEvent::Status(LogSyncStatus::Progress {
                metrics: LogSyncMetrics { total_operations_local, total_bytes_local, .. }
            }) => (total_operations_local, total_bytes_local)
        );
        assert_eq!(total_operations, Some(3));
        assert_eq!(total_bytes, Some(expected_bytes));

        // Second progress update: the remote announced nothing.
        let event = event_rx.recv().await.unwrap();
        let (total_operations, total_bytes) = assert_matches!(
            event,
            LogSyncEvent::Status(LogSyncStatus::Progress {
                metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
            }) => (total_operations_remote, total_bytes_remote)
        );
        assert_eq!(total_operations, Some(0));
        assert_eq!(total_bytes, Some(0));

        // Completed.
        let event = event_rx.recv().await.unwrap();
        let (total_operations, total_bytes) = assert_matches!(
            event,
            LogSyncEvent::Status(LogSyncStatus::Completed {
                metrics: LogSyncMetrics { total_operations_remote, total_bytes_remote, .. }
            }) => (total_operations_remote, total_bytes_remote)
        );
        assert_eq!(total_operations, Some(0));
        assert_eq!(total_bytes, Some(0));

        // Outgoing messages: Have, PreSync, three operations in order, Done.
        let messages = remote_message_rx.collect::<Vec<_>>().await;
        assert_eq!(messages.len(), 6);
        let mut messages = messages.into_iter();

        assert_eq!(
            messages.next().unwrap(),
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(0, 2)])])
        );
        assert_eq!(
            messages.next().unwrap(),
            TestLogSyncMessage::PreSync {
                total_operations: 3,
                total_bytes: expected_bytes
            }
        );
        for expected_header in [&header_bytes_0, &header_bytes_1, &header_bytes_2] {
            let (header, body_inner) = assert_matches!(
                messages.next().unwrap(),
                TestLogSyncMessage::Operation(header, Some(body)) => (header, body)
            );
            assert_eq!(&header, expected_header);
            assert_eq!(Body::new(&body_inner), body);
        }
        assert_eq!(messages.next().unwrap(), TestLogSyncMessage::Done);
    }

    /// Two peers with disjoint operations end up receiving each other's operations.
    #[tokio::test]
    async fn log_sync_bidirectional_exchange() {
        const LOG_ID: u64 = 0;

        let mut peer_a = Peer::new(0);
        let mut peer_b = Peer::new(1);

        let body_a = Body::new("From Alice".as_bytes());
        let body_b = Body::new("From Bob".as_bytes());

        let (header_a0, _) = peer_a.create_operation(&body_a, LOG_ID).await;
        let (header_a1, _) = peer_a.create_operation(&body_a, LOG_ID).await;

        let (header_b0, _) = peer_b.create_operation(&body_b, LOG_ID).await;
        let (header_b1, _) = peer_b.create_operation(&body_b, LOG_ID).await;

        let mut logs = Logs::default();
        logs.insert(peer_a.id(), vec![LOG_ID]);
        logs.insert(peer_b.id(), vec![LOG_ID]);

        let (a_session, mut peer_a_event_rx) = peer_a.log_sync_protocol(&logs);
        let (b_session, mut peer_b_event_rx) = peer_b.log_sync_protocol(&logs);

        run_protocol(a_session, b_session).await.unwrap();

        // Peer A: started, two progress updates, Bob's two operations, completed.
        let event = peer_a_event_rx.recv().await.unwrap();
        assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. }));
        for _ in 0..2 {
            let event = peer_a_event_rx.recv().await.unwrap();
            assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. }));
        }
        for expected_header in [&header_b0, &header_b1] {
            let event = peer_a_event_rx.recv().await.unwrap();
            let (header, body_inner) = assert_matches!(
                event,
                LogSyncEvent::Data(operation) => {
                    let Operation { header, body, .. } = *operation;
                    (header, body)
                }
            );
            assert_eq!(&header, expected_header);
            assert_eq!(body_inner.unwrap(), body_b);
        }
        let event = peer_a_event_rx.recv().await.unwrap();
        assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Completed { .. }));

        // Peer B: the mirror image, receiving Alice's two operations.
        let event = peer_b_event_rx.recv().await.unwrap();
        assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Started { .. }));
        for _ in 0..2 {
            let event = peer_b_event_rx.recv().await.unwrap();
            assert_matches!(event, LogSyncEvent::Status(LogSyncStatus::Progress { .. }));
        }
        for expected_header in [&header_a0, &header_a1] {
            let event = peer_b_event_rx.recv().await.unwrap();
            let (header, body_inner) = assert_matches!(
                event,
                LogSyncEvent::Data(operation) => {
                    let Operation { header, body, .. } = *operation;
                    (header, body)
                }
            );
            assert_eq!(&header, expected_header);
            assert_eq!(body_inner.unwrap(), body_a);
        }

        // The final metrics must be self-consistent: announced totals equal actual transfers.
        let event = peer_b_event_rx.recv().await.unwrap();
        let metrics = assert_matches!(
            event,
            LogSyncEvent::Status(LogSyncStatus::Completed { metrics }) => metrics
        );
        let LogSyncMetrics {
            total_operations_local,
            total_operations_remote,
            total_operations_received,
            total_operations_sent,
            total_bytes_local,
            total_bytes_remote,
            total_bytes_received,
            total_bytes_sent,
        } = metrics;

        assert_eq!(total_operations_remote.unwrap(), total_operations_received);
        assert_eq!(total_bytes_remote.unwrap(), total_bytes_received);
        assert_eq!(total_operations_local.unwrap(), total_operations_sent);
        assert_eq!(total_bytes_local.unwrap(), total_bytes_sent);
    }

    /// An `Operation` arriving where `PreSync` or `Done` is expected is a protocol violation.
    #[tokio::test]
    async fn log_sync_unexpected_operation_before_presend() {
        const LOG_ID: u64 = 1;

        let mut peer = Peer::new(0);

        let body = Body::new(b"unexpected op before presend");
        let (_, header_bytes) = peer.create_operation(&body, LOG_ID).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![LOG_ID]);

        let (session, _event_rx) = peer.log_sync_protocol(&logs);

        let messages = [
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
            TestLogSyncMessage::Operation(header_bytes.clone(), Some(body.to_bytes())),
            TestLogSyncMessage::PreSync {
                total_operations: 1,
                total_bytes: 100,
            },
            TestLogSyncMessage::Done,
        ];

        let result = run_protocol_uni(session, &messages).await;
        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
    }

    /// A second `PreSync` during the sync phase is a protocol violation.
    #[tokio::test]
    async fn log_sync_unexpected_presend_twice() {
        const LOG_ID: u64 = 1;

        let mut peer = Peer::new(0);

        let body = Body::new(b"two presends");
        peer.create_operation(&body, LOG_ID).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![LOG_ID]);
        let (session, _event_rx) = peer.log_sync_protocol(&logs);

        let messages = [
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
            TestLogSyncMessage::PreSync {
                total_operations: 1,
                total_bytes: 32,
            },
            TestLogSyncMessage::PreSync {
                total_operations: 1,
                total_bytes: 32,
            },
            TestLogSyncMessage::Done,
        ];

        let result = run_protocol_uni(session, &messages).await;
        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
    }

    /// `Done` as the very first message (instead of `Have`) is a protocol violation.
    #[tokio::test]
    async fn log_sync_unexpected_done_before_anything() {
        let mut peer = Peer::new(0);
        let logs = Logs::default();

        let (session, _event_rx) = peer.log_sync_protocol(&logs);

        let messages = [TestLogSyncMessage::Done];
        let result = run_protocol_uni(session, &messages).await;

        assert!(
            matches!(result, Err(LogSyncError::UnexpectedMessage(_))),
            "{:?}",
            result
        );
    }

    /// A `Have` during the sync phase is a protocol violation.
    #[tokio::test]
    async fn log_sync_unexpected_have_after_presend() {
        const LOG_ID: u64 = 1;

        let mut peer = Peer::new(0);
        let body = Body::new(b"bad have order");
        peer.create_operation(&body, LOG_ID).await;

        let mut logs = Logs::default();
        logs.insert(peer.id(), vec![LOG_ID]);
        let (session, _event_rx) = peer.log_sync_protocol(&logs);

        let messages = [
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 0)])]),
            TestLogSyncMessage::PreSync {
                total_operations: 1,
                total_bytes: 64,
            },
            TestLogSyncMessage::Have(vec![(peer.id(), vec![(LOG_ID, 99)])]),
            TestLogSyncMessage::Done,
        ];

        let result = run_protocol_uni(session, &messages).await;
        assert!(matches!(result, Err(LogSyncError::UnexpectedMessage(_))));
    }
}