Skip to main content

commonware_consensus/marshal/coding/shards/
mailbox.rs

1//! Mailbox for the shard buffer engine.
2
3use crate::{
4    marshal::coding::types::CodedBlock,
5    types::{coding::Commitment, Round},
6    CertifiableBlock,
7};
8use commonware_coding::Scheme as CodingScheme;
9use commonware_cryptography::{Hasher, PublicKey};
10use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
11use std::sync::Arc;
12
13/// A message that can be sent to the coding [`Engine`].
14///
15/// [`Engine`]: super::Engine
16pub enum Message<B, C, H, P>
17where
18    B: CertifiableBlock,
19    C: CodingScheme,
20    H: Hasher,
21    P: PublicKey,
22{
23    /// A request to broadcast a proposed [`CodedBlock`] to all peers.
24    Proposed {
25        /// The erasure coded block.
26        block: CodedBlock<B, C, H>,
27        /// The round in which the block was proposed.
28        round: Round,
29    },
30    /// A notification from consensus that a [`Commitment`] has been discovered.
31    Discovered {
32        /// The [`Commitment`] of the proposed block.
33        commitment: Commitment,
34        /// The leader's public key.
35        leader: P,
36        /// The round in which the commitment was proposed.
37        round: Round,
38    },
39    /// A request to get a reconstructed block, if available.
40    GetByCommitment {
41        /// The [`Commitment`] of the block to get.
42        commitment: Commitment,
43        /// The response channel.
44        response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
45    },
46    /// A request to get a reconstructed block by its digest, if available.
47    GetByDigest {
48        /// The digest of the block to get.
49        digest: B::Digest,
50        /// The response channel.
51        response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
52    },
53    /// A request to open a subscription for the receipt of our valid shard from
54    /// the leader.
55    SubscribeShard {
56        /// The block's commitment.
57        commitment: Commitment,
58        /// The response channel.
59        response: oneshot::Sender<()>,
60    },
61    /// A request to open a subscription for the reconstruction of a [`CodedBlock`]
62    /// by its [`Commitment`].
63    SubscribeByCommitment {
64        /// The block's digest.
65        commitment: Commitment,
66        /// The response channel.
67        response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
68    },
69    /// A request to open a subscription for the reconstruction of a [`CodedBlock`]
70    /// by its digest.
71    SubscribeByDigest {
72        /// The block's digest.
73        digest: B::Digest,
74        /// The response channel.
75        response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
76    },
77    /// A request to prune all caches at and below the given commitment.
78    Prune {
79        /// Inclusive prune target [`Commitment`].
80        through: Commitment,
81    },
82}
83
84/// A mailbox for sending messages to the [`Engine`].
85///
86/// [`Engine`]: super::Engine
87#[derive(Clone)]
88pub struct Mailbox<B, C, H, P>
89where
90    B: CertifiableBlock,
91    C: CodingScheme,
92    H: Hasher,
93    P: PublicKey,
94{
95    pub(super) sender: mpsc::Sender<Message<B, C, H, P>>,
96}
97
98impl<B, C, H, P> Mailbox<B, C, H, P>
99where
100    B: CertifiableBlock,
101    C: CodingScheme,
102    H: Hasher,
103    P: PublicKey,
104{
105    /// Create a new [`Mailbox`] with the given sender.
106    pub const fn new(sender: mpsc::Sender<Message<B, C, H, P>>) -> Self {
107        Self { sender }
108    }
109
110    /// Broadcast a proposed erasure coded block's shards to the participants.
111    pub async fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) {
112        let msg = Message::Proposed { block, round };
113        self.sender.send_lossy(msg).await;
114    }
115
116    /// Inform the engine of an externally proposed [`Commitment`].
117    pub async fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
118        let msg = Message::Discovered {
119            commitment,
120            leader,
121            round,
122        };
123        self.sender.send_lossy(msg).await;
124    }
125
126    /// Request a reconstructed block by its [`Commitment`].
127    pub async fn get(&self, commitment: Commitment) -> Option<Arc<CodedBlock<B, C, H>>> {
128        self.sender
129            .request(|tx| Message::GetByCommitment {
130                commitment,
131                response: tx,
132            })
133            .await
134            .flatten()
135    }
136
137    /// Request a reconstructed block by its digest.
138    pub async fn get_by_digest(&self, digest: B::Digest) -> Option<Arc<CodedBlock<B, C, H>>> {
139        self.sender
140            .request(|tx| Message::GetByDigest {
141                digest,
142                response: tx,
143            })
144            .await
145            .flatten()
146    }
147
148    /// Subscribe to the receipt of our valid shard from the leader.
149    pub async fn subscribe_shard(&self, commitment: Commitment) -> oneshot::Receiver<()> {
150        let (responder, receiver) = oneshot::channel();
151        let msg = Message::SubscribeShard {
152            commitment,
153            response: responder,
154        };
155        self.sender.send_lossy(msg).await;
156        receiver
157    }
158
159    /// Subscribe to the reconstruction of a [`CodedBlock`] by its [`Commitment`].
160    pub async fn subscribe(
161        &self,
162        commitment: Commitment,
163    ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
164        let (responder, receiver) = oneshot::channel();
165        let msg = Message::SubscribeByCommitment {
166            commitment,
167            response: responder,
168        };
169        self.sender.send_lossy(msg).await;
170        receiver
171    }
172
173    /// Subscribe to the reconstruction of a [`CodedBlock`] by its digest.
174    pub async fn subscribe_by_digest(
175        &self,
176        digest: B::Digest,
177    ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
178        let (responder, receiver) = oneshot::channel();
179        let msg = Message::SubscribeByDigest {
180            digest,
181            response: responder,
182        };
183        self.sender.send_lossy(msg).await;
184        receiver
185    }
186
187    /// Request to prune all caches at and below the given commitment.
188    pub async fn prune(&self, through: Commitment) {
189        let msg = Message::Prune { through };
190        self.sender.send_lossy(msg).await;
191    }
192}