p2panda_net/sync/log_sync/
api.rs1use std::fmt::Debug;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7use p2panda_core::{Extensions, Operation};
8use p2panda_store::{LogId, LogStore, OperationStore};
9use p2panda_sync::protocols::{Logs, TopicLogSyncEvent};
10use p2panda_sync::traits::TopicMap;
11use ractor::{ActorRef, call};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::sync::RwLock;
15
16use crate::TopicId;
17use crate::gossip::Gossip;
18use crate::iroh_endpoint::Endpoint;
19use crate::sync::actors::ToSyncManager;
20use crate::sync::handle::SyncHandle;
21use crate::sync::log_sync::Builder;
22
23#[derive(Clone)]
43pub struct LogSync<S, L, E, TM>
44where
45 S: OperationStore<L, E> + LogStore<L, E> + Send + 'static,
46 L: LogId + Serialize + for<'de> Deserialize<'de> + Send + 'static,
47 E: Extensions + Send + 'static,
48 TM: TopicMap<TopicId, Logs<L>> + Send + 'static,
49{
50 inner: Arc<RwLock<Inner<E>>>,
51 _phantom: PhantomData<(S, L, TM)>,
52}
53
54struct Inner<E>
55where
56 E: Extensions + Send + 'static,
57{
58 #[allow(clippy::type_complexity)]
59 actor_ref: ActorRef<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>,
60}
61
62impl<S, L, E, TM> LogSync<S, L, E, TM>
63where
64 S: OperationStore<L, E> + LogStore<L, E> + Send + 'static,
65 L: LogId + Serialize + for<'de> Deserialize<'de> + Send + 'static,
66 E: Extensions + Send + 'static,
67 TM: TopicMap<TopicId, Logs<L>> + Send + 'static,
68{
69 #[allow(clippy::type_complexity)]
70 pub(crate) fn new(
71 actor_ref: ActorRef<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>,
72 ) -> Self {
73 Self {
74 inner: Arc::new(RwLock::new(Inner { actor_ref })),
75 _phantom: PhantomData,
76 }
77 }
78
79 pub fn builder(
80 store: S,
81 topic_map: TM,
82 endpoint: Endpoint,
83 gossip: Gossip,
84 ) -> Builder<S, L, E, TM> {
85 Builder::<S, L, E, TM>::new(store, topic_map, endpoint, gossip)
86 }
87
88 pub async fn stream(
90 &self,
91 topic: TopicId,
92 live_mode: bool,
93 ) -> Result<SyncHandle<Operation<E>, TopicLogSyncEvent<E>>, LogSyncError<E>> {
94 let inner = self.inner.read().await;
95 let sync_manager_ref =
96 call!(inner.actor_ref, ToSyncManager::Create, topic, live_mode).map_err(Box::new)?;
97
98 Ok(SyncHandle::new(
99 topic,
100 inner.actor_ref.clone(),
101 sync_manager_ref,
102 ))
103 }
104}
105
106impl<E> Drop for Inner<E>
107where
108 E: Extensions + Send + 'static,
109{
110 fn drop(&mut self) {
111 self.actor_ref.stop(None);
112 }
113}
114
115#[derive(Debug, Error)]
116pub enum LogSyncError<E> {
117 #[error(transparent)]
119 ActorSpawn(#[from] ractor::SpawnErr),
120
121 #[error(transparent)]
123 ActorRpc(#[from] Box<ractor::RactorErr<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>>),
124}