commonware_consensus/marshal/ingress/mailbox.rs
1use crate::{
2 threshold_simplex::types::{Activity, Finalization, Notarization},
3 Block, Reporter,
4};
5use commonware_cryptography::bls12381::primitives::variant::Variant;
6use futures::{
7 channel::{mpsc, oneshot},
8 SinkExt,
9};
10use tracing::error;
11
12/// Messages sent to the marshal [Actor](super::super::actor::Actor).
13///
14/// These messages are sent from the consensus engine and other parts of the
15/// system to drive the state of the marshal.
16pub(crate) enum Message<V: Variant, B: Block> {
17 // -------------------- Application Messages --------------------
18 /// A request to retrieve a block by its digest.
19 Get {
20 /// The digest of the block to retrieve.
21 commitment: B::Commitment,
22 /// A channel to send the retrieved block.
23 response: oneshot::Sender<Option<B>>,
24 },
25 /// A request to retrieve a block by its digest.
26 Subscribe {
27 /// The view in which the block was notarized. This is an optimization
28 /// to help locate the block.
29 view: Option<u64>,
30 /// The digest of the block to retrieve.
31 commitment: B::Commitment,
32 /// A channel to send the retrieved block.
33 response: oneshot::Sender<B>,
34 },
35 /// A request to broadcast a block to all peers.
36 Broadcast {
37 /// The block to broadcast.
38 block: B,
39 },
40 /// A notification that a block has been verified by the application.
41 Verified {
42 /// The view in which the block was verified.
43 view: u64,
44 /// The verified block.
45 block: B,
46 },
47
48 // -------------------- Consensus Engine Messages --------------------
49 /// A notarization from the consensus engine.
50 Notarization {
51 /// The notarization.
52 notarization: Notarization<V, B::Commitment>,
53 },
54 /// A finalization from the consensus engine.
55 Finalization {
56 /// The finalization.
57 finalization: Finalization<V, B::Commitment>,
58 },
59}
60
61/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
62#[derive(Clone)]
63pub struct Mailbox<V: Variant, B: Block> {
64 sender: mpsc::Sender<Message<V, B>>,
65}
66
67impl<V: Variant, B: Block> Mailbox<V, B> {
68 /// Creates a new mailbox.
69 pub(crate) fn new(sender: mpsc::Sender<Message<V, B>>) -> Self {
70 Self { sender }
71 }
72
73 /// Get is a best-effort attempt to retrieve a given block from local
74 /// storage. It is not an indication to go fetch the block from the network.
75 pub async fn get(&mut self, commitment: B::Commitment) -> oneshot::Receiver<Option<B>> {
76 let (tx, rx) = oneshot::channel();
77 if self
78 .sender
79 .send(Message::Get {
80 commitment,
81 response: tx,
82 })
83 .await
84 .is_err()
85 {
86 error!("failed to send get message to actor: receiver dropped");
87 }
88 rx
89 }
90
91 /// Subscribe is a request to retrieve a block by its commitment.
92 ///
93 /// If the block is found available locally, the block will be returned immediately.
94 ///
95 /// If the block is not available locally, the request will be registered and the caller will
96 /// be notified when the block is available. If the block is not finalized, it's possible that
97 /// it may never become available.
98 ///
99 /// The oneshot receiver should be dropped to cancel the subscription.
100 pub async fn subscribe(
101 &mut self,
102 view: Option<u64>,
103 commitment: B::Commitment,
104 ) -> oneshot::Receiver<B> {
105 let (tx, rx) = oneshot::channel();
106 if self
107 .sender
108 .send(Message::Subscribe {
109 view,
110 commitment,
111 response: tx,
112 })
113 .await
114 .is_err()
115 {
116 error!("failed to send subscribe message to actor: receiver dropped");
117 }
118 rx
119 }
120
121 /// Broadcast indicates that a block should be sent to all peers.
122 pub async fn broadcast(&mut self, block: B) {
123 if self
124 .sender
125 .send(Message::Broadcast { block })
126 .await
127 .is_err()
128 {
129 error!("failed to send broadcast message to actor: receiver dropped");
130 }
131 }
132
133 /// Notifies the actor that a block has been verified.
134 pub async fn verified(&mut self, view: u64, block: B) {
135 if self
136 .sender
137 .send(Message::Verified { view, block })
138 .await
139 .is_err()
140 {
141 error!("failed to send verified message to actor: receiver dropped");
142 }
143 }
144}
145
146impl<V: Variant, B: Block> Reporter for Mailbox<V, B> {
147 type Activity = Activity<V, B::Commitment>;
148
149 async fn report(&mut self, activity: Self::Activity) {
150 let message = match activity {
151 Activity::Notarization(notarization) => Message::Notarization { notarization },
152 Activity::Finalization(finalization) => Message::Finalization { finalization },
153 _ => {
154 // Ignore other activity types
155 return;
156 }
157 };
158 if self.sender.send(message).await.is_err() {
159 error!("failed to report activity to actor: receiver dropped");
160 }
161 }
162}