p2panda_sync/lib.rs
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Data- and transport-agnostic interface to implement custom sync protocols, compatible with
//! `p2panda-net` or other peer-to-peer networking solutions.
//!
//! Sync or "synchronisation" protocols (also known as "replication protocols") are used to
//! efficiently exchange data between peers.
//!
//! Unlike gossip protocols, sync protocols are better suited to "catching up on past state". Peers
//! can negotiate scope and access in a sync protocol for any type of data the remote peer
//! currently knows about.
//!
//! In addition to the generic definition of the `SyncProtocol` trait, `p2panda-sync` includes
//! optional implementations for efficient sync of append-only log-based data types. These optional
//! implementations may be activated via feature flags. Finally, `p2panda-sync` provides helpers to
//! encode wire messages in CBOR.
#[cfg(feature = "cbor")]
pub mod cbor;
#[cfg(feature = "log-sync")]
pub mod log_sync;

use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, Sink};
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Trait to implement a custom sync protocol.
///
/// Implementing the `SyncProtocol` trait requires extra care and is only necessary when designing
/// custom low-level peer-to-peer protocols and data types. p2panda already comes with solutions
/// which can be used "out of the box", providing implementations for most applications and use
/// cases.
///
/// ## Design
///
/// Sync sessions take place when two peers connect to each other and follow the sync protocol.
/// They are designed as a two-party protocol featuring an "initiator" and an "acceptor" role.
///
/// Each protocol usually follows two phases: 1) the "Handshake" phase, during which the
/// "initiator" sends the "topic query" and any access control data to the "acceptor", and 2) the
/// "Sync" phase, where the requested application data is finally exchanged and validated.
///
/// ## Privacy and Security
///
/// The `SyncProtocol` trait has been designed to allow privacy-respecting implementations where
/// application data (via access control) and the topic query itself (for example via
/// Diffie-Hellman) are securely exchanged without unnecessarily revealing any information to
/// unknown peers. This usually takes place during the "Handshake" phase of the protocol.
///
/// The underlying transport layer should provide automatic authentication of the remote peer, a
/// reliable connection and transport encryption. `p2panda-net`, for example, uses self-certified
/// TLS 1.3 over QUIC.
///
/// ## Streams
///
/// Three distinct data channels are provided by the underlying transport layer to each
/// `SyncProtocol` implementation: `tx` for sending data to the remote peer, `rx` to receive data
/// from the remote peer and `app_tx` to send received data to the higher-level application,
/// validation and persistence layers.
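///
/// The following is a simplified, synchronous sketch of an "acceptor" that uses these three
/// channels during both phases. It is not part of this crate's API. Blocking
/// `std::io::{Read, Write}` values stand in for `rx` and `tx`, and a `std::sync::mpsc::Sender`
/// stands in for `app_tx`. `ToApp`, `read_frame`, `write_frame` and the one-byte length-prefixed
/// wire format are all hypothetical.
///
/// ```rust
/// use std::io::{self, Read, Write};
/// use std::sync::mpsc::Sender;
///
/// // Hypothetical, simplified stand-in for `FromSync<T>` with a `String` topic query.
/// #[derive(Debug, PartialEq)]
/// enum ToApp {
///     HandshakeSuccess(String),
///     Data(Vec<u8>),
/// }
///
/// // Reads one frame (assumed format: one length byte, then that many bytes).
/// // Returns `Ok(None)` once the remote peer has closed the stream.
/// fn read_frame(rx: &mut impl Read) -> io::Result<Option<Vec<u8>>> {
///     let mut len = [0u8; 1];
///     if rx.read(&mut len)? == 0 {
///         return Ok(None);
///     }
///     let mut buf = vec![0u8; len[0] as usize];
///     rx.read_exact(&mut buf)?;
///     Ok(Some(buf))
/// }
///
/// // Writes one frame; this toy format only supports frames shorter than 256 bytes.
/// fn write_frame(tx: &mut impl Write, bytes: &[u8]) -> io::Result<()> {
///     tx.write_all(&[bytes.len() as u8])?;
///     tx.write_all(bytes)
/// }
///
/// // "Acceptor" role: learn the topic query ("Handshake"), then exchange entries ("Sync").
/// fn accept(
///     local_entries: &[Vec<u8>],
///     tx: &mut impl Write,
///     rx: &mut impl Read,
///     app_tx: &Sender<ToApp>,
/// ) -> io::Result<()> {
///     // Handshake: the first frame carries the topic query.
///     let query = read_frame(rx)?.ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
///     let query = String::from_utf8(query)
///         .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
///     app_tx.send(ToApp::HandshakeSuccess(query)).expect("application layer is gone");
///
///     // Sync: send our entries to the remote peer ...
///     for entry in local_entries {
///         write_frame(tx, entry)?;
///     }
///     // ... and forward every received entry upstream without interpreting it.
///     while let Some(entry) = read_frame(rx)? {
///         app_tx.send(ToApp::Data(entry)).expect("application layer is gone");
///     }
///     Ok(())
/// }
/// ```
///
/// Suppose the remote bytes are `[4, b'c', b'h', b'a', b't', 2, b'h', b'i']` and there is a
/// single local entry `b"yo"`. The sketch then writes `[2, b'y', b'o']` to `tx` and sends
/// `HandshakeSuccess("chat")` followed by `Data(b"hi")` to `app_tx`.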
///
/// ## Topic queries
///
/// Topic queries are generic data types which can be used to subjectively express interest in a
/// particular subset of the data we want to sync over, like chat group identifiers or very
/// specific "search queries", for example "give me all documents containing the word 'billy'."
///
/// With the help of the `TopicMap` trait we can keep sync implementations agnostic to specific
/// topic query implementations. The sync protocol only needs to feed the "topic query" into the
/// "map", which answers with the actual to-be-synced data entities (for example coming from a
/// store). This allows application developers to re-use your `SyncProtocol` implementation for
/// their custom `TopicQuery` requirements.
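///
/// Here is a minimal sketch of that idea, based on the "billy" query above. `SearchQuery`,
/// `ToyTopicMap` and `DocStore` are hypothetical names. The actual `TopicMap` trait may have a
/// different signature, and a real query type would also derive `serde::Serialize` and
/// `serde::Deserialize`.
///
/// ```rust
/// use std::collections::HashMap;
///
/// // Hypothetical topic query: all documents of a group containing a search term.
/// #[derive(Clone, Debug, PartialEq, Eq, Hash)]
/// struct SearchQuery {
///     group: String,
///     term: String,
/// }
///
/// // Hypothetical stand-in for a "map" answering topic queries with data entities (here: ids).
/// trait ToyTopicMap<Q> {
///     fn get(&self, query: &Q) -> Vec<u64>;
/// }
///
/// // In-memory "store" mapping document ids to (group, text).
/// struct DocStore {
///     docs: HashMap<u64, (String, String)>,
/// }
///
/// impl ToyTopicMap<SearchQuery> for DocStore {
///     fn get(&self, query: &SearchQuery) -> Vec<u64> {
///         let mut ids: Vec<u64> = self
///             .docs
///             .iter()
///             .filter(|(_, (group, text))| *group == query.group && text.contains(query.term.as_str()))
///             .map(|(id, _)| *id)
///             .collect();
///         // Sort for a deterministic order, as `HashMap` iteration order is unspecified.
///         ids.sort();
///         ids
///     }
/// }
/// ```
///
/// The sync protocol only hands over the query and receives ids back. It never needs to know how
/// the query was evaluated against the store.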
///
/// ## Validation
///
/// Basic data-format and -encoding validation usually takes place during the "Sync" phase of the
/// protocol.
///
/// Further validation, which might require more knowledge of the application state or can only be
/// applied after decrypting the payload, should be handled _outside_ the sync protocol by sending
/// the data upstream to higher application layers.
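///
/// As an example of such a basic check, the sketch below validates the layout of a received
/// header before it is forwarded. The 40-byte layout (a 32-byte key followed by a big-endian
/// `u64` sequence number) and `check_header_format` are hypothetical. An `Err` would be reported
/// as `SyncError::InvalidEncoding`, while application-specific checks and decryption stay with
/// the upstream layers.
///
/// ```rust
/// use std::convert::TryInto;
///
/// // Assumed toy layout: 32-byte author key followed by a big-endian `u64` sequence number.
/// const HEADER_LEN: usize = 40;
///
/// // Only checks that the header is well-formed; the (possibly encrypted) payload is not
/// // inspected here and would be forwarded upstream as-is.
/// fn check_header_format(header: &[u8]) -> Result<u64, String> {
///     if header.len() != HEADER_LEN {
///         return Err(format!("expected {} header bytes, got {}", HEADER_LEN, header.len()));
///     }
///     let seq_num: [u8; 8] = header[32..HEADER_LEN].try_into().expect("slice is 8 bytes long");
///     Ok(u64::from_be_bytes(seq_num))
/// }
/// ```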
///
/// ## Errors
///
/// Protocol implementations operate on multiple layers at the same time, which is reflected in
/// distinct error categories:
///
/// 1. Unexpected behaviour of the remote peer not following the implemented protocol
/// 2. Handling (rare) critical system failures
#[async_trait]
pub trait SyncProtocol<'a, T>
where
    Self: Send + Sync + Debug,
    T: TopicQuery,
{
    /// Custom identifier for this sync protocol implementation.
    ///
    /// This is currently only used for debugging or logging purposes.
    fn name(&self) -> &'static str;

    /// Initiate a sync protocol session over the provided bi-directional stream for the given
    /// topic query.
    ///
    /// During the "Handshake" phase the "initiator" usually requests access and informs the remote
    /// peer about the "topic query" they are interested in. Implementations for `p2panda-net`
    /// are required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that we've successfully requested access,
    /// exchanged the topic query with the remote peer and are about to begin sync.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either through a critical error on our end or unexpected
    /// behaviour of the remote peer) a `SyncError` is returned.
    async fn initiate(
        self: Arc<Self>,
        topic_query: T,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;

    /// Accept a sync protocol session over the provided bi-directional stream.
    ///
    /// During the "Handshake" phase the "acceptor" usually responds to the access request and
    /// learns about the "topic query" from the remote peer. Implementations for `p2panda-net` are
    /// required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that the topic query has been
    /// successfully received from the remote peer and that data exchange is about to begin.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either through a critical error on our end or unexpected
    /// behaviour of the remote peer) a `SyncError` is returned.
    async fn accept(
        self: Arc<Self>,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;
}

/// Messages which can be sent to the higher application layers (for further validation or
/// persistence) and the underlying transport layer (for managing the sync session).
#[derive(Debug, PartialEq)]
pub enum FromSync<T>
where
    T: TopicQuery,
{
    /// During the "Handshake" phase both peers usually manage access control and negotiate the
    /// "topic query" they want to sync over. This message indicates that this phase has ended.
    ///
    /// Implementations for `p2panda-net` are required to send this message to the underlying
    /// transport layer to inform the "backend" that we've successfully requested access, exchanged
    /// the topic query with the remote peer and are about to begin sync.
    ///
    /// With this information backends can optionally apply optimisations, which might for example
    /// be required to keep application messages in order (as there might exist other channels the
    /// backend exchanges similar data over at the same time).
    HandshakeSuccess(T),

    /// Application data we've received during the sync session from the remote peer and want to
    /// forward to higher application layers.
    ///
    /// These "frontends" might further process the data, decrypt payloads, sort messages or apply
    /// more validation before it finally gets persisted or rendered to the user. At this point the
    /// sync protocol is merely "forwarding" the data without any knowledge of how it is used.
    Data {
        /// Exchanged data from sync session.
        ///
        /// Some data types might be designed with "off-chain" use in mind, where a "header" is
        /// crucial for integrity and authenticity but the actual "payload" is optional or
        /// requested lazily in a later process.
        header: Vec<u8>,

        /// Optional "body" which can represent "off-chain" application data.
        ///
        /// This is useful for realising "off-chain" compatible data types. Implementations without
        /// this distinction will always leave this field as `None` and only encode their data
        /// types in the `header` field.
        payload: Option<Vec<u8>>,
    },
}

/// Errors which can occur during sync sessions.
///
/// 1. Critical system failures (e.g. a bug in p2panda code or in the sync implementation, a sync
///    implementation not following the required "Handshake" and "Sync" phase flow, lack of system
///    resources, etc.)
/// 2. Unexpected behaviour (e.g. the remote peer abruptly disconnected, an error which got
///    correctly caught in the sync implementation, etc.)
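///
/// To illustrate how a backend might react to these categories, here is a sketch of a
/// hypothetical per-peer failure policy. `Category`, `Action` and `PeerScores` are not part of
/// this crate. Failures caused by the remote peer are counted and eventually lead to
/// deny-listing. Critical errors on our end abort the session without penalising the peer.
///
/// ```rust
/// use std::collections::HashMap;
///
/// // Hypothetical mirror of the three `SyncError` variants, without their messages.
/// enum Category {
///     UnexpectedBehaviour,
///     InvalidEncoding,
///     Critical,
/// }
///
/// // What the hypothetical backend decides to do next.
/// #[derive(Debug, PartialEq)]
/// enum Action {
///     RetryLater,
///     DenyList,
///     Abort,
/// }
///
/// // Failure counts per peer id and the threshold for deny-listing.
/// struct PeerScores {
///     failures: HashMap<String, u32>,
///     max_failures: u32,
/// }
///
/// impl PeerScores {
///     fn on_error(&mut self, peer: &str, category: &Category) -> Action {
///         match category {
///             // Our own fault: don't blame the remote peer.
///             Category::Critical => Action::Abort,
///             // The remote peer misbehaved: count it against them.
///             Category::UnexpectedBehaviour | Category::InvalidEncoding => {
///                 let count = self.failures.entry(peer.to_string()).or_insert(0);
///                 *count += 1;
///                 if *count >= self.max_failures {
///                     Action::DenyList
///                 } else {
///                     Action::RetryLater
///                 }
///             }
///         }
///     }
/// }
/// ```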
#[derive(Debug, PartialEq, Error)]
pub enum SyncError {
    /// Error due to unexpected (buggy or malicious) behaviour of the remote peer.
    ///
    /// Indicates that the sync protocol was not correctly followed, for example due to unexpected
    /// or missing messages.
    ///
    /// Can be used by a backend to re-attempt syncing with this peer or to downgrade it in
    /// priority, potentially deny-listing it if communication failed too often.
    #[error("sync session failed due to unexpected protocol behaviour of remote peer: {0}")]
    UnexpectedBehaviour(String),

    /// Error due to invalid encoding of a message sent by the remote peer.
    ///
    /// Note that this error is intended for messages received from _remote_ peers which we can't
    /// decode properly. If we fail to encode our _own_ messages, this should instead be treated as
    /// a `Critical` error, as it likely indicates a buggy implementation.
    #[error("sync session failed due to invalid encoding of message sent by remote peer: {0}")]
    InvalidEncoding(String),

    /// Critical error due to system failure on our end.
    ///
    /// This indicates that our system is running out of resources (storage layer failure etc.) or
    /// that we have a buggy implementation.
    #[error("sync session failed due to critical system error: {0}")]
    Critical(String),
}

/// Converts critical I/O errors (which occur during codec stream handling) into [`SyncError`].
///
/// This is usually a critical system failure indicating an implementation bug or lacking
/// resources on the user's machine. Broken pipes are the exception: they usually mean that the
/// remote peer closed the connection unexpectedly and are therefore treated as unexpected
/// behaviour.
///
/// See the associated `Error` type of the `Encoder` and `Decoder` traits in tokio-util's codec
/// module for more information:
/// <https://docs.rs/tokio-util/latest/tokio_util/codec/trait.Decoder.html#associatedtype.Error>
impl From<std::io::Error> for SyncError {
    fn from(err: std::io::Error) -> Self {
        match err.kind() {
            // Broken pipes usually indicate that the remote peer closed the connection
            // unexpectedly, which is why we're not treating them as a critical error but as
            // "unexpected behaviour" instead.
            std::io::ErrorKind::BrokenPipe => Self::UnexpectedBehaviour("broken pipe".into()),
            _ => Self::Critical(format!("internal i/o stream error: {err}")),
        }
    }
}

/// Identify the particular dataset a peer is interested in syncing.
///
/// Exactly how this is expressed is left up to the user to decide. During sync the "initiator"
/// sends their topic query to a remote peer, where it is mapped to their local dataset.
/// Additional access-control checks can be performed. Once this "handshake" is complete, both
/// peers proceed with the designated sync protocol.
///
/// ## `TopicId` vs `TopicQuery`
///
/// While `TopicId` is merely a 32-byte identifier which can't hold much information other than
/// being a distinct identifier of a single data item or a collection of them, we can use
/// `TopicQuery` to implement custom data types representing "queries" for very specific data
/// items. Peers can for example announce that they'd like "all events from the 27th of
/// September 23 until today" with `TopicQuery`.
///
/// Consult the `TopicId` documentation in `p2panda-net` for more information.
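///
/// The sketch below shows a hypothetical query type and why the `Eq + Hash` bounds of this trait
/// matter. A backend can use `(query, peer)` pairs as keys so that it never runs two sessions
/// for the same topic query with the same peer. `ChatQuery` and `Sessions` are illustrative
/// names only. To actually implement `TopicQuery`, the type would also need to derive
/// `serde::Serialize` and `serde::Deserialize` and add an empty `impl TopicQuery for ChatQuery {}`.
///
/// ```rust
/// use std::collections::HashSet;
///
/// // Hypothetical topic query type (serde derives omitted to keep the sketch dependency-free).
/// #[derive(Clone, Debug, PartialEq, Eq, Hash)]
/// enum ChatQuery {
///     // Everything in a chat group.
///     Group(String),
///     // Only events of a chat group since a UNIX timestamp (in seconds).
///     Since { group: String, timestamp: u64 },
/// }
///
/// // Hypothetical backend bookkeeping keyed by (topic query, peer id).
/// #[derive(Default)]
/// struct Sessions {
///     running: HashSet<(ChatQuery, String)>,
/// }
///
/// impl Sessions {
///     // Returns `false` if a session for this query and peer is already running.
///     fn try_start(&mut self, query: &ChatQuery, peer: &str) -> bool {
///         self.running.insert((query.clone(), peer.to_string()))
///     }
///
///     // Marks the session as finished so it can be started again later.
///     fn finish(&mut self, query: &ChatQuery, peer: &str) {
///         self.running.remove(&(query.clone(), peer.to_string()));
///     }
/// }
/// ```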
pub trait TopicQuery:
    // Data types implementing `TopicQuery` also need to implement `Eq` and `Hash` in order to allow
    // backends to organise sync sessions per topic query and peer, along with `Serialize` and
    // `Deserialize` to allow sending topics over the wire.
    Clone + Debug + Eq + Hash + Send + Sync + Serialize + for<'a> Deserialize<'a>
{
}
269}