1use super::block::SimplexBlock;
7use super::types::Scheme;
8use commonware_consensus::{
9 marshal::{self, Update},
10 simplex::types::Context,
11 types::{Epoch, Round, View},
12 Automaton, Relay, Reporter,
13};
14use commonware_cryptography::{ed25519::PublicKey, sha256::Digest, Digestible, Hasher, Sha256};
15use commonware_macros::select;
16use commonware_runtime::{Clock, Metrics, Spawner};
17use commonware_utils::SystemTimeExt;
18use futures::{
19 channel::{mpsc, oneshot},
20 future::{self, Either},
21 SinkExt, StreamExt,
22};
23use rand::Rng;
24use std::sync::{Arc, Mutex};
25use tracing::{debug, info, warn};
26
27pub type FinalizedCallback = Arc<dyn Fn(&SimplexBlock) + Send + Sync>;
29
30const GENESIS: &[u8] = b"guts-genesis";
32
33const SYNCHRONY_BOUND: u64 = 500;
35
36pub struct Config {
38 pub mailbox_size: usize,
40}
41
42impl Default for Config {
43 fn default() -> Self {
44 Self { mailbox_size: 1024 }
45 }
46}
47
48pub enum Message {
50 Genesis { response: oneshot::Sender<Digest> },
52 Propose {
54 round: Round,
55 parent: (View, Digest),
56 response: oneshot::Sender<Digest>,
57 },
58 Broadcast { payload: Digest },
60 Verify {
62 round: Round,
63 parent: (View, Digest),
64 payload: Digest,
65 response: oneshot::Sender<bool>,
66 },
67 Finalized { block: SimplexBlock },
69}
70
71#[derive(Clone)]
73pub struct Mailbox {
74 sender: mpsc::Sender<Message>,
75}
76
77impl Mailbox {
78 pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
80 Self { sender }
81 }
82}
83
84impl Automaton for Mailbox {
85 type Digest = Digest;
86 type Context = Context<Self::Digest, PublicKey>;
87
88 async fn genesis(&mut self, _epoch: Epoch) -> Self::Digest {
89 let (response, receiver) = oneshot::channel();
90 self.sender
91 .send(Message::Genesis { response })
92 .await
93 .expect("Failed to send genesis");
94 receiver.await.expect("Failed to receive genesis")
95 }
96
97 async fn propose(
98 &mut self,
99 context: Context<Self::Digest, PublicKey>,
100 ) -> oneshot::Receiver<Self::Digest> {
101 let (response, receiver) = oneshot::channel();
102 self.sender
103 .send(Message::Propose {
104 round: context.round,
105 parent: context.parent,
106 response,
107 })
108 .await
109 .expect("Failed to send propose");
110 receiver
111 }
112
113 async fn verify(
114 &mut self,
115 context: Context<Self::Digest, PublicKey>,
116 payload: Self::Digest,
117 ) -> oneshot::Receiver<bool> {
118 let (response, receiver) = oneshot::channel();
119 self.sender
120 .send(Message::Verify {
121 round: context.round,
122 parent: context.parent,
123 payload,
124 response,
125 })
126 .await
127 .expect("Failed to send verify");
128 receiver
129 }
130}
131
132impl Relay for Mailbox {
133 type Digest = Digest;
134
135 async fn broadcast(&mut self, digest: Self::Digest) {
136 self.sender
137 .send(Message::Broadcast { payload: digest })
138 .await
139 .expect("Failed to send broadcast");
140 }
141}
142
143impl Reporter for Mailbox {
144 type Activity = Update<SimplexBlock>;
145
146 async fn report(&mut self, update: Self::Activity) {
147 let Update::Block(block) = update else {
148 return;
149 };
150 self.sender
151 .send(Message::Finalized { block })
152 .await
153 .expect("Failed to send finalized");
154 }
155}
156
157pub struct Actor<R: Rng + Spawner + Metrics + Clock> {
159 context: R,
160 hasher: Sha256,
161 mailbox: mpsc::Receiver<Message>,
162
163 on_finalized: Option<FinalizedCallback>,
165}
166
167impl<R: Rng + Spawner + Metrics + Clock> Actor<R> {
168 pub fn new(context: R, config: Config) -> (Self, Mailbox) {
170 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
171 (
172 Self {
173 context,
174 hasher: Sha256::new(),
175 mailbox,
176 on_finalized: None,
177 },
178 Mailbox::new(sender),
179 )
180 }
181
182 pub fn on_finalized<F>(mut self, callback: F) -> Self
184 where
185 F: Fn(&SimplexBlock) + Send + Sync + 'static,
186 {
187 self.on_finalized = Some(Arc::new(callback));
188 self
189 }
190
191 pub async fn run(mut self, mut marshal: marshal::Mailbox<Scheme, SimplexBlock>) {
193 self.hasher.update(GENESIS);
195 let genesis_parent = self.hasher.finalize();
196 let genesis = SimplexBlock::new(genesis_parent, 0, 0, [0u8; 32], 0, [0u8; 32]);
197 let genesis_digest = genesis.digest();
198
199 let built: Option<(Round, SimplexBlock)> = None;
200 let built = Arc::new(Mutex::new(built));
201
202 while let Some(message) = self.mailbox.next().await {
203 match message {
204 Message::Genesis { response } => {
205 let _ = response.send(genesis_digest);
207 }
208 Message::Propose {
209 round,
210 parent,
211 response,
212 } => {
213 let parent_request = if parent.1 == genesis_digest {
215 Either::Left(future::ready(Ok(genesis.clone())))
216 } else {
217 Either::Right(
218 marshal
219 .subscribe(Some(Round::new(round.epoch(), parent.0)), parent.1)
220 .await,
221 )
222 };
223
224 let built_clone = built.clone();
226 let context_clone = self.context.clone();
227 context_clone.clone().spawn(move |_ctx| async move {
228 select! {
229 parent_result = parent_request => {
230 let parent_block = parent_result.unwrap();
231
232 let mut current = context_clone.current().epoch_millis();
234 if current <= parent_block.timestamp {
235 current = parent_block.timestamp + 1;
236 }
237
238 let block = SimplexBlock::new(
240 parent_block.digest(),
241 parent_block.height + 1,
242 current,
243 [0u8; 32], 0, [0u8; 32], );
247 let digest = block.digest();
248
249 {
250 let mut built = built_clone.lock().unwrap();
251 *built = Some((round, block));
252 }
253
254 let result = response.send(digest);
255 info!(?round, ?digest, success = result.is_ok(), "proposed new block");
256 }
257 }
258 });
259 }
260 Message::Broadcast { payload } => {
261 let Some(built_block) = built.lock().unwrap().clone() else {
263 warn!(?payload, "missing block to broadcast");
264 continue;
265 };
266
267 debug!(
268 ?payload,
269 round = ?built_block.0,
270 height = built_block.1.height,
271 "broadcast requested"
272 );
273 marshal.broadcast(built_block.1.clone()).await;
274 }
275 Message::Verify {
276 round,
277 parent,
278 payload,
279 response,
280 } => {
281 let parent_request = if parent.1 == genesis_digest {
283 Either::Left(future::ready(Ok(genesis.clone())))
284 } else {
285 Either::Right(
286 marshal
287 .subscribe(Some(Round::new(round.epoch(), parent.0)), parent.1)
288 .await,
289 )
290 };
291
292 let mut marshal_clone = marshal.clone();
293 let context_clone = self.context.clone();
294 context_clone.clone().spawn(move |_ctx| async move {
295 let block_request = marshal_clone.subscribe(None, payload).await;
296
297 select! {
298 results = futures::future::try_join(parent_request, block_request) => {
299 let (parent_block, block) = results.unwrap();
300
301 if block.height != parent_block.height + 1 {
303 let _ = response.send(false);
304 return;
305 }
306 if block.parent != parent_block.digest() {
307 let _ = response.send(false);
308 return;
309 }
310 if block.timestamp <= parent_block.timestamp {
311 let _ = response.send(false);
312 return;
313 }
314 let current = context_clone.current().epoch_millis();
315 if block.timestamp > current + SYNCHRONY_BOUND {
316 let _ = response.send(false);
317 return;
318 }
319
320 marshal_clone.verified(round, block).await;
322
323 let _ = response.send(true);
324 }
325 }
326 });
327 }
328 Message::Finalized { block } => {
329 info!(
330 height = block.height,
331 digest = ?block.digest(),
332 "processed finalized block"
333 );
334
335 if let Some(ref callback) = self.on_finalized {
337 callback(&block);
338 }
339 }
340 }
341 }
342 }
343}