1use crate::{aggregation::scheme, types::Epoch};
4use bytes::{Buf, BufMut, Bytes};
5use commonware_codec::{
6 varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
7};
8use commonware_cryptography::{
9 certificate::{Attestation, Scheme, Subject},
10 Digest,
11};
12use commonware_utils::union;
13use futures::channel::oneshot;
14use rand::{CryptoRng, Rng};
15use std::hash::Hash;
16
/// Errors reported by this module.
///
/// The display text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Wraps the `oneshot::Canceled` produced when a oneshot channel is canceled.
    ///
    /// NOTE(review): the variant name refers to "propose" while the message
    /// text says "verify" — confirm which wording is intended.
    #[error("Application verify error: {0}")]
    AppProposeCanceled(oneshot::Canceled),

    /// A message could not be sent.
    #[error("Unable to send message")]
    UnableToSendMessage,

    /// The epoch (first field) has no validator matching the second field.
    #[error("Epoch {0} has no validator {1}")]
    UnknownValidator(Epoch, String),
    /// The local party is not a signer at the given epoch.
    #[error("Not a signer at epoch {0}")]
    NotSigner(Epoch),

    /// Peer mismatch. Treated as blockable by [`Error::blockable`].
    #[error("Peer mismatch")]
    PeerMismatch,

    /// An ack carried an invalid signature. Treated as blockable by
    /// [`Error::blockable`].
    #[error("Invalid ack signature")]
    InvalidAckSignature,

    /// An ack's epoch (first field) fell outside the bounds given by the
    /// second and third fields.
    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
    AckEpochOutsideBounds(Epoch, Epoch, Epoch),
    /// An ack referenced an index that is not useful.
    #[error("Non-useful ack index {0}")]
    AckIndex(Index),
    /// An ack carried an invalid digest for the given index.
    #[error("Invalid ack digest {0}")]
    AckDigest(Index),
    /// A sender (first field) submitted a duplicate ack for an index (second field).
    #[error("Duplicate ack from sender {0} for index {1}")]
    AckDuplicate(String, Index),
    /// An ack arrived for an index that has already been certified.
    #[error("Ack for index {0} already has been certified")]
    AckCertified(Index),
    /// The given epoch is unknown.
    #[error("Unknown epoch {0}")]
    UnknownEpoch(Epoch),
}
68
69impl Error {
70 pub const fn blockable(&self) -> bool {
72 matches!(self, Self::PeerMismatch | Self::InvalidAckSignature)
73 }
74}
75
/// Index type used for items and tips in this module (an alias for `u64`).
pub type Index = u64;
79
/// Suffix that [`ack_namespace`] combines with a caller-supplied namespace.
const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
83
/// Builds the namespace used for acks by combining `namespace` with
/// [`ACK_SUFFIX`] through `union`.
///
/// The unit test in this file expects the result to equal `namespace`
/// followed by `ACK_SUFFIX`.
#[inline]
fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
    union(namespace, ACK_SUFFIX)
}
92
/// An index paired with a digest.
///
/// A reference to an `Item` is the subject that is signed and verified for
/// acks and certificates in this module.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Item<D: Digest> {
    /// Index of this item.
    pub index: Index,
    /// Digest associated with this item.
    pub digest: D,
}
102
103impl<D: Digest> Write for Item<D> {
104 fn write(&self, writer: &mut impl BufMut) {
105 UInt(self.index).write(writer);
106 self.digest.write(writer);
107 }
108}
109
110impl<D: Digest> Read for Item<D> {
111 type Cfg = ();
112
113 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
114 let index = UInt::read(reader)?.into();
115 let digest = D::read(reader)?;
116 Ok(Self { index, digest })
117 }
118}
119
120impl<D: Digest> EncodeSize for Item<D> {
121 fn encode_size(&self) -> usize {
122 UInt(self.index).encode_size() + self.digest.encode_size()
123 }
124}
125
126impl<D: Digest> Subject for &Item<D> {
127 fn namespace_and_message(&self, namespace: &[u8]) -> (Bytes, Bytes) {
128 (ack_namespace(namespace).into(), self.encode().freeze())
129 }
130}
131
#[cfg(feature = "arbitrary")]
impl<D: Digest> arbitrary::Arbitrary<'_> for Item<D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Draws an index and then a digest from the unstructured input.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Struct fields are evaluated in source order: index first, digest second.
        Ok(Self {
            index: u.arbitrary::<u64>()?,
            digest: u.arbitrary::<D>()?,
        })
    }
}
143
/// An attestation over an [`Item`], tagged with an epoch.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Ack<S: Scheme, D: Digest> {
    /// The item being acknowledged.
    pub item: Item<D>,
    /// The epoch associated with this ack.
    pub epoch: Epoch,
    /// The scheme attestation over `item` (see `Ack::sign`).
    pub attestation: Attestation<S>,
}
155
156impl<S: Scheme, D: Digest> Ack<S, D> {
157 pub fn verify(&self, scheme: &S, namespace: &[u8]) -> bool
162 where
163 S: scheme::Scheme<D>,
164 {
165 scheme.verify_attestation::<D>(namespace, &self.item, &self.attestation)
166 }
167
168 pub fn sign(scheme: &S, namespace: &[u8], epoch: Epoch, item: Item<D>) -> Option<Self>
176 where
177 S: scheme::Scheme<D>,
178 {
179 let attestation = scheme.sign::<D>(namespace, &item)?;
180 Some(Self {
181 item,
182 epoch,
183 attestation,
184 })
185 }
186}
187
188impl<S: Scheme, D: Digest> Write for Ack<S, D> {
189 fn write(&self, writer: &mut impl BufMut) {
190 self.item.write(writer);
191 self.epoch.write(writer);
192 self.attestation.write(writer);
193 }
194}
195
196impl<S: Scheme, D: Digest> Read for Ack<S, D> {
197 type Cfg = ();
198
199 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
200 let item = Item::read(reader)?;
201 let epoch = Epoch::read(reader)?;
202 let attestation = Attestation::read(reader)?;
203 Ok(Self {
204 item,
205 epoch,
206 attestation,
207 })
208 }
209}
210
211impl<S: Scheme, D: Digest> EncodeSize for Ack<S, D> {
212 fn encode_size(&self) -> usize {
213 self.item.encode_size() + self.epoch.encode_size() + self.attestation.encode_size()
214 }
215}
216
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Ack<S, D>
where
    S::Signature: for<'a> arbitrary::Arbitrary<'a>,
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Draws an item, an epoch, and an attestation (in that order) from the
    /// unstructured input.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            epoch: u.arbitrary::<Epoch>()?,
            attestation: Attestation::arbitrary(u)?,
        })
    }
}
234
/// An [`Ack`] bundled with a tip index.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TipAck<S: Scheme, D: Digest> {
    /// Tip index carried alongside the ack.
    // NOTE(review): presumably the sender's view of the tip — confirm against callers.
    pub tip: Index,

    /// The ack itself.
    pub ack: Ack<S, D>,
}
245
246impl<S: Scheme, D: Digest> Write for TipAck<S, D> {
247 fn write(&self, writer: &mut impl BufMut) {
248 UInt(self.tip).write(writer);
249 self.ack.write(writer);
250 }
251}
252
253impl<S: Scheme, D: Digest> Read for TipAck<S, D> {
254 type Cfg = ();
255
256 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
257 let tip = UInt::read(reader)?.into();
258 let ack = Ack::read(reader)?;
259 Ok(Self { tip, ack })
260 }
261}
262
263impl<S: Scheme, D: Digest> EncodeSize for TipAck<S, D> {
264 fn encode_size(&self) -> usize {
265 UInt(self.tip).encode_size() + self.ack.encode_size()
266 }
267}
268
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for TipAck<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Draws a tip and then an ack from the unstructured input.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            tip: u.arbitrary::<u64>()?,
            ack: u.arbitrary::<Ack<S, D>>()?,
        })
    }
}
281
/// An [`Item`] together with a scheme certificate over it.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Certificate<S: Scheme, D: Digest> {
    /// The certified item.
    pub item: Item<D>,
    /// The scheme-specific certificate (see `Certificate::from_acks`).
    pub certificate: S::Certificate,
}
290
291impl<S: Scheme, D: Digest> Certificate<S, D> {
292 pub fn from_acks<'a>(scheme: &S, acks: impl IntoIterator<Item = &'a Ack<S, D>>) -> Option<Self>
293 where
294 S: scheme::Scheme<D>,
295 {
296 let mut iter = acks.into_iter().peekable();
297 let item = iter.peek()?.item.clone();
298 let attestations = iter
299 .filter(|ack| ack.item == item)
300 .map(|ack| ack.attestation.clone());
301 let certificate = scheme.assemble(attestations)?;
302
303 Some(Self { item, certificate })
304 }
305
306 pub fn verify<R>(&self, rng: &mut R, scheme: &S, namespace: &[u8]) -> bool
308 where
309 R: Rng + CryptoRng,
310 S: scheme::Scheme<D>,
311 {
312 scheme.verify_certificate::<_, D>(rng, namespace, &self.item, &self.certificate)
313 }
314}
315
316impl<S: Scheme, D: Digest> Write for Certificate<S, D> {
317 fn write(&self, writer: &mut impl BufMut) {
318 self.item.write(writer);
319 self.certificate.write(writer);
320 }
321}
322
323impl<S: Scheme, D: Digest> Read for Certificate<S, D> {
324 type Cfg = <S::Certificate as Read>::Cfg;
325
326 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
327 let item = Item::read(reader)?;
328 let certificate = S::Certificate::read_cfg(reader, cfg)?;
329 Ok(Self { item, certificate })
330 }
331}
332
333impl<S: Scheme, D: Digest> EncodeSize for Certificate<S, D> {
334 fn encode_size(&self) -> usize {
335 self.item.encode_size() + self.certificate.encode_size()
336 }
337}
338
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Certificate<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    S::Certificate: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Draws an item and then a scheme certificate from the unstructured input.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            certificate: u.arbitrary::<S::Certificate>()?,
        })
    }
}
351
/// Events expressed in terms of this module's types.
///
/// On the wire each variant is prefixed with a one-byte tag: `0` for `Ack`,
/// `1` for `Certified`, `2` for `Tip`.
#[derive(Clone, Debug, PartialEq)]
pub enum Activity<S: Scheme, D: Digest> {
    /// An ack for an item.
    Ack(Ack<S, D>),

    /// A certificate for an item.
    Certified(Certificate<S, D>),

    /// A tip index.
    Tip(Index),
}
366
367impl<S: Scheme, D: Digest> Write for Activity<S, D> {
368 fn write(&self, writer: &mut impl BufMut) {
369 match self {
370 Self::Ack(ack) => {
371 0u8.write(writer);
372 ack.write(writer);
373 }
374 Self::Certified(certificate) => {
375 1u8.write(writer);
376 certificate.write(writer);
377 }
378 Self::Tip(index) => {
379 2u8.write(writer);
380 UInt(*index).write(writer);
381 }
382 }
383 }
384}
385
386impl<S: Scheme, D: Digest> Read for Activity<S, D> {
387 type Cfg = <S::Certificate as Read>::Cfg;
388
389 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
390 match u8::read(reader)? {
391 0 => Ok(Self::Ack(Ack::read(reader)?)),
392 1 => Ok(Self::Certified(Certificate::read_cfg(reader, cfg)?)),
393 2 => Ok(Self::Tip(UInt::read(reader)?.into())),
394 _ => Err(CodecError::Invalid(
395 "consensus::aggregation::Activity",
396 "Invalid type",
397 )),
398 }
399 }
400}
401
402impl<S: Scheme, D: Digest> EncodeSize for Activity<S, D> {
403 fn encode_size(&self) -> usize {
404 1 + match self {
405 Self::Ack(ack) => ack.encode_size(),
406 Self::Certified(certificate) => certificate.encode_size(),
407 Self::Tip(index) => UInt(*index).encode_size(),
408 }
409 }
410}
411
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Activity<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
    Certificate<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Picks one of the three variants via `int_in_range(0..=2)` and then
    /// draws that variant's payload from the unstructured input.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let activity = match u.int_in_range(0..=2)? {
            0 => Self::Ack(u.arbitrary::<Ack<S, D>>()?),
            1 => Self::Certified(u.arbitrary::<Certificate<S, D>>()?),
            2 => Self::Tip(u.arbitrary::<u64>()?),
            // `int_in_range(0..=2)` cannot produce any other value.
            _ => unreachable!(),
        };
        Ok(activity)
    }
}
429
// Unit tests for the codec round-trips and namespace helper in this module.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::aggregation::scheme::{bls12381_multisig, bls12381_threshold, ed25519, Scheme};
    use bytes::BytesMut;
    use commonware_codec::{Decode, DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::primitives::variant::{MinPk, MinSig},
        certificate::mocks::Fixture,
        Hasher, Sha256,
    };
    use commonware_utils::ordered::Quorum;
    use rand::{rngs::StdRng, SeedableRng};

    /// Namespace used for every signature in these tests.
    const NAMESPACE: &[u8] = b"test";

    type Sha256Digest = <Sha256 as Hasher>::Digest;

    /// Builds a fixture with `n` participants using an RNG seeded with 0,
    /// so every run sees the same fixture.
    fn setup<S, F>(n: u32, fixture: F) -> Fixture<S>
    where
        F: FnOnce(&mut StdRng, u32) -> Fixture<S>,
    {
        let mut rng = StdRng::seed_from_u64(0);
        fixture(&mut rng, n)
    }

    /// `ack_namespace` must equal the namespace followed by `ACK_SUFFIX`.
    #[test]
    fn test_ack_namespace() {
        let namespace = b"test_namespace";
        let expected = [namespace, ACK_SUFFIX].concat();
        assert_eq!(ack_namespace(namespace), expected);
    }

    /// Round-trips `Item`, `Ack`, `TipAck`, and every `Activity` variant
    /// through encode/decode for the scheme produced by `fixture`.
    fn codec<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, u32) -> Fixture<S>,
    {
        let fixture = setup(4, fixture);
        let schemes = &fixture.schemes;
        let item = Item {
            index: 100,
            digest: Sha256::hash(b"test_item"),
        };

        // Item round-trip.
        let restored_item = Item::decode(item.encode()).unwrap();
        assert_eq!(item, restored_item);

        // Ack round-trip; the decoded ack must still verify.
        let ack = Ack::sign(&schemes[0], NAMESPACE, Epoch::new(1), item.clone()).unwrap();
        let cfg = schemes[0].certificate_codec_config();
        let encoded_ack = ack.encode();
        let restored_ack: Ack<S, Sha256Digest> = Ack::decode(encoded_ack).unwrap();

        assert_eq!(restored_ack.item, item);
        assert_eq!(restored_ack.epoch, Epoch::new(1));
        assert!(restored_ack.verify(&schemes[0], NAMESPACE));

        // TipAck round-trip.
        let tip_ack = TipAck {
            ack: ack.clone(),
            tip: 42,
        };
        let encoded_tip_ack = tip_ack.encode();
        let restored_tip_ack: TipAck<S, Sha256Digest> = TipAck::decode(encoded_tip_ack).unwrap();
        assert_eq!(restored_tip_ack.tip, 42);
        assert_eq!(restored_tip_ack.ack.item, item);
        assert_eq!(restored_tip_ack.ack.epoch, Epoch::new(1));

        // Activity::Ack round-trip.
        let activity_ack = Activity::Ack(ack);
        let encoded_activity = activity_ack.encode();
        let restored_activity_ack: Activity<S, Sha256Digest> =
            Activity::decode_cfg(encoded_activity, &cfg).unwrap();
        if let Activity::Ack(restored) = restored_activity_ack {
            assert_eq!(restored.item, item);
            assert_eq!(restored.epoch, Epoch::new(1));
        } else {
            panic!("Expected Activity::Ack");
        }

        // Collect acks from the first `quorum` schemes and assemble a certificate.
        let acks: Vec<_> = schemes
            .iter()
            .take(schemes[0].participants().quorum() as usize)
            .filter_map(|scheme| Ack::sign(scheme, NAMESPACE, Epoch::new(1), item.clone()))
            .collect();

        let certificate = Certificate::from_acks(&schemes[0], &acks).unwrap();
        let mut rng = StdRng::seed_from_u64(0);
        assert!(certificate.verify(&mut rng, &schemes[0], NAMESPACE));

        // Activity::Certified round-trip; the decoded certificate must still verify.
        let activity_certified = Activity::Certified(certificate.clone());
        let encoded_certified = activity_certified.encode();
        let restored_activity_certified: Activity<S, Sha256Digest> =
            Activity::decode_cfg(encoded_certified, &cfg).unwrap();
        if let Activity::Certified(restored) = restored_activity_certified {
            assert_eq!(restored.item, item);
            assert!(restored.verify(&mut rng, &schemes[0], NAMESPACE));
        } else {
            panic!("Expected Activity::Certified");
        }

        // Activity::Tip round-trip.
        let activity_tip: Activity<S, Sha256Digest> = Activity::Tip(123);
        let encoded_tip = activity_tip.encode();
        let restored_activity_tip: Activity<S, Sha256Digest> =
            Activity::decode_cfg(encoded_tip, &cfg).unwrap();
        if let Activity::Tip(index) = restored_activity_tip {
            assert_eq!(index, 123);
        } else {
            panic!("Expected Activity::Tip");
        }
    }

    /// Runs the codec round-trip against every scheme fixture.
    #[test]
    fn test_codec() {
        codec(ed25519::fixture);
        codec(bls12381_multisig::fixture::<MinPk, _>);
        codec(bls12381_multisig::fixture::<MinSig, _>);
        codec(bls12381_threshold::fixture::<MinPk, _>);
        codec(bls12381_threshold::fixture::<MinSig, _>);
    }

    /// Decoding an `Activity` whose tag byte is not 0, 1, or 2 must fail
    /// with `CodecError::Invalid`.
    fn activity_invalid_enum<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, u32) -> Fixture<S>,
    {
        let fixture = setup(4, fixture);
        let mut buf = BytesMut::new();
        // 3 is the first tag value with no matching variant.
        3u8.write(&mut buf);
        let cfg = fixture.schemes[0].certificate_codec_config();
        let result = Activity::<S, Sha256Digest>::read_cfg(&mut &buf[..], &cfg);
        assert!(matches!(
            result,
            Err(CodecError::Invalid(
                "consensus::aggregation::Activity",
                "Invalid type"
            ))
        ));
    }

    /// Runs the invalid-tag check against every scheme fixture.
    #[test]
    fn test_activity_invalid_enum() {
        activity_invalid_enum(ed25519::fixture);
        activity_invalid_enum(bls12381_multisig::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_multisig::fixture::<MinSig, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinSig, _>);
    }

    // Codec conformance tests; only built when the `arbitrary` feature is enabled.
    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use crate::aggregation::scheme::bls12381_threshold;
        use commonware_codec::conformance::CodecConformance;
        use commonware_cryptography::{ed25519::PublicKey, sha256::Digest as Sha256Digest};

        type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;

        commonware_conformance::conformance_tests! {
            CodecConformance<Item<Sha256Digest>>,
            CodecConformance<Ack<Scheme, Sha256Digest>>,
            CodecConformance<TipAck<Scheme, Sha256Digest>>,
            CodecConformance<Certificate<Scheme, Sha256Digest>>,
            CodecConformance<Activity<Scheme, Sha256Digest>>,
        }
    }
}