commonware_consensus/aggregation/
types.rs

1//! Types used in [aggregation](super).
2
3use bytes::{Buf, BufMut};
4use commonware_codec::{
5    varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
6};
7use commonware_cryptography::{
8    bls12381::primitives::{
9        group::Share,
10        ops,
11        poly::{self, PartialSignature},
12        variant::Variant,
13    },
14    Digest,
15};
16use commonware_utils::union;
17use futures::channel::oneshot;
18use std::hash::Hash;
19
20/// Error that may be encountered when interacting with `aggregation`.
21#[derive(Debug, thiserror::Error)]
22pub enum Error {
23    // Proposal Errors
24    /// The proposal was canceled by the application
25    #[error("Application verify error: {0}")]
26    AppProposeCanceled(oneshot::Canceled),
27
28    // P2P Errors
29    /// Unable to send a message over the P2P network
30    #[error("Unable to send message")]
31    UnableToSendMessage,
32
33    // Epoch Errors
34    /// The specified validator is not a participant in the epoch
35    #[error("Epoch {0} has no validator {1}")]
36    UnknownValidator(u64, String),
37    /// No cryptographic share is known for the specified epoch
38    #[error("Unknown share at epoch {0}")]
39    UnknownShare(u64),
40
41    // Peer Errors
42    /// The sender's public key doesn't match the expected key
43    #[error("Peer mismatch")]
44    PeerMismatch,
45
46    // Signature Errors
47    /// The acknowledgment signature is invalid
48    #[error("Invalid ack signature")]
49    InvalidAckSignature,
50
51    // Ignorable Message Errors
52    /// The acknowledgment's epoch is outside the accepted bounds
53    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
54    AckEpochOutsideBounds(u64, u64, u64),
55    /// The acknowledgment's height is outside the accepted bounds
56    #[error("Non-useful ack index {0}")]
57    AckIndex(u64),
58    /// Duplicate acknowledgment for the same index
59    #[error("Duplicate ack from sender {0} for index {1}")]
60    AckDuplicate(String, u64),
61    /// The acknowledgement is for an index that already has a threshold
62    #[error("Ack for index {0} already has a threshold")]
63    AckThresholded(u64),
64}
65
66impl Error {
67    /// Returns true if the error represents a blockable offense by a peer.
68    pub fn blockable(&self) -> bool {
69        matches!(self, Error::PeerMismatch | Error::InvalidAckSignature)
70    }
71}
72
/// Identifier of a configuration period in the aggregation protocol.
///
/// The validator set may differ from one epoch to the next, in which case new threshold
/// signatures are required.
pub type Epoch = u64;

/// Sequential position of an item being aggregated.
///
/// Indices increase monotonically within each epoch.
pub type Index = u64;
80
/// Namespace suffix that marks a signature as belonging to an acknowledgment (ack).
///
/// Appended to the caller's namespace whenever an ack is signed or verified, so that ack
/// signatures are domain-separated from signatures over other message types.
const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
84
85/// Returns a suffixed namespace for signing an ack.
86///
87/// This provides domain separation for signatures, preventing cross-protocol attacks
88/// by ensuring signatures for acks cannot be reused for other message types.
89#[inline]
90fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
91    union(namespace, ACK_SUFFIX)
92}
93
94/// Item represents a single element being aggregated in the protocol.
95/// Each item has a unique index and contains a digest that validators sign.
96#[derive(Clone, Debug, PartialEq, Eq, Hash)]
97pub struct Item<D: Digest> {
98    /// Sequential position of this item within the current epoch
99    pub index: Index,
100    /// Cryptographic digest of the data being aggregated
101    pub digest: D,
102}
103
104impl<D: Digest> Write for Item<D> {
105    fn write(&self, writer: &mut impl BufMut) {
106        UInt(self.index).write(writer);
107        self.digest.write(writer);
108    }
109}
110
111impl<D: Digest> Read for Item<D> {
112    type Cfg = ();
113
114    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
115        let index = UInt::read(reader)?.into();
116        let digest = D::read(reader)?;
117        Ok(Self { index, digest })
118    }
119}
120
121impl<D: Digest> EncodeSize for Item<D> {
122    fn encode_size(&self) -> usize {
123        UInt(self.index).encode_size() + self.digest.encode_size()
124    }
125}
126
127/// Acknowledgment (ack) represents a validator's partial signature on an item.
128/// Multiple acks can be recovered into a threshold signature for consensus.
129#[derive(Clone, Debug, PartialEq, Eq, Hash)]
130pub struct Ack<V: Variant, D: Digest> {
131    /// The item being acknowledged
132    pub item: Item<D>,
133    /// The epoch in which this acknowledgment was created
134    pub epoch: Epoch,
135    /// Partial signature on the item using the validator's threshold share
136    pub signature: PartialSignature<V>,
137}
138
139impl<V: Variant, D: Digest> Ack<V, D> {
140    /// Verifies the partial signature on this acknowledgment.
141    ///
142    /// Returns `true` if the signature is valid for the given namespace and public key.
143    /// Domain separation is automatically applied to prevent signature reuse.
144    pub fn verify(&self, namespace: &[u8], identity: &poly::Public<V>) -> bool {
145        ops::partial_verify_message::<V>(
146            identity,
147            Some(ack_namespace(namespace).as_ref()),
148            self.item.encode().as_ref(),
149            &self.signature,
150        )
151        .is_ok()
152    }
153
154    /// Creates a new acknowledgment by signing an item with a validator's threshold share.
155    ///
156    /// The signature uses domain separation to prevent cross-protocol attacks.
157    ///
158    /// # Determinism
159    ///
160    /// Signatures produced by this function are deterministic and safe for consensus.
161    pub fn sign(namespace: &[u8], epoch: Epoch, share: &Share, item: Item<D>) -> Self {
162        let ack_namespace = ack_namespace(namespace);
163        let signature = ops::partial_sign_message::<V>(
164            share,
165            Some(ack_namespace.as_ref()),
166            item.encode().as_ref(),
167        );
168        Self {
169            item,
170            epoch,
171            signature,
172        }
173    }
174}
175
176impl<V: Variant, D: Digest> Write for Ack<V, D> {
177    fn write(&self, writer: &mut impl BufMut) {
178        self.item.write(writer);
179        UInt(self.epoch).write(writer);
180        self.signature.write(writer);
181    }
182}
183
184impl<V: Variant, D: Digest> Read for Ack<V, D> {
185    type Cfg = ();
186
187    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
188        let item = Item::read(reader)?;
189        let epoch = UInt::read(reader)?.into();
190        let signature = PartialSignature::<V>::read(reader)?;
191        Ok(Self {
192            item,
193            epoch,
194            signature,
195        })
196    }
197}
198
199impl<V: Variant, D: Digest> EncodeSize for Ack<V, D> {
200    fn encode_size(&self) -> usize {
201        self.item.encode_size() + UInt(self.epoch).encode_size() + self.signature.encode_size()
202    }
203}
204
205/// Message exchanged between peers containing an acknowledgment and tip information.
206/// This combines a validator's partial signature with their view of consensus progress.
207#[derive(Clone, Debug, PartialEq, Eq, Hash)]
208pub struct TipAck<V: Variant, D: Digest> {
209    /// The peer's local view of the tip (the lowest index that is not yet confirmed).
210    pub tip: Index,
211
212    /// The peer's acknowledgement (partial signature) for an item.
213    pub ack: Ack<V, D>,
214}
215
216impl<V: Variant, D: Digest> Write for TipAck<V, D> {
217    fn write(&self, writer: &mut impl BufMut) {
218        UInt(self.tip).write(writer);
219        self.ack.write(writer);
220    }
221}
222
223impl<V: Variant, D: Digest> Read for TipAck<V, D> {
224    type Cfg = ();
225
226    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
227        let tip = UInt::read(reader)?.into();
228        let ack = Ack::<V, D>::read(reader)?;
229        Ok(Self { tip, ack })
230    }
231}
232
233impl<V: Variant, D: Digest> EncodeSize for TipAck<V, D> {
234    fn encode_size(&self) -> usize {
235        UInt(self.tip).encode_size() + self.ack.encode_size()
236    }
237}
238
239/// Used as [Reporter::Activity](crate::Reporter::Activity) to report activities that occur during
240/// aggregation. Also used to journal events that are needed to initialize the aggregation engine
241/// when the node restarts.
242#[derive(Clone, Debug, PartialEq)]
243pub enum Activity<V: Variant, D: Digest> {
244    /// Received an ack from a participant.
245    Ack(Ack<V, D>),
246
247    /// Recovered a threshold signature.
248    Recovered(Item<D>, V::Signature),
249
250    /// Moved the tip to a new index.
251    Tip(Index),
252}
253
254impl<V: Variant, D: Digest> Write for Activity<V, D> {
255    fn write(&self, writer: &mut impl BufMut) {
256        match self {
257            Activity::Ack(ack) => {
258                0u8.write(writer);
259                ack.write(writer);
260            }
261            Activity::Recovered(item, signature) => {
262                1u8.write(writer);
263                item.write(writer);
264                signature.write(writer);
265            }
266            Activity::Tip(index) => {
267                2u8.write(writer);
268                UInt(*index).write(writer);
269            }
270        }
271    }
272}
273
274impl<V: Variant, D: Digest> Read for Activity<V, D> {
275    type Cfg = ();
276
277    fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
278        match u8::read(reader)? {
279            0 => Ok(Activity::Ack(Ack::read(reader)?)),
280            1 => Ok(Activity::Recovered(
281                Item::read(reader)?,
282                V::Signature::read(reader)?,
283            )),
284            2 => Ok(Activity::Tip(UInt::read(reader)?.into())),
285            _ => Err(CodecError::Invalid(
286                "consensus::aggregation::Activity",
287                "Invalid type",
288            )),
289        }
290    }
291}
292
293impl<V: Variant, D: Digest> EncodeSize for Activity<V, D> {
294    fn encode_size(&self) -> usize {
295        1 + match self {
296            Activity::Ack(ack) => ack.encode_size(),
297            Activity::Recovered(item, signature) => item.encode_size() + signature.encode_size(),
298            Activity::Tip(index) => UInt(*index).encode_size(),
299        }
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use commonware_codec::{DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::{
            dkg::ops,
            primitives::{ops::sign_message, variant::MinSig},
        },
        sha256,
    };
    use commonware_runtime::deterministic;

    /// The ack namespace is the caller's namespace followed by the ack suffix.
    #[test]
    fn test_ack_namespace() {
        let namespace = b"test_namespace";
        let expected = [namespace, ACK_SUFFIX].concat();
        assert_eq!(ack_namespace(namespace), expected);
    }

    /// Every type in this module survives an encode/decode round trip, and acks verify only
    /// under the namespace they were signed with.
    #[test]
    fn test_codec() {
        let namespace = b"test";
        let mut context = deterministic::Context::default();
        let (public, shares) = ops::generate_shares::<_, MinSig>(&mut context, None, 4, 3);
        let item = Item {
            index: 100,
            digest: sha256::hash(b"test_item"),
        };

        // Item: round trip
        let decoded_item = Item::decode(item.encode()).unwrap();
        assert_eq!(item, decoded_item);

        // Ack: sign, verify under the right and the wrong namespace, then round trip
        let ack = Ack::<MinSig, _>::sign(namespace, 1, &shares[0], item.clone());
        assert!(ack.verify(namespace, &public));
        assert!(!ack.verify(b"wrong", &public));

        let decoded_ack = Ack::<MinSig, sha256::Digest>::decode(ack.encode()).unwrap();
        assert_eq!(ack, decoded_ack);

        // TipAck: round trip
        let tip_ack = TipAck { ack, tip: 42 };
        let decoded_tip_ack = TipAck::<MinSig, sha256::Digest>::decode(tip_ack.encode()).unwrap();
        assert_eq!(tip_ack, decoded_tip_ack);

        // Activity::Ack: round trip
        let ack_activity: Activity<MinSig, sha256::Digest> =
            Activity::Ack(Ack::sign(namespace, 1, &shares[0], item.clone()));
        let decoded_ack_activity =
            Activity::<MinSig, sha256::Digest>::decode(ack_activity.encode()).unwrap();
        assert_eq!(ack_activity, decoded_ack_activity);

        // Activity::Recovered: round trip
        let signature = sign_message::<MinSig>(&shares[0].private, Some(b"test"), b"message");
        let recovered_activity: Activity<MinSig, sha256::Digest> =
            Activity::Recovered(item, signature);
        let decoded_recovered_activity =
            Activity::<MinSig, sha256::Digest>::decode(recovered_activity.encode()).unwrap();
        assert_eq!(recovered_activity, decoded_recovered_activity);

        // Activity::Tip: round trip
        let tip_activity: Activity<MinSig, sha256::Digest> = Activity::Tip(123);
        let decoded_tip_activity =
            Activity::<MinSig, sha256::Digest>::decode(tip_activity.encode()).unwrap();
        assert_eq!(tip_activity, decoded_tip_activity);
    }

    /// Decoding rejects a variant tag that no `Activity` variant uses.
    #[test]
    fn test_activity_invalid_enum() {
        // 3 is one past the highest tag in use.
        let mut encoded = BytesMut::new();
        3u8.write(&mut encoded);

        let decoded = Activity::<MinSig, sha256::Digest>::decode(&encoded[..]);
        assert!(matches!(
            decoded,
            Err(CodecError::Invalid(
                "consensus::aggregation::Activity",
                "Invalid type"
            ))
        ));
    }
}