1use crate::{
46 marshal::{
47 ancestry::AncestorStream,
48 application::validation::LastBuilt,
49 core::Mailbox,
50 standard::{
51 validation::{
52 fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
53 },
54 Standard,
55 },
56 Update,
57 },
58 simplex::{types::Context, Plan},
59 types::{Epoch, Epocher, Round},
60 Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
61 VerifyingApplication,
62};
63use commonware_cryptography::certificate::Scheme;
64use commonware_macros::select;
65use commonware_runtime::{
66 telemetry::metrics::histogram::{Buckets, Timed},
67 Clock, Metrics, Spawner,
68};
69use commonware_utils::{
70 channel::{fallible::OneshotExt, oneshot},
71 sync::Mutex,
72};
73use prometheus_client::metrics::histogram::Histogram;
74use rand::Rng;
75use std::{collections::BTreeSet, sync::Arc};
76use tracing::{debug, warn};
77
78type AvailableBlocks<D> = Arc<Mutex<BTreeSet<(Round, D)>>>;
81
82async fn await_block_subscription<T, D>(
84 tx: &mut oneshot::Sender<bool>,
85 block_rx: oneshot::Receiver<T>,
86 digest: &D,
87 stage: &'static str,
88) -> Option<T>
89where
90 D: std::fmt::Debug + ?Sized,
91{
92 select! {
93 _ = tx.closed() => {
94 debug!(
95 stage,
96 reason = "consensus dropped receiver",
97 "skipping block wait"
98 );
99 None
100 },
101 result = block_rx => {
102 if result.is_err() {
103 debug!(
104 stage,
105 ?digest,
106 reason = "failed to fetch block",
107 "skipping block wait"
108 );
109 }
110 result.ok()
111 },
112 }
113}
114
115#[derive(Clone)]
132pub struct Inline<E, S, A, B, ES>
133where
134 E: Rng + Spawner + Metrics + Clock,
135 S: Scheme,
136 A: Application<E>,
137 B: Block + Clone,
138 ES: Epocher,
139{
140 context: E,
141 application: A,
142 marshal: Mailbox<S, Standard<B>>,
143 epocher: ES,
144 last_built: LastBuilt<B>,
145 available_blocks: AvailableBlocks<B::Digest>,
146
147 build_duration: Timed<E>,
148}
149
150impl<E, S, A, B, ES> Inline<E, S, A, B, ES>
151where
152 E: Rng + Spawner + Metrics + Clock,
153 S: Scheme,
154 A: VerifyingApplication<
155 E,
156 Block = B,
157 SigningScheme = S,
158 Context = Context<B::Digest, S::PublicKey>,
159 >,
160 B: Block + Clone,
161 ES: Epocher,
162{
163 pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
168 let build_histogram = Histogram::new(Buckets::LOCAL);
169 context.register(
170 "build_duration",
171 "Histogram of time taken for the application to build a new block, in seconds",
172 build_histogram.clone(),
173 );
174 let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));
175
176 Self {
177 context,
178 application,
179 marshal,
180 epocher,
181 last_built: Arc::new(Mutex::new(None)),
182 available_blocks: Arc::new(Mutex::new(BTreeSet::new())),
183 build_duration,
184 }
185 }
186}
187
188impl<E, S, A, B, ES> Automaton for Inline<E, S, A, B, ES>
189where
190 E: Rng + Spawner + Metrics + Clock,
191 S: Scheme,
192 A: VerifyingApplication<
193 E,
194 Block = B,
195 SigningScheme = S,
196 Context = Context<B::Digest, S::PublicKey>,
197 >,
198 B: Block + Clone,
199 ES: Epocher,
200{
201 type Digest = B::Digest;
202 type Context = Context<Self::Digest, S::PublicKey>;
203
204 async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
209 if epoch.is_zero() {
210 return self.application.genesis().await.digest();
211 }
212
213 let prev = epoch.previous().expect("checked to be non-zero above");
214 let last_height = self
215 .epocher
216 .last(prev)
217 .expect("previous epoch should exist");
218 let Some(block) = self.marshal.get_block(last_height).await else {
219 unreachable!("missing starting epoch block at height {}", last_height);
220 };
221 block.digest()
222 }
223
224 async fn propose(
230 &mut self,
231 consensus_context: Context<Self::Digest, S::PublicKey>,
232 ) -> oneshot::Receiver<Self::Digest> {
233 let mut marshal = self.marshal.clone();
234 let mut application = self.application.clone();
235 let last_built = self.last_built.clone();
236 let epocher = self.epocher.clone();
237 let build_duration = self.build_duration.clone();
238
239 let (mut tx, rx) = oneshot::channel();
240 self.context
241 .with_label("propose")
242 .with_attribute("round", consensus_context.round)
243 .spawn(move |runtime_context| async move {
244 let (parent_view, parent_digest) = consensus_context.parent;
245 let parent_request = fetch_parent(
246 parent_digest,
247 Some(Round::new(consensus_context.epoch(), parent_view)),
251 &mut application,
252 &mut marshal,
253 )
254 .await;
255
256 let parent = select! {
257 _ = tx.closed() => {
258 debug!(reason = "consensus dropped receiver", "skipping proposal");
259 return;
260 },
261 result = parent_request => match result {
262 Ok(parent) => parent,
263 Err(_) => {
264 debug!(
265 ?parent_digest,
266 reason = "failed to fetch parent block",
267 "skipping proposal"
268 );
269 return;
270 }
271 },
272 };
273
274 let last_in_epoch = epocher
276 .last(consensus_context.epoch())
277 .expect("current epoch should exist");
278 if parent.height() == last_in_epoch {
279 let digest = parent.digest();
280 {
281 let mut lock = last_built.lock();
282 *lock = Some((consensus_context.round, parent));
283 }
284
285 let success = tx.send_lossy(digest);
286 debug!(
287 round = ?consensus_context.round,
288 ?digest,
289 success,
290 "re-proposed parent block at epoch boundary"
291 );
292 return;
293 }
294
295 let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
296 let build_request = application.propose(
297 (
298 runtime_context.with_label("app_propose"),
299 consensus_context.clone(),
300 ),
301 ancestor_stream,
302 );
303
304 let mut build_timer = build_duration.timer();
305 let built_block = select! {
306 _ = tx.closed() => {
307 debug!(reason = "consensus dropped receiver", "skipping proposal");
308 return;
309 },
310 result = build_request => match result {
311 Some(block) => block,
312 None => {
313 debug!(
314 ?parent_digest,
315 reason = "block building failed",
316 "skipping proposal"
317 );
318 return;
319 }
320 },
321 };
322 build_timer.observe();
323
324 let digest = built_block.digest();
325 {
326 let mut lock = last_built.lock();
327 *lock = Some((consensus_context.round, built_block));
328 }
329 let success = tx.send_lossy(digest);
330 debug!(
331 round = ?consensus_context.round,
332 ?digest,
333 success,
334 "proposed new block"
335 );
336 });
337 rx
338 }
339
340 async fn verify(
351 &mut self,
352 context: Context<Self::Digest, S::PublicKey>,
353 digest: Self::Digest,
354 ) -> oneshot::Receiver<bool> {
355 let mut marshal = self.marshal.clone();
356 let mut application = self.application.clone();
357 let epocher = self.epocher.clone();
358 let available_blocks = self.available_blocks.clone();
359
360 let (mut tx, rx) = oneshot::channel();
361 self.context
362 .with_label("inline_verify")
363 .with_attribute("round", context.round)
364 .spawn(move |runtime_context| async move {
365 let block_request = marshal
367 .subscribe_by_digest(Some(context.round), digest)
368 .await;
369 let Some(block) =
370 await_block_subscription(&mut tx, block_request, &digest, "verification").await
371 else {
372 return;
373 };
374 available_blocks.lock().insert((context.round, digest));
375
376 let block = match precheck_epoch_and_reproposal(
384 &epocher,
385 &mut marshal,
386 &context,
387 digest,
388 block,
389 )
390 .await
391 {
392 Decision::Complete(valid) => {
393 tx.send_lossy(valid);
396 return;
397 }
398 Decision::Continue(block) => block,
399 };
400
401 let application_valid = match verify_with_parent(
407 runtime_context,
408 context,
409 block,
410 &mut application,
411 &mut marshal,
412 &mut tx,
413 )
414 .await
415 {
416 Some(valid) => valid,
417 None => return,
418 };
419 tx.send_lossy(application_valid);
420 });
421 rx
422 }
423}
424
425impl<E, S, A, B, ES> CertifiableAutomaton for Inline<E, S, A, B, ES>
427where
428 E: Rng + Spawner + Metrics + Clock,
429 S: Scheme,
430 A: VerifyingApplication<
431 E,
432 Block = B,
433 SigningScheme = S,
434 Context = Context<B::Digest, S::PublicKey>,
435 >,
436 B: Block + Clone,
437 ES: Epocher,
438{
439 async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
440 if self.available_blocks.lock().contains(&(round, digest)) {
442 let (tx, rx) = oneshot::channel();
443 tx.send_lossy(true);
444 return rx;
445 }
446
447 let block_rx = self.marshal.subscribe_by_digest(Some(round), digest).await;
451 let (mut tx, rx) = oneshot::channel();
452 self.context
453 .with_label("inline_certify")
454 .with_attribute("round", round)
455 .spawn(move |_| async move {
456 if await_block_subscription(&mut tx, block_rx, &digest, "certification")
457 .await
458 .is_some()
459 {
460 tx.send_lossy(true);
461 }
462 });
463
464 rx
468 }
469}
470
471impl<E, S, A, B, ES> Relay for Inline<E, S, A, B, ES>
472where
473 E: Rng + Spawner + Metrics + Clock,
474 S: Scheme,
475 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
476 B: Block + Clone,
477 ES: Epocher,
478{
479 type Digest = B::Digest;
480 type PublicKey = S::PublicKey;
481 type Plan = Plan<S::PublicKey>;
482
483 async fn broadcast(&mut self, digest: Self::Digest, plan: Plan<S::PublicKey>) {
484 match plan {
485 Plan::Propose => {
486 let Some((round, block)) = self.last_built.lock().take() else {
487 warn!("missing block to broadcast");
488 return;
489 };
490 if block.digest() != digest {
491 warn!(
492 round = %round,
493 digest = %block.digest(),
494 height = %block.height(),
495 "skipping requested broadcast of block with mismatched digest"
496 );
497 return;
498 }
499 self.marshal.proposed(round, block).await;
500 }
501 Plan::Forward { round, peers } => {
502 self.marshal.forward(round, digest, peers).await;
503 }
504 }
505 }
506}
507
508impl<E, S, A, B, ES> Reporter for Inline<E, S, A, B, ES>
509where
510 E: Rng + Spawner + Metrics + Clock,
511 S: Scheme,
512 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
513 + Reporter<Activity = Update<B>>,
514 B: Block + Clone,
515 ES: Epocher,
516{
517 type Activity = A::Activity;
518
519 async fn report(&mut self, update: Self::Activity) {
521 if let Update::Tip(tip_round, _, _) = &update {
522 self.available_blocks
523 .lock()
524 .retain(|(round, _)| round > tip_round);
525 }
526 self.application.report(update).await
527 }
528}
529
530#[cfg(test)]
531mod tests {
532 use super::Inline;
533 use crate::{
534 marshal::mocks::{
535 harness::{
536 default_leader, make_raw_block, setup_network_with_participants, Ctx,
537 StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
538 },
539 verifying::MockVerifyingApp,
540 },
541 simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Context},
542 types::{Epoch, FixedEpocher, Height, Round, View},
543 Automaton, Block, CertifiableAutomaton, Relay, VerifyingApplication,
544 };
545 use commonware_cryptography::{
546 certificate::{mocks::Fixture, ConstantProvider, Scheme},
547 sha256::Sha256,
548 Digestible, Hasher as _,
549 };
550 use commonware_macros::{select, test_traced};
551 use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
552 use commonware_utils::NZUsize;
553 use rand::Rng;
554 use std::time::Duration;
555
556 #[allow(dead_code)]
558 fn assert_non_certifiable_block_supported<E, S, A, B, ES>()
559 where
560 E: Rng + Spawner + Metrics + Clock,
561 S: Scheme,
562 A: VerifyingApplication<
563 E,
564 Block = B,
565 SigningScheme = S,
566 Context = Context<B::Digest, S::PublicKey>,
567 >,
568 B: Block + Clone,
569 ES: crate::types::Epocher,
570 {
571 fn assert_automaton<T: Automaton>() {}
572 fn assert_certifiable<T: CertifiableAutomaton>() {}
573 fn assert_relay<T: Relay>() {}
574
575 assert_automaton::<Inline<E, S, A, B, ES>>();
576 assert_certifiable::<Inline<E, S, A, B, ES>>();
577 assert_relay::<Inline<E, S, A, B, ES>>();
578 }
579
580 #[test_traced("INFO")]
581 fn test_certify_returns_immediately_after_verify_fetches_block() {
582 let runner = deterministic::Runner::timed(Duration::from_secs(30));
583 runner.start(|mut context| async move {
584 let Fixture {
585 participants,
586 schemes,
587 ..
588 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
589 let mut oracle =
590 setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
591 .await;
592
593 let me = participants[0].clone();
594 let setup = StandardHarness::setup_validator(
595 context.with_label("validator_0"),
596 &mut oracle,
597 me.clone(),
598 ConstantProvider::new(schemes[0].clone()),
599 )
600 .await;
601 let marshal = setup.mailbox;
602
603 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
604 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
605 let mut inline = Inline::new(
606 context.clone(),
607 mock_app,
608 marshal.clone(),
609 FixedEpocher::new(BLOCKS_PER_EPOCH),
610 );
611
612 let parent_round = Round::new(Epoch::zero(), View::new(1));
614 let parent_ctx = Ctx {
615 round: parent_round,
616 leader: default_leader(),
617 parent: (View::zero(), genesis.digest()),
618 };
619 let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
620 let parent_digest = parent.digest();
621 marshal.clone().proposed(parent_round, parent).await;
622
623 let round = Round::new(Epoch::zero(), View::new(2));
624 let verify_context = Ctx {
625 round,
626 leader: me,
627 parent: (View::new(1), parent_digest),
628 };
629 let block =
630 B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
631 let digest = block.digest();
632 marshal.clone().proposed(round, block).await;
633
634 let verify_rx = inline.verify(verify_context, digest).await;
636 assert!(
637 verify_rx.await.unwrap(),
638 "verify should complete successfully before certify"
639 );
640
641 let certify_rx = inline.certify(round, digest).await;
643
644 select! {
645 result = certify_rx => {
646 assert!(
647 result.unwrap(),
648 "certify should return immediately once verify has fetched the block"
649 );
650 },
651 _ = context.sleep(Duration::from_secs(5)) => {
652 panic!("certify should not hang after local verify completed");
653 },
654 }
655 });
656 }
657
658 #[test_traced("INFO")]
659 fn test_certify_succeeds_without_verify_task() {
660 let runner = deterministic::Runner::timed(Duration::from_secs(30));
661 runner.start(|mut context| async move {
662 let Fixture {
663 participants,
664 schemes,
665 ..
666 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
667 let mut oracle =
668 setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
669 .await;
670
671 let me = participants[0].clone();
672 let setup = StandardHarness::setup_validator(
673 context.with_label("validator_0"),
674 &mut oracle,
675 me.clone(),
676 ConstantProvider::new(schemes[0].clone()),
677 )
678 .await;
679 let marshal = setup.mailbox;
680
681 let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
682 let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
683 let mut inline = Inline::new(
684 context.clone(),
685 mock_app,
686 marshal.clone(),
687 FixedEpocher::new(BLOCKS_PER_EPOCH),
688 );
689
690 let parent_round = Round::new(Epoch::zero(), View::new(1));
692 let parent_ctx = Ctx {
693 round: parent_round,
694 leader: default_leader(),
695 parent: (View::zero(), genesis.digest()),
696 };
697 let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
698 let parent_digest = parent.digest();
699 marshal.clone().proposed(parent_round, parent).await;
700
701 let round = Round::new(Epoch::zero(), View::new(2));
702 let verify_context = Ctx {
703 round,
704 leader: me,
705 parent: (View::new(1), parent_digest),
706 };
707 let block =
708 B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
709 let digest = block.digest();
710 marshal.clone().proposed(round, block).await;
711
712 let certify_rx = inline.certify(round, digest).await;
714
715 select! {
716 result = certify_rx => {
717 assert!(
718 result.unwrap(),
719 "certify should resolve once block availability is known"
720 );
721 },
722 _ = context.sleep(Duration::from_secs(5)) => {
723 panic!("certify should not hang when block is already available in marshal");
724 },
725 }
726 });
727 }
728}