p2panda_net/sync/log_sync/
api.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::fmt::Debug;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7use p2panda_core::{Extensions, Operation};
8use p2panda_store::{LogId, LogStore, OperationStore};
9use p2panda_sync::protocols::{Logs, TopicLogSyncEvent};
10use p2panda_sync::traits::TopicMap;
11use ractor::{ActorRef, call};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::sync::RwLock;
15
16use crate::TopicId;
17use crate::gossip::Gossip;
18use crate::iroh_endpoint::Endpoint;
19use crate::sync::actors::ToSyncManager;
20use crate::sync::handle::SyncHandle;
21use crate::sync::log_sync::Builder;
22
23/// Eventually consistent, local-first sync protocol based on append-only logs.
24///
25/// ## Example
26///
27/// See [`chat.rs`] for a full example using the sync protocol.
28///
29/// ## Local-first
30///
/// In local-first applications we want nodes to eventually converge towards the same state,
/// which requires them to catch up on messages they have missed - regardless of whether they
/// have been offline.
34///
35/// `p2panda-net` comes with a default `LogSync` protocol implementation which uses p2panda's
36/// **append-only log** Base Convergent Data Type (CDT).
37///
38/// After initial sync has finished, nodes switch to **live-mode** to directly push new messages to the
39/// network using a gossip protocol.
40///
41/// [`chat.rs`]: https://github.com/p2panda/p2panda/blob/main/p2panda-net/examples/chat.rs
#[derive(Clone)]
pub struct LogSync<S, L, E, TM>
where
    S: OperationStore<L, E> + LogStore<L, E> + Send + 'static,
    L: LogId + Serialize + for<'de> Deserialize<'de> + Send + 'static,
    E: Extensions + Send + 'static,
    TM: TopicMap<TopicId, Logs<L>> + Send + 'static,
{
    // Shared state behind an `Arc` so `Clone` is cheap; all clones talk to the
    // same underlying sync manager actor.
    inner: Arc<RwLock<Inner<E>>>,
    // `S`, `L` and `TM` are not stored in any field; `PhantomData` ties them to
    // the struct so the `impl` blocks (e.g. `builder`) can be constrained by them.
    _phantom: PhantomData<(S, L, TM)>,
}
53
/// State shared between all clones of a [`LogSync`] handle.
struct Inner<E>
where
    E: Extensions + Send + 'static,
{
    /// Reference to the sync manager actor; it is stopped when this `Inner`
    /// is dropped (i.e. when the last `LogSync` clone goes away).
    #[allow(clippy::type_complexity)]
    actor_ref: ActorRef<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>,
}
61
62impl<S, L, E, TM> LogSync<S, L, E, TM>
63where
64    S: OperationStore<L, E> + LogStore<L, E> + Send + 'static,
65    L: LogId + Serialize + for<'de> Deserialize<'de> + Send + 'static,
66    E: Extensions + Send + 'static,
67    TM: TopicMap<TopicId, Logs<L>> + Send + 'static,
68{
69    #[allow(clippy::type_complexity)]
70    pub(crate) fn new(
71        actor_ref: ActorRef<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>,
72    ) -> Self {
73        Self {
74            inner: Arc::new(RwLock::new(Inner { actor_ref })),
75            _phantom: PhantomData,
76        }
77    }
78
79    pub fn builder(
80        store: S,
81        topic_map: TM,
82        endpoint: Endpoint,
83        gossip: Gossip,
84    ) -> Builder<S, L, E, TM> {
85        Builder::<S, L, E, TM>::new(store, topic_map, endpoint, gossip)
86    }
87
88    // TODO: Extensions should be generic over a stream handle, not over this struct.
89    pub async fn stream(
90        &self,
91        topic: TopicId,
92        live_mode: bool,
93    ) -> Result<SyncHandle<Operation<E>, TopicLogSyncEvent<E>>, LogSyncError<E>> {
94        let inner = self.inner.read().await;
95        let sync_manager_ref =
96            call!(inner.actor_ref, ToSyncManager::Create, topic, live_mode).map_err(Box::new)?;
97
98        Ok(SyncHandle::new(
99            topic,
100            inner.actor_ref.clone(),
101            sync_manager_ref,
102        ))
103    }
104}
105
impl<E> Drop for Inner<E>
where
    E: Extensions + Send + 'static,
{
    // `Inner` lives behind an `Arc`, so this runs only once the last `LogSync`
    // clone has been dropped: stop the sync manager actor. `None` passes no
    // stop reason to the actor.
    fn drop(&mut self) {
        self.actor_ref.stop(None);
    }
}
114
/// Errors which can occur when interacting with the [`LogSync`] API.
#[derive(Debug, Error)]
pub enum LogSyncError<E> {
    /// Spawning the internal actor failed.
    #[error(transparent)]
    ActorSpawn(#[from] ractor::SpawnErr),

    /// Messaging with internal actor via RPC failed.
    // NOTE(review): the inner error is boxed, presumably to keep `Result`
    // small (clippy::result_large_err) — confirm before relying on this.
    #[error(transparent)]
    ActorRpc(#[from] Box<ractor::RactorErr<ToSyncManager<Operation<E>, TopicLogSyncEvent<E>>>>),
}