1mod event_stream;
8mod session_map;
9
10pub use event_stream::ManagerEventStream;
11pub use session_map::SessionTopicMap;
12
13use std::collections::HashMap;
14use std::fmt::Debug;
15use std::hash::Hash as StdHash;
16use std::marker::PhantomData;
17use std::pin::Pin;
18
19use futures::channel::mpsc;
20use futures::future::ready;
21use futures::stream::SelectAll;
22use futures::{Sink, SinkExt, Stream, StreamExt};
23use p2panda_core::{Extensions, Operation, PublicKey};
24use p2panda_store::{LogId, LogStore, OperationStore};
25use serde::{Deserialize, Serialize};
26use thiserror::Error;
27use tokio::sync::broadcast;
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
30use tracing::debug;
31
32use crate::manager::event_stream::{ManagerEventStreamState, StreamDebug};
33use crate::protocols::{Logs, TopicLogSync, TopicLogSyncError, TopicLogSyncEvent};
34use crate::traits::{Manager, TopicMap};
35use crate::{FromSync, SessionConfig, ToSync};
36
/// Capacity passed to every channel this module creates (`mpsc` and `broadcast` alike).
///
/// NOTE(review): 1028 looks like it may have been meant to be 1024 — confirm before changing.
const CHANNEL_BUFFER: usize = 1028;
38
/// Message accepted by a topic sync session: a [`ToSync`] carrying an [`Operation`] with
/// extensions `E`.
pub type ToTopicSync<E> = ToSync<Operation<E>>;
40
41#[allow(clippy::type_complexity)]
51#[derive(Debug)]
52pub struct TopicSyncManager<T, S, M, L, E>
53where
54 T: Clone,
55 E: Extensions,
56{
57 topic_map: M,
58 store: S,
59 session_topic_map: SessionTopicMap<T, mpsc::Sender<ToTopicSync<E>>>,
60 from_session_tx: HashMap<(u64, PublicKey), broadcast::Sender<TopicLogSyncEvent<E>>>,
61 from_session_rx: HashMap<(u64, PublicKey), broadcast::Receiver<TopicLogSyncEvent<E>>>,
62 manager_tx: Vec<mpsc::Sender<SessionStream<T, E>>>,
63 _phantom: PhantomData<L>,
64}
65
66#[derive(Debug)]
67pub(crate) struct SessionStream<T, E>
68where
69 T: Clone,
70 E: Clone,
71{
72 pub session_id: u64,
73 pub topic: T,
74 pub remote: PublicKey,
75 pub event_rx: broadcast::Receiver<TopicLogSyncEvent<E>>,
76 pub live_tx: mpsc::Sender<ToTopicSync<E>>,
77}
78
79impl<T, S, M, L, E> TopicSyncManager<T, S, M, L, E>
80where
81 T: Clone,
82 E: Extensions,
83{
84 pub fn new(topic_map: M, store: S) -> Self {
85 Self {
86 topic_map,
87 store,
88 manager_tx: Default::default(),
89 session_topic_map: Default::default(),
90 from_session_tx: Default::default(),
91 from_session_rx: Default::default(),
92 _phantom: PhantomData,
93 }
94 }
95}
96
97impl<T, S, M, L, E> Manager<T> for TopicSyncManager<T, S, M, L, E>
98where
99 T: Clone + Debug + Eq + StdHash + Serialize + for<'a> Deserialize<'a> + Send + 'static,
100 S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
101 M: TopicMap<T, Logs<L>> + Send + 'static,
102 L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
103 E: Extensions + Send + 'static,
104{
105 type Protocol = TopicLogSync<T, S, M, L, E>;
106 type Args = TopicSyncManagerArgs<S, M>;
107 type Event = TopicLogSyncEvent<E>;
108 type Message = Operation<E>;
109 type Error = TopicSyncManagerError;
110
111 fn from_args(config: Self::Args) -> Self {
113 Self::new(config.topic_map, config.store)
114 }
115
116 async fn session(&mut self, session_id: u64, config: &SessionConfig<T>) -> Self::Protocol {
118 let (live_tx, live_rx) = mpsc::channel(CHANNEL_BUFFER);
119 let (event_tx, event_rx) = broadcast::channel::<Self::Event>(CHANNEL_BUFFER);
120
121 self.from_session_tx
122 .insert((session_id, config.remote), event_tx.clone());
123
124 self.from_session_rx
125 .insert((session_id, config.remote), event_rx);
126
127 self.session_topic_map
128 .insert_with_topic(session_id, config.topic.clone(), live_tx.clone());
129
130 for manager_tx in self.manager_tx.iter_mut() {
131 if let Err(err) = manager_tx
132 .send(SessionStream {
133 session_id,
134 topic: config.topic.clone(),
135 remote: config.remote,
136 event_rx: event_tx.subscribe(),
137 live_tx: live_tx.clone(),
138 })
139 .await
140 {
141 debug!("manager handle dropped: {err:?}");
142 };
143 }
144
145 let live_rx = if config.live_mode {
146 Some(live_rx)
147 } else {
148 None
149 };
150
151 TopicLogSync::new(
152 config.topic.clone(),
153 self.store.clone(),
154 self.topic_map.clone(),
155 live_rx,
156 event_tx,
157 )
158 }
159
160 async fn session_handle(
162 &self,
163 session_id: u64,
164 ) -> Option<Pin<Box<dyn Sink<ToTopicSync<E>, Error = Self::Error>>>> {
165 let map_fn = |to_sync: ToSync<Self::Message>| {
166 ready({
167 match to_sync {
168 ToSync::Payload(operation) => Ok::<_, Self::Error>(ToSync::Payload(operation)),
169 ToSync::Close => Ok::<_, Self::Error>(ToSync::Close),
170 }
171 })
172 };
173
174 self.session_topic_map.sender(session_id).map(|tx| {
175 Box::pin(tx.clone().with(map_fn))
176 as Pin<Box<dyn Sink<ToTopicSync<E>, Error = Self::Error>>>
177 })
178 }
179
180 fn subscribe(&mut self) -> impl Stream<Item = FromSync<Self::Event>> + Send + Unpin + 'static {
182 let (manager_tx, manager_rx) = mpsc::channel(CHANNEL_BUFFER);
183 self.manager_tx.push(manager_tx);
184
185 let mut session_rx_set = SelectAll::new();
186 for ((id, remote), tx) in self.from_session_tx.iter() {
187 let session_id = *id;
188 let remote = *remote;
189 let stream = BroadcastStream::new(tx.subscribe());
190
191 #[allow(clippy::type_complexity)]
192 let stream: Pin<
193 Box<dyn StreamDebug<Option<FromSync<TopicLogSyncEvent<E>>>>>,
194 > = Box::pin(stream.map(Box::new(
195 move |event: Result<TopicLogSyncEvent<E>, BroadcastStreamRecvError>| {
196 event.ok().map(|event| FromSync {
197 session_id,
198 remote,
199 event,
200 })
201 },
202 )));
203 session_rx_set.push(stream);
204 }
205
206 let state = ManagerEventStreamState {
207 manager_rx,
208 session_rx_set,
209 session_topic_map: self.session_topic_map.clone(),
210 };
211
212 let stream = ManagerEventStream {
213 state: Some(state),
214 pending: Default::default(),
215 };
216
217 Box::pin(stream)
218 }
219}
220
/// Arguments for building a topic sync manager: the store and the topic map it works with.
#[derive(Clone, Debug)]
pub struct TopicSyncManagerArgs<S, M> {
    /// Store passed on to the manager.
    pub store: S,

    /// Topic map passed on to the manager.
    pub topic_map: M,
}
227
/// Error type of the topic sync manager.
#[derive(Debug, Error)]
pub enum TopicSyncManagerError {
    /// Error coming from the topic log sync protocol, displayed as-is.
    #[error(transparent)]
    TopicLogSync(#[from] TopicLogSyncError),

    /// An operation was received before the topic was agreed on.
    #[error("received operation before topic agreed")]
    OperationBeforeTopic,

    /// Sending into an `mpsc` channel failed, displayed as-is.
    #[error(transparent)]
    Send(#[from] mpsc::SendError),
}
240
241#[cfg(test)]
242mod tests {
243 use std::collections::HashMap;
244 use std::time::Duration;
245
246 use assert_matches::assert_matches;
247 use futures::{SinkExt, StreamExt};
248 use p2panda_core::{Body, Operation};
249
250 use crate::manager::{TopicSyncManager, TopicSyncManagerArgs};
251 use crate::protocols::TopicLogSyncEvent;
252 use crate::test_utils::{
253 Peer, TestMemoryStore, TestTopic, TestTopicMap, TestTopicSyncEvent, TestTopicSyncManager,
254 drain_stream, run_protocol, setup_logging,
255 };
256 use crate::traits::Manager;
257 use crate::{FromSync, SessionConfig, ToSync};
258
259 #[test]
260 fn from_args() {
261 let store = TestMemoryStore::new();
262 let topic_map = TestTopicMap::new();
263 let config = TopicSyncManagerArgs { store, topic_map };
264 let _: TestTopicSyncManager = Manager::from_args(config);
265 }
266
267 #[tokio::test]
268 async fn manager_e2e() {
269 setup_logging();
270
271 const TOPIC_NAME: &str = "messages";
272 const LOG_ID: u64 = 0;
273 const SESSION_ID: u64 = 0;
274
275 let topic = TestTopic::new(TOPIC_NAME);
276
277 let mut peer_a = Peer::new(0);
279 let body = Body::new("Hello from Peer A".as_bytes());
280 let _ = peer_a.create_operation(&body, LOG_ID).await;
281 let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
282 peer_a.insert_topic(&topic, &logs);
283 let mut peer_a_manager =
284 TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
285
286 let mut peer_b = Peer::new(1);
288 let body = Body::new("Hello from Peer B".as_bytes());
289 let _ = peer_b.create_operation(&body, LOG_ID).await;
290 let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
291 peer_b.insert_topic(&topic, &logs);
292 let mut peer_b_manager =
293 TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
294
295 let config = SessionConfig {
297 topic,
298 remote: peer_b.id(),
299 live_mode: true,
300 };
301
302 let mut event_stream_a = peer_a_manager.subscribe();
304 let mut event_stream_b = peer_b_manager.subscribe();
305
306 let peer_a_session = peer_a_manager.session(SESSION_ID, &config).await;
308
309 let peer_b_session = peer_b_manager.session(SESSION_ID, &config).await;
311
312 let mut peer_a_handle = peer_a_manager.session_handle(SESSION_ID).await.unwrap();
314
315 let (header_1, _) = peer_a.create_operation_no_insert(&body, LOG_ID).await;
317 peer_a_handle
318 .send(ToSync::Payload(Operation {
319 hash: header_1.hash(),
320 header: header_1.clone(),
321 body: Some(body.clone()),
322 }))
323 .await
324 .unwrap();
325 peer_a_handle.send(ToSync::Close).await.unwrap();
326
327 run_protocol(peer_a_session, peer_b_session).await.unwrap();
329
330 for index in 0..=7 {
332 let event = event_stream_a.next().await.unwrap();
333 assert_eq!(event.session_id(), 0);
334 match index {
335 0 => assert_matches!(
336 event,
337 FromSync {
338 event: TopicLogSyncEvent::SyncStarted(_),
339 ..
340 }
341 ),
342 1 | 2 => assert_matches!(
343 event,
344 FromSync {
345 event: TopicLogSyncEvent::SyncStatus(_),
346 ..
347 }
348 ),
349 3 => assert_matches!(
350 event,
351 FromSync {
352 event: TopicLogSyncEvent::Operation(_),
353 ..
354 }
355 ),
356 4 => assert_matches!(
357 event,
358 FromSync {
359 event: TopicLogSyncEvent::SyncFinished(_),
360 ..
361 }
362 ),
363 5 => assert_matches!(
364 event,
365 FromSync {
366 event: TopicLogSyncEvent::LiveModeStarted,
367 ..
368 }
369 ),
370 6 => assert_matches!(
371 event,
372 FromSync {
373 event: TopicLogSyncEvent::LiveModeFinished(_),
374 ..
375 }
376 ),
377 7 => assert_matches!(
378 event,
379 FromSync {
380 event: TopicLogSyncEvent::Success,
381 ..
382 }
383 ),
384 _ => panic!(),
385 }
386 }
387
388 for index in 0..=8 {
390 let event = event_stream_b.next().await.unwrap();
391 match index {
392 0 => assert_matches!(
393 event,
394 FromSync {
395 session_id: 0,
396 event: TopicLogSyncEvent::SyncStarted(_),
397 ..
398 }
399 ),
400 1 | 2 => assert_matches!(
401 event,
402 FromSync {
403 session_id: 0,
404 event: TopicLogSyncEvent::SyncStatus(_),
405 ..
406 }
407 ),
408 3 => assert_matches!(
409 event,
410 FromSync {
411 session_id: 0,
412 event: TopicLogSyncEvent::Operation(_),
413 ..
414 }
415 ),
416 4 => assert_matches!(
417 event,
418 FromSync {
419 session_id: 0,
420 event: TopicLogSyncEvent::SyncFinished(_),
421 ..
422 }
423 ),
424 5 => assert_matches!(
425 event,
426 FromSync {
427 event: TopicLogSyncEvent::LiveModeStarted,
428 ..
429 }
430 ),
431 6 => assert_matches!(
432 event,
433 FromSync {
434 session_id: 0,
435 event: TopicLogSyncEvent::Operation(_),
436 ..
437 }
438 ),
439 7 => assert_matches!(
440 event,
441 FromSync {
442 event: TopicLogSyncEvent::LiveModeFinished(_),
443 ..
444 }
445 ),
446 8 => assert_matches!(
447 event,
448 FromSync {
449 event: TopicLogSyncEvent::Success,
450 ..
451 }
452 ),
453 _ => panic!(),
454 }
455 }
456 }
457
458 #[tokio::test]
459 async fn live_mode_three_peer_forwarding() {
460 setup_logging();
461
462 const TOPIC_NAME: &str = "chat";
463 const LOG_ID: u64 = 0;
464 const SESSION_AB: u64 = 0;
465 const SESSION_AC: u64 = 1;
466 const SESSION_BA: u64 = 2;
467 const SESSION_CA: u64 = 3;
468
469 let topic = TestTopic::new(TOPIC_NAME);
471
472 let mut peer_a = Peer::new(0);
474 let body_a = Body::new("Hello from A".as_bytes());
475 let (peer_a_header_0, _) = peer_a.create_operation(&body_a, LOG_ID).await;
476 let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
477 peer_a.insert_topic(&topic, &logs);
478 let mut manager_a = TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
479
480 let mut peer_b = Peer::new(1);
482 let body_b = Body::new("Hello from B".as_bytes());
483 let (peer_b_header_0, _) = peer_b.create_operation(&body_b, LOG_ID).await;
484 let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
485 peer_b.insert_topic(&topic, &logs);
486 let mut manager_b = TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
487
488 let mut peer_c = Peer::new(2);
490 let body_c = Body::new("Hello from C".as_bytes());
491 let (peer_c_header_0, _) = peer_c.create_operation(&body_c, LOG_ID).await;
492 let logs = HashMap::from([(peer_c.id(), vec![LOG_ID])]);
493 peer_c.insert_topic(&topic, &logs);
494 let mut manager_c = TopicSyncManager::new(peer_c.topic_map.clone(), peer_c.store.clone());
495
496 let mut config = SessionConfig {
498 topic: topic.clone(),
499 remote: peer_b.id(),
500 live_mode: true,
501 };
502 let session_ab = manager_a.session(SESSION_AB, &config).await;
503 config.remote = peer_a.id();
504 let session_b = manager_b.session(SESSION_BA, &config).await;
505
506 let mut config = SessionConfig {
508 topic: topic.clone(),
509 remote: peer_c.id(),
510 live_mode: true,
511 };
512 let session_ac = manager_a.session(SESSION_AC, &config).await;
513 config.remote = peer_a.id();
514 let session_c = manager_c.session(SESSION_CA, &config).await;
515
516 let mut event_stream_a = manager_a.subscribe();
517 let mut event_stream_b = manager_b.subscribe();
518 let mut event_stream_c = manager_c.subscribe();
519
520 tokio::spawn(async move {
522 run_protocol(session_ab, session_b).await.unwrap();
523 });
524 tokio::spawn(async move {
525 run_protocol(session_ac, session_c).await.unwrap();
526 });
527
528 let mut handle_ab = manager_a.session_handle(SESSION_AB).await.unwrap();
530 let mut handle_ac = manager_a.session_handle(SESSION_AC).await.unwrap();
531 let mut handle_ba = manager_b.session_handle(SESSION_BA).await.unwrap();
532 let mut handle_ca = manager_c.session_handle(SESSION_CA).await.unwrap();
533
534 let body_a = Body::new("Hello again from A".as_bytes());
535 let body_b = Body::new("Hello again from B".as_bytes());
536 let body_c = Body::new("Hello again from C".as_bytes());
537 let (peer_a_header_1, _) = peer_a.create_operation(&body_a, LOG_ID).await;
538 let (peer_b_header_1, _) = peer_b.create_operation(&body_b, LOG_ID).await;
539 let (peer_c_header_1, _) = peer_c.create_operation(&body_c, LOG_ID).await;
540
541 let operation_a = Operation {
542 hash: peer_a_header_1.hash(),
543 header: peer_a_header_1.clone(),
544 body: Some(body_a.clone()),
545 };
546 let operation_b = Operation {
547 hash: peer_b_header_1.hash(),
548 header: peer_b_header_1.clone(),
549 body: Some(body_b.clone()),
550 };
551 let operation_c = Operation {
552 hash: peer_c_header_1.hash(),
553 header: peer_c_header_1.clone(),
554 body: Some(body_c.clone()),
555 };
556
557 handle_ab
558 .send(ToSync::Payload(operation_a.clone()))
559 .await
560 .unwrap();
561 handle_ac.send(ToSync::Payload(operation_a)).await.unwrap();
562 handle_ba.send(ToSync::Payload(operation_b)).await.unwrap();
563 handle_ca.send(ToSync::Payload(operation_c)).await.unwrap();
564
565 let mut operations_a = vec![];
567 let mut operations_b = vec![];
568 let mut operations_c = vec![];
569 let _ = tokio::time::timeout(Duration::from_millis(500), async {
570 loop {
571 tokio::select! {
572 Some(event) = event_stream_a.next() => {
573 if let TestTopicSyncEvent::Operation(operation) = event.event() {
574 operations_a.push(*operation.clone());
575 }
576 }
577 Some(event) = event_stream_b.next() => {
578 if let TestTopicSyncEvent::Operation(operation) = event.event() {
579 operations_b.push(*operation.clone());
580 }
581 }
582 Some(event) = event_stream_c.next() => {
583 if let TestTopicSyncEvent::Operation(operation) = event.event() {
584 operations_c.push(*operation.clone());
585 }
586 }
587 else => tokio::time::sleep(Duration::from_millis(5)).await
588 }
589 }
590 })
591 .await;
592
593 assert_eq!(operations_a.len(), 4);
596 assert_eq!(operations_b.len(), 4);
597 assert_eq!(operations_c.len(), 4);
598 assert!(
599 operations_a
600 .iter()
601 .find(|operation| operation.header == peer_a_header_0
602 || operation.header == peer_a_header_1)
603 .is_none()
604 );
605 assert!(
606 operations_b
607 .iter()
608 .find(|operation| operation.header == peer_b_header_0
609 || operation.header == peer_b_header_1)
610 .is_none()
611 );
612 assert!(
613 operations_c
614 .iter()
615 .find(|operation| operation.header == peer_c_header_0
616 || operation.header == peer_c_header_1)
617 .is_none()
618 );
619 }
620
621 #[tokio::test]
622 async fn non_blocking_manager_stream() {
623 const TOPIC_NAME: &str = "messages";
624 const LOG_ID: u64 = 0;
625 const SESSION_ID: u64 = 0;
626
627 let topic = TestTopic::new(TOPIC_NAME);
628
629 let mut peer_a = Peer::new(0);
631 let body = Body::new("Hello from Peer A".as_bytes());
632 let _ = peer_a.create_operation(&body, LOG_ID).await;
633 let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
634 peer_a.insert_topic(&topic, &logs);
635 let mut peer_a_manager =
636 TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
637
638 let mut peer_a_stream = peer_a_manager.subscribe();
640 tokio::task::spawn(async move {
641 loop {
642 peer_a_stream.next().await;
643 }
644 });
645
646 let mut peer_b = Peer::new(1);
648 let body = Body::new("Hello from Peer B".as_bytes());
649 let _ = peer_b.create_operation(&body, LOG_ID).await;
650 let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
651 peer_b.insert_topic(&topic, &logs);
652 let mut peer_b_manager =
653 TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
654
655 let config = SessionConfig {
657 topic,
658 remote: peer_b.id(),
659 live_mode: true,
660 };
661
662 let peer_a_session = peer_a_manager.session(SESSION_ID, &config).await;
663
664 let event_stream = peer_b_manager.subscribe();
666 let peer_b_session = peer_b_manager.session(SESSION_ID, &config).await;
667
668 let mut peer_a_handle = peer_a_manager.session_handle(SESSION_ID).await.unwrap();
670
671 let (header_1, _) = peer_a.create_operation_no_insert(&body, LOG_ID).await;
673 peer_a_handle
674 .send(ToSync::Payload(Operation {
675 hash: header_1.hash(),
676 header: header_1.clone(),
677 body: Some(body.clone()),
678 }))
679 .await
680 .unwrap();
681 peer_a_handle.send(ToSync::Close).await.unwrap();
682
683 run_protocol(peer_a_session, peer_b_session).await.unwrap();
685
686 let events = drain_stream(event_stream).await;
688 assert_eq!(events.len(), 9);
689 for (index, event) in events.into_iter().enumerate() {
690 match index {
691 0 => assert_matches!(
692 event,
693 FromSync {
694 session_id: 0,
695 event: TopicLogSyncEvent::SyncStarted(_),
696 ..
697 }
698 ),
699 1 | 2 => assert_matches!(
700 event,
701 FromSync {
702 session_id: 0,
703 event: TopicLogSyncEvent::SyncStatus(_),
704 ..
705 }
706 ),
707 3 => assert_matches!(
708 event,
709 FromSync {
710 session_id: 0,
711 event: TopicLogSyncEvent::Operation(_),
712 ..
713 }
714 ),
715 4 => assert_matches!(
716 event,
717 FromSync {
718 session_id: 0,
719 event: TopicLogSyncEvent::SyncFinished(_),
720 ..
721 }
722 ),
723 5 => assert_matches!(
724 event,
725 FromSync {
726 event: TopicLogSyncEvent::LiveModeStarted,
727 ..
728 }
729 ),
730 6 => assert_matches!(
731 event,
732 FromSync {
733 session_id: 0,
734 event: TopicLogSyncEvent::Operation(_),
735 ..
736 }
737 ),
738 7 => assert_matches!(
739 event,
740 FromSync {
741 event: TopicLogSyncEvent::LiveModeFinished(_),
742 ..
743 }
744 ),
745 8 => assert_matches!(
746 event,
747 FromSync {
748 event: TopicLogSyncEvent::Success,
749 ..
750 }
751 ),
752 _ => panic!(),
753 }
754 }
755 }
756}