1use crate::{
4 aggregation::scheme,
5 types::{Epoch, Height},
6 Heightable,
7};
8use bytes::{Buf, BufMut, Bytes};
9use commonware_codec::{Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write};
10use commonware_cryptography::{
11 certificate::{self, Attestation, Scheme, Subject},
12 Digest,
13};
14use commonware_parallel::Strategy;
15use commonware_utils::{channel::oneshot, union, N3f1};
16use rand_core::CryptoRngCore;
17use std::hash::Hash;
18
19#[derive(Debug, thiserror::Error)]
21pub enum Error {
22 #[error("Application verify error: {0}")]
25 AppProposeCanceled(oneshot::error::RecvError),
26
27 #[error("Epoch {0} has no validator {1}")]
30 UnknownValidator(Epoch, String),
31 #[error("Not a signer at epoch {0}")]
33 NotSigner(Epoch),
34
35 #[error("Peer mismatch")]
38 PeerMismatch,
39
40 #[error("Invalid ack signature")]
43 InvalidAckSignature,
44
45 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
48 AckEpochOutsideBounds(Epoch, Epoch, Epoch),
49 #[error("Non-useful ack height {0}")]
51 AckHeight(Height),
52 #[error("Invalid ack digest {0}")]
54 AckDigest(Height),
55 #[error("Duplicate ack from sender {0} for height {1}")]
57 AckDuplicate(String, Height),
58 #[error("Ack for height {0} already has been certified")]
60 AckCertified(Height),
61 #[error("Unknown epoch {0}")]
63 UnknownEpoch(Epoch),
64}
65
66impl Error {
67 pub const fn blockable(&self) -> bool {
69 matches!(self, Self::PeerMismatch | Self::InvalidAckSignature)
70 }
71}
72
73const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
76
77#[inline]
82fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
83 union(namespace, ACK_SUFFIX)
84}
85
86#[derive(Clone, Debug)]
91pub struct Namespace(Vec<u8>);
92
93impl certificate::Namespace for Namespace {
94 fn derive(namespace: &[u8]) -> Self {
95 Self(ack_namespace(namespace))
96 }
97}
98
99#[derive(Clone, Debug, PartialEq, Eq, Hash)]
102pub struct Item<D: Digest> {
103 pub height: Height,
105 pub digest: D,
107}
108
109impl<D: Digest> Heightable for Item<D> {
110 fn height(&self) -> Height {
111 self.height
112 }
113}
114
115impl<D: Digest> Write for Item<D> {
116 fn write(&self, writer: &mut impl BufMut) {
117 self.height.write(writer);
118 self.digest.write(writer);
119 }
120}
121
122impl<D: Digest> Read for Item<D> {
123 type Cfg = ();
124
125 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
126 let height = Height::read(reader)?;
127 let digest = D::read(reader)?;
128 Ok(Self { height, digest })
129 }
130}
131
132impl<D: Digest> EncodeSize for Item<D> {
133 fn encode_size(&self) -> usize {
134 self.height.encode_size() + self.digest.encode_size()
135 }
136}
137
138impl<D: Digest> Subject for &Item<D> {
139 type Namespace = Namespace;
140
141 fn namespace<'a>(&self, derived: &'a Self::Namespace) -> &'a [u8] {
142 &derived.0
143 }
144
145 fn message(&self) -> Bytes {
146 self.encode()
147 }
148}
149
#[cfg(feature = "arbitrary")]
impl<D: Digest> arbitrary::Arbitrary<'_> for Item<D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an item by drawing a height and then a digest from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Fields are evaluated in the order written: height first, digest second.
        Ok(Self {
            height: u.arbitrary::<Height>()?,
            digest: u.arbitrary::<D>()?,
        })
    }
}
161
162#[derive(Clone, Debug, PartialEq, Eq, Hash)]
165pub struct Ack<S: Scheme, D: Digest> {
166 pub item: Item<D>,
168 pub epoch: Epoch,
170 pub attestation: Attestation<S>,
172}
173
174impl<S: Scheme, D: Digest> Ack<S, D> {
175 pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
180 where
181 R: CryptoRngCore,
182 S: scheme::Scheme<D>,
183 {
184 scheme.verify_attestation::<_, D>(rng, &self.item, &self.attestation, strategy)
185 }
186
187 pub fn sign(scheme: &S, epoch: Epoch, item: Item<D>) -> Option<Self>
195 where
196 S: scheme::Scheme<D>,
197 {
198 let attestation = scheme.sign::<D>(&item)?;
199 Some(Self {
200 item,
201 epoch,
202 attestation,
203 })
204 }
205}
206
207impl<S: Scheme, D: Digest> Write for Ack<S, D> {
208 fn write(&self, writer: &mut impl BufMut) {
209 self.item.write(writer);
210 self.epoch.write(writer);
211 self.attestation.write(writer);
212 }
213}
214
215impl<S: Scheme, D: Digest> Read for Ack<S, D> {
216 type Cfg = ();
217
218 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
219 let item = Item::read(reader)?;
220 let epoch = Epoch::read(reader)?;
221 let attestation = Attestation::read(reader)?;
222 Ok(Self {
223 item,
224 epoch,
225 attestation,
226 })
227 }
228}
229
230impl<S: Scheme, D: Digest> EncodeSize for Ack<S, D> {
231 fn encode_size(&self) -> usize {
232 self.item.encode_size() + self.epoch.encode_size() + self.attestation.encode_size()
233 }
234}
235
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Ack<S, D>
where
    S::Signature: for<'a> arbitrary::Arbitrary<'a>,
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an ack by drawing an item, an epoch, and an attestation from `u`, in that order.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Fields are evaluated in the order written, so input is consumed as item, epoch, attestation.
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            epoch: u.arbitrary::<Epoch>()?,
            attestation: Attestation::arbitrary(u)?,
        })
    }
}
253
254#[derive(Clone, Debug, PartialEq, Eq, Hash)]
257pub struct TipAck<S: Scheme, D: Digest> {
258 pub tip: Height,
260
261 pub ack: Ack<S, D>,
263}
264
265impl<S: Scheme, D: Digest> Write for TipAck<S, D> {
266 fn write(&self, writer: &mut impl BufMut) {
267 self.tip.write(writer);
268 self.ack.write(writer);
269 }
270}
271
272impl<S: Scheme, D: Digest> Read for TipAck<S, D> {
273 type Cfg = ();
274
275 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
276 let tip = Height::read(reader)?;
277 let ack = Ack::read(reader)?;
278 Ok(Self { tip, ack })
279 }
280}
281
282impl<S: Scheme, D: Digest> EncodeSize for TipAck<S, D> {
283 fn encode_size(&self) -> usize {
284 self.tip.encode_size() + self.ack.encode_size()
285 }
286}
287
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for TipAck<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a tip ack by drawing a tip height and then an ack from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Fields are evaluated in the order written: tip before ack.
        Ok(Self {
            tip: u.arbitrary::<Height>()?,
            ack: u.arbitrary::<Ack<S, D>>()?,
        })
    }
}
300
301#[derive(Clone, Debug, PartialEq, Eq, Hash)]
303pub struct Certificate<S: Scheme, D: Digest> {
304 pub item: Item<D>,
306 pub certificate: S::Certificate,
308}
309
310impl<S: Scheme, D: Digest> Certificate<S, D> {
311 pub fn from_acks<'a, I>(scheme: &S, acks: I, strategy: &impl Strategy) -> Option<Self>
312 where
313 S: scheme::Scheme<D>,
314 I: IntoIterator<Item = &'a Ack<S, D>>,
315 I::IntoIter: Send,
316 {
317 let mut iter = acks.into_iter().peekable();
318 let item = iter.peek()?.item.clone();
319 let attestations = iter
320 .filter(|ack| ack.item == item)
321 .map(|ack| ack.attestation.clone());
322 let certificate = scheme.assemble::<_, N3f1>(attestations, strategy)?;
323
324 Some(Self { item, certificate })
325 }
326
327 pub fn verify<R>(&self, rng: &mut R, scheme: &S, strategy: &impl Strategy) -> bool
329 where
330 R: CryptoRngCore,
331 S: scheme::Scheme<D>,
332 {
333 scheme.verify_certificate::<_, D, N3f1>(rng, &self.item, &self.certificate, strategy)
334 }
335}
336
337impl<S: Scheme, D: Digest> Write for Certificate<S, D> {
338 fn write(&self, writer: &mut impl BufMut) {
339 self.item.write(writer);
340 self.certificate.write(writer);
341 }
342}
343
344impl<S: Scheme, D: Digest> Read for Certificate<S, D> {
345 type Cfg = <S::Certificate as Read>::Cfg;
346
347 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
348 let item = Item::read(reader)?;
349 let certificate = S::Certificate::read_cfg(reader, cfg)?;
350 Ok(Self { item, certificate })
351 }
352}
353
354impl<S: Scheme, D: Digest> EncodeSize for Certificate<S, D> {
355 fn encode_size(&self) -> usize {
356 self.item.encode_size() + self.certificate.encode_size()
357 }
358}
359
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Certificate<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    S::Certificate: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a certificate by drawing an item and then a scheme certificate from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Fields are evaluated in the order written: item before certificate.
        Ok(Self {
            item: u.arbitrary::<Item<D>>()?,
            certificate: u.arbitrary::<S::Certificate>()?,
        })
    }
}
372
373#[derive(Clone, Debug, PartialEq)]
377pub enum Activity<S: Scheme, D: Digest> {
378 Ack(Ack<S, D>),
380
381 Certified(Certificate<S, D>),
383
384 Tip(Height),
386}
387
388impl<S: Scheme, D: Digest> Write for Activity<S, D> {
389 fn write(&self, writer: &mut impl BufMut) {
390 match self {
391 Self::Ack(ack) => {
392 0u8.write(writer);
393 ack.write(writer);
394 }
395 Self::Certified(certificate) => {
396 1u8.write(writer);
397 certificate.write(writer);
398 }
399 Self::Tip(height) => {
400 2u8.write(writer);
401 height.write(writer);
402 }
403 }
404 }
405}
406
407impl<S: Scheme, D: Digest> Read for Activity<S, D> {
408 type Cfg = <S::Certificate as Read>::Cfg;
409
410 fn read_cfg(reader: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
411 match u8::read(reader)? {
412 0 => Ok(Self::Ack(Ack::read(reader)?)),
413 1 => Ok(Self::Certified(Certificate::read_cfg(reader, cfg)?)),
414 2 => Ok(Self::Tip(Height::read(reader)?)),
415 _ => Err(CodecError::Invalid(
416 "consensus::aggregation::Activity",
417 "Invalid type",
418 )),
419 }
420 }
421}
422
423impl<S: Scheme, D: Digest> EncodeSize for Activity<S, D> {
424 fn encode_size(&self) -> usize {
425 1 + match self {
426 Self::Ack(ack) => ack.encode_size(),
427 Self::Certified(certificate) => certificate.encode_size(),
428 Self::Tip(height) => height.encode_size(),
429 }
430 }
431}
432
#[cfg(feature = "arbitrary")]
impl<S: Scheme, D: Digest> arbitrary::Arbitrary<'_> for Activity<S, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
    Ack<S, D>: for<'a> arbitrary::Arbitrary<'a>,
    Certificate<S, D>: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an activity by picking a variant index in `0..=2` and then
    /// drawing that variant's payload from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(match u.int_in_range(0..=2)? {
            0 => Self::Ack(u.arbitrary::<Ack<S, D>>()?),
            1 => Self::Certified(u.arbitrary::<Certificate<S, D>>()?),
            2 => Self::Tip(u.arbitrary::<Height>()?),
            // `int_in_range(0..=2)` only yields values inside the requested range.
            _ => unreachable!(),
        })
    }
}
450
#[cfg(test)]
mod tests {
    use super::*;
    use crate::aggregation::scheme::{
        bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme,
    };
    use bytes::BytesMut;
    use commonware_codec::{Decode, DecodeExt, Encode};
    use commonware_cryptography::{
        bls12381::primitives::variant::{MinPk, MinSig},
        certificate::mocks::Fixture,
        Hasher, Sha256,
    };
    use commonware_parallel::Sequential;
    use commonware_utils::{ordered::Quorum, test_rng, N3f1};
    use rand::rngs::StdRng;

    /// Base namespace handed to every fixture.
    const NAMESPACE: &[u8] = b"test";

    type Sha256Digest = <Sha256 as Hasher>::Digest;

    /// The ack namespace is the base namespace followed by `ACK_SUFFIX`.
    #[test]
    fn test_ack_namespace() {
        let base = b"test_namespace";
        let mut expected = base.to_vec();
        expected.extend_from_slice(ACK_SUFFIX);
        assert_eq!(ack_namespace(base), expected);
    }

    /// Round-trips each wire type through its codec using the schemes built by `fixture`.
    fn codec<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        let mut rng = test_rng();
        let fixture = fixture(&mut rng, NAMESPACE, 4);
        let schemes = &fixture.schemes;
        let signer = &schemes[0];
        let item = Item {
            height: Height::new(100),
            digest: Sha256::hash(b"test_item"),
        };

        // Item
        let decoded_item = Item::decode(item.encode()).unwrap();
        assert_eq!(item, decoded_item);

        // Ack
        let ack = Ack::sign(signer, Epoch::new(1), item.clone()).unwrap();
        let cfg = signer.certificate_codec_config();
        let ack_bytes = ack.encode();
        let decoded_ack: Ack<S, Sha256Digest> = Ack::decode(ack_bytes).unwrap();

        assert_eq!(decoded_ack.item, item);
        assert_eq!(decoded_ack.epoch, Epoch::new(1));
        assert!(decoded_ack.verify(&mut rng, signer, &Sequential));

        // TipAck
        let tip_ack = TipAck {
            ack: ack.clone(),
            tip: Height::new(42),
        };
        let tip_ack_bytes = tip_ack.encode();
        let decoded_tip_ack: TipAck<S, Sha256Digest> = TipAck::decode(tip_ack_bytes).unwrap();
        assert_eq!(decoded_tip_ack.tip, Height::new(42));
        assert_eq!(decoded_tip_ack.ack.item, item);
        assert_eq!(decoded_tip_ack.ack.epoch, Epoch::new(1));

        // Activity::Ack
        let ack_activity = Activity::Ack(ack);
        let ack_activity_bytes = ack_activity.encode();
        let decoded_ack_activity: Activity<S, Sha256Digest> =
            Activity::decode_cfg(ack_activity_bytes, &cfg).unwrap();
        match decoded_ack_activity {
            Activity::Ack(restored) => {
                assert_eq!(restored.item, item);
                assert_eq!(restored.epoch, Epoch::new(1));
            }
            _ => panic!("Expected Activity::Ack"),
        }

        // Certificate, assembled from a quorum of acks over the same item
        let quorum = signer.participants().quorum::<N3f1>() as usize;
        let acks: Vec<_> = schemes
            .iter()
            .take(quorum)
            .filter_map(|scheme| Ack::sign(scheme, Epoch::new(1), item.clone()))
            .collect();

        let certificate = Certificate::from_acks(signer, &acks, &Sequential).unwrap();
        assert!(certificate.verify(&mut rng, signer, &Sequential));

        // Activity::Certified
        let certified_activity = Activity::Certified(certificate.clone());
        let certified_bytes = certified_activity.encode();
        let decoded_certified: Activity<S, Sha256Digest> =
            Activity::decode_cfg(certified_bytes, &cfg).unwrap();
        match decoded_certified {
            Activity::Certified(restored) => {
                assert_eq!(restored.item, item);
                assert!(restored.verify(&mut rng, signer, &Sequential));
            }
            _ => panic!("Expected Activity::Certified"),
        }

        // Activity::Tip
        let tip_activity: Activity<S, Sha256Digest> = Activity::Tip(Height::new(123));
        let tip_bytes = tip_activity.encode();
        let decoded_tip: Activity<S, Sha256Digest> =
            Activity::decode_cfg(tip_bytes, &cfg).unwrap();
        match decoded_tip {
            Activity::Tip(height) => assert_eq!(height, Height::new(123)),
            _ => panic!("Expected Activity::Tip"),
        }
    }

    #[test]
    fn test_codec() {
        codec(ed25519::fixture);
        codec(secp256r1::fixture);
        codec(bls12381_multisig::fixture::<MinPk, _>);
        codec(bls12381_multisig::fixture::<MinSig, _>);
        codec(bls12381_threshold::fixture::<MinPk, _>);
        codec(bls12381_threshold::fixture::<MinSig, _>);
    }

    /// Decoding an `Activity` whose tag byte matches no variant must fail with `Invalid`.
    fn activity_invalid_enum<S, F>(fixture: F)
    where
        S: Scheme<Sha256Digest>,
        F: FnOnce(&mut StdRng, &[u8], u32) -> Fixture<S>,
    {
        let fixture = fixture(&mut test_rng(), NAMESPACE, 4);
        let cfg = fixture.schemes[0].certificate_codec_config();

        // Only tags 0, 1, and 2 are assigned; 3 is out of range.
        let mut buf = BytesMut::new();
        3u8.write(&mut buf);

        let result = Activity::<S, Sha256Digest>::read_cfg(&mut &buf[..], &cfg);
        assert!(matches!(
            result,
            Err(CodecError::Invalid(
                "consensus::aggregation::Activity",
                "Invalid type"
            ))
        ));
    }

    #[test]
    fn test_activity_invalid_enum() {
        activity_invalid_enum(ed25519::fixture);
        activity_invalid_enum(secp256r1::fixture);
        activity_invalid_enum(bls12381_multisig::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_multisig::fixture::<MinSig, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinPk, _>);
        activity_invalid_enum(bls12381_threshold::fixture::<MinSig, _>);
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use crate::aggregation::scheme::bls12381_threshold;
        use commonware_codec::conformance::CodecConformance;
        use commonware_cryptography::{ed25519::PublicKey, sha256::Digest as Sha256Digest};

        type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;

        commonware_conformance::conformance_tests! {
            CodecConformance<Item<Sha256Digest>>,
            CodecConformance<Ack<Scheme, Sha256Digest>>,
            CodecConformance<TipAck<Scheme, Sha256Digest>>,
            CodecConformance<Certificate<Scheme, Sha256Digest>>,
            CodecConformance<Activity<Scheme, Sha256Digest>>,
        }
    }
}