commonware_consensus/aggregation/
types.rs

1//! Types used in [aggregation](super).
2
3use crate::{aggregation::scheme, types::Epoch};
4use bytes::{Buf, BufMut, Bytes};
5use commonware_codec::{
6    varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
7};
8use commonware_cryptography::{
9    certificate::{Attestation, Scheme, Subject},
10    Digest,
11};
12use commonware_utils::union;
13use futures::channel::oneshot;
14use rand::{CryptoRng, Rng};
15use std::hash::Hash;
16
17/// Error that may be encountered when interacting with `aggregation`.
18#[derive(Debug, thiserror::Error)]
19pub enum Error {
20    // Proposal Errors
21    /// The proposal was canceled by the application
22    #[error("Application verify error: {0}")]
23    AppProposeCanceled(oneshot::Canceled),
24
25    // P2P Errors
26    /// Unable to send a message over the P2P network
27    #[error("Unable to send message")]
28    UnableToSendMessage,
29
30    // Epoch Errors
31    /// The specified validator is not a participant in the epoch
32    #[error("Epoch {0} has no validator {1}")]
33    UnknownValidator(Epoch, String),
34    /// The local node is not a signer in the scheme for the specified epoch.
35    #[error("Not a signer at epoch {0}")]
36    NotSigner(Epoch),
37
38    // Peer Errors
39    /// The sender's public key doesn't match the expected key
40    #[error("Peer mismatch")]
41    PeerMismatch,
42
43    // Signature Errors
44    /// The acknowledgment signature is invalid
45    #[error("Invalid ack signature")]
46    InvalidAckSignature,
47
48    // Ignorable Message Errors
49    /// The acknowledgment's epoch is outside the accepted bounds
50    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
51    AckEpochOutsideBounds(Epoch, Epoch, Epoch),
52    /// The acknowledgment's height is outside the accepted bounds
53    #[error("Non-useful ack index {0}")]
54    AckIndex(Index),
55    /// The acknowledgment's digest is incorrect
56    #[error("Invalid ack digest {0}")]
57    AckDigest(Index),
58    /// Duplicate acknowledgment for the same index
59    #[error("Duplicate ack from sender {0} for index {1}")]
60    AckDuplicate(String, Index),
61    /// The acknowledgement is for an index that already has a certificate
62    #[error("Ack for index {0} already has been certified")]
63    AckCertified(Index),
64    /// The epoch is unknown
65    #[error("Unknown epoch {0}")]
66    UnknownEpoch(Epoch),
67}
68
69impl Error {
70    /// Returns true if the error represents a blockable offense by a peer.
71    pub const fn blockable(&self) -> bool {
72        matches!(self, Self::PeerMismatch | Self::InvalidAckSignature)
73    }
74}
75
76/// Index represents the sequential position of items being aggregated.
77/// Indices are monotonically increasing within each epoch.
78pub type Index = u64;
79
80/// Suffix used to identify an acknowledgment (ack) namespace for domain separation.
81/// Used when signing and verifying acks to prevent signature reuse across different message types.
82const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
83
84/// Returns a suffixed namespace for signing an ack.
85///
86/// This provides domain separation for signatures, preventing cross-protocol attacks
87/// by ensuring signatures for acks cannot be reused for other message types.
88#[inline]
89fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
90    union(namespace, ACK_SUFFIX)
91}
92
93/// Item represents a single element being aggregated in the protocol.
94/// Each item has a unique index and contains a digest that validators sign.
95#[derive(Clone, Debug, PartialEq, Eq, Hash)]
96pub struct Item<D: Digest> {
97    /// Sequential position of this item within the current epoch
98    pub index: Index,
99    /// Cryptographic digest of the data being aggregated
100    pub digest: D,
101}
102
103impl<D: Digest> Write for Item<D> {
104    fn write(&self, writer: &mut impl BufMut) {
105        UInt(self.index).write(writer);
106        self.digest.write(writer);
107    }
108}
109
110impl<D: Digest> Read for Item<D> {
111    type Cfg = ();
112
113    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
114        let index = UInt::read(reader)?.into();
115        let digest = D::read(reader)?;
116        Ok(Self { index, digest })
117    }
118}
119
120impl<D: Digest> EncodeSize for Item<D> {
121    fn encode_size(&self) -> usize {
122        UInt(self.index).encode_size() + self.digest.encode_size()
123    }
124}
125
126impl<D: Digest> Subject for &Item<D> {
127    fn namespace_and_message(&self, namespace: &[u8]) -> (Bytes, Bytes) {
128        (ack_namespace(namespace).into(), self.encode().freeze())
129    }
130}
131
132#[cfg(feature = "arbitrary")]
133impl<D: Digest> arbitrary::Arbitrary<'_> for Item<D>
134where
135    D: for<'a> arbitrary::Arbitrary<'a>,
136{
137    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
138        let index = u.arbitrary::<u64>()?;
139        let digest = u.arbitrary::<D>()?;
140        Ok(Self { index, digest })
141    }
142}
143
144/// Acknowledgment (ack) represents a validator's vote on an item.
145/// Multiple acks can be recovered into a certificate for consensus.
146#[derive(Clone, Debug, PartialEq, Eq, Hash)]
147pub struct Ack<S: Scheme, D: Digest> {
148    /// The item being acknowledged
149    pub item: Item<D>,
150    /// The epoch in which this acknowledgment was created
151    pub epoch: Epoch,
152    /// Scheme-specific attestation material
153    pub attestation: Attestation<S>,
154}
155
156impl<S: Scheme, D: Digest> Ack<S, D> {
157    /// Verifies the attestation on this acknowledgment.
158    ///
159    /// Returns `true` if the attestation is valid for the given namespace and public key.
160    /// Domain separation is automatically applied to prevent signature reuse.
161    pub fn verify(&self, scheme: &S, namespace: &[u8]) -> bool
162    where
163        S: scheme::Scheme<D>,
164    {
165        scheme.verify_attestation::<D>(namespace, &self.item, &self.attestation)
166    }
167
168    /// Creates a new acknowledgment by signing an item with a validator's key.
169    ///
170    /// The signature uses domain separation to prevent cross-protocol attacks.
171    ///
172    /// # Determinism
173    ///
174    /// Signatures produced by this function are deterministic and safe for consensus.
175    pub fn sign(scheme: &S, namespace: &[u8], epoch: Epoch, item: Item<D>) -> Option<Self>
176    where
177        S: scheme::Scheme<D>,
178    {
179        let attestation = scheme.sign::<D>(namespace, &item)?;
180        Some(Self {
181            item,
182            epoch,
183            attestation,
184        })
185    }
186}
187
188impl<S: Scheme, D: Digest> Write for Ack<S, D> {
189    fn write(&self, writer: &mut impl BufMut) {
190        self.item.write(writer);
191        self.epoch.write(writer);
192        self.attestation.write(writer);
193    }
194}
195
196impl<S: Scheme, D: Digest> Read for Ack<S, D> {
197    type Cfg = ();
198
199    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
200        let item = Item::read(reader)?;
201        let epoch = Epoch::read(reader)?;
202        let attestation = Attestation::read(reader)?;
203        Ok(Self {
204            item,
205            epoch,
206            attestation,
207        })
208    }
209}
210
211impl<S: Scheme, D: Digest> EncodeSize for Ack<S, D> {
212    fn encode_size(&self) -> usize {
213        self.item.encode_size() + self.epoch.encode_size() + self.attestation.encode_size()
214    }
215}
216
217#[cfg(feature = "arbitrary")]
218impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Ack<S, D>
219where
220    S::Signature: for<'a> arbitrary::Arbitrary<'a>,
221    D: for<'a> arbitrary::Arbitrary<'a>,
222{
223    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
224        let item = u.arbitrary::<Item<D>>()?;
225        let epoch = u.arbitrary::<Epoch>()?;
226        let attestation = Attestation::arbitrary(u)?;
227        Ok(Self {
228            item,
229            epoch,
230            attestation,
231        })
232    }
233}
234
235/// Message exchanged between peers containing an acknowledgment and tip information.
236/// This combines a validator's vote with their view of consensus progress.
237#[derive(Clone, Debug, PartialEq, Eq, Hash)]
238pub struct TipAck<S: Scheme, D: Digest> {
239    /// The peer's local view of the tip (the lowest index that is not yet confirmed).
240    pub tip: Index,
241
242    /// The peer's acknowledgement (vote) for an item.
243    pub ack: Ack<S, D>,
244}
245
246impl<S: Scheme, D: Digest> Write for TipAck<S, D> {
247    fn write(&self, writer: &mut impl BufMut) {
248        UInt(self.tip).write(writer);
249        self.ack.write(writer);
250    }
251}
252
253impl<S: Scheme, D: Digest> Read for TipAck<S, D> {
254    type Cfg = ();
255
256    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
257        let tip = UInt::read(reader)?.into();
258        let ack = Ack::read(reader)?;
259        Ok(Self { tip, ack })
260    }
261}
262
263impl<S: Scheme, D: Digest> EncodeSize for TipAck<S, D> {
264    fn encode_size(&self) -> usize {
265        UInt(self.tip).encode_size() + self.ack.encode_size()
266    }
267}
268
269#[cfg(feature = "arbitrary")]
270impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for TipAck<S, D>
271where
272    D: for<'a> arbitrary::Arbitrary<'a>,
273    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
274{
275    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
276        let tip = u.arbitrary::<u64>()?;
277        let ack = u.arbitrary::<Ack<S, D>>()?;
278        Ok(Self { tip, ack })
279    }
280}
281
282/// A recovered certificate for some [Item].
283#[derive(Clone, Debug, PartialEq, Eq, Hash)]
284pub struct Certificate<S: Scheme, D: Digest> {
285    /// The item that was recovered.
286    pub item: Item<D>,
287    /// The recovered certificate.
288    pub certificate: S::Certificate,
289}
290
291impl<S: Scheme, D: Digest> Certificate<S, D> {
292    pub fn from_acks<'a>(scheme: &S, acks: impl IntoIterator<Item = &'a Ack<S, D>>) -> Option<Self>
293    where
294        S: scheme::Scheme<D>,
295    {
296        let mut iter = acks.into_iter().peekable();
297        let item = iter.peek()?.item.clone();
298        let attestations = iter
299            .filter(|ack| ack.item == item)
300            .map(|ack| ack.attestation.clone());
301        let certificate = scheme.assemble(attestations)?;
302
303        Some(Self { item, certificate })
304    }
305
306    /// Verifies the recovered certificate for the item.
307    pub fn verify<R>(&self, rng: &mut R, scheme: &S, namespace: &[u8]) -> bool
308    where
309        R: Rng + CryptoRng,
310        S: scheme::Scheme<D>,
311    {
312        scheme.verify_certificate::<_, D>(rng, namespace, &self.item, &self.certificate)
313    }
314}
315
316impl<S: Scheme, D: Digest> Write for Certificate<S, D> {
317    fn write(&self, writer: &mut impl BufMut) {
318        self.item.write(writer);
319        self.certificate.write(writer);
320    }
321}
322
323impl<S: Scheme, D: Digest> Read for Certificate<S, D> {
324    type Cfg = <S::Certificate as Read>::Cfg;
325
326    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
327        let item = Item::read(reader)?;
328        let certificate = S::Certificate::read_cfg(reader, cfg)?;
329        Ok(Self { item, certificate })
330    }
331}
332
333impl<S: Scheme, D: Digest> EncodeSize for Certificate<S, D> {
334    fn encode_size(&self) -> usize {
335        self.item.encode_size() + self.certificate.encode_size()
336    }
337}
338
339#[cfg(feature = "arbitrary")]
340impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Certificate<S, D>
341where
342    D: for<'a> arbitrary::Arbitrary<'a>,
343    S::Certificate: for<'a> arbitrary::Arbitrary<'a>,
344{
345    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
346        let item = u.arbitrary::<Item<D>>()?;
347        let certificate = u.arbitrary::<S::Certificate>()?;
348        Ok(Self { item, certificate })
349    }
350}
351
352/// Used as [Reporter::Activity](crate::Reporter::Activity) to report activities that occur during
353/// aggregation. Also used to journal events that are needed to initialize the aggregation engine
354/// when the node restarts.
355#[derive(Clone, Debug, PartialEq)]
356pub enum Activity<S: Scheme, D: Digest> {
357    /// Received an ack from a participant.
358    Ack(Ack<S, D>),
359
360    /// Certified an [Item].
361    Certified(Certificate<S, D>),
362
363    /// Moved the tip to a new index.
364    Tip(Index),
365}
366
367impl<S: Scheme, D: Digest> Write for Activity<S, D> {
368    fn write(&self, writer: &mut impl BufMut) {
369        match self {
370            Self::Ack(ack) => {
371                0u8.write(writer);
372                ack.write(writer);
373            }
374            Self::Certified(certificate) => {
375                1u8.write(writer);
376                certificate.write(writer);
377            }
378            Self::Tip(index) => {
379                2u8.write(writer);
380                UInt(*index).write(writer);
381            }
382        }
383    }
384}
385
386impl<S: Scheme, D: Digest> Read for Activity<S, D> {
387    type Cfg = <S::Certificate as Read>::Cfg;
388
389    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
390        match u8::read(reader)? {
391            0 => Ok(Self::Ack(Ack::read(reader)?)),
392            1 => Ok(Self::Certified(Certificate::read_cfg(reader, cfg)?)),
393            2 => Ok(Self::Tip(UInt::read(reader)?.into())),
394            _ => Err(CodecError::Invalid(
395                "consensus::aggregation::Activity",
396                "Invalid type",
397            )),
398        }
399    }
400}
401
402impl<S: Scheme, D: Digest> EncodeSize for Activity<S, D> {
403    fn encode_size(&self) -> usize {
404        1 + match self {
405            Self::Ack(ack) => ack.encode_size(),
406            Self::Certified(certificate) => certificate.encode_size(),
407            Self::Tip(index) => UInt(*index).encode_size(),
408        }
409    }
410}
411
412#[cfg(feature = "arbitrary")]
413impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Activity<S, D>
414where
415    D: for<'a> arbitrary::Arbitrary<'a>,
416    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
417    Certificate<S, D>: for<'a> arbitrary::Arbitrary<'a>,
418{
419    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
420        let choice = u.int_in_range(0..=2)?;
421        match choice {
422            0 => Ok(Self::Ack(u.arbitrary::<Ack<S, D>>()?)),
423            1 => Ok(Self::Certified(u.arbitrary::<Certificate<S, D>>()?)),
424            2 => Ok(Self::Tip(u.arbitrary::<u64>()?)),
425            _ => unreachable!(),
426        }
427    }
428}
429
430#[cfg(test)]
431mod tests {
432    use super::*;
433    use crate::aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, Scheme};
434    use bytes::BytesMut;
435    use commonware_codec::{Decode, DecodeExt, Encode};
436    use commonware_cryptography::{
437        bls12381::primitives::variant::{MinPk, MinSig},
438        certificate::mocks::Fixture,
439        Hasher, Sha256,
440    };
441    use commonware_utils::ordered::Quorum;
442    use rand::{rngs::StdRng, SeedableRng};
443
444    const NAMESPACE: &[u8] = b"test";
445
446    type Sha256Digest = <Sha256 as Hasher>::Digest;
447
448    /// Generate a fixture using the provided generator function.
449    fn setup<S, F>(n: u32, fixture: F) -> Fixture<S>
450    where
451        F: FnOnce(&mut StdRng, u32) -> Fixture<S>,
452    {
453        let mut rng = StdRng::seed_from_u64(0);
454        fixture(&mut rng, n)
455    }
456
457    #[test]
458    fn test_ack_namespace() {
459        let namespace = b"test_namespace";
460        let expected = [namespace, ACK_SUFFIX].concat();
461        assert_eq!(ack_namespace(namespace), expected);
462    }
463
464    fn codec<S, F>(fixture: F)
465    where
466        S: Scheme<Sha256Digest>,
467        F: FnOnce(&mut StdRng, u32) -> Fixture<S>,
468    {
469        let fixture = setup(4, fixture);
470        let schemes = &fixture.schemes;
471        let item = Item {
472            index: 100,
473            digest: Sha256::hash(b"test_item"),
474        };
475
476        // Test Item codec
477        let restored_item = Item::decode(item.encode()).unwrap();
478        assert_eq!(item, restored_item);
479
480        // Test Ack creation and codec
481        let ack = Ack::sign(&schemes[0], NAMESPACE, Epoch::new(1), item.clone()).unwrap();
482        let cfg = schemes[0].certificate_codec_config();
483        let encoded_ack = ack.encode();
484        let restored_ack: Ack<S, Sha256Digest> = Ack::decode(encoded_ack).unwrap();
485
486        // Verify the restored ack
487        assert_eq!(restored_ack.item, item);
488        assert_eq!(restored_ack.epoch, Epoch::new(1));
489        assert!(restored_ack.verify(&schemes[0], NAMESPACE));
490
491        // Test TipAck codec
492        let tip_ack = TipAck {
493            ack: ack.clone(),
494            tip: 42,
495        };
496        let encoded_tip_ack = tip_ack.encode();
497        let restored_tip_ack: TipAck<S, Sha256Digest> = TipAck::decode(encoded_tip_ack).unwrap();
498        assert_eq!(restored_tip_ack.tip, 42);
499        assert_eq!(restored_tip_ack.ack.item, item);
500        assert_eq!(restored_tip_ack.ack.epoch, Epoch::new(1));
501
502        // Test Activity codec - Ack variant
503        let activity_ack = Activity::Ack(ack);
504        let encoded_activity = activity_ack.encode();
505        let restored_activity_ack: Activity<S, Sha256Digest> =
506            Activity::decode_cfg(encoded_activity, &cfg).unwrap();
507        if let Activity::Ack(restored) = restored_activity_ack {
508            assert_eq!(restored.item, item);
509            assert_eq!(restored.epoch, Epoch::new(1));
510        } else {
511            panic!("Expected Activity::Ack");
512        }
513
514        // Test Activity codec - Certified variant
515        // Collect enough acks for a certificate
516        let acks: Vec<_> = schemes
517            .iter()
518            .take(schemes[0].participants().quorum() as usize)
519            .filter_map(|scheme| Ack::sign(scheme, NAMESPACE, Epoch::new(1), item.clone()))
520            .collect();
521
522        let certificate = Certificate::from_acks(&schemes[0], &acks).unwrap();
523        let mut rng = StdRng::seed_from_u64(0);
524        assert!(certificate.verify(&mut rng, &schemes[0], NAMESPACE));
525
526        let activity_certified = Activity::Certified(certificate.clone());
527        let encoded_certified = activity_certified.encode();
528        let restored_activity_certified: Activity<S, Sha256Digest> =
529            Activity::decode_cfg(encoded_certified, &cfg).unwrap();
530        if let Activity::Certified(restored) = restored_activity_certified {
531            assert_eq!(restored.item, item);
532            assert!(restored.verify(&mut rng, &schemes[0], NAMESPACE));
533        } else {
534            panic!("Expected Activity::Certified");
535        }
536
537        // Test Activity codec - Tip variant
538        let activity_tip: Activity<S, Sha256Digest> = Activity::Tip(123);
539        let encoded_tip = activity_tip.encode();
540        let restored_activity_tip: Activity<S, Sha256Digest> =
541            Activity::decode_cfg(encoded_tip, &cfg).unwrap();
542        if let Activity::Tip(index) = restored_activity_tip {
543            assert_eq!(index, 123);
544        } else {
545            panic!("Expected Activity::Tip");
546        }
547    }
548
549    #[test]
550    fn test_codec() {
551        codec(ed25519::fixture);
552        codec(bls12381_multisig::fixture::<MinPk, _>);
553        codec(bls12381_multisig::fixture::<MinSig, _>);
554        codec(bls12381_threshold::fixture::<MinPk, _>);
555        codec(bls12381_threshold::fixture::<MinSig, _>);
556    }
557
558    fn activity_invalid_enum<S, F>(fixture: F)
559    where
560        S: Scheme<Sha256Digest>,
561        F: FnOnce(&mut StdRng, u32) -> Fixture<S>,
562    {
563        let fixture = setup(4, fixture);
564        let mut buf = BytesMut::new();
565        3u8.write(&mut buf); // Invalid discriminant
566
567        let cfg = fixture.schemes[0].certificate_codec_config();
568        let result = Activity::<S, Sha256Digest>::read_cfg(&mut &buf[..], &cfg);
569        assert!(matches!(
570            result,
571            Err(CodecError::Invalid(
572                "consensus::aggregation::Activity",
573                "Invalid type"
574            ))
575        ));
576    }
577
578    #[test]
579    fn test_activity_invalid_enum() {
580        activity_invalid_enum(ed25519::fixture);
581        activity_invalid_enum(bls12381_multisig::fixture::<MinPk, _>);
582        activity_invalid_enum(bls12381_multisig::fixture::<MinSig, _>);
583        activity_invalid_enum(bls12381_threshold::fixture::<MinPk, _>);
584        activity_invalid_enum(bls12381_threshold::fixture::<MinSig, _>);
585    }
586
587    #[cfg(feature = "arbitrary")]
588    mod conformance {
589        use super::*;
590        use crate::aggregation::scheme::bls12381_threshold;
591        use commonware_codec::conformance::CodecConformance;
592        use commonware_cryptography::{ed25519::PublicKey, sha256::Digest as Sha256Digest};
593
594        type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;
595
596        commonware_conformance::conformance_tests! {
597            CodecConformance<Item<Sha256Digest>>,
598            CodecConformance<Ack<Scheme, Sha256Digest>>,
599            CodecConformance<TipAck<Scheme, Sha256Digest>>,
600            CodecConformance<Certificate<Scheme, Sha256Digest>>,
601            CodecConformance<Activity<Scheme, Sha256Digest>>,
602        }
603    }
604}