Skip to main content

p2panda_sync/manager/
mod.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Manager for initiating and orchestrating topic log sync sessions.
4//!
5//! Concurrently running sessions perform message forwarding with de-duplication. Events from all
6//! running sync sessions can be consumed via a single manager event stream.
7mod event_stream;
8mod session_map;
9
10pub use event_stream::ManagerEventStream;
11pub use session_map::SessionTopicMap;
12
13use std::collections::HashMap;
14use std::fmt::Debug;
15use std::hash::Hash as StdHash;
16use std::marker::PhantomData;
17use std::pin::Pin;
18
19use futures::channel::mpsc;
20use futures::future::ready;
21use futures::stream::SelectAll;
22use futures::{Sink, SinkExt, Stream, StreamExt};
23use p2panda_core::{Extensions, Operation, PublicKey};
24use p2panda_store::{LogId, LogStore, OperationStore};
25use serde::{Deserialize, Serialize};
26use thiserror::Error;
27use tokio::sync::broadcast;
28use tokio_stream::wrappers::BroadcastStream;
29use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
30use tracing::debug;
31
32use crate::manager::event_stream::{ManagerEventStreamState, StreamDebug};
33use crate::protocols::{Logs, TopicLogSync, TopicLogSyncError, TopicLogSyncEvent};
34use crate::traits::{Manager, TopicMap};
35use crate::{FromSync, SessionConfig, ToSync};
36
/// Capacity handed to every channel created by the manager: the per-session live-mode `mpsc`
/// channel, the per-session event `broadcast` channel and the per-subscriber `mpsc` channel.
///
/// NOTE(review): 1028 is an unusual capacity and may have been meant as 1024 — confirm before
/// changing, the value is kept as-is here.
const CHANNEL_BUFFER: usize = 1028;
38
/// Message type accepted by a topic sync session handle: a [`ToSync`] wrapping an
/// [`Operation`] with extensions `E`.
pub type ToTopicSync<E> = ToSync<Operation<E>>;
40
41/// Create and manage topic log sync sessions.
42///
43/// Sync sessions are created via the manager, which instantiates them with access to the shared
44/// topic map and operation store as well as channels for receiving sync events and for sending
45/// newly arriving operations in live mode.
46///
47/// A handle can be acquired to a sync session via the session_handle method for sending any live
48/// mode operations to a specific session. It's expected that users map sessions (by their id) to
49/// any topic subscriptions in order to understand the correct mappings.  
50#[allow(clippy::type_complexity)]
51#[derive(Debug)]
52pub struct TopicSyncManager<T, S, M, L, E>
53where
54    T: Clone,
55    E: Extensions,
56{
57    topic_map: M,
58    store: S,
59    session_topic_map: SessionTopicMap<T, mpsc::Sender<ToTopicSync<E>>>,
60    from_session_tx: HashMap<(u64, PublicKey), broadcast::Sender<TopicLogSyncEvent<E>>>,
61    from_session_rx: HashMap<(u64, PublicKey), broadcast::Receiver<TopicLogSyncEvent<E>>>,
62    manager_tx: Vec<mpsc::Sender<SessionStream<T, E>>>,
63    _phantom: PhantomData<L>,
64}
65
66#[derive(Debug)]
67pub(crate) struct SessionStream<T, E>
68where
69    T: Clone,
70    E: Clone,
71{
72    pub session_id: u64,
73    pub topic: T,
74    pub remote: PublicKey,
75    pub event_rx: broadcast::Receiver<TopicLogSyncEvent<E>>,
76    pub live_tx: mpsc::Sender<ToTopicSync<E>>,
77}
78
79impl<T, S, M, L, E> TopicSyncManager<T, S, M, L, E>
80where
81    T: Clone,
82    E: Extensions,
83{
84    pub fn new(topic_map: M, store: S) -> Self {
85        Self {
86            topic_map,
87            store,
88            manager_tx: Default::default(),
89            session_topic_map: Default::default(),
90            from_session_tx: Default::default(),
91            from_session_rx: Default::default(),
92            _phantom: PhantomData,
93        }
94    }
95}
96
97impl<T, S, M, L, E> Manager<T> for TopicSyncManager<T, S, M, L, E>
98where
99    T: Clone + Debug + Eq + StdHash + Serialize + for<'a> Deserialize<'a> + Send + 'static,
100    S: LogStore<L, E> + OperationStore<L, E> + Send + 'static,
101    M: TopicMap<T, Logs<L>> + Send + 'static,
102    L: LogId + for<'de> Deserialize<'de> + Serialize + Send + 'static,
103    E: Extensions + Send + 'static,
104{
105    type Protocol = TopicLogSync<T, S, M, L, E>;
106    type Args = TopicSyncManagerArgs<S, M>;
107    type Event = TopicLogSyncEvent<E>;
108    type Message = Operation<E>;
109    type Error = TopicSyncManagerError;
110
111    /// Instantiate a manager from arguments.
112    fn from_args(config: Self::Args) -> Self {
113        Self::new(config.topic_map, config.store)
114    }
115
116    /// Instantiate a new sync session.
117    async fn session(&mut self, session_id: u64, config: &SessionConfig<T>) -> Self::Protocol {
118        let (live_tx, live_rx) = mpsc::channel(CHANNEL_BUFFER);
119        let (event_tx, event_rx) = broadcast::channel::<Self::Event>(CHANNEL_BUFFER);
120
121        self.from_session_tx
122            .insert((session_id, config.remote), event_tx.clone());
123
124        self.from_session_rx
125            .insert((session_id, config.remote), event_rx);
126
127        self.session_topic_map
128            .insert_with_topic(session_id, config.topic.clone(), live_tx.clone());
129
130        for manager_tx in self.manager_tx.iter_mut() {
131            if let Err(err) = manager_tx
132                .send(SessionStream {
133                    session_id,
134                    topic: config.topic.clone(),
135                    remote: config.remote,
136                    event_rx: event_tx.subscribe(),
137                    live_tx: live_tx.clone(),
138                })
139                .await
140            {
141                debug!("manager handle dropped: {err:?}");
142            };
143        }
144
145        let live_rx = if config.live_mode {
146            Some(live_rx)
147        } else {
148            None
149        };
150
151        TopicLogSync::new(
152            config.topic.clone(),
153            self.store.clone(),
154            self.topic_map.clone(),
155            live_rx,
156            event_tx,
157        )
158    }
159
160    /// Retrieve a handle to a running sync session.
161    async fn session_handle(
162        &self,
163        session_id: u64,
164    ) -> Option<Pin<Box<dyn Sink<ToTopicSync<E>, Error = Self::Error>>>> {
165        let map_fn = |to_sync: ToSync<Self::Message>| {
166            ready({
167                match to_sync {
168                    ToSync::Payload(operation) => Ok::<_, Self::Error>(ToSync::Payload(operation)),
169                    ToSync::Close => Ok::<_, Self::Error>(ToSync::Close),
170                }
171            })
172        };
173
174        self.session_topic_map.sender(session_id).map(|tx| {
175            Box::pin(tx.clone().with(map_fn))
176                as Pin<Box<dyn Sink<ToTopicSync<E>, Error = Self::Error>>>
177        })
178    }
179
180    /// Subscribe to the event stream for all running sync sessions.
181    fn subscribe(&mut self) -> impl Stream<Item = FromSync<Self::Event>> + Send + Unpin + 'static {
182        let (manager_tx, manager_rx) = mpsc::channel(CHANNEL_BUFFER);
183        self.manager_tx.push(manager_tx);
184
185        let mut session_rx_set = SelectAll::new();
186        for ((id, remote), tx) in self.from_session_tx.iter() {
187            let session_id = *id;
188            let remote = *remote;
189            let stream = BroadcastStream::new(tx.subscribe());
190
191            #[allow(clippy::type_complexity)]
192            let stream: Pin<
193                Box<dyn StreamDebug<Option<FromSync<TopicLogSyncEvent<E>>>>>,
194            > = Box::pin(stream.map(Box::new(
195                move |event: Result<TopicLogSyncEvent<E>, BroadcastStreamRecvError>| {
196                    event.ok().map(|event| FromSync {
197                        session_id,
198                        remote,
199                        event,
200                    })
201                },
202            )));
203            session_rx_set.push(stream);
204        }
205
206        let state = ManagerEventStreamState {
207            manager_rx,
208            session_rx_set,
209            session_topic_map: self.session_topic_map.clone(),
210        };
211
212        let stream = ManagerEventStream {
213            state: Some(state),
214            pending: Default::default(),
215        };
216
217        Box::pin(stream)
218    }
219}
220
/// Arguments from which a `TopicSyncManager` is built via `Manager::from_args`.
#[derive(Debug, Clone)]
pub struct TopicSyncManagerArgs<S, M> {
    /// Store handed to the manager.
    pub store: S,

    /// Topic map handed to the manager.
    pub topic_map: M,
}
227
228/// Error types which can be returned from `TopicSyncManager`.
229#[derive(Debug, Error)]
230pub enum TopicSyncManagerError {
231    #[error(transparent)]
232    TopicLogSync(#[from] TopicLogSyncError),
233
234    #[error("received operation before topic agreed")]
235    OperationBeforeTopic,
236
237    #[error(transparent)]
238    Send(#[from] mpsc::SendError),
239}
240
241#[cfg(test)]
242mod tests {
243    use std::collections::HashMap;
244    use std::time::Duration;
245
246    use assert_matches::assert_matches;
247    use futures::{SinkExt, StreamExt};
248    use p2panda_core::{Body, Operation};
249
250    use crate::manager::{TopicSyncManager, TopicSyncManagerArgs};
251    use crate::protocols::TopicLogSyncEvent;
252    use crate::test_utils::{
253        Peer, TestMemoryStore, TestTopic, TestTopicMap, TestTopicSyncEvent, TestTopicSyncManager,
254        drain_stream, run_protocol, setup_logging,
255    };
256    use crate::traits::Manager;
257    use crate::{FromSync, SessionConfig, ToSync};
258
259    #[test]
260    fn from_args() {
261        let store = TestMemoryStore::new();
262        let topic_map = TestTopicMap::new();
263        let config = TopicSyncManagerArgs { store, topic_map };
264        let _: TestTopicSyncManager = Manager::from_args(config);
265    }
266
267    #[tokio::test]
268    async fn manager_e2e() {
269        setup_logging();
270
271        const TOPIC_NAME: &str = "messages";
272        const LOG_ID: u64 = 0;
273        const SESSION_ID: u64 = 0;
274
275        let topic = TestTopic::new(TOPIC_NAME);
276
277        // Setup Peer A
278        let mut peer_a = Peer::new(0);
279        let body = Body::new("Hello from Peer A".as_bytes());
280        let _ = peer_a.create_operation(&body, LOG_ID).await;
281        let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
282        peer_a.insert_topic(&topic, &logs);
283        let mut peer_a_manager =
284            TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
285
286        // Setup Peer B
287        let mut peer_b = Peer::new(1);
288        let body = Body::new("Hello from Peer B".as_bytes());
289        let _ = peer_b.create_operation(&body, LOG_ID).await;
290        let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
291        peer_b.insert_topic(&topic, &logs);
292        let mut peer_b_manager =
293            TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
294
295        // Instantiate sync session for Peer A.
296        let config = SessionConfig {
297            topic,
298            remote: peer_b.id(),
299            live_mode: true,
300        };
301
302        // Subscribe to both managers.
303        let mut event_stream_a = peer_a_manager.subscribe();
304        let mut event_stream_b = peer_b_manager.subscribe();
305
306        // Instantiate sync session for Peer A.
307        let peer_a_session = peer_a_manager.session(SESSION_ID, &config).await;
308
309        // Instantiate sync session for Peer B.
310        let peer_b_session = peer_b_manager.session(SESSION_ID, &config).await;
311
312        // Get a handle to Peer A sync session.
313        let mut peer_a_handle = peer_a_manager.session_handle(SESSION_ID).await.unwrap();
314
315        // Create and send a new live-mode message.
316        let (header_1, _) = peer_a.create_operation_no_insert(&body, LOG_ID).await;
317        peer_a_handle
318            .send(ToSync::Payload(Operation {
319                hash: header_1.hash(),
320                header: header_1.clone(),
321                body: Some(body.clone()),
322            }))
323            .await
324            .unwrap();
325        peer_a_handle.send(ToSync::Close).await.unwrap();
326
327        // Actually run the protocol.
328        run_protocol(peer_a_session, peer_b_session).await.unwrap();
329
330        // Assert Peer A's events.
331        for index in 0..=7 {
332            let event = event_stream_a.next().await.unwrap();
333            assert_eq!(event.session_id(), 0);
334            match index {
335                0 => assert_matches!(
336                    event,
337                    FromSync {
338                        event: TopicLogSyncEvent::SyncStarted(_),
339                        ..
340                    }
341                ),
342                1 | 2 => assert_matches!(
343                    event,
344                    FromSync {
345                        event: TopicLogSyncEvent::SyncStatus(_),
346                        ..
347                    }
348                ),
349                3 => assert_matches!(
350                    event,
351                    FromSync {
352                        event: TopicLogSyncEvent::Operation(_),
353                        ..
354                    }
355                ),
356                4 => assert_matches!(
357                    event,
358                    FromSync {
359                        event: TopicLogSyncEvent::SyncFinished(_),
360                        ..
361                    }
362                ),
363                5 => assert_matches!(
364                    event,
365                    FromSync {
366                        event: TopicLogSyncEvent::LiveModeStarted,
367                        ..
368                    }
369                ),
370                6 => assert_matches!(
371                    event,
372                    FromSync {
373                        event: TopicLogSyncEvent::LiveModeFinished(_),
374                        ..
375                    }
376                ),
377                7 => assert_matches!(
378                    event,
379                    FromSync {
380                        event: TopicLogSyncEvent::Success,
381                        ..
382                    }
383                ),
384                _ => panic!(),
385            }
386        }
387
388        // Assert Peer B's events.
389        for index in 0..=8 {
390            let event = event_stream_b.next().await.unwrap();
391            match index {
392                0 => assert_matches!(
393                    event,
394                    FromSync {
395                        session_id: 0,
396                        event: TopicLogSyncEvent::SyncStarted(_),
397                        ..
398                    }
399                ),
400                1 | 2 => assert_matches!(
401                    event,
402                    FromSync {
403                        session_id: 0,
404                        event: TopicLogSyncEvent::SyncStatus(_),
405                        ..
406                    }
407                ),
408                3 => assert_matches!(
409                    event,
410                    FromSync {
411                        session_id: 0,
412                        event: TopicLogSyncEvent::Operation(_),
413                        ..
414                    }
415                ),
416                4 => assert_matches!(
417                    event,
418                    FromSync {
419                        session_id: 0,
420                        event: TopicLogSyncEvent::SyncFinished(_),
421                        ..
422                    }
423                ),
424                5 => assert_matches!(
425                    event,
426                    FromSync {
427                        event: TopicLogSyncEvent::LiveModeStarted,
428                        ..
429                    }
430                ),
431                6 => assert_matches!(
432                    event,
433                    FromSync {
434                        session_id: 0,
435                        event: TopicLogSyncEvent::Operation(_),
436                        ..
437                    }
438                ),
439                7 => assert_matches!(
440                    event,
441                    FromSync {
442                        event: TopicLogSyncEvent::LiveModeFinished(_),
443                        ..
444                    }
445                ),
446                8 => assert_matches!(
447                    event,
448                    FromSync {
449                        event: TopicLogSyncEvent::Success,
450                        ..
451                    }
452                ),
453                _ => panic!(),
454            }
455        }
456    }
457
458    #[tokio::test]
459    async fn live_mode_three_peer_forwarding() {
460        setup_logging();
461
462        const TOPIC_NAME: &str = "chat";
463        const LOG_ID: u64 = 0;
464        const SESSION_AB: u64 = 0;
465        const SESSION_AC: u64 = 1;
466        const SESSION_BA: u64 = 2;
467        const SESSION_CA: u64 = 3;
468
469        // Shared topic
470        let topic = TestTopic::new(TOPIC_NAME);
471
472        // Peer A
473        let mut peer_a = Peer::new(0);
474        let body_a = Body::new("Hello from A".as_bytes());
475        let (peer_a_header_0, _) = peer_a.create_operation(&body_a, LOG_ID).await;
476        let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
477        peer_a.insert_topic(&topic, &logs);
478        let mut manager_a = TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
479
480        // Peer B
481        let mut peer_b = Peer::new(1);
482        let body_b = Body::new("Hello from B".as_bytes());
483        let (peer_b_header_0, _) = peer_b.create_operation(&body_b, LOG_ID).await;
484        let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
485        peer_b.insert_topic(&topic, &logs);
486        let mut manager_b = TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
487
488        // Peer C
489        let mut peer_c = Peer::new(2);
490        let body_c = Body::new("Hello from C".as_bytes());
491        let (peer_c_header_0, _) = peer_c.create_operation(&body_c, LOG_ID).await;
492        let logs = HashMap::from([(peer_c.id(), vec![LOG_ID])]);
493        peer_c.insert_topic(&topic, &logs);
494        let mut manager_c = TopicSyncManager::new(peer_c.topic_map.clone(), peer_c.store.clone());
495
496        // Session A -> B (A initiates)
497        let mut config = SessionConfig {
498            topic: topic.clone(),
499            remote: peer_b.id(),
500            live_mode: true,
501        };
502        let session_ab = manager_a.session(SESSION_AB, &config).await;
503        config.remote = peer_a.id();
504        let session_b = manager_b.session(SESSION_BA, &config).await;
505
506        // Session A -> C (A initiates)
507        let mut config = SessionConfig {
508            topic: topic.clone(),
509            remote: peer_c.id(),
510            live_mode: true,
511        };
512        let session_ac = manager_a.session(SESSION_AC, &config).await;
513        config.remote = peer_a.id();
514        let session_c = manager_c.session(SESSION_CA, &config).await;
515
516        let mut event_stream_a = manager_a.subscribe();
517        let mut event_stream_b = manager_b.subscribe();
518        let mut event_stream_c = manager_c.subscribe();
519
520        // Run both protocols concurrently
521        tokio::spawn(async move {
522            run_protocol(session_ab, session_b).await.unwrap();
523        });
524        tokio::spawn(async move {
525            run_protocol(session_ac, session_c).await.unwrap();
526        });
527
528        // Send live-mode messages from all peers
529        let mut handle_ab = manager_a.session_handle(SESSION_AB).await.unwrap();
530        let mut handle_ac = manager_a.session_handle(SESSION_AC).await.unwrap();
531        let mut handle_ba = manager_b.session_handle(SESSION_BA).await.unwrap();
532        let mut handle_ca = manager_c.session_handle(SESSION_CA).await.unwrap();
533
534        let body_a = Body::new("Hello again from A".as_bytes());
535        let body_b = Body::new("Hello again from B".as_bytes());
536        let body_c = Body::new("Hello again from C".as_bytes());
537        let (peer_a_header_1, _) = peer_a.create_operation(&body_a, LOG_ID).await;
538        let (peer_b_header_1, _) = peer_b.create_operation(&body_b, LOG_ID).await;
539        let (peer_c_header_1, _) = peer_c.create_operation(&body_c, LOG_ID).await;
540
541        let operation_a = Operation {
542            hash: peer_a_header_1.hash(),
543            header: peer_a_header_1.clone(),
544            body: Some(body_a.clone()),
545        };
546        let operation_b = Operation {
547            hash: peer_b_header_1.hash(),
548            header: peer_b_header_1.clone(),
549            body: Some(body_b.clone()),
550        };
551        let operation_c = Operation {
552            hash: peer_c_header_1.hash(),
553            header: peer_c_header_1.clone(),
554            body: Some(body_c.clone()),
555        };
556
557        handle_ab
558            .send(ToSync::Payload(operation_a.clone()))
559            .await
560            .unwrap();
561        handle_ac.send(ToSync::Payload(operation_a)).await.unwrap();
562        handle_ba.send(ToSync::Payload(operation_b)).await.unwrap();
563        handle_ca.send(ToSync::Payload(operation_c)).await.unwrap();
564
565        // Collect all operations each peer receives on the event stream.
566        let mut operations_a = vec![];
567        let mut operations_b = vec![];
568        let mut operations_c = vec![];
569        let _ = tokio::time::timeout(Duration::from_millis(500), async {
570            loop {
571                tokio::select! {
572                    Some(event) = event_stream_a.next() => {
573                        if let TestTopicSyncEvent::Operation(operation) = event.event() {
574                            operations_a.push(*operation.clone());
575                        }
576                    }
577                    Some(event) = event_stream_b.next() => {
578                        if let TestTopicSyncEvent::Operation(operation) = event.event() {
579                            operations_b.push(*operation.clone());
580                        }
581                    }
582                    Some(event) = event_stream_c.next() => {
583                        if let TestTopicSyncEvent::Operation(operation) = event.event() {
584                            operations_c.push(*operation.clone());
585                        }
586                    }
587                    else => tokio::time::sleep(Duration::from_millis(5)).await
588                }
589            }
590        })
591        .await;
592
593        // All peers received 4 messages, B & C received each other messages via A, and nobody
594        // received their own messages.
595        assert_eq!(operations_a.len(), 4);
596        assert_eq!(operations_b.len(), 4);
597        assert_eq!(operations_c.len(), 4);
598        assert!(
599            operations_a
600                .iter()
601                .find(|operation| operation.header == peer_a_header_0
602                    || operation.header == peer_a_header_1)
603                .is_none()
604        );
605        assert!(
606            operations_b
607                .iter()
608                .find(|operation| operation.header == peer_b_header_0
609                    || operation.header == peer_b_header_1)
610                .is_none()
611        );
612        assert!(
613            operations_c
614                .iter()
615                .find(|operation| operation.header == peer_c_header_0
616                    || operation.header == peer_c_header_1)
617                .is_none()
618        );
619    }
620
621    #[tokio::test]
622    async fn non_blocking_manager_stream() {
623        const TOPIC_NAME: &str = "messages";
624        const LOG_ID: u64 = 0;
625        const SESSION_ID: u64 = 0;
626
627        let topic = TestTopic::new(TOPIC_NAME);
628
629        // Setup Peer A
630        let mut peer_a = Peer::new(0);
631        let body = Body::new("Hello from Peer A".as_bytes());
632        let _ = peer_a.create_operation(&body, LOG_ID).await;
633        let logs = HashMap::from([(peer_a.id(), vec![LOG_ID])]);
634        peer_a.insert_topic(&topic, &logs);
635        let mut peer_a_manager =
636            TopicSyncManager::new(peer_a.topic_map.clone(), peer_a.store.clone());
637
638        // Spawn a task polling peer a's manager stream.
639        let mut peer_a_stream = peer_a_manager.subscribe();
640        tokio::task::spawn(async move {
641            loop {
642                peer_a_stream.next().await;
643            }
644        });
645
646        // Setup Peer B
647        let mut peer_b = Peer::new(1);
648        let body = Body::new("Hello from Peer B".as_bytes());
649        let _ = peer_b.create_operation(&body, LOG_ID).await;
650        let logs = HashMap::from([(peer_b.id(), vec![LOG_ID])]);
651        peer_b.insert_topic(&topic, &logs);
652        let mut peer_b_manager =
653            TopicSyncManager::new(peer_b.topic_map.clone(), peer_b.store.clone());
654
655        // Instantiate sync session for Peer A.
656        let config = SessionConfig {
657            topic,
658            remote: peer_b.id(),
659            live_mode: true,
660        };
661
662        let peer_a_session = peer_a_manager.session(SESSION_ID, &config).await;
663
664        // Instantiate sync session for Peer B.
665        let event_stream = peer_b_manager.subscribe();
666        let peer_b_session = peer_b_manager.session(SESSION_ID, &config).await;
667
668        // Get a handle to Peer A sync session.
669        let mut peer_a_handle = peer_a_manager.session_handle(SESSION_ID).await.unwrap();
670
671        // Create and send a new live-mode message.
672        let (header_1, _) = peer_a.create_operation_no_insert(&body, LOG_ID).await;
673        peer_a_handle
674            .send(ToSync::Payload(Operation {
675                hash: header_1.hash(),
676                header: header_1.clone(),
677                body: Some(body.clone()),
678            }))
679            .await
680            .unwrap();
681        peer_a_handle.send(ToSync::Close).await.unwrap();
682
683        // Actually run the protocol.
684        run_protocol(peer_a_session, peer_b_session).await.unwrap();
685
686        // Assert Peer B's events.
687        let events = drain_stream(event_stream).await;
688        assert_eq!(events.len(), 9);
689        for (index, event) in events.into_iter().enumerate() {
690            match index {
691                0 => assert_matches!(
692                    event,
693                    FromSync {
694                        session_id: 0,
695                        event: TopicLogSyncEvent::SyncStarted(_),
696                        ..
697                    }
698                ),
699                1 | 2 => assert_matches!(
700                    event,
701                    FromSync {
702                        session_id: 0,
703                        event: TopicLogSyncEvent::SyncStatus(_),
704                        ..
705                    }
706                ),
707                3 => assert_matches!(
708                    event,
709                    FromSync {
710                        session_id: 0,
711                        event: TopicLogSyncEvent::Operation(_),
712                        ..
713                    }
714                ),
715                4 => assert_matches!(
716                    event,
717                    FromSync {
718                        session_id: 0,
719                        event: TopicLogSyncEvent::SyncFinished(_),
720                        ..
721                    }
722                ),
723                5 => assert_matches!(
724                    event,
725                    FromSync {
726                        event: TopicLogSyncEvent::LiveModeStarted,
727                        ..
728                    }
729                ),
730                6 => assert_matches!(
731                    event,
732                    FromSync {
733                        session_id: 0,
734                        event: TopicLogSyncEvent::Operation(_),
735                        ..
736                    }
737                ),
738                7 => assert_matches!(
739                    event,
740                    FromSync {
741                        event: TopicLogSyncEvent::LiveModeFinished(_),
742                        ..
743                    }
744                ),
745                8 => assert_matches!(
746                    event,
747                    FromSync {
748                        event: TopicLogSyncEvent::Success,
749                        ..
750                    }
751                ),
752                _ => panic!(),
753            }
754        }
755    }
756}