1use std::collections::HashMap;
21use std::fmt::Debug;
22use std::marker::PhantomData;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use futures::{AsyncRead, AsyncWrite, Sink, SinkExt, StreamExt, stream};
27use p2panda_core::{Extensions, PublicKey};
28use p2panda_store::{LogId, LogStore};
29use serde::{Deserialize, Serialize};
30
31use crate::cbor::{into_cbor_sink, into_cbor_stream};
32use crate::{FromSync, SyncError, SyncProtocol, TopicQuery};
33
34type SeqNum = u64;
35
36type LogHeights<T> = Vec<(T, SeqNum)>;
37
38type Logs<T> = HashMap<PublicKey, Vec<T>>;
39
40#[async_trait]
75pub trait TopicLogMap<T, L>: Debug + Send + Sync
76where
77 T: TopicQuery,
78{
79 async fn get(&self, topic: &T) -> Option<Logs<L>>;
80}
81
82#[allow(clippy::large_enum_variant)]
84#[derive(Debug, Clone, Deserialize, Serialize)]
85#[serde(tag = "type", content = "value")]
86enum Message<T, L = String> {
87 Have(T, Vec<(PublicKey, LogHeights<L>)>),
88 Data(Vec<u8>, Option<Vec<u8>>),
89 Done,
90}
91
92#[derive(Clone, Debug)]
94pub struct LogSyncProtocol<TM, L, E, S: LogStore<L, E>> {
95 topic_map: TM,
96 store: S,
97 _marker: PhantomData<(L, E)>,
98}
99
100impl<TM, L, E, S> LogSyncProtocol<TM, L, E, S>
101where
102 S: LogStore<L, E>,
103{
104 pub fn new(topic_map: TM, store: S) -> Self {
107 Self {
108 topic_map,
109 store,
110 _marker: PhantomData {},
111 }
112 }
113}
114
115#[async_trait]
129impl<'a, T, TM, L, E, S> SyncProtocol<T, 'a> for LogSyncProtocol<TM, L, E, S>
130where
131 T: TopicQuery,
132 TM: TopicLogMap<T, L>,
133 L: LogId + Send + Sync + for<'de> Deserialize<'de> + Serialize + 'a,
134 E: Extensions + Send + Sync + 'a,
135 S: Debug + Sync + LogStore<L, E>,
136{
137 fn name(&self) -> &'static str {
138 "p2panda-log-sync-v1"
139 }
140
141 async fn initiate(
142 self: Arc<Self>,
143 topic_query: T,
144 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
145 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
146 mut app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
147 ) -> Result<(), SyncError> {
148 let mut sync_done_received = false;
149 let mut sync_done_sent = false;
150
151 let mut sink = into_cbor_sink(tx);
152 let mut stream = into_cbor_stream(rx);
153
154 let local_log_heights =
156 local_log_heights(&self.store, &self.topic_map, &topic_query).await?;
157
158 sink.send(Message::<T, L>::Have(
160 topic_query.clone(),
161 local_log_heights.clone(),
162 ))
163 .await?;
164
165 app_tx
167 .send(FromSync::HandshakeSuccess(topic_query.clone()))
168 .await?;
169
170 while let Some(result) = stream.next().await {
172 let message: Message<T, L> = result?;
173
174 match message {
175 Message::Data(header, payload) => {
176 app_tx.send(FromSync::Data { header, payload }).await?;
178 }
179 Message::Done => {
180 sync_done_received = true;
181 }
182 Message::Have(remote_topic_query, remote_log_heights) => {
183 if !sync_done_received {
184 return Err(SyncError::UnexpectedBehaviour(
185 "unexpected \"have\" message received".to_string(),
186 ));
187 }
188
189 if remote_topic_query != topic_query {
191 return Err(SyncError::UnexpectedBehaviour(format!(
192 "incompatible topic query {topic_query:?} requested from remote peer"
193 )));
194 }
195
196 let Some(logs) = self.topic_map.get(&topic_query).await else {
198 return Err(SyncError::UnexpectedBehaviour(format!(
199 "unsupported topic query {topic_query:?} requested from remote peer"
200 )));
201 };
202
203 let remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>> =
204 remote_log_heights.clone().into_iter().collect();
205
206 let messages: Vec<Message<T, L>> =
208 messages_needed_by_remote(&self.store, &logs, remote_log_heights_map)
209 .await?;
210 sink.send_all(&mut stream::iter(messages.into_iter().map(Ok)))
211 .await?;
212
213 sink.send(Message::Done).await?;
215 sync_done_sent = true;
216 }
217 };
218
219 if sync_done_received && sync_done_sent {
220 break;
221 }
222 }
223
224 sink.flush().await?;
226 app_tx.flush().await?;
227
228 Ok(())
229 }
230
231 async fn accept(
232 self: Arc<Self>,
233 tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
234 rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
235 mut app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
236 ) -> Result<(), SyncError> {
237 let mut sync_done_sent = false;
238 let mut sync_done_received = false;
239
240 let mut sink = into_cbor_sink(tx);
241 let mut stream = into_cbor_stream(rx);
242
243 while let Some(result) = stream.next().await {
244 let message: Message<T, L> = result?;
245 match message {
246 Message::Have(topic_query, remote_log_heights) => {
247 app_tx
250 .send(FromSync::HandshakeSuccess(topic_query.clone()))
251 .await?;
252
253 let Some(logs) = self.topic_map.get(&topic_query).await else {
255 return Err(SyncError::UnexpectedBehaviour(format!(
256 "unsupported topic query {topic_query:?} requested from remote peer"
257 )));
258 };
259
260 let remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>> =
261 remote_log_heights.clone().into_iter().collect();
262
263 let messages: Vec<Message<T, L>> =
265 messages_needed_by_remote(&self.store, &logs, remote_log_heights_map)
266 .await?;
267 sink.send_all(&mut stream::iter(messages.into_iter().map(Ok)))
268 .await?;
269
270 sink.send(Message::Done).await?;
272 sync_done_sent = true;
273
274 let local_log_heights =
276 local_log_heights(&self.store, &self.topic_map, &topic_query).await?;
277
278 sink.send(Message::<T, L>::Have(
280 topic_query.clone(),
281 local_log_heights.clone(),
282 ))
283 .await?;
284 }
285 Message::Data(header, payload) => {
286 app_tx.send(FromSync::Data { header, payload }).await?;
288 }
289 Message::Done => {
290 sync_done_received = true;
291 }
292 };
293
294 if sync_done_received && sync_done_sent {
295 break;
296 }
297 }
298
299 sink.flush().await?;
301 app_tx.flush().await?;
302
303 Ok(())
304 }
305}
306
307async fn local_log_heights<T, L, E>(
310 store: &impl LogStore<L, E>,
311 topic_map: &impl TopicLogMap<T, L>,
312 topic_query: &T,
313) -> Result<Vec<(PublicKey, Vec<(L, u64)>)>, SyncError>
314where
315 T: TopicQuery,
316 L: LogId,
317{
318 let Some(logs) = topic_map.get(topic_query).await else {
320 return Err(SyncError::Critical(format!(
321 "unknown {topic_query:?} topic query"
322 )));
323 };
324
325 let mut local_log_heights = Vec::new();
327 for (public_key, log_ids) in logs {
328 let mut log_heights = Vec::new();
329 for log_id in log_ids {
330 let latest = store
331 .latest_operation(&public_key, &log_id)
332 .await
333 .map_err(|err| {
334 SyncError::Critical(format!("can't retrieve log heights from store, {err}"))
335 })?;
336
337 if let Some((header, _)) = latest {
338 log_heights.push((log_id.clone(), header.seq_num));
339 };
340 }
341 local_log_heights.push((public_key, log_heights));
342 }
343
344 Ok(local_log_heights)
345}
346
347async fn remote_needs<T, L, E>(
350 store: &impl LogStore<L, E>,
351 log_id: &L,
352 public_key: &PublicKey,
353 from: SeqNum,
354) -> Result<Vec<Message<T, L>>, SyncError>
355where
356 E: Extensions + Send + Sync,
357{
358 let log = store
359 .get_raw_log(public_key, log_id, Some(from))
360 .await
361 .map_err(|err| SyncError::Critical(format!("could not retrieve log from store, {err}")))?;
362
363 let messages = log
364 .unwrap_or_default()
365 .into_iter()
366 .map(|(header, payload)| Message::Data(header, payload))
367 .collect();
368
369 Ok(messages)
370}
371
372async fn messages_needed_by_remote<T, L, E>(
375 store: &impl LogStore<L, E>,
376 logs: &Logs<L>,
377 remote_log_heights_map: HashMap<PublicKey, Vec<(L, u64)>>,
378) -> Result<Vec<Message<T, L>>, SyncError>
379where
380 L: LogId,
381 E: Extensions + Send + Sync,
382{
383 let mut messages_for_remote = Vec::new();
388
389 for (public_key, log_ids) in logs {
390 for log_id in log_ids {
391 let latest_operation =
393 store
394 .latest_operation(public_key, log_id)
395 .await
396 .map_err(|err| {
397 SyncError::Critical(format!("can't retreive log heights from store, {err}"))
398 })?;
399
400 let log_height = match latest_operation {
401 Some((header, _)) => header.seq_num,
402 None => continue,
405 };
406
407 let remote_needs_from = match remote_log_heights_map.get(public_key) {
409 Some(log_heights) => {
410 match log_heights.iter().find(|(id, _)| *id == *log_id) {
411 Some((_, log_height)) => log_height + 1,
414 None => 0,
416 }
417 }
418 None => 0,
420 };
421
422 if remote_needs_from <= log_height {
423 let messages: Vec<Message<T, L>> =
424 remote_needs(store, log_id, public_key, remote_needs_from).await?;
425 for message in messages {
426 messages_for_remote.push(message);
427 }
428 };
429 }
430 }
431
432 Ok(messages_for_remote)
433}
434
435#[cfg(test)]
436mod tests {
437 use std::collections::HashMap;
438 use std::sync::Arc;
439
440 use async_trait::async_trait;
441 use futures::SinkExt;
442 use p2panda_core::{Body, Hash, Header, PrivateKey};
443 use p2panda_store::{MemoryStore, OperationStore};
444 use serde::{Deserialize, Serialize};
445 use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, ReadHalf};
446 use tokio::sync::mpsc;
447 use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
448 use tokio_util::sync::PollSender;
449
450 use crate::{FromSync, SyncError, SyncProtocol, TopicQuery};
451
452 use super::{LogSyncProtocol, Logs, Message, TopicLogMap};
453
454 impl<T, L> Message<T, L>
455 where
456 T: Serialize,
457 L: Serialize,
458 {
459 pub fn to_bytes(&self) -> Vec<u8> {
460 p2panda_core::cbor::encode_cbor(&self).expect("type can be serialized")
461 }
462 }
463
464 fn create_operation(
465 private_key: &PrivateKey,
466 body: &Body,
467 seq_num: u64,
468 timestamp: u64,
469 backlink: Option<Hash>,
470 ) -> (Hash, Header, Vec<u8>) {
471 let mut header = Header {
472 version: 1,
473 public_key: private_key.public_key(),
474 signature: None,
475 payload_size: body.size(),
476 payload_hash: Some(body.hash()),
477 timestamp,
478 seq_num,
479 backlink,
480 previous: vec![],
481 extensions: None,
482 };
483 header.sign(private_key);
484 let header_bytes = header.to_bytes();
485 (header.hash(), header, header_bytes)
486 }
487
488 #[derive(Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
489 pub struct LogHeightTopic(String, [u8; 32]);
490
491 impl LogHeightTopic {
492 pub fn new(name: &str) -> Self {
493 Self(name.to_owned(), [0; 32])
494 }
495 }
496
497 impl TopicQuery for LogHeightTopic {}
498
499 #[derive(Clone, Debug)]
500 struct LogHeightTopicMap<T>(HashMap<T, Logs<u64>>);
501
502 impl<T> LogHeightTopicMap<T>
503 where
504 T: TopicQuery,
505 {
506 pub fn new() -> Self {
507 LogHeightTopicMap(HashMap::new())
508 }
509
510 fn insert(&mut self, topic_query: &T, logs: Logs<u64>) -> Option<Logs<u64>> {
511 self.0.insert(topic_query.clone(), logs)
512 }
513 }
514
515 #[async_trait]
516 impl<T> TopicLogMap<T, u64> for LogHeightTopicMap<T>
517 where
518 T: TopicQuery,
519 {
520 async fn get(&self, topic_query: &T) -> Option<Logs<u64>> {
521 self.0.get(topic_query).cloned()
522 }
523 }
524
525 async fn assert_message_bytes(
526 mut rx: ReadHalf<DuplexStream>,
527 messages: Vec<Message<LogHeightTopic, u8>>,
528 ) {
529 let mut buf = Vec::new();
530 rx.read_to_end(&mut buf).await.unwrap();
531 assert_eq!(
532 buf,
533 messages.iter().fold(Vec::new(), |mut acc, message| {
534 acc.extend(message.to_bytes());
535 acc
536 })
537 );
538 }
539
540 fn to_bytes(messages: Vec<Message<LogHeightTopic>>) -> Vec<u8> {
541 messages.iter().fold(Vec::new(), |mut acc, message| {
542 acc.extend(message.to_bytes());
543 acc
544 })
545 }
546
547 #[tokio::test]
548 async fn sync_no_operations_accept() {
549 let topic_query = LogHeightTopic::new("messages");
550 let logs = HashMap::new();
551 let store = MemoryStore::<u64>::new();
552
553 let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
555 let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
556 let (peer_b_read, mut peer_b_write) = tokio::io::split(peer_b);
557
558 let (app_tx, mut app_rx) = mpsc::channel(128);
560
561 let message_bytes = to_bytes(vec![
563 Message::Have(topic_query.clone(), vec![]),
564 Message::Done,
565 ]);
566 peer_b_write.write_all(&message_bytes[..]).await.unwrap();
567
568 let mut topic_map = LogHeightTopicMap::new();
570 topic_map.insert(&topic_query, logs);
571 let protocol = Arc::new(LogSyncProtocol::new(topic_map, store));
572 let mut sink =
573 PollSender::new(app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
574 protocol
575 .accept(
576 Box::new(&mut peer_a_write.compat_write()),
577 Box::new(&mut peer_a_read.compat()),
578 Box::new(&mut sink),
579 )
580 .await
581 .unwrap();
582
583 assert_message_bytes(
585 peer_b_read,
586 vec![Message::Done, Message::Have(topic_query.clone(), vec![])],
587 )
588 .await;
589
590 let mut messages = Vec::new();
592 app_rx.recv_many(&mut messages, 10).await;
593 assert_eq!(messages, vec![FromSync::HandshakeSuccess(topic_query)])
594 }
595
596 #[tokio::test]
597 async fn sync_no_operations_initiate() {
598 let topic_query = LogHeightTopic::new("messages");
599 let logs = HashMap::new();
600 let store = MemoryStore::<u64>::new();
601
602 let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
604 let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
605 let (peer_b_read, mut peer_b_write) = tokio::io::split(peer_b);
606
607 let (app_tx, mut app_rx) = mpsc::channel(128);
609
610 let messages = [
612 Message::Done,
613 Message::Have::<LogHeightTopic>(topic_query.clone(), vec![]),
614 ];
615 let message_bytes = messages.iter().fold(Vec::new(), |mut acc, message| {
616 acc.extend(message.to_bytes());
617 acc
618 });
619 peer_b_write.write_all(&message_bytes[..]).await.unwrap();
620
621 let mut topic_map = LogHeightTopicMap::new();
623 topic_map.insert(&topic_query, logs);
624 let protocol = Arc::new(LogSyncProtocol::new(topic_map, store));
625 let mut sink =
626 PollSender::new(app_tx).sink_map_err(|err| crate::SyncError::Critical(err.to_string()));
627 protocol
628 .initiate(
629 topic_query.clone(),
630 Box::new(&mut peer_a_write.compat_write()),
631 Box::new(&mut peer_a_read.compat()),
632 Box::new(&mut sink),
633 )
634 .await
635 .unwrap();
636
637 assert_message_bytes(
639 peer_b_read,
640 vec![Message::Have(topic_query.clone(), vec![]), Message::Done],
641 )
642 .await;
643
644 let mut messages = Vec::new();
646 app_rx.recv_many(&mut messages, 10).await;
647 assert_eq!(messages, vec![FromSync::HandshakeSuccess(topic_query)])
648 }
649
650 #[tokio::test]
651 async fn sync_operations_accept() {
652 let private_key = PrivateKey::new();
653 let log_id = 0;
654 let topic_query = LogHeightTopic::new("messages");
655 let logs = HashMap::from([(private_key.public_key(), vec![log_id])]);
656
657 let mut store = MemoryStore::<u64>::new();
658
659 let body = Body::new("Hello, Sloth!".as_bytes());
660 let (hash_0, header_0, header_bytes_0) = create_operation(&private_key, &body, 0, 0, None);
661 let (hash_1, header_1, header_bytes_1) =
662 create_operation(&private_key, &body, 1, 100, Some(hash_0));
663 let (hash_2, header_2, header_bytes_2) =
664 create_operation(&private_key, &body, 2, 200, Some(hash_1));
665
666 store
667 .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
668 .await
669 .unwrap();
670 store
671 .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
672 .await
673 .unwrap();
674 store
675 .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
676 .await
677 .unwrap();
678
679 let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
681 let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
682 let (peer_b_read, mut peer_b_write) = tokio::io::split(peer_b);
683
684 let (app_tx, mut app_rx) = mpsc::channel(128);
686
687 let messages = [
689 Message::Have::<LogHeightTopic>(topic_query.clone(), vec![]),
690 Message::Done,
691 ];
692 let message_bytes = messages.iter().fold(Vec::new(), |mut acc, message| {
693 acc.extend(message.to_bytes());
694 acc
695 });
696 peer_b_write.write_all(&message_bytes[..]).await.unwrap();
697
698 let mut topic_map = LogHeightTopicMap::new();
700 topic_map.insert(&topic_query, logs);
701 let protocol = Arc::new(LogSyncProtocol::new(topic_map, store));
702 let mut sink =
703 PollSender::new(app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
704 protocol
705 .accept(
706 Box::new(&mut peer_a_write.compat_write()),
707 Box::new(&mut peer_a_read.compat()),
708 Box::new(&mut sink),
709 )
710 .await
711 .unwrap();
712
713 let messages = vec![
715 Message::Data(header_bytes_0, Some(body.to_bytes())),
716 Message::Data(header_bytes_1, Some(body.to_bytes())),
717 Message::Data(header_bytes_2, Some(body.to_bytes())),
718 Message::Done,
719 Message::Have(
720 topic_query.clone(),
721 vec![(private_key.public_key(), vec![(0, 2)])],
722 ),
723 ];
724 assert_message_bytes(peer_b_read, messages).await;
725
726 let mut messages = Vec::new();
728 app_rx.recv_many(&mut messages, 10).await;
729 assert_eq!(messages, [FromSync::HandshakeSuccess(topic_query)])
730 }
731
732 #[tokio::test]
733 async fn sync_operations_initiate() {
734 let private_key = PrivateKey::new();
735 let log_id = 0;
736 let topic_query = LogHeightTopic::new("messages");
737 let logs = HashMap::from([(private_key.public_key(), vec![log_id])]);
738
739 let store = MemoryStore::<u64>::new();
740
741 let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
743 let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
744 let (peer_b_read, mut peer_b_write) = tokio::io::split(peer_b);
745
746 let (app_tx, mut app_rx) = mpsc::channel(128);
748
749 let body = Body::new("Hello, Sloth!".as_bytes());
751
752 let (hash_0, _, header_bytes_0) = create_operation(&private_key, &body, 0, 0, None);
753 let (hash_1, _, header_bytes_1) =
754 create_operation(&private_key, &body, 1, 100, Some(hash_0));
755 let (_, _, header_bytes_2) = create_operation(&private_key, &body, 2, 200, Some(hash_1));
756
757 let messages = vec![
759 Message::Data(header_bytes_0.clone(), Some(body.to_bytes())),
760 Message::Data(header_bytes_1.clone(), Some(body.to_bytes())),
761 Message::Data(header_bytes_2.clone(), Some(body.to_bytes())),
762 Message::Done,
763 Message::Have::<LogHeightTopic>(topic_query.clone(), vec![]),
764 ];
765 let message_bytes = messages.iter().fold(Vec::new(), |mut acc, message| {
766 acc.extend(message.to_bytes());
767 acc
768 });
769 peer_b_write.write_all(&message_bytes[..]).await.unwrap();
770
771 let mut topic_map = LogHeightTopicMap::new();
773 topic_map.insert(&topic_query, logs);
774 let protocol = Arc::new(LogSyncProtocol::new(topic_map, store));
775 let mut sink =
776 PollSender::new(app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
777 protocol
778 .initiate(
779 topic_query.clone(),
780 Box::new(&mut peer_a_write.compat_write()),
781 Box::new(&mut peer_a_read.compat()),
782 Box::new(&mut sink),
783 )
784 .await
785 .unwrap();
786
787 assert_message_bytes(
789 peer_b_read,
790 vec![
791 Message::Have(
792 topic_query.clone(),
793 vec![(private_key.public_key(), vec![])],
794 ),
795 Message::Done,
796 ],
797 )
798 .await;
799
800 let mut messages = Vec::new();
802 app_rx.recv_many(&mut messages, 10).await;
803 assert_eq!(
804 messages,
805 [
806 FromSync::HandshakeSuccess(topic_query),
807 FromSync::Data {
808 header: header_bytes_0,
809 payload: Some(body.to_bytes())
810 },
811 FromSync::Data {
812 header: header_bytes_1,
813 payload: Some(body.to_bytes())
814 },
815 FromSync::Data {
816 header: header_bytes_2,
817 payload: Some(body.to_bytes())
818 },
819 ]
820 );
821 }
822
823 #[tokio::test]
824 async fn e2e_sync_where_one_peer_has_data() {
825 let private_key = PrivateKey::new();
826 let log_id = 0;
827 let topic_query = LogHeightTopic::new("messages");
828 let logs = HashMap::from([(private_key.public_key(), vec![log_id])]);
829
830 let store_1 = MemoryStore::default();
832
833 let mut topic_map = LogHeightTopicMap::new();
835 topic_map.insert(&topic_query, logs);
836 let peer_a_protocol = Arc::new(LogSyncProtocol::new(topic_map.clone(), store_1));
837
838 let mut store_2 = MemoryStore::default();
840 let body = Body::new("Hello, Sloth!".as_bytes());
841
842 let (hash_0, header_0, header_bytes_0) = create_operation(&private_key, &body, 0, 0, None);
843 let (hash_1, header_1, header_bytes_1) =
844 create_operation(&private_key, &body, 1, 100, Some(hash_0));
845 let (hash_2, header_2, header_bytes_2) =
846 create_operation(&private_key, &body, 2, 200, Some(hash_1));
847
848 store_2
849 .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
850 .await
851 .unwrap();
852 store_2
853 .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
854 .await
855 .unwrap();
856 store_2
857 .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
858 .await
859 .unwrap();
860
861 let peer_b_protocol = Arc::new(LogSyncProtocol::new(topic_map, store_2));
863
864 let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
866 let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
867 let (peer_b_read, peer_b_write) = tokio::io::split(peer_b);
868
869 let peer_a_protocol_clone = peer_a_protocol.clone();
871 let (peer_a_app_tx, mut peer_a_app_rx) = mpsc::channel(128);
872 let mut sink =
873 PollSender::new(peer_a_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
874 let topic_clone = topic_query.clone();
875 let handle_1 = tokio::spawn(async move {
876 peer_a_protocol_clone
877 .initiate(
878 topic_clone,
879 Box::new(&mut peer_a_write.compat_write()),
880 Box::new(&mut peer_a_read.compat()),
881 Box::new(&mut sink),
882 )
883 .await
884 .unwrap();
885 });
886
887 let peer_b_protocol_clone = peer_b_protocol.clone();
889 let (peer_b_app_tx, mut peer_b_app_rx) = mpsc::channel(128);
890 let mut sink =
891 PollSender::new(peer_b_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
892 let handle_2 = tokio::spawn(async move {
893 peer_b_protocol_clone
894 .accept(
895 Box::new(&mut peer_b_write.compat_write()),
896 Box::new(&mut peer_b_read.compat()),
897 Box::new(&mut sink),
898 )
899 .await
900 .unwrap();
901 });
902
903 let (_, _) = tokio::join!(handle_1, handle_2);
905
906 let peer_a_expected_messages = vec![
907 FromSync::HandshakeSuccess(topic_query.clone()),
908 FromSync::Data {
909 header: header_bytes_0,
910 payload: Some(body.to_bytes()),
911 },
912 FromSync::Data {
913 header: header_bytes_1,
914 payload: Some(body.to_bytes()),
915 },
916 FromSync::Data {
917 header: header_bytes_2,
918 payload: Some(body.to_bytes()),
919 },
920 ];
921
922 let mut peer_a_messages = Vec::new();
923 peer_a_app_rx.recv_many(&mut peer_a_messages, 10).await;
924 assert_eq!(peer_a_messages, peer_a_expected_messages);
925
926 let peer_b_expected_messages = vec![FromSync::HandshakeSuccess(topic_query.clone())];
927 let mut peer_b_messages = Vec::new();
928 peer_b_app_rx.recv_many(&mut peer_b_messages, 10).await;
929 assert_eq!(peer_b_messages, peer_b_expected_messages);
930 }
931
932 #[tokio::test]
933 async fn e2e_partial_sync() {
934 let private_key = PrivateKey::new();
935 let log_id = 0;
936 let topic_query = LogHeightTopic::new("messages");
937 let logs = HashMap::from([(private_key.public_key(), vec![log_id])]);
938
939 let body = Body::new("Hello, Sloth!".as_bytes());
940
941 let (hash_0, header_0, header_bytes_0) = create_operation(&private_key, &body, 0, 0, None);
942 let (hash_1, header_1, header_bytes_1) =
943 create_operation(&private_key, &body, 1, 100, Some(hash_0));
944 let (hash_2, header_2, header_bytes_2) =
945 create_operation(&private_key, &body, 2, 200, Some(hash_1));
946
947 let mut store_1 = MemoryStore::default();
948 store_1
949 .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
950 .await
951 .unwrap();
952
953 let mut topic_map = LogHeightTopicMap::new();
955 topic_map.insert(&topic_query, logs);
956 let peer_a_protocol = Arc::new(LogSyncProtocol::new(topic_map.clone(), store_1));
957
958 let mut store_2 = MemoryStore::default();
960
961 store_2
963 .insert_operation(hash_0, &header_0, Some(&body), &header_bytes_0, &log_id)
964 .await
965 .unwrap();
966 store_2
967 .insert_operation(hash_1, &header_1, Some(&body), &header_bytes_1, &log_id)
968 .await
969 .unwrap();
970 store_2
971 .insert_operation(hash_2, &header_2, Some(&body), &header_bytes_2, &log_id)
972 .await
973 .unwrap();
974
975 let peer_b_protocol = Arc::new(LogSyncProtocol::new(topic_map, store_2));
977
978 let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
980 let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
981 let (peer_b_read, peer_b_write) = tokio::io::split(peer_b);
982
983 let peer_a_protocol_clone = peer_a_protocol.clone();
985 let (peer_a_app_tx, mut peer_a_app_rx) = mpsc::channel(128);
986 let mut sink =
987 PollSender::new(peer_a_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
988 let topic_clone = topic_query.clone();
989 let handle_1 = tokio::spawn(async move {
990 peer_a_protocol_clone
991 .initiate(
992 topic_clone,
993 Box::new(&mut peer_a_write.compat_write()),
994 Box::new(&mut peer_a_read.compat()),
995 Box::new(&mut sink),
996 )
997 .await
998 .unwrap();
999 });
1000
1001 let peer_b_protocol_clone = peer_b_protocol.clone();
1003 let (peer_b_app_tx, mut peer_b_app_rx) = mpsc::channel(128);
1004 let mut sink =
1005 PollSender::new(peer_b_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
1006 let handle_2 = tokio::spawn(async move {
1007 peer_b_protocol_clone
1008 .accept(
1009 Box::new(&mut peer_b_write.compat_write()),
1010 Box::new(&mut peer_b_read.compat()),
1011 Box::new(&mut sink),
1012 )
1013 .await
1014 .unwrap();
1015 });
1016
1017 let (_, _) = tokio::join!(handle_1, handle_2);
1019
1020 let peer_a_expected_messages = vec![
1021 FromSync::HandshakeSuccess(topic_query.clone()),
1022 FromSync::Data {
1023 header: header_bytes_1,
1024 payload: Some(body.to_bytes()),
1025 },
1026 FromSync::Data {
1027 header: header_bytes_2,
1028 payload: Some(body.to_bytes()),
1029 },
1030 ];
1031
1032 let mut peer_a_messages = Vec::new();
1033 peer_a_app_rx.recv_many(&mut peer_a_messages, 10).await;
1034 assert_eq!(peer_a_messages, peer_a_expected_messages);
1035
1036 let peer_b_expected_messages = vec![FromSync::HandshakeSuccess(topic_query.clone())];
1037 let mut peer_b_messages = Vec::new();
1038 peer_b_app_rx.recv_many(&mut peer_b_messages, 10).await;
1039 assert_eq!(peer_b_messages, peer_b_expected_messages);
1040 }
1041
1042 #[tokio::test]
1043 async fn e2e_sync_two_logs() {
1044 let private_key = PrivateKey::new();
1051 let log_id_1 = 0;
1052 let log_id_2 = 1;
1053
1054 let body_1 = Body::new("Hello, Sloth!".as_bytes());
1055 let body_2 = Body::new("Hello, Panda!".as_bytes());
1056
1057 let (hash_0, header_0, header_bytes_1_0) =
1059 create_operation(&private_key, &body_1, 0, 0, None);
1060 let (hash_1, header_1, header_bytes_1_1) =
1061 create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
1062 let (hash_2, header_2, header_bytes_1_2) =
1063 create_operation(&private_key, &body_1, 2, 200, Some(hash_1));
1064
1065 let mut store_1 = MemoryStore::default();
1067 store_1
1068 .insert_operation(
1069 hash_0,
1070 &header_0,
1071 Some(&body_1),
1072 &header_bytes_1_0,
1073 &log_id_1,
1074 )
1075 .await
1076 .unwrap();
1077 store_1
1078 .insert_operation(
1079 hash_1,
1080 &header_1,
1081 Some(&body_1),
1082 &header_bytes_1_1,
1083 &log_id_1,
1084 )
1085 .await
1086 .unwrap();
1087 store_1
1088 .insert_operation(
1089 hash_2,
1090 &header_2,
1091 Some(&body_1),
1092 &header_bytes_1_2,
1093 &log_id_1,
1094 )
1095 .await
1096 .unwrap();
1097
1098 let (hash_0, header_0, header_bytes_2_0) =
1100 create_operation(&private_key, &body_2, 0, 300, None);
1101 let (hash_1, header_1, header_bytes_2_1) =
1102 create_operation(&private_key, &body_2, 1, 400, Some(hash_0));
1103 let (hash_2, header_2, header_bytes_2_2) =
1104 create_operation(&private_key, &body_2, 2, 500, Some(hash_1));
1105
1106 let mut store_2 = MemoryStore::default();
1108 store_2
1109 .insert_operation(
1110 hash_0,
1111 &header_0,
1112 Some(&body_2),
1113 &header_bytes_2_0,
1114 &log_id_2,
1115 )
1116 .await
1117 .unwrap();
1118 store_2
1119 .insert_operation(
1120 hash_1,
1121 &header_1,
1122 Some(&body_2),
1123 &header_bytes_2_1,
1124 &log_id_2,
1125 )
1126 .await
1127 .unwrap();
1128 store_2
1129 .insert_operation(
1130 hash_2,
1131 &header_2,
1132 Some(&body_2),
1133 &header_bytes_2_2,
1134 &log_id_2,
1135 )
1136 .await
1137 .unwrap();
1138
1139 let topic_query = LogHeightTopic::new("messages");
1141 let logs = HashMap::from([(private_key.public_key(), vec![log_id_1, log_id_2])]);
1142 let mut topic_map = LogHeightTopicMap::new();
1143 topic_map.insert(&topic_query, logs);
1144
1145 let peer_a_protocol = Arc::new(LogSyncProtocol::new(topic_map.clone(), store_1.clone()));
1147 let peer_b_protocol = Arc::new(LogSyncProtocol::new(topic_map, store_2.clone()));
1148
1149 let (peer_a, peer_b) = tokio::io::duplex(64 * 1024);
1151 let (peer_a_read, peer_a_write) = tokio::io::split(peer_a);
1152 let (peer_b_read, peer_b_write) = tokio::io::split(peer_b);
1153
1154 let peer_a_protocol_clone = peer_a_protocol.clone();
1156 let (peer_a_app_tx, mut peer_a_app_rx) = mpsc::channel(128);
1157 let mut sink =
1158 PollSender::new(peer_a_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
1159 let topic_clone = topic_query.clone();
1160 let handle_1 = tokio::spawn(async move {
1161 peer_a_protocol_clone
1162 .initiate(
1163 topic_clone,
1164 Box::new(&mut peer_a_write.compat_write()),
1165 Box::new(&mut peer_a_read.compat()),
1166 Box::new(&mut sink),
1167 )
1168 .await
1169 .unwrap();
1170 });
1171
1172 let peer_b_protocol_clone = peer_b_protocol.clone();
1174 let (peer_b_app_tx, mut peer_b_app_rx) = mpsc::channel(128);
1175 let mut sink =
1176 PollSender::new(peer_b_app_tx).sink_map_err(|err| SyncError::Critical(err.to_string()));
1177 let handle_2 = tokio::spawn(async move {
1178 peer_b_protocol_clone
1179 .accept(
1180 Box::new(&mut peer_b_write.compat_write()),
1181 Box::new(&mut peer_b_read.compat()),
1182 Box::new(&mut sink),
1183 )
1184 .await
1185 .unwrap();
1186 });
1187
1188 let (_, _) = tokio::join!(handle_1, handle_2);
1190
1191 let peer_b_expected_messages = vec![
1193 FromSync::HandshakeSuccess(topic_query.clone()),
1194 FromSync::Data {
1195 header: header_bytes_1_0,
1196 payload: Some(body_1.to_bytes()),
1197 },
1198 FromSync::Data {
1199 header: header_bytes_1_1,
1200 payload: Some(body_1.to_bytes()),
1201 },
1202 FromSync::Data {
1203 header: header_bytes_1_2,
1204 payload: Some(body_1.to_bytes()),
1205 },
1206 ];
1207
1208 let mut peer_b_messages = Vec::new();
1209 peer_b_app_rx.recv_many(&mut peer_b_messages, 10).await;
1210 assert_eq!(peer_b_messages, peer_b_expected_messages);
1211
1212 let peer_a_expected_messages = vec![
1214 FromSync::HandshakeSuccess(topic_query.clone()),
1215 FromSync::Data {
1216 header: header_bytes_2_0,
1217 payload: Some(body_2.to_bytes()),
1218 },
1219 FromSync::Data {
1220 header: header_bytes_2_1,
1221 payload: Some(body_2.to_bytes()),
1222 },
1223 FromSync::Data {
1224 header: header_bytes_2_2,
1225 payload: Some(body_2.to_bytes()),
1226 },
1227 ];
1228
1229 let mut peer_a_messages = Vec::new();
1230 peer_a_app_rx.recv_many(&mut peer_a_messages, 10).await;
1231 assert_eq!(peer_a_messages, peer_a_expected_messages);
1232 }
1233}