commonware_consensus/marshal/ingress/
mailbox.rs

1use crate::{
2    simplex::{
3        signing_scheme::Scheme,
4        types::{Activity, Finalization, Notarization},
5    },
6    types::Round,
7    Block, Reporter,
8};
9use commonware_cryptography::Digest;
10use commonware_storage::archive;
11use futures::{
12    channel::{mpsc, oneshot},
13    SinkExt,
14};
15use tracing::error;
16
/// An identifier for a block request.
///
/// A request can name a block by its height, by its commitment, or simply ask for the
/// highest finalized block.
pub enum Identifier<D: Digest> {
    /// The height of the block to retrieve.
    Height(u64),
    /// The commitment of the block to retrieve.
    Commitment(D),
    /// The highest finalized block. It may be the case that marshal does not have some of the
    /// blocks below this height.
    Latest,
}
27
28// Allows using u64 directly for convenience.
29impl<D: Digest> From<u64> for Identifier<D> {
30    fn from(src: u64) -> Self {
31        Self::Height(src)
32    }
33}
34
35// Allows using &Digest directly for convenience.
36impl<D: Digest> From<&D> for Identifier<D> {
37    fn from(src: &D) -> Self {
38        Self::Commitment(*src)
39    }
40}
41
42// Allows using archive identifiers directly for convenience.
43impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
44    fn from(src: archive::Identifier<'_, D>) -> Self {
45        match src {
46            archive::Identifier::Index(index) => Self::Height(index),
47            archive::Identifier::Key(key) => Self::Commitment(*key),
48        }
49    }
50}
51
/// Messages sent to the marshal [Actor](super::super::actor::Actor).
///
/// These messages are sent from the consensus engine and other parts of the
/// system to drive the state of the marshal.
///
/// Variants that expect an answer carry a `response` channel on which the actor
/// is expected to send the result.
pub(crate) enum Message<S: Scheme, B: Block> {
    // -------------------- Application Messages --------------------
    /// A request to retrieve the (height, commitment) of a block by its identifier.
    /// The block must be finalized; returns `None` if the block is not finalized.
    GetInfo {
        /// The identifier of the block to get the information of.
        identifier: Identifier<B::Commitment>,
        /// A channel to send the retrieved (height, commitment).
        response: oneshot::Sender<Option<(u64, B::Commitment)>>,
    },
    /// A request to retrieve a block by its identifier.
    ///
    /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
    /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks.
    GetBlock {
        /// The identifier of the block to retrieve.
        identifier: Identifier<B::Commitment>,
        /// A channel to send the retrieved block (`None` if it was not found).
        response: oneshot::Sender<Option<B>>,
    },
    /// A request to retrieve a finalization by height.
    GetFinalization {
        /// The height of the finalization to retrieve.
        height: u64,
        /// A channel to send the retrieved finalization (`None` if it was not found).
        response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
    },
    /// A request to be handed the block with the given commitment once it is available.
    ///
    /// Unlike [Message::GetBlock], the response channel carries a `B` rather than an
    /// `Option<B>`: there is no "not found" answer, only a block or no answer at all.
    Subscribe {
        /// The round in which the block was notarized, if provided. This is an optimization
        /// to help locate the block.
        round: Option<Round>,
        /// The commitment of the block to retrieve.
        commitment: B::Commitment,
        /// A channel to send the retrieved block.
        response: oneshot::Sender<B>,
    },
    /// A request to broadcast a block to all peers.
    Broadcast {
        /// The block to broadcast.
        block: B,
    },
    /// A notification that a block has been verified by the application.
    Verified {
        /// The round in which the block was verified.
        round: Round,
        /// The verified block.
        block: B,
    },

    // -------------------- Consensus Engine Messages --------------------
    /// A notarization from the consensus engine.
    Notarization {
        /// The notarization.
        notarization: Notarization<S, B::Commitment>,
    },
    /// A finalization from the consensus engine.
    Finalization {
        /// The finalization.
        finalization: Finalization<S, B::Commitment>,
    },
}
118
/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
///
/// The mailbox is a thin wrapper around the sending half of a channel; a clone of the
/// mailbox holds a clone of that sender.
#[derive(Clone)]
pub struct Mailbox<S: Scheme, B: Block> {
    /// The sending half of the channel on which `Message`s are delivered.
    sender: mpsc::Sender<Message<S, B>>,
}
124
125impl<S: Scheme, B: Block> Mailbox<S, B> {
126    /// Creates a new mailbox.
127    pub(crate) fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
128        Self { sender }
129    }
130
131    /// A request to retrieve the information about the highest finalized block.
132    pub async fn get_info(
133        &mut self,
134        identifier: impl Into<Identifier<B::Commitment>>,
135    ) -> Option<(u64, B::Commitment)> {
136        let (tx, rx) = oneshot::channel();
137        if self
138            .sender
139            .send(Message::GetInfo {
140                identifier: identifier.into(),
141                response: tx,
142            })
143            .await
144            .is_err()
145        {
146            error!("failed to send get info message to actor: receiver dropped");
147        }
148        match rx.await {
149            Ok(result) => result,
150            Err(_) => {
151                error!("failed to get info: receiver dropped");
152                None
153            }
154        }
155    }
156
157    /// A best-effort attempt to retrieve a given block from local
158    /// storage. It is not an indication to go fetch the block from the network.
159    pub async fn get_block(
160        &mut self,
161        identifier: impl Into<Identifier<B::Commitment>>,
162    ) -> Option<B> {
163        let (tx, rx) = oneshot::channel();
164        if self
165            .sender
166            .send(Message::GetBlock {
167                identifier: identifier.into(),
168                response: tx,
169            })
170            .await
171            .is_err()
172        {
173            error!("failed to send get block message to actor: receiver dropped");
174        }
175        match rx.await {
176            Ok(result) => result,
177            Err(_) => {
178                error!("failed to get block: receiver dropped");
179                None
180            }
181        }
182    }
183
184    /// A best-effort attempt to retrieve a given [Finalization] from local
185    /// storage. It is not an indication to go fetch the [Finalization] from the network.
186    pub async fn get_finalization(
187        &mut self,
188        height: u64,
189    ) -> Option<Finalization<S, B::Commitment>> {
190        let (tx, rx) = oneshot::channel();
191        if self
192            .sender
193            .send(Message::GetFinalization {
194                height,
195                response: tx,
196            })
197            .await
198            .is_err()
199        {
200            error!("failed to send get finalization message to actor: receiver dropped");
201        }
202        match rx.await {
203            Ok(result) => result,
204            Err(_) => {
205                error!("failed to get finalization: receiver dropped");
206                None
207            }
208        }
209    }
210
211    /// A request to retrieve a block by its commitment.
212    ///
213    /// If the block is found available locally, the block will be returned immediately.
214    ///
215    /// If the block is not available locally, the request will be registered and the caller will
216    /// be notified when the block is available. If the block is not finalized, it's possible that
217    /// it may never become available.
218    ///
219    /// The oneshot receiver should be dropped to cancel the subscription.
220    pub async fn subscribe(
221        &mut self,
222        round: Option<Round>,
223        commitment: B::Commitment,
224    ) -> oneshot::Receiver<B> {
225        let (tx, rx) = oneshot::channel();
226        if self
227            .sender
228            .send(Message::Subscribe {
229                round,
230                commitment,
231                response: tx,
232            })
233            .await
234            .is_err()
235        {
236            error!("failed to send subscribe message to actor: receiver dropped");
237        }
238        rx
239    }
240
241    /// Broadcast indicates that a block should be sent to all peers.
242    pub async fn broadcast(&mut self, block: B) {
243        if self
244            .sender
245            .send(Message::Broadcast { block })
246            .await
247            .is_err()
248        {
249            error!("failed to send broadcast message to actor: receiver dropped");
250        }
251    }
252
253    /// Notifies the actor that a block has been verified.
254    pub async fn verified(&mut self, round: Round, block: B) {
255        if self
256            .sender
257            .send(Message::Verified { round, block })
258            .await
259            .is_err()
260        {
261            error!("failed to send verified message to actor: receiver dropped");
262        }
263    }
264}
265
266impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
267    type Activity = Activity<S, B::Commitment>;
268
269    async fn report(&mut self, activity: Self::Activity) {
270        let message = match activity {
271            Activity::Notarization(notarization) => Message::Notarization { notarization },
272            Activity::Finalization(finalization) => Message::Finalization { finalization },
273            _ => {
274                // Ignore other activity types
275                return;
276            }
277        };
278        if self.sender.send(message).await.is_err() {
279            error!("failed to report activity to actor: receiver dropped");
280        }
281    }
282}