Skip to main content

commonware_glue/stateful/actor/core/
mailbox.rs

1//! Mailbox for the [`super::Stateful`] actor.
2
3use crate::stateful::Application;
4use commonware_actor::{
5    mailbox::{Overflow, Policy, Sender},
6    Feedback,
7};
8use commonware_consensus::{marshal::Update, Application as ConsensusApplication, Reporter};
9use commonware_runtime::{Clock, Metrics, Spawner};
10use commonware_utils::{acknowledgement::Exact, channel::oneshot};
11use futures::Stream;
12use rand::Rng;
13use std::{collections::VecDeque, pin::Pin};
14
15/// Type alias for an ancestor stream sent through the actor mailbox.
16pub(crate) type ErasedAncestorStream<B> = Pin<Box<dyn Stream<Item = B> + Send>>;
17
18/// Messages processed by the actor loop.
19pub(crate) enum Message<E, A>
20where
21    E: Rng + Spawner + Metrics + Clock,
22    A: Application<E>,
23{
24    /// A request to propose a block.
25    Propose {
26        context: (E, A::Context),
27        ancestry: ErasedAncestorStream<A::Block>,
28        response: oneshot::Sender<Option<A::Block>>,
29    },
30
31    /// A request to verify a block.
32    Verify {
33        context: (E, A::Context),
34        ancestry: ErasedAncestorStream<A::Block>,
35        response: oneshot::Sender<bool>,
36    },
37
38    /// A reporting of a new finalized block.
39    Finalized {
40        block: A::Block,
41        acknowledgement: Exact,
42    },
43
44    /// Requests the attached database set.
45    ///
46    /// The actor replies once the database set has been attached to the
47    /// serving stateful actor, or immediately if that has already happened.
48    SubscribeDatabases {
49        response: oneshot::Sender<A::Databases>,
50    },
51}
52
53impl<E, A> Message<E, A>
54where
55    E: Rng + Spawner + Metrics + Clock,
56    A: Application<E>,
57{
58    fn response_closed(&self) -> bool {
59        match self {
60            Self::Propose { response, .. } => response.is_closed(),
61            Self::Verify { response, .. } => response.is_closed(),
62            Self::SubscribeDatabases { response } => response.is_closed(),
63            Self::Finalized { .. } => false,
64        }
65    }
66}
67
68pub(crate) struct Pending<E, A>(VecDeque<Message<E, A>>)
69where
70    E: Rng + Spawner + Metrics + Clock,
71    A: Application<E>;
72
73impl<E, A> Default for Pending<E, A>
74where
75    E: Rng + Spawner + Metrics + Clock,
76    A: Application<E>,
77{
78    fn default() -> Self {
79        Self(VecDeque::new())
80    }
81}
82
83impl<E, A> Overflow<Message<E, A>> for Pending<E, A>
84where
85    E: Rng + Spawner + Metrics + Clock,
86    A: Application<E>,
87{
88    fn is_empty(&self) -> bool {
89        self.0.is_empty()
90    }
91
92    fn drain<F>(&mut self, mut push: F)
93    where
94        F: FnMut(Message<E, A>) -> Option<Message<E, A>>,
95    {
96        while let Some(message) = self.0.pop_front() {
97            if message.response_closed() {
98                continue;
99            }
100
101            if let Some(message) = push(message) {
102                self.0.push_front(message);
103                break;
104            }
105        }
106    }
107}
108
109impl<E, A> Policy for Message<E, A>
110where
111    E: Rng + Spawner + Metrics + Clock,
112    A: Application<E>,
113{
114    type Overflow = Pending<E, A>;
115
116    fn handle(overflow: &mut Self::Overflow, message: Self) {
117        if message.response_closed() {
118            return;
119        }
120        overflow.0.push_back(message);
121    }
122}
123
124/// Channel-based proxy to the [`Stateful`](super::Stateful) actor.
125///
126/// Implements the consensus application and verifying traits by forwarding
127/// each call to the actor via a message and awaiting the response.
128pub struct Mailbox<E, A>
129where
130    E: Rng + Spawner + Metrics + Clock,
131    A: Application<E>,
132{
133    sender: Sender<Message<E, A>>,
134}
135
136impl<E, A> Clone for Mailbox<E, A>
137where
138    E: Rng + Spawner + Metrics + Clock,
139    A: Application<E>,
140{
141    fn clone(&self) -> Self {
142        Self {
143            sender: self.sender.clone(),
144        }
145    }
146}
147
148impl<E, A> Mailbox<E, A>
149where
150    E: Rng + Spawner + Metrics + Clock,
151    A: Application<E>,
152{
153    /// Create a mailbox from the send half of the actor's message channel.
154    pub(crate) const fn new(sender: Sender<Message<E, A>>) -> Self {
155        Self { sender }
156    }
157}
158
159impl<E, A> Mailbox<E, A>
160where
161    E: Rng + Spawner + Metrics + Clock,
162    A: Application<E>,
163{
164    /// Wait for the attached database set.
165    ///
166    /// This resolves once startup handoff has attached the database set to the
167    /// serving actor. Late callers receive the current database set
168    /// immediately.
169    pub async fn subscribe_databases(&self) -> A::Databases {
170        let (response, receiver) = oneshot::channel();
171        let _ = self
172            .sender
173            .enqueue(Message::SubscribeDatabases { response });
174        receiver
175            .await
176            .expect("stateful actor dropped during subscribe_databases")
177    }
178}
179
180impl<E, A> ConsensusApplication<E> for Mailbox<E, A>
181where
182    E: Rng + Spawner + Metrics + Clock,
183    A: Application<E>,
184{
185    type SigningScheme = A::SigningScheme;
186    type Context = A::Context;
187    type Block = A::Block;
188
189    async fn propose(
190        &mut self,
191        context: (E, Self::Context),
192        ancestry: impl Stream<Item = Self::Block> + Send + 'static,
193    ) -> Option<Self::Block> {
194        let (response, receiver) = oneshot::channel();
195        let _ = self.sender.enqueue(Message::Propose {
196            context,
197            ancestry: Box::pin(ancestry),
198            response,
199        });
200        receiver.await.ok().flatten()
201    }
202
203    async fn verify(
204        &mut self,
205        context: (E, Self::Context),
206        ancestry: impl Stream<Item = Self::Block> + Send + 'static,
207    ) -> bool {
208        // We must panic if we don't get a response; We cannot override the decision
209        // of the application based on the availabilitiy of the actor.
210        let (response, receiver) = oneshot::channel();
211        let _ = self.sender.enqueue(Message::Verify {
212            context,
213            ancestry: Box::pin(ancestry),
214            response,
215        });
216        receiver
217            .await
218            .expect("stateful actor dropped during verify")
219    }
220}
221
222impl<E, A> Reporter for Mailbox<E, A>
223where
224    E: Rng + Spawner + Metrics + Clock,
225    A: Application<E>,
226{
227    type Activity = Update<A::Block>;
228
229    fn report(&mut self, activity: Self::Activity) -> Feedback {
230        let message = match activity {
231            Update::Tip(_, _, _) => return Feedback::Ok,
232            Update::Block(block, acknowledgement) => Message::Finalized {
233                block,
234                acknowledgement,
235            },
236        };
237
238        self.sender.enqueue(message)
239    }
240}