1use crate::{
4 aggregation::scheme,
5 types::{Epoch, Height},
6 Heightable,
7};
8use bytes::{Buf, BufMut, Bytes};
9use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write};
10use commonware_cryptography::{
11 certificate::{self, Attestation, Scheme, Subject},
12 Digest,
13};
14use commonware_parallel::Strategy;
15use commonware_utils::{channel::oneshot, union, N3f1};
16use rand_core::CryptoRngCore;
17use std::hash::Hash;
18
19#[derive(Debug, thiserror::Error)]
21pub enum Error {
22 #[error("Application verify error: {0}")]
25 AppProposeCanceled(oneshot::error::RecvError),
26
27 #[error("Unable to send message")]
30 UnableToSendMessage,
31
32 #[error("Epoch {0} has no validator {1}")]
35 UnknownValidator(Epoch, String),
36 #[error("Not a signer at epoch {0}")]
38 NotSigner(Epoch),
39
40 #[error("Peer mismatch")]
43 PeerMismatch,
44
45 #[error("Invalid ack signature")]
48 InvalidAckSignature,
49
50 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
53 AckEpochOutsideBounds(Epoch, Epoch, Epoch),
54 #[error("Non-useful ack height {0}")]
56 AckHeight(Height),
57 #[error("Invalid ack digest {0}")]
59 AckDigest(Height),
60 #[error("Duplicate ack from sender {0} for height {1}")]
62 AckDuplicate(String, Height),
63 #[error("Ack for height {0} already has been certified")]
65 AckCertified(Height),
66 #[error("Unknown epoch {0}")]
68 UnknownEpoch(Epoch),
69}
70
71impl Error {
72 pub const fn blockable(&self) -> bool {
74 matches!(self, Self::PeerMismatch | Self::InvalidAckSignature)
75 }
76}
77
78const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
81
82#[inline]
87fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
88 union(namespace, ACK_SUFFIX)
89}
90
91#[derive(Clone, Debug)]
96pub struct Namespace(Vec<u8>);
97
98impl certificate::Namespace for Namespace {
99 fn derive(namespace: &[u8]) -> Self {
100 Self(ack_namespace(namespace))
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, Hash)]
107pub struct Item<D: Digest> {
108 pub height: Height,
110 pub digest: D,
112}
113
114impl<D: Digest> Heightable for Item<D> {
115 fn height(&self) -> Height {
116 self.height
117 }
118}
119
120impl<D: Digest> Write for Item<D> {
121 fn write(&self, writer: &mut impl BufMut) {
122 self.height.write(writer);
123 self.digest.write(writer);
124 }
125}
126
127impl<D: Digest> Read for Item<D> {
128 type Cfg = ();
129
130 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
131 let height = Height::read(reader)?;
132 let digest = D::read(reader)?;
133 Ok(Self { height, digest })
134 }
135}
136
137impl<D: Digest> EncodeSize for Item<D> {
138 fn encode_size(&self) -> usize {
139 self.height.encode_size() + self.digest.encode_size()
140 }
141}
142
143impl<D: Digest> Subject for &Item<D> {
144 type Namespace = Namespace;
145
146 fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
147 &derived.0
148 }
149
150 fn message(&self) -> Bytes {
151 self.encode()
152 }
153}
154
#[cfg(feature = "arbitrary")]
impl<D: Digest> arbitrary::Arbitrary<'_> for Item<D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary item, drawing the height first and the digest second.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            height: u.arbitrary::<Height>()?,
            digest: u.arbitrary::<D>()?,
        })
    }
}
166
167#[derive(Clone, Debug, PartialEq, Eq, Hash)]
170pub struct Ack<S: Scheme, D: Digest> {
171 pub item: Item<D>,
173 pub epoch: Epoch,
175 pub attestation: Attestation<S>,
177}
178
179impl<S: Scheme, D: Digest> Ack<S, D> {
180 pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
185 where
186 R: CryptoRngCore,
187 S: scheme::Scheme<D>,
188 {
189 scheme.verify_attestation::<_, D>(rng, &self.item, &self.attestation, strategy)
190 }
191
192 pub fn sign(scheme: &S, epoch: Epoch, item: Item<D>) -> Option<Self>
200 where
201 S: scheme::Scheme<D>,
202 {
203 let attestation = scheme.sign::<D>(&item)?;
204 Some(Self {
205 item,
206 epoch,
207 attestation,
208 })
209 }
210}
211
212impl<S: Scheme, D: Digest> Write for Ack<S, D> {
213 fn write(&self, writer: &mut impl BufMut) {
214 self.item.write(writer);
215 self.epoch.write(writer);
216 self.attestation.write(writer);
217 }
218}
219
220impl<S: Scheme, D: Digest> Read for Ack<S, D> {
221 type Cfg = ();
222
223 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
224 let item = Item::read(reader)?;
225 let epoch = Epoch::read(reader)?;
226 let attestation = Attestation::read(reader)?;
227 Ok(Self {
228 item,
229 epoch,
230 attestation,
231 })
232 }
233}
234
235impl<S: Scheme, D: Digest> EncodeSize for Ack<S, D> {
236 fn encode_size(&self) -> usize {
237 self.item.encode_size() + self.epoch.encode_size() + self.attestation.encode_size()
238 }
239}
240
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Ack<S, D>
where
    S::Signature: for<'a> arbitrary::Arbitrary<'a>,
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary ack, drawing the item, the epoch, and the attestation in that
    /// order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            epoch: u.arbitrary::<Epoch>()?,
            attestation: Attestation::arbitrary(u)?,
        })
    }
}
258
259#[derive(Clone, Debug, PartialEq, Eq, Hash)]
262pub struct TipAck<S: Scheme, D: Digest> {
263 pub tip: Height,
265
266 pub ack: Ack<S, D>,
268}
269
270impl<S: Scheme, D: Digest> Write for TipAck<S, D> {
271 fn write(&self, writer: &mut impl BufMut) {
272 self.tip.write(writer);
273 self.ack.write(writer);
274 }
275}
276
277impl<S: Scheme, D: Digest> Read for TipAck<S, D> {
278 type Cfg = ();
279
280 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
281 let tip = Height::read(reader)?;
282 let ack = Ack::read(reader)?;
283 Ok(Self { tip, ack })
284 }
285}
286
287impl<S: Scheme, D: Digest> EncodeSize for TipAck<S, D> {
288 fn encode_size(&self) -> usize {
289 self.tip.encode_size() + self.ack.encode_size()
290 }
291}
292
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for TipAck<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary tip/ack pair, drawing the tip height before the ack.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            tip: u.arbitrary::<Height>()?,
            ack: u.arbitrary::<Ack<S, D>>()?,
        })
    }
}
305
306#[derive(Clone, Debug, PartialEq, Eq, Hash)]
308pub struct Certificate<S: Scheme, D: Digest> {
309 pub item: Item<D>,
311 pub certificate: S::Certificate,
313}
314
315impl<S: Scheme, D: Digest> Certificate<S, D> {
316 pub fn from_acks<'a, I>(scheme: &S, acks: I, strategy: &impl Strategy) -> Option<Self>
317 where
318 S: scheme::Scheme<D>,
319 I: IntoIterator<Item = &'a Ack<S, D>>,
320 I::IntoIter: Send,
321 {
322 let mut iter = acks.into_iter().peekable();
323 let item = iter.peek()?.item.clone();
324 let attestations = iter
325 .filter(|ack| ack.item == item)
326 .map(|ack| ack.attestation.clone());
327 let certificate = scheme.assemble::<_, N3f1>(attestations.into_iter(), strategy)?;
328
329 Some(Self { item, certificate })
330 }
331
332 pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
334 where
335 R: CryptoRngCore,
336 S: scheme::Scheme<D>,
337 {
338 scheme.verify_certificate::<_, D, N3f1>(rng, &self.item, &self.certificate, strategy)
339 }
340}
341
342impl<S: Scheme, D: Digest> Write for Certificate<S, D> {
343 fn write(&self, writer: &mut impl BufMut) {
344 self.item.write(writer);
345 self.certificate.write(writer);
346 }
347}
348
349impl<S: Scheme, D: Digest> Read for Certificate<S, D> {
350 type Cfg = <S::Certificate as Read>::Cfg;
351
352 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
353 let item = Item::read(reader)?;
354 let certificate = S::Certificate::read_cfg(reader, cfg)?;
355 Ok(Self { item, certificate })
356 }
357}
358
359impl<S: Scheme, D: Digest> EncodeSize for Certificate<S, D> {
360 fn encode_size(&self) -> usize {
361 self.item.encode_size() + self.certificate.encode_size()
362 }
363}
364
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Certificate<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    S::Certificate: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary certificate, drawing the item before the scheme certificate.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            certificate: u.arbitrary::<S::Certificate>()?,
        })
    }
}
377
378#[derive(Clone, Debug, PartialEq)]
382pub enum Activity<S: Scheme, D: Digest> {
383 Ack(Ack<S, D>),
385
386 Certified(Certificate<S, D>),
388
389 Tip(Height),
391}
392
393impl<S: Scheme, D: Digest> Write for Activity<S, D> {
394 fn write(&self, writer: &mut impl BufMut) {
395 match self {
396 Self::Ack(ack) => {
397 0u8.write(writer);
398 ack.write(writer);
399 }
400 Self::Certified(certificate) => {
401 1u8.write(writer);
402 certificate.write(writer);
403 }
404 Self::Tip(height) => {
405 2u8.write(writer);
406 height.write(writer);
407 }
408 }
409 }
410}
411
412impl<S: Scheme, D: Digest> Read for Activity<S, D> {
413 type Cfg = <S::Certificate as Read>::Cfg;
414
415 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
416 match u8::read(reader)? {
417 0 => Ok(Self::Ack(Ack::read(reader)?)),
418 1 => Ok(Self::Certified(Certificate::read_cfg(reader, cfg)?)),
419 2 => Ok(Self::Tip(Height::read(reader)?)),
420 _ => Err(CodecError::Invalid(
421 "consensus::aggregation::Activity",
422 "Invalid type",
423 )),
424 }
425 }
426}
427
428impl<S: Scheme, D: Digest> EncodeSize for Activity<S, D> {
429 fn encode_size(&self) -> usize {
430 1 + match self {
431 Self::Ack(ack) => ack.encode_size(),
432 Self::Certified(certificate) => certificate.encode_size(),
433 Self::Tip(height) => height.encode_size(),
434 }
435 }
436}
437
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Activity<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
    Certificate<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary activity: picks a variant index in `0..=2`, then draws that
    /// variant's payload.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(match u.int_in_range(0..=2)? {
            0 => Self::Ack(u.arbitrary::<Ack<S, D>>()?),
            1 => Self::Certified(u.arbitrary::<Certificate<S, D>>()?),
            2 => Self::Tip(u.arbitrary::<Height>()?),
            // `int_in_range` was asked for a value within `0..=2`.
            _ => unreachable!(),
        })
    }
}
455
#[cfg(test)]
mod tests {
    use super::*;
    use crate::aggregation::scheme::{
        bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme,
    };
    use bytes::BytesMut;
    use commonware_codec::{Decode, DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::primitives::variant::{MinPk, MinSig},
        certificate::mocks::Fixture,
        Hasher, Sha256,
    };
    use commonware_parallel::Sequential;
    use commonware_utils::{ordered::Quorum, test_rng, N3f1};
    use rand::rngs::StdRng;

    /// Base namespace handed to every fixture.
    const NAMESPACE: &[u8] = b"test";

    type Sha256Digest = <Sha256 as Hasher>::Digest;

    /// The ack namespace is the base namespace followed by `ACK_SUFFIX`.
    #[test]
    fn test_ack_namespace() {
        let base = b"test_namespace";
        let mut expected = base.to_vec();
        expected.extend_from_slice(ACK_SUFFIX);
        assert_eq!(ack_namespace(base), expected);
    }

    /// Round-trips every wire type through encode/decode for the scheme built by `fixture`.
    fn codec<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        let mut rng = test_rng();
        let fixture = fixture(&mut rng, NAMESPACE, 4);
        let schemes = &fixture.schemes;
        let signer = &schemes[0];
        let cfg = signer.certificate_codec_config();
        let item = Item {
            height: Height::new(100),
            digest: Sha256::hash(b"test_item"),
        };

        // Item
        let decoded_item = Item::decode(item.encode()).unwrap();
        assert_eq!(item, decoded_item);

        // Ack
        let ack = Ack::sign(signer, Epoch::new(1), item.clone()).unwrap();
        let decoded_ack: Ack<S, Sha256Digest> = Ack::decode(ack.encode()).unwrap();
        assert_eq!(decoded_ack.item, item);
        assert_eq!(decoded_ack.epoch, Epoch::new(1));
        assert!(decoded_ack.verify(&mut rng, signer, &Sequential));

        // TipAck
        let tip_ack = TipAck {
            tip: Height::new(42),
            ack: ack.clone(),
        };
        let decoded_tip_ack: TipAck<S, Sha256Digest> = TipAck::decode(tip_ack.encode()).unwrap();
        assert_eq!(decoded_tip_ack.tip, Height::new(42));
        assert_eq!(decoded_tip_ack.ack.item, item);
        assert_eq!(decoded_tip_ack.ack.epoch, Epoch::new(1));

        // Activity::Ack
        let encoded_ack_activity = Activity::Ack(ack).encode();
        let decoded_ack_activity: Activity<S, Sha256Digest> =
            Activity::decode_cfg(encoded_ack_activity, &cfg).unwrap();
        match decoded_ack_activity {
            Activity::Ack(restored) => {
                assert_eq!(restored.item, item);
                assert_eq!(restored.epoch, Epoch::new(1));
            }
            _ => panic!("Expected Activity::Ack"),
        }

        // Certificate assembled from a quorum of acks over the same item
        let quorum = signer.participants().quorum::<N3f1>() as usize;
        let acks: Vec<_> = schemes
            .iter()
            .take(quorum)
            .filter_map(|scheme| Ack::sign(scheme, Epoch::new(1), item.clone()))
            .collect();
        let certificate = Certificate::from_acks(signer, &acks, &Sequential).unwrap();
        assert!(certificate.verify(&mut rng, signer, &Sequential));

        // Activity::Certified
        let encoded_certified = Activity::Certified(certificate.clone()).encode();
        let decoded_certified: Activity<S, Sha256Digest> =
            Activity::decode_cfg(encoded_certified, &cfg).unwrap();
        match decoded_certified {
            Activity::Certified(restored) => {
                assert_eq!(restored.item, item);
                assert!(restored.verify(&mut rng, signer, &Sequential));
            }
            _ => panic!("Expected Activity::Certified"),
        }

        // Activity::Tip
        let tip_activity: Activity<S, Sha256Digest> = Activity::Tip(Height::new(123));
        let decoded_tip: Activity<S, Sha256Digest> =
            Activity::decode_cfg(tip_activity.encode(), &cfg).unwrap();
        match decoded_tip {
            Activity::Tip(height) => assert_eq!(height, Height::new(123)),
            _ => panic!("Expected Activity::Tip"),
        }
    }

    /// Runs the codec round-trip against every scheme fixture.
    #[test]
    fn test_codec() {
        codec(ed25519::fixture);
        codec(secp256r1::fixture);
        codec(bls12381_multisig::fixture::<MinPk, _>);
        codec(bls12381_multisig::fixture::<MinSig, _>);
        codec(bls12381_threshold::fixture::<MinPk, _>);
        codec(bls12381_threshold::fixture::<MinSig, _>);
    }

    /// Decoding an `Activity` whose tag byte is not a known variant must fail.
    fn activity_invalid_enum<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        let fixture = fixture(&mut test_rng(), NAMESPACE, 4);
        let cfg = fixture.schemes[0].certificate_codec_config();

        // `3` is not one of the tags accepted by the decoder.
        let mut buf = BytesMut::new();
        3u8.write(&mut buf);

        let result = Activity::<S, Sha256Digest>::read_cfg(&mut &buf[..], &cfg);
        assert!(matches!(
            result,
            Err(CodecError::Invalid(
                "consensus::aggregation::Activity",
                "Invalid type"
            ))
        ));
    }

    /// Runs the invalid-tag check against every scheme fixture.
    #[test]
    fn test_activity_invalid_enum() {
        activity_invalid_enum(ed25519::fixture);
        activity_invalid_enum(secp256r1::fixture);
        activity_invalid_enum(bls12381_multisig::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_multisig::fixture::<MinSig, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinSig, _>);
    }

    /// Codec conformance tests for the wire types, instantiated with one concrete scheme.
    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use crate::aggregation::scheme::bls12381_threshold;
        use commonware_codec::conformance::CodecConformance;
        use commonware_cryptography::{ed25519::PublicKey, sha256::Digest as Sha256Digest};

        type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;

        commonware_conformance::conformance_tests! {
            CodecConformance<Item<Sha256Digest>>,
            CodecConformance<Ack<Scheme, Sha256Digest>>,
            CodecConformance<TipAck<Scheme, Sha256Digest>>,
            CodecConformance<Certificate<Scheme, Sha256Digest>>,
            CodecConformance<Activity<Scheme, Sha256Digest>>,
        }
    }
}