p2panda 0.6.1

Out-of-the-box p2panda Node API for application developers
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
// SPDX-License-Identifier: MIT OR Apache-2.0

use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use futures_util::{Stream, StreamExt, ready};
use p2panda_core::cbor::{DecodeError, EncodeError, decode_cbor, encode_cbor};
use p2panda_core::timestamp::{HybridTimestamp, LamportTimestamp, Timestamp};
use p2panda_core::{Signature, SigningKey, Topic, VerifyingKey};
use p2panda_net::gossip::{GossipHandle, GossipSubscription};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::warn;

use crate::forge::{Forge, OperationForge};

/// Message specification version to create and encode messages for ephemeral streams.
const MESSAGE_VERSION: u64 = 1;

/// Message being disseminated to other nodes via gossip.
///
/// They can be seen as a wrapper around the application's message payloads, providing integrity
/// and provenance guarantees, plus making sure each message is unique with the help of a
/// timestamp.
///
/// Messages are represented as a tuple and are CBOR-encoded as follows:
///
/// ```plain
/// (
///    version[u64],
///    verifying_key[32 bytes],
///    signature[64 bytes],
///    timestamp[u64],
///    lamport_timestamp[u64],
///    body[bytes],
/// )
/// ```
#[derive(Clone, Debug, PartialEq, Eq)]
struct WrappedMessage<M> {
    version: u64,
    verifying_key: VerifyingKey,
    signature: Signature,
    timestamp: HybridTimestamp,
    body: M,
}

impl<M> WrappedMessage<M>
where
    M: Serialize + for<'a> Deserialize<'a>,
{
    pub fn new(
        body: M,
        timestamp: HybridTimestamp,
        signing_key: &SigningKey,
    ) -> Result<Self, EncodeError> {
        let verifying_key = signing_key.verifying_key();
        let signature = Self::sign(signing_key, verifying_key, timestamp, &body)?;

        Ok(Self {
            version: MESSAGE_VERSION,
            verifying_key,
            signature,
            timestamp,
            body,
        })
    }

    pub fn from_bytes(bytes: &[u8]) -> Result<Self, WrappedMessageError> {
        // Attempt deserializing message tuple. This fails if the encoding is invalid.
        let (version, verifying_key, signature, timestamp, logical, body): (
            u64,
            VerifyingKey,
            Signature,
            Timestamp,
            LamportTimestamp,
            M,
        ) = decode_cbor(bytes)?;

        let timestamp = HybridTimestamp::from_parts(timestamp, logical);

        // Check supported message version.
        if version != MESSAGE_VERSION {
            return Err(WrappedMessageError::UnsupportedVersion(version));
        }

        let message = Self {
            version,
            verifying_key,
            signature,
            timestamp,
            body,
        };

        // Check message integrity and provenance.
        message.verify()?;

        Ok(message)
    }

    pub fn to_bytes(&self) -> Result<Vec<u8>, EncodeError> {
        let (timestamp, logical) = self.timestamp.to_parts();
        let message = (
            self.version,
            self.verifying_key,
            self.signature,
            timestamp,
            logical,
            &self.body,
        );
        let bytes = encode_cbor(&message)?;
        Ok(bytes)
    }

    pub fn verify(&self) -> Result<(), WrappedMessageError> {
        let (timestamp, logical) = self.timestamp.to_parts();
        let message = (
            self.version,
            self.verifying_key,
            timestamp,
            logical,
            &self.body,
        );

        // Treat encoding error for verifying the signature as an "invalid signature" since
        // the data came from a remote (potentially malicious) node.
        let bytes = encode_cbor(&message).map_err(|_| WrappedMessageError::InvalidSignature)?;

        if !self.verifying_key.verify(&bytes, &self.signature) {
            return Err(WrappedMessageError::InvalidSignature);
        }

        Ok(())
    }

    fn sign(
        signing_key: &SigningKey,
        verifying_key: VerifyingKey,
        timestamp: HybridTimestamp,
        body: &M,
    ) -> Result<Signature, EncodeError> {
        let (timestamp, logical) = timestamp.to_parts();
        let message = (MESSAGE_VERSION, verifying_key, timestamp, logical, body);
        let bytes = encode_cbor(&message)?;
        Ok(signing_key.sign(&bytes))
    }
}

#[derive(Debug, Error)]
enum WrappedMessageError {
    #[error("unsupported message version {0}")]
    UnsupportedVersion(u64),

    #[error("invalid message encoding: {0}")]
    InvalidEncoding(#[from] DecodeError),

    #[error("invalid message signature")]
    InvalidSignature,
}

/// Message coming from an ephemeral stream subscription.
///
/// Ephemeral messages are verified (for integrity and provenance) but not persisted on a system
/// layer. They contain the application's message payloads as well as public key of the author and
/// timestamp at which they were sent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EphemeralMessage<M> {
    topic: Topic,
    inner: WrappedMessage<M>,
}

impl<M> EphemeralMessage<M>
where
    M: Serialize + for<'a> Deserialize<'a>,
{
    /// Associated topic.
    pub fn topic(&self) -> Topic {
        self.topic
    }

    /// Verified author.
    pub fn author(&self) -> VerifyingKey {
        self.inner.verifying_key
    }

    /// Timestamp when this operation was created.
    ///
    /// Microseconds since the UNIX epoch based on system time.
    pub fn timestamp(&self) -> u64 {
        // Only return the wall-clock time to the user as this is the interesting bit, the logical
        // lamport timestamp helps internally with keeping messages unique.
        let (timestamp, _logical) = self.inner.timestamp.to_parts();
        timestamp.into()
    }

    /// Application message payload.
    pub fn body(&self) -> &M {
        &self.inner.body
    }
}

/// Returns publish and subscribe halfs of an ephemeral messaging stream for a given topic.
pub(crate) fn ephemeral_stream<M>(
    topic: Topic,
    forge: OperationForge,
    handle: GossipHandle,
) -> (EphemeralStreamPublisher<M>, EphemeralStreamSubscription<M>) {
    let subscription = handle.subscribe();

    let tx = EphemeralStreamPublisher {
        topic,
        forge,
        inner: handle,
        timestamp: Arc::new(Mutex::new(HybridTimestamp::now())),
        _marker: PhantomData,
    };

    let rx = EphemeralStreamSubscription {
        topic,
        inner: subscription,
        _marker: PhantomData,
    };

    (tx, rx)
}

/// Publish messages into an ephemeral topic stream.
///
/// Any message type `M` can be published as long as it can be encoded into bytes by implementing
/// serde's [`Serialize`] and [`Deserialize`] traits.
///
/// Only currently reachable and subscribed peers will receive published messages.
///
/// ## Example
///
/// ```rust
/// # use p2panda_core::Topic;
/// # use serde::{Serialize, Deserialize};
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let node = p2panda::builder().spawn().await?;
/// #
/// let now_playing = Topic::random();
///
/// #[derive(Clone, Debug, Serialize, Deserialize)]
/// struct PlaylistItem {
///     artist: String,
///     song_name: String,
/// }
///
/// let (tx, _rx) = node.ephemeral_stream::<PlaylistItem>(now_playing).await?;
///
/// tx.publish(PlaylistItem {
///     artist: "Richard Cheese".into(),
///     song_name: "Panda".into(),
/// }).await?;
/// #
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct EphemeralStreamPublisher<M> {
    topic: Topic,
    forge: OperationForge,
    inner: GossipHandle,
    timestamp: Arc<Mutex<HybridTimestamp>>,
    _marker: PhantomData<M>,
}

impl<M> EphemeralStreamPublisher<M>
where
    M: Serialize + for<'a> Deserialize<'a>,
{
    /// Associated topic.
    pub fn topic(&self) -> Topic {
        self.topic
    }

    /// Publish a message into an ephemeral topic stream.
    ///
    /// Only currently reachable and subscribed peers will receive published messages.
    pub async fn publish(&self, message: M) -> Result<(), EphemeralPublishError> {
        // The PlumTree implementation for the gossip overlay used by p2panda-net ignores duplicate
        // messages to avoid flooding the network. This can lead to surprises by the users as they
        // expect messages to still arrive, not noticing it's because of a duplicate payload.
        //
        // To help with this we're using a microsecond-precision timestamp + lamport logical clock
        // serving somewhat as a nonce, making sure that every message is guaranteed to be unique.
        let timestamp = {
            let mut timestamp = self
                .timestamp
                .lock()
                .expect("lock poisoned by another thread");
            *timestamp = timestamp.increment();
            *timestamp
        };

        let bytes = {
            let wrapped = WrappedMessage::new(message, timestamp, self.forge.signing_key())?;
            wrapped.to_bytes()?
        };

        self.inner
            .publish(bytes)
            .await
            .map_err(|_err| EphemeralPublishError::BrokenChannel)?;

        Ok(())
    }
}

/// Error occurred when publishing a message to ephemeral topic stream.
#[derive(Debug, Error)]
pub enum EphemeralPublishError {
    /// If this error occurs probably something is wrong with the system.
    #[error("critical encoding error: {0}")]
    Encode(#[from] EncodeError),

    /// Broken / closed communication channel with the internal gossip actor in `p2panda-net`. This
    /// can be due to the actor crashing.
    ///
    /// Users may re-attempt sending the message in case the actor restarted later.
    #[error("error in internal gossip actor occurred")]
    BrokenChannel,
}

/// Subscription to messages arriving from an ephemeral topic stream.
///
/// ## Example
///
/// ```no_run
/// use futures_util::StreamExt;
/// use p2panda_core::Topic;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let node = p2panda::spawn().await?;
/// let topic = Topic::random();
///
/// let (_tx, mut rx) = node.ephemeral_stream::<String>(topic).await?;
///
/// while let Some(stream_event) = rx.next().await {
///     // .. react to received messages
/// }
/// #
/// # Ok(())
/// # }
/// ```
#[pin_project]
pub struct EphemeralStreamSubscription<M> {
    topic: Topic,
    #[pin]
    inner: GossipSubscription,
    _marker: PhantomData<M>,
}

impl<M> EphemeralStreamSubscription<M>
where
    M: Serialize + for<'a> Deserialize<'a>,
{
    /// Associated topic.
    pub fn topic(&self) -> Topic {
        self.topic
    }
}

impl<M> Stream for EphemeralStreamSubscription<M>
where
    M: Serialize + for<'a> Deserialize<'a>,
{
    type Item = EphemeralMessage<M>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match ready!(self.inner.poll_next_unpin(cx)) {
            // Check encoding & supported version and signature during deserialisation.
            Some(Ok(bytes)) => match WrappedMessage::from_bytes(&bytes) {
                Ok(wrapped) => Poll::Ready(Some(EphemeralMessage {
                    topic: self.topic,
                    inner: wrapped,
                })),
                Err(err) => {
                    // Don't bother users with invalid wrapped messages as this type is not public.
                    // Instead we log a warning, in case this reveals a buggy implementation, etc.
                    warn!("invalid ephemeral message received: {err}");
                    Poll::Pending
                }
            },
            // Ignore internal broadcast channel error, this only indicates that the channel
            // dropped a message which we can't do much about on this layer anymore. In the future
            // we want to remove this error type altogether.
            //
            // Related issue: https://github.com/p2panda/p2panda/issues/959
            Some(Err(_)) => Poll::Pending,
            // Internal stream seized.
            None => Poll::Ready(None),
        }
    }
}

#[cfg(test)]
mod tests {
    use p2panda_core::SigningKey;
    use p2panda_core::timestamp::HybridTimestamp;

    use super::WrappedMessage;

    #[test]
    fn encoding() {
        let signing_key = SigningKey::generate();
        let timestamp = HybridTimestamp::now();

        let message_1 = WrappedMessage::new(
            "This message is signed!".to_string(),
            timestamp,
            &signing_key,
        )
        .unwrap();

        let bytes = message_1.to_bytes().unwrap();
        let message_2 = WrappedMessage::from_bytes(&bytes).unwrap();

        assert_eq!(message_1, message_2);
    }

    #[test]
    fn signatures() {
        let signing_key = SigningKey::generate();
        let timestamp = HybridTimestamp::now();

        let message = WrappedMessage::new(
            "This message is signed!".to_string(),
            timestamp,
            &signing_key,
        )
        .unwrap();

        assert!(message.verify().is_ok());
    }
}