Skip to main content

commonware_consensus/aggregation/
types.rs

1//! Types used in [aggregation](super).
2
3use crate::{
4    aggregation::scheme,
5    types::{Epoch, Height},
6    Heightable,
7};
8use bytes::{Buf, BufMut, Bytes};
9use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write};
10use commonware_cryptography::{
11    certificate::{self, Attestation, Scheme, Subject},
12    Digest,
13};
14use commonware_parallel::Strategy;
15use commonware_utils::{channel::oneshot, union, N3f1};
16use rand_core::CryptoRngCore;
17use std::hash::Hash;
18
19/// Error that may be encountered when interacting with `aggregation`.
20#[derive(Debug, thiserror::Error)]
21pub enum Error {
22    // Proposal Errors
23    /// The proposal was canceled by the application
24    #[error("Application verify error: {0}")]
25    AppProposeCanceled(oneshot::error::RecvError),
26
27    // P2P Errors
28    /// Unable to send a message over the P2P network
29    #[error("Unable to send message")]
30    UnableToSendMessage,
31
32    // Epoch Errors
33    /// The specified validator is not a participant in the epoch
34    #[error("Epoch {0} has no validator {1}")]
35    UnknownValidator(Epoch, String),
36    /// The local node is not a signer in the scheme for the specified epoch.
37    #[error("Not a signer at epoch {0}")]
38    NotSigner(Epoch),
39
40    // Peer Errors
41    /// The sender's public key doesn't match the expected key
42    #[error("Peer mismatch")]
43    PeerMismatch,
44
45    // Signature Errors
46    /// The acknowledgment signature is invalid
47    #[error("Invalid ack signature")]
48    InvalidAckSignature,
49
50    // Ignorable Message Errors
51    /// The acknowledgment's epoch is outside the accepted bounds
52    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
53    AckEpochOutsideBounds(Epoch, Epoch, Epoch),
54    /// The acknowledgment's height is outside the accepted bounds
55    #[error("Non-useful ack height {0}")]
56    AckHeight(Height),
57    /// The acknowledgment's digest is incorrect
58    #[error("Invalid ack digest {0}")]
59    AckDigest(Height),
60    /// Duplicate acknowledgment for the same height
61    #[error("Duplicate ack from sender {0} for height {1}")]
62    AckDuplicate(String, Height),
63    /// The acknowledgement is for a height that already has a certificate
64    #[error("Ack for height {0} already has been certified")]
65    AckCertified(Height),
66    /// The epoch is unknown
67    #[error("Unknown epoch {0}")]
68    UnknownEpoch(Epoch),
69}
70
71impl Error {
72    /// Returns true if the error represents a blockable offense by a peer.
73    pub const fn blockable(&self) -> bool {
74        matches!(self, Self::PeerMismatch | Self::InvalidAckSignature)
75    }
76}
77
78/// Suffix used to identify an acknowledgment (ack) namespace for domain separation.
79/// Used when signing and verifying acks to prevent signature reuse across different message types.
80const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
81
82/// Returns a suffixed namespace for signing an ack.
83///
84/// This provides domain separation for signatures, preventing cross-protocol attacks
85/// by ensuring signatures for acks cannot be reused for other message types.
86#[inline]
87fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
88    union(namespace, ACK_SUFFIX)
89}
90
91/// Namespace type for aggregation acknowledgments.
92///
93/// This type encapsulates the pre-computed namespace bytes used for signing and
94/// verifying acks.
95#[derive(Clone, Debug)]
96pub struct Namespace(Vec<u8>);
97
98impl certificate::Namespace for Namespace {
99    fn derive(namespace: &[u8]) -> Self {
100        Self(ack_namespace(namespace))
101    }
102}
103
104/// Item represents a single element being aggregated in the protocol.
105/// Each item has a unique height and contains a digest that validators sign.
106#[derive(Clone, Debug, PartialEq, Eq, Hash)]
107pub struct Item<D: Digest> {
108    /// Sequential position of this item within the current epoch
109    pub height: Height,
110    /// Cryptographic digest of the data being aggregated
111    pub digest: D,
112}
113
114impl<D: Digest> Heightable for Item<D> {
115    fn height(&self) -> Height {
116        self.height
117    }
118}
119
120impl<D: Digest> Write for Item<D> {
121    fn write(&self, writer: &mut impl BufMut) {
122        self.height.write(writer);
123        self.digest.write(writer);
124    }
125}
126
127impl<D: Digest> Read for Item<D> {
128    type Cfg = ();
129
130    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
131        let height = Height::read(reader)?;
132        let digest = D::read(reader)?;
133        Ok(Self { height, digest })
134    }
135}
136
137impl<D: Digest> EncodeSize for Item<D> {
138    fn encode_size(&self) -> usize {
139        self.height.encode_size() + self.digest.encode_size()
140    }
141}
142
143impl<D: Digest> Subject for &Item<D> {
144    type Namespace = Namespace;
145
146    fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
147        &derived.0
148    }
149
150    fn message(&self) -> Bytes {
151        self.encode()
152    }
153}
154
#[cfg(feature = "arbitrary")]
impl<D: Digest> arbitrary::Arbitrary<'_> for Item<D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an item by drawing a height and then a digest from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Field initializers run in source order: height first, digest second.
        Ok(Self {
            height: u.arbitrary::<Height>()?,
            digest: u.arbitrary::<D>()?,
        })
    }
}
166
167/// Acknowledgment (ack) represents a validator's vote on an item.
168/// Multiple acks can be recovered into a certificate for consensus.
169#[derive(Clone, Debug, PartialEq, Eq, Hash)]
170pub struct Ack<S: Scheme, D: Digest> {
171    /// The item being acknowledged
172    pub item: Item<D>,
173    /// The epoch in which this acknowledgment was created
174    pub epoch: Epoch,
175    /// Scheme-specific attestation material
176    pub attestation: Attestation<S>,
177}
178
179impl<S: Scheme, D: Digest> Ack<S, D> {
180    /// Verifies the attestation on this acknowledgment.
181    ///
182    /// Returns `true` if the attestation is valid for the given namespace and public key.
183    /// Domain separation is automatically applied to prevent signature reuse.
184    pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
185    where
186        R: CryptoRngCore,
187        S: scheme::Scheme<D>,
188    {
189        scheme.verify_attestation::<_, D>(rng, &self.item, &self.attestation, strategy)
190    }
191
192    /// Creates a new acknowledgment by signing an item with a validator's key.
193    ///
194    /// The signature uses domain separation to prevent cross-protocol attacks.
195    ///
196    /// # Determinism
197    ///
198    /// Signatures produced by this function are deterministic and safe for consensus.
199    pub fn sign(scheme: &S, epoch: Epoch, item: Item<D>) -> Option<Self>
200    where
201        S: scheme::Scheme<D>,
202    {
203        let attestation = scheme.sign::<D>(&item)?;
204        Some(Self {
205            item,
206            epoch,
207            attestation,
208        })
209    }
210}
211
212impl<S: Scheme, D: Digest> Write for Ack<S, D> {
213    fn write(&self, writer: &mut impl BufMut) {
214        self.item.write(writer);
215        self.epoch.write(writer);
216        self.attestation.write(writer);
217    }
218}
219
220impl<S: Scheme, D: Digest> Read for Ack<S, D> {
221    type Cfg = ();
222
223    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
224        let item = Item::read(reader)?;
225        let epoch = Epoch::read(reader)?;
226        let attestation = Attestation::read(reader)?;
227        Ok(Self {
228            item,
229            epoch,
230            attestation,
231        })
232    }
233}
234
235impl<S: Scheme, D: Digest> EncodeSize for Ack<S, D> {
236    fn encode_size(&self) -> usize {
237        self.item.encode_size() + self.epoch.encode_size() + self.attestation.encode_size()
238    }
239}
240
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Ack<S, D>
where
    S::Signature: for<'a> arbitrary::Arbitrary<'a>,
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an ack by drawing an item, an epoch, and an attestation from `u`, in that order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Field initializers run in source order, so `u` is consumed item -> epoch -> attestation.
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            epoch: u.arbitrary::<Epoch>()?,
            attestation: Attestation::arbitrary(u)?,
        })
    }
}
258
259/// Message exchanged between peers containing an acknowledgment and tip information.
260/// This combines a validator's vote with their view of consensus progress.
261#[derive(Clone, Debug, PartialEq, Eq, Hash)]
262pub struct TipAck<S: Scheme, D: Digest> {
263    /// The peer's local view of the tip (the lowest height that is not yet confirmed).
264    pub tip: Height,
265
266    /// The peer's acknowledgement (vote) for an item.
267    pub ack: Ack<S, D>,
268}
269
270impl<S: Scheme, D: Digest> Write for TipAck<S, D> {
271    fn write(&self, writer: &mut impl BufMut) {
272        self.tip.write(writer);
273        self.ack.write(writer);
274    }
275}
276
277impl<S: Scheme, D: Digest> Read for TipAck<S, D> {
278    type Cfg = ();
279
280    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
281        let tip = Height::read(reader)?;
282        let ack = Ack::read(reader)?;
283        Ok(Self { tip, ack })
284    }
285}
286
287impl<S: Scheme, D: Digest> EncodeSize for TipAck<S, D> {
288    fn encode_size(&self) -> usize {
289        self.tip.encode_size() + self.ack.encode_size()
290    }
291}
292
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for TipAck<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a message by drawing a tip and then an ack from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Field initializers run in source order: tip first, ack second.
        Ok(Self {
            tip: u.arbitrary::<Height>()?,
            ack: u.arbitrary::<Ack<S, D>>()?,
        })
    }
}
305
306/// A recovered certificate for some [Item].
307#[derive(Clone, Debug, PartialEq, Eq, Hash)]
308pub struct Certificate<S: Scheme, D: Digest> {
309    /// The item that was recovered.
310    pub item: Item<D>,
311    /// The recovered certificate.
312    pub certificate: S::Certificate,
313}
314
315impl<S: Scheme, D: Digest> Certificate<S, D> {
316    pub fn from_acks<'a, I>(scheme: &S, acks: I, strategy: &impl Strategy) -> Option<Self>
317    where
318        S: scheme::Scheme<D>,
319        I: IntoIterator<Item = &'a Ack<S, D>>,
320        I::IntoIter: Send,
321    {
322        let mut iter = acks.into_iter().peekable();
323        let item = iter.peek()?.item.clone();
324        let attestations = iter
325            .filter(|ack| ack.item == item)
326            .map(|ack| ack.attestation.clone());
327        let certificate = scheme.assemble::<_, N3f1>(attestations.into_iter(), strategy)?;
328
329        Some(Self { item, certificate })
330    }
331
332    /// Verifies the recovered certificate for the item.
333    pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
334    where
335        R: CryptoRngCore,
336        S: scheme::Scheme<D>,
337    {
338        scheme.verify_certificate::<_, D, N3f1>(rng, &self.item, &self.certificate, strategy)
339    }
340}
341
342impl<S: Scheme, D: Digest> Write for Certificate<S, D> {
343    fn write(&self, writer: &mut impl BufMut) {
344        self.item.write(writer);
345        self.certificate.write(writer);
346    }
347}
348
349impl<S: Scheme, D: Digest> Read for Certificate<S, D> {
350    type Cfg = <S::Certificate as Read>::Cfg;
351
352    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
353        let item = Item::read(reader)?;
354        let certificate = S::Certificate::read_cfg(reader, cfg)?;
355        Ok(Self { item, certificate })
356    }
357}
358
359impl<S: Scheme, D: Digest> EncodeSize for Certificate<S, D> {
360    fn encode_size(&self) -> usize {
361        self.item.encode_size() + self.certificate.encode_size()
362    }
363}
364
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Certificate<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    S::Certificate: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a certificate by drawing an item and then a scheme certificate from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Field initializers run in source order: item first, certificate second.
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            certificate: u.arbitrary::<S::Certificate>()?,
        })
    }
}
377
378/// Used as [Reporter::Activity](crate::Reporter::Activity) to report activities that occur during
379/// aggregation. Also used to journal events that are needed to initialize the aggregation engine
380/// when the node restarts.
381#[derive(Clone, Debug, PartialEq)]
382pub enum Activity<S: Scheme, D: Digest> {
383    /// Received an ack from a participant.
384    Ack(Ack<S, D>),
385
386    /// Certified an [Item].
387    Certified(Certificate<S, D>),
388
389    /// Moved the tip to a new height.
390    Tip(Height),
391}
392
393impl<S: Scheme, D: Digest> Write for Activity<S, D> {
394    fn write(&self, writer: &mut impl BufMut) {
395        match self {
396            Self::Ack(ack) => {
397                0u8.write(writer);
398                ack.write(writer);
399            }
400            Self::Certified(certificate) => {
401                1u8.write(writer);
402                certificate.write(writer);
403            }
404            Self::Tip(height) => {
405                2u8.write(writer);
406                height.write(writer);
407            }
408        }
409    }
410}
411
412impl<S: Scheme, D: Digest> Read for Activity<S, D> {
413    type Cfg = <S::Certificate as Read>::Cfg;
414
415    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
416        match u8::read(reader)? {
417            0 => Ok(Self::Ack(Ack::read(reader)?)),
418            1 => Ok(Self::Certified(Certificate::read_cfg(reader, cfg)?)),
419            2 => Ok(Self::Tip(Height::read(reader)?)),
420            _ => Err(CodecError::Invalid(
421                "consensus::aggregation::Activity",
422                "Invalid type",
423            )),
424        }
425    }
426}
427
428impl<S: Scheme, D: Digest> EncodeSize for Activity<S, D> {
429    fn encode_size(&self) -> usize {
430        1 + match self {
431            Self::Ack(ack) => ack.encode_size(),
432            Self::Certified(certificate) => certificate.encode_size(),
433            Self::Tip(height) => height.encode_size(),
434        }
435    }
436}
437
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Activity<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
    Certificate<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Picks one of the three variants via a selector in `0..=2`, then draws that variant's
    /// payload from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let selector = u.int_in_range(0..=2)?;
        let activity = match selector {
            0 => Self::Ack(u.arbitrary::<Ack<S, D>>()?),
            1 => Self::Certified(u.arbitrary::<Certificate<S, D>>()?),
            2 => Self::Tip(u.arbitrary::<Height>()?),
            // `int_in_range(0..=2)` cannot produce anything else.
            _ => unreachable!(),
        };
        Ok(activity)
    }
}
455
#[cfg(test)]
mod tests {
    use super::*;
    use crate::aggregation::scheme::{
        bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme,
    };
    use bytes::BytesMut;
    use commonware_codec::{Decode, DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::primitives::variant::{MinPk, MinSig},
        certificate::mocks::Fixture,
        Hasher, Sha256,
    };
    use commonware_parallel::Sequential;
    use commonware_utils::{ordered::Quorum, test_rng, N3f1};
    use rand::rngs::StdRng;

    /// Base namespace handed to every fixture.
    const NAMESPACE: &[u8] = b"test";

    type Sha256Digest = <Sha256 as Hasher>::Digest;

    /// The ack namespace must be the base namespace immediately followed by `ACK_SUFFIX`.
    #[test]
    fn test_ack_namespace() {
        let base = b"test_namespace";
        let mut want = base.to_vec();
        want.extend_from_slice(ACK_SUFFIX);
        assert_eq!(ack_namespace(base), want);
    }

    /// Round-trips `Item`, `Ack`, `TipAck`, and every `Activity` variant through the codec for
    /// the scheme produced by `fixture` (built with 4 participants).
    fn codec<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        let mut rng = test_rng();
        let built = fixture(&mut rng, NAMESPACE, 4);
        let schemes = &built.schemes;
        let signer = &schemes[0];
        let item = Item {
            height: Height::new(100),
            digest: Sha256::hash(b"test_item"),
        };

        // Item: encode -> decode yields an equal value.
        let item_bytes = item.encode();
        let decoded_item = Item::decode(item_bytes).unwrap();
        assert_eq!(item, decoded_item);

        // Ack: sign, round-trip, and make sure the decoded ack still verifies.
        let ack = Ack::sign(signer, Epoch::new(1), item.clone()).unwrap();
        let cfg = signer.certificate_codec_config();
        let ack_bytes = ack.encode();
        let decoded_ack: Ack<S, Sha256Digest> = Ack::decode(ack_bytes).unwrap();
        assert_eq!(decoded_ack.item, item);
        assert_eq!(decoded_ack.epoch, Epoch::new(1));
        assert!(decoded_ack.verify(&mut rng, signer, &Sequential));

        // TipAck: tip and embedded ack survive the round-trip.
        let tip_ack = TipAck {
            tip: Height::new(42),
            ack: ack.clone(),
        };
        let tip_ack_bytes = tip_ack.encode();
        let decoded_tip_ack: TipAck<S, Sha256Digest> = TipAck::decode(tip_ack_bytes).unwrap();
        assert_eq!(decoded_tip_ack.tip, Height::new(42));
        assert_eq!(decoded_tip_ack.ack.item, item);
        assert_eq!(decoded_tip_ack.ack.epoch, Epoch::new(1));

        // Activity::Ack
        let ack_activity = Activity::Ack(ack);
        let ack_activity_bytes = ack_activity.encode();
        let decoded_ack_activity: Activity<S, Sha256Digest> =
            Activity::decode_cfg(ack_activity_bytes, &cfg).unwrap();
        match decoded_ack_activity {
            Activity::Ack(inner) => {
                assert_eq!(inner.item, item);
                assert_eq!(inner.epoch, Epoch::new(1));
            }
            _ => panic!("Expected Activity::Ack"),
        }

        // Activity::Certified: gather a quorum of acks, then assemble a certificate from them.
        let quorum = signer.participants().quorum::<N3f1>() as usize;
        let acks: Vec<_> = schemes
            .iter()
            .take(quorum)
            .filter_map(|scheme| Ack::sign(scheme, Epoch::new(1), item.clone()))
            .collect();

        let certificate = Certificate::from_acks(signer, &acks, &Sequential).unwrap();
        assert!(certificate.verify(&mut rng, signer, &Sequential));

        let certified_activity = Activity::Certified(certificate.clone());
        let certified_bytes = certified_activity.encode();
        let decoded_certified: Activity<S, Sha256Digest> =
            Activity::decode_cfg(certified_bytes, &cfg).unwrap();
        match decoded_certified {
            Activity::Certified(inner) => {
                assert_eq!(inner.item, item);
                assert!(inner.verify(&mut rng, signer, &Sequential));
            }
            _ => panic!("Expected Activity::Certified"),
        }

        // Activity::Tip
        let tip_activity: Activity<S, Sha256Digest> = Activity::Tip(Height::new(123));
        let tip_bytes = tip_activity.encode();
        let decoded_tip: Activity<S, Sha256Digest> =
            Activity::decode_cfg(tip_bytes, &cfg).unwrap();
        match decoded_tip {
            Activity::Tip(height) => assert_eq!(height, Height::new(123)),
            _ => panic!("Expected Activity::Tip"),
        }
    }

    /// Runs the codec round-trip against every supported scheme fixture.
    #[test]
    fn test_codec() {
        // Plain signature schemes
        codec(ed25519::fixture);
        codec(secp256r1::fixture);
        // BLS12-381 multisig, both variants
        codec(bls12381_multisig::fixture::<MinPk, _>);
        codec(bls12381_multisig::fixture::<MinSig, _>);
        // BLS12-381 threshold, both variants
        codec(bls12381_threshold::fixture::<MinPk, _>);
        codec(bls12381_threshold::fixture::<MinSig, _>);
    }

    /// Decoding an `Activity` whose tag byte is not 0, 1, or 2 must fail with
    /// `CodecError::Invalid`.
    fn activity_invalid_enum<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        let mut rng = test_rng();
        let built = fixture(&mut rng, NAMESPACE, 4);
        let cfg = built.schemes[0].certificate_codec_config();

        // A lone, out-of-range discriminant.
        let mut buf = BytesMut::new();
        3u8.write(&mut buf);

        let mut reader = &buf[..];
        let outcome = Activity::<S, Sha256Digest>::read_cfg(&mut reader, &cfg);
        assert!(matches!(
            outcome,
            Err(CodecError::Invalid(
                "consensus::aggregation::Activity",
                "Invalid type"
            ))
        ));
    }

    /// Runs the invalid-tag check against every supported scheme fixture.
    #[test]
    fn test_activity_invalid_enum() {
        // Plain signature schemes
        activity_invalid_enum(ed25519::fixture);
        activity_invalid_enum(secp256r1::fixture);
        // BLS12-381 multisig, both variants
        activity_invalid_enum(bls12381_multisig::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_multisig::fixture::<MinSig, _>);
        // BLS12-381 threshold, both variants
        activity_invalid_enum(bls12381_threshold::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinSig, _>);
    }

    // Codec conformance checks for the wire types; only built with the `arbitrary` feature.
    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use crate::aggregation::scheme::bls12381_threshold;
        use commonware_codec::conformance::CodecConformance;
        use commonware_cryptography::{ed25519::PublicKey, sha256::Digest as Sha256Digest};

        type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;

        commonware_conformance::conformance_tests! {
            CodecConformance<Item<Sha256Digest>>,
            CodecConformance<Ack<Scheme, Sha256Digest>>,
            CodecConformance<TipAck<Scheme, Sha256Digest>>,
            CodecConformance<Certificate<Scheme, Sha256Digest>>,
            CodecConformance<Activity<Scheme, Sha256Digest>>,
        }
    }
}