// alto_chain/actors/application/actor.rs

1use super::{
2    ingress::{Mailbox, Message},
3    supervisor::Supervisor,
4    Config,
5};
6use crate::actors::syncer;
7use alto_types::{Block, Finalization, Notarization};
8use commonware_consensus::threshold_simplex::Prover;
9use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
10use commonware_macros::select;
11use commonware_runtime::{Clock, Handle, Metrics, Spawner};
12use commonware_utils::SystemTimeExt;
13use futures::StreamExt;
14use futures::{channel::mpsc, future::try_join};
15use futures::{channel::oneshot, future};
16use futures::{
17    future::Either,
18    task::{Context, Poll},
19};
20use rand::Rng;
21use std::{
22    pin::Pin,
23    sync::{Arc, Mutex},
24};
25use tracing::{info, warn};
26
27// Define a future that checks if the oneshot channel is closed using a mutable reference
28struct ChannelClosedFuture<'a, T> {
29    sender: &'a mut oneshot::Sender<T>,
30}
31
32impl<T> futures::Future for ChannelClosedFuture<'_, T> {
33    type Output = ();
34
35    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36        // Use poll_canceled to check if the receiver has dropped the channel
37        match self.sender.poll_canceled(cx) {
38            Poll::Ready(()) => Poll::Ready(()), // Receiver dropped, channel closed
39            Poll::Pending => Poll::Pending,     // Channel still open
40        }
41    }
42}
43
44// Helper function to create the future using a mutable reference
45fn oneshot_closed_future<T>(sender: &mut oneshot::Sender<T>) -> ChannelClosedFuture<T> {
46    ChannelClosedFuture { sender }
47}
48
/// Genesis message: its hash is used as the parent digest of the genesis block.
const GENESIS: &[u8] = b"commonware is neat";
51
/// Maximum number of milliseconds a block timestamp may be ahead of the local clock for the
/// block to still pass verification.
const SYNCHRONY_BOUND: u64 = 500;
54
55/// Application actor.
56pub struct Actor<R: Rng + Spawner + Metrics + Clock> {
57    context: R,
58    prover: Prover<Digest>,
59    hasher: Sha256,
60    mailbox: mpsc::Receiver<Message>,
61}
62
63impl<R: Rng + Spawner + Metrics + Clock> Actor<R> {
64    /// Create a new application actor.
65    pub fn new(context: R, config: Config) -> (Self, Supervisor, Mailbox) {
66        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
67        (
68            Self {
69                context,
70                prover: config.prover,
71                hasher: Sha256::new(),
72                mailbox,
73            },
74            Supervisor::new(config.identity, config.participants, config.share),
75            Mailbox::new(sender),
76        )
77    }
78
79    pub fn start(mut self, syncer: syncer::Mailbox) -> Handle<()> {
80        self.context.spawn_ref()(self.run(syncer))
81    }
82
83    /// Run the application actor.
84    async fn run(mut self, mut syncer: syncer::Mailbox) {
85        // Compute genesis digest
86        self.hasher.update(GENESIS);
87        let genesis_parent = self.hasher.finalize();
88        let genesis = Block::new(genesis_parent, 0, 0);
89        let genesis_digest = genesis.digest();
90        let built: Option<Block> = None;
91        let built = Arc::new(Mutex::new(built));
92        while let Some(message) = self.mailbox.next().await {
93            match message {
94                Message::Genesis { response } => {
95                    // Use the digest of the genesis message as the initial
96                    // payload.
97                    let _ = response.send(genesis_digest.clone());
98                }
99                Message::Propose {
100                    view,
101                    parent,
102                    mut response,
103                } => {
104                    // Get the parent block
105                    let parent_request = if parent.1 == genesis_digest {
106                        Either::Left(future::ready(Ok(genesis.clone())))
107                    } else {
108                        Either::Right(syncer.get(Some(parent.0), parent.1).await)
109                    };
110
111                    // Wait for the parent block to be available or the request to be cancelled in a separate task (to
112                    // continue processing other messages)
113                    self.context.with_label("propose").spawn({
114                        let built = built.clone();
115                        move |context| async move {
116                            let response_closed = oneshot_closed_future(&mut response);
117                            select! {
118                                parent = parent_request => {
119                                    // Get the parent block
120                                    let parent = parent.unwrap();
121
122                                    // Create a new block
123                                    let mut current = context.current().epoch_millis();
124                                    if current <= parent.timestamp {
125                                        current = parent.timestamp + 1;
126                                    }
127                                    let block = Block::new(parent.digest(), parent.height+1, current);
128                                    let digest = block.digest();
129                                    {
130                                        let mut built = built.lock().unwrap();
131                                        *built = Some(block);
132                                    }
133
134                                    // Send the digest to the consensus
135                                    let result = response.send(digest.clone());
136                                    info!(view, ?digest, success=result.is_ok(), "proposed new block");
137                                },
138                                _ = response_closed => {
139                                    // The response was cancelled
140                                    warn!(view, "propose aborted");
141                                }
142                            }
143                        }
144                    });
145                }
146                Message::Broadcast { payload } => {
147                    // Check if the last built is equal
148                    let Some(built) = built.lock().unwrap().clone() else {
149                        warn!(?payload, "missing block to broadcast");
150                        continue;
151                    };
152
153                    // Send the block to the syncer
154                    info!(?payload, "broadcast requested");
155                    syncer.broadcast(built.clone()).await;
156                }
157                Message::Verify {
158                    view,
159                    parent,
160                    payload,
161                    mut response,
162                } => {
163                    // Get the parent and current block
164                    let parent_request = if parent.1 == genesis_digest {
165                        Either::Left(future::ready(Ok(genesis.clone())))
166                    } else {
167                        Either::Right(syncer.get(Some(parent.0), parent.1).await)
168                    };
169
170                    // Wait for the blocks to be available or the request to be cancelled in a separate task (to
171                    // continue processing other messages)
172                    self.context.with_label("verify").spawn({
173                        let mut syncer = syncer.clone();
174                        move |context| async move {
175                            let requester =
176                                try_join(parent_request, syncer.get(None, payload).await);
177                            let response_closed = oneshot_closed_future(&mut response);
178                            select! {
179                                result = requester => {
180                                    // Unwrap the results
181                                    let (parent, block) = result.unwrap();
182
183                                    // Verify the block
184                                    if block.height != parent.height + 1 {
185                                        let _ = response.send(false);
186                                        return;
187                                    }
188                                    if block.parent != parent.digest() {
189                                        let _ = response.send(false);
190                                        return;
191                                    }
192                                    if block.timestamp <= parent.timestamp {
193                                        let _ = response.send(false);
194                                        return;
195                                    }
196                                    let current = context.current().epoch_millis();
197                                    if block.timestamp > current + SYNCHRONY_BOUND {
198                                        let _ = response.send(false);
199                                        return;
200                                    }
201
202                                    // Persist the verified block
203                                    syncer.verified(view, block).await;
204
205                                    // Send the verification result to the consensus
206                                    let _ = response.send(true);
207                                },
208                                _ = response_closed => {
209                                    // The response was cancelled
210                                    warn!(view, "verify aborted");
211                                }
212                            }
213                        }
214                    });
215                }
216                Message::Prepared { proof, payload } => {
217                    // Parse the proof
218                    let (view, parent, _, signature, _) =
219                        self.prover.deserialize_notarization(proof).unwrap();
220                    let notarization = Notarization::new(view, parent, payload, signature.into());
221
222                    // Send the notarization to the syncer
223                    syncer.notarized(notarization).await;
224                }
225                Message::Finalized { proof, payload } => {
226                    // Parse the proof
227                    let (view, parent, _, signature, _) =
228                        self.prover.deserialize_finalization(proof.clone()).unwrap();
229                    let finalization = Finalization::new(view, parent, payload, signature.into());
230
231                    // Send the finalization to the syncer
232                    syncer.finalized(finalization).await;
233                }
234            }
235        }
236    }
237}