commonware_consensus/marshal/ingress/mailbox.rs
1use crate::{
2 simplex::{
3 signing_scheme::Scheme,
4 types::{Activity, Finalization, Notarization},
5 },
6 types::Round,
7 Block, Reporter,
8};
9use commonware_cryptography::Digest;
10use commonware_storage::archive;
11use futures::{
12 channel::{mpsc, oneshot},
13 SinkExt,
14};
15use tracing::error;
16
/// An identifier for a block request.
///
/// Besides being constructed directly, an identifier can be obtained via `From`/`Into` from a
/// `u64` (interpreted as a height), a `&D` (interpreted as a commitment), or an
/// [archive::Identifier].
pub enum Identifier<D: Digest> {
    /// The height of the block to retrieve.
    Height(u64),
    /// The commitment of the block to retrieve.
    Commitment(D),
    /// The highest finalized block. It may be the case that marshal does not have some of the
    /// blocks below this height.
    Latest,
}
27
28// Allows using u64 directly for convenience.
29impl<D: Digest> From<u64> for Identifier<D> {
30 fn from(src: u64) -> Self {
31 Self::Height(src)
32 }
33}
34
35// Allows using &Digest directly for convenience.
36impl<D: Digest> From<&D> for Identifier<D> {
37 fn from(src: &D) -> Self {
38 Self::Commitment(*src)
39 }
40}
41
42// Allows using archive identifiers directly for convenience.
43impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
44 fn from(src: archive::Identifier<'_, D>) -> Self {
45 match src {
46 archive::Identifier::Index(index) => Self::Height(index),
47 archive::Identifier::Key(key) => Self::Commitment(*key),
48 }
49 }
50}
51
/// Messages sent to the marshal [Actor](super::super::actor::Actor).
///
/// These messages are sent from the consensus engine and other parts of the
/// system to drive the state of the marshal.
///
/// Request-style variants carry a `response` oneshot sender on which the result is
/// delivered back to the requester.
pub(crate) enum Message<S: Scheme, B: Block> {
    // -------------------- Application Messages --------------------
    /// A request to retrieve the (height, commitment) of a block by its identifier.
    /// The block must be finalized; returns `None` if the block is not finalized.
    GetInfo {
        /// The identifier of the block to get the information of.
        identifier: Identifier<B::Commitment>,
        /// A channel to send the retrieved (height, commitment).
        response: oneshot::Sender<Option<(u64, B::Commitment)>>,
    },
    /// A request to retrieve a block by its identifier.
    ///
    /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
    /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks.
    GetBlock {
        /// The identifier of the block to retrieve.
        identifier: Identifier<B::Commitment>,
        /// A channel to send the retrieved block.
        response: oneshot::Sender<Option<B>>,
    },
    /// A request to retrieve a finalization by height.
    GetFinalization {
        /// The height of the finalization to retrieve.
        height: u64,
        /// A channel to send the retrieved finalization.
        response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
    },
    /// A request to be sent a block, identified by its commitment.
    ///
    /// Unlike [Message::GetBlock], the response channel carries a `B` rather than an
    /// `Option<B>`.
    Subscribe {
        /// The round in which the block was notarized, if provided. This is an optimization
        /// to help locate the block.
        round: Option<Round>,
        /// The commitment of the block to retrieve.
        commitment: B::Commitment,
        /// A channel to send the retrieved block.
        response: oneshot::Sender<B>,
    },
    /// A request to broadcast a block to all peers.
    Broadcast {
        /// The block to broadcast.
        block: B,
    },
    /// A notification that a block has been verified by the application.
    Verified {
        /// The round in which the block was verified.
        round: Round,
        /// The verified block.
        block: B,
    },

    // -------------------- Consensus Engine Messages --------------------
    /// A notarization from the consensus engine.
    Notarization {
        /// The notarization.
        notarization: Notarization<S, B::Commitment>,
    },
    /// A finalization from the consensus engine.
    Finalization {
        /// The finalization.
        finalization: Finalization<S, B::Commitment>,
    },
}
118
/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
///
/// Every method on the mailbox wraps its arguments in a [Message] and pushes it onto the
/// underlying channel.
#[derive(Clone)]
pub struct Mailbox<S: Scheme, B: Block> {
    /// Sending half of the channel over which [Message]s are delivered.
    sender: mpsc::Sender<Message<S, B>>,
}
124
125impl<S: Scheme, B: Block> Mailbox<S, B> {
126 /// Creates a new mailbox.
127 pub(crate) fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
128 Self { sender }
129 }
130
131 /// A request to retrieve the information about the highest finalized block.
132 pub async fn get_info(
133 &mut self,
134 identifier: impl Into<Identifier<B::Commitment>>,
135 ) -> Option<(u64, B::Commitment)> {
136 let (tx, rx) = oneshot::channel();
137 if self
138 .sender
139 .send(Message::GetInfo {
140 identifier: identifier.into(),
141 response: tx,
142 })
143 .await
144 .is_err()
145 {
146 error!("failed to send get info message to actor: receiver dropped");
147 }
148 match rx.await {
149 Ok(result) => result,
150 Err(_) => {
151 error!("failed to get info: receiver dropped");
152 None
153 }
154 }
155 }
156
157 /// A best-effort attempt to retrieve a given block from local
158 /// storage. It is not an indication to go fetch the block from the network.
159 pub async fn get_block(
160 &mut self,
161 identifier: impl Into<Identifier<B::Commitment>>,
162 ) -> Option<B> {
163 let (tx, rx) = oneshot::channel();
164 if self
165 .sender
166 .send(Message::GetBlock {
167 identifier: identifier.into(),
168 response: tx,
169 })
170 .await
171 .is_err()
172 {
173 error!("failed to send get block message to actor: receiver dropped");
174 }
175 match rx.await {
176 Ok(result) => result,
177 Err(_) => {
178 error!("failed to get block: receiver dropped");
179 None
180 }
181 }
182 }
183
184 /// A best-effort attempt to retrieve a given [Finalization] from local
185 /// storage. It is not an indication to go fetch the [Finalization] from the network.
186 pub async fn get_finalization(
187 &mut self,
188 height: u64,
189 ) -> Option<Finalization<S, B::Commitment>> {
190 let (tx, rx) = oneshot::channel();
191 if self
192 .sender
193 .send(Message::GetFinalization {
194 height,
195 response: tx,
196 })
197 .await
198 .is_err()
199 {
200 error!("failed to send get finalization message to actor: receiver dropped");
201 }
202 match rx.await {
203 Ok(result) => result,
204 Err(_) => {
205 error!("failed to get finalization: receiver dropped");
206 None
207 }
208 }
209 }
210
211 /// A request to retrieve a block by its commitment.
212 ///
213 /// If the block is found available locally, the block will be returned immediately.
214 ///
215 /// If the block is not available locally, the request will be registered and the caller will
216 /// be notified when the block is available. If the block is not finalized, it's possible that
217 /// it may never become available.
218 ///
219 /// The oneshot receiver should be dropped to cancel the subscription.
220 pub async fn subscribe(
221 &mut self,
222 round: Option<Round>,
223 commitment: B::Commitment,
224 ) -> oneshot::Receiver<B> {
225 let (tx, rx) = oneshot::channel();
226 if self
227 .sender
228 .send(Message::Subscribe {
229 round,
230 commitment,
231 response: tx,
232 })
233 .await
234 .is_err()
235 {
236 error!("failed to send subscribe message to actor: receiver dropped");
237 }
238 rx
239 }
240
241 /// Broadcast indicates that a block should be sent to all peers.
242 pub async fn broadcast(&mut self, block: B) {
243 if self
244 .sender
245 .send(Message::Broadcast { block })
246 .await
247 .is_err()
248 {
249 error!("failed to send broadcast message to actor: receiver dropped");
250 }
251 }
252
253 /// Notifies the actor that a block has been verified.
254 pub async fn verified(&mut self, round: Round, block: B) {
255 if self
256 .sender
257 .send(Message::Verified { round, block })
258 .await
259 .is_err()
260 {
261 error!("failed to send verified message to actor: receiver dropped");
262 }
263 }
264}
265
266impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
267 type Activity = Activity<S, B::Commitment>;
268
269 async fn report(&mut self, activity: Self::Activity) {
270 let message = match activity {
271 Activity::Notarization(notarization) => Message::Notarization { notarization },
272 Activity::Finalization(finalization) => Message::Finalization { finalization },
273 _ => {
274 // Ignore other activity types
275 return;
276 }
277 };
278 if self.sender.send(message).await.is_err() {
279 error!("failed to report activity to actor: receiver dropped");
280 }
281 }
282}