alto_chain/application/actor.rs
1use super::{
2 ingress::{Mailbox, Message},
3 Config,
4};
5use crate::{supervisor::Supervisor, utils::OneshotClosedFut};
6use alto_types::Block;
7use commonware_consensus::{marshal, threshold_simplex::types::View};
8use commonware_cryptography::{
9 bls12381::primitives::variant::MinSig, Committable, Digestible, Hasher, Sha256,
10};
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Spawner};
13use commonware_utils::SystemTimeExt;
14use futures::StreamExt;
15use futures::{channel::mpsc, future::try_join};
16use futures::{future, future::Either};
17use rand::Rng;
18use std::sync::{Arc, Mutex};
19use tracing::{debug, info, warn};
20
/// Genesis message to use during initialization.
///
/// The actor hashes these bytes and uses the result as the parent digest of
/// the genesis block (height 0, timestamp 0).
const GENESIS: &[u8] = b"commonware is neat";
23
/// Milliseconds in the future to allow for block timestamps.
///
/// During verification, a block whose timestamp exceeds the local clock
/// reading by more than this many milliseconds is rejected.
const SYNCHRONY_BOUND: u64 = 500;
26
27/// Application actor.
28pub struct Actor<R: Rng + Spawner + Metrics + Clock> {
29 context: R,
30 hasher: Sha256,
31 mailbox: mpsc::Receiver<Message>,
32}
33
34impl<R: Rng + Spawner + Metrics + Clock> Actor<R> {
35 /// Create a new application actor.
36 pub fn new(context: R, config: Config) -> (Self, Supervisor, Mailbox) {
37 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
38 (
39 Self {
40 context,
41 hasher: Sha256::new(),
42 mailbox,
43 },
44 Supervisor::new(config.polynomial, config.participants, config.share),
45 Mailbox::new(sender),
46 )
47 }
48
49 pub fn start(mut self, marshal: marshal::Mailbox<MinSig, Block>) -> Handle<()> {
50 self.context.spawn_ref()(self.run(marshal))
51 }
52
53 /// Run the application actor.
54 async fn run(mut self, mut marshal: marshal::Mailbox<MinSig, Block>) {
55 // Compute genesis digest
56 self.hasher.update(GENESIS);
57 let genesis_parent = self.hasher.finalize();
58 let genesis = Block::new(genesis_parent, 0, 0);
59 let genesis_digest = genesis.digest();
60 let built: Option<(View, Block)> = None;
61 let built = Arc::new(Mutex::new(built));
62 while let Some(message) = self.mailbox.next().await {
63 match message {
64 Message::Genesis { response } => {
65 // Use the digest of the genesis message as the initial
66 // payload.
67 let _ = response.send(genesis_digest);
68 }
69 Message::Propose {
70 view,
71 parent,
72 mut response,
73 } => {
74 // Get the parent block
75 let parent_request = if parent.1 == genesis_digest {
76 Either::Left(future::ready(Ok(genesis.clone())))
77 } else {
78 Either::Right(marshal.subscribe(Some(parent.0), parent.1).await)
79 };
80
81 // Wait for the parent block to be available or the request to be cancelled in a separate task (to
82 // continue processing other messages)
83 self.context.with_label("propose").spawn({
84 let built = built.clone();
85 move |context| async move {
86 let response_closed = OneshotClosedFut::new(&mut response);
87 select! {
88 parent = parent_request => {
89 // Get the parent block
90 let parent = parent.unwrap();
91
92 // Create a new block
93 let mut current = context.current().epoch_millis();
94 if current <= parent.timestamp {
95 current = parent.timestamp + 1;
96 }
97 let block = Block::new(parent.digest(), parent.height+1, current);
98 let digest = block.digest();
99 {
100 let mut built = built.lock().unwrap();
101 *built = Some((view, block));
102 }
103
104 // Send the digest to the consensus
105 let result = response.send(digest);
106 info!(view, ?digest, success=result.is_ok(), "proposed new block");
107 },
108 _ = response_closed => {
109 // The response was cancelled
110 warn!(view, "propose aborted");
111 }
112 }
113 }
114 });
115 }
116 Message::Broadcast { payload } => {
117 // Check if the last built is equal
118 let Some(built) = built.lock().unwrap().clone() else {
119 warn!(?payload, "missing block to broadcast");
120 continue;
121 };
122
123 // Send the block to the syncer
124 debug!(
125 ?payload,
126 view = built.0,
127 height = built.1.height,
128 "broadcast requested"
129 );
130 marshal.broadcast(built.1.clone()).await;
131 }
132 Message::Verify {
133 view,
134 parent,
135 payload,
136 mut response,
137 } => {
138 // Get the parent and current block
139 let parent_request = if parent.1 == genesis_digest {
140 Either::Left(future::ready(Ok(genesis.clone())))
141 } else {
142 Either::Right(marshal.subscribe(Some(parent.0), parent.1).await)
143 };
144
145 // Wait for the blocks to be available or the request to be cancelled in a separate task (to
146 // continue processing other messages)
147 self.context.with_label("verify").spawn({
148 let mut marshal = marshal.clone();
149 move |context| async move {
150 let requester =
151 try_join(parent_request, marshal.subscribe(None, payload).await);
152 let response_closed = OneshotClosedFut::new(&mut response);
153 select! {
154 result = requester => {
155 // Unwrap the results
156 let (parent, block) = result.unwrap();
157
158 // Verify the block
159 if block.height != parent.height + 1 {
160 let _ = response.send(false);
161 return;
162 }
163 if block.parent != parent.digest() {
164 let _ = response.send(false);
165 return;
166 }
167 if block.timestamp <= parent.timestamp {
168 let _ = response.send(false);
169 return;
170 }
171 let current = context.current().epoch_millis();
172 if block.timestamp > current + SYNCHRONY_BOUND {
173 let _ = response.send(false);
174 return;
175 }
176
177 // Persist the verified block
178 marshal.verified(view, block).await;
179
180 // Send the verification result to the consensus
181 let _ = response.send(true);
182 },
183 _ = response_closed => {
184 // The response was cancelled
185 warn!(view, "verify aborted");
186 }
187 }
188 }
189 });
190 }
191 Message::Finalized { block } => {
192 // In an application that maintains state, you would compute the state transition function here.
193 //
194 // After an unclean shutdown, it is possible that the application may be asked to process a block it has already seen (which it can simply ignore).
195 info!(
196 height = block.height,
197 digest = ?block.commitment(),
198 "processed block"
199 );
200 }
201 }
202 }
203 }
204}