p2panda_sync/manager/
mod.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Manager for initiating and orchestrating topic log sync sessions.
4//!
5//! Concurrently running sessions perform message forwarding with de-duplication. Events from all
6//! running sync sessions can be consumed via a single manager event stream.
7mod event_stream;
8mod session_map;
9
10pub use event_stream::ManagerEventStream;
11pub use session_map::SessionTopicMap;
12
13use std::collections::HashMap;
14use std::fmt::Debug;
15use std::hash::Hash as StdHash;
16use std::marker::PhantomData;
17use std::pin::Pin;
18
19use futures::channel::mpsc;
20use futures::future::ready;
21use futures::stream::SelectAll;
22use futures::{Sink, SinkExt, Stream, StreamExt};
23use p2panda_core::{Extensions, Operation, PublicKey};
24use p2panda_store::{LogId, LogStore, OperationStore};
25use serde::{Deserialize, Serialize};
26use thiserror::Error;
27use tokio::sync::broadcast;
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
30use tracing::debug;
31
32use crate::manager::event_stream::{ManagerEventStreamState, StreamDebug};
33use crate::protocols::{Logs, TopicLogSync, TopicLogSyncError, TopicLogSyncEvent};
34use crate::traits::{Manager, TopicMap};
35use crate::{FromSync, SessionConfig, ToSync};
36
/// Capacity used for every channel created by the manager: the per-session live-mode and event
/// channels as well as the per-subscriber session announcement channel.
///
/// NOTE(review): the value is 1028 rather than the power of two 1024 — confirm this is intended.
const CHANNEL_BUFFER: usize = 1028;
38
/// Message which can be sent to a running topic sync session: a [`ToSync`] carrying an
/// [`Operation`] as its payload.
pub type ToTopicSync<E> = ToSync<Operation<E>>;
40
/// Create and manage topic log sync sessions.
///
/// Sync sessions are created via the manager, which instantiates them with access to the shared
/// topic map and operation store as well as channels for receiving sync events and for sending
/// newly arriving operations in live mode.
///
/// A handle to a sync session can be acquired via the `session_handle` method and used to send
/// live-mode operations to that specific session. It's expected that users map sessions (by
/// their id) to any topic subscriptions in order to understand the correct mappings.
// The channel-bearing field types below are deeply nested generics.
#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct TopicSyncManager<T, S, M, L, E>
where
    T: Clone,
    E: Extensions,
{
    /// Topic map; a clone is handed to every session created by this manager.
    topic_map: M,
    /// Store; a clone is handed to every session created by this manager.
    store: S,
    /// Associates each session id with its topic and the sender of its live-mode channel.
    session_topic_map: SessionTopicMap<T, mpsc::Sender<ToTopicSync<E>>>,
    /// Event broadcast senders keyed by `(session id, remote public key)`. New subscribers
    /// obtain receivers for already registered sessions from these.
    from_session_tx: HashMap<(u64, PublicKey), broadcast::Sender<TopicLogSyncEvent<E>>>,
    /// The initial receiver of every session's broadcast channel. It is never read in this
    /// module; presumably it is retained so the channel always has a live receiver — TODO
    /// confirm.
    from_session_rx: HashMap<(u64, PublicKey), broadcast::Receiver<TopicLogSyncEvent<E>>>,
    /// One sender per subscriber, used to announce newly created sessions to it.
    manager_tx: Vec<mpsc::Sender<SessionStream<T, E>>>,
    /// Ties the otherwise unused type parameter `L` to the manager.
    _phantom: PhantomData<L>,
}
65
/// Announcement of a newly created sync session.
///
/// The manager sends one of these to every subscriber whenever a session is created.
#[derive(Debug)]
pub(crate) struct SessionStream<T, E>
where
    T: Clone,
    E: Clone,
{
    /// Id of the announced session.
    pub session_id: u64,
    /// Topic the session is syncing.
    pub topic: T,
    /// Public key of the remote peer, as given in the session configuration.
    pub remote: PublicKey,
    /// Fresh subscription to the session's event broadcast channel.
    pub event_rx: broadcast::Receiver<TopicLogSyncEvent<E>>,
    /// Sender half of the session's live-mode channel.
    pub live_tx: mpsc::Sender<ToTopicSync<E>>,
}
78
79impl<T, S, M, L, E> TopicSyncManager<T, S, M, L, E>
80where
81    T: Clone,
82    E: Extensions,
83{
84    pub fn new(topic_map: M, store: S) -> Self {
85        Self {
86            topic_map,
87            store,
88            manager_tx: Default::default(),
89            session_topic_map: Default::default(),
90            from_session_tx: Default::default(),
91            from_session_rx: Default::default(),
92            _phantom: PhantomData,
93        }
94    }
95}
96
97impl<T, S, M, L, E> Manager<T> for TopicSyncManager<T, S, M, L, E>
98where
99    T: Clone + Debug + Eq + StdHash + Serialize + for<'a> Deserialize<'a> + Send + 'static,
100    S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
101    M: TopicMap<T, Logs<L>> + Send + 'static,
102    L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
103    E: Extensions + Send + 'static,
104{
105    type Protocol = TopicLogSync<T, S, M, L, E>;
106    type Args = TopicSyncManagerArgs<S, M>;
107    type Event = TopicLogSyncEvent<E>;
108    type Message = Operation<E>;
109    type Error = TopicSyncManagerError;
110
111    /// Instantiate a manager from arguments.
112    fn from_args(config: Self::Args) -> Self {
113        Self::new(config.topic_map, config.store)
114    }
115
116    /// Instantiate a new sync session.
117    async fn session(&mut self, session_id: u64, config: &SessionConfig<T>) -> Self::Protocol {
118        let (live_tx, live_rx) = mpsc::channel(CHANNEL_BUFFER);
119        let (event_tx, event_rx) = broadcast::channel::<Self::Event>(CHANNEL_BUFFER);
120
121        self.from_session_tx
122            .insert((session_id, config.remote), event_tx.clone());
123
124        self.from_session_rx
125            .insert((session_id, config.remote), event_rx);
126
127        self.session_topic_map
128            .insert_with_topic(session_id, config.topic.clone(), live_tx.clone());
129
130        for manager_tx in self.manager_tx.iter_mut() {
131            if manager_tx
132                .send(SessionStream {
133                    session_id,
134                    topic: config.topic.clone(),
135                    remote: config.remote,
136                    event_rx: event_tx.subscribe(),
137                    live_tx: live_tx.clone(),
138                })
139                .await
140                .is_err()
141            {
142                debug!("manager handle dropped");
143            };
144        }
145
146        let live_rx = if config.live_mode {
147            Some(live_rx)
148        } else {
149            None
150        };
151
152        TopicLogSync::new(
153            config.topic.clone(),
154            self.store.clone(),
155            self.topic_map.clone(),
156            live_rx,
157            event_tx,
158        )
159    }
160
161    /// Retrieve a handle to a running sync session.
162    async fn session_handle(
163        &self,
164        session_id: u64,
165    ) -> Option<Pin<Box<dyn Sink<ToTopicSync<E>, Error = Self::Error>>>> {
166        let map_fn = |to_sync: ToSync<Self::Message>| {
167            ready({
168                match to_sync {
169                    ToSync::Payload(operation) => Ok::<_, Self::Error>(ToSync::Payload(operation)),
170                    ToSync::Close => Ok::<_, Self::Error>(ToSync::Close),
171                }
172            })
173        };
174
175        self.session_topic_map.sender(session_id).map(|tx| {
176            Box::pin(tx.clone().with(map_fn))
177                as Pin<Box<dyn Sink<ToTopicSync<E>, Error = Self::Error>>>
178        })
179    }
180
181    /// Subscribe to the event stream for all running sync sessions.
182    fn subscribe(&mut self) -> impl Stream<Item = FromSync<Self::Event>> + Send + Unpin + 'static {
183        let (manager_tx, manager_rx) = mpsc::channel(CHANNEL_BUFFER);
184        self.manager_tx.push(manager_tx);
185
186        let mut session_rx_set = SelectAll::new();
187        for ((id, remote), tx) in self.from_session_tx.iter() {
188            let session_id = *id;
189            let remote = *remote;
190            let stream = BroadcastStream::new(tx.subscribe());
191
192            #[allow(clippy::type_complexity)]
193            let stream: Pin<
194                Box<dyn StreamDebug<Option<FromSync<TopicLogSyncEvent<E>>>>>,
195            > = Box::pin(stream.map(Box::new(
196                move |event: Result<TopicLogSyncEvent<E>, BroadcastStreamRecvError>| {
197                    event.ok().map(|event| FromSync {
198                        session_id,
199                        remote,
200                        event,
201                    })
202                },
203            )));
204            session_rx_set.push(stream);
205        }
206
207        let state = ManagerEventStreamState {
208            manager_rx,
209            session_rx_set,
210            session_topic_map: self.session_topic_map.clone(),
211        };
212
213        let stream = ManagerEventStream {
214            state: Some(state),
215            pending: Default::default(),
216        };
217
218        Box::pin(stream)
219    }
220}
221
/// Configuration object for `TopicSyncManager`.
///
/// Both values are passed on unchanged to `TopicSyncManager::new` by `Manager::from_args`.
#[derive(Clone, Debug)]
pub struct TopicSyncManagerArgs<S, M> {
    /// Store the manager hands to its sync sessions.
    pub store: S,
    /// Topic map the manager hands to its sync sessions.
    pub topic_map: M,
}
228
/// Error types which can be returned from `TopicSyncManager`.
#[derive(Debug, Error)]
pub enum TopicSyncManagerError {
    /// Error originating from the topic log sync protocol, passed through transparently.
    #[error(transparent)]
    TopicLogSync(#[from] TopicLogSyncError),

    /// An operation arrived before the topic was agreed on.
    ///
    /// NOTE(review): this variant is not constructed anywhere in this module — confirm it is
    /// still in use.
    #[error("received operation before topic agreed")]
    OperationBeforeTopic,

    /// Sending into a `futures` mpsc channel failed, passed through transparently. This is the
    /// error type the sinks returned by `session_handle` convert channel failures into.
    #[error(transparent)]
    Send(#[from] mpsc::SendError),
}
241
242#[cfg(test)]
243mod tests {
244    use std::collections::HashMap;
245    use std::time::Duration;
246
247    use assert_matches::assert_matches;
248    use futures::{SinkExt, StreamExt};
249    use p2panda_core::{Body, Operation};
250
251    use crate::manager::{TopicSyncManager, TopicSyncManagerArgs};
252    use crate::protocols::TopicLogSyncEvent;
253    use crate::test_utils::{
254        Peer, TestMemoryStore, TestTopic, TestTopicMap, TestTopicSyncEvent, TestTopicSyncManager,
255        drain_stream, run_protocol, setup_logging,
256    };
257    use crate::traits::Manager;
258    use crate::{FromSync, SessionConfig, ToSync};
259
260    #[test]
261    fn from_args() {
262        let store = TestMemoryStore::new();
263        let topic_map = TestTopicMap::new();
264        let config = TopicSyncManagerArgs { store, topic_map };
265        let _: TestTopicSyncManager = Manager::from_args(config);
266    }
267
268    #[tokio::test]
269    async fn manager_e2e() {
270        setup_logging();
271
272        const TOPIC_NAME: &str = "messages";
273        const LOG_ID: u64 = 0;
274        const SESSION_ID: u64 = 0;
275
276        let topic = TestTopic::new(TOPIC_NAME);
277
278        // Setup Peer A
279        let mut peer_a = Peer::new(0);
280        let body = Body::new("Hello from Peer A".as_bytes());
281        let _ = peer_a.create_operation(&body, LOG_ID).await;
282        let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
283        peer_a.insert_topic(&topic, &logs);
284        let mut peer_a_manager =
285            TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
286
287        // Setup Peer B
288        let mut peer_b = Peer::new(1);
289        let body = Body::new("Hello from Peer B".as_bytes());
290        let _ = peer_b.create_operation(&body, LOG_ID).await;
291        let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
292        peer_b.insert_topic(&topic, &logs);
293        let mut peer_b_manager =
294            TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
295
296        // Instantiate sync session for Peer A.
297        let config = SessionConfig {
298            topic,
299            remote: peer_b.id(),
300            live_mode: true,
301        };
302
303        // Subscribe to both managers.
304        let mut event_stream_a = peer_a_manager.subscribe();
305        let mut event_stream_b = peer_b_manager.subscribe();
306
307        // Instantiate sync session for Peer A.
308        let peer_a_session = peer_a_manager.session(SESSION_ID, &config).await;
309
310        // Instantiate sync session for Peer B.
311        let peer_b_session = peer_b_manager.session(SESSION_ID, &config).await;
312
313        // Get a handle to Peer A sync session.
314        let mut peer_a_handle = peer_a_manager.session_handle(SESSION_ID).await.unwrap();
315
316        // Create and send a new live-mode message.
317        let (header_1, _) = peer_a.create_operation_no_insert(&body, LOG_ID).await;
318        peer_a_handle
319            .send(ToSync::Payload(Operation {
320                hash: header_1.hash(),
321                header: header_1.clone(),
322                body: Some(body.clone()),
323            }))
324            .await
325            .unwrap();
326        peer_a_handle.send(ToSync::Close).await.unwrap();
327
328        // Actually run the protocol.
329        run_protocol(peer_a_session, peer_b_session).await.unwrap();
330
331        // Assert Peer A's events.
332        for index in 0..=7 {
333            let event = event_stream_a.next().await.unwrap();
334            assert_eq!(event.session_id(), 0);
335            match index {
336                0 => assert_matches!(
337                    event,
338                    FromSync {
339                        event: TopicLogSyncEvent::SyncStarted(_),
340                        ..
341                    }
342                ),
343                1 | 2 => assert_matches!(
344                    event,
345                    FromSync {
346                        event: TopicLogSyncEvent::SyncStatus(_),
347                        ..
348                    }
349                ),
350                3 => assert_matches!(
351                    event,
352                    FromSync {
353                        event: TopicLogSyncEvent::Operation(_),
354                        ..
355                    }
356                ),
357                4 => assert_matches!(
358                    event,
359                    FromSync {
360                        event: TopicLogSyncEvent::SyncFinished(_),
361                        ..
362                    }
363                ),
364                5 => assert_matches!(
365                    event,
366                    FromSync {
367                        event: TopicLogSyncEvent::LiveModeStarted,
368                        ..
369                    }
370                ),
371                6 => assert_matches!(
372                    event,
373                    FromSync {
374                        event: TopicLogSyncEvent::LiveModeFinished(_),
375                        ..
376                    }
377                ),
378                7 => assert_matches!(
379                    event,
380                    FromSync {
381                        event: TopicLogSyncEvent::Success,
382                        ..
383                    }
384                ),
385                _ => panic!(),
386            }
387        }
388
389        // Assert Peer B's events.
390        for index in 0..=8 {
391            let event = event_stream_b.next().await.unwrap();
392            match index {
393                0 => assert_matches!(
394                    event,
395                    FromSync {
396                        session_id: 0,
397                        event: TopicLogSyncEvent::SyncStarted(_),
398                        ..
399                    }
400                ),
401                1 | 2 => assert_matches!(
402                    event,
403                    FromSync {
404                        session_id: 0,
405                        event: TopicLogSyncEvent::SyncStatus(_),
406                        ..
407                    }
408                ),
409                3 => assert_matches!(
410                    event,
411                    FromSync {
412                        session_id: 0,
413                        event: TopicLogSyncEvent::Operation(_),
414                        ..
415                    }
416                ),
417                4 => assert_matches!(
418                    event,
419                    FromSync {
420                        session_id: 0,
421                        event: TopicLogSyncEvent::SyncFinished(_),
422                        ..
423                    }
424                ),
425                5 => assert_matches!(
426                    event,
427                    FromSync {
428                        event: TopicLogSyncEvent::LiveModeStarted,
429                        ..
430                    }
431                ),
432                6 => assert_matches!(
433                    event,
434                    FromSync {
435                        session_id: 0,
436                        event: TopicLogSyncEvent::Operation(_),
437                        ..
438                    }
439                ),
440                7 => assert_matches!(
441                    event,
442                    FromSync {
443                        event: TopicLogSyncEvent::LiveModeFinished(_),
444                        ..
445                    }
446                ),
447                8 => assert_matches!(
448                    event,
449                    FromSync {
450                        event: TopicLogSyncEvent::Success,
451                        ..
452                    }
453                ),
454                _ => panic!(),
455            }
456        }
457    }
458
459    #[tokio::test]
460    async fn live_mode_three_peer_forwarding() {
461        use std::collections::HashMap;
462
463        const TOPIC_NAME: &str = "chat";
464        const LOG_ID: u64 = 0;
465        const SESSION_AB: u64 = 0;
466        const SESSION_AC: u64 = 1;
467        const SESSION_BA: u64 = 2;
468        const SESSION_CA: u64 = 3;
469
470        // Shared topic
471        let topic = TestTopic::new(TOPIC_NAME);
472
473        // Peer A
474        let mut peer_a = Peer::new(0);
475        let body_a = Body::new("Hello from A".as_bytes());
476        let (peer_a_header_0, _) = peer_a.create_operation(&body_a, LOG_ID).await;
477        let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
478        peer_a.insert_topic(&topic, &logs);
479        let mut manager_a = TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
480
481        // Peer B
482        let mut peer_b = Peer::new(1);
483        let body_b = Body::new("Hello from B".as_bytes());
484        let (peer_b_header_0, _) = peer_b.create_operation(&body_b, LOG_ID).await;
485        let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
486        peer_b.insert_topic(&topic, &logs);
487        let mut manager_b = TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
488
489        // Peer C
490        let mut peer_c = Peer::new(2);
491        let body_c = Body::new("Hello from C".as_bytes());
492        let (peer_c_header_0, _) = peer_c.create_operation(&body_c, LOG_ID).await;
493        let logs = HashMap::from([(peer_c.id(), vec![LOG_ID])]);
494        peer_c.insert_topic(&topic, &logs);
495        let mut manager_c = TopicSyncManager::new(peer_c.topic_map.clone(), peer_c.store.clone());
496
497        // Session A -> B (A initiates)
498        let mut config = SessionConfig {
499            topic: topic.clone(),
500            remote: peer_b.id(),
501            live_mode: true,
502        };
503        let session_ab = manager_a.session(SESSION_AB, &config).await;
504        config.remote = peer_a.id();
505        let session_b = manager_b.session(SESSION_BA, &config).await;
506
507        // Session A -> C (A initiates)
508        let mut config = SessionConfig {
509            topic: topic.clone(),
510            remote: peer_c.id(),
511            live_mode: true,
512        };
513        let session_ac = manager_a.session(SESSION_AC, &config).await;
514        config.remote = peer_a.id();
515        let session_c = manager_c.session(SESSION_CA, &config).await;
516
517        let mut event_stream_a = manager_a.subscribe();
518        let mut event_stream_b = manager_b.subscribe();
519        let mut event_stream_c = manager_c.subscribe();
520
521        // Run both protocols concurrently
522        tokio::spawn(async move {
523            run_protocol(session_ab, session_b).await.unwrap();
524        });
525        tokio::spawn(async move {
526            run_protocol(session_ac, session_c).await.unwrap();
527        });
528
529        // Send live-mode messages from all peers
530        let mut handle_ab = manager_a.session_handle(SESSION_AB).await.unwrap();
531        let mut handle_ac = manager_a.session_handle(SESSION_AC).await.unwrap();
532        let mut handle_ba = manager_b.session_handle(SESSION_BA).await.unwrap();
533        let mut handle_ca = manager_c.session_handle(SESSION_CA).await.unwrap();
534
535        let body_a = Body::new("Hello again from A".as_bytes());
536        let body_b = Body::new("Hello again from B".as_bytes());
537        let body_c = Body::new("Hello again from C".as_bytes());
538        let (peer_a_header_1, _) = peer_a.create_operation(&body_a, LOG_ID).await;
539        let (peer_b_header_1, _) = peer_b.create_operation(&body_b, LOG_ID).await;
540        let (peer_c_header_1, _) = peer_c.create_operation(&body_c, LOG_ID).await;
541
542        let operation_a = Operation {
543            hash: peer_a_header_1.hash(),
544            header: peer_a_header_1.clone(),
545            body: Some(body_a.clone()),
546        };
547        let operation_b = Operation {
548            hash: peer_b_header_1.hash(),
549            header: peer_b_header_1.clone(),
550            body: Some(body_b.clone()),
551        };
552        let operation_c = Operation {
553            hash: peer_c_header_1.hash(),
554            header: peer_c_header_1.clone(),
555            body: Some(body_c.clone()),
556        };
557
558        handle_ab
559            .send(ToSync::Payload(operation_a.clone()))
560            .await
561            .unwrap();
562        handle_ac.send(ToSync::Payload(operation_a)).await.unwrap();
563        handle_ba.send(ToSync::Payload(operation_b)).await.unwrap();
564        handle_ca.send(ToSync::Payload(operation_c)).await.unwrap();
565
566        // Collect all operations each peer receives on the event stream.
567        let mut operations_a = vec![];
568        let mut operations_b = vec![];
569        let mut operations_c = vec![];
570        let _ = tokio::time::timeout(Duration::from_millis(500), async {
571            loop {
572                tokio::select! {
573                    Some(event) = event_stream_a.next() => {
574                        if let TestTopicSyncEvent::Operation(operation) = event.event() {
575                            operations_a.push(*operation.clone());
576                        }
577                    }
578                    Some(event) = event_stream_b.next() => {
579                        if let TestTopicSyncEvent::Operation(operation) = event.event() {
580                            operations_b.push(*operation.clone());
581                        }
582                    }
583                    Some(event) = event_stream_c.next() => {
584                        if let TestTopicSyncEvent::Operation(operation) = event.event() {
585                            operations_c.push(*operation.clone());
586                        }
587                    }
588                    else => tokio::time::sleep(Duration::from_millis(5)).await
589                }
590            }
591        })
592        .await;
593
594        // All peers received 4 messages, B & C received each other messages via A, and nobody
595        // received their own messages.
596        assert_eq!(operations_a.len(), 4);
597        assert_eq!(operations_b.len(), 4);
598        assert_eq!(operations_c.len(), 4);
599        assert!(
600            operations_a
601                .iter()
602                .find(|operation| operation.header == peer_a_header_0
603                    || operation.header == peer_a_header_1)
604                .is_none()
605        );
606        assert!(
607            operations_b
608                .iter()
609                .find(|operation| operation.header == peer_b_header_0
610                    || operation.header == peer_b_header_1)
611                .is_none()
612        );
613        assert!(
614            operations_c
615                .iter()
616                .find(|operation| operation.header == peer_c_header_0
617                    || operation.header == peer_c_header_1)
618                .is_none()
619        );
620    }
621
622    #[tokio::test]
623    async fn non_blocking_manager_stream() {
624        const TOPIC_NAME: &str = "messages";
625        const LOG_ID: u64 = 0;
626        const SESSION_ID: u64 = 0;
627
628        let topic = TestTopic::new(TOPIC_NAME);
629
630        // Setup Peer A
631        let mut peer_a = Peer::new(0);
632        let body = Body::new("Hello from Peer A".as_bytes());
633        let _ = peer_a.create_operation(&body, LOG_ID).await;
634        let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
635        peer_a.insert_topic(&topic, &logs);
636        let mut peer_a_manager =
637            TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
638
639        // Spawn a task polling peer a's manager stream.
640        let mut peer_a_stream = peer_a_manager.subscribe();
641        tokio::task::spawn(async move {
642            loop {
643                peer_a_stream.next().await;
644            }
645        });
646
647        // Setup Peer B
648        let mut peer_b = Peer::new(1);
649        let body = Body::new("Hello from Peer B".as_bytes());
650        let _ = peer_b.create_operation(&body, LOG_ID).await;
651        let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
652        peer_b.insert_topic(&topic, &logs);
653        let mut peer_b_manager =
654            TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
655
656        // Instantiate sync session for Peer A.
657        let config = SessionConfig {
658            topic,
659            remote: peer_b.id(),
660            live_mode: true,
661        };
662
663        let peer_a_session = peer_a_manager.session(SESSION_ID, &config).await;
664
665        // Instantiate sync session for Peer B.
666        let event_stream = peer_b_manager.subscribe();
667        let peer_b_session = peer_b_manager.session(SESSION_ID, &config).await;
668
669        // Get a handle to Peer A sync session.
670        let mut peer_a_handle = peer_a_manager.session_handle(SESSION_ID).await.unwrap();
671
672        // Create and send a new live-mode message.
673        let (header_1, _) = peer_a.create_operation_no_insert(&body, LOG_ID).await;
674        peer_a_handle
675            .send(ToSync::Payload(Operation {
676                hash: header_1.hash(),
677                header: header_1.clone(),
678                body: Some(body.clone()),
679            }))
680            .await
681            .unwrap();
682        peer_a_handle.send(ToSync::Close).await.unwrap();
683
684        // Actually run the protocol.
685        run_protocol(peer_a_session, peer_b_session).await.unwrap();
686
687        // Assert Peer B's events.
688        let events = drain_stream(event_stream).await;
689        assert_eq!(events.len(), 9);
690        for (index, event) in events.into_iter().enumerate() {
691            match index {
692                0 => assert_matches!(
693                    event,
694                    FromSync {
695                        session_id: 0,
696                        event: TopicLogSyncEvent::SyncStarted(_),
697                        ..
698                    }
699                ),
700                1 | 2 => assert_matches!(
701                    event,
702                    FromSync {
703                        session_id: 0,
704                        event: TopicLogSyncEvent::SyncStatus(_),
705                        ..
706                    }
707                ),
708                3 => assert_matches!(
709                    event,
710                    FromSync {
711                        session_id: 0,
712                        event: TopicLogSyncEvent::Operation(_),
713                        ..
714                    }
715                ),
716                4 => assert_matches!(
717                    event,
718                    FromSync {
719                        session_id: 0,
720                        event: TopicLogSyncEvent::SyncFinished(_),
721                        ..
722                    }
723                ),
724                5 => assert_matches!(
725                    event,
726                    FromSync {
727                        event: TopicLogSyncEvent::LiveModeStarted,
728                        ..
729                    }
730                ),
731                6 => assert_matches!(
732                    event,
733                    FromSync {
734                        session_id: 0,
735                        event: TopicLogSyncEvent::Operation(_),
736                        ..
737                    }
738                ),
739                7 => assert_matches!(
740                    event,
741                    FromSync {
742                        event: TopicLogSyncEvent::LiveModeFinished(_),
743                        ..
744                    }
745                ),
746                8 => assert_matches!(
747                    event,
748                    FromSync {
749                        event: TopicLogSyncEvent::Success,
750                        ..
751                    }
752                ),
753                _ => panic!(),
754            }
755        }
756    }
757}