commonware_p2p/utils/
codec.rs

//! Codec wrappers for [Sender] and [Receiver] that encode outgoing messages and decode incoming ones.
2
3use crate::{Receiver, Recipients, Sender};
4use bytes::Bytes;
5use commonware_codec::{Codec, Error};
6
7/// Wrap a [Sender] and [Receiver] with some [Codec].
8pub fn wrap<S: Sender, R: Receiver, V: Codec>(
9    config: V::Cfg,
10    sender: S,
11    receiver: R,
12) -> (WrappedSender<S, V>, WrappedReceiver<R, V>) {
13    (
14        WrappedSender::new(sender),
15        WrappedReceiver::new(config, receiver),
16    )
17}
18
19/// Tuple representing a message received from a given public key.
20pub type WrappedMessage<P, V> = (P, Result<V, Error>);
21
22/// Wrapper around a [Sender] that encodes messages using a [Codec].
23#[derive(Clone)]
24pub struct WrappedSender<S: Sender, V: Codec> {
25    sender: S,
26
27    _phantom_v: std::marker::PhantomData<V>,
28}
29
30impl<S: Sender, V: Codec> WrappedSender<S, V> {
31    /// Create a new [WrappedSender] with the given [Sender].
32    pub fn new(sender: S) -> Self {
33        Self {
34            sender,
35            _phantom_v: std::marker::PhantomData,
36        }
37    }
38
39    /// Send a message to a set of recipients.
40    pub async fn send(
41        &mut self,
42        recipients: Recipients<S::PublicKey>,
43        message: V,
44        priority: bool,
45    ) -> Result<Vec<S::PublicKey>, S::Error> {
46        let encoded = message.encode();
47        self.sender
48            .send(recipients, Bytes::from(encoded), priority)
49            .await
50    }
51}
52
53/// Wrapper around a [Receiver] that decodes messages using a [Codec].
54pub struct WrappedReceiver<R: Receiver, V: Codec> {
55    config: V::Cfg,
56    receiver: R,
57
58    _phantom_v: std::marker::PhantomData<V>,
59}
60
61impl<R: Receiver, V: Codec> WrappedReceiver<R, V> {
62    /// Create a new [WrappedReceiver] with the given [Receiver].
63    pub fn new(config: V::Cfg, receiver: R) -> Self {
64        Self {
65            config,
66            receiver,
67            _phantom_v: std::marker::PhantomData,
68        }
69    }
70
71    /// Receive a message from an arbitrary recipient.
72    pub async fn recv(&mut self) -> Result<WrappedMessage<R::PublicKey, V>, R::Error> {
73        let (pk, bytes) = self.receiver.recv().await?;
74        let decoded = match V::decode_cfg(bytes.as_ref(), &self.config) {
75            Ok(decoded) => decoded,
76            Err(e) => {
77                return Ok((pk, Err(e)));
78            }
79        };
80        Ok((pk, Ok(decoded)))
81    }
82}