use super::{
    ingress::{Mailbox, Message},
    Config,
};
use crate::{
    aggregator,
    application::mempool::Mempool,
    indexer::Indexer,
    seeder,
    supervisor::{EpochSupervisor, Supervisor, ViewSupervisor},
};
use battleware_execution::{nonce, state_transition, Adb, Noncer};
use battleware_types::{
    execution::{Output, Value, MAX_BLOCK_TRANSACTIONS},
    genesis_block, genesis_digest, Block, Identity,
};
use commonware_consensus::{marshal, threshold_simplex::types::View};
use commonware_cryptography::{
    bls12381::primitives::{poly::public, variant::MinSig},
    ed25519::Batch,
    sha256::Digest,
    BatchVerifier, Committable, Digestible, Sha256,
};
use commonware_macros::select;
use commonware_runtime::{
    buffer::PoolRef, telemetry::metrics::histogram, Clock, Handle, Metrics, Spawner, Storage,
    ThreadPool,
};
use commonware_storage::{
    adb::{self, keyless},
    translator::EightCap,
};
use commonware_utils::{futures::ClosedExt, NZU64};
use futures::{
    channel::mpsc,
    future::{self, try_join, Either},
    StreamExt,
};
use prometheus_client::metrics::{counter::Counter, histogram::Histogram};
use rand::{CryptoRng, Rng};
use std::{
    num::NonZero,
    sync::{atomic::AtomicU64, Arc, Mutex},
};
use tracing::{debug, info, warn};

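/// Histogram buckets (in seconds) shared by all latency metrics registered below.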
const LATENCY: [f64; 20] = [
    0.001, 0.002, 0.003, 0.004, 0.005, 0.0075, 0.010, 0.015, 0.020, 0.025, 0.030, 0.050, 0.075,
    0.100, 0.200, 0.500, 1.0, 2.0, 5.0, 10.0,
];

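/// Upper bound on the randomized number of finalized blocks between storage prunes.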
const PRUNE_INTERVAL: u64 = 10_000;

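/// Fetch the chain of blocks from `start` back to (but excluding) height `end`,
/// returning them in ascending height order, or `None` if any ancestor cannot be
/// retrieved from the marshal.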
async fn ancestry(
    mut marshal: marshal::Mailbox<MinSig, Block>,
    start: (Option<View>, Digest),
    end: u64,
) -> Option<Vec<Block>> {
    let mut ancestry = Vec::new();

    let Ok(block) = marshal.subscribe(start.0, start.1).await.await else {
        return None;
    };
    let mut next = (block.height.saturating_sub(1), block.parent);
    ancestry.push(block);

    while next.0 > end {
        let request = marshal.subscribe(None, next.1).await;
        let Ok(block) = request.await else {
            return None;
        };
        next = (block.height.saturating_sub(1), block.parent);
        ancestry.push(block);
    }

    Some(ancestry.into_iter().rev().collect())
}

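/// Application actor that proposes, verifies, and executes blocks, bridging the
/// marshal (consensus), seeder (per-view randomness), aggregator (proof handoff),
/// and indexer (transaction intake).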
pub struct Actor<R: Rng + CryptoRng + Spawner + Metrics + Clock + Storage, I: Indexer> {
    context: R,
    inbound: Mailbox<R>,
    mailbox: mpsc::Receiver<Message<R>>,
    identity: Identity,
    partition_prefix: String,
    mmr_items_per_blob: NonZero<u64>,
    mmr_write_buffer: NonZero<usize>,
    log_items_per_section: NonZero<u64>,
    log_write_buffer: NonZero<usize>,
    locations_items_per_blob: NonZero<u64>,
    buffer_pool: PoolRef,
    indexer: I,
    execution_concurrency: usize,
}

impl<R: Rng + CryptoRng + Spawner + Metrics + Clock + Storage, I: Indexer> Actor<R, I> {
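    /// Construct the actor from `config`, returning it together with the view and
    /// epoch supervisors derived from the same polynomial and a mailbox for
    /// sending it messages.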
    pub fn new(
        context: R,
        config: Config<I>,
    ) -> (Self, ViewSupervisor, EpochSupervisor, Mailbox<R>) {
        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
        let inbound = Mailbox::new(sender);

        let identity = *public::<MinSig>(&config.polynomial);
        let supervisor = Supervisor::new(config.polynomial, config.participants, config.share);
        let view_supervisor = ViewSupervisor::new(supervisor.clone());
        let epoch_supervisor = EpochSupervisor::new(supervisor);

        (
            Self {
                context,
                mailbox,
                inbound: inbound.clone(),
                identity,
                partition_prefix: config.partition_prefix,
                mmr_items_per_blob: config.mmr_items_per_blob,
                mmr_write_buffer: config.mmr_write_buffer,
                log_items_per_section: config.log_items_per_section,
                log_write_buffer: config.log_write_buffer,
                locations_items_per_blob: config.locations_items_per_blob,
                buffer_pool: config.buffer_pool,
                indexer: config.indexer,
                execution_concurrency: config.execution_concurrency,
            },
            view_supervisor,
            epoch_supervisor,
            inbound,
        )
    }

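    /// Spawn the actor's event loop on the runtime, returning a handle to it.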
    pub fn start(
        mut self,
        marshal: marshal::Mailbox<MinSig, Block>,
        seeder: seeder::Mailbox,
        aggregator: aggregator::Mailbox,
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(marshal, seeder, aggregator))
    }

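    /// Event loop: serves consensus requests arriving on the mailbox and ingests
    /// pending transactions streamed from the indexer.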
    async fn run(
        mut self,
        mut marshal: marshal::Mailbox<MinSig, Block>,
        seeder: seeder::Mailbox,
        mut aggregator: aggregator::Mailbox,
    ) {
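        // Create and register transaction counters and per-phase latency histograms.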
        let txs_considered: Counter<u64, AtomicU64> = Counter::default();
        let txs_executed: Counter<u64, AtomicU64> = Counter::default();
        let ancestry_latency = Histogram::new(LATENCY.into_iter());
        let propose_latency = Histogram::new(LATENCY.into_iter());
        let verify_latency = Histogram::new(LATENCY.into_iter());
        let seeded_latency = Histogram::new(LATENCY.into_iter());
        let execute_latency = Histogram::new(LATENCY.into_iter());
        let finalize_latency = Histogram::new(LATENCY.into_iter());
        let prune_latency = Histogram::new(LATENCY.into_iter());
        self.context.register(
            "txs_considered",
            "Number of transactions considered during propose",
            txs_considered.clone(),
        );
        self.context.register(
            "txs_executed",
            "Number of transactions executed after finalization",
            txs_executed.clone(),
        );
        self.context.register(
            "ancestry_latency",
            "Latency of ancestry requests",
            ancestry_latency.clone(),
        );
        self.context.register(
            "propose_latency",
            "Latency of propose requests",
            propose_latency.clone(),
        );
        self.context.register(
            "verify_latency",
            "Latency of verify requests",
            verify_latency.clone(),
        );
        self.context.register(
            "seeded_latency",
            "Latency of seeded requests",
            seeded_latency.clone(),
        );
        self.context.register(
            "execute_latency",
            "Latency of execute requests",
            execute_latency.clone(),
        );
        self.context.register(
            "finalize_latency",
            "Latency of finalize requests",
            finalize_latency.clone(),
        );
        self.context.register(
            "prune_latency",
            "Latency of prune requests",
            prune_latency.clone(),
        );
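        // Wrap each histogram in a timer that records elapsed time when dropped and
        // can be cancelled if the work is aborted.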
        let ancestry_latency = histogram::Timed::new(
            ancestry_latency,
            Arc::new(self.context.with_label("ancestry_latency")),
        );
        let propose_latency = histogram::Timed::new(
            propose_latency,
            Arc::new(self.context.with_label("propose_latency")),
        );
        let verify_latency = histogram::Timed::new(
            verify_latency,
            Arc::new(self.context.with_label("verify_latency")),
        );
        let seeded_latency = histogram::Timed::new(
            seeded_latency,
            Arc::new(self.context.with_label("seeded_latency")),
        );
        let execute_latency = histogram::Timed::new(
            execute_latency,
            Arc::new(self.context.with_label("execute_latency")),
        );
        let finalize_latency = histogram::Timed::new(
            finalize_latency,
            Arc::new(self.context.with_label("finalize_latency")),
        );
        let prune_latency = histogram::Timed::new(
            prune_latency,
            Arc::new(self.context.with_label("prune_latency")),
        );

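        // Open the authenticated databases: `state` holds the current application
        // state and `events` is an append-only log of execution outputs.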
        let mut state = Adb::init(
            self.context.with_label("state"),
            adb::any::variable::Config {
                mmr_journal_partition: format!("{}-state-mmr-journal", self.partition_prefix),
                mmr_metadata_partition: format!("{}-state-mmr-metadata", self.partition_prefix),
                mmr_items_per_blob: self.mmr_items_per_blob,
                mmr_write_buffer: self.mmr_write_buffer,
                log_journal_partition: format!("{}-state-log-journal", self.partition_prefix),
                log_items_per_section: self.log_items_per_section,
                log_write_buffer: self.log_write_buffer,
                log_compression: None,
                log_codec_config: (),
                locations_journal_partition: format!(
                    "{}-state-locations-journal",
                    self.partition_prefix
                ),
                locations_items_per_blob: self.locations_items_per_blob,
                translator: EightCap,
                thread_pool: None,
                buffer_pool: self.buffer_pool.clone(),
            },
        )
        .await
        .unwrap();
        let mut events = keyless::Keyless::<_, Output, Sha256>::init(
            self.context.with_label("events"),
            keyless::Config {
                mmr_journal_partition: format!("{}-events-mmr-journal", self.partition_prefix),
                mmr_metadata_partition: format!("{}-events-mmr-metadata", self.partition_prefix),
                mmr_items_per_blob: self.mmr_items_per_blob,
                mmr_write_buffer: self.mmr_write_buffer,
                log_journal_partition: format!("{}-events-log-journal", self.partition_prefix),
                log_items_per_section: self.log_items_per_section,
                log_write_buffer: self.log_write_buffer,
                log_compression: None,
                log_codec_config: (),
                locations_journal_partition: format!(
                    "{}-events-locations-journal",
                    self.partition_prefix
                ),
                locations_items_per_blob: self.locations_items_per_blob,
                locations_write_buffer: self.log_write_buffer,
                thread_pool: None,
                buffer_pool: self.buffer_pool.clone(),
            },
        )
        .await
        .unwrap();

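        // Build a dedicated rayon pool for parallel transaction execution, wrapped
        // in the runtime's ThreadPool handle.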
        let execution_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(self.execution_concurrency)
            .build()
            .expect("failed to create execution pool");
        let execution_pool = ThreadPool::new(execution_pool);

        let genesis_digest = genesis_digest();

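        // Cache of the most recently built block, served when consensus requests a
        // broadcast.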
        let built: Option<(View, Block)> = None;
        let built = Arc::new(Mutex::new(built));

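        // Mempool of pending transactions received from the indexer.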
        let mut mempool = Mempool::new(self.context.with_label("mempool"));

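        // Indexer client that re-establishes its connection on failure.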
        let reconnecting_indexer = crate::indexer::ReconnectingIndexer::new(
            self.context.with_label("indexer"),
            self.indexer,
        );

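        // Schedule the first prune at a random offset within the interval and start
        // streaming pending transactions from the indexer's mempool.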
        let mut next_prune = self.context.gen_range(1..=PRUNE_INTERVAL);
        let mut tx_stream = Box::pin(reconnecting_indexer.listen_mempool().await.unwrap());
        loop {
            select! {
                message = self.mailbox.next() => {
                    let Some(message) = message else {
                        return;
                    };
                    match message {
                        Message::Genesis { response } => {
                            let _ = response.send(genesis_digest);
                        }
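                        // Propose: resolve the parent's ancestry (in a background task)
                        // down to the last executed height, then continue in Ancestry.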
                        Message::Propose {
                            view,
                            parent,
                            mut response,
                        } => {
                            let ancestry_timer = ancestry_latency.timer();
                            let propose_timer = propose_latency.timer();

                            if parent.1 == genesis_digest {
                                drop(ancestry_timer);
                                self.inbound
                                    .ancestry(view, vec![genesis_block()], propose_timer, response)
                                    .await;
                                continue;
                            }

                            let ancestry = ancestry(
                                marshal.clone(),
                                (Some(parent.0), parent.1),
                                state
                                    .get_metadata()
                                    .await
                                    .unwrap()
                                    .and_then(|(_, v)| match v {
                                        Some(Value::Commit { height, start: _ }) => Some(height),
                                        _ => None,
                                    })
                                    .unwrap_or(0),
                            );

                            self.context.with_label("ancestry").spawn({
                                let mut inbound = self.inbound.clone();
                                move |_| async move {
                                    select! {
                                        ancestry = ancestry => {
                                            let Some(ancestry) = ancestry else {
                                                ancestry_timer.cancel();
                                                warn!(view, "missing parent ancestry");
                                                return;
                                            };
                                            drop(ancestry_timer);

                                            inbound.ancestry(view, ancestry, propose_timer, response).await;
                                        },
                                        _ = response.closed() => {
                                            ancestry_timer.cancel();
                                            warn!(view, "propose aborted");
                                        }
                                    }
                                }
                            });
                        }
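                        // Ancestry: the parent chain is available; select transactions
                        // with valid nonces on top of it and build a block.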
                        Message::Ancestry {
                            view,
                            blocks,
                            timer,
                            response,
                        } => {
                            let parent = blocks.last().unwrap();

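                            // Determine the last executed height and prime the noncer
                            // with every transaction in the unexecuted ancestry.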
                            let height = state
                                .get_metadata()
                                .await
                                .unwrap()
                                .and_then(|(_, v)| match v {
                                    Some(Value::Commit { height, start: _ }) => Some(height),
                                    _ => None,
                                })
                                .unwrap_or(0);
                            let mut noncer = Noncer::new(&state);
                            for block in &blocks {
                                if block.height <= height {
                                    debug!(
                                        block = block.height,
                                        processed = height,
                                        "skipping block during propose"
                                    );
                                    continue;
                                }

                                for tx in &block.transactions {
                                    noncer.prepare(tx).await;
                                }
                            }

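                            // Fill the block from the mempool, skipping transactions
                            // whose nonces no longer apply.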
                            let mut considered = 0;
                            let mut transactions = Vec::new();
                            while transactions.len() < MAX_BLOCK_TRANSACTIONS {
                                let Some(tx) = mempool.next() else {
                                    break;
                                };
                                considered += 1;

                                if !noncer.prepare(&tx).await {
                                    continue;
                                }

                                transactions.push(tx);
                            }
                            let txs = transactions.len();

                            txs_considered.inc_by(considered as u64);

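                            // Assemble the block and cache it until consensus requests
                            // its broadcast.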
                            let block =
                                Block::new(parent.digest(), view, parent.height + 1, transactions);
                            let digest = block.digest();
                            {
                                let mut built = built.lock().unwrap();
                                *built = Some((view, block));
                            }

                            let result = response.send(digest);
                            info!(view, ?digest, txs, success = result.is_ok(), "proposed block");
                            drop(timer);
                        }
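                        // Broadcast: hand the cached block to the marshal if it matches
                        // the requested payload.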
                        Message::Broadcast { payload } => {
                            let Some(built) = built.lock().unwrap().take() else {
                                warn!(?payload, "missing block to broadcast");
                                continue;
                            };

                            if built.1.commitment() != payload {
                                warn!(?payload, "outdated broadcast");
                                continue;
                            }

                            debug!(
                                ?payload,
                                view = built.0,
                                height = built.1.height,
                                "broadcast requested"
                            );
                            marshal.broadcast(built.1).await;
                        }
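                        // Verify: fetch the parent and proposed block, check chain
                        // linkage, and batch-verify transaction signatures off the main
                        // loop.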
                        Message::Verify {
                            view,
                            parent,
                            payload,
                            mut response,
                        } => {
                            let timer = verify_latency.timer();

                            let parent_request = if parent.1 == genesis_digest {
                                Either::Left(future::ready(Ok(genesis_block())))
                            } else {
                                Either::Right(marshal.subscribe(Some(parent.0), parent.1).await)
                            };

                            self.context.with_label("verify").spawn({
                                let mut marshal = marshal.clone();
                                move |mut context| async move {
                                    let requester = try_join(
                                        parent_request,
                                        marshal.subscribe(None, payload).await,
                                    );
                                    select! {
                                        result = requester => {
                                            let (parent, block) = result.unwrap();

                                            if block.view != view {
                                                let _ = response.send(false);
                                                return;
                                            }
                                            if block.height != parent.height + 1 {
                                                let _ = response.send(false);
                                                return;
                                            }
                                            if block.parent != parent.digest() {
                                                let _ = response.send(false);
                                                return;
                                            }

                                            let mut batcher = Batch::new();
                                            for tx in &block.transactions {
                                                tx.verify_batch(&mut batcher);
                                            }
                                            if !batcher.verify(&mut context) {
                                                let _ = response.send(false);
                                                return;
                                            }

                                            marshal.verified(view, block).await;

                                            let _ = response.send(true);

                                            drop(timer);
                                        },
                                        _ = response.closed() => {
                                            warn!(view, "verify aborted");
                                        }
                                    }
                                }
                            });
                        }
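                        // Finalized: fetch the seed for the block's view in a background
                        // task, then continue in Seeded.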
                        Message::Finalized { block, response } => {
                            let seeded_timer = seeded_latency.timer();
                            let finalize_timer = finalize_latency.timer();

                            self.context.with_label("seeded").spawn({
                                let mut inbound = self.inbound.clone();
                                let mut seeder = seeder.clone();
                                move |_| async move {
                                    let seed = seeder.get(block.view).await;
                                    drop(seeded_timer);
                                    inbound.seeded(block, seed, finalize_timer, response).await;
                                }
                            });
                        }
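                        // Seeded: execute the finalized block with its seed, prove the
                        // resulting operations, and hand everything to the aggregator.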
                        Message::Seeded { block, seed, timer, response } => {
                            let height = block.height;
                            let commitment = block.commitment();

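                            // Apply the block's transactions to state and events on the
                            // execution pool.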
                            let execute_timer = execute_latency.timer();
                            let tx_count = block.transactions.len();
                            let result = state_transition::execute_state_transition(
                                &mut state,
                                &mut events,
                                self.identity,
                                height,
                                seed,
                                block.transactions,
                                execution_pool.clone(),
                            )
                            .await;
                            drop(execute_timer);

                            txs_executed.inc_by(tx_count as u64);

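                            // Evict mempool transactions made stale by the nonces
                            // advanced during execution.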
                            for (public, next_nonce) in &result.processed_nonces {
                                mempool.retain(public, *next_nonce);
                            }

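                            // Generate historical proofs over the state and event
                            // operations this block produced.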
                            let state_proof_ops = result.state_end_op - result.state_start_op;
                            let events_start_op = result.events_start_op;
                            let events_proof_ops = result.events_end_op - events_start_op;
                            let ((state_proof, state_proof_ops), (events_proof, events_proof_ops)) =
                                try_join(
                                    state.historical_proof(
                                        result.state_end_op,
                                        result.state_start_op,
                                        state_proof_ops,
                                    ),
                                    events.historical_proof(
                                        result.events_end_op,
                                        events_start_op,
                                        NZU64!(events_proof_ops),
                                    ),
                                )
                                .await
                                .expect("failed to generate proofs");

                            aggregator
                                .executed(
                                    block.view,
                                    block.height,
                                    commitment,
                                    result,
                                    state_proof,
                                    state_proof_ops,
                                    events_proof,
                                    events_proof_ops,
                                    response,
                                )
                                .await;

                            drop(timer);

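                            // Periodically prune both databases to bound storage growth.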
                            next_prune -= 1;
                            if next_prune == 0 {
                                let timer = prune_latency.timer();
                                try_join(
                                    state.prune(state.inactivity_floor_loc()),
                                    events.prune(events_start_op),
                                )
                                .await
                                .expect("failed to prune storage");
                                drop(timer);

                                next_prune = self.context.gen_range(1..=PRUNE_INTERVAL);
                            }
                        },
                    }
                },
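                // Pending transactions streamed from the indexer: drop any whose
                // nonce is already stale, then admit the rest to the mempool.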
                pending = tx_stream.next() => {
                    let Some(Ok(pending)) = pending else {
                        continue;
                    };

                    for tx in pending.transactions {
                        let next = nonce(&state, &tx.public).await;
                        if tx.nonce < next {
                            debug!(tx = tx.nonce, state = next, "dropping incoming transaction");
                            continue;
                        }

                        mempool.add(tx);
                    }
                }
            }
        }
    }
}