battleware-node 0.0.1

Validator that participates in a battleware network.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
use super::{
    ingress::{Mailbox, Message},
    Config,
};
use crate::{
    aggregator,
    application::mempool::Mempool,
    indexer::Indexer,
    seeder,
    supervisor::{EpochSupervisor, Supervisor, ViewSupervisor},
};
use battleware_execution::{nonce, state_transition, Adb, Noncer};
use battleware_types::{
    execution::{Output, Value, MAX_BLOCK_TRANSACTIONS},
    genesis_block, genesis_digest, Block, Identity,
};
use commonware_consensus::{marshal, threshold_simplex::types::View};
use commonware_cryptography::{
    bls12381::primitives::{poly::public, variant::MinSig},
    ed25519::Batch,
    sha256::Digest,
    BatchVerifier, Committable, Digestible, Sha256,
};
use commonware_macros::select;
use commonware_runtime::{
    buffer::PoolRef, telemetry::metrics::histogram, Clock, Handle, Metrics, Spawner, Storage,
    ThreadPool,
};
use commonware_storage::{
    adb::{self, keyless},
    translator::EightCap,
};
use commonware_utils::{futures::ClosedExt, NZU64};
use futures::StreamExt;
use futures::{channel::mpsc, future::try_join};
use futures::{future, future::Either};
use prometheus_client::metrics::{counter::Counter, histogram::Histogram};
use rand::{CryptoRng, Rng};
use std::{
    num::NonZero,
    sync::{atomic::AtomicU64, Arc, Mutex},
};
use tracing::{debug, info, warn};

/// Histogram buckets for application latency.
const LATENCY: [f64; 20] = [
    0.001, 0.002, 0.003, 0.004, 0.005, 0.0075, 0.010, 0.015, 0.020, 0.025, 0.030, 0.050, 0.075,
    0.100, 0.200, 0.500, 1.0, 2.0, 5.0, 10.0,
];

/// Attempt to prune the state every 10000 blocks (randomly).
const PRUNE_INTERVAL: u64 = 10_000;

// TODO: store the outputs of previously computed state to avoid a ton of recomputation.
async fn ancestry(
    mut marshal: marshal::Mailbox<MinSig, Block>,
    start: (Option<View>, Digest),
    end: u64,
) -> Option<Vec<Block>> {
    let mut ancestry = Vec::new();

    // Get the start block
    let Ok(block) = marshal.subscribe(start.0, start.1).await.await else {
        return None;
    };
    let mut next = (block.height.saturating_sub(1), block.parent);
    ancestry.push(block);

    // Recurse until reaching the end height
    while next.0 > end {
        let request = marshal.subscribe(None, next.1).await;
        let Ok(block) = request.await else {
            return None;
        };
        next = (block.height.saturating_sub(1), block.parent);
        ancestry.push(block);
    }

    // Reverse the ancestry
    Some(ancestry.into_iter().rev().collect())
}

/// Application actor.
pub struct Actor<R: Rng + CryptoRng + Spawner + Metrics + Clock + Storage, I: Indexer> {
    context: R,
    inbound: Mailbox<R>,
    mailbox: mpsc::Receiver<Message<R>>,
    identity: Identity,
    partition_prefix: String,
    mmr_items_per_blob: NonZero<u64>,
    mmr_write_buffer: NonZero<usize>,
    log_items_per_section: NonZero<u64>,
    log_write_buffer: NonZero<usize>,
    locations_items_per_blob: NonZero<u64>,
    buffer_pool: PoolRef,
    indexer: I,
    execution_concurrency: usize,
}

impl<R: Rng + CryptoRng + Spawner + Metrics + Clock + Storage, I: Indexer> Actor<R, I> {
    /// Create a new application actor.
    pub fn new(
        context: R,
        config: Config<I>,
    ) -> (Self, ViewSupervisor, EpochSupervisor, Mailbox<R>) {
        // Create actor
        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
        let inbound = Mailbox::new(sender);

        // Create supervisors
        let identity = *public::<MinSig>(&config.polynomial);
        let supervisor = Supervisor::new(config.polynomial, config.participants, config.share);
        let view_supervisor = ViewSupervisor::new(supervisor.clone());
        let epoch_supervisor = EpochSupervisor::new(supervisor);

        (
            Self {
                context,
                mailbox,
                inbound: inbound.clone(),
                identity,
                partition_prefix: config.partition_prefix,
                mmr_items_per_blob: config.mmr_items_per_blob,
                mmr_write_buffer: config.mmr_write_buffer,
                log_items_per_section: config.log_items_per_section,
                log_write_buffer: config.log_write_buffer,
                locations_items_per_blob: config.locations_items_per_blob,
                buffer_pool: config.buffer_pool,
                indexer: config.indexer,
                execution_concurrency: config.execution_concurrency,
            },
            view_supervisor,
            epoch_supervisor,
            inbound,
        )
    }

    pub fn start(
        mut self,
        marshal: marshal::Mailbox<MinSig, Block>,
        seeder: seeder::Mailbox,
        aggregator: aggregator::Mailbox,
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(marshal, seeder, aggregator))
    }

    /// Run the application actor.
    async fn run(
        mut self,
        mut marshal: marshal::Mailbox<MinSig, Block>,
        seeder: seeder::Mailbox,
        mut aggregator: aggregator::Mailbox,
    ) {
        // Initialize metrics
        let txs_considered: Counter<u64, AtomicU64> = Counter::default();
        let txs_executed: Counter<u64, AtomicU64> = Counter::default();
        let ancestry_latency = Histogram::new(LATENCY.into_iter());
        let propose_latency = Histogram::new(LATENCY.into_iter());
        let verify_latency = Histogram::new(LATENCY.into_iter());
        let seeded_latency = Histogram::new(LATENCY.into_iter());
        let execute_latency = Histogram::new(LATENCY.into_iter());
        let finalize_latency = Histogram::new(LATENCY.into_iter());
        let prune_latency = Histogram::new(LATENCY.into_iter());
        self.context.register(
            "txs_considered",
            "Number of transactions considered during propose",
            txs_considered.clone(),
        );
        self.context.register(
            "txs_executed",
            "Number of transactions executed after finalization",
            txs_executed.clone(),
        );
        self.context.register(
            "ancestry_latency",
            "Latency of ancestry requests",
            ancestry_latency.clone(),
        );
        self.context.register(
            "propose_latency",
            "Latency of propose requests",
            propose_latency.clone(),
        );
        self.context.register(
            "verify_latency",
            "Latency of verify requests",
            verify_latency.clone(),
        );
        self.context.register(
            "seeded_latency",
            "Latency of seeded requests",
            seeded_latency.clone(),
        );
        self.context.register(
            "execute_latency",
            "Latency of execute requests",
            execute_latency.clone(),
        );
        self.context.register(
            "finalize_latency",
            "Latency of finalize requests",
            finalize_latency.clone(),
        );
        self.context.register(
            "prune_latency",
            "Latency of prune requests",
            prune_latency.clone(),
        );
        let ancestry_latency = histogram::Timed::new(
            ancestry_latency,
            Arc::new(self.context.with_label("ancestry_latency")),
        );
        let propose_latency = histogram::Timed::new(
            propose_latency,
            Arc::new(self.context.with_label("propose_latency")),
        );
        let verify_latency = histogram::Timed::new(
            verify_latency,
            Arc::new(self.context.with_label("verify_latency")),
        );
        let seeded_latency = histogram::Timed::new(
            seeded_latency,
            Arc::new(self.context.with_label("seeded_latency")),
        );
        let execute_latency = histogram::Timed::new(
            execute_latency,
            Arc::new(self.context.with_label("execute_latency")),
        );
        let finalize_latency = histogram::Timed::new(
            finalize_latency,
            Arc::new(self.context.with_label("finalize_latency")),
        );
        let prune_latency = histogram::Timed::new(
            prune_latency,
            Arc::new(self.context.with_label("prune_latency")),
        );

        // Initialize the state
        let mut state = Adb::init(
            self.context.with_label("state"),
            adb::any::variable::Config {
                mmr_journal_partition: format!("{}-state-mmr-journal", self.partition_prefix),
                mmr_metadata_partition: format!("{}-state-mmr-metadata", self.partition_prefix),
                mmr_items_per_blob: self.mmr_items_per_blob,
                mmr_write_buffer: self.mmr_write_buffer,
                log_journal_partition: format!("{}-state-log-journal", self.partition_prefix),
                log_items_per_section: self.log_items_per_section,
                log_write_buffer: self.log_write_buffer,
                log_compression: None,
                log_codec_config: (),
                locations_journal_partition: format!(
                    "{}-state-locations-journal",
                    self.partition_prefix
                ),
                locations_items_per_blob: self.locations_items_per_blob,
                translator: EightCap,
                thread_pool: None,
                buffer_pool: self.buffer_pool.clone(),
            },
        )
        .await
        .unwrap();
        let mut events = keyless::Keyless::<_, Output, Sha256>::init(
            self.context.with_label("events"),
            keyless::Config {
                mmr_journal_partition: format!("{}-events-mmr-journal", self.partition_prefix),
                mmr_metadata_partition: format!("{}-events-mmr-metadata", self.partition_prefix),
                mmr_items_per_blob: self.mmr_items_per_blob,
                mmr_write_buffer: self.mmr_write_buffer,
                log_journal_partition: format!("{}-events-log-journal", self.partition_prefix),
                log_items_per_section: self.log_items_per_section,
                log_write_buffer: self.log_write_buffer,
                log_compression: None,
                log_codec_config: (),
                locations_journal_partition: format!(
                    "{}-events-locations-journal",
                    self.partition_prefix
                ),
                locations_items_per_blob: self.locations_items_per_blob,
                locations_write_buffer: self.log_write_buffer,
                thread_pool: None,
                buffer_pool: self.buffer_pool.clone(),
            },
        )
        .await
        .unwrap();

        // Create the execution pool
        //
        // TODO (https://github.com/commonwarexyz/monorepo/issues/1540): use commonware-runtime::create_pool
        let execution_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(self.execution_concurrency)
            .build()
            .expect("failed to create execution pool");
        let execution_pool = ThreadPool::new(execution_pool);

        // Compute genesis digest
        let genesis_digest = genesis_digest();

        // Track built blocks
        let built: Option<(View, Block)> = None;
        let built = Arc::new(Mutex::new(built));

        // Initialize mempool
        let mut mempool = Mempool::new(self.context.with_label("mempool"));

        // Use reconnecting indexer wrapper
        let reconnecting_indexer = crate::indexer::ReconnectingIndexer::new(
            self.context.with_label("indexer"),
            self.indexer,
        );

        // This will never fail and handles reconnection internally
        let mut next_prune = self.context.gen_range(1..=PRUNE_INTERVAL);
        let mut tx_stream = Box::pin(reconnecting_indexer.listen_mempool().await.unwrap());
        loop {
            select! {
                    message =  self.mailbox.next() => {
                        let Some(message) = message else {
                            return;
                        };
                        match message {
                            Message::Genesis { response } => {
                                // Use the digest of the genesis message as the initial
                                // payload.
                                let _ = response.send(genesis_digest);
                            }
                            Message::Propose {
                                view,
                                parent,
                                mut response,
                            } => {
                                // Start the timer
                                let ancestry_timer = ancestry_latency.timer();
                                let propose_timer = propose_latency.timer();

                                // Immediately send a response for genesis block
                                if parent.1 == genesis_digest {
                                    drop(ancestry_timer);
                                    self.inbound.ancestry(view, vec![genesis_block()], propose_timer, response).await;
                                    continue;
                                }

                                // Get the ancestry
                                let ancestry = ancestry(marshal.clone(), (Some(parent.0), parent.1), state.get_metadata().await.unwrap().and_then(|(_, v)| match v {
                                    Some(Value::Commit { height, start: _ }) => Some(height),
                                    _ => None,
                                }).unwrap_or(0));

                                // Wait for the parent block to be available or the request to be cancelled in a separate task (to
                                // continue processing other messages)
                                self.context.with_label("ancestry").spawn({
                                    let mut inbound = self.inbound.clone();
                                    move |_| async move {
                                        select! {
                                            ancestry = ancestry => {
                                                // Get the ancestry
                                                let Some(ancestry) = ancestry else {
                                                    ancestry_timer.cancel();
                                                    warn!(view, "missing parent ancestry");
                                                    return;
                                                };
                                                drop(ancestry_timer);

                                                // Pass back to mailbox
                                                inbound.ancestry(view, ancestry, propose_timer, response).await;
                                            },
                                            _ = response.closed() => {
                                                // The response was cancelled
                                                ancestry_timer.cancel();
                                                warn!(view, "propose aborted");
                                            }
                                        }
                                    }
                                });
                            }
                            Message::Ancestry {
                                view,
                                blocks,
                                timer,
                                response,
                            } => {
                                // Get parent block
                                let parent = blocks.last().unwrap();

                                // Find first block on top of finalized state (may have increased since we started)
                                let height = state.get_metadata().await.unwrap().and_then(|(_, v)| match v {
                                    Some(Value::Commit { height, start: _ }) => Some(height),
                                    _ => None,
                                }).unwrap_or(0);
                                let mut noncer = Noncer::new(&state);
                                for block in &blocks {
                                    // Skip blocks below our height
                                    if block.height <= height {
                                        debug!(block = block.height, processed = height, "skipping block during propose");
                                        continue;
                                    }

                                    // Apply transaction nonces to state
                                    for tx in &block.transactions {
                                        // We don't care if the nonces are valid or not, we just need to ensure we'll process tip the same way as state will be processed during finalization
                                        noncer.prepare(tx).await;
                                    }
                                }

                                // Select up to max transactions
                                let mut considered = 0;
                                let mut transactions = Vec::new();
                                while transactions.len() < MAX_BLOCK_TRANSACTIONS {
                                    // Get next transaction
                                    let Some(tx) = mempool.next() else {
                                        break;
                                    };
                                    considered += 1;

                                    // Attempt to apply
                                    if !noncer.prepare(&tx).await {
                                        continue;
                                    }

                                    // Add to transactions
                                    transactions.push(tx);
                                }
                                let txs = transactions.len();

                                // Update metrics
                                txs_considered.inc_by(considered as u64);

                                // When ancestry for propose is provided, we can attempt to pack a block
                                let block = Block::new(parent.digest(), view, parent.height+1, transactions);
                                let digest = block.digest();
                                {
                                    // We may drop the transactions from a block that was never broadcast...users
                                    // can rebroadcast.
                                    let mut built = built.lock().unwrap();
                                    *built = Some((view, block));
                                }

                                // Send the digest to the consensus
                                let result = response.send(digest);
                                info!(view, ?digest, txs, success=result.is_ok(), "proposed block");
                                drop(timer);
                            }
                            Message::Broadcast { payload } => {
                                // Check if the last built is equal
                                let Some(built) = built.lock().unwrap().take() else {
                                    warn!(?payload, "missing block to broadcast");
                                    continue;
                                };

                                // Check if the block is equal
                                if built.1.commitment() != payload {
                                    warn!(?payload, "outdated broadcast");
                                    continue;
                                }

                                // Send the block to the syncer
                                debug!(
                                    ?payload,
                                    view = built.0,
                                    height = built.1.height,
                                    "broadcast requested"
                                );
                                marshal.broadcast(built.1).await;
                            }
                            Message::Verify {
                                view,
                                parent,
                                payload,
                                mut response,
                            } => {
                                // Start the timer
                                let timer = verify_latency.timer();

                                // Get the parent and current block
                                let parent_request = if parent.1 == genesis_digest {
                                    Either::Left(future::ready(Ok(genesis_block())))
                                } else {
                                    Either::Right(marshal.subscribe(Some(parent.0), parent.1).await)
                                };

                                // Wait for the blocks to be available or the request to be cancelled in a separate task (to
                                // continue processing other messages)
                                self.context.with_label("verify").spawn({
                                    let mut marshal = marshal.clone();
                                    move |mut context| async move {
                                        let requester =
                                            try_join(parent_request, marshal.subscribe(None, payload).await);
                                        select! {
                                            result = requester => {
                                                // Unwrap the results
                                                let (parent, block) = result.unwrap();

                                                // Verify the block
                                                if block.view != view {
                                                    let _ = response.send(false);
                                                    return;
                                                }
                                                if block.height != parent.height + 1 {
                                                    let _ = response.send(false);
                                                    return;
                                                }
                                                if block.parent != parent.digest() {
                                                    let _ = response.send(false);
                                                    return;
                                                }

                                                // Batch verify transaction signatures (we don't care if the nonces are valid or not, we'll just skip the ones that are invalid)
                                                let mut batcher = Batch::new();
                                                for tx in &block.transactions {
                                                    tx.verify_batch(&mut batcher);
                                                }
                                                if !batcher.verify(&mut context) {
                                                    let _ = response.send(false);
                                                    return;
                                                }

                                                // Persist the verified block (transactions may be invalid)
                                                marshal.verified(view, block).await;

                                                // Send the verification result to the consensus
                                                let _ = response.send(true);

                                                // Stop the timer
                                                drop(timer);
                                            },
                                            _ = response.closed() => {
                                                // The response was cancelled
                                                warn!(view, "verify aborted");
                                            }
                                        }
                                    }
                                });
                            }
                            Message::Finalized { block, response } => {
                                // Start the timer
                                let seeded_timer = seeded_latency.timer();
                                let finalize_timer = finalize_latency.timer();

                                // While waiting for the seed required for processing, we should spawn a task
                                // to handle resolution to avoid blocking the application.
                                self.context.with_label("seeded").spawn({
                                    let mut inbound = self.inbound.clone();
                                    let mut seeder = seeder.clone();
                                    move |_| async move {
                                        let seed = seeder.get(block.view).await;
                                        drop(seeded_timer);
                                        inbound.seeded(block, seed, finalize_timer, response).await;
                                    }
                                });

                            }
                            Message::Seeded { block, seed, timer, response } => {
                                // Execute state transition (will only apply if next block)
                                let height = block.height;
                                let commitment = block.commitment();

                                // Apply the block to our state
                                //
                                // We must wait for the seed to be available before processing the block,
                                // otherwise we will not be able to match players or compute attack strength.
                                let execute_timer = execute_latency.timer();
                                let tx_count = block.transactions.len();
                                let result = state_transition::execute_state_transition(
                                    &mut state,
                                    &mut events,
                                    self.identity,
                                    height,
                                    seed,
                                    block.transactions,
                                    execution_pool.clone(),
                                ).await;
                                drop(execute_timer);

                                // Update metrics
                                txs_executed.inc_by(tx_count as u64);

                                // Update mempool based on processed transactions
                                for (public, next_nonce) in &result.processed_nonces {
                                    mempool.retain(public, *next_nonce);
                                }

                                // Generate range proof for changes
                                let state_proof_ops = result.state_end_op - result.state_start_op;
                                let events_start_op = result.events_start_op;
                                let events_proof_ops = result.events_end_op - events_start_op;
                                let ((state_proof, state_proof_ops), (events_proof, events_proof_ops)) = try_join(
                                    state.historical_proof(result.state_end_op, result.state_start_op, state_proof_ops),
                                    events.historical_proof(result.events_end_op, events_start_op, NZU64!(events_proof_ops)),
                                ).await.expect("failed to generate proofs");

                                // Send to aggregator
                                aggregator.executed(block.view, block.height, commitment, result, state_proof, state_proof_ops, events_proof, events_proof_ops, response).await;

                                // Stop the timer
                                drop(timer);

                                // Attempt to prune (this syncs data prior to prune, so we don't need to call separately)
                                next_prune -= 1;
                                if next_prune == 0 {
                                    // Prune storage
                                    let timer = prune_latency.timer();
                                    try_join(
                                        state.prune(state.inactivity_floor_loc()),
                                        events.prune(events_start_op),
                                    ).await.expect("failed to prune storage");
                                    drop(timer);

                                    // Reset next prune
                                    next_prune = self.context.gen_range(1..=PRUNE_INTERVAL);
                                }
                            },
                        }
                },
                pending = tx_stream.next() => {
                    // The reconnecting wrapper handles all connection issues internally
                    // We only get Some(Ok(tx)) for valid transactions
                    let Some(Ok(pending)) = pending else {
                        // This should only happen if there's a transaction-level error
                        // The stream itself won't end due to the reconnecting wrapper
                        continue;
                    };

                    // Process transactions (already verified in indexer client)
                    for tx in pending.transactions {
                        // Check if below next
                        let next = nonce(&state, &tx.public).await;
                        if tx.nonce < next {
                            // If below next, we drop the incoming transaction
                            debug!(tx = tx.nonce, state = next, "dropping incoming transaction");
                            continue;
                        }

                        // Add to mempool
                        mempool.add(tx);
                    }
                }
            }
        }
    }
}