p2panda_net/sync/log_sync/
builder.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::marker::PhantomData;
4
5use p2panda_core::Extensions;
6use p2panda_store::{LogId, LogStore, OperationStore};
7use p2panda_sync::manager::{TopicSyncManager, TopicSyncManagerArgs};
8use p2panda_sync::protocols::Logs;
9use p2panda_sync::traits::TopicMap;
10use ractor::thread_local::{ThreadLocalActor, ThreadLocalActorSpawner};
11use serde::{Deserialize, Serialize};
12
13use crate::TopicId;
14use crate::gossip::Gossip;
15use crate::iroh_endpoint::Endpoint;
16use crate::sync::actors::SyncManager;
17use crate::sync::log_sync::{LOG_SYNC_PROTOCOL_ID, LogSync, LogSyncError};
18
19pub struct Builder<S, L, E, TM>
20where
21    S: OperationStore<L, E> + LogStore<L, E> + Send + 'static,
22    L: LogId + Serialize + for<'de> Deserialize<'de> + Send + 'static,
23    E: Extensions + Send + 'static,
24    TM: TopicMap<TopicId, Logs<L>> + Send + 'static,
25{
26    store: S,
27    topic_map: TM,
28    endpoint: Endpoint,
29    gossip: Gossip,
30    _marker: PhantomData<(L, E)>,
31}
32
33impl<S, L, E, TM> Builder<S, L, E, TM>
34where
35    S: OperationStore<L, E> + LogStore<L, E> + Send + 'static,
36    L: LogId + Serialize + for<'de> Deserialize<'de> + Send + 'static,
37    E: Extensions + Send + 'static,
38    TM: TopicMap<TopicId, Logs<L>> + Send + 'static,
39{
40    pub fn new(store: S, topic_map: TM, endpoint: Endpoint, gossip: Gossip) -> Self {
41        Self {
42            store,
43            topic_map,
44            endpoint,
45            gossip,
46            _marker: PhantomData,
47        }
48    }
49
50    pub async fn spawn(self) -> Result<LogSync<S, L, E, TM>, LogSyncError<E>> {
51        let (actor_ref, _) = {
52            let thread_pool = ThreadLocalActorSpawner::new();
53
54            let config = TopicSyncManagerArgs {
55                store: self.store,
56                topic_map: self.topic_map,
57            };
58
59            let args = (
60                LOG_SYNC_PROTOCOL_ID.to_vec(),
61                config,
62                self.endpoint,
63                self.gossip,
64            );
65
66            SyncManager::<TopicSyncManager<TopicId, S, TM, L, E>>::spawn(None, args, thread_pool)
67                .await?
68        };
69
70        Ok(LogSync::new(actor_ref))
71    }
72}