// SPDX-License-Identifier: MIT OR Apache-2.0

#![cfg_attr(doctest, doc=include_str!("../README.md"))]

//! Data- and transport-agnostic interface to implement custom sync protocols, compatible with
//! `p2panda-net` or other peer-to-peer networking solutions.
//!
//! Sync or "synchronisation" protocols (also known as "replication protocols") are used to
//! efficiently exchange data between peers.
//!
//! Unlike gossip protocols, sync protocols are well suited to "catching up" on past state. Peers
//! can negotiate scope and access in a sync protocol for any type of data the remote peer
//! currently knows about.
//!
//! In addition to the generic definition of the `SyncProtocol` trait, `p2panda-sync` includes
//! optional implementations for efficient sync of append-only, log-based data types. These
//! implementations can be activated via feature flags. Finally, `p2panda-sync` provides helpers
//! to encode wire messages in CBOR.

#[cfg(feature = "cbor")]
pub mod cbor;
#[cfg(feature = "log-sync")]
pub mod log_sync;
#[cfg(feature = "test-protocols")]
pub mod test_protocols;

use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, Sink};
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Trait to implement a custom sync protocol.
///
/// Implementing the `SyncProtocol` trait requires extra care and is only necessary when designing
/// custom low-level peer-to-peer protocols and data types. p2panda already comes with solutions
/// which can be used "out of the box", providing implementations for most applications and use
/// cases.
///
/// ## Design
///
/// Sync sessions take place when two peers connect to each other and follow the sync protocol.
/// They are designed as a two-party protocol featuring an "initiator" and an "acceptor" role.
///
/// Each protocol usually follows two phases: 1) the "Handshake" phase, during which the
/// "initiator" sends the "topic query" and any access control data to the "acceptor", and 2) the
/// "Sync" phase, during which the requested application data is exchanged and validated.
///
/// ## Privacy and Security
///
/// The `SyncProtocol` trait has been designed to allow privacy-respecting implementations where
/// application data (via access control) and the topic query itself (for example via
/// Diffie-Hellman) are exchanged securely, without unnecessarily revealing information to unknown
/// peers. This usually takes place during the "Handshake" phase of the protocol.
///
/// The underlying transport layer should provide automatic authentication of the remote peer, a
/// reliable connection and transport encryption. `p2panda-net`, for example, uses self-certified
/// TLS 1.3 over QUIC.
///
/// ## Streams
///
/// Three distinct data channels are provided by the underlying transport layer to each
/// `SyncProtocol` implementation: `tx` for sending data to the remote peer, `rx` for receiving
/// data from the remote peer and `app_tx` for sending received data to the higher-level
/// application, validation and persistence layers.
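///
/// As an illustration only, the sketch below shows one way a caller could build an `app_tx` of
/// the expected type from a `futures` mpsc channel by mapping the channel's send error into
/// [`SyncError::Critical`]. `ChatTopic` is a hypothetical topic query type. The example assumes
/// that `futures` (with its default `executor` feature) and `serde` (with its `derive` feature)
/// are available. `p2panda-net` may wire up these channels differently.
///
/// ```
/// use futures::channel::mpsc;
/// use futures::executor::block_on;
/// use futures::{Sink, SinkExt, StreamExt};
/// use p2panda_sync::{FromSync, SyncError, TopicQuery};
/// use serde::{Deserialize, Serialize};
///
/// // Hypothetical topic query type, defined only for this example.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// struct ChatTopic(String);
///
/// impl TopicQuery for ChatTopic {}
///
/// block_on(async {
///     // The receiving half would be handed to the application layer.
///     let (tx, mut rx) = mpsc::channel::<FromSync<ChatTopic>>(16);
///
///     // Map the channel's `SendError` into the `SyncError` type required by the trait.
///     let mut sink = tx.sink_map_err(|err| SyncError::Critical(err.to_string()));
///
///     // Same shape as the `app_tx` argument of `initiate` and `accept`.
///     let sink_ref: &mut (dyn Sink<FromSync<ChatTopic>, Error = SyncError> + Send + Unpin) =
///         &mut sink;
///     let mut app_tx = Box::new(sink_ref);
///
///     let topic = ChatTopic("my-chat".into());
///     app_tx.send(FromSync::HandshakeSuccess(topic.clone())).await.unwrap();
///
///     // The application layer receives the message on the other end of the channel.
///     assert_eq!(rx.next().await, Some(FromSync::HandshakeSuccess(topic)));
/// });
/// ```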
///
/// ## Topic queries
///
/// Topic queries are generic data types which can be used to subjectively express interest in a
/// particular subset of the data we want to sync, like chat group identifiers or very specific
/// "search queries", for example "give me all documents containing the word 'billy'."
///
/// With the help of the `TopicMap` trait we can keep sync implementations agnostic to specific
/// topic query implementations. The sync protocol only needs to feed the "topic query" into the
/// "map", which answers with the actual to-be-synced data entities (for example coming from a
/// store). This allows application developers to re-use your `SyncProtocol` implementation for
/// their custom `TopicQuery` requirements.
///
/// ## Validation
///
/// Basic data-format and -encoding validation usually takes place during the "Sync" phase of the
/// protocol.
///
/// Further validation, which might require more knowledge of the application state or can only be
/// applied after decrypting the payload, should be handled _outside_ the sync protocol by sending
/// the data upstream to higher application layers.
///
/// ## Errors
///
/// Protocol implementations operate on multiple layers at the same time, expressed in distinct
/// error categories (see [`SyncError`]):
///
/// 1. Unexpected behaviour of the remote peer not following the implemented protocol
/// 2. Handling (rare) critical system failures
#[async_trait]
pub trait SyncProtocol<'a, T>
where
    Self: Send + Sync + Debug,
    T: TopicQuery,
{
    /// Custom identifier for this sync protocol implementation.
    ///
    /// This is currently only used for debugging or logging purposes.
    fn name(&self) -> &'static str;

    /// Initiate a sync protocol session over the provided bi-directional stream for the given
    /// topic query.
    ///
    /// During the "Handshake" phase the "initiator" usually requests access and informs the remote
    /// peer about the "topic query" they are interested in. Implementations for `p2panda-net`
    /// are required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that we've successfully requested access,
    /// exchanged the topic query with the remote peer and are about to begin sync.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either a critical error on our end or unexpected behaviour
    /// of the remote peer) a `SyncError` is returned.
    async fn initiate(
        self: Arc<Self>,
        topic_query: T,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;

    /// Accept a sync protocol session over the provided bi-directional stream.
    ///
    /// During the "Handshake" phase the "acceptor" usually responds to the access request and
    /// learns about the "topic query" from the remote peer. Implementations for `p2panda-net` are
    /// required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that the topic query has been
    /// successfully received from the remote peer and that data exchange is about to begin.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either a critical error on our end or unexpected behaviour
    /// of the remote peer) a `SyncError` is returned.
    async fn accept(
        self: Arc<Self>,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;
}

/// Messages which can be sent to the higher application layers (for further validation or
/// persistence) and the underlying transport layer (for managing the sync session).
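///
/// A minimal sketch of how an application layer consuming these messages might branch on the
/// variants, including the header-only ("off-chain" payload absent) case. The `Topic` type and
/// the `describe` helper are hypothetical, and the example assumes `serde`'s `derive` feature is
/// available:
///
/// ```
/// use p2panda_sync::{FromSync, TopicQuery};
/// use serde::{Deserialize, Serialize};
///
/// // Hypothetical topic query type, defined only for this example.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// struct Topic(String);
///
/// impl TopicQuery for Topic {}
///
/// // Hypothetical helper summarising a message received via `app_tx`.
/// fn describe(message: &FromSync<Topic>) -> String {
///     match message {
///         FromSync::HandshakeSuccess(topic) => format!("handshake done for {topic:?}"),
///         // Header-only data: no "off-chain" payload was sent with it.
///         FromSync::Data { header, payload: None } => format!("{} header bytes", header.len()),
///         FromSync::Data { header, payload: Some(payload) } => {
///             format!("{} header bytes, {} payload bytes", header.len(), payload.len())
///         }
///     }
/// }
///
/// let message = FromSync::<Topic>::Data { header: vec![0; 4], payload: Some(vec![0; 16]) };
/// assert_eq!(describe(&message), "4 header bytes, 16 payload bytes");
/// ```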
#[derive(Debug, PartialEq)]
pub enum FromSync<T>
where
    T: TopicQuery,
{
    /// During the "Handshake" phase both peers usually manage access control and negotiate the
    /// "topic query" they want to exchange over. This message indicates that this phase has ended.
    ///
    /// Implementations for `p2panda-net` are required to send this message to the underlying
    /// transport layer to inform the "backend" that we've successfully requested access, exchanged
    /// the topic query with the remote peer and are about to begin sync.
    ///
    /// With this information backends can optionally apply optimisations, which might for example
    /// be required to keep application messages in-order (as there might exist other channels the
    /// backend exchanges similar data over at the same time).
    HandshakeSuccess(T),

    /// Application data we've received during the sync session from the remote peer and want to
    /// forward to higher application layers.
    ///
    /// These "frontends" might further process, decrypt payloads, sort messages or apply more
    /// validation before they get finally persisted or rendered to the user. At this point the
    /// sync protocol is merely "forwarding" it without any knowledge of how the data is used.
    Data {
        /// Exchanged data from sync session.
        ///
        /// Some data-types might be designed with "off-chain" use in mind, where a "header" is
        /// crucial for integrity and authenticity but the actual "payload" is optional or
        /// requested lazily in a later process.
        header: Vec<u8>,

        /// Optional "body" which can represent "off-chain" application data.
        ///
        /// This is useful for realising "off-chain" compatible data types. Implementations without
        /// this distinction will always leave this field as `None` and only encode their data
        /// types in the `header` field.
        payload: Option<Vec<u8>>,
    },
}

/// Errors which can occur during sync sessions.
///
/// 1. Critical system failures (e.g. bug in p2panda code or sync implementation, sync
///    implementation did not follow the required "Handshake" and "Sync" phase flow, lack of
///    system resources, etc.)
/// 2. Unexpected behaviour (e.g. remote peer abruptly disconnected, error which got correctly
///    caught in sync implementation, etc.)
#[derive(Debug, PartialEq, Error)]
pub enum SyncError {
    /// Error due to unexpected (buggy or malicious) behaviour of the remote peer.
    ///
    /// Indicates that the sync protocol was not correctly followed, for example due to unexpected
    /// or missing messages.
    ///
    /// Can be used by a backend to re-attempt syncing with this peer or to downgrade it in
    /// priority, potentially deny-listing it if communication failed too often.
    #[error("sync session failed due to unexpected protocol behaviour of remote peer: {0}")]
    UnexpectedBehaviour(String),

    /// Error due to invalid encoding of a message sent by remote peer.
    ///
    /// Note that this error is intended for messages received from _remote_ peers which we can't
    /// decode properly. If we fail to encode our _own_ messages we should rather consider this a
    /// `Critical` error, as it likely means that our implementation is buggy.
    #[error("sync session failed due to invalid encoding of message sent by remote peer: {0}")]
    InvalidEncoding(String),

    /// Critical error due to system failure on our end.
    ///
    /// This indicates that our system is running out of resources (storage layer failure etc.) or
    /// we have a buggy implementation.
    #[error("sync session failed due to critical system error: {0}")]
    Critical(String),
}

/// Converts an I/O error (which occurs during codec stream handling) into a [`SyncError`].
///
/// This is usually a critical system failure indicating an implementation bug or lacking
/// resources on the user's machine. The exception is a broken pipe, which is mapped to
/// [`SyncError::UnexpectedBehaviour`] since it usually means that the remote peer closed the
/// connection.
///
/// See the `Error` associated type of the `Encoder` and `Decoder` traits in tokio's codec for
/// more information:
/// <https://docs.rs/tokio-util/latest/tokio_util/codec/trait.Decoder.html#associatedtype.Error>
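///
/// The example below sketches how this conversion classifies errors and how `?` relies on it.
/// It only depends on the behaviour implemented in this `From` impl; `read_byte` is a
/// hypothetical helper defined for illustration:
///
/// ```
/// use std::io::{Error, ErrorKind, Read};
///
/// use p2panda_sync::SyncError;
///
/// // A broken pipe is attributed to the remote peer closing the connection.
/// let err = SyncError::from(Error::from(ErrorKind::BrokenPipe));
/// assert_eq!(err, SyncError::UnexpectedBehaviour("broken pipe".into()));
///
/// // Every other error kind is treated as a critical failure on our end.
/// let err = SyncError::from(Error::new(ErrorKind::Other, "disk full"));
/// assert!(matches!(err, SyncError::Critical(_)));
///
/// // Hypothetical helper: `?` converts the `std::io::Error` via this `From` impl.
/// fn read_byte(mut reader: impl Read) -> Result<u8, SyncError> {
///     let mut buf = [0u8; 1];
///     reader.read_exact(&mut buf)?;
///     Ok(buf[0])
/// }
///
/// // Reading from an empty source fails with `UnexpectedEof`, which becomes `Critical`.
/// assert!(matches!(read_byte(&b""[..]), Err(SyncError::Critical(_))));
/// assert_eq!(read_byte(&b"\x2a"[..]), Ok(42));
/// ```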
impl From<std::io::Error> for SyncError {
    fn from(err: std::io::Error) -> Self {
        match err.kind() {
            // Broken pipes usually indicate that the remote peer closed the connection
            // unexpectedly, which is why we're not treating it as a critical error but as
            // "unexpected behaviour" instead.
            std::io::ErrorKind::BrokenPipe => Self::UnexpectedBehaviour("broken pipe".into()),
            _ => Self::Critical(format!("internal i/o stream error {err}")),
        }
    }
}

/// Identify the particular dataset a peer is interested in syncing.
///
/// Exactly how this is expressed is left up to the user to decide. During sync the "initiator"
/// sends their topic query to a remote peer, where it is mapped to their local dataset.
/// Additional access-control checks can be performed. Once this "handshake" is complete both
/// peers proceed with the designated sync protocol.
///
/// ## `TopicId` vs `TopicQuery`
///
/// While `TopicId` is merely a 32-byte identifier which can't hold much information other than
/// being a distinct identifier of a single data item or collection of them, we can use
/// `TopicQuery` to implement custom data types representing "queries" for very specific data
/// items. Peers can, for example, announce with `TopicQuery` that they'd like "all events from
/// the 27th of September 23 until today".
///
/// Consult the `TopicId` documentation in `p2panda-net` for more information.
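///
/// A minimal sketch of a custom topic query, assuming `serde`'s `derive` feature is available.
/// The `ChatQuery` type and its variants are hypothetical. They show how a query can carry more
/// information than a plain 32-byte identifier, and why `Eq` and `Hash` matter to backends:
///
/// ```
/// use std::collections::HashSet;
///
/// use p2panda_sync::TopicQuery;
/// use serde::{Deserialize, Serialize};
///
/// // Hypothetical query type, defined only for this example.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// enum ChatQuery {
///     // Sync everything from one chat group.
///     Group([u8; 32]),
///     // Sync only messages of a group created after a UNIX timestamp (in seconds).
///     GroupSince { group: [u8; 32], since: u64 },
/// }
///
/// // All required behaviour comes from the supertraits, so the impl body is empty. The compiler
/// // rejects this impl unless every supertrait is implemented.
/// impl TopicQuery for ChatQuery {}
///
/// // `Eq` and `Hash` let a backend key sync sessions by topic query.
/// let mut active_sessions = HashSet::new();
/// active_sessions.insert(ChatQuery::Group([1; 32]));
/// active_sessions.insert(ChatQuery::Group([1; 32]));
/// active_sessions.insert(ChatQuery::GroupSince { group: [1; 32], since: 1_695_772_800 });
/// assert_eq!(active_sessions.len(), 2);
/// ```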
pub trait TopicQuery:
    // Data types implementing `TopicQuery` also need to implement `Eq` and `Hash` in order to allow
    // backends to organise sync sessions per topic query and peer, along with `Serialize` and
    // `Deserialize` to allow sending topics over the wire.
    Clone + Debug + Eq + Hash + Send + Sync + Serialize + for<'a> Deserialize<'a>
{
}