// p2panda_sync/lib.rs

// SPDX-License-Identifier: MIT OR Apache-2.0

//! Data- and transport-agnostic interface to implement custom sync protocols, compatible with
//! `p2panda-net` or other peer-to-peer networking solutions.
//!
//! Sync or "synchronisation" protocols (also known as "replication protocols") are used to
//! efficiently exchange data between peers.
//!
//! Unlike gossip protocols, sync protocols are better suited to "catching up on past state". In a
//! sync protocol, peers can negotiate scope and access for any type of data the remote peer
//! currently knows about.
//!
//! In addition to the generic definition of the `SyncProtocol` trait, `p2panda-sync` includes
//! optional implementations for efficient sync of append-only log-based data types. These optional
//! implementations may be activated via feature flags. Finally, `p2panda-sync` provides helpers to
//! encode wire messages in CBOR.
#[cfg(feature = "cbor")]
pub mod cbor;
#[cfg(feature = "log-sync")]
pub mod log_sync;

use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, Sink};
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Traits to implement a custom sync protocol.
///
/// Implementing the `SyncProtocol` trait requires extra care and is only necessary when designing
/// custom low-level peer-to-peer protocols and data types. p2panda already comes with solutions
/// which can be used "out of the box", providing implementations for most applications and use
/// cases.
///
/// ## Design
///
/// Sync sessions take place when two peers connect to each other and follow the sync protocol.
/// They are designed as a two-party protocol featuring an "initiator" and an "acceptor" role.
///
/// Each protocol usually follows two phases: 1) The "Handshake" phase, during which the
/// "initiator" sends the "topic query" and any access control data to the "acceptor", and 2) The
/// "Sync" phase, where the requested application data is finally exchanged and validated.
///
/// ## Privacy and Security
///
/// The `SyncProtocol` trait has been designed to allow privacy-respecting implementations where
/// application data (via access control) and the topic query itself (for example via a
/// Diffie-Hellman exchange) are securely exchanged without unnecessarily revealing any information
/// to unknown peers. This usually takes place during the "Handshake" phase of the protocol.
///
/// The underlying transport layer should provide automatic authentication of the remote peer, a
/// reliable connection and transport encryption. `p2panda-net`, for example, uses self-certified
/// TLS 1.3 over QUIC.
///
/// ## Streams
///
/// Three distinct data channels are provided by the underlying transport layer to each
/// `SyncProtocol` implementation: `tx` for sending data to the remote peer, `rx` for receiving
/// data from the remote peer and `app_tx` for forwarding received data to the higher-level
/// application, validation and persistence layers.
///
/// ## Topic queries
///
/// Topic queries are generic data types which can be used to subjectively express interest in a
/// particular subset of the data we want to sync over, like chat group identifiers or very
/// specific "search queries", for example "give me all documents containing the word 'billy'".
///
/// With the help of the `TopicMap` trait we can keep sync implementations agnostic to specific
/// topic query implementations. The sync protocol only needs to feed the "topic query" into the
/// "map", which will answer with the actual to-be-synced data entities (for example coming from a
/// store). This allows application developers to re-use a `SyncProtocol` implementation for their
/// custom `TopicQuery` requirements.
///
/// ## Validation
///
/// Basic data-format and -encoding validation usually takes place during the "Sync" phase of the
/// protocol.
///
/// Further validation which might require more knowledge of the application state or can only be
/// applied after decrypting the payload should be handled _outside_ the sync protocol, by sending
/// it upstream to higher application layers.
///
/// ## Errors
///
/// Protocol implementations operate on multiple layers at the same time, expressed in distinct
/// error categories:
///
/// 1. Unexpected behaviour of the remote peer not following the implemented protocol
/// 2. Handling (rare) critical system failures
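///
/// ## Example
///
/// A minimal sketch of the "initiator" side of an implementation. `MyProtocol` and `MyTopic` are
/// purely illustrative names and all wire I/O is omitted; a real implementation would encode and
/// decode messages over `tx` and `rx` and forward received data upstream via `FromSync::Data`:
///
/// ```ignore
/// use futures::SinkExt;
///
/// #[derive(Debug)]
/// struct MyProtocol;
///
/// #[async_trait]
/// impl<'a> SyncProtocol<'a, MyTopic> for MyProtocol {
///     fn name(&self) -> &'static str {
///         "my_protocol_v1"
///     }
///
///     async fn initiate(
///         self: Arc<Self>,
///         topic_query: MyTopic,
///         tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
///         rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
///         mut app_tx: Box<&'a mut (dyn Sink<FromSync<MyTopic>, Error = SyncError> + Send + Unpin)>,
///     ) -> Result<(), SyncError> {
///         // "Handshake" phase: exchange the topic query (and any access control data) with the
///         // remote peer over `tx` / `rx`, then signal the backend that sync is about to begin.
///         app_tx.send(FromSync::HandshakeSuccess(topic_query)).await?;
///
///         // "Sync" phase: exchange application data with the remote peer and forward everything
///         // we received upstream via `FromSync::Data { header, payload }`.
///         Ok(())
///     }
///
///     // `accept` mirrors `initiate` for the "acceptor" role and is omitted here.
/// }
/// ```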
#[async_trait]
pub trait SyncProtocol<'a, T>
where
    Self: Send + Sync + Debug,
    T: TopicQuery,
{
    /// Custom identifier for this sync protocol implementation.
    ///
    /// This is currently only used for debugging or logging purposes.
    fn name(&self) -> &'static str;

    /// Initiate a sync protocol session over the provided bi-directional stream for the given
    /// topic query.
    ///
    /// During the "Handshake" phase the "initiator" usually requests access and informs the remote
    /// peer about the "topic query" they are interested in. Implementations for `p2panda-net`
    /// are required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that we've successfully requested access,
    /// exchanged the topic query with the remote peer and are about to begin sync.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either through a critical error on our end or an unexpected
    /// behaviour from the remote peer) a `SyncError` is returned.
    async fn initiate(
        self: Arc<Self>,
        topic_query: T,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;

    /// Accept a sync protocol session over the provided bi-directional stream.
    ///
    /// During the "Handshake" phase the "acceptor" usually responds to the access request and
    /// learns about the "topic query" from the remote peer. Implementations for `p2panda-net` are
    /// required to send a `FromSync::HandshakeSuccess` message to the application layer (via
    /// `app_tx`) during this phase to inform the backend that the topic query has been
    /// successfully received from the remote peer and that data exchange is about to begin.
    ///
    /// After the "Handshake" is complete the protocol enters the "Sync" phase, during which
    /// the actual application data is exchanged with the remote peer. It's left up to each
    /// protocol implementation to decide whether data is exchanged in one or both directions.
    /// Synced data is forwarded to the application layers via the `FromSync::Data` message
    /// (via `app_tx`).
    ///
    /// In case of a detected failure (either through a critical error on our end or an unexpected
    /// behaviour from the remote peer) a `SyncError` is returned.
    async fn accept(
        self: Arc<Self>,
        tx: Box<&'a mut (dyn AsyncWrite + Send + Unpin)>,
        rx: Box<&'a mut (dyn AsyncRead + Send + Unpin)>,
        app_tx: Box<&'a mut (dyn Sink<FromSync<T>, Error = SyncError> + Send + Unpin)>,
    ) -> Result<(), SyncError>;
}

/// Messages which can be sent to the higher application layers (for further validation or
/// persistence) and the underlying transport layer (for managing the sync session).
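///
/// For example, a protocol implementation might forward one synced item upstream like this (a
/// sketch; `encoded_header` and `encoded_body` are illustrative names, and sending requires
/// `futures::SinkExt` to be in scope):
///
/// ```ignore
/// app_tx
///     .send(FromSync::Data {
///         header: encoded_header,
///         payload: Some(encoded_body),
///     })
///     .await?;
/// ```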
#[derive(Debug, PartialEq)]
pub enum FromSync<T>
where
    T: TopicQuery,
{
    /// During the "Handshake" phase both peers usually manage access control and negotiate the
    /// "topic query" they want to exchange over. This message indicates that this phase has ended.
    ///
    /// Implementations for `p2panda-net` are required to send this message to the underlying
    /// transport layer to inform the "backend" that we've successfully requested access, exchanged
    /// the topic query with the remote peer and are about to begin sync.
    ///
    /// With this information backends can optionally apply optimisations, which might for example
    /// be needed to keep application messages in order (as the backend might exchange similar data
    /// over other channels at the same time).
    HandshakeSuccess(T),

    /// Application data we've received during the sync session from the remote peer and want to
    /// forward to higher application layers.
    ///
    /// These "frontends" might further process, decrypt payloads, sort messages or apply more
    /// validation before they get finally persisted or rendered to the user. At this point the
    /// sync protocol is merely "forwarding" it without any knowledge of how the data is used.
    Data {
        /// Exchanged data from sync session.
        ///
        /// Some data types might be designed with "off-chain" use in mind, where a "header" is
        /// crucial for integrity and authenticity but the actual "payload" is optional or
        /// requested lazily in a later process.
        header: Vec<u8>,

        /// Optional "body" which can represent "off-chain" application data.
        ///
        /// This is useful for realising "off-chain" compatible data types. Implementations without
        /// this distinction will always leave this field as `None` and only encode their data
        /// types in the `header` field.
        payload: Option<Vec<u8>>,
    },
}

/// Errors which can occur during sync sessions.
///
/// 1. Critical system failures (i.e. a bug in p2panda code or the sync implementation, a sync
///    implementation which did not follow the two-phase ("Handshake" and "Sync") flow
///    requirements, lack of system resources, etc.)
/// 2. Unexpected behaviour (i.e. the remote peer abruptly disconnected, an error which got
///    correctly caught in the sync implementation, etc.)
#[derive(Debug, PartialEq, Error)]
pub enum SyncError {
    /// Error due to unexpected (buggy or malicious) behaviour of the remote peer.
    ///
    /// Indicates that the sync protocol was not correctly followed, for example due to unexpected
    /// or missing messages, etc.
    ///
    /// Can be used by a backend to re-attempt syncing with this peer, down-grade it in priority
    /// or potentially deny-list it if communication fails too often.
    #[error("sync session failed due to unexpected protocol behaviour of remote peer: {0}")]
    UnexpectedBehaviour(String),

    /// Error due to invalid encoding of a message sent by remote peer.
    ///
    /// Note that this error is intended for messages received from _remote_ peers which we can't
    /// decode properly. If we fail to encode our _own_ messages we should rather consider this a
    /// `Critical` error, as it likely means that there's a buggy implementation.
    #[error("sync session failed due to invalid encoding of message sent by remote peer: {0}")]
    InvalidEncoding(String),

    /// Critical error due to system failure on our end.
    ///
    /// This indicates that our system is running out of resources (storage layer failure etc.) or
    /// we have a buggy implementation.
    #[error("sync session failed due critical system error: {0}")]
    Critical(String),
}

/// Converts a critical I/O error (which can occur during codec stream handling) into a
/// [`SyncError`].
///
/// This usually indicates a critical system failure caused by an implementation bug or a lack of
/// resources on the user's machine.
///
/// See `Encoder` or `Decoder` `Error` trait type in tokio's codec for more information:
/// <https://docs.rs/tokio-util/latest/tokio_util/codec/trait.Decoder.html#associatedtype.Error>
impl From<std::io::Error> for SyncError {
    fn from(err: std::io::Error) -> Self {
        match err.kind() {
            // Broken pipes usually indicate that the remote peer closed the connection
            // unexpectedly, this is why we're not treating it as a critical error but as
            // "unexpected behaviour" instead.
            std::io::ErrorKind::BrokenPipe => Self::UnexpectedBehaviour("broken pipe".into()),
            _ => Self::Critical(format!("internal i/o stream error {err}")),
        }
    }
}

/// Identify the particular dataset a peer is interested in syncing.
///
/// Exactly how this is expressed is left up to the user to decide. During sync the "initiator"
/// sends their topic query to a remote peer, where it is mapped to that peer's local dataset.
/// Additional access-control checks can be performed. Once this "handshake" is complete both
/// peers will proceed with the designated sync protocol.
///
/// ## `TopicId` vs `TopicQuery`
///
/// While `TopicId` is merely a 32-byte identifier which can't hold much information other than
/// being a distinct identifier of a single data item or a collection of them, we can use
/// `TopicQuery` to implement custom data types representing "queries" for very specific data
/// items. Peers can for example announce that they'd like "all events from the 27th of September
/// 2023 until today" with `TopicQuery`.
///
/// Consult the `TopicId` documentation in `p2panda-net` for more information.
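///
/// ## Example
///
/// A sketch of a custom topic query identifying a chat group (see the `TopicMap` documentation
/// below for the full scenario), assuming a hypothetical `ChatGroupQuery` type:
///
/// ```ignore
/// use serde::{Deserialize, Serialize};
///
/// #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// struct ChatGroupQuery {
///     group_id: String,
/// }
///
/// impl TopicQuery for ChatGroupQuery {}
/// ```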
pub trait TopicQuery:
    // Data types implementing `TopicQuery` also need to implement `Eq` and `Hash` in order to allow 
    // backends to organise sync sessions per topic query and peer, along with `Serialize` and 
    // `Deserialize` to allow sending topics over the wire.
    Clone + Debug + Eq + Hash + Send + Sync + Serialize + for<'a> Deserialize<'a>
{
}

/// Maps a `TopicQuery` to the related data being sent over the wire during sync.
///
/// Each `SyncProtocol` implementation defines the type of data it is expecting to sync and how
/// the scope for a particular session should be identified. Sync protocol users can provide an
/// implementation of `TopicMap` so that scope `S` for data can be retrieved for a specific topic
/// query `T` when a peer initiates or accepts a sync session.
///
/// Since `TopicMap` is generic we can use the same mapping across different sync implementations
/// for the same data type when necessary.
///
/// ## Designing `TopicMap` for applications
///
/// Considering an example chat application which is based on append-only log data types, we
/// probably want to organise messages from an author for a certain chat group into one log each.
/// This way, a chat group can be expressed as a collection of one to potentially many logs (one
/// per member of the group):
///
/// ```text
/// All authors: A, B and C
/// All chat groups: 1 and 2
///
/// "Chat group 1 with members A and B"
/// - Log A1
/// - Log B1
///
/// "Chat group 2 with members A, B and C"
/// - Log A2
/// - Log B2
/// - Log C2
/// ```
///
/// If we implement `TopicQuery` to express that we're interested in syncing over a specific chat
/// group, for example "Chat group 2", we would implement `TopicMap` to give us all append-only
/// logs of all members inside this group, that is, the entries inside logs `A2`, `B2` and `C2`.
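///
/// ## Example
///
/// A sketch of such a mapping, assuming the hypothetical `ChatGroupQuery` topic query from the
/// `TopicQuery` example above and an illustrative `LogId` type identifying one append-only log per
/// member:
///
/// ```ignore
/// use std::collections::HashMap;
///
/// #[derive(Debug)]
/// struct ChatGroupMap {
///     // Maps a chat group query to the append-only logs of all its members.
///     logs: HashMap<ChatGroupQuery, Vec<LogId>>,
/// }
///
/// #[async_trait]
/// impl TopicMap<ChatGroupQuery, Vec<LogId>> for ChatGroupMap {
///     async fn get(&self, topic: &ChatGroupQuery) -> Option<Vec<LogId>> {
///         self.logs.get(topic).cloned()
///     }
/// }
/// ```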
#[async_trait]
pub trait TopicMap<T, S>: Debug + Send + Sync
where
    T: TopicQuery,
{
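    /// Return the scope `S` (the to-be-synced data entities) for the given topic query, or `None`
    /// if no data is known for this topic.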
    async fn get(&self, topic: &T) -> Option<S>;
}