commonware_consensus/marshal/finalizer.rs
1use crate::{marshal::ingress::orchestrator::Orchestrator, Block, Reporter};
2use commonware_runtime::{Clock, Metrics, Spawner, Storage};
3use commonware_storage::metadata::{self, Metadata};
4use commonware_utils::sequence::FixedBytes;
5use futures::{channel::mpsc, StreamExt};
6use tracing::{debug, error};
7
8// The key used to store the last indexed height in the metadata store.
9const LATEST_KEY: FixedBytes<1> = FixedBytes::new([0u8]);
10
11/// Requests the finalized blocks (in order) from the orchestrator, sends them to the application,
12/// waits for confirmation that the application has processed the block.
13///
14/// Stores the highest height for which the application has processed. This allows resuming
15/// processing from the last processed height after a restart.
16pub struct Finalizer<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>> {
17 // Application that processes the finalized blocks.
18 application: Z,
19
20 // Orchestrator that stores the finalized blocks.
21 orchestrator: Orchestrator<B>,
22
23 // Notifier to indicate that the finalized blocks have been updated and should be re-queried.
24 notifier_rx: mpsc::Receiver<()>,
25
26 // Metadata store that stores the last indexed height.
27 metadata: Metadata<R, FixedBytes<1>, u64>,
28}
29
30impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>>
31 Finalizer<B, R, Z>
32{
33 /// Initialize the finalizer.
34 pub async fn new(
35 context: R,
36 partition_prefix: String,
37 application: Z,
38 orchestrator: Orchestrator<B>,
39 notifier_rx: mpsc::Receiver<()>,
40 ) -> Self {
41 // Initialize metadata
42 let metadata = Metadata::init(
43 context.with_label("metadata"),
44 metadata::Config {
45 partition: format!("{partition_prefix}-metadata"),
46 codec_config: (),
47 },
48 )
49 .await
50 .expect("failed to initialize metadata");
51
52 Self {
53 application,
54 orchestrator,
55 notifier_rx,
56 metadata,
57 }
58 }
59
60 /// Run the finalizer, which continuously fetches and processes finalized blocks.
61 pub async fn run(mut self) {
62 // Initialize last indexed from metadata store.
63 // If the key does not exist, we assume the genesis block (height 0) has been indexed.
64 let mut latest = *self.metadata.get(&LATEST_KEY).unwrap_or(&0);
65
66 // The main loop to process finalized blocks. This loop will hot-spin until a block is
67 // available, at which point it will process it and continue. If a block is not available,
68 // it will request a repair and wait for a notification of an update before retrying.
69 loop {
70 // The next height to process is the next height after the last processed height.
71 let height = latest + 1;
72
73 // Attempt to get the next block from the orchestrator.
74 if let Some(block) = self.orchestrator.get(height).await {
75 // Sanity-check that the block height is the one we expect.
76 assert!(block.height() == height, "block height mismatch");
77
78 // Send the block to the application.
79 //
80 // After an unclean shutdown (where the finalizer metadata is not synced after some
81 // height is processed by the application), it is possible that the application may
82 // be asked to process a block it has already seen (which it can simply ignore).
83 let commitment = block.commitment();
84 self.application.report(block).await;
85
86 // Record that we have processed up through this height.
87 latest = height;
88 if let Err(e) = self.metadata.put_sync(LATEST_KEY.clone(), latest).await {
89 error!("failed to update metadata: {e}");
90 return;
91 }
92
93 // Notify the orchestrator that the block has been processed.
94 self.orchestrator.processed(height, commitment).await;
95
96 // Loop again without waiting for a notification (there may be more to process).
97 continue;
98 }
99
100 // We've reached a height at which we have no (finalized) block.
101 // It may be the case that the block is not finalized yet, or that there is a gap.
102 // Notify the orchestrator that we're trying to access this block.
103 self.orchestrator.repair(height).await;
104
105 // Wait for a notification from the orchestrator that new blocks are available.
106 debug!(height, "waiting to index finalized block");
107 let Some(()) = self.notifier_rx.next().await else {
108 error!("orchestrator closed, shutting down");
109 return;
110 };
111 }
112 }
113}