Skip to main content

commonware_p2p/utils/
codec.rs

1//! Codec wrapper for [Sender] and [Receiver].
2
3use crate::{CheckedSender, Receiver, Recipients, Sender};
4use commonware_codec::{Codec, Error};
5use std::time::SystemTime;
6
7/// Wrap a [Sender] and [Receiver] with some [Codec].
8pub const fn wrap<S: Sender, R: Receiver, V: Codec>(
9    config: V::Cfg,
10    sender: S,
11    receiver: R,
12) -> (WrappedSender<S, V>, WrappedReceiver<R, V>) {
13    (
14        WrappedSender::new(sender),
15        WrappedReceiver::new(config, receiver),
16    )
17}
18
19/// Tuple representing a message received from a given public key.
20pub type WrappedMessage<P, V> = (P, Result<V, Error>);
21
22/// Wrapper around a [Sender] that encodes messages using a [Codec].
23#[derive(Clone)]
24pub struct WrappedSender<S: Sender, V: Codec> {
25    sender: S,
26    _phantom_v: std::marker::PhantomData<V>,
27}
28
29impl<S: Sender, V: Codec> WrappedSender<S, V> {
30    /// Create a new [WrappedSender] with the given [Sender].
31    pub const fn new(sender: S) -> Self {
32        Self {
33            sender,
34            _phantom_v: std::marker::PhantomData,
35        }
36    }
37
38    /// Send a message to a set of recipients.
39    pub async fn send(
40        &mut self,
41        recipients: Recipients<S::PublicKey>,
42        message: V,
43        priority: bool,
44    ) -> Result<Vec<S::PublicKey>, <S::Checked<'_> as CheckedSender>::Error> {
45        let encoded = message.encode();
46        self.sender.send(recipients, encoded, priority).await
47    }
48
49    /// Check if a message can be sent to a set of recipients, returning a [CheckedWrappedSender]
50    /// or the time at which the send can be retried.
51    pub async fn check(
52        &mut self,
53        recipients: Recipients<S::PublicKey>,
54    ) -> Result<CheckedWrappedSender<'_, S, V>, SystemTime> {
55        self.sender
56            .check(recipients)
57            .await
58            .map(|checked| CheckedWrappedSender {
59                sender: checked,
60                _phantom_v: std::marker::PhantomData,
61            })
62    }
63}
64
65/// Checked sender that wraps a [`crate::LimitedSender::Checked`] and encodes messages using a [Codec].
66#[derive(Debug)]
67pub struct CheckedWrappedSender<'a, S: Sender, V: Codec> {
68    sender: S::Checked<'a>,
69    _phantom_v: std::marker::PhantomData<V>,
70}
71
72impl<'a, S: Sender, V: Codec> CheckedWrappedSender<'a, S, V> {
73    pub async fn send(
74        self,
75        message: V,
76        priority: bool,
77    ) -> Result<Vec<S::PublicKey>, <S::Checked<'a> as CheckedSender>::Error> {
78        let encoded = message.encode();
79        self.sender.send(encoded, priority).await
80    }
81}
82
83/// Wrapper around a [Receiver] that decodes messages using a [Codec].
84pub struct WrappedReceiver<R: Receiver, V: Codec> {
85    config: V::Cfg,
86    receiver: R,
87}
88
89impl<R: Receiver, V: Codec> WrappedReceiver<R, V> {
90    /// Create a new [WrappedReceiver] with the given [Receiver].
91    pub const fn new(config: V::Cfg, receiver: R) -> Self {
92        Self { config, receiver }
93    }
94
95    /// Receive a message from an arbitrary recipient.
96    pub async fn recv(&mut self) -> Result<WrappedMessage<R::PublicKey, V>, R::Error> {
97        let (pk, bytes) = self.receiver.recv().await?;
98        let decoded = match V::decode_cfg(bytes.as_ref(), &self.config) {
99            Ok(decoded) => decoded,
100            Err(e) => {
101                return Ok((pk, Err(e)));
102            }
103        };
104        Ok((pk, Ok(decoded)))
105    }
106}