commonware_consensus/marshal/
finalizer.rs

1use crate::{marshal::ingress::orchestrator::Orchestrator, Block, Reporter};
2use commonware_runtime::{Clock, Metrics, Spawner, Storage};
3use commonware_storage::metadata::{self, Metadata};
4use commonware_utils::sequence::FixedBytes;
5use futures::{channel::mpsc, StreamExt};
6use tracing::{debug, error};
7
8// The key used to store the last indexed height in the metadata store.
9const LATEST_KEY: FixedBytes<1> = FixedBytes::new([0u8]);
10
11/// Requests the finalized blocks (in order) from the orchestrator, sends them to the application,
12/// waits for confirmation that the application has processed the block.
13///
14/// Stores the highest height for which the application has processed. This allows resuming
15/// processing from the last processed height after a restart.
16pub struct Finalizer<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>> {
17    // Application that processes the finalized blocks.
18    application: Z,
19
20    // Orchestrator that stores the finalized blocks.
21    orchestrator: Orchestrator<B>,
22
23    // Notifier to indicate that the finalized blocks have been updated and should be re-queried.
24    notifier_rx: mpsc::Receiver<()>,
25
26    // Metadata store that stores the last indexed height.
27    metadata: Metadata<R, FixedBytes<1>, u64>,
28}
29
30impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>>
31    Finalizer<B, R, Z>
32{
33    /// Initialize the finalizer.
34    pub async fn new(
35        context: R,
36        partition_prefix: String,
37        application: Z,
38        orchestrator: Orchestrator<B>,
39        notifier_rx: mpsc::Receiver<()>,
40    ) -> Self {
41        // Initialize metadata
42        let metadata = Metadata::init(
43            context.with_label("metadata"),
44            metadata::Config {
45                partition: format!("{partition_prefix}-metadata"),
46                codec_config: (),
47            },
48        )
49        .await
50        .expect("failed to initialize metadata");
51
52        Self {
53            application,
54            orchestrator,
55            notifier_rx,
56            metadata,
57        }
58    }
59
60    /// Run the finalizer, which continuously fetches and processes finalized blocks.
61    pub async fn run(mut self) {
62        // Initialize last indexed from metadata store.
63        // If the key does not exist, we assume the genesis block (height 0) has been indexed.
64        let mut latest = *self.metadata.get(&LATEST_KEY).unwrap_or(&0);
65
66        // The main loop to process finalized blocks. This loop will hot-spin until a block is
67        // available, at which point it will process it and continue. If a block is not available,
68        // it will request a repair and wait for a notification of an update before retrying.
69        loop {
70            // The next height to process is the next height after the last processed height.
71            let height = latest + 1;
72
73            // Attempt to get the next block from the orchestrator.
74            if let Some(block) = self.orchestrator.get(height).await {
75                // Sanity-check that the block height is the one we expect.
76                assert!(block.height() == height, "block height mismatch");
77
78                // Send the block to the application.
79                //
80                // After an unclean shutdown (where the finalizer metadata is not synced after some
81                // height is processed by the application), it is possible that the application may
82                // be asked to process a block it has already seen (which it can simply ignore).
83                let commitment = block.commitment();
84                self.application.report(block).await;
85
86                // Record that we have processed up through this height.
87                latest = height;
88                if let Err(e) = self.metadata.put_sync(LATEST_KEY.clone(), latest).await {
89                    error!("failed to update metadata: {e}");
90                    return;
91                }
92
93                // Notify the orchestrator that the block has been processed.
94                self.orchestrator.processed(height, commitment).await;
95
96                // Loop again without waiting for a notification (there may be more to process).
97                continue;
98            }
99
100            // We've reached a height at which we have no (finalized) block.
101            // It may be the case that the block is not finalized yet, or that there is a gap.
102            // Notify the orchestrator that we're trying to access this block.
103            self.orchestrator.repair(height).await;
104
105            // Wait for a notification from the orchestrator that new blocks are available.
106            debug!(height, "waiting to index finalized block");
107            let Some(()) = self.notifier_rx.next().await else {
108                error!("orchestrator closed, shutting down");
109                return;
110            };
111        }
112    }
113}