commonware-consensus 2026.4.0

Order opaque messages in a Byzantine environment.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
use crate::{
    simplex::types::{Certificate, Notarization},
    types::View,
    Viewable,
};
use commonware_cryptography::{certificate::Scheme, Digest};
use commonware_resolver::Resolver;
use commonware_utils::sequence::U64;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};

/// Tracks all known certificates from the last
/// certified notarization or finalized view to the current view.
///
/// Whenever the floor advances, every stored notarization, nullification,
/// satisfaction record, and failure mark at or below the new floor is pruned.
pub struct State<S: Scheme, D: Digest> {
    /// Highest view seen in any handled certificate (updated even when the
    /// certificate itself is ignored for being at or below the floor).
    current_view: View,
    /// Most recent certified notarization or finalization. While this is
    /// `None`, the floor view is treated as `View::zero()`.
    floor: Option<Certificate<S, D>>,
    /// Notarizations pending certification (possible floors), keyed by view.
    notarizations: BTreeMap<View, Notarization<S, D>>,
    /// Nullifications for any view greater than the floor, keyed by view.
    nullifications: BTreeMap<View, Certificate<S, D>>,
    /// Maximum number of views requested from the resolver in a single fetch pass.
    fetch_concurrent: usize,
    /// Next view to consider when fetching. Avoids re-scanning
    /// views we've already requested or have nullifications for.
    fetch_floor: View,
    /// Maps notarization view -> request views it satisfied.
    /// When a higher-view notarization satisfies a lower-view request,
    /// we track it here so we can re-request on certification failure.
    satisfied_by: HashMap<View, BTreeSet<View>>,
    /// Views where certification has failed. Only nullifications
    /// are accepted for these views.
    // NOTE(review): this type does not itself reject notarizations for failed
    // views; presumably callers consult `is_failed` first — confirm.
    failed_views: HashSet<View>,
}

impl<S: Scheme, D: Digest> State<S, D> {
    /// Build an empty [State] with no floor and no stored certificates.
    ///
    /// `fetch_concurrent` caps how many missing views are requested from the
    /// resolver in a single fetch pass.
    pub fn new(fetch_concurrent: usize) -> Self {
        let zero = View::zero();
        Self {
            fetch_concurrent,
            current_view: zero,
            fetch_floor: zero,
            floor: None,
            notarizations: BTreeMap::default(),
            nullifications: BTreeMap::default(),
            satisfied_by: HashMap::default(),
            failed_views: HashSet::default(),
        }
    }

    /// Returns true if the given view has failed certification.
    ///
    /// A view is recorded as failed when `handle_certified` is called for it
    /// with `success == false`; the record is dropped once the floor advances
    /// to or past that view.
    pub fn is_failed(&self, view: View) -> bool {
        self.failed_views.contains(&view)
    }

    /// Handle a new certificate and update the [Resolver] accordingly.
    ///
    /// The `request` parameter is the view that was originally requested
    /// when this certificate was fetched. If the certificate is a notarization
    /// at a higher view, we track that the request was "satisfied by" this
    /// notarization so we can re-request on certification failure.
    pub async fn handle(
        &mut self,
        certificate: Certificate<S, D>,
        request: Option<View>,
        resolver: &mut impl Resolver<Key = U64>,
    ) {
        match certificate {
            Certificate::Nullification(nullification) => {
                let view = nullification.view();
                if self.encounter_view(view) {
                    self.nullifications
                        .insert(view, Certificate::Nullification(nullification));
                    resolver.cancel(view.into()).await;
                }
            }
            Certificate::Notarization(notarization) => {
                // Store as pending (waiting for certification result).
                let view = notarization.view();
                if self.encounter_view(view) {
                    self.notarizations.insert(view, notarization);
                    if let Some(request) = request {
                        self.satisfied_by.entry(view).or_default().insert(request);
                    }
                }
            }
            Certificate::Finalization(finalization) => {
                let view = finalization.view();
                if self.encounter_view(view) || self.can_upgrade_floor(view) {
                    self.floor = Some(Certificate::Finalization(finalization));
                    self.prune(resolver).await;
                }
            }
        }

        // Request missing nullifications
        self.fetch(resolver).await;
    }

    /// Apply the voter's certification verdict for the notarization at `view`.
    ///
    /// On success, the stored notarization (if any) becomes the floor when its
    /// view is above the current floor, and satisfaction tracking for the view
    /// is dropped. On failure, the notarization is discarded, the view is marked
    /// failed, and requests are re-issued for the view itself and for every
    /// request the notarization had satisfied (only those above the floor).
    pub async fn handle_certified(
        &mut self,
        view: View,
        success: bool,
        resolver: &mut impl Resolver<Key = U64>,
    ) {
        if success {
            // A certified notarization is always preferred, whether it arrives
            // before or after a nullification for the same view. A finalization
            // is still stronger and may replace this floor later at the same or
            // a higher view.
            match self.notarizations.remove(&view) {
                Some(notarization) if view > self.floor_view() => {
                    self.floor = Some(Certificate::Notarization(notarization));
                    self.prune(resolver).await;
                }
                _ => {}
            }

            // Nothing left to re-request for this view.
            self.satisfied_by.remove(&view);
            return;
        }

        // Drop the notarization and remember the failure (so peers handing us
        // useless notarizations can be penalized).
        self.notarizations.remove(&view);
        self.failed_views.insert(view);

        // The view now needs a nullification (if it is above the floor).
        let floor = self.floor_view();
        if view > floor {
            resolver.fetch(view.into()).await;
        }

        // Re-issue the requests this notarization had answered.
        let dependents = self.satisfied_by.remove(&view).unwrap_or_default();
        for pending in dependents.into_iter().filter(|dependent| *dependent > floor) {
            resolver.fetch(pending.into()).await;
        }
    }

    /// Look up the best certificate available for `view`.
    ///
    /// Returns the floor certificate when `view` is at or below the floor view;
    /// otherwise returns the stored nullification for `view`, if there is one.
    pub fn get(&self, view: View) -> Option<&Certificate<S, D>> {
        match self.floor.as_ref() {
            // Anything at or below the floor is answered by the floor itself.
            Some(floor) if view <= floor.view() => Some(floor),
            // Above the floor (or no floor yet): only a nullification can answer.
            _ => self.nullifications.get(&view),
        }
    }

    /// Updates the current view if the new view is greater.
    ///
    /// Returns true if the view is "interesting" (i.e. strictly greater than the
    /// floor view). A view equal to the floor is not interesting; the one case
    /// accepted at the floor (a finalization replacing a notarization floor) is
    /// checked separately by `can_upgrade_floor`. While no floor is set the floor
    /// view is `View::zero()`, so view zero is never interesting.
    fn encounter_view(&mut self, view: View) -> bool {
        self.current_view = self.current_view.max(view);
        view > self.floor_view()
    }

    /// View of the current floor certificate, or `View::zero()` when no floor is set.
    fn floor_view(&self) -> View {
        match &self.floor {
            Some(certificate) => certificate.view(),
            None => View::zero(),
        }
    }

    /// Reports whether a finalization at `view` may replace the current floor.
    ///
    /// This holds only when the floor is a notarization for exactly `view`:
    /// a finalization is a stronger proof than a notarization of the same view.
    fn can_upgrade_floor(&self, view: View) -> bool {
        match &self.floor {
            Some(Certificate::Notarization(notarization)) => notarization.view() == view,
            _ => false,
        }
    }

    /// Request missing nullifications from the [Resolver].
    ///
    /// Scans views from the later of `fetch_floor` and the view after the floor,
    /// up to the current view, and requests at most `fetch_concurrent` views
    /// that have no stored nullification. `fetch_floor` is advanced past the
    /// last requested view so later passes do not rescan it.
    async fn fetch(&mut self, resolver: &mut impl Resolver<Key = U64>) {
        // Peers can always answer with a nullification for the requested view or
        // a notarization/finalization at that view or higher, so requests cannot
        // leave us stuck.
        let first = self.fetch_floor.max(self.floor_view().next());
        let mut requests = Vec::new();
        for view in View::range(first, self.current_view) {
            if requests.len() == self.fetch_concurrent {
                break;
            }
            if self.nullifications.contains_key(&view) {
                continue;
            }

            // Remember how far we got to reduce duplicate iteration in the future.
            self.fetch_floor = view.next();
            requests.push(U64::from(view));
        }

        // Hand the batch to the resolver.
        resolver.fetch_all(requests).await;
    }

    /// Prune stored certificates and requests that are not higher than the floor.
    async fn prune(&mut self, resolver: &mut impl Resolver<Key = U64>) {
        let floor = self.floor_view();
        self.notarizations.retain(|view, _| *view > floor);
        self.nullifications.retain(|view, _| *view > floor);
        self.satisfied_by.retain(|view, _| *view > floor);
        self.failed_views.retain(|view| *view > floor);
        resolver.retain(move |key| *key > floor.into()).await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        simplex::{
            scheme::ed25519,
            types::{
                Finalization, Finalize, Notarization, Notarize, Nullification, Nullify, Proposal,
            },
        },
        types::{Epoch, Round, View},
    };
    use commonware_cryptography::{
        certificate::mocks::Fixture, ed25519::PublicKey, sha256::Digest as Sha256Digest,
    };
    use commonware_macros::test_async;
    use commonware_parallel::Sequential;
    use commonware_utils::{sync::Mutex, test_rng, vec::NonEmptyVec};
    use std::{collections::BTreeSet, sync::Arc};

    // Namespace passed to the ed25519 fixture when creating the test schemes.
    const NAMESPACE: &[u8] = b"resolver-state";
    // Epoch used for every round built by the helpers below.
    const EPOCH: Epoch = Epoch::new(9);

    // Signing scheme used by every test in this module.
    type TestScheme = ed25519::Scheme;

    // In-memory resolver stand-in: it performs no networking and only records
    // which keys are currently requested.
    #[derive(Clone, Default)]
    struct MockResolver {
        // Keys that have been fetched and not yet cancelled, cleared, or
        // filtered out by `retain`.
        outstanding: Arc<Mutex<BTreeSet<U64>>>,
    }

    impl MockResolver {
        // Snapshot of the outstanding keys as plain integers, in set order.
        fn outstanding(&self) -> Vec<u64> {
            let pending = self.outstanding.lock();
            let mut views = Vec::with_capacity(pending.len());
            for key in pending.iter() {
                views.push(key.into());
            }
            views
        }
    }

    // Every fetch variant records the key(s) as outstanding; cancel, clear, and
    // retain remove keys from that set.
    impl Resolver for MockResolver {
        type Key = U64;
        type PublicKey = PublicKey;

        async fn fetch(&mut self, key: U64) {
            self.outstanding.lock().insert(key);
        }

        async fn fetch_all(&mut self, keys: Vec<U64>) {
            self.outstanding.lock().extend(keys);
        }

        async fn fetch_targeted(&mut self, key: U64, _targets: NonEmptyVec<PublicKey>) {
            // Targets are ignored: a targeted fetch is recorded like a regular one.
            self.outstanding.lock().insert(key);
        }

        async fn fetch_all_targeted(&mut self, requests: Vec<(U64, NonEmptyVec<PublicKey>)>) {
            // Targets are ignored: targeted fetches are recorded like regular ones.
            self.outstanding
                .lock()
                .extend(requests.into_iter().map(|(key, _targets)| key));
        }

        async fn cancel(&mut self, key: U64) {
            self.outstanding.lock().remove(&key);
        }

        async fn clear(&mut self) {
            self.outstanding.lock().clear();
        }

        async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
            let mut pending = self.outstanding.lock();
            pending.retain(|key| predicate(key));
        }
    }

    // Builds the five-participant ed25519 fixture and returns its signing
    // schemes together with the verifier scheme.
    fn ed25519_fixture() -> (Vec<TestScheme>, TestScheme) {
        let mut rng = test_rng();
        let fixture = ed25519::fixture(&mut rng, NAMESPACE, 5);
        (fixture.schemes, fixture.verifier)
    }

    // Assembles a nullification for `view` from a nullify vote signed by every
    // scheme in `schemes`.
    fn build_nullification(
        schemes: &[TestScheme],
        verifier: &TestScheme,
        view: View,
    ) -> Nullification<TestScheme> {
        let round = Round::new(EPOCH, view);
        let mut nullifies = Vec::with_capacity(schemes.len());
        for signer in schemes {
            nullifies.push(Nullify::sign::<Sha256Digest>(signer, round).unwrap());
        }
        Nullification::from_nullifies(verifier, &nullifies, &Sequential)
            .expect("nullification quorum")
    }

    // Assembles a notarization for `view` from a notarize vote signed by every
    // scheme in `schemes`. The proposal's parent is the previous view (or view
    // zero when there is none) and its payload is 32 copies of the view's low byte.
    fn build_notarization(
        schemes: &[TestScheme],
        verifier: &TestScheme,
        view: View,
    ) -> Notarization<TestScheme, Sha256Digest> {
        let round = Round::new(EPOCH, view);
        let parent = view.previous().unwrap_or(View::zero());
        let payload = Sha256Digest::from([view.get() as u8; 32]);
        let proposal = Proposal::new(round, parent, payload);

        let mut notarizes = Vec::with_capacity(schemes.len());
        for signer in schemes {
            notarizes.push(Notarize::sign(signer, proposal.clone()).unwrap());
        }
        Notarization::from_notarizes(verifier, &notarizes, &Sequential)
            .expect("notarization quorum")
    }

    // Assembles a finalization for `view` from a finalize vote signed by every
    // scheme in `schemes`. The proposal's parent is the previous view (or view
    // zero when there is none) and its payload is 32 copies of the view's low byte.
    fn build_finalization(
        schemes: &[TestScheme],
        verifier: &TestScheme,
        view: View,
    ) -> Finalization<TestScheme, Sha256Digest> {
        let round = Round::new(EPOCH, view);
        let parent = view.previous().unwrap_or(View::zero());
        let payload = Sha256Digest::from([view.get() as u8; 32]);
        let proposal = Proposal::new(round, parent, payload);

        let mut finalizes = Vec::with_capacity(schemes.len());
        for signer in schemes {
            finalizes.push(Finalize::sign(signer, proposal.clone()).unwrap());
        }
        Finalization::from_finalizes(verifier, &finalizes, &Sequential)
            .expect("finalization quorum")
    }

    // Nullifications arriving out of order: each one is stored, its own request
    // is cancelled, and missing lower views are requested at most two at a time.
    #[test_async]
    async fn handle_nullification_requests_missing_views() {
        let (schemes, verifier) = ed25519_fixture();
        let mut state: State<TestScheme, Sha256Digest> = State::new(2);
        let mut resolver = MockResolver::default();

        // View 4 arrives first: views 1..=3 are missing, but only two may be requested.
        let nullification_v4 = build_nullification(&schemes, &verifier, View::new(4));
        state
            .handle(
                Certificate::Nullification(nullification_v4.clone()),
                None,
                &mut resolver,
            )
            .await;
        assert_eq!(state.current_view, View::new(4));
        assert!(
            matches!(state.get(View::new(4)), Some(Certificate::Nullification(n)) if n == &nullification_v4)
        );
        assert_eq!(resolver.outstanding(), vec![1, 2]); // limited to concurrency

        // View 2 arrives: its request is cancelled and the scan resumes at view 3.
        let nullification_v2 = build_nullification(&schemes, &verifier, View::new(2));
        state
            .handle(
                Certificate::Nullification(nullification_v2.clone()),
                None,
                &mut resolver,
            )
            .await;
        assert_eq!(state.current_view, View::new(4));
        assert!(
            matches!(state.get(View::new(2)), Some(Certificate::Nullification(n)) if n == &nullification_v2)
        );
        assert_eq!(resolver.outstanding(), vec![1, 3]); // 2 cancelled, 3 newly requested

        // View 1 arrives: its request is cancelled and nothing new is left to request.
        let nullification_v1 = build_nullification(&schemes, &verifier, View::new(1));
        state
            .handle(
                Certificate::Nullification(nullification_v1.clone()),
                None,
                &mut resolver,
            )
            .await;
        assert_eq!(state.current_view, View::new(4));
        assert!(
            matches!(state.get(View::new(1)), Some(Certificate::Nullification(n)) if n == &nullification_v1)
        );
        assert_eq!(resolver.outstanding(), vec![3]);
    }

    // A pending notarization leaves state and requests untouched; a finalization
    // sets the floor and prunes everything at or below it, including the
    // resolver's outstanding requests.
    #[test_async]
    async fn floor_prunes_outstanding_requests() {
        let (schemes, verifier) = ed25519_fixture();
        let mut state: State<TestScheme, Sha256Digest> = State::new(10);
        let mut resolver = MockResolver::default();

        for view in 4..=6 {
            let nullification = build_nullification(&schemes, &verifier, View::new(view));
            state
                .handle(
                    Certificate::Nullification(nullification),
                    None,
                    &mut resolver,
                )
                .await;
        }
        assert_eq!(state.current_view, View::new(6));
        assert_eq!(resolver.outstanding(), vec![1, 2, 3]);

        // Notarization does not set floor or prune
        let notarization = build_notarization(&schemes, &verifier, View::new(6));
        state
            .handle(
                Certificate::Notarization(notarization.clone()),
                None,
                &mut resolver,
            )
            .await;

        assert!(state.floor.is_none());
        assert_eq!(state.notarizations.len(), 1); // stored as pending certification
        assert_eq!(state.nullifications.len(), 3); // nullifications remain
        assert_eq!(resolver.outstanding(), vec![1, 2, 3]); // requests remain

        // Finalization sets floor and prunes
        let finalization = build_finalization(&schemes, &verifier, View::new(6));
        state
            .handle(
                Certificate::Finalization(finalization.clone()),
                None,
                &mut resolver,
            )
            .await;
        assert!(
            matches!(state.floor.as_ref(), Some(Certificate::Finalization(f)) if f == &finalization)
        );
        assert!(state.notarizations.is_empty()); // pending notarization at the floor dropped
        assert!(state.nullifications.is_empty()); // nullifications at or below the floor dropped
        assert!(resolver.outstanding().is_empty()); // requests at or below the floor dropped
    }

    // `get` answers views at or below the floor with the floor certificate and
    // views above it with the stored nullification; certificates at or below
    // the floor are ignored.
    #[test_async]
    async fn produce_returns_floor_or_nullifications() {
        let (schemes, verifier) = ed25519_fixture();
        let mut state: State<TestScheme, Sha256Digest> = State::new(2);
        let mut resolver = MockResolver::default();

        // Finalization sets floor
        let finalization = build_finalization(&schemes, &verifier, View::new(3));
        state
            .handle(
                Certificate::Finalization(finalization.clone()),
                None,
                &mut resolver,
            )
            .await;
        assert!(
            matches!(state.get(View::new(1)), Some(Certificate::Finalization(f)) if f == &finalization)
        );
        assert!(
            matches!(state.get(View::new(3)), Some(Certificate::Finalization(f)) if f == &finalization)
        );

        // New nullification is kept
        let nullification_v4 = build_nullification(&schemes, &verifier, View::new(4));
        state
            .handle(
                Certificate::Nullification(nullification_v4.clone()),
                None,
                &mut resolver,
            )
            .await;
        assert!(
            matches!(state.get(View::new(4)), Some(Certificate::Nullification(n)) if n == &nullification_v4)
        );
        assert!(
            matches!(state.get(View::new(2)), Some(Certificate::Finalization(f)) if f == &finalization)
        );

        // Old nullification is ignored
        let nullification_v1 = build_nullification(&schemes, &verifier, View::new(1));
        state
            .handle(
                Certificate::Nullification(nullification_v1.clone()),
                None,
                &mut resolver,
            )
            .await;
        assert!(
            matches!(state.get(View::new(1)), Some(Certificate::Finalization(f)) if f == &finalization)
        );
        assert!(
            matches!(state.get(View::new(2)), Some(Certificate::Finalization(f)) if f == &finalization)
        );
        assert!(
            matches!(state.get(View::new(3)), Some(Certificate::Finalization(f)) if f == &finalization)
        );
        assert!(
            matches!(state.get(View::new(4)), Some(Certificate::Nullification(n)) if n == &nullification_v4)
        );
        // Nothing between the floor (3) and the current view (4) is missing.
        assert!(resolver.outstanding().is_empty());
    }

    // When certification fails for a notarization, the view is marked failed and
    // requests are re-issued both for that view and for the lower view whose
    // request the notarization had satisfied.
    #[test_async]
    async fn certification_failure_re_requests_satisfied_views() {
        let (schemes, verifier) = ed25519_fixture();
        let mut state: State<TestScheme, Sha256Digest> = State::new(10);
        let mut resolver = MockResolver::default();

        // Notarization at view 5 satisfies request for view 2
        let notarization_v5 = build_notarization(&schemes, &verifier, View::new(5));
        state
            .handle(
                Certificate::Notarization(notarization_v5.clone()),
                Some(View::new(2)),
                &mut resolver,
            )
            .await;

        // Verify tracking
        assert!(state.satisfied_by.contains_key(&View::new(5)));
        assert!(state.satisfied_by[&View::new(5)].contains(&View::new(2)));
        assert!(!state.is_failed(View::new(5)));

        // Handling the notarization already requested the missing lower views
        // (including view 2) from the mock. Forget them so the assertions below
        // can only pass if the failure path itself re-issues the requests.
        resolver.clear().await;
        assert!(resolver.outstanding().is_empty());

        // Certification fails for view 5
        state
            .handle_certified(View::new(5), false, &mut resolver)
            .await;

        // View 5 should be marked as failed
        assert!(state.is_failed(View::new(5)));
        // Satisfied_by should be cleaned up
        assert!(!state.satisfied_by.contains_key(&View::new(5)));
        // Exactly view 5 and view 2 should have been re-requested
        assert_eq!(resolver.outstanding(), vec![2, 5]);
    }

    // When certification succeeds, the notarization becomes the floor, its
    // satisfaction tracking is dropped, and the view is not marked failed.
    #[test_async]
    async fn certification_success_clears_tracking() {
        let (schemes, verifier) = ed25519_fixture();
        let mut state: State<TestScheme, Sha256Digest> = State::new(10);
        let mut resolver = MockResolver::default();

        // Notarization at view 5 satisfies request for view 2
        let notarization_v5 = build_notarization(&schemes, &verifier, View::new(5));
        state
            .handle(
                Certificate::Notarization(notarization_v5.clone()),
                Some(View::new(2)),
                &mut resolver,
            )
            .await;

        assert!(state.satisfied_by.contains_key(&View::new(5)));

        // Certification succeeds for view 5
        state
            .handle_certified(View::new(5), true, &mut resolver)
            .await;

        // Floor should be set
        assert!(
            matches!(state.floor.as_ref(), Some(Certificate::Notarization(n)) if n == &notarization_v5)
        );
        // Tracking should be cleaned up
        assert!(!state.satisfied_by.contains_key(&View::new(5)));
        // View 5 should not be marked as failed
        assert!(!state.is_failed(View::new(5)));
    }

    // A finalization at the same view as a certified-notarization floor replaces
    // that floor, even though its view is not strictly above the floor.
    #[test_async]
    async fn finalization_upgrades_certified_notarization_at_same_view() {
        let (schemes, verifier) = ed25519_fixture();
        let mut state: State<TestScheme, Sha256Digest> = State::new(10);
        let mut resolver = MockResolver::default();

        // Create and certify a notarization at view 5
        let notarization_v5 = build_notarization(&schemes, &verifier, View::new(5));
        state
            .handle(
                Certificate::Notarization(notarization_v5.clone()),
                None,
                &mut resolver,
            )
            .await;
        state
            .handle_certified(View::new(5), true, &mut resolver)
            .await;

        // Floor should be the notarization at view 5
        assert!(
            matches!(state.floor.as_ref(), Some(Certificate::Notarization(n)) if n == &notarization_v5)
        );
        assert_eq!(state.floor_view(), View::new(5));

        // A finalization at the same view should upgrade the floor
        let finalization_v5 = build_finalization(&schemes, &verifier, View::new(5));
        state
            .handle(
                Certificate::Finalization(finalization_v5.clone()),
                None,
                &mut resolver,
            )
            .await;

        // Floor should now be the finalization (stronger proof)
        assert!(
            matches!(state.floor.as_ref(), Some(Certificate::Finalization(f)) if f == &finalization_v5)
        );
    }
}