alto_chain/actors/application/
actor.rs1use super::{
2 ingress::{Mailbox, Message},
3 supervisor::Supervisor,
4 Config,
5};
6use crate::actors::syncer;
7use alto_types::{Block, Finalization, Notarization};
8use commonware_consensus::threshold_simplex::Prover;
9use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
10use commonware_macros::select;
11use commonware_runtime::{Clock, Handle, Metrics, Spawner};
12use commonware_utils::SystemTimeExt;
13use futures::StreamExt;
14use futures::{channel::mpsc, future::try_join};
15use futures::{channel::oneshot, future};
16use futures::{
17 future::Either,
18 task::{Context, Poll},
19};
20use rand::Rng;
21use std::{
22 pin::Pin,
23 sync::{Arc, Mutex},
24};
25use tracing::{info, warn};
26
/// Future that watches the sending half of a oneshot channel and completes
/// once `poll_canceled` on that sender reports readiness (i.e. the receiving
/// side is no longer interested in a response).
///
/// Borrowing the sender (instead of owning it) lets the caller still use the
/// sender to deliver a value once this future has been dropped.
struct ChannelClosedFuture<'a, T> {
    /// Sender half being observed for cancellation.
    sender: &'a mut oneshot::Sender<T>,
}
31
32impl<T> futures::Future for ChannelClosedFuture<'_, T> {
33 type Output = ();
34
35 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36 match self.sender.poll_canceled(cx) {
38 Poll::Ready(()) => Poll::Ready(()), Poll::Pending => Poll::Pending, }
41 }
42}
43
44fn oneshot_closed_future<T>(sender: &mut oneshot::Sender<T>) -> ChannelClosedFuture<T> {
46 ChannelClosedFuture { sender }
47}
48
/// Bytes hashed at startup to derive the parent digest of the genesis block.
const GENESIS: &[u8] = b"commonware is neat";

/// Maximum amount a block's timestamp may exceed the verifier's current time
/// before the block is rejected. It is added to an `epoch_millis()` value, so
/// the unit is presumably milliseconds.
const SYNCHRONY_BOUND: u64 = 500;
54
/// Application actor: answers the messages arriving on its mailbox
/// (genesis, propose, broadcast, verify, prepared, finalized) and coordinates
/// with the syncer.
pub struct Actor<R: Rng + Spawner + Metrics + Clock> {
    /// Runtime context used to spawn tasks and read the current time.
    context: R,
    /// Used to deserialize notarization and finalization proofs.
    prover: Prover<Digest>,
    /// Hasher used to derive the genesis parent digest.
    hasher: Sha256,
    /// Receiving end of the channel on which `Message`s are delivered.
    mailbox: mpsc::Receiver<Message>,
}
62
63impl<R: Rng + Spawner + Metrics + Clock> Actor<R> {
64 pub fn new(context: R, config: Config) -> (Self, Supervisor, Mailbox) {
66 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
67 (
68 Self {
69 context,
70 prover: config.prover,
71 hasher: Sha256::new(),
72 mailbox,
73 },
74 Supervisor::new(config.identity, config.participants, config.share),
75 Mailbox::new(sender),
76 )
77 }
78
79 pub fn start(mut self, syncer: syncer::Mailbox) -> Handle<()> {
80 self.context.spawn_ref()(self.run(syncer))
81 }
82
83 async fn run(mut self, mut syncer: syncer::Mailbox) {
85 self.hasher.update(GENESIS);
87 let genesis_parent = self.hasher.finalize();
88 let genesis = Block::new(genesis_parent, 0, 0);
89 let genesis_digest = genesis.digest();
90 let built: Option<Block> = None;
91 let built = Arc::new(Mutex::new(built));
92 while let Some(message) = self.mailbox.next().await {
93 match message {
94 Message::Genesis { response } => {
95 let _ = response.send(genesis_digest.clone());
98 }
99 Message::Propose {
100 view,
101 parent,
102 mut response,
103 } => {
104 let parent_request = if parent.1 == genesis_digest {
106 Either::Left(future::ready(Ok(genesis.clone())))
107 } else {
108 Either::Right(syncer.get(Some(parent.0), parent.1).await)
109 };
110
111 self.context.with_label("propose").spawn({
114 let built = built.clone();
115 move |context| async move {
116 let response_closed = oneshot_closed_future(&mut response);
117 select! {
118 parent = parent_request => {
119 let parent = parent.unwrap();
121
122 let mut current = context.current().epoch_millis();
124 if current <= parent.timestamp {
125 current = parent.timestamp + 1;
126 }
127 let block = Block::new(parent.digest(), parent.height+1, current);
128 let digest = block.digest();
129 {
130 let mut built = built.lock().unwrap();
131 *built = Some(block);
132 }
133
134 let result = response.send(digest.clone());
136 info!(view, ?digest, success=result.is_ok(), "proposed new block");
137 },
138 _ = response_closed => {
139 warn!(view, "propose aborted");
141 }
142 }
143 }
144 });
145 }
146 Message::Broadcast { payload } => {
147 let Some(built) = built.lock().unwrap().clone() else {
149 warn!(?payload, "missing block to broadcast");
150 continue;
151 };
152
153 info!(?payload, "broadcast requested");
155 syncer.broadcast(built.clone()).await;
156 }
157 Message::Verify {
158 view,
159 parent,
160 payload,
161 mut response,
162 } => {
163 let parent_request = if parent.1 == genesis_digest {
165 Either::Left(future::ready(Ok(genesis.clone())))
166 } else {
167 Either::Right(syncer.get(Some(parent.0), parent.1).await)
168 };
169
170 self.context.with_label("verify").spawn({
173 let mut syncer = syncer.clone();
174 move |context| async move {
175 let requester =
176 try_join(parent_request, syncer.get(None, payload).await);
177 let response_closed = oneshot_closed_future(&mut response);
178 select! {
179 result = requester => {
180 let (parent, block) = result.unwrap();
182
183 if block.height != parent.height + 1 {
185 let _ = response.send(false);
186 return;
187 }
188 if block.parent != parent.digest() {
189 let _ = response.send(false);
190 return;
191 }
192 if block.timestamp <= parent.timestamp {
193 let _ = response.send(false);
194 return;
195 }
196 let current = context.current().epoch_millis();
197 if block.timestamp > current + SYNCHRONY_BOUND {
198 let _ = response.send(false);
199 return;
200 }
201
202 syncer.verified(view, block).await;
204
205 let _ = response.send(true);
207 },
208 _ = response_closed => {
209 warn!(view, "verify aborted");
211 }
212 }
213 }
214 });
215 }
216 Message::Prepared { proof, payload } => {
217 let (view, parent, _, signature, _) =
219 self.prover.deserialize_notarization(proof).unwrap();
220 let notarization = Notarization::new(view, parent, payload, signature.into());
221
222 syncer.notarized(notarization).await;
224 }
225 Message::Finalized { proof, payload } => {
226 let (view, parent, _, signature, _) =
228 self.prover.deserialize_finalization(proof.clone()).unwrap();
229 let finalization = Finalization::new(view, parent, payload, signature.into());
230
231 syncer.finalized(finalization).await;
233 }
234 }
235 }
236 }
237}