commonware-consensus 2026.4.0

Order opaque messages in a Byzantine environment.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
use crate::{
    marshal::core::Variant,
    simplex::types::{Finalization, Notarization},
    types::{Epoch, Round, View},
};
use commonware_codec::{CodecShared, Read};
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_runtime::{buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Spawner, Storage};
use commonware_storage::{
    archive::{self, prunable, Archive as _, Identifier},
    metadata::{self, Metadata},
    translator::TwoCap,
};
use rand::Rng;
use std::{
    cmp::max,
    collections::BTreeMap,
    num::{NonZero, NonZeroUsize},
    time::Duration,
};
use tracing::{debug, info};

// The key under which the cached epoch range is stored in the metadata store.
// The value is a `(floor, ceiling)` tuple of the minimum and maximum epochs
// (inclusive) that may have data.
const CACHED_EPOCHS_KEY: u8 = 0;

/// Configuration parameters for prunable archives.
pub(crate) struct Config {
    /// Prefix applied to every storage partition name created by the manager.
    pub partition_prefix: String,
    /// Number of items stored per section in each prunable archive.
    pub prunable_items_per_section: NonZero<u64>,
    /// Buffer size used when replaying archive contents during initialization.
    pub replay_buffer: NonZeroUsize,
    /// Write buffer size for archive key storage.
    pub key_write_buffer: NonZeroUsize,
    /// Write buffer size for archive value storage.
    pub value_write_buffer: NonZeroUsize,
    /// Shared page cache for archive key storage.
    pub key_page_cache: CacheRef,
}

/// Prunable archives for a single epoch.
///
/// Entries are written under both their view (index) and block digest (key),
/// so they can be fetched by either and pruned by view.
#[allow(clippy::type_complexity)]
struct Cache<R, V, S>
where
    R: BufferPooler + Rng + Spawner + Metrics + Clock + Storage,
    V: Variant,
    S: Scheme,
{
    /// Scoped context that keeps this epoch's metrics alive until the cache is dropped.
    _scope: R,
    /// Verified blocks stored by view
    verified_blocks: prunable::Archive<TwoCap, R, <V::Block as Digestible>::Digest, V::StoredBlock>,
    /// Notarized blocks stored by view
    notarized_blocks:
        prunable::Archive<TwoCap, R, <V::Block as Digestible>::Digest, V::StoredBlock>,
    /// Notarizations stored by view
    notarizations: prunable::Archive<
        TwoCap,
        R,
        <V::Block as Digestible>::Digest,
        Notarization<S, V::Commitment>,
    >,
    /// Finalizations stored by view
    finalizations: prunable::Archive<
        TwoCap,
        R,
        <V::Block as Digestible>::Digest,
        Finalization<S, V::Commitment>,
    >,
}

impl<R, V, S> Cache<R, V, S>
where
    R: BufferPooler + Rng + Spawner + Metrics + Clock + Storage,
    V: Variant,
    S: Scheme,
{
    /// Prune all four archives to the given view.
    ///
    /// Panics if any underlying archive fails to prune, since a partial prune
    /// would leave the archives inconsistent with each other.
    async fn prune(&mut self, min_view: View) {
        let target = min_view.get();
        let outcome = futures::try_join!(
            self.verified_blocks.prune(target),
            self.notarized_blocks.prune(target),
            self.notarizations.prune(target),
            self.finalizations.prune(target),
        );
        match outcome {
            Ok(_) => debug!(min_view = %min_view, "pruned archives"),
            Err(e) => panic!("failed to prune archives: {e}"),
        }
    }
}

/// Manages prunable caches and their metadata.
///
/// Epoch caches are opened lazily; the metadata store tracks the inclusive
/// `(floor, ceiling)` range of epochs that may contain data.
pub(crate) struct Manager<R, V, S>
where
    R: BufferPooler + Rng + Spawner + Metrics + Clock + Storage,
    V: Variant,
    S: Scheme,
{
    /// Context used to label, scope, and spawn all storage and metrics.
    context: R,

    /// Configuration for underlying prunable archives
    cfg: Config,

    /// Codec configuration for block type
    block_codec_config: <V::Block as Read>::Cfg,

    /// Metadata store for recording which epochs may have data. The value is a tuple of the floor
    /// and ceiling, the minimum and maximum epochs (inclusive) that may have data.
    metadata: Metadata<R, u8, (Epoch, Epoch)>,

    /// A map from epoch to its cache
    caches: BTreeMap<Epoch, Cache<R, V, S>>,
}

impl<R, V, S> Manager<R, V, S>
where
    R: BufferPooler + Rng + Spawner + Metrics + Clock + Storage,
    V: Variant,
    S: Scheme,
{
    /// Initialize the cache manager and its metadata store.
    ///
    /// Epoch caches are not opened eagerly here; they are created on demand
    /// (see [`Self::get_or_init_epoch`]) to avoid coordination issues around
    /// the scheme provider.
    pub(crate) async fn init(
        context: R,
        cfg: Config,
        block_codec_config: <V::Block as Read>::Cfg,
    ) -> Self {
        // Open the metadata store that records which epochs may have data.
        let metadata_cfg = metadata::Config {
            partition: format!("{}-metadata", cfg.partition_prefix),
            codec_config: ((), ()),
        };
        let metadata = Metadata::init(context.with_label("metadata"), metadata_cfg)
            .await
            .expect("failed to initialize metadata");

        Self {
            context,
            cfg,
            block_codec_config,
            metadata,
            caches: BTreeMap::new(),
        }
    }

    /// Re-open every epoch cache recorded in metadata so lookups such as
    /// `find_block` can discover blocks written before the last shutdown.
    pub(crate) async fn load_persisted_epochs(&mut self) {
        let (floor, ceiling) = self.get_metadata();
        for raw in floor.get()..=ceiling.get() {
            let epoch = Epoch::new(raw);
            // Skip epochs that are already open.
            if self.caches.contains_key(&epoch) {
                continue;
            }
            self.init_epoch(epoch).await;
        }
    }

    /// Retrieve the `(floor, ceiling)` epoch range that may have data,
    /// defaulting to `(0, 0)` when nothing has been recorded yet.
    fn get_metadata(&self) -> (Epoch, Epoch) {
        match self.metadata.get(&CACHED_EPOCHS_KEY) {
            Some(range) => range.clone(),
            None => (Epoch::zero(), Epoch::zero()),
        }
    }

    /// Persist the `(floor, ceiling)` epoch range that may have data.
    async fn set_metadata(&mut self, floor: Epoch, ceiling: Epoch) {
        let result = self
            .metadata
            .put_sync(CACHED_EPOCHS_KEY, (floor, ceiling))
            .await;
        result.expect("failed to write metadata");
    }

    /// Get the cache for the given epoch, initializing it if it doesn't exist.
    ///
    /// Returns `None` when the epoch is below the metadata floor, i.e. it has
    /// already been pruned and must not be resurrected.
    async fn get_or_init_epoch(&mut self, epoch: Epoch) -> Option<&mut Cache<R, V, S>> {
        if !self.caches.contains_key(&epoch) {
            let (floor, ceiling) = self.get_metadata();

            // Anything below the floor has already been pruned.
            if epoch < floor {
                return None;
            }

            // Record the new ceiling before touching disk (metadata-first is
            // safe because initialization is idempotent).
            if epoch > ceiling {
                self.set_metadata(floor, epoch).await;
            }

            self.init_epoch(epoch).await;
        }

        // Present either because it already existed or we just created it.
        self.caches.get_mut(&epoch)
    }

    /// Helper to initialize the cache for a given epoch.
    async fn init_epoch(&mut self, epoch: Epoch) {
        let scope = self
            .context
            .with_label("cache")
            .with_attribute("epoch", epoch)
            .with_scope();
        let (verified_blocks, notarized_blocks, notarizations, finalizations) = futures::join!(
            Self::init_archive(
                &scope,
                &self.cfg,
                epoch,
                "verified",
                self.block_codec_config.clone()
            ),
            Self::init_archive(
                &scope,
                &self.cfg,
                epoch,
                "notarized",
                self.block_codec_config.clone()
            ),
            Self::init_archive(
                &scope,
                &self.cfg,
                epoch,
                "notarizations",
                S::certificate_codec_config_unbounded(),
            ),
            Self::init_archive(
                &scope,
                &self.cfg,
                epoch,
                "finalizations",
                S::certificate_codec_config_unbounded(),
            ),
        );
        let existing = self.caches.insert(
            epoch,
            Cache {
                _scope: scope,
                verified_blocks,
                notarized_blocks,
                notarizations,
                finalizations,
            },
        );
        assert!(existing.is_none(), "cache already exists for epoch {epoch}");
    }

    /// Open a single prunable archive for `epoch` under the given `name`,
    /// logging how long restoration took.
    async fn init_archive<T: CodecShared>(
        ctx: &R,
        cfg: &Config,
        epoch: Epoch,
        name: &str,
        codec_config: T::Cfg,
    ) -> prunable::Archive<TwoCap, R, <V::Block as Digestible>::Digest, T> {
        let begin = ctx.current();

        // Derive per-epoch, per-archive partition names from the shared prefix.
        let archive_cfg = prunable::Config {
            key_partition: format!("{}-cache-{epoch}-{name}-key", cfg.partition_prefix),
            value_partition: format!("{}-cache-{epoch}-{name}-value", cfg.partition_prefix),
            translator: TwoCap,
            key_page_cache: cfg.key_page_cache.clone(),
            items_per_section: cfg.prunable_items_per_section,
            compression: None,
            codec_config,
            replay_buffer: cfg.replay_buffer,
            key_write_buffer: cfg.key_write_buffer,
            value_write_buffer: cfg.value_write_buffer,
        };

        let archive = prunable::Archive::init(ctx.with_label(name), archive_cfg)
            .await
            .unwrap_or_else(|_| panic!("failed to initialize {name} archive"));
        info!(elapsed = ?ctx.current().duration_since(begin).unwrap_or(Duration::ZERO), "restored {name} archive");
        archive
    }

    /// Add a verified block to the prunable archive.
    ///
    /// Silently drops the block when its epoch has already been pruned.
    pub(crate) async fn put_verified(
        &mut self,
        round: Round,
        digest: <V::Block as Digestible>::Digest,
        block: V::StoredBlock,
    ) {
        if let Some(cache) = self.get_or_init_epoch(round.epoch()).await {
            let outcome = cache
                .verified_blocks
                .put_sync(round.view().get(), digest, block)
                .await;
            Self::handle_result(outcome, round, "verified");
        }
    }

    /// Add a notarized block to the prunable archive.
    ///
    /// Silently drops the block when its epoch has already been pruned.
    pub(crate) async fn put_block(
        &mut self,
        round: Round,
        digest: <V::Block as Digestible>::Digest,
        block: V::StoredBlock,
    ) {
        if let Some(cache) = self.get_or_init_epoch(round.epoch()).await {
            let outcome = cache
                .notarized_blocks
                .put_sync(round.view().get(), digest, block)
                .await;
            Self::handle_result(outcome, round, "notarized");
        }
    }

    /// Add a notarization to the prunable archive.
    ///
    /// Silently drops the notarization when its epoch has already been pruned.
    pub(crate) async fn put_notarization(
        &mut self,
        round: Round,
        digest: <V::Block as Digestible>::Digest,
        notarization: Notarization<S, V::Commitment>,
    ) {
        if let Some(cache) = self.get_or_init_epoch(round.epoch()).await {
            let outcome = cache
                .notarizations
                .put_sync(round.view().get(), digest, notarization)
                .await;
            Self::handle_result(outcome, round, "notarization");
        }
    }

    /// Add a finalization to the prunable archive.
    ///
    /// Silently drops the finalization when its epoch has already been pruned.
    pub(crate) async fn put_finalization(
        &mut self,
        round: Round,
        digest: <V::Block as Digestible>::Digest,
        finalization: Finalization<S, V::Commitment>,
    ) {
        if let Some(cache) = self.get_or_init_epoch(round.epoch()).await {
            let outcome = cache
                .finalizations
                .put_sync(round.view().get(), digest, finalization)
                .await;
            Self::handle_result(outcome, round, "finalization");
        }
    }

    /// Helper to debug cache results.
    fn handle_result(result: Result<(), archive::Error>, round: Round, name: &str) {
        match result {
            Ok(_) => {
                debug!(?round, name, "cached");
            }
            Err(archive::Error::AlreadyPrunedTo(_)) => {
                debug!(?round, name, "already pruned");
            }
            Err(e) => {
                panic!("failed to insert {name}: {e}");
            }
        }
    }

    /// Get a notarization from the prunable archive by round.
    ///
    /// Returns `None` when no cache is open for the round's epoch or nothing
    /// is stored at the round's view.
    pub(crate) async fn get_notarization(
        &self,
        round: Round,
    ) -> Option<Notarization<S, V::Commitment>> {
        let cache = self.caches.get(&round.epoch())?;
        let id = Identifier::Index(round.view().get());
        cache
            .notarizations
            .get(id)
            .await
            .expect("failed to get notarization")
    }

    /// Get a finalization from the prunable archive by block digest, searching
    /// the most recent epochs first.
    ///
    /// SAFETY: For blocks/certificates admitted by marshal verification, a block digest
    /// maps to exactly one consensus payload commitment for the active marshal
    /// [`Variant`] instance.
    pub(crate) async fn get_finalization_for(
        &self,
        digest: <V::Block as Digestible>::Digest,
    ) -> Option<Finalization<S, V::Commitment>> {
        for cache in self.caches.values().rev() {
            let found = cache
                .finalizations
                .get(Identifier::Key(&digest))
                .await
                .unwrap_or_else(|e| panic!("failed to get cached finalization: {e}"));
            if found.is_some() {
                return found;
            }
        }
        None
    }

    /// Looks for a block (verified or notarized), searching the most recent
    /// epochs first.
    pub(crate) async fn find_block(
        &self,
        digest: <V::Block as Digestible>::Digest,
    ) -> Option<V::StoredBlock> {
        for cache in self.caches.values().rev() {
            // Verified blocks take priority over notarized ones.
            let verified = cache
                .verified_blocks
                .get(Identifier::Key(&digest))
                .await
                .expect("failed to get verified block");
            if verified.is_some() {
                return verified;
            }

            let notarized = cache
                .notarized_blocks
                .get(Identifier::Key(&digest))
                .await
                .expect("failed to get notarized block");
            if notarized.is_some() {
                return notarized;
            }
        }
        None
    }

    /// Prune the caches below the given round.
    ///
    /// Destroys every open cache from an epoch older than the round's epoch,
    /// advances the metadata floor if it moved, then prunes the round's own
    /// epoch to the round's view.
    pub(crate) async fn prune(&mut self, round: Round) {
        // Detach everything older than the new floor in one pass: `split_off`
        // keeps epochs >= new_floor, which become the surviving map.
        let new_floor = round.epoch();
        let retained = self.caches.split_off(&new_floor);
        let stale = std::mem::replace(&mut self.caches, retained);
        for (_, cache) in stale {
            let Cache {
                verified_blocks: vb,
                notarized_blocks: nb,
                notarizations: nv,
                finalizations: fv,
                ..
            } = cache;
            vb.destroy().await.expect("failed to destroy vb");
            nb.destroy().await.expect("failed to destroy nb");
            nv.destroy().await.expect("failed to destroy nv");
            fv.destroy().await.expect("failed to destroy fv");
        }

        // Advance the recorded floor (and, if needed, the ceiling) on disk.
        let (floor, ceiling) = self.get_metadata();
        if new_floor > floor {
            self.set_metadata(new_floor, max(ceiling, new_floor)).await;
        }

        // Prune within the epoch the round belongs to.
        if let Some(cache) = self.caches.get_mut(&new_floor) {
            cache.prune(round.view()).await;
        }
    }
}