use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures_util::{Stream, StreamExt, ready};
use p2panda_core::cbor::{DecodeError, EncodeError, decode_cbor, encode_cbor};
use p2panda_core::timestamp::{HybridTimestamp, LamportTimestamp, Timestamp};
use p2panda_core::{Signature, SigningKey, Topic, VerifyingKey};
use p2panda_net::gossip::{GossipHandle, GossipSubscription};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::warn;
use crate::forge::{Forge, OperationForge};
const MESSAGE_VERSION: u64 = 1;
#[derive(Clone, Debug, PartialEq, Eq)]
struct WrappedMessage<M> {
version: u64,
verifying_key: VerifyingKey,
signature: Signature,
timestamp: HybridTimestamp,
body: M,
}
impl<M> WrappedMessage<M>
where
M: Serialize + for<'a> Deserialize<'a>,
{
pub fn new(
body: M,
timestamp: HybridTimestamp,
signing_key: &SigningKey,
) -> Result<Self, EncodeError> {
let verifying_key = signing_key.verifying_key();
let signature = Self::sign(signing_key, verifying_key, timestamp, &body)?;
Ok(Self {
version: MESSAGE_VERSION,
verifying_key,
signature,
timestamp,
body,
})
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, WrappedMessageError> {
let (version, verifying_key, signature, timestamp, logical, body): (
u64,
VerifyingKey,
Signature,
Timestamp,
LamportTimestamp,
M,
) = decode_cbor(bytes)?;
let timestamp = HybridTimestamp::from_parts(timestamp, logical);
if version != MESSAGE_VERSION {
return Err(WrappedMessageError::UnsupportedVersion(version));
}
let message = Self {
version,
verifying_key,
signature,
timestamp,
body,
};
message.verify()?;
Ok(message)
}
pub fn to_bytes(&self) -> Result<Vec<u8>, EncodeError> {
let (timestamp, logical) = self.timestamp.to_parts();
let message = (
self.version,
self.verifying_key,
self.signature,
timestamp,
logical,
&self.body,
);
let bytes = encode_cbor(&message)?;
Ok(bytes)
}
pub fn verify(&self) -> Result<(), WrappedMessageError> {
let (timestamp, logical) = self.timestamp.to_parts();
let message = (
self.version,
self.verifying_key,
timestamp,
logical,
&self.body,
);
let bytes = encode_cbor(&message).map_err(|_| WrappedMessageError::InvalidSignature)?;
if !self.verifying_key.verify(&bytes, &self.signature) {
return Err(WrappedMessageError::InvalidSignature);
}
Ok(())
}
fn sign(
signing_key: &SigningKey,
verifying_key: VerifyingKey,
timestamp: HybridTimestamp,
body: &M,
) -> Result<Signature, EncodeError> {
let (timestamp, logical) = timestamp.to_parts();
let message = (MESSAGE_VERSION, verifying_key, timestamp, logical, body);
let bytes = encode_cbor(&message)?;
Ok(signing_key.sign(&bytes))
}
}
#[derive(Debug, Error)]
enum WrappedMessageError {
#[error("unsupported message version {0}")]
UnsupportedVersion(u64),
#[error("invalid message encoding: {0}")]
InvalidEncoding(#[from] DecodeError),
#[error("invalid message signature")]
InvalidSignature,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EphemeralMessage<M> {
topic: Topic,
inner: WrappedMessage<M>,
}
impl<M> EphemeralMessage<M>
where
M: Serialize + for<'a> Deserialize<'a>,
{
pub fn topic(&self) -> Topic {
self.topic
}
pub fn author(&self) -> VerifyingKey {
self.inner.verifying_key
}
pub fn timestamp(&self) -> u64 {
let (timestamp, _logical) = self.inner.timestamp.to_parts();
timestamp.into()
}
pub fn body(&self) -> &M {
&self.inner.body
}
}
pub(crate) fn ephemeral_stream<M>(
topic: Topic,
forge: OperationForge,
handle: GossipHandle,
) -> (EphemeralStreamPublisher<M>, EphemeralStreamSubscription<M>) {
let subscription = handle.subscribe();
let tx = EphemeralStreamPublisher {
topic,
forge,
inner: handle,
timestamp: Arc::new(Mutex::new(HybridTimestamp::now())),
_marker: PhantomData,
};
let rx = EphemeralStreamSubscription {
topic,
inner: subscription,
_marker: PhantomData,
};
(tx, rx)
}
#[derive(Clone, Debug)]
pub struct EphemeralStreamPublisher<M> {
topic: Topic,
forge: OperationForge,
inner: GossipHandle,
timestamp: Arc<Mutex<HybridTimestamp>>,
_marker: PhantomData<M>,
}
impl<M> EphemeralStreamPublisher<M>
where
M: Serialize + for<'a> Deserialize<'a>,
{
pub fn topic(&self) -> Topic {
self.topic
}
pub async fn publish(&self, message: M) -> Result<(), EphemeralPublishError> {
let timestamp = {
let mut timestamp = self
.timestamp
.lock()
.expect("lock poisoned by another thread");
*timestamp = timestamp.increment();
*timestamp
};
let bytes = {
let wrapped = WrappedMessage::new(message, timestamp, self.forge.signing_key())?;
wrapped.to_bytes()?
};
self.inner
.publish(bytes)
.await
.map_err(|_err| EphemeralPublishError::BrokenChannel)?;
Ok(())
}
}
#[derive(Debug, Error)]
pub enum EphemeralPublishError {
#[error("critical encoding error: {0}")]
Encode(#[from] EncodeError),
#[error("error in internal gossip actor occurred")]
BrokenChannel,
}
#[pin_project]
pub struct EphemeralStreamSubscription<M> {
topic: Topic,
#[pin]
inner: GossipSubscription,
_marker: PhantomData<M>,
}
impl<M> EphemeralStreamSubscription<M>
where
M: Serialize + for<'a> Deserialize<'a>,
{
pub fn topic(&self) -> Topic {
self.topic
}
}
impl<M> Stream for EphemeralStreamSubscription<M>
where
M: Serialize + for<'a> Deserialize<'a>,
{
type Item = EphemeralMessage<M>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.inner.poll_next_unpin(cx)) {
Some(Ok(bytes)) => match WrappedMessage::from_bytes(&bytes) {
Ok(wrapped) => Poll::Ready(Some(EphemeralMessage {
topic: self.topic,
inner: wrapped,
})),
Err(err) => {
warn!("invalid ephemeral message received: {err}");
Poll::Pending
}
},
Some(Err(_)) => Poll::Pending,
None => Poll::Ready(None),
}
}
}
#[cfg(test)]
mod tests {
use p2panda_core::SigningKey;
use p2panda_core::timestamp::HybridTimestamp;
use super::WrappedMessage;
#[test]
fn encoding() {
let signing_key = SigningKey::generate();
let timestamp = HybridTimestamp::now();
let message_1 = WrappedMessage::new(
"This message is signed!".to_string(),
timestamp,
&signing_key,
)
.unwrap();
let bytes = message_1.to_bytes().unwrap();
let message_2 = WrappedMessage::from_bytes(&bytes).unwrap();
assert_eq!(message_1, message_2);
}
#[test]
fn signatures() {
let signing_key = SigningKey::generate();
let timestamp = HybridTimestamp::now();
let message = WrappedMessage::new(
"This message is signed!".to_string(),
timestamp,
&signing_key,
)
.unwrap();
assert!(message.verify().is_ok());
}
}