1use crate::{
4 aggregation::scheme,
5 types::{Epoch, Height},
6 Heightable,
7};
8use bytes::{Buf, BufMut, Bytes};
9use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write};
10use commonware_cryptography::{
11 certificate::{self, Attestation, Scheme, Subject},
12 Digest,
13};
14use commonware_parallel::Strategy;
15use commonware_utils::{union, N3f1};
16use futures::channel::oneshot;
17use rand_core::CryptoRngCore;
18use std::hash::Hash;
19
20#[derive(Debug, thiserror::Error)]
22pub enum Error {
23 #[error("Application verify error: {0}")]
26 AppProposeCanceled(oneshot::Canceled),
27
28 #[error("Unable to send message")]
31 UnableToSendMessage,
32
33 #[error("Epoch {0} has no validator {1}")]
36 UnknownValidator(Epoch, String),
37 #[error("Not a signer at epoch {0}")]
39 NotSigner(Epoch),
40
41 #[error("Peer mismatch")]
44 PeerMismatch,
45
46 #[error("Invalid ack signature")]
49 InvalidAckSignature,
50
51 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
54 AckEpochOutsideBounds(Epoch, Epoch, Epoch),
55 #[error("Non-useful ack height {0}")]
57 AckHeight(Height),
58 #[error("Invalid ack digest {0}")]
60 AckDigest(Height),
61 #[error("Duplicate ack from sender {0} for height {1}")]
63 AckDuplicate(String, Height),
64 #[error("Ack for height {0} already has been certified")]
66 AckCertified(Height),
67 #[error("Unknown epoch {0}")]
69 UnknownEpoch(Epoch),
70}
71
72impl Error {
73 pub const fn blockable(&self) -> bool {
75 matches!(self, Self::PeerMismatch | Self::InvalidAckSignature)
76 }
77}
78
/// Suffix joined onto a base namespace to form the ack namespace.
const ACK_SUFFIX: &[u8] = b"_AGG_ACK";

/// Builds the ack namespace by passing `namespace` and [`ACK_SUFFIX`] to `union`.
///
/// The unit test in this file expects the result to equal `namespace` followed
/// by the suffix bytes.
#[inline]
fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
    union(namespace, ACK_SUFFIX)
}
91
/// Newtype holding the derived namespace bytes used for [`Item`] subjects.
///
/// The inner bytes are private; values are produced through the
/// `certificate::Namespace::derive` implementation for this type.
#[derive(Debug, Clone)]
pub struct Namespace(Vec<u8>);
98
99impl certificate::Namespace for Namespace {
100 fn derive(namespace: &[u8]) -> Self {
101 Self(ack_namespace(namespace))
102 }
103}
104
105#[derive(Clone, Debug, PartialEq, Eq, Hash)]
108pub struct Item<D: Digest> {
109 pub height: Height,
111 pub digest: D,
113}
114
115impl<D: Digest> Heightable for Item<D> {
116 fn height(&self) -> Height {
117 self.height
118 }
119}
120
121impl<D: Digest> Write for Item<D> {
122 fn write(&self, writer: &mut impl BufMut) {
123 self.height.write(writer);
124 self.digest.write(writer);
125 }
126}
127
128impl<D: Digest> Read for Item<D> {
129 type Cfg = ();
130
131 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
132 let height = Height::read(reader)?;
133 let digest = D::read(reader)?;
134 Ok(Self { height, digest })
135 }
136}
137
138impl<D: Digest> EncodeSize for Item<D> {
139 fn encode_size(&self) -> usize {
140 self.height.encode_size() + self.digest.encode_size()
141 }
142}
143
144impl<D: Digest> Subject for &Item<D> {
145 type Namespace = Namespace;
146
147 fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
148 &derived.0
149 }
150
151 fn message(&self) -> Bytes {
152 self.encode()
153 }
154}
155
#[cfg(feature = "arbitrary")]
impl<D> arbitrary::Arbitrary<'_> for Item<D>
where
    D: Digest + for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an item by drawing a height and then a digest from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Fields are evaluated in written order: height first, digest second.
        Ok(Self {
            height: u.arbitrary::<Height>()?,
            digest: u.arbitrary::<D>()?,
        })
    }
}
167
168#[derive(Clone, Debug, PartialEq, Eq, Hash)]
171pub struct Ack<S: Scheme, D: Digest> {
172 pub item: Item<D>,
174 pub epoch: Epoch,
176 pub attestation: Attestation<S>,
178}
179
180impl<S: Scheme, D: Digest> Ack<S, D> {
181 pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
186 where
187 R: CryptoRngCore,
188 S: scheme::Scheme<D>,
189 {
190 scheme.verify_attestation::<_, D>(rng, &self.item, &self.attestation, strategy)
191 }
192
193 pub fn sign(scheme: &S, epoch: Epoch, item: Item<D>) -> Option<Self>
201 where
202 S: scheme::Scheme<D>,
203 {
204 let attestation = scheme.sign::<D>(&item)?;
205 Some(Self {
206 item,
207 epoch,
208 attestation,
209 })
210 }
211}
212
213impl<S: Scheme, D: Digest> Write for Ack<S, D> {
214 fn write(&self, writer: &mut impl BufMut) {
215 self.item.write(writer);
216 self.epoch.write(writer);
217 self.attestation.write(writer);
218 }
219}
220
221impl<S: Scheme, D: Digest> Read for Ack<S, D> {
222 type Cfg = ();
223
224 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
225 let item = Item::read(reader)?;
226 let epoch = Epoch::read(reader)?;
227 let attestation = Attestation::read(reader)?;
228 Ok(Self {
229 item,
230 epoch,
231 attestation,
232 })
233 }
234}
235
236impl<S: Scheme, D: Digest> EncodeSize for Ack<S, D> {
237 fn encode_size(&self) -> usize {
238 self.item.encode_size() + self.epoch.encode_size() + self.attestation.encode_size()
239 }
240}
241
#[cfg(feature = "arbitrary")]
impl<S, D> arbitrary::Arbitrary<'_> for Ack<S, D>
where
    S: Scheme,
    S::Signature: for<'a> arbitrary::Arbitrary<'a>,
    D: Digest + for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an ack by drawing an item, an epoch and an attestation from `u`, in that order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            epoch: u.arbitrary::<Epoch>()?,
            attestation: Attestation::arbitrary(u)?,
        })
    }
}
259
260#[derive(Clone, Debug, PartialEq, Eq, Hash)]
263pub struct TipAck<S: Scheme, D: Digest> {
264 pub tip: Height,
266
267 pub ack: Ack<S, D>,
269}
270
271impl<S: Scheme, D: Digest> Write for TipAck<S, D> {
272 fn write(&self, writer: &mut impl BufMut) {
273 self.tip.write(writer);
274 self.ack.write(writer);
275 }
276}
277
278impl<S: Scheme, D: Digest> Read for TipAck<S, D> {
279 type Cfg = ();
280
281 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
282 let tip = Height::read(reader)?;
283 let ack = Ack::read(reader)?;
284 Ok(Self { tip, ack })
285 }
286}
287
288impl<S: Scheme, D: Digest> EncodeSize for TipAck<S, D> {
289 fn encode_size(&self) -> usize {
290 self.tip.encode_size() + self.ack.encode_size()
291 }
292}
293
#[cfg(feature = "arbitrary")]
impl<S, D> arbitrary::Arbitrary<'_> for TipAck<S, D>
where
    S: Scheme,
    D: Digest + for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a tip ack by drawing a height and then an ack from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            tip: u.arbitrary::<Height>()?,
            ack: u.arbitrary::<Ack<S, D>>()?,
        })
    }
}
306
307#[derive(Clone, Debug, PartialEq, Eq, Hash)]
309pub struct Certificate<S: Scheme, D: Digest> {
310 pub item: Item<D>,
312 pub certificate: S::Certificate,
314}
315
316impl<S: Scheme, D: Digest> Certificate<S, D> {
317 pub fn from_acks<'a>(
318 scheme: &S,
319 acks: impl IntoIterator<Item = &'a Ack<S, D>>,
320 strategy: &impl Strategy,
321 ) -> Option<Self>
322 where
323 S: scheme::Scheme<D>,
324 {
325 let mut iter = acks.into_iter().peekable();
326 let item = iter.peek()?.item.clone();
327 let attestations = iter
328 .filter(|ack| ack.item == item)
329 .map(|ack| ack.attestation.clone());
330 let certificate = scheme.assemble::<_, N3f1>(attestations, strategy)?;
331
332 Some(Self { item, certificate })
333 }
334
335 pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
337 where
338 R: CryptoRngCore,
339 S: scheme::Scheme<D>,
340 {
341 scheme.verify_certificate::<_, D, N3f1>(rng, &self.item, &self.certificate, strategy)
342 }
343}
344
345impl<S: Scheme, D: Digest> Write for Certificate<S, D> {
346 fn write(&self, writer: &mut impl BufMut) {
347 self.item.write(writer);
348 self.certificate.write(writer);
349 }
350}
351
352impl<S: Scheme, D: Digest> Read for Certificate<S, D> {
353 type Cfg = <S::Certificate as Read>::Cfg;
354
355 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
356 let item = Item::read(reader)?;
357 let certificate = S::Certificate::read_cfg(reader, cfg)?;
358 Ok(Self { item, certificate })
359 }
360}
361
362impl<S: Scheme, D: Digest> EncodeSize for Certificate<S, D> {
363 fn encode_size(&self) -> usize {
364 self.item.encode_size() + self.certificate.encode_size()
365 }
366}
367
#[cfg(feature = "arbitrary")]
impl<S, D> arbitrary::Arbitrary<'_> for Certificate<S, D>
where
    S: Scheme,
    D: Digest + for<'a> arbitrary::Arbitrary<'a>,
    S::Certificate: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a certificate by drawing an item and then a scheme certificate from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            certificate: u.arbitrary::<S::Certificate>()?,
        })
    }
}
380
381#[derive(Clone, Debug, PartialEq)]
385pub enum Activity<S: Scheme, D: Digest> {
386 Ack(Ack<S, D>),
388
389 Certified(Certificate<S, D>),
391
392 Tip(Height),
394}
395
396impl<S: Scheme, D: Digest> Write for Activity<S, D> {
397 fn write(&self, writer: &mut impl BufMut) {
398 match self {
399 Self::Ack(ack) => {
400 0u8.write(writer);
401 ack.write(writer);
402 }
403 Self::Certified(certificate) => {
404 1u8.write(writer);
405 certificate.write(writer);
406 }
407 Self::Tip(height) => {
408 2u8.write(writer);
409 height.write(writer);
410 }
411 }
412 }
413}
414
415impl<S: Scheme, D: Digest> Read for Activity<S, D> {
416 type Cfg = <S::Certificate as Read>::Cfg;
417
418 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
419 match u8::read(reader)? {
420 0 => Ok(Self::Ack(Ack::read(reader)?)),
421 1 => Ok(Self::Certified(Certificate::read_cfg(reader, cfg)?)),
422 2 => Ok(Self::Tip(Height::read(reader)?)),
423 _ => Err(CodecError::Invalid(
424 "consensus::aggregation::Activity",
425 "Invalid type",
426 )),
427 }
428 }
429}
430
431impl<S: Scheme, D: Digest> EncodeSize for Activity<S, D> {
432 fn encode_size(&self) -> usize {
433 1 + match self {
434 Self::Ack(ack) => ack.encode_size(),
435 Self::Certified(certificate) => certificate.encode_size(),
436 Self::Tip(height) => height.encode_size(),
437 }
438 }
439}
440
#[cfg(feature = "arbitrary")]
impl<S, D> arbitrary::Arbitrary<'_> for Activity<S, D>
where
    S: Scheme,
    D: Digest + for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
    Certificate<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Picks a variant with an integer drawn from `0..=2`, then generates its payload.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        match u.int_in_range(0..=2)? {
            0 => u.arbitrary::<Ack<S, D>>().map(Self::Ack),
            1 => u.arbitrary::<Certificate<S, D>>().map(Self::Certified),
            2 => u.arbitrary::<Height>().map(Self::Tip),
            // `int_in_range` was asked for a value in `0..=2`.
            _ => unreachable!(),
        }
    }
}
458
459#[cfg(test)]
460mod tests {
461 use super::*;
462 use crate::aggregation::scheme::{
463 bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme,
464 };
465 use bytes::BytesMut;
466 use commonware_codec::{Decode, DecodeExt, Encode};
467 use commonware_cryptography::{
468 bls12381::primitives::variant::{MinPk, MinSig},
469 certificate::mocks::Fixture,
470 Hasher, Sha256,
471 };
472 use commonware_parallel::Sequential;
473 use commonware_utils::{ordered::Quorum, test_rng, N3f1};
474 use rand::rngs::StdRng;
475
    /// Namespace handed to every scheme fixture in these tests.
    const NAMESPACE: &[u8] = b"test";

    /// Digest type produced by the `Sha256` hasher; used as `D` throughout the tests.
    type Sha256Digest = <Sha256 as Hasher>::Digest;
479
480 #[test]
481 fn test_ack_namespace() {
482 let namespace = b"test_namespace";
483 let expected = [namespace, ACK_SUFFIX].concat();
484 assert_eq!(ack_namespace(namespace), expected);
485 }
486
487 fn codec<S, F>(fixture: F)
488 where
489 S: Scheme<Sha256Digest>,
490 F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
491 {
492 let mut rng = test_rng();
493 let fixture = fixture(&mut rng, NAMESPACE, 4);
494 let schemes = &fixture.schemes;
495 let item = Item {
496 height: Height::new(100),
497 digest: Sha256::hash(b"test_item"),
498 };
499
500 let restored_item = Item::decode(item.encode()).unwrap();
502 assert_eq!(item, restored_item);
503
504 let ack = Ack::sign(&schemes[0], Epoch::new(1), item.clone()).unwrap();
506 let cfg = schemes[0].certificate_codec_config();
507 let encoded_ack = ack.encode();
508 let restored_ack: Ack<S, Sha256Digest> = Ack::decode(encoded_ack).unwrap();
509
510 assert_eq!(restored_ack.item, item);
512 assert_eq!(restored_ack.epoch, Epoch::new(1));
513 assert!(restored_ack.verify(&mut rng, &schemes[0], &Sequential));
514
515 let tip_ack = TipAck {
517 ack: ack.clone(),
518 tip: Height::new(42),
519 };
520 let encoded_tip_ack = tip_ack.encode();
521 let restored_tip_ack: TipAck<S, Sha256Digest> = TipAck::decode(encoded_tip_ack).unwrap();
522 assert_eq!(restored_tip_ack.tip, Height::new(42));
523 assert_eq!(restored_tip_ack.ack.item, item);
524 assert_eq!(restored_tip_ack.ack.epoch, Epoch::new(1));
525
526 let activity_ack = Activity::Ack(ack);
528 let encoded_activity = activity_ack.encode();
529 let restored_activity_ack: Activity<S, Sha256Digest> =
530 Activity::decode_cfg(encoded_activity, &cfg).unwrap();
531 if let Activity::Ack(restored) = restored_activity_ack {
532 assert_eq!(restored.item, item);
533 assert_eq!(restored.epoch, Epoch::new(1));
534 } else {
535 panic!("Expected Activity::Ack");
536 }
537
538 let acks: Vec<_> = schemes
541 .iter()
542 .take(schemes[0].participants().quorum::<N3f1>() as usize)
543 .filter_map(|scheme| Ack::sign(scheme, Epoch::new(1), item.clone()))
544 .collect();
545
546 let certificate = Certificate::from_acks(&schemes[0], &acks, &Sequential).unwrap();
547 assert!(certificate.verify(&mut rng, &schemes[0], &Sequential));
548
549 let activity_certified = Activity::Certified(certificate.clone());
550 let encoded_certified = activity_certified.encode();
551 let restored_activity_certified: Activity<S, Sha256Digest> =
552 Activity::decode_cfg(encoded_certified, &cfg).unwrap();
553 if let Activity::Certified(restored) = restored_activity_certified {
554 assert_eq!(restored.item, item);
555 assert!(restored.verify(&mut rng, &schemes[0], &Sequential));
556 } else {
557 panic!("Expected Activity::Certified");
558 }
559
560 let activity_tip: Activity<S, Sha256Digest> = Activity::Tip(Height::new(123));
562 let encoded_tip = activity_tip.encode();
563 let restored_activity_tip: Activity<S, Sha256Digest> =
564 Activity::decode_cfg(encoded_tip, &cfg).unwrap();
565 if let Activity::Tip(height) = restored_activity_tip {
566 assert_eq!(height, Height::new(123));
567 } else {
568 panic!("Expected Activity::Tip");
569 }
570 }
571
    /// Runs the `codec` round-trip checks once per scheme fixture.
    #[test]
    fn test_codec() {
        codec(ed25519::fixture);
        codec(secp256r1::fixture);
        codec(bls12381_multisig::fixture::<MinPk, _>);
        codec(bls12381_multisig::fixture::<MinSig, _>);
        codec(bls12381_threshold::fixture::<MinPk, _>);
        codec(bls12381_threshold::fixture::<MinSig, _>);
    }
581
582 fn activity_invalid_enum<S, F>(fixture: F)
583 where
584 S: Scheme<Sha256Digest>,
585 F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
586 {
587 let fixture = fixture(&mut test_rng(), NAMESPACE, 4);
588 let mut buf = BytesMut::new();
589 3u8.write(&mut buf); let cfg = fixture.schemes[0].certificate_codec_config();
592 let result = Activity::<S, Sha256Digest>::read_cfg(&mut &buf[..], &cfg);
593 assert!(matches!(
594 result,
595 Err(CodecError::Invalid(
596 "consensus::aggregation::Activity",
597 "Invalid type"
598 ))
599 ));
600 }
601
    /// Runs the invalid-tag decoding check once per scheme fixture.
    #[test]
    fn test_activity_invalid_enum() {
        activity_invalid_enum(ed25519::fixture);
        activity_invalid_enum(secp256r1::fixture);
        activity_invalid_enum(bls12381_multisig::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_multisig::fixture::<MinSig, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinSig, _>);
    }
611
    /// Codec conformance tests for the message types, compiled only when the
    /// `arbitrary` feature is enabled.
    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use crate::aggregation::scheme::bls12381_threshold;
        use commonware_codec::conformance::CodecConformance;
        use commonware_cryptography::{ed25519::PublicKey, sha256::Digest as Sha256Digest};

        // Concrete scheme used to instantiate the generic message types below.
        type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;

        commonware_conformance::conformance_tests! {
            CodecConformance<Item<Sha256Digest>>,
            CodecConformance<Ack<Scheme, Sha256Digest>>,
            CodecConformance<TipAck<Scheme, Sha256Digest>>,
            CodecConformance<Certificate<Scheme, Sha256Digest>>,
            CodecConformance<Activity<Scheme, Sha256Digest>>,
        }
    }
629}