p2panda_sync/lib.rs
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Data- and transport-agnostic interface to implement custom sync protocols, compatible with
//! `p2panda-net` or other peer-to-peer networking solutions.
//!
//! Sync or "synchronisation" protocols (also known as "replication protocols") are used to
//! efficiently exchange data between peers.
//!
//! Unlike gossip protocols, sync protocols are well suited to "catching up on past state". Peers
//! can negotiate scope and access in a sync protocol for any type of data the remote peer
//! currently knows about.
//!
//! In addition to the generic definition of the `SyncProtocol` trait, `p2panda-sync` includes
//! optional implementations for efficient sync of append-only log-based data types. These optional
//! implementations may be activated via feature flags. Finally, `p2panda-sync` provides helpers to
//! encode wire messages in CBOR.
#[cfg(feature = "cbor")]
pub mod cbor;
#[cfg(feature = "log-sync")]
pub mod log_sync;

use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, Sink};
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Trait to implement a custom sync protocol.
///
/// Implementing the `SyncProtocol` trait needs extra care and is only required when designing
/// custom low-level peer-to-peer protocols and data types. p2panda already comes with solutions
/// which can be used "out of the box", providing implementations for most applications and use
/// cases.
///
/// ## Design
///
/// Sync sessions take place when two peers connect to each other and follow the sync protocol.
/// They are designed as a two-party protocol featuring an "initiator" and an "acceptor" role.
///
/// Each protocol usually follows two phases: 1) The "Handshake" phase, during which the
/// "initiator" sends the "topic query" and any access control data to the "acceptor", and 2) The
/// "Sync" phase, where the requested application data is finally exchanged and validated.
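///
/// As a rough illustration of these two phases, the following sketch tracks a session using
/// simplified, hypothetical stand-ins (`Phase`, `Event` and `advance` are not part of this
/// crate). It assumes that the handshake completes exactly once and always precedes any data,
/// which an actual backend may or may not enforce:
///
/// ```rust
/// // Hypothetical phases of a sync session, mirroring the design described above.
/// #[derive(Clone, Copy, Debug, PartialEq)]
/// enum Phase {
///     Handshake,
///     Sync,
/// }
///
/// // Hypothetical, simplified events a session reports to the application layer.
/// #[derive(Debug)]
/// enum Event {
///     HandshakeSuccess,
///     Data(Vec<u8>),
/// }
///
/// // Returns the next phase, or an error description if the event arrives out of order.
/// fn advance(phase: Phase, event: &Event) -> Result<Phase, String> {
///     match (phase, event) {
///         (Phase::Handshake, Event::HandshakeSuccess) => Ok(Phase::Sync),
///         (Phase::Sync, Event::Data(_)) => Ok(Phase::Sync),
///         (phase, event) => Err(format!("unexpected {event:?} during {phase:?}")),
///     }
/// }
/// ```
///
/// Starting from `Phase::Handshake`, a well-behaved session under these assumptions reports one
/// `Event::HandshakeSuccess` followed by any number of `Event::Data` events. Any other order is
/// rejected by `advance`.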
///
/// ## Privacy and Security
///
/// The `SyncProtocol` trait has been designed to allow privacy-respecting implementations where
/// application data (via access control) and the topic query itself (for example via
/// Diffie-Hellman) are securely exchanged without unnecessarily revealing any information to
/// unknown peers. This usually takes place during the "Handshake" phase of the protocol.
///
/// The underlying transport layer should provide automatic authentication of the remote peer, a
/// reliable connection and transport encryption. `p2panda-net`, for example, uses self-certified
/// TLS 1.3 over QUIC.
///
/// ## Streams
///
/// Three distinct data channels are provided by the underlying transport layer to each
/// `SyncProtocol` implementation: `tx` for sending data to the remote peer, `rx` to receive data
/// from the remote peer and `app_tx` to send received data to the higher-level application,
/// validation and persistence layers.
///
/// ## Topic queries
///
/// Topic queries are generic data types which can be used to subjectively express interest in a
/// particular subset of the data we want to sync over, like chat group identifiers or very
/// specific "search queries", for example "give me all documents containing the word 'billy'."
///
/// With the help of the `TopicMap` trait we can keep sync implementations agnostic to specific
/// topic query implementations. The sync protocol only needs to feed the "topic query" into the
/// "map" which will answer with the actual to-be-synced data entities (for example coming from a
/// store). This allows application developers to re-use your `SyncProtocol` implementation for
/// their custom `TopicQuery` requirements.
///
/// ## Validation
///
/// Basic data-format and -encoding validation usually takes place during the "Sync" phase of the
/// protocol.
///
/// Further validation which might require more knowledge of the application state or can only be
/// applied after decrypting the payload should be handled _outside_ the sync protocol, by sending
/// it upstream to higher application layers.
///
/// ## Errors
///
/// Protocol implementations operate on multiple layers at the same time, expressed in distinct
/// error categories:
///
/// 1. Unexpected behaviour of the remote peer not following the implemented protocol
/// 2. (Rare) critical system failures on our end
#[async_trait]
pub trait SyncProtocol<'a, T>
where
    Self: Send + Sync + Debug,
    T: TopicQuery,
{
    /// Custom identifier for this sync protocol implementation.
    ///
    /// This is currently only used for debugging or logging purposes.
    fn name(&self) -> &'static str;

    /// Initiate a sync protocol session over the provided bi-directional stream for the given
    /// topic query.
    ///
    /// During the "Handshake" phase the "initiator" usually requests access and informs the remote
    /// peer about the "topic query" they are interested in. Implementations for `p2panda-net`
    /// are required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that we've successfully requested access,
    /// exchanged the topic query with the remote peer and are about to begin sync.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either through a critical error on our end or an unexpected
    /// behaviour from the remote peer) a `SyncError` is returned.
    async fn initiate(
        self: Arc<Self>,
        topic_query: T,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;

    /// Accept a sync protocol session over the provided bi-directional stream.
    ///
    /// During the "Handshake" phase the "acceptor" usually responds to the access request and
    /// learns about the "topic query" from the remote peer. Implementations for `p2panda-net` are
    /// required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that the topic query has been
    /// successfully received from the remote peer and that data exchange is about to begin.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either through a critical error on our end or an unexpected
    /// behaviour from the remote peer) a `SyncError` is returned.
    async fn accept(
        self: Arc<Self>,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;
}

/// Messages which can be sent to the higher application layers (for further validation or
/// persistence) and the underlying transport layer (for managing the sync session).
#[derive(Debug, PartialEq)]
pub enum FromSync<T>
where
    T: TopicQuery,
{
    /// During the "Handshake" phase both peers usually manage access control and negotiate the
    /// "topic query" they want to exchange over. This message indicates that this phase has ended.
    ///
    /// Implementations for `p2panda-net` are required to send this message to the underlying
    /// transport layer to inform the "backend" that we've successfully requested access, exchanged
    /// the topic query with the remote peer and are about to begin sync.
    ///
    /// With this information backends can optionally apply optimisations, which might for example
    /// be required to keep application messages in-order (as there might exist other channels the
    /// backend exchanges similar data over at the same time).
    HandshakeSuccess(T),

    /// Application data we've received during the sync session from the remote peer and want to
    /// forward to higher application layers.
    ///
    /// These "frontends" might further process, decrypt payloads, sort messages or apply more
    /// validation before they get finally persisted or rendered to the user. At this point the
    /// sync protocol is merely "forwarding" it without any knowledge of how the data is used.
    Data {
        /// Exchanged data from sync session.
        ///
        /// Some data-types might be designed with "off-chain" use in mind, where a "header" is
        /// crucial for integrity and authenticity but the actual "payload" is optional or
        /// requested lazily in a later process.
        header: Vec<u8>,

        /// Optional "body" which can represent "off-chain" application data.
        ///
        /// This is useful for realising "off-chain" compatible data types. Implementations without
        /// this distinction will always leave this field as `None` and only encode their data
        /// types in the `header` field.
        payload: Option<Vec<u8>>,
    },
}

/// Errors which can occur during sync sessions.
///
/// 1. Critical system failures (e.g. bug in p2panda code or sync implementation, sync
///    implementation did not follow "2. Phase Flow" requirements, lack of system resources, etc.)
/// 2. Unexpected Behaviour (e.g. remote peer abruptly disconnected, error which got correctly
///    caught in sync implementation, etc.)
#[derive(Debug, PartialEq, Error)]
pub enum SyncError {
    /// Error due to unexpected (buggy or malicious) behaviour of the remote peer.
    ///
    /// Indicates that the sync protocol was not correctly followed, for example due to unexpected
    /// or missing messages, etc.
    ///
    /// Can be used by a backend to re-attempt syncing with this peer or down-grading it in
    /// priority, potentially deny-listing if communication failed too often.
    #[error("sync session failed due to unexpected protocol behaviour of remote peer: {0}")]
    UnexpectedBehaviour(String),

    /// Error due to invalid encoding of a message sent by remote peer.
    ///
    /// Note that this error is intended for receiving messages from _remote_ peers which we can't
    /// decode properly. If we fail with encoding our _own_ messages we should rather consider this
    /// a `Critical` error type, as it likely means that there's a buggy implementation.
    #[error("sync session failed due to invalid encoding of message sent by remote peer: {0}")]
    InvalidEncoding(String),

    /// Critical error due to system failure on our end.
    ///
    /// This indicates that our system is running out of resources (storage layer failure etc.) or
    /// we have a buggy implementation.
    #[error("sync session failed due to critical system error: {0}")]
    Critical(String),
}

/// Converts critical I/O error (which occurs during codec stream handling) into [`SyncError`].
///
/// This is usually a critical system failure indicating an implementation bug or lacking resources
/// on the user's machine.
///
/// See `Encoder` or `Decoder` `Error` trait type in tokio's codec for more information:
/// <https://docs.rs/tokio-util/latest/tokio_util/codec/trait.Decoder.html#associatedtype.Error>
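///
/// The classification applied by this conversion can be sketched with the standard library
/// alone. `Category` and `categorise` below are hypothetical stand-ins for `SyncError` and this
/// `From` implementation, used only for illustration:
///
/// ```rust
/// use std::io;
///
/// // Hypothetical, simplified stand-in for the error categories of `SyncError`.
/// #[derive(Debug, PartialEq)]
/// enum Category {
///     UnexpectedBehaviour,
///     Critical,
/// }
///
/// // A broken pipe is attributed to the remote peer, every other I/O error to our own system.
/// fn categorise(err: &io::Error) -> Category {
///     match err.kind() {
///         io::ErrorKind::BrokenPipe => Category::UnexpectedBehaviour,
///         _ => Category::Critical,
///     }
/// }
/// ```
///
/// Keeping the two categories apart lets a backend react differently, for example by retrying
/// or down-grading a peer in the first case and surfacing the failure locally in the second.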
impl From<std::io::Error> for SyncError {
    fn from(err: std::io::Error) -> Self {
        match err.kind() {
            // Broken pipes usually indicate that the remote peer closed the connection
            // unexpectedly. This is why we're not treating it as a critical error but as
            // "unexpected behaviour" instead.
            std::io::ErrorKind::BrokenPipe => Self::UnexpectedBehaviour("broken pipe".into()),
            _ => Self::Critical(format!("internal i/o stream error {err}")),
        }
    }
}

/// Identify the particular dataset a peer is interested in syncing.
///
/// Exactly how this is expressed is left up to the user to decide. During sync the "initiator"
/// sends their topic query to a remote peer where it is mapped to their local dataset.
/// Additional access-control checks can be performed. Once this "handshake" is complete both
/// peers will proceed with the designated sync protocol.
///
/// ## `TopicId` vs `TopicQuery`
///
/// While `TopicId` is merely a 32-byte identifier which can't hold much information other than
/// being a distinct identifier of a single data item or collection of them, we can use
/// `TopicQuery` to implement custom data types representing "queries" for very specific data
/// items. Peers can for example announce that they'd like "all events from the 27th of September
/// 23 until today" with `TopicQuery`.
///
/// Consult the `TopicId` documentation in `p2panda-net` for more information.
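///
/// ## Example
///
/// The `Eq` and `Hash` bounds allow backends to organise sync sessions per topic query and peer.
/// The following sketch shows that idea with the standard library only. `ChatTopic`, `Sessions`
/// and the `u64` peer identifier are hypothetical and not part of this crate. A real topic query
/// type would additionally derive `Serialize` and `Deserialize` and implement `TopicQuery`:
///
/// ```rust
/// use std::collections::HashSet;
///
/// // Hypothetical topic query expressing interest in a single chat group.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash)]
/// struct ChatTopic {
///     group: String,
/// }
///
/// // Hypothetical bookkeeping of running sessions, keyed by peer and topic query.
/// #[derive(Default)]
/// struct Sessions {
///     active: HashSet<(u64, ChatTopic)>,
/// }
///
/// impl Sessions {
///     // Returns `true` if no session for this peer and topic query was registered yet.
///     fn begin(&mut self, peer: u64, topic: ChatTopic) -> bool {
///         self.active.insert((peer, topic))
///     }
/// }
/// ```
///
/// Calling `begin` twice with the same peer and an equal `ChatTopic` returns `false` the second
/// time, which a backend could use to avoid running duplicate sessions.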
pub trait TopicQuery:
    // Data types implementing `TopicQuery` also need to implement `Eq` and `Hash` in order to allow
    // backends to organise sync sessions per topic query and peer, along with `Serialize` and
    // `Deserialize` to allow sending topics over the wire.
    Clone + Debug + Eq + Hash + Send + Sync + Serialize + for<'a> Deserialize<'a>
{
}