alto_chain/application/
actor.rs

1use super::{
2    ingress::{Mailbox, Message},
3    Config,
4};
5use crate::{supervisor::Supervisor, utils::OneshotClosedFut};
6use alto_types::Block;
7use commonware_consensus::{marshal, threshold_simplex::types::View};
8use commonware_cryptography::{
9    bls12381::primitives::variant::MinSig, Committable, Digestible, Hasher, Sha256,
10};
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Spawner};
13use commonware_utils::SystemTimeExt;
14use futures::StreamExt;
15use futures::{channel::mpsc, future::try_join};
16use futures::{future, future::Either};
17use rand::Rng;
18use std::sync::{Arc, Mutex};
19use tracing::{debug, info, warn};
20
21/// Genesis message to use during initialization.
22const GENESIS: &[u8] = b"commonware is neat";
23
24/// Milliseconds in the future to allow for block timestamps.
25const SYNCHRONY_BOUND: u64 = 500;
26
27/// Application actor.
28pub struct Actor<R: Rng + Spawner + Metrics + Clock> {
29    context: R,
30    hasher: Sha256,
31    mailbox: mpsc::Receiver<Message>,
32}
33
34impl<R: Rng + Spawner + Metrics + Clock> Actor<R> {
35    /// Create a new application actor.
36    pub fn new(context: R, config: Config) -> (Self, Supervisor, Mailbox) {
37        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
38        (
39            Self {
40                context,
41                hasher: Sha256::new(),
42                mailbox,
43            },
44            Supervisor::new(config.polynomial, config.participants, config.share),
45            Mailbox::new(sender),
46        )
47    }
48
49    pub fn start(mut self, marshal: marshal::Mailbox<MinSig, Block>) -> Handle<()> {
50        self.context.spawn_ref()(self.run(marshal))
51    }
52
53    /// Run the application actor.
54    async fn run(mut self, mut marshal: marshal::Mailbox<MinSig, Block>) {
55        // Compute genesis digest
56        self.hasher.update(GENESIS);
57        let genesis_parent = self.hasher.finalize();
58        let genesis = Block::new(genesis_parent, 0, 0);
59        let genesis_digest = genesis.digest();
60        let built: Option<(View, Block)> = None;
61        let built = Arc::new(Mutex::new(built));
62        while let Some(message) = self.mailbox.next().await {
63            match message {
64                Message::Genesis { response } => {
65                    // Use the digest of the genesis message as the initial
66                    // payload.
67                    let _ = response.send(genesis_digest);
68                }
69                Message::Propose {
70                    view,
71                    parent,
72                    mut response,
73                } => {
74                    // Get the parent block
75                    let parent_request = if parent.1 == genesis_digest {
76                        Either::Left(future::ready(Ok(genesis.clone())))
77                    } else {
78                        Either::Right(marshal.subscribe(Some(parent.0), parent.1).await)
79                    };
80
81                    // Wait for the parent block to be available or the request to be cancelled in a separate task (to
82                    // continue processing other messages)
83                    self.context.with_label("propose").spawn({
84                        let built = built.clone();
85                        move |context| async move {
86                            let response_closed = OneshotClosedFut::new(&mut response);
87                            select! {
88                                parent = parent_request => {
89                                    // Get the parent block
90                                    let parent = parent.unwrap();
91
92                                    // Create a new block
93                                    let mut current = context.current().epoch_millis();
94                                    if current <= parent.timestamp {
95                                        current = parent.timestamp + 1;
96                                    }
97                                    let block = Block::new(parent.digest(), parent.height+1, current);
98                                    let digest = block.digest();
99                                    {
100                                        let mut built = built.lock().unwrap();
101                                        *built = Some((view, block));
102                                    }
103
104                                    // Send the digest to the consensus
105                                    let result = response.send(digest);
106                                    info!(view, ?digest, success=result.is_ok(), "proposed new block");
107                                },
108                                _ = response_closed => {
109                                    // The response was cancelled
110                                    warn!(view, "propose aborted");
111                                }
112                            }
113                        }
114                    });
115                }
116                Message::Broadcast { payload } => {
117                    // Check if the last built is equal
118                    let Some(built) = built.lock().unwrap().clone() else {
119                        warn!(?payload, "missing block to broadcast");
120                        continue;
121                    };
122
123                    // Send the block to the syncer
124                    debug!(
125                        ?payload,
126                        view = built.0,
127                        height = built.1.height,
128                        "broadcast requested"
129                    );
130                    marshal.broadcast(built.1.clone()).await;
131                }
132                Message::Verify {
133                    view,
134                    parent,
135                    payload,
136                    mut response,
137                } => {
138                    // Get the parent and current block
139                    let parent_request = if parent.1 == genesis_digest {
140                        Either::Left(future::ready(Ok(genesis.clone())))
141                    } else {
142                        Either::Right(marshal.subscribe(Some(parent.0), parent.1).await)
143                    };
144
145                    // Wait for the blocks to be available or the request to be cancelled in a separate task (to
146                    // continue processing other messages)
147                    self.context.with_label("verify").spawn({
148                        let mut marshal = marshal.clone();
149                        move |context| async move {
150                            let requester =
151                                try_join(parent_request, marshal.subscribe(None, payload).await);
152                            let response_closed = OneshotClosedFut::new(&mut response);
153                            select! {
154                                result = requester => {
155                                    // Unwrap the results
156                                    let (parent, block) = result.unwrap();
157
158                                    // Verify the block
159                                    if block.height != parent.height + 1 {
160                                        let _ = response.send(false);
161                                        return;
162                                    }
163                                    if block.parent != parent.digest() {
164                                        let _ = response.send(false);
165                                        return;
166                                    }
167                                    if block.timestamp <= parent.timestamp {
168                                        let _ = response.send(false);
169                                        return;
170                                    }
171                                    let current = context.current().epoch_millis();
172                                    if block.timestamp > current + SYNCHRONY_BOUND {
173                                        let _ = response.send(false);
174                                        return;
175                                    }
176
177                                    // Persist the verified block
178                                    marshal.verified(view, block).await;
179
180                                    // Send the verification result to the consensus
181                                    let _ = response.send(true);
182                                },
183                                _ = response_closed => {
184                                    // The response was cancelled
185                                    warn!(view, "verify aborted");
186                                }
187                            }
188                        }
189                    });
190                }
191                Message::Finalized { block } => {
192                    // In an application that maintains state, you would compute the state transition function here.
193                    //
194                    // After an unclean shutdown, it is possible that the application may be asked to process a block it has already seen (which it can simply ignore).
195                    info!(
196                        height = block.height,
197                        digest = ?block.commitment(),
198                        "processed block"
199                    );
200                }
201            }
202        }
203    }
204}