commonware_consensus/marshal/ingress/
mailbox.rs

1use crate::{
2    threshold_simplex::types::{Activity, Finalization, Notarization},
3    Block, Reporter,
4};
5use commonware_cryptography::bls12381::primitives::variant::Variant;
6use futures::{
7    channel::{mpsc, oneshot},
8    SinkExt,
9};
10use tracing::error;
11
12/// Messages sent to the marshal [Actor](super::super::actor::Actor).
13///
14/// These messages are sent from the consensus engine and other parts of the
15/// system to drive the state of the marshal.
16pub(crate) enum Message<V: Variant, B: Block> {
17    // -------------------- Application Messages --------------------
18    /// A request to retrieve a block by its digest.
19    Get {
20        /// The digest of the block to retrieve.
21        commitment: B::Commitment,
22        /// A channel to send the retrieved block.
23        response: oneshot::Sender<Option<B>>,
24    },
25    /// A request to retrieve a block by its digest.
26    Subscribe {
27        /// The view in which the block was notarized. This is an optimization
28        /// to help locate the block.
29        view: Option<u64>,
30        /// The digest of the block to retrieve.
31        commitment: B::Commitment,
32        /// A channel to send the retrieved block.
33        response: oneshot::Sender<B>,
34    },
35    /// A request to broadcast a block to all peers.
36    Broadcast {
37        /// The block to broadcast.
38        block: B,
39    },
40    /// A notification that a block has been verified by the application.
41    Verified {
42        /// The view in which the block was verified.
43        view: u64,
44        /// The verified block.
45        block: B,
46    },
47
48    // -------------------- Consensus Engine Messages --------------------
49    /// A notarization from the consensus engine.
50    Notarization {
51        /// The notarization.
52        notarization: Notarization<V, B::Commitment>,
53    },
54    /// A finalization from the consensus engine.
55    Finalization {
56        /// The finalization.
57        finalization: Finalization<V, B::Commitment>,
58    },
59}
60
61/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
62#[derive(Clone)]
63pub struct Mailbox<V: Variant, B: Block> {
64    sender: mpsc::Sender<Message<V, B>>,
65}
66
67impl<V: Variant, B: Block> Mailbox<V, B> {
68    /// Creates a new mailbox.
69    pub(crate) fn new(sender: mpsc::Sender<Message<V, B>>) -> Self {
70        Self { sender }
71    }
72
73    /// Get is a best-effort attempt to retrieve a given block from local
74    /// storage. It is not an indication to go fetch the block from the network.
75    pub async fn get(&mut self, commitment: B::Commitment) -> oneshot::Receiver<Option<B>> {
76        let (tx, rx) = oneshot::channel();
77        if self
78            .sender
79            .send(Message::Get {
80                commitment,
81                response: tx,
82            })
83            .await
84            .is_err()
85        {
86            error!("failed to send get message to actor: receiver dropped");
87        }
88        rx
89    }
90
91    /// Subscribe is a request to retrieve a block by its commitment.
92    ///
93    /// If the block is found available locally, the block will be returned immediately.
94    ///
95    /// If the block is not available locally, the request will be registered and the caller will
96    /// be notified when the block is available. If the block is not finalized, it's possible that
97    /// it may never become available.
98    ///
99    /// The oneshot receiver should be dropped to cancel the subscription.
100    pub async fn subscribe(
101        &mut self,
102        view: Option<u64>,
103        commitment: B::Commitment,
104    ) -> oneshot::Receiver<B> {
105        let (tx, rx) = oneshot::channel();
106        if self
107            .sender
108            .send(Message::Subscribe {
109                view,
110                commitment,
111                response: tx,
112            })
113            .await
114            .is_err()
115        {
116            error!("failed to send subscribe message to actor: receiver dropped");
117        }
118        rx
119    }
120
121    /// Broadcast indicates that a block should be sent to all peers.
122    pub async fn broadcast(&mut self, block: B) {
123        if self
124            .sender
125            .send(Message::Broadcast { block })
126            .await
127            .is_err()
128        {
129            error!("failed to send broadcast message to actor: receiver dropped");
130        }
131    }
132
133    /// Notifies the actor that a block has been verified.
134    pub async fn verified(&mut self, view: u64, block: B) {
135        if self
136            .sender
137            .send(Message::Verified { view, block })
138            .await
139            .is_err()
140        {
141            error!("failed to send verified message to actor: receiver dropped");
142        }
143    }
144}
145
146impl<V: Variant, B: Block> Reporter for Mailbox<V, B> {
147    type Activity = Activity<V, B::Commitment>;
148
149    async fn report(&mut self, activity: Self::Activity) {
150        let message = match activity {
151            Activity::Notarization(notarization) => Message::Notarization { notarization },
152            Activity::Finalization(finalization) => Message::Finalization { finalization },
153            _ => {
154                // Ignore other activity types
155                return;
156            }
157        };
158        if self.sender.send(message).await.is_err() {
159            error!("failed to report activity to actor: receiver dropped");
160        }
161    }
162}