guts_consensus/simplex/
application.rs

//! Application actor for Simplex BFT consensus.
//!
//! This module provides the application layer that interfaces between the
//! Simplex consensus engine and the Guts application logic.
5
6use super::block::SimplexBlock;
7use super::types::Scheme;
8use commonware_consensus::{
9    marshal::{self, Update},
10    simplex::types::Context,
11    types::{Epoch, Round, View},
12    Automaton, Relay, Reporter,
13};
14use commonware_cryptography::{ed25519::PublicKey, sha256::Digest, Digestible, Hasher, Sha256};
15use commonware_macros::select;
16use commonware_runtime::{Clock, Metrics, Spawner};
17use commonware_utils::SystemTimeExt;
18use futures::{
19    channel::{mpsc, oneshot},
20    future::{self, Either},
21    SinkExt, StreamExt,
22};
23use rand::Rng;
24use std::sync::{Arc, Mutex};
25use tracing::{debug, info, warn};
26
27/// Type alias for the finalization callback.
28pub type FinalizedCallback = Arc<dyn Fn(&SimplexBlock) + Send + Sync>;
29
/// Seed bytes that are hashed to derive the parent digest of the genesis block.
const GENESIS: &[u8] = b"guts-genesis";
32
/// Maximum number of milliseconds a block timestamp may lie ahead of the local
/// clock before verification rejects the block.
const SYNCHRONY_BOUND: u64 = 500;
35
/// Tunable settings for the application actor.
pub struct Config {
    /// Buffer size handed to the channel that backs the actor's mailbox.
    pub mailbox_size: usize,
}

impl Default for Config {
    /// Returns a configuration whose mailbox buffers 1024 messages.
    fn default() -> Self {
        let mailbox_size = 1024;
        Config { mailbox_size }
    }
}
47
48/// Messages sent to the application.
49pub enum Message {
50    /// Request for genesis digest.
51    Genesis { response: oneshot::Sender<Digest> },
52    /// Request to propose a new block.
53    Propose {
54        round: Round,
55        parent: (View, Digest),
56        response: oneshot::Sender<Digest>,
57    },
58    /// Request to broadcast a block.
59    Broadcast { payload: Digest },
60    /// Request to verify a proposed block.
61    Verify {
62        round: Round,
63        parent: (View, Digest),
64        payload: Digest,
65        response: oneshot::Sender<bool>,
66    },
67    /// A block has been finalized.
68    Finalized { block: SimplexBlock },
69}
70
71/// Mailbox for the application actor.
72#[derive(Clone)]
73pub struct Mailbox {
74    sender: mpsc::Sender<Message>,
75}
76
77impl Mailbox {
78    /// Creates a new mailbox.
79    pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
80        Self { sender }
81    }
82}
83
84impl Automaton for Mailbox {
85    type Digest = Digest;
86    type Context = Context<Self::Digest, PublicKey>;
87
88    async fn genesis(&mut self, _epoch: Epoch) -> Self::Digest {
89        let (response, receiver) = oneshot::channel();
90        self.sender
91            .send(Message::Genesis { response })
92            .await
93            .expect("Failed to send genesis");
94        receiver.await.expect("Failed to receive genesis")
95    }
96
97    async fn propose(
98        &mut self,
99        context: Context<Self::Digest, PublicKey>,
100    ) -> oneshot::Receiver<Self::Digest> {
101        let (response, receiver) = oneshot::channel();
102        self.sender
103            .send(Message::Propose {
104                round: context.round,
105                parent: context.parent,
106                response,
107            })
108            .await
109            .expect("Failed to send propose");
110        receiver
111    }
112
113    async fn verify(
114        &mut self,
115        context: Context<Self::Digest, PublicKey>,
116        payload: Self::Digest,
117    ) -> oneshot::Receiver<bool> {
118        let (response, receiver) = oneshot::channel();
119        self.sender
120            .send(Message::Verify {
121                round: context.round,
122                parent: context.parent,
123                payload,
124                response,
125            })
126            .await
127            .expect("Failed to send verify");
128        receiver
129    }
130}
131
132impl Relay for Mailbox {
133    type Digest = Digest;
134
135    async fn broadcast(&mut self, digest: Self::Digest) {
136        self.sender
137            .send(Message::Broadcast { payload: digest })
138            .await
139            .expect("Failed to send broadcast");
140    }
141}
142
143impl Reporter for Mailbox {
144    type Activity = Update<SimplexBlock>;
145
146    async fn report(&mut self, update: Self::Activity) {
147        let Update::Block(block) = update else {
148            return;
149        };
150        self.sender
151            .send(Message::Finalized { block })
152            .await
153            .expect("Failed to send finalized");
154    }
155}
156
157/// Application actor that handles consensus callbacks.
158pub struct Actor<R: Rng + Spawner + Metrics + Clock> {
159    context: R,
160    hasher: Sha256,
161    mailbox: mpsc::Receiver<Message>,
162
163    /// Callback for when a block is finalized.
164    on_finalized: Option<FinalizedCallback>,
165}
166
167impl<R: Rng + Spawner + Metrics + Clock> Actor<R> {
168    /// Creates a new application actor.
169    pub fn new(context: R, config: Config) -> (Self, Mailbox) {
170        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
171        (
172            Self {
173                context,
174                hasher: Sha256::new(),
175                mailbox,
176                on_finalized: None,
177            },
178            Mailbox::new(sender),
179        )
180    }
181
182    /// Sets the callback for finalized blocks.
183    pub fn on_finalized<F>(mut self, callback: F) -> Self
184    where
185        F: Fn(&SimplexBlock) + Send + Sync + 'static,
186    {
187        self.on_finalized = Some(Arc::new(callback));
188        self
189    }
190
191    /// Runs the application actor.
192    pub async fn run(mut self, mut marshal: marshal::Mailbox<Scheme, SimplexBlock>) {
193        // Compute genesis digest
194        self.hasher.update(GENESIS);
195        let genesis_parent = self.hasher.finalize();
196        let genesis = SimplexBlock::new(genesis_parent, 0, 0, [0u8; 32], 0, [0u8; 32]);
197        let genesis_digest = genesis.digest();
198
199        let built: Option<(Round, SimplexBlock)> = None;
200        let built = Arc::new(Mutex::new(built));
201
202        while let Some(message) = self.mailbox.next().await {
203            match message {
204                Message::Genesis { response } => {
205                    // Return the digest of the genesis block
206                    let _ = response.send(genesis_digest);
207                }
208                Message::Propose {
209                    round,
210                    parent,
211                    response,
212                } => {
213                    // Get the parent block
214                    let parent_request = if parent.1 == genesis_digest {
215                        Either::Left(future::ready(Ok(genesis.clone())))
216                    } else {
217                        Either::Right(
218                            marshal
219                                .subscribe(Some(Round::new(round.epoch(), parent.0)), parent.1)
220                                .await,
221                        )
222                    };
223
224                    // Build the new block in a separate task
225                    let built_clone = built.clone();
226                    let context_clone = self.context.clone();
227                    context_clone.clone().spawn(move |_ctx| async move {
228                        select! {
229                            parent_result = parent_request => {
230                                let parent_block = parent_result.unwrap();
231
232                                // Create timestamp
233                                let mut current = context_clone.current().epoch_millis();
234                                if current <= parent_block.timestamp {
235                                    current = parent_block.timestamp + 1;
236                                }
237
238                                // Create new block
239                                let block = SimplexBlock::new(
240                                    parent_block.digest(),
241                                    parent_block.height + 1,
242                                    current,
243                                    [0u8; 32], // State root computed elsewhere
244                                    0,         // TX count
245                                    [0u8; 32], // TX root
246                                );
247                                let digest = block.digest();
248
249                                {
250                                    let mut built = built_clone.lock().unwrap();
251                                    *built = Some((round, block));
252                                }
253
254                                let result = response.send(digest);
255                                info!(?round, ?digest, success = result.is_ok(), "proposed new block");
256                            }
257                        }
258                    });
259                }
260                Message::Broadcast { payload } => {
261                    // Get the built block and broadcast it
262                    let Some(built_block) = built.lock().unwrap().clone() else {
263                        warn!(?payload, "missing block to broadcast");
264                        continue;
265                    };
266
267                    debug!(
268                        ?payload,
269                        round = ?built_block.0,
270                        height = built_block.1.height,
271                        "broadcast requested"
272                    );
273                    marshal.broadcast(built_block.1.clone()).await;
274                }
275                Message::Verify {
276                    round,
277                    parent,
278                    payload,
279                    response,
280                } => {
281                    // Get parent and verify the block
282                    let parent_request = if parent.1 == genesis_digest {
283                        Either::Left(future::ready(Ok(genesis.clone())))
284                    } else {
285                        Either::Right(
286                            marshal
287                                .subscribe(Some(Round::new(round.epoch(), parent.0)), parent.1)
288                                .await,
289                        )
290                    };
291
292                    let mut marshal_clone = marshal.clone();
293                    let context_clone = self.context.clone();
294                    context_clone.clone().spawn(move |_ctx| async move {
295                        let block_request = marshal_clone.subscribe(None, payload).await;
296
297                        select! {
298                            results = futures::future::try_join(parent_request, block_request) => {
299                                let (parent_block, block) = results.unwrap();
300
301                                // Verify block
302                                if block.height != parent_block.height + 1 {
303                                    let _ = response.send(false);
304                                    return;
305                                }
306                                if block.parent != parent_block.digest() {
307                                    let _ = response.send(false);
308                                    return;
309                                }
310                                if block.timestamp <= parent_block.timestamp {
311                                    let _ = response.send(false);
312                                    return;
313                                }
314                                let current = context_clone.current().epoch_millis();
315                                if block.timestamp > current + SYNCHRONY_BOUND {
316                                    let _ = response.send(false);
317                                    return;
318                                }
319
320                                // Mark as verified
321                                marshal_clone.verified(round, block).await;
322
323                                let _ = response.send(true);
324                            }
325                        }
326                    });
327                }
328                Message::Finalized { block } => {
329                    info!(
330                        height = block.height,
331                        digest = ?block.digest(),
332                        "processed finalized block"
333                    );
334
335                    // Call the finalization callback if set
336                    if let Some(ref callback) = self.on_finalized {
337                        callback(&block);
338                    }
339                }
340            }
341        }
342    }
343}