battleware_node/application/actor.rs

use super::{
    ingress::{Mailbox, Message},
    Config,
};
use crate::{
    aggregator,
    application::mempool::Mempool,
    indexer::Indexer,
    seeder,
    supervisor::{EpochSupervisor, Supervisor, ViewSupervisor},
};
use battleware_execution::{nonce, state_transition, Adb, Noncer};
use battleware_types::{
    execution::{Output, Value, MAX_BLOCK_TRANSACTIONS},
    genesis_block, genesis_digest, Block, Identity,
};
use commonware_consensus::{marshal, threshold_simplex::types::View};
use commonware_cryptography::{
    bls12381::primitives::{poly::public, variant::MinSig},
    ed25519::Batch,
    sha256::Digest,
    BatchVerifier, Committable, Digestible, Sha256,
};
use commonware_macros::select;
use commonware_runtime::{
    buffer::PoolRef, telemetry::metrics::histogram, Clock, Handle, Metrics, Spawner, Storage,
    ThreadPool,
};
use commonware_storage::{
    adb::{self, keyless},
    translator::EightCap,
};
use commonware_utils::{futures::ClosedExt, NZU64};
use futures::StreamExt;
use futures::{channel::mpsc, future::try_join};
use futures::{future, future::Either};
use prometheus_client::metrics::{counter::Counter, histogram::Histogram};
use rand::{CryptoRng, Rng};
use std::{
    num::NonZero,
    sync::{atomic::AtomicU64, Arc, Mutex},
};
use tracing::{debug, info, warn};

/// Histogram buckets for application latency.
const LATENCY: [f64; 20] = [
    0.001, 0.002, 0.003, 0.004, 0.005, 0.0075, 0.010, 0.015, 0.020, 0.025, 0.030, 0.050, 0.075,
    0.100, 0.200, 0.500, 1.0, 2.0, 5.0, 10.0,
];

/// Attempt to prune the state every 10,000 blocks (randomly).
const PRUNE_INTERVAL: u64 = 10_000;

// TODO: store the outputs of previously computed state to avoid a ton of recomputation.
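/// Fetch the chain of blocks from `start` back to (but not including) height `end`,
/// returning them ordered from oldest to newest.
///
/// Returns `None` if any block in the range cannot be resolved.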
async fn ancestry(
    mut marshal: marshal::Mailbox<MinSig, Block>,
    start: (Option<View>, Digest),
    end: u64,
) -> Option<Vec<Block>> {
    let mut ancestry = Vec::new();

    // Get the start block
    let Ok(block) = marshal.subscribe(start.0, start.1).await.await else {
        return None;
    };
    let mut next = (block.height.saturating_sub(1), block.parent);
    ancestry.push(block);

    // Walk backwards until reaching the end height
    while next.0 > end {
        let request = marshal.subscribe(None, next.1).await;
        let Ok(block) = request.await else {
            return None;
        };
        next = (block.height.saturating_sub(1), block.parent);
        ancestry.push(block);
    }

    // Reverse the ancestry
    Some(ancestry.into_iter().rev().collect())
}

/// Application actor.
pub struct Actor<R: Rng + CryptoRng + Spawner + Metrics + Clock + Storage, I: Indexer> {
    context: R,
    inbound: Mailbox<R>,
    mailbox: mpsc::Receiver<Message<R>>,
    identity: Identity,
    partition_prefix: String,
    mmr_items_per_blob: NonZero<u64>,
    mmr_write_buffer: NonZero<usize>,
    log_items_per_section: NonZero<u64>,
    log_write_buffer: NonZero<usize>,
    locations_items_per_blob: NonZero<u64>,
    buffer_pool: PoolRef,
    indexer: I,
    execution_concurrency: usize,
}

impl<R: Rng + CryptoRng + Spawner + Metrics + Clock + Storage, I: Indexer> Actor<R, I> {
    /// Create a new application actor.
    pub fn new(
        context: R,
        config: Config<I>,
    ) -> (Self, ViewSupervisor, EpochSupervisor, Mailbox<R>) {
        // Create actor
        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
        let inbound = Mailbox::new(sender);

        // Create supervisors
        let identity = *public::<MinSig>(&config.polynomial);
        let supervisor = Supervisor::new(config.polynomial, config.participants, config.share);
        let view_supervisor = ViewSupervisor::new(supervisor.clone());
        let epoch_supervisor = EpochSupervisor::new(supervisor);

        (
            Self {
                context,
                mailbox,
                inbound: inbound.clone(),
                identity,
                partition_prefix: config.partition_prefix,
                mmr_items_per_blob: config.mmr_items_per_blob,
                mmr_write_buffer: config.mmr_write_buffer,
                log_items_per_section: config.log_items_per_section,
                log_write_buffer: config.log_write_buffer,
                locations_items_per_blob: config.locations_items_per_blob,
                buffer_pool: config.buffer_pool,
                indexer: config.indexer,
                execution_concurrency: config.execution_concurrency,
            },
            view_supervisor,
            epoch_supervisor,
            inbound,
        )
    }

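    /// Start the actor by spawning [`Self::run`] on the runtime.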
    pub fn start(
        mut self,
        marshal: marshal::Mailbox<MinSig, Block>,
        seeder: seeder::Mailbox,
        aggregator: aggregator::Mailbox,
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(marshal, seeder, aggregator))
    }

    /// Run the application actor.
    async fn run(
        mut self,
        mut marshal: marshal::Mailbox<MinSig, Block>,
        seeder: seeder::Mailbox,
        mut aggregator: aggregator::Mailbox,
    ) {
        // Initialize metrics
        let txs_considered: Counter<u64, AtomicU64> = Counter::default();
        let txs_executed: Counter<u64, AtomicU64> = Counter::default();
        let ancestry_latency = Histogram::new(LATENCY.into_iter());
        let propose_latency = Histogram::new(LATENCY.into_iter());
        let verify_latency = Histogram::new(LATENCY.into_iter());
        let seeded_latency = Histogram::new(LATENCY.into_iter());
        let execute_latency = Histogram::new(LATENCY.into_iter());
        let finalize_latency = Histogram::new(LATENCY.into_iter());
        let prune_latency = Histogram::new(LATENCY.into_iter());
        self.context.register(
            "txs_considered",
            "Number of transactions considered during propose",
            txs_considered.clone(),
        );
        self.context.register(
            "txs_executed",
            "Number of transactions executed after finalization",
            txs_executed.clone(),
        );
        self.context.register(
            "ancestry_latency",
            "Latency of ancestry requests",
            ancestry_latency.clone(),
        );
        self.context.register(
            "propose_latency",
            "Latency of propose requests",
            propose_latency.clone(),
        );
        self.context.register(
            "verify_latency",
            "Latency of verify requests",
            verify_latency.clone(),
        );
        self.context.register(
            "seeded_latency",
            "Latency of seeded requests",
            seeded_latency.clone(),
        );
        self.context.register(
            "execute_latency",
            "Latency of execute requests",
            execute_latency.clone(),
        );
        self.context.register(
            "finalize_latency",
            "Latency of finalize requests",
            finalize_latency.clone(),
        );
        self.context.register(
            "prune_latency",
            "Latency of prune requests",
            prune_latency.clone(),
        );
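        // Wrap the histograms in timed helpers so latencies can be recorded with scoped timers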
        let ancestry_latency = histogram::Timed::new(
            ancestry_latency,
            Arc::new(self.context.with_label("ancestry_latency")),
        );
        let propose_latency = histogram::Timed::new(
            propose_latency,
            Arc::new(self.context.with_label("propose_latency")),
        );
        let verify_latency = histogram::Timed::new(
            verify_latency,
            Arc::new(self.context.with_label("verify_latency")),
        );
        let seeded_latency = histogram::Timed::new(
            seeded_latency,
            Arc::new(self.context.with_label("seeded_latency")),
        );
        let execute_latency = histogram::Timed::new(
            execute_latency,
            Arc::new(self.context.with_label("execute_latency")),
        );
        let finalize_latency = histogram::Timed::new(
            finalize_latency,
            Arc::new(self.context.with_label("finalize_latency")),
        );
        let prune_latency = histogram::Timed::new(
            prune_latency,
            Arc::new(self.context.with_label("prune_latency")),
        );

        // Initialize the state
        let mut state = Adb::init(
            self.context.with_label("state"),
            adb::any::variable::Config {
                mmr_journal_partition: format!("{}-state-mmr-journal", self.partition_prefix),
                mmr_metadata_partition: format!("{}-state-mmr-metadata", self.partition_prefix),
                mmr_items_per_blob: self.mmr_items_per_blob,
                mmr_write_buffer: self.mmr_write_buffer,
                log_journal_partition: format!("{}-state-log-journal", self.partition_prefix),
                log_items_per_section: self.log_items_per_section,
                log_write_buffer: self.log_write_buffer,
                log_compression: None,
                log_codec_config: (),
                locations_journal_partition: format!(
                    "{}-state-locations-journal",
                    self.partition_prefix
                ),
                locations_items_per_blob: self.locations_items_per_blob,
                translator: EightCap,
                thread_pool: None,
                buffer_pool: self.buffer_pool.clone(),
            },
        )
        .await
        .unwrap();
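        // Initialize the events store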
        let mut events = keyless::Keyless::<_, Output, Sha256>::init(
            self.context.with_label("events"),
            keyless::Config {
                mmr_journal_partition: format!("{}-events-mmr-journal", self.partition_prefix),
                mmr_metadata_partition: format!("{}-events-mmr-metadata", self.partition_prefix),
                mmr_items_per_blob: self.mmr_items_per_blob,
                mmr_write_buffer: self.mmr_write_buffer,
                log_journal_partition: format!("{}-events-log-journal", self.partition_prefix),
                log_items_per_section: self.log_items_per_section,
                log_write_buffer: self.log_write_buffer,
                log_compression: None,
                log_codec_config: (),
                locations_journal_partition: format!(
                    "{}-events-locations-journal",
                    self.partition_prefix
                ),
                locations_items_per_blob: self.locations_items_per_blob,
                locations_write_buffer: self.log_write_buffer,
                thread_pool: None,
                buffer_pool: self.buffer_pool.clone(),
            },
        )
        .await
        .unwrap();

        // Create the execution pool
        //
        // TODO (https://github.com/commonwarexyz/monorepo/issues/1540): use commonware-runtime::create_pool
        let execution_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(self.execution_concurrency)
            .build()
            .expect("failed to create execution pool");
        let execution_pool = ThreadPool::new(execution_pool);

        // Compute genesis digest
        let genesis_digest = genesis_digest();

        // Track built blocks
        let built: Option<(View, Block)> = None;
        let built = Arc::new(Mutex::new(built));

        // Initialize mempool
        let mut mempool = Mempool::new(self.context.with_label("mempool"));

        // Use reconnecting indexer wrapper
        let reconnecting_indexer = crate::indexer::ReconnectingIndexer::new(
            self.context.with_label("indexer"),
            self.indexer,
        );

        let mut next_prune = self.context.gen_range(1..=PRUNE_INTERVAL);

        // The reconnecting wrapper never fails here and handles reconnection internally
        let mut tx_stream = Box::pin(reconnecting_indexer.listen_mempool().await.unwrap());
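
        // Main event loop: process consensus messages and incoming mempool transactions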
        loop {
            select! {
                message = self.mailbox.next() => {
                    let Some(message) = message else {
                        return;
                    };
                    match message {
                        Message::Genesis { response } => {
                            // Use the digest of the genesis message as the initial
                            // payload.
                            let _ = response.send(genesis_digest);
                        }
                        Message::Propose {
                            view,
                            parent,
                            mut response,
                        } => {
                            // Start the timer
                            let ancestry_timer = ancestry_latency.timer();
                            let propose_timer = propose_latency.timer();

                            // Immediately send a response for genesis block
                            if parent.1 == genesis_digest {
                                drop(ancestry_timer);
                                self.inbound.ancestry(view, vec![genesis_block()], propose_timer, response).await;
                                continue;
                            }

                            // Get the ancestry
                            let ancestry = ancestry(marshal.clone(), (Some(parent.0), parent.1), state.get_metadata().await.unwrap().and_then(|(_, v)| match v {
                                Some(Value::Commit { height, start: _ }) => Some(height),
                                _ => None,
                            }).unwrap_or(0));

                            // Wait for the parent block to be available or the request to be cancelled in a separate task (to
                            // continue processing other messages)
                            self.context.with_label("ancestry").spawn({
                                let mut inbound = self.inbound.clone();
                                move |_| async move {
                                    select! {
                                        ancestry = ancestry => {
                                            // Get the ancestry
                                            let Some(ancestry) = ancestry else {
                                                ancestry_timer.cancel();
                                                warn!(view, "missing parent ancestry");
                                                return;
                                            };
                                            drop(ancestry_timer);

                                            // Pass back to mailbox
                                            inbound.ancestry(view, ancestry, propose_timer, response).await;
                                        },
                                        _ = response.closed() => {
                                            // The response was cancelled
                                            ancestry_timer.cancel();
                                            warn!(view, "propose aborted");
                                        }
                                    }
                                }
                            });
                        }
                        Message::Ancestry {
                            view,
                            blocks,
                            timer,
                            response,
                        } => {
                            // Get parent block
                            let parent = blocks.last().unwrap();

                            // Find first block on top of finalized state (may have increased since we started)
                            let height = state.get_metadata().await.unwrap().and_then(|(_, v)| match v {
                                Some(Value::Commit { height, start: _ }) => Some(height),
                                _ => None,
                            }).unwrap_or(0);
                            let mut noncer = Noncer::new(&state);
                            for block in &blocks {
                                // Skip blocks below our height
                                if block.height <= height {
                                    debug!(block = block.height, processed = height, "skipping block during propose");
                                    continue;
                                }

                                // Apply transaction nonces to state
                                for tx in &block.transactions {
                                    // We don't care whether the nonces are valid; we just need to process the tip the same way the state will be processed during finalization
                                    noncer.prepare(tx).await;
                                }
                            }

                            // Select up to max transactions
                            let mut considered = 0;
                            let mut transactions = Vec::new();
                            while transactions.len() < MAX_BLOCK_TRANSACTIONS {
                                // Get next transaction
                                let Some(tx) = mempool.next() else {
                                    break;
                                };
                                considered += 1;

                                // Attempt to apply
                                if !noncer.prepare(&tx).await {
                                    continue;
                                }

                                // Add to transactions
                                transactions.push(tx);
                            }
                            let txs = transactions.len();

                            // Update metrics
                            txs_considered.inc_by(considered as u64);

                            // When ancestry for propose is provided, we can attempt to pack a block
                            let block = Block::new(parent.digest(), view, parent.height + 1, transactions);
                            let digest = block.digest();
                            {
                                // We may drop the transactions from a block that was never broadcast...users
                                // can rebroadcast.
                                let mut built = built.lock().unwrap();
                                *built = Some((view, block));
                            }

                            // Send the digest to the consensus
                            let result = response.send(digest);
                            info!(view, ?digest, txs, success = result.is_ok(), "proposed block");
                            drop(timer);
                        }
                        Message::Broadcast { payload } => {
                            // Take the most recently built block, if any
                            let Some(built) = built.lock().unwrap().take() else {
                                warn!(?payload, "missing block to broadcast");
                                continue;
                            };

                            // Check that the built block matches the requested payload
                            if built.1.commitment() != payload {
                                warn!(?payload, "outdated broadcast");
                                continue;
                            }

                            // Send the block to the syncer
                            debug!(
                                ?payload,
                                view = built.0,
                                height = built.1.height,
                                "broadcast requested"
                            );
                            marshal.broadcast(built.1).await;
                        }
                        Message::Verify {
                            view,
                            parent,
                            payload,
                            mut response,
                        } => {
                            // Start the timer
                            let timer = verify_latency.timer();

                            // Get the parent and current block
                            let parent_request = if parent.1 == genesis_digest {
                                Either::Left(future::ready(Ok(genesis_block())))
                            } else {
                                Either::Right(marshal.subscribe(Some(parent.0), parent.1).await)
                            };

                            // Wait for the blocks to be available or the request to be cancelled in a separate task (to
                            // continue processing other messages)
                            self.context.with_label("verify").spawn({
                                let mut marshal = marshal.clone();
                                move |mut context| async move {
                                    let requester =
                                        try_join(parent_request, marshal.subscribe(None, payload).await);
                                    select! {
                                        result = requester => {
                                            // Unwrap the results
                                            let (parent, block) = result.unwrap();

                                            // Verify the block
                                            if block.view != view {
                                                let _ = response.send(false);
                                                return;
                                            }
                                            if block.height != parent.height + 1 {
                                                let _ = response.send(false);
                                                return;
                                            }
                                            if block.parent != parent.digest() {
                                                let _ = response.send(false);
                                                return;
                                            }

                                            // Batch verify transaction signatures (we don't care if the nonces are valid or not, we'll just skip the ones that are invalid)
                                            let mut batcher = Batch::new();
                                            for tx in &block.transactions {
                                                tx.verify_batch(&mut batcher);
                                            }
                                            if !batcher.verify(&mut context) {
                                                let _ = response.send(false);
                                                return;
                                            }

                                            // Persist the verified block (transactions may be invalid)
                                            marshal.verified(view, block).await;

                                            // Send the verification result to the consensus
                                            let _ = response.send(true);

                                            // Stop the timer
                                            drop(timer);
                                        },
                                        _ = response.closed() => {
                                            // The response was cancelled
                                            warn!(view, "verify aborted");
                                        }
                                    }
                                }
                            });
                        }
                        Message::Finalized { block, response } => {
                            // Start the timer
                            let seeded_timer = seeded_latency.timer();
                            let finalize_timer = finalize_latency.timer();

                            // While waiting for the seed required for processing, we should spawn a task
                            // to handle resolution to avoid blocking the application.
                            self.context.with_label("seeded").spawn({
                                let mut inbound = self.inbound.clone();
                                let mut seeder = seeder.clone();
                                move |_| async move {
                                    let seed = seeder.get(block.view).await;
                                    drop(seeded_timer);
                                    inbound.seeded(block, seed, finalize_timer, response).await;
                                }
                            });

                        }
                        Message::Seeded { block, seed, timer, response } => {
                            // Execute state transition (will only apply if next block)
                            let height = block.height;
                            let commitment = block.commitment();

                            // Apply the block to our state
                            //
                            // We must wait for the seed to be available before processing the block,
                            // otherwise we will not be able to match players or compute attack strength.
                            let execute_timer = execute_latency.timer();
                            let tx_count = block.transactions.len();
                            let result = state_transition::execute_state_transition(
                                &mut state,
                                &mut events,
                                self.identity,
                                height,
                                seed,
                                block.transactions,
                                execution_pool.clone(),
                            ).await;
                            drop(execute_timer);

                            // Update metrics
                            txs_executed.inc_by(tx_count as u64);

                            // Update mempool based on processed transactions
                            for (public, next_nonce) in &result.processed_nonces {
                                mempool.retain(public, *next_nonce);
                            }

                            // Generate range proof for changes
                            let state_proof_ops = result.state_end_op - result.state_start_op;
                            let events_start_op = result.events_start_op;
                            let events_proof_ops = result.events_end_op - events_start_op;
                            let ((state_proof, state_proof_ops), (events_proof, events_proof_ops)) = try_join(
                                state.historical_proof(result.state_end_op, result.state_start_op, state_proof_ops),
                                events.historical_proof(result.events_end_op, events_start_op, NZU64!(events_proof_ops)),
                            ).await.expect("failed to generate proofs");

                            // Send to aggregator
                            aggregator.executed(block.view, block.height, commitment, result, state_proof, state_proof_ops, events_proof, events_proof_ops, response).await;

                            // Stop the timer
                            drop(timer);

                            // Attempt to prune (pruning syncs data first, so we don't need a separate sync call)
                            next_prune -= 1;
                            if next_prune == 0 {
                                // Prune storage
                                let timer = prune_latency.timer();
                                try_join(
                                    state.prune(state.inactivity_floor_loc()),
                                    events.prune(events_start_op),
                                ).await.expect("failed to prune storage");
                                drop(timer);

                                // Reset next prune
                                next_prune = self.context.gen_range(1..=PRUNE_INTERVAL);
                            }
                        },
                    }
                },
                pending = tx_stream.next() => {
                    // The reconnecting wrapper handles all connection issues internally
                    // We only get Some(Ok(tx)) for valid transactions
                    let Some(Ok(pending)) = pending else {
                        // This should only happen if there's a transaction-level error
                        // The stream itself won't end due to the reconnecting wrapper
                        continue;
                    };

                    // Process transactions (already verified in indexer client)
                    for tx in pending.transactions {
                        // Check if below next
                        let next = nonce(&state, &tx.public).await;
                        if tx.nonce < next {
                            // If below next, we drop the incoming transaction
                            debug!(tx = tx.nonce, state = next, "dropping incoming transaction");
                            continue;
                        }

                        // Add to mempool
                        mempool.add(tx);
                    }
                }
            }
        }
    }
}