commonware_consensus/aggregation/
types.rs

1//! Types used in [aggregation](super).
2
3use crate::{types::Epoch, Epochable};
4use bytes::{Buf, BufMut};
5use commonware_codec::{
6    varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
7};
8use commonware_cryptography::{
9    bls12381::primitives::{group::Share, ops, poly::PartialSignature, variant::Variant},
10    Digest,
11};
12use commonware_utils::union;
13use futures::channel::oneshot;
14use std::hash::Hash;
15
16/// Error that may be encountered when interacting with `aggregation`.
17#[derive(Debug, thiserror::Error)]
18pub enum Error {
19    // Proposal Errors
20    /// The proposal was canceled by the application
21    #[error("Application verify error: {0}")]
22    AppProposeCanceled(oneshot::Canceled),
23
24    // P2P Errors
25    /// Unable to send a message over the P2P network
26    #[error("Unable to send message")]
27    UnableToSendMessage,
28
29    // Epoch Errors
30    /// The specified validator is not a participant in the epoch
31    #[error("Epoch {0} has no validator {1}")]
32    UnknownValidator(u64, String),
33    /// No cryptographic share is known for the specified epoch
34    #[error("Unknown share at epoch {0}")]
35    UnknownShare(u64),
36
37    // Peer Errors
38    /// The sender's public key doesn't match the expected key
39    #[error("Peer mismatch")]
40    PeerMismatch,
41
42    // Signature Errors
43    /// The acknowledgment signature is invalid
44    #[error("Invalid ack signature")]
45    InvalidAckSignature,
46
47    // Ignorable Message Errors
48    /// The acknowledgment's epoch is outside the accepted bounds
49    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
50    AckEpochOutsideBounds(u64, u64, u64),
51    /// The acknowledgment's height is outside the accepted bounds
52    #[error("Non-useful ack index {0}")]
53    AckIndex(u64),
54    /// The acknowledgment's digest is incorrect
55    #[error("Invalid ack digest {0}")]
56    AckDigest(u64),
57    /// Duplicate acknowledgment for the same index
58    #[error("Duplicate ack from sender {0} for index {1}")]
59    AckDuplicate(String, u64),
60    /// The acknowledgement is for an index that already has a threshold
61    #[error("Ack for index {0} already has a threshold")]
62    AckThresholded(u64),
63    /// The epoch is unknown
64    #[error("Unknown epoch {0}")]
65    UnknownEpoch(u64),
66}
67
68impl Error {
69    /// Returns true if the error represents a blockable offense by a peer.
70    pub fn blockable(&self) -> bool {
71        matches!(self, Error::PeerMismatch | Error::InvalidAckSignature)
72    }
73}
74
/// Index represents the sequential position of items being aggregated.
/// Indices are monotonically increasing within each epoch.
///
/// This is a plain alias for `u64`, so any `u64` can be used where an [Index] is expected.
pub type Index = u64;

/// An [Index] on its own carries no epoch information, so its associated epoch type is `()`.
impl Epochable for Index {
    type Epoch = ();

    // The empty body evaluates to `()`, which is the only value of `Self::Epoch`.
    fn epoch(&self) -> Self::Epoch {}
}
84
/// Suffix appended to a caller-supplied namespace to form the acknowledgment (ack) namespace.
/// Used when signing and verifying acks to prevent signature reuse across different message types.
const ACK_SUFFIX: &[u8] = b"_AGG_ACK";

/// Returns `namespace` followed by [ACK_SUFFIX]: the namespace used for signing an ack.
///
/// This provides domain separation for signatures, preventing cross-protocol attacks
/// by ensuring signatures for acks cannot be reused for other message types.
///
/// A new `Vec<u8>` is allocated on every call.
#[inline]
fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
    union(namespace, ACK_SUFFIX)
}
97
/// Item represents a single element being aggregated in the protocol.
/// Each item has a unique index and contains a digest that validators sign.
///
/// The encoded form (which is also the message that gets signed) is the index as a
/// variable-length integer followed by the digest.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Item<D: Digest> {
    /// Sequential position of this item within the current epoch
    pub index: Index,
    /// Cryptographic digest of the data being aggregated
    pub digest: D,
}
107
108impl<D: Digest> Write for Item<D> {
109    fn write(&self, writer: &mut impl BufMut) {
110        UInt(self.index).write(writer);
111        self.digest.write(writer);
112    }
113}
114
115impl<D: Digest> Read for Item<D> {
116    type Cfg = ();
117
118    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
119        let index = UInt::read(reader)?.into();
120        let digest = D::read(reader)?;
121        Ok(Self { index, digest })
122    }
123}
124
125impl<D: Digest> EncodeSize for Item<D> {
126    fn encode_size(&self) -> usize {
127        UInt(self.index).encode_size() + self.digest.encode_size()
128    }
129}
130
/// Acknowledgment (ack) represents a validator's partial signature on an item.
/// Multiple acks can be recovered into a threshold signature for consensus.
///
/// Note that the partial signature covers only the encoded [Item] (under the ack namespace);
/// the epoch travels with the ack but is not part of the signed message.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Ack<V: Variant, D: Digest> {
    /// The item being acknowledged
    pub item: Item<D>,
    /// The epoch in which this acknowledgment was created
    pub epoch: Epoch,
    /// Partial signature on the item using the validator's threshold share
    pub signature: PartialSignature<V>,
}
142
143impl<V: Variant, D: Digest> Ack<V, D> {
144    /// Verifies the partial signature on this acknowledgment.
145    ///
146    /// Returns `true` if the signature is valid for the given namespace and public key.
147    /// Domain separation is automatically applied to prevent signature reuse.
148    pub fn verify(&self, namespace: &[u8], polynomial: &[V::Public]) -> bool {
149        let Some(public) = polynomial.get(self.signature.index as usize) else {
150            return false;
151        };
152        ops::verify_message::<V>(
153            public,
154            Some(ack_namespace(namespace).as_ref()),
155            self.item.encode().as_ref(),
156            &self.signature.value,
157        )
158        .is_ok()
159    }
160
161    /// Creates a new acknowledgment by signing an item with a validator's threshold share.
162    ///
163    /// The signature uses domain separation to prevent cross-protocol attacks.
164    ///
165    /// # Determinism
166    ///
167    /// Signatures produced by this function are deterministic and safe for consensus.
168    pub fn sign(namespace: &[u8], epoch: Epoch, share: &Share, item: Item<D>) -> Self {
169        let ack_namespace = ack_namespace(namespace);
170        let signature = ops::partial_sign_message::<V>(
171            share,
172            Some(ack_namespace.as_ref()),
173            item.encode().as_ref(),
174        );
175        Self {
176            item,
177            epoch,
178            signature,
179        }
180    }
181}
182
183impl<V: Variant, D: Digest> Write for Ack<V, D> {
184    fn write(&self, writer: &mut impl BufMut) {
185        self.item.write(writer);
186        UInt(self.epoch).write(writer);
187        self.signature.write(writer);
188    }
189}
190
191impl<V: Variant, D: Digest> Read for Ack<V, D> {
192    type Cfg = ();
193
194    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
195        let item = Item::read(reader)?;
196        let epoch = UInt::read(reader)?.into();
197        let signature = PartialSignature::<V>::read(reader)?;
198        Ok(Self {
199            item,
200            epoch,
201            signature,
202        })
203    }
204}
205
206impl<V: Variant, D: Digest> EncodeSize for Ack<V, D> {
207    fn encode_size(&self) -> usize {
208        self.item.encode_size() + UInt(self.epoch).encode_size() + self.signature.encode_size()
209    }
210}
211
/// Message exchanged between peers containing an acknowledgment and tip information.
/// This combines a validator's partial signature with their view of consensus progress.
///
/// On the wire the tip (a variable-length integer) precedes the ack.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TipAck<V: Variant, D: Digest> {
    /// The peer's local view of the tip (the lowest index that is not yet confirmed).
    pub tip: Index,

    /// The peer's acknowledgment (partial signature) for an item.
    pub ack: Ack<V, D>,
}
222
223impl<V: Variant, D: Digest> Write for TipAck<V, D> {
224    fn write(&self, writer: &mut impl BufMut) {
225        UInt(self.tip).write(writer);
226        self.ack.write(writer);
227    }
228}
229
230impl<V: Variant, D: Digest> Read for TipAck<V, D> {
231    type Cfg = ();
232
233    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
234        let tip = UInt::read(reader)?.into();
235        let ack = Ack::<V, D>::read(reader)?;
236        Ok(Self { tip, ack })
237    }
238}
239
240impl<V: Variant, D: Digest> EncodeSize for TipAck<V, D> {
241    fn encode_size(&self) -> usize {
242        UInt(self.tip).encode_size() + self.ack.encode_size()
243    }
244}
245
/// A recovered signature for some [Item].
///
/// The signature is checked over the encoded item under the same ack namespace that is used
/// for individual acknowledgments.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Certificate<V: Variant, D: Digest> {
    /// The item that was recovered.
    pub item: Item<D>,
    /// The recovered signature.
    pub signature: V::Signature,
}
254
255impl<V: Variant, D: Digest> Write for Certificate<V, D> {
256    fn write(&self, writer: &mut impl BufMut) {
257        self.item.write(writer);
258        self.signature.write(writer);
259    }
260}
261
262impl<V: Variant, D: Digest> Read for Certificate<V, D> {
263    type Cfg = ();
264
265    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
266        let item = Item::read(reader)?;
267        let signature = V::Signature::read(reader)?;
268        Ok(Self { item, signature })
269    }
270}
271
272impl<V: Variant, D: Digest> EncodeSize for Certificate<V, D> {
273    fn encode_size(&self) -> usize {
274        self.item.encode_size() + self.signature.encode_size()
275    }
276}
277
278impl<V: Variant, D: Digest> Certificate<V, D> {
279    /// Verifies the signature on this certificate.
280    ///
281    /// Returns `true` if the signature is valid for the given namespace and public key.
282    /// Domain separation is automatically applied to prevent signature reuse.
283    pub fn verify(&self, namespace: &[u8], identity: &V::Public) -> bool {
284        ops::verify_message::<V>(
285            identity,
286            Some(ack_namespace(namespace).as_ref()),
287            self.item.encode().as_ref(),
288            &self.signature,
289        )
290        .is_ok()
291    }
292}
293
/// Used as [Reporter::Activity](crate::Reporter::Activity) to report activities that occur during
/// aggregation. Also used to journal events that are needed to initialize the aggregation engine
/// when the node restarts.
///
/// Each variant is encoded with a leading one-byte tag: `0` for [Activity::Ack], `1` for
/// [Activity::Certified], and `2` for [Activity::Tip].
#[derive(Clone, Debug, PartialEq)]
pub enum Activity<V: Variant, D: Digest> {
    /// Received an ack from a participant.
    Ack(Ack<V, D>),

    /// Certified an [Item].
    Certified(Certificate<V, D>),

    /// Moved the tip to a new index.
    Tip(Index),
}
308
309impl<V: Variant, D: Digest> Write for Activity<V, D> {
310    fn write(&self, writer: &mut impl BufMut) {
311        match self {
312            Activity::Ack(ack) => {
313                0u8.write(writer);
314                ack.write(writer);
315            }
316            Activity::Certified(certificate) => {
317                1u8.write(writer);
318                certificate.write(writer);
319            }
320            Activity::Tip(index) => {
321                2u8.write(writer);
322                UInt(*index).write(writer);
323            }
324        }
325    }
326}
327
328impl<V: Variant, D: Digest> Read for Activity<V, D> {
329    type Cfg = ();
330
331    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
332        match u8::read(reader)? {
333            0 => Ok(Activity::Ack(Ack::read(reader)?)),
334            1 => Ok(Activity::Certified(Certificate::read(reader)?)),
335            2 => Ok(Activity::Tip(UInt::read(reader)?.into())),
336            _ => Err(CodecError::Invalid(
337                "consensus::aggregation::Activity",
338                "Invalid type",
339            )),
340        }
341    }
342}
343
344impl<V: Variant, D: Digest> EncodeSize for Activity<V, D> {
345    fn encode_size(&self) -> usize {
346        1 + match self {
347            Activity::Ack(ack) => ack.encode_size(),
348            Activity::Certified(certificate) => certificate.encode_size(),
349            Activity::Tip(index) => UInt(*index).encode_size(),
350        }
351    }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use commonware_codec::{DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::{
            dkg::ops::{self, evaluate_all},
            primitives::{ops::sign_message, variant::MinSig},
        },
        Hasher, Sha256,
    };
    use rand::{rngs::StdRng, SeedableRng};

    /// Digest type used by every test in this module.
    type TestDigest = <Sha256 as Hasher>::Digest;

    /// The ack namespace is the caller's namespace immediately followed by the ack suffix.
    #[test]
    fn test_ack_namespace() {
        let namespace = b"test_namespace";
        let mut expected = namespace.to_vec();
        expected.extend_from_slice(ACK_SUFFIX);
        assert_eq!(ack_namespace(namespace), expected);
    }

    /// Every type in this module survives an encode/decode round trip, and acks verify only
    /// under the namespace they were signed with.
    #[test]
    fn test_codec() {
        let namespace = b"test";
        let mut rng = StdRng::seed_from_u64(0);
        let (public, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, 4, 3);
        let polynomial = evaluate_all::<MinSig>(&public, 4);
        let signer = &shares[0];
        let item = Item {
            index: 100,
            digest: Sha256::hash(b"test_item"),
        };

        // Item round trip
        let decoded_item = Item::<TestDigest>::decode(item.encode()).unwrap();
        assert_eq!(item, decoded_item);

        // Ack: signing, verification (right and wrong namespace), and round trip
        let ack = Ack::<MinSig, _>::sign(namespace, 1, signer, item.clone());
        assert!(ack.verify(namespace, &polynomial));
        assert!(!ack.verify(b"wrong", &polynomial));

        let decoded_ack = Ack::<MinSig, TestDigest>::decode(ack.encode()).unwrap();
        assert_eq!(ack, decoded_ack);

        // TipAck round trip
        let tip_ack = TipAck { tip: 42, ack };
        let decoded_tip_ack = TipAck::<MinSig, TestDigest>::decode(tip_ack.encode()).unwrap();
        assert_eq!(tip_ack, decoded_tip_ack);

        // Activity::Ack round trip
        let activity: Activity<MinSig, TestDigest> =
            Activity::Ack(Ack::sign(namespace, 1, signer, item.clone()));
        let decoded = Activity::<MinSig, TestDigest>::decode(activity.encode()).unwrap();
        assert_eq!(activity, decoded);

        // Activity::Certified round trip (the signature only needs to be well-formed here)
        let signature = sign_message::<MinSig>(&signer.private, Some(b"test"), b"message");
        let activity: Activity<MinSig, TestDigest> =
            Activity::Certified(Certificate { item, signature });
        let decoded = Activity::<MinSig, TestDigest>::decode(activity.encode()).unwrap();
        assert_eq!(activity, decoded);

        // Activity::Tip round trip
        let activity: Activity<MinSig, TestDigest> = Activity::Tip(123);
        let decoded = Activity::<MinSig, TestDigest>::decode(activity.encode()).unwrap();
        assert_eq!(activity, decoded);
    }

    /// An unknown variant tag is rejected with the module's `Invalid` codec error.
    #[test]
    fn test_activity_invalid_enum() {
        // 3 is not a valid discriminant
        let mut buf = BytesMut::new();
        3u8.write(&mut buf);

        let result = Activity::<MinSig, TestDigest>::decode(&buf[..]);
        assert!(matches!(
            result,
            Err(CodecError::Invalid(
                "consensus::aggregation::Activity",
                "Invalid type"
            ))
        ));
    }
}