1use crate::{types::Epoch, Epochable};
4use bytes::{Buf, BufMut};
5use commonware_codec::{
6 varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
7};
8use commonware_cryptography::{
9 bls12381::primitives::{group::Share, ops, poly::PartialSignature, variant::Variant},
10 Digest,
11};
12use commonware_utils::union;
13use futures::channel::oneshot;
14use std::hash::Hash;
15
16#[derive(Debug, thiserror::Error)]
18pub enum Error {
19 #[error("Application verify error: {0}")]
22 AppProposeCanceled(oneshot::Canceled),
23
24 #[error("Unable to send message")]
27 UnableToSendMessage,
28
29 #[error("Epoch {0} has no validator {1}")]
32 UnknownValidator(u64, String),
33 #[error("Unknown share at epoch {0}")]
35 UnknownShare(u64),
36
37 #[error("Peer mismatch")]
40 PeerMismatch,
41
42 #[error("Invalid ack signature")]
45 InvalidAckSignature,
46
47 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
50 AckEpochOutsideBounds(u64, u64, u64),
51 #[error("Non-useful ack index {0}")]
53 AckIndex(u64),
54 #[error("Invalid ack digest {0}")]
56 AckDigest(u64),
57 #[error("Duplicate ack from sender {0} for index {1}")]
59 AckDuplicate(String, u64),
60 #[error("Ack for index {0} already has a threshold")]
62 AckThresholded(u64),
63 #[error("Unknown epoch {0}")]
65 UnknownEpoch(u64),
66}
67
68impl Error {
69 pub fn blockable(&self) -> bool {
71 matches!(self, Error::PeerMismatch | Error::InvalidAckSignature)
72 }
73}
74
75pub type Index = u64;
78
79impl Epochable for Index {
80 type Epoch = ();
81
82 fn epoch(&self) -> Self::Epoch {}
83}
84
85const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
88
89#[inline]
94fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
95 union(namespace, ACK_SUFFIX)
96}
97
98#[derive(Clone, Debug, PartialEq, Eq, Hash)]
101pub struct Item<D: Digest> {
102 pub index: Index,
104 pub digest: D,
106}
107
108impl<D: Digest> Write for Item<D> {
109 fn write(&self, writer: &mut impl BufMut) {
110 UInt(self.index).write(writer);
111 self.digest.write(writer);
112 }
113}
114
115impl<D: Digest> Read for Item<D> {
116 type Cfg = ();
117
118 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
119 let index = UInt::read(reader)?.into();
120 let digest = D::read(reader)?;
121 Ok(Self { index, digest })
122 }
123}
124
125impl<D: Digest> EncodeSize for Item<D> {
126 fn encode_size(&self) -> usize {
127 UInt(self.index).encode_size() + self.digest.encode_size()
128 }
129}
130
131#[derive(Clone, Debug, PartialEq, Eq, Hash)]
134pub struct Ack<V: Variant, D: Digest> {
135 pub item: Item<D>,
137 pub epoch: Epoch,
139 pub signature: PartialSignature<V>,
141}
142
143impl<V: Variant, D: Digest> Ack<V, D> {
144 pub fn verify(&self, namespace: &[u8], polynomial: &[V::Public]) -> bool {
149 let Some(public) = polynomial.get(self.signature.index as usize) else {
150 return false;
151 };
152 ops::verify_message::<V>(
153 public,
154 Some(ack_namespace(namespace).as_ref()),
155 self.item.encode().as_ref(),
156 &self.signature.value,
157 )
158 .is_ok()
159 }
160
161 pub fn sign(namespace: &[u8], epoch: Epoch, share: &Share, item: Item<D>) -> Self {
169 let ack_namespace = ack_namespace(namespace);
170 let signature = ops::partial_sign_message::<V>(
171 share,
172 Some(ack_namespace.as_ref()),
173 item.encode().as_ref(),
174 );
175 Self {
176 item,
177 epoch,
178 signature,
179 }
180 }
181}
182
183impl<V: Variant, D: Digest> Write for Ack<V, D> {
184 fn write(&self, writer: &mut impl BufMut) {
185 self.item.write(writer);
186 UInt(self.epoch).write(writer);
187 self.signature.write(writer);
188 }
189}
190
191impl<V: Variant, D: Digest> Read for Ack<V, D> {
192 type Cfg = ();
193
194 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
195 let item = Item::read(reader)?;
196 let epoch = UInt::read(reader)?.into();
197 let signature = PartialSignature::<V>::read(reader)?;
198 Ok(Self {
199 item,
200 epoch,
201 signature,
202 })
203 }
204}
205
206impl<V: Variant, D: Digest> EncodeSize for Ack<V, D> {
207 fn encode_size(&self) -> usize {
208 self.item.encode_size() + UInt(self.epoch).encode_size() + self.signature.encode_size()
209 }
210}
211
212#[derive(Clone, Debug, PartialEq, Eq, Hash)]
215pub struct TipAck<V: Variant, D: Digest> {
216 pub tip: Index,
218
219 pub ack: Ack<V, D>,
221}
222
223impl<V: Variant, D: Digest> Write for TipAck<V, D> {
224 fn write(&self, writer: &mut impl BufMut) {
225 UInt(self.tip).write(writer);
226 self.ack.write(writer);
227 }
228}
229
230impl<V: Variant, D: Digest> Read for TipAck<V, D> {
231 type Cfg = ();
232
233 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
234 let tip = UInt::read(reader)?.into();
235 let ack = Ack::<V, D>::read(reader)?;
236 Ok(Self { tip, ack })
237 }
238}
239
240impl<V: Variant, D: Digest> EncodeSize for TipAck<V, D> {
241 fn encode_size(&self) -> usize {
242 UInt(self.tip).encode_size() + self.ack.encode_size()
243 }
244}
245
246#[derive(Clone, Debug, PartialEq, Eq, Hash)]
248pub struct Certificate<V: Variant, D: Digest> {
249 pub item: Item<D>,
251 pub signature: V::Signature,
253}
254
255impl<V: Variant, D: Digest> Write for Certificate<V, D> {
256 fn write(&self, writer: &mut impl BufMut) {
257 self.item.write(writer);
258 self.signature.write(writer);
259 }
260}
261
262impl<V: Variant, D: Digest> Read for Certificate<V, D> {
263 type Cfg = ();
264
265 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
266 let item = Item::read(reader)?;
267 let signature = V::Signature::read(reader)?;
268 Ok(Self { item, signature })
269 }
270}
271
272impl<V: Variant, D: Digest> EncodeSize for Certificate<V, D> {
273 fn encode_size(&self) -> usize {
274 self.item.encode_size() + self.signature.encode_size()
275 }
276}
277
278impl<V: Variant, D: Digest> Certificate<V, D> {
279 pub fn verify(&self, namespace: &[u8], identity: &V::Public) -> bool {
284 ops::verify_message::<V>(
285 identity,
286 Some(ack_namespace(namespace).as_ref()),
287 self.item.encode().as_ref(),
288 &self.signature,
289 )
290 .is_ok()
291 }
292}
293
294#[derive(Clone, Debug, PartialEq)]
298pub enum Activity<V: Variant, D: Digest> {
299 Ack(Ack<V, D>),
301
302 Certified(Certificate<V, D>),
304
305 Tip(Index),
307}
308
309impl<V: Variant, D: Digest> Write for Activity<V, D> {
310 fn write(&self, writer: &mut impl BufMut) {
311 match self {
312 Activity::Ack(ack) => {
313 0u8.write(writer);
314 ack.write(writer);
315 }
316 Activity::Certified(certificate) => {
317 1u8.write(writer);
318 certificate.write(writer);
319 }
320 Activity::Tip(index) => {
321 2u8.write(writer);
322 UInt(*index).write(writer);
323 }
324 }
325 }
326}
327
328impl<V: Variant, D: Digest> Read for Activity<V, D> {
329 type Cfg = ();
330
331 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
332 match u8::read(reader)? {
333 0 => Ok(Activity::Ack(Ack::read(reader)?)),
334 1 => Ok(Activity::Certified(Certificate::read(reader)?)),
335 2 => Ok(Activity::Tip(UInt::read(reader)?.into())),
336 _ => Err(CodecError::Invalid(
337 "consensus::aggregation::Activity",
338 "Invalid type",
339 )),
340 }
341 }
342}
343
344impl<V: Variant, D: Digest> EncodeSize for Activity<V, D> {
345 fn encode_size(&self) -> usize {
346 1 + match self {
347 Activity::Ack(ack) => ack.encode_size(),
348 Activity::Certified(certificate) => certificate.encode_size(),
349 Activity::Tip(index) => UInt(*index).encode_size(),
350 }
351 }
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use commonware_codec::{DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::{
            dkg::ops::{self, evaluate_all},
            primitives::{ops::sign_message, variant::MinSig},
        },
        Hasher, Sha256,
    };
    use rand::{rngs::StdRng, SeedableRng};

    /// Digest type used by every test in this module.
    type TestDigest = <Sha256 as Hasher>::Digest;

    #[test]
    fn test_ack_namespace() {
        // The ack namespace is the caller's namespace with the suffix appended.
        let namespace = b"test_namespace";
        let expected = [namespace, ACK_SUFFIX].concat();
        assert_eq!(ack_namespace(namespace), expected);
    }

    #[test]
    fn test_codec() {
        let namespace = b"test";
        let mut rng = StdRng::seed_from_u64(0);
        let (public, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, 4, 3);
        let polynomial = evaluate_all::<MinSig>(&public, 4);
        let item = Item {
            index: 100,
            digest: Sha256::hash(b"test_item"),
        };

        // Item round-trips through the codec.
        let decoded_item = Item::<TestDigest>::decode(item.encode()).unwrap();
        assert_eq!(item, decoded_item);

        // A freshly signed ack verifies only under the namespace it was signed with.
        let ack: Ack<MinSig, _> = Ack::sign(namespace, 1, &shares[0], item.clone());
        assert!(ack.verify(namespace, &polynomial));
        assert!(!ack.verify(b"wrong", &polynomial));

        // Ack round-trips through the codec.
        let decoded_ack = Ack::<MinSig, TestDigest>::decode(ack.encode()).unwrap();
        assert_eq!(ack, decoded_ack);

        // TipAck round-trips through the codec.
        let tip_ack = TipAck { ack, tip: 42 };
        let decoded_tip_ack = TipAck::<MinSig, TestDigest>::decode(tip_ack.encode()).unwrap();
        assert_eq!(tip_ack, decoded_tip_ack);

        // Activity::Ack round-trips through the codec.
        let activity_ack: Activity<MinSig, TestDigest> =
            Activity::Ack(Ack::sign(namespace, 1, &shares[0], item.clone()));
        let decoded_activity_ack =
            Activity::<MinSig, TestDigest>::decode(activity_ack.encode()).unwrap();
        assert_eq!(activity_ack, decoded_activity_ack);

        // Activity::Certified round-trips through the codec. The signature only needs to be
        // well-formed here; it is never verified.
        let signature = sign_message::<MinSig>(&shares[0].private, Some(b"test"), b"message");
        let activity_certified: Activity<MinSig, TestDigest> =
            Activity::Certified(Certificate { item, signature });
        let decoded_activity_certified =
            Activity::<MinSig, TestDigest>::decode(activity_certified.encode()).unwrap();
        assert_eq!(activity_certified, decoded_activity_certified);

        // Activity::Tip round-trips through the codec.
        let activity_tip: Activity<MinSig, TestDigest> = Activity::Tip(123);
        let decoded_activity_tip =
            Activity::<MinSig, TestDigest>::decode(activity_tip.encode()).unwrap();
        assert_eq!(activity_tip, decoded_activity_tip);
    }

    #[test]
    fn test_activity_invalid_enum() {
        // Tag 3 does not correspond to any Activity variant.
        let mut buf = BytesMut::new();
        3u8.write(&mut buf);

        let result = Activity::<MinSig, TestDigest>::decode(&buf[..]);
        assert!(matches!(
            result,
            Err(CodecError::Invalid(
                "consensus::aggregation::Activity",
                "Invalid type"
            ))
        ));
    }
}