commonware_glue/stateful/actor/core/
mailbox.rs1use crate::stateful::Application;
4use commonware_actor::{
5 mailbox::{Overflow, Policy, Sender},
6 Feedback,
7};
8use commonware_consensus::{marshal::Update, Application as ConsensusApplication, Reporter};
9use commonware_runtime::{Clock, Metrics, Spawner};
10use commonware_utils::{acknowledgement::Exact, channel::oneshot};
11use futures::Stream;
12use rand::Rng;
13use std::{collections::VecDeque, pin::Pin};
14
15pub(crate) type ErasedAncestorStream<B> = Pin<Box<dyn Stream<Item = B> + Send>>;
17
18pub(crate) enum Message<E, A>
20where
21 E: Rng + Spawner + Metrics + Clock,
22 A: Application<E>,
23{
24 Propose {
26 context: (E, A::Context),
27 ancestry: ErasedAncestorStream<A::Block>,
28 response: oneshot::Sender<Option<A::Block>>,
29 },
30
31 Verify {
33 context: (E, A::Context),
34 ancestry: ErasedAncestorStream<A::Block>,
35 response: oneshot::Sender<bool>,
36 },
37
38 Finalized {
40 block: A::Block,
41 acknowledgement: Exact,
42 },
43
44 SubscribeDatabases {
49 response: oneshot::Sender<A::Databases>,
50 },
51}
52
53impl<E, A> Message<E, A>
54where
55 E: Rng + Spawner + Metrics + Clock,
56 A: Application<E>,
57{
58 fn response_closed(&self) -> bool {
59 match self {
60 Self::Propose { response, .. } => response.is_closed(),
61 Self::Verify { response, .. } => response.is_closed(),
62 Self::SubscribeDatabases { response } => response.is_closed(),
63 Self::Finalized { .. } => false,
64 }
65 }
66}
67
68pub(crate) struct Pending<E, A>(VecDeque<Message<E, A>>)
69where
70 E: Rng + Spawner + Metrics + Clock,
71 A: Application<E>;
72
73impl<E, A> Default for Pending<E, A>
74where
75 E: Rng + Spawner + Metrics + Clock,
76 A: Application<E>,
77{
78 fn default() -> Self {
79 Self(VecDeque::new())
80 }
81}
82
83impl<E, A> Overflow<Message<E, A>> for Pending<E, A>
84where
85 E: Rng + Spawner + Metrics + Clock,
86 A: Application<E>,
87{
88 fn is_empty(&self) -> bool {
89 self.0.is_empty()
90 }
91
92 fn drain<F>(&mut self, mut push: F)
93 where
94 F: FnMut(Message<E, A>) -> Option<Message<E, A>>,
95 {
96 while let Some(message) = self.0.pop_front() {
97 if message.response_closed() {
98 continue;
99 }
100
101 if let Some(message) = push(message) {
102 self.0.push_front(message);
103 break;
104 }
105 }
106 }
107}
108
109impl<E, A> Policy for Message<E, A>
110where
111 E: Rng + Spawner + Metrics + Clock,
112 A: Application<E>,
113{
114 type Overflow = Pending<E, A>;
115
116 fn handle(overflow: &mut Self::Overflow, message: Self) {
117 if message.response_closed() {
118 return;
119 }
120 overflow.0.push_back(message);
121 }
122}
123
124pub struct Mailbox<E, A>
129where
130 E: Rng + Spawner + Metrics + Clock,
131 A: Application<E>,
132{
133 sender: Sender<Message<E, A>>,
134}
135
136impl<E, A> Clone for Mailbox<E, A>
137where
138 E: Rng + Spawner + Metrics + Clock,
139 A: Application<E>,
140{
141 fn clone(&self) -> Self {
142 Self {
143 sender: self.sender.clone(),
144 }
145 }
146}
147
148impl<E, A> Mailbox<E, A>
149where
150 E: Rng + Spawner + Metrics + Clock,
151 A: Application<E>,
152{
153 pub(crate) const fn new(sender: Sender<Message<E, A>>) -> Self {
155 Self { sender }
156 }
157}
158
159impl<E, A> Mailbox<E, A>
160where
161 E: Rng + Spawner + Metrics + Clock,
162 A: Application<E>,
163{
164 pub async fn subscribe_databases(&self) -> A::Databases {
170 let (response, receiver) = oneshot::channel();
171 let _ = self
172 .sender
173 .enqueue(Message::SubscribeDatabases { response });
174 receiver
175 .await
176 .expect("stateful actor dropped during subscribe_databases")
177 }
178}
179
180impl<E, A> ConsensusApplication<E> for Mailbox<E, A>
181where
182 E: Rng + Spawner + Metrics + Clock,
183 A: Application<E>,
184{
185 type SigningScheme = A::SigningScheme;
186 type Context = A::Context;
187 type Block = A::Block;
188
189 async fn propose(
190 &mut self,
191 context: (E, Self::Context),
192 ancestry: impl Stream<Item = Self::Block> + Send + 'static,
193 ) -> Option<Self::Block> {
194 let (response, receiver) = oneshot::channel();
195 let _ = self.sender.enqueue(Message::Propose {
196 context,
197 ancestry: Box::pin(ancestry),
198 response,
199 });
200 receiver.await.ok().flatten()
201 }
202
203 async fn verify(
204 &mut self,
205 context: (E, Self::Context),
206 ancestry: impl Stream<Item = Self::Block> + Send + 'static,
207 ) -> bool {
208 let (response, receiver) = oneshot::channel();
211 let _ = self.sender.enqueue(Message::Verify {
212 context,
213 ancestry: Box::pin(ancestry),
214 response,
215 });
216 receiver
217 .await
218 .expect("stateful actor dropped during verify")
219 }
220}
221
222impl<E, A> Reporter for Mailbox<E, A>
223where
224 E: Rng + Spawner + Metrics + Clock,
225 A: Application<E>,
226{
227 type Activity = Update<A::Block>;
228
229 fn report(&mut self, activity: Self::Activity) -> Feedback {
230 let message = match activity {
231 Update::Tip(_, _, _) => return Feedback::Ok,
232 Update::Block(block, acknowledgement) => Message::Finalized {
233 block,
234 acknowledgement,
235 },
236 };
237
238 self.sender.enqueue(message)
239 }
240}