1mod event_stream;
8mod session_map;
9
10pub use event_stream::ManagerEventStream;
11pub use session_map::SessionTopicMap;
12
13use std::collections::HashMap;
14use std::fmt::Debug;
15use std::hash::Hash as StdHash;
16use std::marker::PhantomData;
17use std::pin::Pin;
18
19use futures::channel::mpsc;
20use futures::future::ready;
21use futures::stream::SelectAll;
22use futures::{Sink, SinkExt, Stream, StreamExt};
23use p2panda_core::{Extensions, Operation, PublicKey};
24use p2panda_store::{LogId, LogStore, OperationStore};
25use serde::{Deserialize, Serialize};
26use thiserror::Error;
27use tokio::sync::broadcast;
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
30use tracing::debug;
31
32use crate::manager::event_stream::{ManagerEventStreamState, StreamDebug};
33use crate::protocols::{Logs, TopicLogSync, TopicLogSyncError, TopicLogSyncEvent};
34use crate::traits::{Manager, TopicMap};
35use crate::{FromSync, SessionConfig, ToSync};
36
/// Capacity passed to every `mpsc` and `broadcast` channel created by the manager.
///
/// Declared as a `const` since it is an immutable compile-time value which never needs a fixed
/// memory address.
const CHANNEL_BUFFER: usize = 1028;
38
/// Message type accepted by session handles: a [`ToSync`] carrying an [`Operation`] with
/// extensions `E`.
pub type ToTopicSync<E> = ToSync<Operation<E>>;
40
/// Manager which creates topic log sync sessions and connects them with subscriber streams.
///
/// Type parameters: `T` is the topic, `S` the store, `M` the topic map, `L` the log id (only
/// tracked through `PhantomData`) and `E` the operation extensions.
// The channel-bearing fields below have deeply nested generic types.
#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct TopicSyncManager<T, S, M, L, E>
where
    T: Clone,
    E: Extensions,
{
    /// Topic map, cloned into every session created by this manager.
    topic_map: M,

    /// Store, cloned into every session created by this manager.
    store: S,

    /// Associates each session id with its topic and the sender used to push live-mode messages
    /// into that session.
    session_topic_map: SessionTopicMap<T, mpsc::Sender<ToTopicSync<E>>>,

    /// Event broadcast sender of every session, keyed by session id and remote public key. New
    /// subscribers obtain their receivers from these senders.
    from_session_tx: HashMap<(u64, PublicKey), broadcast::Sender<TopicLogSyncEvent<E>>>,

    /// Initial receiver of every session's event channel. It is stored but never read in this
    /// file; presumably it keeps the channel alive while no subscriber exists (TODO confirm).
    from_session_rx: HashMap<(u64, PublicKey), broadcast::Receiver<TopicLogSyncEvent<E>>>,

    /// One sender per `subscribe` call, used to announce newly created sessions to that
    /// subscriber's stream.
    manager_tx: Vec<mpsc::Sender<SessionStream<T, E>>>,

    /// Marker tying the log id type `L` to the manager without storing a value of it.
    _phantom: PhantomData<L>,
}
65
/// Announcement of a newly created session, sent from the manager to every subscriber stream.
#[derive(Debug)]
pub(crate) struct SessionStream<T, E>
where
    T: Clone,
    E: Clone,
{
    /// Identifier of the session.
    pub session_id: u64,

    /// Topic the session was created for.
    pub topic: T,

    /// Public key of the remote peer of the session.
    pub remote: PublicKey,

    /// Fresh subscription to the events broadcast by the session.
    pub event_rx: broadcast::Receiver<TopicLogSyncEvent<E>>,

    /// Sender feeding messages into the session's live-mode channel.
    pub live_tx: mpsc::Sender<ToTopicSync<E>>,
}
78
79impl<T, S, M, L, E> TopicSyncManager<T, S, M, L, E>
80where
81 T: Clone,
82 E: Extensions,
83{
84 pub fn new(topic_map: M, store: S) -> Self {
85 Self {
86 topic_map,
87 store,
88 manager_tx: Default::default(),
89 session_topic_map: Default::default(),
90 from_session_tx: Default::default(),
91 from_session_rx: Default::default(),
92 _phantom: PhantomData,
93 }
94 }
95}
96
97impl<T, S, M, L, E> Manager<T> for TopicSyncManager<T, S, M, L, E>
98where
99 T: Clone + Debug + Eq + StdHash + Serialize + for<'a> Deserialize<'a> + Send + 'static,
100 S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
101 M: TopicMap<T, Logs<L>> + Send + 'static,
102 L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
103 E: Extensions + Send + 'static,
104{
105 type Protocol = TopicLogSync<T, S, M, L, E>;
106 type Args = TopicSyncManagerArgs<S, M>;
107 type Event = TopicLogSyncEvent<E>;
108 type Message = Operation<E>;
109 type Error = TopicSyncManagerError;
110
111 fn from_args(config: Self::Args) -> Self {
113 Self::new(config.topic_map, config.store)
114 }
115
116 async fn session(&mut self, session_id: u64, config: &SessionConfig<T>) -> Self::Protocol {
118 let (live_tx, live_rx) = mpsc::channel(CHANNEL_BUFFER);
119 let (event_tx, event_rx) = broadcast::channel::<Self::Event>(CHANNEL_BUFFER);
120
121 self.from_session_tx
122 .insert((session_id, config.remote), event_tx.clone());
123
124 self.from_session_rx
125 .insert((session_id, config.remote), event_rx);
126
127 self.session_topic_map
128 .insert_with_topic(session_id, config.topic.clone(), live_tx.clone());
129
130 for manager_tx in self.manager_tx.iter_mut() {
131 if manager_tx
132 .send(SessionStream {
133 session_id,
134 topic: config.topic.clone(),
135 remote: config.remote,
136 event_rx: event_tx.subscribe(),
137 live_tx: live_tx.clone(),
138 })
139 .await
140 .is_err()
141 {
142 debug!("manager handle dropped");
143 };
144 }
145
146 let live_rx = if config.live_mode {
147 Some(live_rx)
148 } else {
149 None
150 };
151
152 TopicLogSync::new(
153 config.topic.clone(),
154 self.store.clone(),
155 self.topic_map.clone(),
156 live_rx,
157 event_tx,
158 )
159 }
160
161 async fn session_handle(
163 &self,
164 session_id: u64,
165 ) -> Option<Pin<Box<dyn Sink<ToTopicSync<E>, Error = Self::Error>>>> {
166 let map_fn = |to_sync: ToSync<Self::Message>| {
167 ready({
168 match to_sync {
169 ToSync::Payload(operation) => Ok::<_, Self::Error>(ToSync::Payload(operation)),
170 ToSync::Close => Ok::<_, Self::Error>(ToSync::Close),
171 }
172 })
173 };
174
175 self.session_topic_map.sender(session_id).map(|tx| {
176 Box::pin(tx.clone().with(map_fn))
177 as Pin<Box<dyn Sink<ToTopicSync<E>, Error = Self::Error>>>
178 })
179 }
180
181 fn subscribe(&mut self) -> impl Stream<Item = FromSync<Self::Event>> + Send + Unpin + 'static {
183 let (manager_tx, manager_rx) = mpsc::channel(CHANNEL_BUFFER);
184 self.manager_tx.push(manager_tx);
185
186 let mut session_rx_set = SelectAll::new();
187 for ((id, remote), tx) in self.from_session_tx.iter() {
188 let session_id = *id;
189 let remote = *remote;
190 let stream = BroadcastStream::new(tx.subscribe());
191
192 #[allow(clippy::type_complexity)]
193 let stream: Pin<
194 Box<dyn StreamDebug<Option<FromSync<TopicLogSyncEvent<E>>>>>,
195 > = Box::pin(stream.map(Box::new(
196 move |event: Result<TopicLogSyncEvent<E>, BroadcastStreamRecvError>| {
197 event.ok().map(|event| FromSync {
198 session_id,
199 remote,
200 event,
201 })
202 },
203 )));
204 session_rx_set.push(stream);
205 }
206
207 let state = ManagerEventStreamState {
208 manager_rx,
209 session_rx_set,
210 session_topic_map: self.session_topic_map.clone(),
211 };
212
213 let stream = ManagerEventStream {
214 state: Some(state),
215 pending: Default::default(),
216 };
217
218 Box::pin(stream)
219 }
220}
221
/// Arguments from which a [`TopicSyncManager`] is constructed via `Manager::from_args`.
#[derive(Clone, Debug)]
pub struct TopicSyncManagerArgs<S, M> {
    /// Store handed to the manager.
    pub store: S,

    /// Topic map handed to the manager.
    pub topic_map: M,
}
228
/// Errors surfaced by the topic sync manager and its session handles.
#[derive(Debug, Error)]
pub enum TopicSyncManagerError {
    /// Error originating from the topic log sync protocol, displayed transparently.
    #[error(transparent)]
    TopicLogSync(#[from] TopicLogSyncError),

    /// An operation arrived before a topic was agreed upon. This variant is not constructed
    /// within this file.
    #[error("received operation before topic agreed")]
    OperationBeforeTopic,

    /// Sending on an `mpsc` channel failed, for example through a session handle sink.
    #[error(transparent)]
    Send(#[from] mpsc::SendError),
}
241
242#[cfg(test)]
243mod tests {
244 use std::collections::HashMap;
245 use std::time::Duration;
246
247 use assert_matches::assert_matches;
248 use futures::{SinkExt, StreamExt};
249 use p2panda_core::{Body, Operation};
250
251 use crate::manager::{TopicSyncManager, TopicSyncManagerArgs};
252 use crate::protocols::TopicLogSyncEvent;
253 use crate::test_utils::{
254 Peer, TestMemoryStore, TestTopic, TestTopicMap, TestTopicSyncEvent, TestTopicSyncManager,
255 drain_stream, run_protocol, setup_logging,
256 };
257 use crate::traits::Manager;
258 use crate::{FromSync, SessionConfig, ToSync};
259
260 #[test]
261 fn from_args() {
262 let store = TestMemoryStore::new();
263 let topic_map = TestTopicMap::new();
264 let config = TopicSyncManagerArgs { store, topic_map };
265 let _: TestTopicSyncManager = Manager::from_args(config);
266 }
267
268 #[tokio::test]
269 async fn manager_e2e() {
270 setup_logging();
271
272 const TOPIC_NAME: &str = "messages";
273 const LOG_ID: u64 = 0;
274 const SESSION_ID: u64 = 0;
275
276 let topic = TestTopic::new(TOPIC_NAME);
277
278 let mut peer_a = Peer::new(0);
280 let body = Body::new("Hello from Peer A".as_bytes());
281 let _ = peer_a.create_operation(&body, LOG_ID).await;
282 let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
283 peer_a.insert_topic(&topic, &logs);
284 let mut peer_a_manager =
285 TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
286
287 let mut peer_b = Peer::new(1);
289 let body = Body::new("Hello from Peer B".as_bytes());
290 let _ = peer_b.create_operation(&body, LOG_ID).await;
291 let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
292 peer_b.insert_topic(&topic, &logs);
293 let mut peer_b_manager =
294 TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
295
296 let config = SessionConfig {
298 topic,
299 remote: peer_b.id(),
300 live_mode: true,
301 };
302
303 let mut event_stream_a = peer_a_manager.subscribe();
305 let mut event_stream_b = peer_b_manager.subscribe();
306
307 let peer_a_session = peer_a_manager.session(SESSION_ID, &config).await;
309
310 let peer_b_session = peer_b_manager.session(SESSION_ID, &config).await;
312
313 let mut peer_a_handle = peer_a_manager.session_handle(SESSION_ID).await.unwrap();
315
316 let (header_1, _) = peer_a.create_operation_no_insert(&body, LOG_ID).await;
318 peer_a_handle
319 .send(ToSync::Payload(Operation {
320 hash: header_1.hash(),
321 header: header_1.clone(),
322 body: Some(body.clone()),
323 }))
324 .await
325 .unwrap();
326 peer_a_handle.send(ToSync::Close).await.unwrap();
327
328 run_protocol(peer_a_session, peer_b_session).await.unwrap();
330
331 for index in 0..=7 {
333 let event = event_stream_a.next().await.unwrap();
334 assert_eq!(event.session_id(), 0);
335 match index {
336 0 => assert_matches!(
337 event,
338 FromSync {
339 event: TopicLogSyncEvent::SyncStarted(_),
340 ..
341 }
342 ),
343 1 | 2 => assert_matches!(
344 event,
345 FromSync {
346 event: TopicLogSyncEvent::SyncStatus(_),
347 ..
348 }
349 ),
350 3 => assert_matches!(
351 event,
352 FromSync {
353 event: TopicLogSyncEvent::Operation(_),
354 ..
355 }
356 ),
357 4 => assert_matches!(
358 event,
359 FromSync {
360 event: TopicLogSyncEvent::SyncFinished(_),
361 ..
362 }
363 ),
364 5 => assert_matches!(
365 event,
366 FromSync {
367 event: TopicLogSyncEvent::LiveModeStarted,
368 ..
369 }
370 ),
371 6 => assert_matches!(
372 event,
373 FromSync {
374 event: TopicLogSyncEvent::LiveModeFinished(_),
375 ..
376 }
377 ),
378 7 => assert_matches!(
379 event,
380 FromSync {
381 event: TopicLogSyncEvent::Success,
382 ..
383 }
384 ),
385 _ => panic!(),
386 }
387 }
388
389 for index in 0..=8 {
391 let event = event_stream_b.next().await.unwrap();
392 match index {
393 0 => assert_matches!(
394 event,
395 FromSync {
396 session_id: 0,
397 event: TopicLogSyncEvent::SyncStarted(_),
398 ..
399 }
400 ),
401 1 | 2 => assert_matches!(
402 event,
403 FromSync {
404 session_id: 0,
405 event: TopicLogSyncEvent::SyncStatus(_),
406 ..
407 }
408 ),
409 3 => assert_matches!(
410 event,
411 FromSync {
412 session_id: 0,
413 event: TopicLogSyncEvent::Operation(_),
414 ..
415 }
416 ),
417 4 => assert_matches!(
418 event,
419 FromSync {
420 session_id: 0,
421 event: TopicLogSyncEvent::SyncFinished(_),
422 ..
423 }
424 ),
425 5 => assert_matches!(
426 event,
427 FromSync {
428 event: TopicLogSyncEvent::LiveModeStarted,
429 ..
430 }
431 ),
432 6 => assert_matches!(
433 event,
434 FromSync {
435 session_id: 0,
436 event: TopicLogSyncEvent::Operation(_),
437 ..
438 }
439 ),
440 7 => assert_matches!(
441 event,
442 FromSync {
443 event: TopicLogSyncEvent::LiveModeFinished(_),
444 ..
445 }
446 ),
447 8 => assert_matches!(
448 event,
449 FromSync {
450 event: TopicLogSyncEvent::Success,
451 ..
452 }
453 ),
454 _ => panic!(),
455 }
456 }
457 }
458
459 #[tokio::test]
460 async fn live_mode_three_peer_forwarding() {
461 use std::collections::HashMap;
462
463 const TOPIC_NAME: &str = "chat";
464 const LOG_ID: u64 = 0;
465 const SESSION_AB: u64 = 0;
466 const SESSION_AC: u64 = 1;
467 const SESSION_BA: u64 = 2;
468 const SESSION_CA: u64 = 3;
469
470 let topic = TestTopic::new(TOPIC_NAME);
472
473 let mut peer_a = Peer::new(0);
475 let body_a = Body::new("Hello from A".as_bytes());
476 let (peer_a_header_0, _) = peer_a.create_operation(&body_a, LOG_ID).await;
477 let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
478 peer_a.insert_topic(&topic, &logs);
479 let mut manager_a = TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
480
481 let mut peer_b = Peer::new(1);
483 let body_b = Body::new("Hello from B".as_bytes());
484 let (peer_b_header_0, _) = peer_b.create_operation(&body_b, LOG_ID).await;
485 let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
486 peer_b.insert_topic(&topic, &logs);
487 let mut manager_b = TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
488
489 let mut peer_c = Peer::new(2);
491 let body_c = Body::new("Hello from C".as_bytes());
492 let (peer_c_header_0, _) = peer_c.create_operation(&body_c, LOG_ID).await;
493 let logs = HashMap::from([(peer_c.id(), vec![LOG_ID])]);
494 peer_c.insert_topic(&topic, &logs);
495 let mut manager_c = TopicSyncManager::new(peer_c.topic_map.clone(), peer_c.store.clone());
496
497 let mut config = SessionConfig {
499 topic: topic.clone(),
500 remote: peer_b.id(),
501 live_mode: true,
502 };
503 let session_ab = manager_a.session(SESSION_AB, &config).await;
504 config.remote = peer_a.id();
505 let session_b = manager_b.session(SESSION_BA, &config).await;
506
507 let mut config = SessionConfig {
509 topic: topic.clone(),
510 remote: peer_c.id(),
511 live_mode: true,
512 };
513 let session_ac = manager_a.session(SESSION_AC, &config).await;
514 config.remote = peer_a.id();
515 let session_c = manager_c.session(SESSION_CA, &config).await;
516
517 let mut event_stream_a = manager_a.subscribe();
518 let mut event_stream_b = manager_b.subscribe();
519 let mut event_stream_c = manager_c.subscribe();
520
521 tokio::spawn(async move {
523 run_protocol(session_ab, session_b).await.unwrap();
524 });
525 tokio::spawn(async move {
526 run_protocol(session_ac, session_c).await.unwrap();
527 });
528
529 let mut handle_ab = manager_a.session_handle(SESSION_AB).await.unwrap();
531 let mut handle_ac = manager_a.session_handle(SESSION_AC).await.unwrap();
532 let mut handle_ba = manager_b.session_handle(SESSION_BA).await.unwrap();
533 let mut handle_ca = manager_c.session_handle(SESSION_CA).await.unwrap();
534
535 let body_a = Body::new("Hello again from A".as_bytes());
536 let body_b = Body::new("Hello again from B".as_bytes());
537 let body_c = Body::new("Hello again from C".as_bytes());
538 let (peer_a_header_1, _) = peer_a.create_operation(&body_a, LOG_ID).await;
539 let (peer_b_header_1, _) = peer_b.create_operation(&body_b, LOG_ID).await;
540 let (peer_c_header_1, _) = peer_c.create_operation(&body_c, LOG_ID).await;
541
542 let operation_a = Operation {
543 hash: peer_a_header_1.hash(),
544 header: peer_a_header_1.clone(),
545 body: Some(body_a.clone()),
546 };
547 let operation_b = Operation {
548 hash: peer_b_header_1.hash(),
549 header: peer_b_header_1.clone(),
550 body: Some(body_b.clone()),
551 };
552 let operation_c = Operation {
553 hash: peer_c_header_1.hash(),
554 header: peer_c_header_1.clone(),
555 body: Some(body_c.clone()),
556 };
557
558 handle_ab
559 .send(ToSync::Payload(operation_a.clone()))
560 .await
561 .unwrap();
562 handle_ac.send(ToSync::Payload(operation_a)).await.unwrap();
563 handle_ba.send(ToSync::Payload(operation_b)).await.unwrap();
564 handle_ca.send(ToSync::Payload(operation_c)).await.unwrap();
565
566 let mut operations_a = vec![];
568 let mut operations_b = vec![];
569 let mut operations_c = vec![];
570 let _ = tokio::time::timeout(Duration::from_millis(500), async {
571 loop {
572 tokio::select! {
573 Some(event) = event_stream_a.next() => {
574 if let TestTopicSyncEvent::Operation(operation) = event.event() {
575 operations_a.push(*operation.clone());
576 }
577 }
578 Some(event) = event_stream_b.next() => {
579 if let TestTopicSyncEvent::Operation(operation) = event.event() {
580 operations_b.push(*operation.clone());
581 }
582 }
583 Some(event) = event_stream_c.next() => {
584 if let TestTopicSyncEvent::Operation(operation) = event.event() {
585 operations_c.push(*operation.clone());
586 }
587 }
588 else => tokio::time::sleep(Duration::from_millis(5)).await
589 }
590 }
591 })
592 .await;
593
594 assert_eq!(operations_a.len(), 4);
597 assert_eq!(operations_b.len(), 4);
598 assert_eq!(operations_c.len(), 4);
599 assert!(
600 operations_a
601 .iter()
602 .find(|operation| operation.header == peer_a_header_0
603 || operation.header == peer_a_header_1)
604 .is_none()
605 );
606 assert!(
607 operations_b
608 .iter()
609 .find(|operation| operation.header == peer_b_header_0
610 || operation.header == peer_b_header_1)
611 .is_none()
612 );
613 assert!(
614 operations_c
615 .iter()
616 .find(|operation| operation.header == peer_c_header_0
617 || operation.header == peer_c_header_1)
618 .is_none()
619 );
620 }
621
622 #[tokio::test]
623 async fn non_blocking_manager_stream() {
624 const TOPIC_NAME: &str = "messages";
625 const LOG_ID: u64 = 0;
626 const SESSION_ID: u64 = 0;
627
628 let topic = TestTopic::new(TOPIC_NAME);
629
630 let mut peer_a = Peer::new(0);
632 let body = Body::new("Hello from Peer A".as_bytes());
633 let _ = peer_a.create_operation(&body, LOG_ID).await;
634 let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
635 peer_a.insert_topic(&topic, &logs);
636 let mut peer_a_manager =
637 TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
638
639 let mut peer_a_stream = peer_a_manager.subscribe();
641 tokio::task::spawn(async move {
642 loop {
643 peer_a_stream.next().await;
644 }
645 });
646
647 let mut peer_b = Peer::new(1);
649 let body = Body::new("Hello from Peer B".as_bytes());
650 let _ = peer_b.create_operation(&body, LOG_ID).await;
651 let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
652 peer_b.insert_topic(&topic, &logs);
653 let mut peer_b_manager =
654 TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
655
656 let config = SessionConfig {
658 topic,
659 remote: peer_b.id(),
660 live_mode: true,
661 };
662
663 let peer_a_session = peer_a_manager.session(SESSION_ID, &config).await;
664
665 let event_stream = peer_b_manager.subscribe();
667 let peer_b_session = peer_b_manager.session(SESSION_ID, &config).await;
668
669 let mut peer_a_handle = peer_a_manager.session_handle(SESSION_ID).await.unwrap();
671
672 let (header_1, _) = peer_a.create_operation_no_insert(&body, LOG_ID).await;
674 peer_a_handle
675 .send(ToSync::Payload(Operation {
676 hash: header_1.hash(),
677 header: header_1.clone(),
678 body: Some(body.clone()),
679 }))
680 .await
681 .unwrap();
682 peer_a_handle.send(ToSync::Close).await.unwrap();
683
684 run_protocol(peer_a_session, peer_b_session).await.unwrap();
686
687 let events = drain_stream(event_stream).await;
689 assert_eq!(events.len(), 9);
690 for (index, event) in events.into_iter().enumerate() {
691 match index {
692 0 => assert_matches!(
693 event,
694 FromSync {
695 session_id: 0,
696 event: TopicLogSyncEvent::SyncStarted(_),
697 ..
698 }
699 ),
700 1 | 2 => assert_matches!(
701 event,
702 FromSync {
703 session_id: 0,
704 event: TopicLogSyncEvent::SyncStatus(_),
705 ..
706 }
707 ),
708 3 => assert_matches!(
709 event,
710 FromSync {
711 session_id: 0,
712 event: TopicLogSyncEvent::Operation(_),
713 ..
714 }
715 ),
716 4 => assert_matches!(
717 event,
718 FromSync {
719 session_id: 0,
720 event: TopicLogSyncEvent::SyncFinished(_),
721 ..
722 }
723 ),
724 5 => assert_matches!(
725 event,
726 FromSync {
727 event: TopicLogSyncEvent::LiveModeStarted,
728 ..
729 }
730 ),
731 6 => assert_matches!(
732 event,
733 FromSync {
734 session_id: 0,
735 event: TopicLogSyncEvent::Operation(_),
736 ..
737 }
738 ),
739 7 => assert_matches!(
740 event,
741 FromSync {
742 event: TopicLogSyncEvent::LiveModeFinished(_),
743 ..
744 }
745 ),
746 8 => assert_matches!(
747 event,
748 FromSync {
749 event: TopicLogSyncEvent::Success,
750 ..
751 }
752 ),
753 _ => panic!(),
754 }
755 }
756 }
757}