commonware_consensus/aggregation/
types.rs

1//! Types used in [aggregation](super).
2
3use crate::{
4    aggregation::scheme,
5    types::{Epoch, Height},
6    Heightable,
7};
8use bytes::{Buf, BufMut, Bytes};
9use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write};
10use commonware_cryptography::{
11    certificate::{self, Attestation, Scheme, Subject},
12    Digest,
13};
14use commonware_parallel::Strategy;
15use commonware_utils::{union, N3f1};
16use futures::channel::oneshot;
17use rand_core::CryptoRngCore;
18use std::hash::Hash;
19
20/// Error that may be encountered when interacting with `aggregation`.
21#[derive(Debug, thiserror::Error)]
22pub enum Error {
23    // Proposal Errors
24    /// The proposal was canceled by the application
25    #[error("Application verify error: {0}")]
26    AppProposeCanceled(oneshot::Canceled),
27
28    // P2P Errors
29    /// Unable to send a message over the P2P network
30    #[error("Unable to send message")]
31    UnableToSendMessage,
32
33    // Epoch Errors
34    /// The specified validator is not a participant in the epoch
35    #[error("Epoch {0} has no validator {1}")]
36    UnknownValidator(Epoch, String),
37    /// The local node is not a signer in the scheme for the specified epoch.
38    #[error("Not a signer at epoch {0}")]
39    NotSigner(Epoch),
40
41    // Peer Errors
42    /// The sender's public key doesn't match the expected key
43    #[error("Peer mismatch")]
44    PeerMismatch,
45
46    // Signature Errors
47    /// The acknowledgment signature is invalid
48    #[error("Invalid ack signature")]
49    InvalidAckSignature,
50
51    // Ignorable Message Errors
52    /// The acknowledgment's epoch is outside the accepted bounds
53    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
54    AckEpochOutsideBounds(Epoch, Epoch, Epoch),
55    /// The acknowledgment's height is outside the accepted bounds
56    #[error("Non-useful ack height {0}")]
57    AckHeight(Height),
58    /// The acknowledgment's digest is incorrect
59    #[error("Invalid ack digest {0}")]
60    AckDigest(Height),
61    /// Duplicate acknowledgment for the same height
62    #[error("Duplicate ack from sender {0} for height {1}")]
63    AckDuplicate(String, Height),
64    /// The acknowledgement is for a height that already has a certificate
65    #[error("Ack for height {0} already has been certified")]
66    AckCertified(Height),
67    /// The epoch is unknown
68    #[error("Unknown epoch {0}")]
69    UnknownEpoch(Epoch),
70}
71
72impl Error {
73    /// Returns true if the error represents a blockable offense by a peer.
74    pub const fn blockable(&self) -> bool {
75        matches!(self, Self::PeerMismatch | Self::InvalidAckSignature)
76    }
77}
78
79/// Suffix used to identify an acknowledgment (ack) namespace for domain separation.
80/// Used when signing and verifying acks to prevent signature reuse across different message types.
81const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
82
83/// Returns a suffixed namespace for signing an ack.
84///
85/// This provides domain separation for signatures, preventing cross-protocol attacks
86/// by ensuring signatures for acks cannot be reused for other message types.
87#[inline]
88fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
89    union(namespace, ACK_SUFFIX)
90}
91
92/// Namespace type for aggregation acknowledgments.
93///
94/// This type encapsulates the pre-computed namespace bytes used for signing and
95/// verifying acks.
96#[derive(Clone, Debug)]
97pub struct Namespace(Vec<u8>);
98
99impl certificate::Namespace for Namespace {
100    fn derive(namespace: &[u8]) -> Self {
101        Self(ack_namespace(namespace))
102    }
103}
104
105/// Item represents a single element being aggregated in the protocol.
106/// Each item has a unique height and contains a digest that validators sign.
107#[derive(Clone, Debug, PartialEq, Eq, Hash)]
108pub struct Item<D: Digest> {
109    /// Sequential position of this item within the current epoch
110    pub height: Height,
111    /// Cryptographic digest of the data being aggregated
112    pub digest: D,
113}
114
115impl<D: Digest> Heightable for Item<D> {
116    fn height(&self) -> Height {
117        self.height
118    }
119}
120
121impl<D: Digest> Write for Item<D> {
122    fn write(&self, writer: &mut impl BufMut) {
123        self.height.write(writer);
124        self.digest.write(writer);
125    }
126}
127
128impl<D: Digest> Read for Item<D> {
129    type Cfg = ();
130
131    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
132        let height = Height::read(reader)?;
133        let digest = D::read(reader)?;
134        Ok(Self { height, digest })
135    }
136}
137
138impl<D: Digest> EncodeSize for Item<D> {
139    fn encode_size(&self) -> usize {
140        self.height.encode_size() + self.digest.encode_size()
141    }
142}
143
144impl<D: Digest> Subject for &Item<D> {
145    type Namespace = Namespace;
146
147    fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
148        &derived.0
149    }
150
151    fn message(&self) -> Bytes {
152        self.encode()
153    }
154}
155
156#[cfg(feature = "arbitrary")]
157impl<D: Digest> arbitrary::Arbitrary<'_> for Item<D>
158where
159    D: for<'a> arbitrary::Arbitrary<'a>,
160{
161    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
162        let height = u.arbitrary::<Height>()?;
163        let digest = u.arbitrary::<D>()?;
164        Ok(Self { height, digest })
165    }
166}
167
168/// Acknowledgment (ack) represents a validator's vote on an item.
169/// Multiple acks can be recovered into a certificate for consensus.
170#[derive(Clone, Debug, PartialEq, Eq, Hash)]
171pub struct Ack<S: Scheme, D: Digest> {
172    /// The item being acknowledged
173    pub item: Item<D>,
174    /// The epoch in which this acknowledgment was created
175    pub epoch: Epoch,
176    /// Scheme-specific attestation material
177    pub attestation: Attestation<S>,
178}
179
180impl<S: Scheme, D: Digest> Ack<S, D> {
181    /// Verifies the attestation on this acknowledgment.
182    ///
183    /// Returns `true` if the attestation is valid for the given namespace and public key.
184    /// Domain separation is automatically applied to prevent signature reuse.
185    pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
186    where
187        R: CryptoRngCore,
188        S: scheme::Scheme<D>,
189    {
190        scheme.verify_attestation::<_, D>(rng, &self.item, &self.attestation, strategy)
191    }
192
193    /// Creates a new acknowledgment by signing an item with a validator's key.
194    ///
195    /// The signature uses domain separation to prevent cross-protocol attacks.
196    ///
197    /// # Determinism
198    ///
199    /// Signatures produced by this function are deterministic and safe for consensus.
200    pub fn sign(scheme: &S, epoch: Epoch, item: Item<D>) -> Option<Self>
201    where
202        S: scheme::Scheme<D>,
203    {
204        let attestation = scheme.sign::<D>(&item)?;
205        Some(Self {
206            item,
207            epoch,
208            attestation,
209        })
210    }
211}
212
213impl<S: Scheme, D: Digest> Write for Ack<S, D> {
214    fn write(&self, writer: &mut impl BufMut) {
215        self.item.write(writer);
216        self.epoch.write(writer);
217        self.attestation.write(writer);
218    }
219}
220
221impl<S: Scheme, D: Digest> Read for Ack<S, D> {
222    type Cfg = ();
223
224    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
225        let item = Item::read(reader)?;
226        let epoch = Epoch::read(reader)?;
227        let attestation = Attestation::read(reader)?;
228        Ok(Self {
229            item,
230            epoch,
231            attestation,
232        })
233    }
234}
235
236impl<S: Scheme, D: Digest> EncodeSize for Ack<S, D> {
237    fn encode_size(&self) -> usize {
238        self.item.encode_size() + self.epoch.encode_size() + self.attestation.encode_size()
239    }
240}
241
242#[cfg(feature = "arbitrary")]
243impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Ack<S, D>
244where
245    S::Signature: for<'a> arbitrary::Arbitrary<'a>,
246    D: for<'a> arbitrary::Arbitrary<'a>,
247{
248    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
249        let item = u.arbitrary::<Item<D>>()?;
250        let epoch = u.arbitrary::<Epoch>()?;
251        let attestation = Attestation::arbitrary(u)?;
252        Ok(Self {
253            item,
254            epoch,
255            attestation,
256        })
257    }
258}
259
260/// Message exchanged between peers containing an acknowledgment and tip information.
261/// This combines a validator's vote with their view of consensus progress.
262#[derive(Clone, Debug, PartialEq, Eq, Hash)]
263pub struct TipAck<S: Scheme, D: Digest> {
264    /// The peer's local view of the tip (the lowest height that is not yet confirmed).
265    pub tip: Height,
266
267    /// The peer's acknowledgement (vote) for an item.
268    pub ack: Ack<S, D>,
269}
270
271impl<S: Scheme, D: Digest> Write for TipAck<S, D> {
272    fn write(&self, writer: &mut impl BufMut) {
273        self.tip.write(writer);
274        self.ack.write(writer);
275    }
276}
277
278impl<S: Scheme, D: Digest> Read for TipAck<S, D> {
279    type Cfg = ();
280
281    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
282        let tip = Height::read(reader)?;
283        let ack = Ack::read(reader)?;
284        Ok(Self { tip, ack })
285    }
286}
287
288impl<S: Scheme, D: Digest> EncodeSize for TipAck<S, D> {
289    fn encode_size(&self) -> usize {
290        self.tip.encode_size() + self.ack.encode_size()
291    }
292}
293
294#[cfg(feature = "arbitrary")]
295impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for TipAck<S, D>
296where
297    D: for<'a> arbitrary::Arbitrary<'a>,
298    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
299{
300    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
301        let tip = u.arbitrary::<Height>()?;
302        let ack = u.arbitrary::<Ack<S, D>>()?;
303        Ok(Self { tip, ack })
304    }
305}
306
307/// A recovered certificate for some [Item].
308#[derive(Clone, Debug, PartialEq, Eq, Hash)]
309pub struct Certificate<S: Scheme, D: Digest> {
310    /// The item that was recovered.
311    pub item: Item<D>,
312    /// The recovered certificate.
313    pub certificate: S::Certificate,
314}
315
316impl<S: Scheme, D: Digest> Certificate<S, D> {
317    pub fn from_acks<'a>(
318        scheme: &S,
319        acks: impl IntoIterator<Item = &'a Ack<S, D>>,
320        strategy: &impl Strategy,
321    ) -> Option<Self>
322    where
323        S: scheme::Scheme<D>,
324    {
325        let mut iter = acks.into_iter().peekable();
326        let item = iter.peek()?.item.clone();
327        let attestations = iter
328            .filter(|ack| ack.item == item)
329            .map(|ack| ack.attestation.clone());
330        let certificate = scheme.assemble::<_, N3f1>(attestations, strategy)?;
331
332        Some(Self { item, certificate })
333    }
334
335    /// Verifies the recovered certificate for the item.
336    pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
337    where
338        R: CryptoRngCore,
339        S: scheme::Scheme<D>,
340    {
341        scheme.verify_certificate::<_, D, N3f1>(rng, &self.item, &self.certificate, strategy)
342    }
343}
344
345impl<S: Scheme, D: Digest> Write for Certificate<S, D> {
346    fn write(&self, writer: &mut impl BufMut) {
347        self.item.write(writer);
348        self.certificate.write(writer);
349    }
350}
351
352impl<S: Scheme, D: Digest> Read for Certificate<S, D> {
353    type Cfg = <S::Certificate as Read>::Cfg;
354
355    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
356        let item = Item::read(reader)?;
357        let certificate = S::Certificate::read_cfg(reader, cfg)?;
358        Ok(Self { item, certificate })
359    }
360}
361
362impl<S: Scheme, D: Digest> EncodeSize for Certificate<S, D> {
363    fn encode_size(&self) -> usize {
364        self.item.encode_size() + self.certificate.encode_size()
365    }
366}
367
368#[cfg(feature = "arbitrary")]
369impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Certificate<S, D>
370where
371    D: for<'a> arbitrary::Arbitrary<'a>,
372    S::Certificate: for<'a> arbitrary::Arbitrary<'a>,
373{
374    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
375        let item = u.arbitrary::<Item<D>>()?;
376        let certificate = u.arbitrary::<S::Certificate>()?;
377        Ok(Self { item, certificate })
378    }
379}
380
381/// Used as [Reporter::Activity](crate::Reporter::Activity) to report activities that occur during
382/// aggregation. Also used to journal events that are needed to initialize the aggregation engine
383/// when the node restarts.
384#[derive(Clone, Debug, PartialEq)]
385pub enum Activity<S: Scheme, D: Digest> {
386    /// Received an ack from a participant.
387    Ack(Ack<S, D>),
388
389    /// Certified an [Item].
390    Certified(Certificate<S, D>),
391
392    /// Moved the tip to a new height.
393    Tip(Height),
394}
395
396impl<S: Scheme, D: Digest> Write for Activity<S, D> {
397    fn write(&self, writer: &mut impl BufMut) {
398        match self {
399            Self::Ack(ack) => {
400                0u8.write(writer);
401                ack.write(writer);
402            }
403            Self::Certified(certificate) => {
404                1u8.write(writer);
405                certificate.write(writer);
406            }
407            Self::Tip(height) => {
408                2u8.write(writer);
409                height.write(writer);
410            }
411        }
412    }
413}
414
415impl<S: Scheme, D: Digest> Read for Activity<S, D> {
416    type Cfg = <S::Certificate as Read>::Cfg;
417
418    fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
419        match u8::read(reader)? {
420            0 => Ok(Self::Ack(Ack::read(reader)?)),
421            1 => Ok(Self::Certified(Certificate::read_cfg(reader, cfg)?)),
422            2 => Ok(Self::Tip(Height::read(reader)?)),
423            _ => Err(CodecError::Invalid(
424                "consensus::aggregation::Activity",
425                "Invalid type",
426            )),
427        }
428    }
429}
430
431impl<S: Scheme, D: Digest> EncodeSize for Activity<S, D> {
432    fn encode_size(&self) -> usize {
433        1 + match self {
434            Self::Ack(ack) => ack.encode_size(),
435            Self::Certified(certificate) => certificate.encode_size(),
436            Self::Tip(height) => height.encode_size(),
437        }
438    }
439}
440
441#[cfg(feature = "arbitrary")]
442impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Activity<S, D>
443where
444    D: for<'a> arbitrary::Arbitrary<'a>,
445    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
446    Certificate<S, D>: for<'a> arbitrary::Arbitrary<'a>,
447{
448    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
449        let choice = u.int_in_range(0..=2)?;
450        match choice {
451            0 => Ok(Self::Ack(u.arbitrary::<Ack<S, D>>()?)),
452            1 => Ok(Self::Certified(u.arbitrary::<Certificate<S, D>>()?)),
453            2 => Ok(Self::Tip(u.arbitrary::<Height>()?)),
454            _ => unreachable!(),
455        }
456    }
457}
458
459#[cfg(test)]
460mod tests {
461    use super::*;
462    use crate::aggregation::scheme::{
463        bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme,
464    };
465    use bytes::BytesMut;
466    use commonware_codec::{Decode, DecodeExt, Encode};
467    use commonware_cryptography::{
468        bls12381::primitives::variant::{MinPk, MinSig},
469        certificate::mocks::Fixture,
470        Hasher, Sha256,
471    };
472    use commonware_parallel::Sequential;
473    use commonware_utils::{ordered::Quorum, test_rng, N3f1};
474    use rand::rngs::StdRng;
475
476    const NAMESPACE: &[u8] = b"test";
477
478    type Sha256Digest = <Sha256 as Hasher>::Digest;
479
480    #[test]
481    fn test_ack_namespace() {
482        let namespace = b"test_namespace";
483        let expected = [namespace, ACK_SUFFIX].concat();
484        assert_eq!(ack_namespace(namespace), expected);
485    }
486
487    fn codec<S, F>(fixture: F)
488    where
489        S: Scheme<Sha256Digest>,
490        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
491    {
492        let mut rng = test_rng();
493        let fixture = fixture(&mut rng, NAMESPACE, 4);
494        let schemes = &fixture.schemes;
495        let item = Item {
496            height: Height::new(100),
497            digest: Sha256::hash(b"test_item"),
498        };
499
500        // Test Item codec
501        let restored_item = Item::decode(item.encode()).unwrap();
502        assert_eq!(item, restored_item);
503
504        // Test Ack creation and codec
505        let ack = Ack::sign(&schemes[0], Epoch::new(1), item.clone()).unwrap();
506        let cfg = schemes[0].certificate_codec_config();
507        let encoded_ack = ack.encode();
508        let restored_ack: Ack<S, Sha256Digest> = Ack::decode(encoded_ack).unwrap();
509
510        // Verify the restored ack
511        assert_eq!(restored_ack.item, item);
512        assert_eq!(restored_ack.epoch, Epoch::new(1));
513        assert!(restored_ack.verify(&mut rng, &schemes[0], &Sequential));
514
515        // Test TipAck codec
516        let tip_ack = TipAck {
517            ack: ack.clone(),
518            tip: Height::new(42),
519        };
520        let encoded_tip_ack = tip_ack.encode();
521        let restored_tip_ack: TipAck<S, Sha256Digest> = TipAck::decode(encoded_tip_ack).unwrap();
522        assert_eq!(restored_tip_ack.tip, Height::new(42));
523        assert_eq!(restored_tip_ack.ack.item, item);
524        assert_eq!(restored_tip_ack.ack.epoch, Epoch::new(1));
525
526        // Test Activity codec - Ack variant
527        let activity_ack = Activity::Ack(ack);
528        let encoded_activity = activity_ack.encode();
529        let restored_activity_ack: Activity<S, Sha256Digest> =
530            Activity::decode_cfg(encoded_activity, &cfg).unwrap();
531        if let Activity::Ack(restored) = restored_activity_ack {
532            assert_eq!(restored.item, item);
533            assert_eq!(restored.epoch, Epoch::new(1));
534        } else {
535            panic!("Expected Activity::Ack");
536        }
537
538        // Test Activity codec - Certified variant
539        // Collect enough acks for a certificate
540        let acks: Vec<_> = schemes
541            .iter()
542            .take(schemes[0].participants().quorum::<N3f1>() as usize)
543            .filter_map(|scheme| Ack::sign(scheme, Epoch::new(1), item.clone()))
544            .collect();
545
546        let certificate = Certificate::from_acks(&schemes[0], &acks, &Sequential).unwrap();
547        assert!(certificate.verify(&mut rng, &schemes[0], &Sequential));
548
549        let activity_certified = Activity::Certified(certificate.clone());
550        let encoded_certified = activity_certified.encode();
551        let restored_activity_certified: Activity<S, Sha256Digest> =
552            Activity::decode_cfg(encoded_certified, &cfg).unwrap();
553        if let Activity::Certified(restored) = restored_activity_certified {
554            assert_eq!(restored.item, item);
555            assert!(restored.verify(&mut rng, &schemes[0], &Sequential));
556        } else {
557            panic!("Expected Activity::Certified");
558        }
559
560        // Test Activity codec - Tip variant
561        let activity_tip: Activity<S, Sha256Digest> = Activity::Tip(Height::new(123));
562        let encoded_tip = activity_tip.encode();
563        let restored_activity_tip: Activity<S, Sha256Digest> =
564            Activity::decode_cfg(encoded_tip, &cfg).unwrap();
565        if let Activity::Tip(height) = restored_activity_tip {
566            assert_eq!(height, Height::new(123));
567        } else {
568            panic!("Expected Activity::Tip");
569        }
570    }
571
572    #[test]
573    fn test_codec() {
574        codec(ed25519::fixture);
575        codec(secp256r1::fixture);
576        codec(bls12381_multisig::fixture::<MinPk, _>);
577        codec(bls12381_multisig::fixture::<MinSig, _>);
578        codec(bls12381_threshold::fixture::<MinPk, _>);
579        codec(bls12381_threshold::fixture::<MinSig, _>);
580    }
581
582    fn activity_invalid_enum<S, F>(fixture: F)
583    where
584        S: Scheme<Sha256Digest>,
585        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
586    {
587        let fixture = fixture(&mut test_rng(), NAMESPACE, 4);
588        let mut buf = BytesMut::new();
589        3u8.write(&mut buf); // Invalid discriminant
590
591        let cfg = fixture.schemes[0].certificate_codec_config();
592        let result = Activity::<S, Sha256Digest>::read_cfg(&mut &buf[..], &cfg);
593        assert!(matches!(
594            result,
595            Err(CodecError::Invalid(
596                "consensus::aggregation::Activity",
597                "Invalid type"
598            ))
599        ));
600    }
601
602    #[test]
603    fn test_activity_invalid_enum() {
604        activity_invalid_enum(ed25519::fixture);
605        activity_invalid_enum(secp256r1::fixture);
606        activity_invalid_enum(bls12381_multisig::fixture::<MinPk, _>);
607        activity_invalid_enum(bls12381_multisig::fixture::<MinSig, _>);
608        activity_invalid_enum(bls12381_threshold::fixture::<MinPk, _>);
609        activity_invalid_enum(bls12381_threshold::fixture::<MinSig, _>);
610    }
611
612    #[cfg(feature = "arbitrary")]
613    mod conformance {
614        use super::*;
615        use crate::aggregation::scheme::bls12381_threshold;
616        use commonware_codec::conformance::CodecConformance;
617        use commonware_cryptography::{ed25519::PublicKey, sha256::Digest as Sha256Digest};
618
619        type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;
620
621        commonware_conformance::conformance_tests! {
622            CodecConformance<Item<Sha256Digest>>,
623            CodecConformance<Ack<Scheme, Sha256Digest>>,
624            CodecConformance<TipAck<Scheme, Sha256Digest>>,
625            CodecConformance<Certificate<Scheme, Sha256Digest>>,
626            CodecConformance<Activity<Scheme, Sha256Digest>>,
627        }
628    }
629}