Skip to main content

commonware_consensus/aggregation/
types.rs

1//! Types used in [aggregation](super).
2
3use crate::{
4    aggregation::scheme,
5    types::{Epoch, Height},
6    Heightable,
7};
8use bytes::{Buf, BufMut, Bytes};
9use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write};
10use commonware_cryptography::{
11    certificate::{self, Attestation, Scheme, Subject},
12    Digest,
13};
14use commonware_parallel::Strategy;
15use commonware_utils::{channel::oneshot, union, N3f1};
16use rand_core::CryptoRngCore;
17use std::hash::Hash;
18
19/// Error that may be encountered when interacting with `aggregation`.
20#[derive(Debug, thiserror::Error)]
21pub enum Error {
22    // Proposal Errors
23    /// The proposal was canceled by the application
24    #[error("Application verify error: {0}")]
25    AppProposeCanceled(oneshot::error::RecvError),
26
27    // Epoch Errors
28    /// The specified validator is not a participant in the epoch
29    #[error("Epoch {0} has no validator {1}")]
30    UnknownValidator(Epoch, String),
31    /// The local node is not a signer in the scheme for the specified epoch.
32    #[error("Not a signer at epoch {0}")]
33    NotSigner(Epoch),
34
35    // Peer Errors
36    /// The sender's public key doesn't match the expected key
37    #[error("Peer mismatch")]
38    PeerMismatch,
39
40    // Signature Errors
41    /// The acknowledgment signature is invalid
42    #[error("Invalid ack signature")]
43    InvalidAckSignature,
44
45    // Ignorable Message Errors
46    /// The acknowledgment's epoch is outside the accepted bounds
47    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
48    AckEpochOutsideBounds(Epoch, Epoch, Epoch),
49    /// The acknowledgment's height is outside the accepted bounds
50    #[error("Non-useful ack height {0}")]
51    AckHeight(Height),
52    /// The acknowledgment's digest is incorrect
53    #[error("Invalid ack digest {0}")]
54    AckDigest(Height),
55    /// Duplicate acknowledgment for the same height
56    #[error("Duplicate ack from sender {0} for height {1}")]
57    AckDuplicate(String, Height),
58    /// The acknowledgement is for a height that already has a certificate
59    #[error("Ack for height {0} already has been certified")]
60    AckCertified(Height),
61    /// The epoch is unknown
62    #[error("Unknown epoch {0}")]
63    UnknownEpoch(Epoch),
64}
65
66impl Error {
67    /// Returns true if the error represents a blockable offense by a peer.
68    pub const fn blockable(&self) -> bool {
69        matches!(self, Self::PeerMismatch | Self::InvalidAckSignature)
70    }
71}
72
73/// Suffix used to identify an acknowledgment (ack) namespace for domain separation.
74/// Used when signing and verifying acks to prevent signature reuse across different message types.
75const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
76
77/// Returns a suffixed namespace for signing an ack.
78///
79/// This provides domain separation for signatures, preventing cross-protocol attacks
80/// by ensuring signatures for acks cannot be reused for other message types.
81#[inline]
82fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
83    union(namespace, ACK_SUFFIX)
84}
85
/// Pre-computed namespace for aggregation acknowledgments.
///
/// Wraps the namespace bytes that are used when signing and verifying acks, so they are
/// derived once rather than on every operation.
#[derive(Debug, Clone)]
pub struct Namespace(Vec<u8>);
92
93impl certificate::Namespace for Namespace {
94    fn derive(namespace: &[u8]) -> Self {
95        Self(ack_namespace(namespace))
96    }
97}
98
99/// Item represents a single element being aggregated in the protocol.
100/// Each item has a unique height and contains a digest that validators sign.
101#[derive(Clone, Debug, PartialEq, Eq, Hash)]
102pub struct Item<D: Digest> {
103    /// Sequential position of this item within the current epoch
104    pub height: Height,
105    /// Cryptographic digest of the data being aggregated
106    pub digest: D,
107}
108
109impl<D: Digest> Heightable for Item<D> {
110    fn height(&self) -> Height {
111        self.height
112    }
113}
114
115impl<D: Digest> Write for Item<D> {
116    fn write(&self, writer: &mut impl BufMut) {
117        self.height.write(writer);
118        self.digest.write(writer);
119    }
120}
121
122impl<D: Digest> Read for Item<D> {
123    type Cfg = ();
124
125    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
126        let height = Height::read(reader)?;
127        let digest = D::read(reader)?;
128        Ok(Self { height, digest })
129    }
130}
131
132impl<D: Digest> EncodeSize for Item<D> {
133    fn encode_size(&self) -> usize {
134        self.height.encode_size() + self.digest.encode_size()
135    }
136}
137
138impl<D: Digest> Subject for &Item<D> {
139    type Namespace = Namespace;
140
141    fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
142        &derived.0
143    }
144
145    fn message(&self) -> Bytes {
146        self.encode()
147    }
148}
149
#[cfg(feature = "arbitrary")]
impl<D: Digest> arbitrary::Arbitrary<'_> for Item<D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary item: first the height, then the digest.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Field types drive inference; evaluation follows source order.
        Ok(Self {
            height: u.arbitrary()?,
            digest: u.arbitrary()?,
        })
    }
}
161
162/// Acknowledgment (ack) represents a validator's vote on an item.
163/// Multiple acks can be recovered into a certificate for consensus.
164#[derive(Clone, Debug, PartialEq, Eq, Hash)]
165pub struct Ack<S: Scheme, D: Digest> {
166    /// The item being acknowledged
167    pub item: Item<D>,
168    /// The epoch in which this acknowledgment was created
169    pub epoch: Epoch,
170    /// Scheme-specific attestation material
171    pub attestation: Attestation<S>,
172}
173
174impl<S: Scheme, D: Digest> Ack<S, D> {
175    /// Verifies the attestation on this acknowledgment.
176    ///
177    /// Returns `true` if the attestation is valid for the given namespace and public key.
178    /// Domain separation is automatically applied to prevent signature reuse.
179    pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
180    where
181        R: CryptoRngCore,
182        S: scheme::Scheme<D>,
183    {
184        scheme.verify_attestation::<_, D>(rng, &self.item, &self.attestation, strategy)
185    }
186
187    /// Creates a new acknowledgment by signing an item with a validator's key.
188    ///
189    /// The signature uses domain separation to prevent cross-protocol attacks.
190    ///
191    /// # Determinism
192    ///
193    /// Signatures produced by this function are deterministic and safe for consensus.
194    pub fn sign(scheme: &S, epoch: Epoch, item: Item<D>) -> Option<Self>
195    where
196        S: scheme::Scheme<D>,
197    {
198        let attestation = scheme.sign::<D>(&item)?;
199        Some(Self {
200            item,
201            epoch,
202            attestation,
203        })
204    }
205}
206
207impl<S: Scheme, D: Digest> Write for Ack<S, D> {
208    fn write(&self, writer: &mut impl BufMut) {
209        self.item.write(writer);
210        self.epoch.write(writer);
211        self.attestation.write(writer);
212    }
213}
214
215impl<S: Scheme, D: Digest> Read for Ack<S, D> {
216    type Cfg = ();
217
218    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
219        let item = Item::read(reader)?;
220        let epoch = Epoch::read(reader)?;
221        let attestation = Attestation::read(reader)?;
222        Ok(Self {
223            item,
224            epoch,
225            attestation,
226        })
227    }
228}
229
230impl<S: Scheme, D: Digest> EncodeSize for Ack<S, D> {
231    fn encode_size(&self) -> usize {
232        self.item.encode_size() + self.epoch.encode_size() + self.attestation.encode_size()
233    }
234}
235
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Ack<S, D>
where
    S::Signature: for<'a> arbitrary::Arbitrary<'a>,
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary ack: item first, then epoch, then attestation.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Field types drive inference; evaluation follows source order.
        Ok(Self {
            item: u.arbitrary()?,
            epoch: u.arbitrary()?,
            attestation: Attestation::arbitrary(u)?,
        })
    }
}
253
254/// Message exchanged between peers containing an acknowledgment and tip information.
255/// This combines a validator's vote with their view of consensus progress.
256#[derive(Clone, Debug, PartialEq, Eq, Hash)]
257pub struct TipAck<S: Scheme, D: Digest> {
258    /// The peer's local view of the tip (the lowest height that is not yet confirmed).
259    pub tip: Height,
260
261    /// The peer's acknowledgement (vote) for an item.
262    pub ack: Ack<S, D>,
263}
264
265impl<S: Scheme, D: Digest> Write for TipAck<S, D> {
266    fn write(&self, writer: &mut impl BufMut) {
267        self.tip.write(writer);
268        self.ack.write(writer);
269    }
270}
271
272impl<S: Scheme, D: Digest> Read for TipAck<S, D> {
273    type Cfg = ();
274
275    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
276        let tip = Height::read(reader)?;
277        let ack = Ack::read(reader)?;
278        Ok(Self { tip, ack })
279    }
280}
281
282impl<S: Scheme, D: Digest> EncodeSize for TipAck<S, D> {
283    fn encode_size(&self) -> usize {
284        self.tip.encode_size() + self.ack.encode_size()
285    }
286}
287
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for TipAck<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary message: the tip first, then the ack.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Field types drive inference; evaluation follows source order.
        Ok(Self {
            tip: u.arbitrary()?,
            ack: u.arbitrary()?,
        })
    }
}
300
301/// A recovered certificate for some [Item].
302#[derive(Clone, Debug, PartialEq, Eq, Hash)]
303pub struct Certificate<S: Scheme, D: Digest> {
304    /// The item that was recovered.
305    pub item: Item<D>,
306    /// The recovered certificate.
307    pub certificate: S::Certificate,
308}
309
310impl<S: Scheme, D: Digest> Certificate<S, D> {
311    pub fn from_acks<'a, I>(scheme: &S, acks: I, strategy: &impl Strategy) -> Option<Self>
312    where
313        S: scheme::Scheme<D>,
314        I: IntoIterator<Item = &'a Ack<S, D>>,
315        I::IntoIter: Send,
316    {
317        let mut iter = acks.into_iter().peekable();
318        let item = iter.peek()?.item.clone();
319        let attestations = iter
320            .filter(|ack| ack.item == item)
321            .map(|ack| ack.attestation.clone());
322        let certificate = scheme.assemble::<_, N3f1>(attestations, strategy)?;
323
324        Some(Self { item, certificate })
325    }
326
327    /// Verifies the recovered certificate for the item.
328    pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
329    where
330        R: CryptoRngCore,
331        S: scheme::Scheme<D>,
332    {
333        scheme.verify_certificate::<_, D, N3f1>(rng, &self.item, &self.certificate, strategy)
334    }
335}
336
337impl<S: Scheme, D: Digest> Write for Certificate<S, D> {
338    fn write(&self, writer: &mut impl BufMut) {
339        self.item.write(writer);
340        self.certificate.write(writer);
341    }
342}
343
344impl<S: Scheme, D: Digest> Read for Certificate<S, D> {
345    type Cfg = <S::Certificate as Read>::Cfg;
346
347    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
348        let item = Item::read(reader)?;
349        let certificate = S::Certificate::read_cfg(reader, cfg)?;
350        Ok(Self { item, certificate })
351    }
352}
353
354impl<S: Scheme, D: Digest> EncodeSize for Certificate<S, D> {
355    fn encode_size(&self) -> usize {
356        self.item.encode_size() + self.certificate.encode_size()
357    }
358}
359
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Certificate<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    S::Certificate: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary certificate: the item first, then the scheme certificate.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Field types drive inference; evaluation follows source order.
        Ok(Self {
            item: u.arbitrary()?,
            certificate: u.arbitrary()?,
        })
    }
}
372
373/// Used as [Reporter::Activity](crate::Reporter::Activity) to report activities that occur during
374/// aggregation. Also used to journal events that are needed to initialize the aggregation engine
375/// when the node restarts.
376#[derive(Clone, Debug, PartialEq)]
377pub enum Activity<S: Scheme, D: Digest> {
378    /// Received an ack from a participant.
379    Ack(Ack<S, D>),
380
381    /// Certified an [Item].
382    Certified(Certificate<S, D>),
383
384    /// Moved the tip to a new height.
385    Tip(Height),
386}
387
388impl<S: Scheme, D: Digest> Write for Activity<S, D> {
389    fn write(&self, writer: &mut impl BufMut) {
390        match self {
391            Self::Ack(ack) => {
392                0u8.write(writer);
393                ack.write(writer);
394            }
395            Self::Certified(certificate) => {
396                1u8.write(writer);
397                certificate.write(writer);
398            }
399            Self::Tip(height) => {
400                2u8.write(writer);
401                height.write(writer);
402            }
403        }
404    }
405}
406
407impl<S: Scheme, D: Digest> Read for Activity<S, D> {
408    type Cfg = <S::Certificate as Read>::Cfg;
409
410    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
411        match u8::read(reader)? {
412            0 => Ok(Self::Ack(Ack::read(reader)?)),
413            1 => Ok(Self::Certified(Certificate::read_cfg(reader, cfg)?)),
414            2 => Ok(Self::Tip(Height::read(reader)?)),
415            _ => Err(CodecError::Invalid(
416                "consensus::aggregation::Activity",
417                "Invalid type",
418            )),
419        }
420    }
421}
422
423impl<S: Scheme, D: Digest> EncodeSize for Activity<S, D> {
424    fn encode_size(&self) -> usize {
425        1 + match self {
426            Self::Ack(ack) => ack.encode_size(),
427            Self::Certified(certificate) => certificate.encode_size(),
428            Self::Tip(height) => height.encode_size(),
429        }
430    }
431}
432
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Activity<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
    Certificate<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Picks one of the three variants and then generates its payload.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let variant = u.int_in_range(0..=2)?;
        let activity = match variant {
            0 => Self::Ack(u.arbitrary()?),
            1 => Self::Certified(u.arbitrary()?),
            2 => Self::Tip(u.arbitrary()?),
            // `int_in_range(0..=2)` only yields 0, 1, or 2.
            _ => unreachable!(),
        };
        Ok(activity)
    }
}
450
#[cfg(test)]
mod tests {
    use super::*;
    use crate::aggregation::scheme::{
        bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme,
    };
    use bytes::BytesMut;
    use commonware_codec::{Decode, DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::primitives::variant::{MinPk, MinSig},
        certificate::mocks::Fixture,
        Hasher, Sha256,
    };
    use commonware_parallel::Sequential;
    use commonware_utils::{ordered::Quorum, test_rng, N3f1};
    use rand::rngs::StdRng;

    const NAMESPACE: &[u8] = b"test";

    type Sha256Digest = <Sha256 as Hasher>::Digest;

    /// The ack namespace is the base namespace followed by the ack suffix.
    #[test]
    fn test_ack_namespace() {
        let namespace = b"test_namespace";
        let mut expected = namespace.to_vec();
        expected.extend_from_slice(ACK_SUFFIX);
        assert_eq!(ack_namespace(namespace), expected);
    }

    /// Round-trips every message type through the codec for the scheme built by `fixture`.
    fn codec<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        let mut rng = test_rng();
        let built = fixture(&mut rng, NAMESPACE, 4);
        let schemes = &built.schemes;
        let signer = &schemes[0];
        let item = Item {
            height: Height::new(100),
            digest: Sha256::hash(b"test_item"),
        };

        // Item: encode then decode yields the same value.
        let decoded_item = Item::decode(item.encode()).unwrap();
        assert_eq!(item, decoded_item);

        // Ack: sign, round-trip, and check the decoded ack still verifies.
        let ack = Ack::sign(signer, Epoch::new(1), item.clone()).unwrap();
        let cfg = signer.certificate_codec_config();
        let decoded_ack: Ack<S, Sha256Digest> = Ack::decode(ack.encode()).unwrap();
        assert_eq!(decoded_ack.item, item);
        assert_eq!(decoded_ack.epoch, Epoch::new(1));
        assert!(decoded_ack.verify(&mut rng, signer, &Sequential));

        // TipAck: round-trip preserves the tip and the embedded ack's fields.
        let tip_ack = TipAck {
            ack: ack.clone(),
            tip: Height::new(42),
        };
        let decoded_tip_ack: TipAck<S, Sha256Digest> = TipAck::decode(tip_ack.encode()).unwrap();
        assert_eq!(decoded_tip_ack.tip, Height::new(42));
        assert_eq!(decoded_tip_ack.ack.item, item);
        assert_eq!(decoded_tip_ack.ack.epoch, Epoch::new(1));

        // Activity::Ack round-trip.
        let encoded_activity_ack = Activity::Ack(ack).encode();
        let decoded_activity_ack: Activity<S, Sha256Digest> =
            Activity::decode_cfg(encoded_activity_ack, &cfg).unwrap();
        let Activity::Ack(restored) = decoded_activity_ack else {
            panic!("Expected Activity::Ack");
        };
        assert_eq!(restored.item, item);
        assert_eq!(restored.epoch, Epoch::new(1));

        // Activity::Certified round-trip: gather a quorum of acks and assemble a certificate.
        let quorum = signer.participants().quorum::<N3f1>() as usize;
        let acks: Vec<_> = schemes
            .iter()
            .take(quorum)
            .filter_map(|scheme| Ack::sign(scheme, Epoch::new(1), item.clone()))
            .collect();

        let certificate = Certificate::from_acks(signer, &acks, &Sequential).unwrap();
        assert!(certificate.verify(&mut rng, signer, &Sequential));

        let encoded_certified = Activity::Certified(certificate.clone()).encode();
        let decoded_certified: Activity<S, Sha256Digest> =
            Activity::decode_cfg(encoded_certified, &cfg).unwrap();
        let Activity::Certified(restored) = decoded_certified else {
            panic!("Expected Activity::Certified");
        };
        assert_eq!(restored.item, item);
        assert!(restored.verify(&mut rng, signer, &Sequential));

        // Activity::Tip round-trip.
        let activity_tip: Activity<S, Sha256Digest> = Activity::Tip(Height::new(123));
        let decoded_tip: Activity<S, Sha256Digest> =
            Activity::decode_cfg(activity_tip.encode(), &cfg).unwrap();
        let Activity::Tip(height) = decoded_tip else {
            panic!("Expected Activity::Tip");
        };
        assert_eq!(height, Height::new(123));
    }

    #[test]
    fn test_codec() {
        codec(ed25519::fixture);
        codec(secp256r1::fixture);
        codec(bls12381_multisig::fixture::<MinPk, _>);
        codec(bls12381_multisig::fixture::<MinSig, _>);
        codec(bls12381_threshold::fixture::<MinPk, _>);
        codec(bls12381_threshold::fixture::<MinSig, _>);
    }

    /// Decoding an `Activity` with an unknown variant tag must fail with `Invalid`.
    fn activity_invalid_enum<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        let built = fixture(&mut test_rng(), NAMESPACE, 4);
        let cfg = built.schemes[0].certificate_codec_config();

        // Only tags 0, 1, and 2 are assigned, so 3 is an invalid discriminant.
        let mut buf = BytesMut::new();
        3u8.write(&mut buf);

        let outcome = Activity::<S, Sha256Digest>::read_cfg(&mut &buf[..], &cfg);
        assert!(matches!(
            outcome,
            Err(CodecError::Invalid(
                "consensus::aggregation::Activity",
                "Invalid type"
            ))
        ));
    }

    #[test]
    fn test_activity_invalid_enum() {
        activity_invalid_enum(ed25519::fixture);
        activity_invalid_enum(secp256r1::fixture);
        activity_invalid_enum(bls12381_multisig::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_multisig::fixture::<MinSig, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinSig, _>);
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use crate::aggregation::scheme::bls12381_threshold;
        use commonware_codec::conformance::CodecConformance;
        use commonware_cryptography::{ed25519::PublicKey, sha256::Digest as Sha256Digest};

        type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;

        commonware_conformance::conformance_tests! {
            CodecConformance<Item<Sha256Digest>>,
            CodecConformance<Ack<Scheme, Sha256Digest>>,
            CodecConformance<TipAck<Scheme, Sha256Digest>>,
            CodecConformance<Certificate<Scheme, Sha256Digest>>,
            CodecConformance<Activity<Scheme, Sha256Digest>>,
        }
    }
}