commonware_consensus/aggregation/
types.rs

1//! Types used in [aggregation](super).
2
3use bytes::{Buf, BufMut};
4use commonware_codec::{
5    varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
6};
7use commonware_cryptography::{
8    bls12381::primitives::{group::Share, ops, poly::PartialSignature, variant::Variant},
9    Digest,
10};
11use commonware_utils::union;
12use futures::channel::oneshot;
13use std::hash::Hash;
14
15/// Error that may be encountered when interacting with `aggregation`.
16#[derive(Debug, thiserror::Error)]
17pub enum Error {
18    // Proposal Errors
19    /// The proposal was canceled by the application
20    #[error("Application verify error: {0}")]
21    AppProposeCanceled(oneshot::Canceled),
22
23    // P2P Errors
24    /// Unable to send a message over the P2P network
25    #[error("Unable to send message")]
26    UnableToSendMessage,
27
28    // Epoch Errors
29    /// The specified validator is not a participant in the epoch
30    #[error("Epoch {0} has no validator {1}")]
31    UnknownValidator(u64, String),
32    /// No cryptographic share is known for the specified epoch
33    #[error("Unknown share at epoch {0}")]
34    UnknownShare(u64),
35
36    // Peer Errors
37    /// The sender's public key doesn't match the expected key
38    #[error("Peer mismatch")]
39    PeerMismatch,
40
41    // Signature Errors
42    /// The acknowledgment signature is invalid
43    #[error("Invalid ack signature")]
44    InvalidAckSignature,
45
46    // Ignorable Message Errors
47    /// The acknowledgment's epoch is outside the accepted bounds
48    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
49    AckEpochOutsideBounds(u64, u64, u64),
50    /// The acknowledgment's height is outside the accepted bounds
51    #[error("Non-useful ack index {0}")]
52    AckIndex(u64),
53    /// The acknowledgment's digest is incorrect
54    #[error("Invalid ack digest {0}")]
55    AckDigest(u64),
56    /// Duplicate acknowledgment for the same index
57    #[error("Duplicate ack from sender {0} for index {1}")]
58    AckDuplicate(String, u64),
59    /// The acknowledgement is for an index that already has a threshold
60    #[error("Ack for index {0} already has a threshold")]
61    AckThresholded(u64),
62    /// The epoch is unknown
63    #[error("Unknown epoch {0}")]
64    UnknownEpoch(u64),
65}
66
67impl Error {
68    /// Returns true if the error represents a blockable offense by a peer.
69    pub fn blockable(&self) -> bool {
70        matches!(self, Error::PeerMismatch | Error::InvalidAckSignature)
71    }
72}
73
74/// Epoch represents a configuration period in the aggregation protocol.
75/// Validators may change between epochs, requiring new threshold signatures.
76pub type Epoch = u64;
77
78/// Index represents the sequential position of items being aggregated.
79/// Indices are monotonically increasing within each epoch.
80pub type Index = u64;
81
82/// Suffix used to identify an acknowledgment (ack) namespace for domain separation.
83/// Used when signing and verifying acks to prevent signature reuse across different message types.
84const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
85
86/// Returns a suffixed namespace for signing an ack.
87///
88/// This provides domain separation for signatures, preventing cross-protocol attacks
89/// by ensuring signatures for acks cannot be reused for other message types.
90#[inline]
91fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
92    union(namespace, ACK_SUFFIX)
93}
94
95/// Item represents a single element being aggregated in the protocol.
96/// Each item has a unique index and contains a digest that validators sign.
97#[derive(Clone, Debug, PartialEq, Eq, Hash)]
98pub struct Item<D: Digest> {
99    /// Sequential position of this item within the current epoch
100    pub index: Index,
101    /// Cryptographic digest of the data being aggregated
102    pub digest: D,
103}
104
105impl<D: Digest> Write for Item<D> {
106    fn write(&self, writer: &mut impl BufMut) {
107        UInt(self.index).write(writer);
108        self.digest.write(writer);
109    }
110}
111
112impl<D: Digest> Read for Item<D> {
113    type Cfg = ();
114
115    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
116        let index = UInt::read(reader)?.into();
117        let digest = D::read(reader)?;
118        Ok(Self { index, digest })
119    }
120}
121
122impl<D: Digest> EncodeSize for Item<D> {
123    fn encode_size(&self) -> usize {
124        UInt(self.index).encode_size() + self.digest.encode_size()
125    }
126}
127
128/// Acknowledgment (ack) represents a validator's partial signature on an item.
129/// Multiple acks can be recovered into a threshold signature for consensus.
130#[derive(Clone, Debug, PartialEq, Eq, Hash)]
131pub struct Ack<V: Variant, D: Digest> {
132    /// The item being acknowledged
133    pub item: Item<D>,
134    /// The epoch in which this acknowledgment was created
135    pub epoch: Epoch,
136    /// Partial signature on the item using the validator's threshold share
137    pub signature: PartialSignature<V>,
138}
139
140impl<V: Variant, D: Digest> Ack<V, D> {
141    /// Verifies the partial signature on this acknowledgment.
142    ///
143    /// Returns `true` if the signature is valid for the given namespace and public key.
144    /// Domain separation is automatically applied to prevent signature reuse.
145    pub fn verify(&self, namespace: &[u8], polynomial: &[V::Public]) -> bool {
146        let Some(public) = polynomial.get(self.signature.index as usize) else {
147            return false;
148        };
149        ops::verify_message::<V>(
150            public,
151            Some(ack_namespace(namespace).as_ref()),
152            self.item.encode().as_ref(),
153            &self.signature.value,
154        )
155        .is_ok()
156    }
157
158    /// Creates a new acknowledgment by signing an item with a validator's threshold share.
159    ///
160    /// The signature uses domain separation to prevent cross-protocol attacks.
161    ///
162    /// # Determinism
163    ///
164    /// Signatures produced by this function are deterministic and safe for consensus.
165    pub fn sign(namespace: &[u8], epoch: Epoch, share: &Share, item: Item<D>) -> Self {
166        let ack_namespace = ack_namespace(namespace);
167        let signature = ops::partial_sign_message::<V>(
168            share,
169            Some(ack_namespace.as_ref()),
170            item.encode().as_ref(),
171        );
172        Self {
173            item,
174            epoch,
175            signature,
176        }
177    }
178}
179
180impl<V: Variant, D: Digest> Write for Ack<V, D> {
181    fn write(&self, writer: &mut impl BufMut) {
182        self.item.write(writer);
183        UInt(self.epoch).write(writer);
184        self.signature.write(writer);
185    }
186}
187
188impl<V: Variant, D: Digest> Read for Ack<V, D> {
189    type Cfg = ();
190
191    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
192        let item = Item::read(reader)?;
193        let epoch = UInt::read(reader)?.into();
194        let signature = PartialSignature::<V>::read(reader)?;
195        Ok(Self {
196            item,
197            epoch,
198            signature,
199        })
200    }
201}
202
203impl<V: Variant, D: Digest> EncodeSize for Ack<V, D> {
204    fn encode_size(&self) -> usize {
205        self.item.encode_size() + UInt(self.epoch).encode_size() + self.signature.encode_size()
206    }
207}
208
209/// Message exchanged between peers containing an acknowledgment and tip information.
210/// This combines a validator's partial signature with their view of consensus progress.
211#[derive(Clone, Debug, PartialEq, Eq, Hash)]
212pub struct TipAck<V: Variant, D: Digest> {
213    /// The peer's local view of the tip (the lowest index that is not yet confirmed).
214    pub tip: Index,
215
216    /// The peer's acknowledgement (partial signature) for an item.
217    pub ack: Ack<V, D>,
218}
219
220impl<V: Variant, D: Digest> Write for TipAck<V, D> {
221    fn write(&self, writer: &mut impl BufMut) {
222        UInt(self.tip).write(writer);
223        self.ack.write(writer);
224    }
225}
226
227impl<V: Variant, D: Digest> Read for TipAck<V, D> {
228    type Cfg = ();
229
230    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
231        let tip = UInt::read(reader)?.into();
232        let ack = Ack::<V, D>::read(reader)?;
233        Ok(Self { tip, ack })
234    }
235}
236
237impl<V: Variant, D: Digest> EncodeSize for TipAck<V, D> {
238    fn encode_size(&self) -> usize {
239        UInt(self.tip).encode_size() + self.ack.encode_size()
240    }
241}
242
243/// A recovered signature for some [Item].
244#[derive(Clone, Debug, PartialEq, Eq, Hash)]
245pub struct Certificate<V: Variant, D: Digest> {
246    /// The item that was recovered.
247    pub item: Item<D>,
248    /// The recovered signature.
249    pub signature: V::Signature,
250}
251
252impl<V: Variant, D: Digest> Write for Certificate<V, D> {
253    fn write(&self, writer: &mut impl BufMut) {
254        self.item.write(writer);
255        self.signature.write(writer);
256    }
257}
258
259impl<V: Variant, D: Digest> Read for Certificate<V, D> {
260    type Cfg = ();
261
262    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
263        let item = Item::read(reader)?;
264        let signature = V::Signature::read(reader)?;
265        Ok(Self { item, signature })
266    }
267}
268
269impl<V: Variant, D: Digest> EncodeSize for Certificate<V, D> {
270    fn encode_size(&self) -> usize {
271        self.item.encode_size() + self.signature.encode_size()
272    }
273}
274
275impl<V: Variant, D: Digest> Certificate<V, D> {
276    /// Verifies the signature on this certificate.
277    ///
278    /// Returns `true` if the signature is valid for the given namespace and public key.
279    /// Domain separation is automatically applied to prevent signature reuse.
280    pub fn verify(&self, namespace: &[u8], identity: &V::Public) -> bool {
281        ops::verify_message::<V>(
282            identity,
283            Some(ack_namespace(namespace).as_ref()),
284            self.item.encode().as_ref(),
285            &self.signature,
286        )
287        .is_ok()
288    }
289}
290
291/// Used as [Reporter::Activity](crate::Reporter::Activity) to report activities that occur during
292/// aggregation. Also used to journal events that are needed to initialize the aggregation engine
293/// when the node restarts.
294#[derive(Clone, Debug, PartialEq)]
295pub enum Activity<V: Variant, D: Digest> {
296    /// Received an ack from a participant.
297    Ack(Ack<V, D>),
298
299    /// Certified an [Item].
300    Certified(Certificate<V, D>),
301
302    /// Moved the tip to a new index.
303    Tip(Index),
304}
305
306impl<V: Variant, D: Digest> Write for Activity<V, D> {
307    fn write(&self, writer: &mut impl BufMut) {
308        match self {
309            Activity::Ack(ack) => {
310                0u8.write(writer);
311                ack.write(writer);
312            }
313            Activity::Certified(certificate) => {
314                1u8.write(writer);
315                certificate.write(writer);
316            }
317            Activity::Tip(index) => {
318                2u8.write(writer);
319                UInt(*index).write(writer);
320            }
321        }
322    }
323}
324
325impl<V: Variant, D: Digest> Read for Activity<V, D> {
326    type Cfg = ();
327
328    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
329        match u8::read(reader)? {
330            0 => Ok(Activity::Ack(Ack::read(reader)?)),
331            1 => Ok(Activity::Certified(Certificate::read(reader)?)),
332            2 => Ok(Activity::Tip(UInt::read(reader)?.into())),
333            _ => Err(CodecError::Invalid(
334                "consensus::aggregation::Activity",
335                "Invalid type",
336            )),
337        }
338    }
339}
340
341impl<V: Variant, D: Digest> EncodeSize for Activity<V, D> {
342    fn encode_size(&self) -> usize {
343        1 + match self {
344            Activity::Ack(ack) => ack.encode_size(),
345            Activity::Certified(certificate) => certificate.encode_size(),
346            Activity::Tip(index) => UInt(*index).encode_size(),
347        }
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use super::*;
354    use bytes::BytesMut;
355    use commonware_codec::{DecodeExt, Encode};
356    use commonware_cryptography::{
357        bls12381::{
358            dkg::ops::{self, evaluate_all},
359            primitives::{ops::sign_message, variant::MinSig},
360        },
361        Hasher, Sha256,
362    };
363    use commonware_runtime::deterministic;
364
365    #[test]
366    fn test_ack_namespace() {
367        let namespace = b"test_namespace";
368        let expected = [namespace, ACK_SUFFIX].concat();
369        assert_eq!(ack_namespace(namespace), expected);
370    }
371
372    #[test]
373    fn test_codec() {
374        let namespace = b"test";
375        let mut context = deterministic::Context::default();
376        let (public, shares) = ops::generate_shares::<_, MinSig>(&mut context, None, 4, 3);
377        let polynomial = evaluate_all::<MinSig>(&public, 4);
378        let item = Item {
379            index: 100,
380            digest: Sha256::hash(b"test_item"),
381        };
382
383        // Test Item codec
384        let restored_item = Item::decode(item.encode()).unwrap();
385        assert_eq!(item, restored_item);
386
387        // Test Ack creation, signing, verification, and codec
388        let ack: Ack<MinSig, _> = Ack::sign(namespace, 1, &shares[0], item.clone());
389        assert!(ack.verify(namespace, &polynomial));
390        assert!(!ack.verify(b"wrong", &polynomial));
391
392        let restored_ack: Ack<MinSig, <Sha256 as Hasher>::Digest> =
393            Ack::decode(ack.encode()).unwrap();
394        assert_eq!(ack, restored_ack);
395
396        // Test TipAck codec
397        let tip_ack = TipAck { ack, tip: 42 };
398        let restored: TipAck<MinSig, <Sha256 as Hasher>::Digest> =
399            TipAck::decode(tip_ack.encode()).unwrap();
400        assert_eq!(tip_ack, restored);
401
402        // Test Activity codec - Ack variant
403        let activity_ack = Activity::Ack(Ack::sign(namespace, 1, &shares[0], item.clone()));
404        let restored_activity_ack: Activity<MinSig, <Sha256 as Hasher>::Digest> =
405            Activity::decode(activity_ack.encode()).unwrap();
406        assert_eq!(activity_ack, restored_activity_ack);
407
408        // Test Activity codec - Certified variant
409        let signature = sign_message::<MinSig>(&shares[0].private, Some(b"test"), b"message");
410        let activity_certified = Activity::Certified(Certificate { item, signature });
411        let restored_activity_certified: Activity<MinSig, <Sha256 as Hasher>::Digest> =
412            Activity::decode(activity_certified.encode()).unwrap();
413        assert_eq!(activity_certified, restored_activity_certified);
414
415        // Test Activity codec - Tip variant
416        let activity_tip = Activity::Tip(123);
417        let restored_activity_tip: Activity<MinSig, <Sha256 as Hasher>::Digest> =
418            Activity::decode(activity_tip.encode()).unwrap();
419        assert_eq!(activity_tip, restored_activity_tip);
420    }
421
422    #[test]
423    fn test_activity_invalid_enum() {
424        let mut buf = BytesMut::new();
425        3u8.write(&mut buf); // Invalid discriminant
426
427        let result = Activity::<MinSig, <Sha256 as Hasher>::Digest>::decode(&buf[..]);
428        assert!(matches!(
429            result,
430            Err(CodecError::Invalid(
431                "consensus::aggregation::Activity",
432                "Invalid type"
433            ))
434        ));
435    }
436}