1use crate::{
46 marshal::{
47 ancestry::AncestorStream,
48 application::validation::LastBuilt,
49 core::Mailbox,
50 standard::{
51 validation::{
52 fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
53 },
54 Standard,
55 },
56 Update,
57 },
58 simplex::types::Context,
59 types::{Epoch, Epocher, Round},
60 Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
61 VerifyingApplication,
62};
63use commonware_cryptography::certificate::Scheme;
64use commonware_macros::select;
65use commonware_runtime::{
66 telemetry::metrics::histogram::{Buckets, Timed},
67 Clock, Metrics, Spawner,
68};
69use commonware_utils::{
70 channel::{fallible::OneshotExt, oneshot},
71 sync::Mutex,
72};
73use prometheus_client::metrics::histogram::Histogram;
74use rand::Rng;
75use std::sync::Arc;
76use tracing::{debug, warn};
77
/// Marshal wrapper that sits between consensus and an application.
///
/// This file implements [`Automaton`], [`CertifiableAutomaton`], [`Relay`] and
/// [`Reporter`] for it. Block verification is performed inside the task spawned
/// by `verify`.
///
/// Clones share the same `last_built` cache: `new` creates it as an
/// `Arc<Mutex<..>>`, so cloning the wrapper only clones the `Arc`.
#[derive(Clone)]
pub struct Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E>,
    B: Block + Clone,
    ES: Epocher,
{
    /// Runtime context used to spawn the proposal/verification tasks.
    context: E,
    /// Application that supplies the epoch-zero genesis block, builds new
    /// blocks and receives reports.
    application: A,
    /// Mailbox of the marshal, used to look up and subscribe to blocks and to
    /// hand over blocks that were proposed.
    marshal: Mailbox<S, Standard<B>>,
    /// Source of epoch boundaries (`last` height of an epoch).
    epocher: ES,
    /// `(round, block)` most recently produced by `propose`; consumed by
    /// `broadcast`.
    last_built: LastBuilt<B>,

    /// Timer over the `build_duration` histogram registered in `new`.
    build_duration: Timed<E>,
}
111
112impl<E, S, A, B, ES> Inline<E, S, A, B, ES>
113where
114 E: Rng + Spawner + Metrics + Clock,
115 S: Scheme,
116 A: VerifyingApplication<
117 E,
118 Block = B,
119 SigningScheme = S,
120 Context = Context<B::Digest, S::PublicKey>,
121 >,
122 B: Block + Clone,
123 ES: Epocher,
124{
125 pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
130 let build_histogram = Histogram::new(Buckets::LOCAL);
131 context.register(
132 "build_duration",
133 "Histogram of time taken for the application to build a new block, in seconds",
134 build_histogram.clone(),
135 );
136 let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));
137
138 Self {
139 context,
140 application,
141 marshal,
142 epocher,
143 last_built: Arc::new(Mutex::new(None)),
144 build_duration,
145 }
146 }
147}
148
149impl<E, S, A, B, ES> Automaton for Inline<E, S, A, B, ES>
150where
151 E: Rng + Spawner + Metrics + Clock,
152 S: Scheme,
153 A: VerifyingApplication<
154 E,
155 Block = B,
156 SigningScheme = S,
157 Context = Context<B::Digest, S::PublicKey>,
158 >,
159 B: Block + Clone,
160 ES: Epocher,
161{
162 type Digest = B::Digest;
163 type Context = Context<Self::Digest, S::PublicKey>;
164
165 async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
170 if epoch.is_zero() {
171 return self.application.genesis().await.digest();
172 }
173
174 let prev = epoch.previous().expect("checked to be non-zero above");
175 let last_height = self
176 .epocher
177 .last(prev)
178 .expect("previous epoch should exist");
179 let Some(block) = self.marshal.get_block(last_height).await else {
180 unreachable!("missing starting epoch block at height {}", last_height);
181 };
182 block.digest()
183 }
184
185 async fn propose(
191 &mut self,
192 consensus_context: Context<Self::Digest, S::PublicKey>,
193 ) -> oneshot::Receiver<Self::Digest> {
194 let mut marshal = self.marshal.clone();
195 let mut application = self.application.clone();
196 let last_built = self.last_built.clone();
197 let epocher = self.epocher.clone();
198 let build_duration = self.build_duration.clone();
199
200 let (mut tx, rx) = oneshot::channel();
201 self.context
202 .with_label("propose")
203 .with_attribute("round", consensus_context.round)
204 .spawn(move |runtime_context| async move {
205 let (parent_view, parent_digest) = consensus_context.parent;
206 let parent_request = fetch_parent(
207 parent_digest,
208 Some(Round::new(consensus_context.epoch(), parent_view)),
212 &mut application,
213 &mut marshal,
214 )
215 .await;
216
217 let parent = select! {
218 _ = tx.closed() => {
219 debug!(reason = "consensus dropped receiver", "skipping proposal");
220 return;
221 },
222 result = parent_request => match result {
223 Ok(parent) => parent,
224 Err(_) => {
225 debug!(
226 ?parent_digest,
227 reason = "failed to fetch parent block",
228 "skipping proposal"
229 );
230 return;
231 }
232 },
233 };
234
235 let last_in_epoch = epocher
237 .last(consensus_context.epoch())
238 .expect("current epoch should exist");
239 if parent.height() == last_in_epoch {
240 let digest = parent.digest();
241 {
242 let mut lock = last_built.lock();
243 *lock = Some((consensus_context.round, parent));
244 }
245
246 let success = tx.send_lossy(digest);
247 debug!(
248 round = ?consensus_context.round,
249 ?digest,
250 success,
251 "re-proposed parent block at epoch boundary"
252 );
253 return;
254 }
255
256 let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
257 let build_request = application.propose(
258 (
259 runtime_context.with_label("app_propose"),
260 consensus_context.clone(),
261 ),
262 ancestor_stream,
263 );
264
265 let mut build_timer = build_duration.timer();
266 let built_block = select! {
267 _ = tx.closed() => {
268 debug!(reason = "consensus dropped receiver", "skipping proposal");
269 return;
270 },
271 result = build_request => match result {
272 Some(block) => block,
273 None => {
274 debug!(
275 ?parent_digest,
276 reason = "block building failed",
277 "skipping proposal"
278 );
279 return;
280 }
281 },
282 };
283 build_timer.observe();
284
285 let digest = built_block.digest();
286 {
287 let mut lock = last_built.lock();
288 *lock = Some((consensus_context.round, built_block));
289 }
290 let success = tx.send_lossy(digest);
291 debug!(
292 round = ?consensus_context.round,
293 ?digest,
294 success,
295 "proposed new block"
296 );
297 });
298 rx
299 }
300
301 async fn verify(
312 &mut self,
313 context: Context<Self::Digest, S::PublicKey>,
314 digest: Self::Digest,
315 ) -> oneshot::Receiver<bool> {
316 let mut marshal = self.marshal.clone();
317 let mut application = self.application.clone();
318 let epocher = self.epocher.clone();
319
320 let (mut tx, rx) = oneshot::channel();
321 self.context
322 .with_label("inline_verify")
323 .with_attribute("round", context.round)
324 .spawn(move |runtime_context| async move {
325 let block_request = marshal
326 .subscribe_by_digest(Some(context.round), digest)
327 .await;
328 let block = select! {
329 _ = tx.closed() => {
330 debug!(
331 reason = "consensus dropped receiver",
332 "skipping verification"
333 );
334 return;
335 },
336 result = block_request => match result {
337 Ok(block) => block,
338 Err(_) => {
339 debug!(
340 ?digest,
341 reason = "failed to fetch block for verification",
342 "skipping verification"
343 );
344 return;
345 }
346 },
347 };
348
349 let block = match precheck_epoch_and_reproposal(
357 &epocher,
358 &mut marshal,
359 &context,
360 digest,
361 block,
362 )
363 .await
364 {
365 Decision::Complete(valid) => {
366 tx.send_lossy(valid);
369 return;
370 }
371 Decision::Continue(block) => block,
372 };
373
374 let application_valid = match verify_with_parent(
379 runtime_context,
380 context,
381 block,
382 &mut application,
383 &mut marshal,
384 &mut tx,
385 )
386 .await
387 {
388 Some(valid) => valid,
389 None => return,
390 };
391 tx.send_lossy(application_valid);
392 });
393 rx
394 }
395}
396
impl<E, S, A, B, ES> CertifiableAutomaton for Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Digest, S::PublicKey>,
    >,
    B: Block + Clone,
    ES: Epocher,
{
    // Intentionally empty: no trait items are overridden here, so `Inline` uses
    // whatever provided (default) items `CertifiableAutomaton` declares.
}
415
416impl<E, S, A, B, ES> Relay for Inline<E, S, A, B, ES>
417where
418 E: Rng + Spawner + Metrics + Clock,
419 S: Scheme,
420 A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
421 B: Block + Clone,
422 ES: Epocher,
423{
424 type Digest = B::Digest;
425
426 async fn broadcast(&mut self, digest: Self::Digest) {
428 let Some((round, block)) = self.last_built.lock().take() else {
429 warn!("missing block to broadcast");
430 return;
431 };
432 if block.digest() != digest {
433 warn!(
434 round = %round,
435 digest = %block.digest(),
436 height = %block.height(),
437 "skipping requested broadcast of block with mismatched digest"
438 );
439 return;
440 }
441 self.marshal.proposed(round, block).await;
442 }
443}
444
impl<E, S, A, B, ES> Reporter for Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
        + Reporter<Activity = Update<B>>,
    B: Block + Clone,
    ES: Epocher,
{
    type Activity = A::Activity;

    /// Forwards `update` unchanged to the wrapped application's `report`.
    async fn report(&mut self, update: Self::Activity) {
        self.application.report(update).await
    }
}
461
#[cfg(test)]
mod tests {
    use super::Inline;
    use crate::{
        simplex::types::Context, types::Epocher, Automaton, Block, CertifiableAutomaton, Relay,
        VerifyingApplication,
    };
    use commonware_cryptography::certificate::Scheme;
    use commonware_runtime::{Clock, Metrics, Spawner};
    use rand::Rng;

    /// Compile-time check that `Inline` implements `Automaton`,
    /// `CertifiableAutomaton` and `Relay` when the block type is bounded only by
    /// `Block + Clone`.
    // Never called: the function only has to type-check, hence `dead_code`.
    #[allow(dead_code)]
    fn assert_non_certifiable_block_supported<E, S, A, B, ES>()
    where
        E: Rng + Spawner + Metrics + Clock,
        S: Scheme,
        A: VerifyingApplication<
            E,
            Block = B,
            SigningScheme = S,
            Context = Context<B::Digest, S::PublicKey>,
        >,
        B: Block + Clone,
        ES: Epocher,
    {
        // One marker function carrying all three required bounds.
        fn assert_impls<T>()
        where
            T: Automaton + CertifiableAutomaton + Relay,
        {
        }

        assert_impls::<Inline<E, S, A, B, ES>>();
    }
}