// SPDX-License-Identifier: MIT OR Apache-2.0

#![cfg_attr(doctest, doc=include_str!("../README.md"))]

//! Data- and transport-agnostic interface to implement custom sync protocols, compatible with
//! `p2panda-net` or other peer-to-peer networking solutions.
//!
//! Sync or "synchronisation" protocols (also known as "replication protocols") are used to
//! efficiently exchange data between peers.
//!
//! Unlike gossip protocols, sync protocols are well suited for "catching up" on past state. Peers
//! can negotiate scope and access in a sync protocol for any type of data the remote peer
//! currently knows about.
//!
//! In addition to the generic definition of the `SyncProtocol` trait, `p2panda-sync` includes
//! optional implementations for efficient sync of append-only log-based data types. These optional
//! implementations can be activated via feature flags. Finally, `p2panda-sync` provides helpers to
//! encode wire messages in CBOR.
#[cfg(feature = "cbor")]
pub mod cbor;
#[cfg(feature = "log-sync")]
pub mod log_sync;
#[cfg(feature = "test-protocols")]
pub mod test_protocols;

use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, Sink};
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Trait to implement a custom sync protocol.
///
/// Implementing the `SyncProtocol` trait needs extra care and is only required when designing
/// custom low-level peer-to-peer protocols and data types. p2panda already comes with solutions
/// which can be used "out of the box", covering most applications and use cases.
///
/// ## Design
///
/// Sync sessions take place when two peers connect to each other and follow the sync protocol.
/// They are designed as a two-party protocol featuring an "initiator" and an "acceptor" role.
///
/// Each protocol usually consists of two phases: 1) the "Handshake" phase, during which the
/// "initiator" sends the "topic query" and any access control data to the "acceptor", and 2) the
/// "Sync" phase, during which the requested application data is finally exchanged and validated.
///
/// ## Privacy and Security
///
/// The `SyncProtocol` trait has been designed to allow privacy-respecting implementations where
/// application data (via access control) and the topic query itself (for example via
/// Diffie-Hellman) are securely exchanged without unnecessarily revealing any information to
/// unknown peers. This usually takes place during the "Handshake" phase of the protocol.
///
/// The underlying transport layer should provide automatic authentication of the remote peer, a
/// reliable connection and transport encryption. `p2panda-net`, for example, uses self-certified
/// TLS 1.3 over QUIC.
///
/// ## Streams
///
/// Three distinct data channels are provided by the underlying transport layer to each
/// `SyncProtocol` implementation: `tx` for sending data to the remote peer, `rx` for receiving
/// data from the remote peer and `app_tx` for sending received data to the higher-level
/// application, validation and persistence layers.
///
/// ## Topic queries
///
/// Topic queries are generic data types which can be used to subjectively express interest in a
/// particular subset of the data we want to sync over, like chat group identifiers or very
/// specific "search queries", for example "give me all documents containing the word 'billy'."
///
/// With the help of the `TopicMap` trait we can keep sync implementations agnostic to specific
/// topic query implementations. The sync protocol only needs to feed the "topic query" into the
/// "map", which answers with the actual to-be-synced data entities (for example coming from a
/// store). This allows application developers to re-use your `SyncProtocol` implementation for
/// their custom `TopicQuery` requirements.
///
/// ## Validation
///
/// Basic data-format and -encoding validation usually takes place during the "Sync" phase of the
/// protocol.
///
/// Further validation, which might require more knowledge of the application state or can only be
/// applied after decrypting the payload, should be handled _outside_ the sync protocol by sending
/// the data upstream to higher application layers.
///
/// ## Errors
///
/// Protocol implementations operate on multiple layers at the same time, expressed in distinct
/// error categories:
///
/// 1. Unexpected behaviour of a remote peer which does not follow the implemented protocol
/// 2. (Rare) critical system failures on our end
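///
/// ## Example
///
/// A minimal sketch, not one of the protocols shipped with this crate: a hypothetical
/// `HandshakeOnly` protocol whose topic query is a raw 32-byte `GroupId`. The "initiator" writes
/// the topic bytes to the remote peer, the "acceptor" reads them back, and both sides report
/// `FromSync::HandshakeSuccess` via `app_tx`. Access control and the "Sync" phase are left out.
/// The example assumes the `SyncProtocol<'a, T>` parameter order and that serde's `derive`
/// feature is available.
///
/// ```
/// use std::sync::Arc;
///
/// use async_trait::async_trait;
/// use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Sink, SinkExt};
/// use p2panda_sync::{FromSync, SyncError, SyncProtocol, TopicQuery};
/// use serde::{Deserialize, Serialize};
///
/// // Hypothetical topic query: a fixed-size group identifier.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// struct GroupId([u8; 32]);
///
/// impl TopicQuery for GroupId {}
///
/// // Hypothetical protocol which only performs the "Handshake" phase.
/// #[derive(Debug)]
/// struct HandshakeOnly;
///
/// #[async_trait]
/// impl<'a> SyncProtocol<'a, GroupId> for HandshakeOnly {
///     fn name(&self) -> &'static str {
///         "handshake-only"
///     }
///
///     async fn initiate(
///         self: Arc<Self>,
///         topic_query: GroupId,
///         mut tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
///         _rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
///         mut app_tx: Box<&'a mut (dyn Sink<FromSync<GroupId>, Error = SyncError> + Send + Unpin)>,
///     ) -> Result<(), SyncError> {
///         // Send the topic query; I/O errors are converted via `From<std::io::Error>`.
///         tx.write_all(&topic_query.0).await?;
///         tx.flush().await?;
///         // Tell the backend that the "Handshake" phase is over.
///         app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;
///         Ok(())
///     }
///
///     async fn accept(
///         self: Arc<Self>,
///         _tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
///         mut rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
///         mut app_tx: Box<&'a mut (dyn Sink<FromSync<GroupId>, Error = SyncError> + Send + Unpin)>,
///     ) -> Result<(), SyncError> {
///         // Simplification: any failure to read the 32 topic bytes is blamed on the remote peer.
///         let mut bytes = [0u8; 32];
///         rx.read_exact(&mut bytes).await.map_err(|err| {
///             SyncError::UnexpectedBehaviour(format!("could not read topic query: {err}"))
///         })?;
///         app_tx.send(FromSync::HandshakeSuccess(GroupId(bytes))).await?;
///         Ok(())
///     }
/// }
///
/// assert_eq!(HandshakeOnly.name(), "handshake-only");
/// ```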
#[async_trait]
pub trait SyncProtocol<'a, T>
where
    Self: Send + Sync + Debug,
    T: TopicQuery,
{
    /// Custom identifier for this sync protocol implementation.
    ///
    /// This is currently only used for debugging or logging purposes.
    fn name(&self) -> &'static str;

    /// Initiate a sync protocol session over the provided bi-directional stream for the given
    /// topic query.
    ///
    /// During the "Handshake" phase the "initiator" usually requests access and informs the remote
    /// peer about the "topic query" they are interested in. Implementations for `p2panda-net`
    /// are required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that we've successfully requested access,
    /// exchanged the topic query with the remote peer and are about to begin sync.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either through a critical error on our end or unexpected
    /// behaviour of the remote peer) a `SyncError` is returned.
    async fn initiate(
        self: Arc<Self>,
        topic_query: T,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;

    /// Accept a sync protocol session over the provided bi-directional stream.
    ///
    /// During the "Handshake" phase the "acceptor" usually responds to the access request and
    /// learns about the "topic query" from the remote peer. Implementations for `p2panda-net` are
    /// required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that the topic query has been
    /// successfully received from the remote peer and that data exchange is about to begin.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either through a critical error on our end or unexpected
    /// behaviour of the remote peer) a `SyncError` is returned.
    async fn accept(
        self: Arc<Self>,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;
}

/// Messages which can be sent to the higher application layers (for further validation or
/// persistence) and the underlying transport layer (for managing the sync session).
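///
/// ## Example
///
/// A sketch of one way a backend could provide `app_tx` and consume the forwarded messages,
/// assuming a bounded `futures` mpsc channel driven by `futures::executor::block_on`. Mapping the
/// channel's send error to `SyncError::Critical` is a choice made for this example, `GroupId` is
/// a hypothetical topic query type, and serde's `derive` feature is assumed to be available.
///
/// ```
/// use futures::channel::mpsc;
/// use futures::{SinkExt, StreamExt};
/// use p2panda_sync::{FromSync, SyncError, TopicQuery};
/// use serde::{Deserialize, Serialize};
///
/// // Hypothetical topic query type, only defined for this example.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// struct GroupId([u8; 32]);
///
/// impl TopicQuery for GroupId {}
///
/// futures::executor::block_on(async {
///     let (tx, mut rx) = mpsc::channel::<FromSync<GroupId>>(16);
///
///     // Map the channel's `SendError` into a `SyncError` so the sink matches the `app_tx` type.
///     let mut app_tx = tx.sink_map_err(|err| SyncError::Critical(err.to_string()));
///
///     // What a sync implementation would forward during a session.
///     let topic = GroupId([7; 32]);
///     app_tx.send(FromSync::HandshakeSuccess(topic.clone())).await.unwrap();
///     app_tx
///         .send(FromSync::Data { header: vec![1, 2, 3], payload: None })
///         .await
///         .unwrap();
///     drop(app_tx); // dropping the only sender closes the channel
///
///     // The application side receives the messages in order, then the end of the stream.
///     assert_eq!(rx.next().await, Some(FromSync::HandshakeSuccess(topic)));
///     assert_eq!(
///         rx.next().await,
///         Some(FromSync::Data { header: vec![1, 2, 3], payload: None })
///     );
///     assert_eq!(rx.next().await, None);
/// });
/// ```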
#[derive(Debug, PartialEq)]
pub enum FromSync<T>
where
    T: TopicQuery,
{
    /// During the "Handshake" phase both peers usually manage access control and negotiate the
    /// "topic query" they want to exchange over. This message indicates that this phase has ended.
    ///
    /// Implementations for `p2panda-net` are required to send this message to the underlying
    /// transport layer to inform the "backend" that we've successfully requested access, exchanged
    /// the topic query with the remote peer and are about to begin sync.
    ///
    /// With this information backends can optionally apply optimisations, which might for example
    /// be required to keep application messages in order (as there might exist other channels the
    /// backend exchanges similar data over at the same time).
    HandshakeSuccess(T),

    /// Application data we've received from the remote peer during the sync session and want to
    /// forward to higher application layers.
    ///
    /// These "frontends" might further process the data, decrypt payloads, sort messages or apply
    /// more validation before the data is finally persisted or rendered to the user. At this point
    /// the sync protocol is merely "forwarding" it without any knowledge of how the data is used.
    Data {
        /// Exchanged data from sync session.
        ///
        /// Some data types might be designed with "off-chain" use in mind, where a "header" is
        /// crucial for integrity and authenticity but the actual "payload" is optional or
        /// requested lazily in a later process.
        header: Vec<u8>,

        /// Optional "body" which can represent "off-chain" application data.
        ///
        /// This is useful for realising "off-chain" compatible data types. Implementations without
        /// this distinction will always leave this field as `None` and only encode their data
        /// types in the `header` field.
        payload: Option<Vec<u8>>,
    },
}

/// Errors which can occur during sync sessions.
///
/// 1. Critical system failures (e.g. a bug in p2panda code or in the sync implementation, a sync
///    implementation not following the required phase flow of "Handshake" followed by "Sync",
///    lack of system resources, etc.)
/// 2. Unexpected behaviour (e.g. the remote peer abruptly disconnected, an error which was
///    correctly caught in the sync implementation, etc.)
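///
/// ## Example
///
/// A sketch of how a backend could react to the different error variants. The `Reaction` type
/// and the chosen policy are hypothetical and not part of this crate.
///
/// ```
/// use p2panda_sync::SyncError;
///
/// // Hypothetical reaction of a backend after a failed sync session.
/// #[derive(Debug, PartialEq)]
/// enum Reaction {
///     // Retry later, possibly with lower priority for this peer.
///     DowngradePeer,
///     // Stop syncing, as the problem is on our end.
///     Abort,
/// }
///
/// fn react(err: &SyncError) -> Reaction {
///     match err {
///         // Both variants point at the remote peer.
///         SyncError::UnexpectedBehaviour(_) | SyncError::InvalidEncoding(_) => {
///             Reaction::DowngradePeer
///         }
///         SyncError::Critical(_) => Reaction::Abort,
///     }
/// }
///
/// let err = SyncError::InvalidEncoding("unexpected end of input".into());
/// assert_eq!(react(&err), Reaction::DowngradePeer);
/// assert_eq!(react(&SyncError::Critical("store unavailable".into())), Reaction::Abort);
/// ```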
#[derive(Debug, PartialEq, Error)]
pub enum SyncError {
    /// Error due to unexpected (buggy or malicious) behaviour of the remote peer.
    ///
    /// Indicates that the sync protocol was not correctly followed, for example due to unexpected
    /// or missing messages.
    ///
    /// Can be used by a backend to re-attempt syncing with this peer or to downgrade it in
    /// priority, potentially deny-listing it if communication failed too often.
    #[error("sync session failed due to unexpected protocol behaviour of remote peer: {0}")]
    UnexpectedBehaviour(String),

    /// Error due to invalid encoding of a message sent by remote peer.
    ///
    /// Note that this error is intended for messages received from _remote_ peers which we can't
    /// decode properly. If we fail to encode our _own_ messages we should rather consider this a
    /// `Critical` error, as it likely means that there's a bug in our implementation.
    #[error("sync session failed due to invalid encoding of message sent by remote peer: {0}")]
    InvalidEncoding(String),

    /// Critical error due to system failure on our end.
    ///
    /// This indicates that our system is running out of resources (storage layer failure etc.) or
    /// we have a buggy implementation.
    #[error("sync session failed due to critical system error: {0}")]
    Critical(String),
}

/// Converts an I/O error (which occurs during codec stream handling) into [`SyncError`].
///
/// Most I/O errors indicate a critical system failure, for example an implementation bug or
/// lacking resources on the user's machine. Broken pipes are the exception: they usually mean that
/// the remote peer closed the connection and are therefore mapped to
/// [`SyncError::UnexpectedBehaviour`].
///
/// See the associated `Error` type of the `Encoder` and `Decoder` traits in `tokio-util`'s codec
/// module for more information:
/// <https://docs.rs/tokio-util/latest/tokio_util/codec/trait.Decoder.html#associatedtype.Error>
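///
/// ## Example
///
/// A short usage sketch of this conversion; the messages passed to `Error::new` are made up for
/// the example and do not influence the result.
///
/// ```
/// use std::io::{Error, ErrorKind};
///
/// use p2panda_sync::SyncError;
///
/// // A broken pipe is attributed to the remote peer ...
/// let err = SyncError::from(Error::new(ErrorKind::BrokenPipe, "connection closed"));
/// assert_eq!(err, SyncError::UnexpectedBehaviour("broken pipe".into()));
///
/// // ... while any other I/O error kind is treated as a critical failure on our end.
/// let err = SyncError::from(Error::new(ErrorKind::Other, "stream failed"));
/// assert!(matches!(err, SyncError::Critical(_)));
/// ```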
impl From<std::io::Error> for SyncError {
    fn from(err: std::io::Error) -> Self {
        match err.kind() {
            // Broken pipes usually indicate that the remote peer closed the connection
            // unexpectedly, which is why we're not treating it as a critical error but as
            // "unexpected behaviour" instead.
            std::io::ErrorKind::BrokenPipe => Self::UnexpectedBehaviour("broken pipe".into()),
            _ => Self::Critical(format!("internal i/o stream error: {err}")),
        }
    }
}

/// Identify the particular dataset a peer is interested in syncing.
///
/// Exactly how this is expressed is left up to the user to decide. During sync the "initiator"
/// sends their topic query to a remote peer where it is mapped to their local dataset.
/// Additional access-control checks can be performed. Once this "handshake" is complete both
/// peers will proceed with the designated sync protocol.
///
/// ## `TopicId` vs `TopicQuery`
///
/// While `TopicId` is merely a 32-byte identifier which can't hold much information other than
/// being a distinct identifier of a single data item or a collection of them, we can use
/// `TopicQuery` to implement custom data types representing "queries" for very specific data
/// items. Peers can for example announce that they'd like "all events from the 27th of September
/// 23 until today" with `TopicQuery`.
///
/// Consult the `TopicId` documentation in `p2panda-net` for more information.
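///
/// ## Example
///
/// A minimal sketch of a custom topic query, loosely modelled on the "all events since a given
/// date" example above. The `ChatQuery` type and its fields are hypothetical, and the example
/// assumes serde's `derive` feature is available. Since all required bounds are covered by derive
/// macros, the `TopicQuery` implementation itself is empty.
///
/// ```
/// use std::collections::HashSet;
///
/// use p2panda_sync::TopicQuery;
/// use serde::{Deserialize, Serialize};
///
/// // Hypothetical query for events of a chat group, optionally only those created after the
/// // given UNIX timestamp.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// struct ChatQuery {
///     group_id: [u8; 32],
///     since: Option<u64>,
/// }
///
/// impl TopicQuery for ChatQuery {}
///
/// // `Eq` and `Hash` let a backend key sessions by topic query: equal queries collapse into one
/// // entry, while a different `since` value is a distinct query.
/// let mut sessions = HashSet::new();
/// sessions.insert(ChatQuery { group_id: [1; 32], since: None });
/// sessions.insert(ChatQuery { group_id: [1; 32], since: None });
/// sessions.insert(ChatQuery { group_id: [1; 32], since: Some(1_700_000_000) });
/// assert_eq!(sessions.len(), 2);
/// assert!(sessions.iter().all(|query| query.group_id == [1; 32]));
/// assert_eq!(sessions.iter().filter(|query| query.since.is_some()).count(), 1);
/// ```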
pub trait TopicQuery:
    // Data types implementing `TopicQuery` also need to implement `Eq` and `Hash` in order to allow
    // backends to organise sync sessions per topic query and peer, along with `Serialize` and
    // `Deserialize` to allow sending topics over the wire.
    Clone + Debug + Eq + Hash + Send + Sync + Serialize + for<'a> Deserialize<'a>
{
}