1use bytes::{Buf, BufMut};
4use commonware_codec::{
5 varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
6};
7use commonware_cryptography::{
8 bls12381::primitives::{group::Share, ops, poly::PartialSignature, variant::Variant},
9 Digest,
10};
11use commonware_utils::union;
12use futures::channel::oneshot;
13use std::hash::Hash;
14
15#[derive(Debug, thiserror::Error)]
17pub enum Error {
18 #[error("Application verify error: {0}")]
21 AppProposeCanceled(oneshot::Canceled),
22
23 #[error("Unable to send message")]
26 UnableToSendMessage,
27
28 #[error("Epoch {0} has no validator {1}")]
31 UnknownValidator(u64, String),
32 #[error("Unknown share at epoch {0}")]
34 UnknownShare(u64),
35
36 #[error("Peer mismatch")]
39 PeerMismatch,
40
41 #[error("Invalid ack signature")]
44 InvalidAckSignature,
45
46 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
49 AckEpochOutsideBounds(u64, u64, u64),
50 #[error("Non-useful ack index {0}")]
52 AckIndex(u64),
53 #[error("Invalid ack digest {0}")]
55 AckDigest(u64),
56 #[error("Duplicate ack from sender {0} for index {1}")]
58 AckDuplicate(String, u64),
59 #[error("Ack for index {0} already has a threshold")]
61 AckThresholded(u64),
62 #[error("Unknown epoch {0}")]
64 UnknownEpoch(u64),
65}
66
67impl Error {
68 pub fn blockable(&self) -> bool {
70 matches!(self, Error::PeerMismatch | Error::InvalidAckSignature)
71 }
72}
73
74pub type Epoch = u64;
77
78pub type Index = u64;
81
82const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
85
86#[inline]
91fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
92 union(namespace, ACK_SUFFIX)
93}
94
95#[derive(Clone, Debug, PartialEq, Eq, Hash)]
98pub struct Item<D: Digest> {
99 pub index: Index,
101 pub digest: D,
103}
104
105impl<D: Digest> Write for Item<D> {
106 fn write(&self, writer: &mut impl BufMut) {
107 UInt(self.index).write(writer);
108 self.digest.write(writer);
109 }
110}
111
112impl<D: Digest> Read for Item<D> {
113 type Cfg = ();
114
115 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
116 let index = UInt::read(reader)?.into();
117 let digest = D::read(reader)?;
118 Ok(Self { index, digest })
119 }
120}
121
122impl<D: Digest> EncodeSize for Item<D> {
123 fn encode_size(&self) -> usize {
124 UInt(self.index).encode_size() + self.digest.encode_size()
125 }
126}
127
128#[derive(Clone, Debug, PartialEq, Eq, Hash)]
131pub struct Ack<V: Variant, D: Digest> {
132 pub item: Item<D>,
134 pub epoch: Epoch,
136 pub signature: PartialSignature<V>,
138}
139
140impl<V: Variant, D: Digest> Ack<V, D> {
141 pub fn verify(&self, namespace: &[u8], polynomial: &[V::Public]) -> bool {
146 let Some(public) = polynomial.get(self.signature.index as usize) else {
147 return false;
148 };
149 ops::verify_message::<V>(
150 public,
151 Some(ack_namespace(namespace).as_ref()),
152 self.item.encode().as_ref(),
153 &self.signature.value,
154 )
155 .is_ok()
156 }
157
158 pub fn sign(namespace: &[u8], epoch: Epoch, share: &Share, item: Item<D>) -> Self {
166 let ack_namespace = ack_namespace(namespace);
167 let signature = ops::partial_sign_message::<V>(
168 share,
169 Some(ack_namespace.as_ref()),
170 item.encode().as_ref(),
171 );
172 Self {
173 item,
174 epoch,
175 signature,
176 }
177 }
178}
179
180impl<V: Variant, D: Digest> Write for Ack<V, D> {
181 fn write(&self, writer: &mut impl BufMut) {
182 self.item.write(writer);
183 UInt(self.epoch).write(writer);
184 self.signature.write(writer);
185 }
186}
187
188impl<V: Variant, D: Digest> Read for Ack<V, D> {
189 type Cfg = ();
190
191 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
192 let item = Item::read(reader)?;
193 let epoch = UInt::read(reader)?.into();
194 let signature = PartialSignature::<V>::read(reader)?;
195 Ok(Self {
196 item,
197 epoch,
198 signature,
199 })
200 }
201}
202
203impl<V: Variant, D: Digest> EncodeSize for Ack<V, D> {
204 fn encode_size(&self) -> usize {
205 self.item.encode_size() + UInt(self.epoch).encode_size() + self.signature.encode_size()
206 }
207}
208
209#[derive(Clone, Debug, PartialEq, Eq, Hash)]
212pub struct TipAck<V: Variant, D: Digest> {
213 pub tip: Index,
215
216 pub ack: Ack<V, D>,
218}
219
220impl<V: Variant, D: Digest> Write for TipAck<V, D> {
221 fn write(&self, writer: &mut impl BufMut) {
222 UInt(self.tip).write(writer);
223 self.ack.write(writer);
224 }
225}
226
227impl<V: Variant, D: Digest> Read for TipAck<V, D> {
228 type Cfg = ();
229
230 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
231 let tip = UInt::read(reader)?.into();
232 let ack = Ack::<V, D>::read(reader)?;
233 Ok(Self { tip, ack })
234 }
235}
236
237impl<V: Variant, D: Digest> EncodeSize for TipAck<V, D> {
238 fn encode_size(&self) -> usize {
239 UInt(self.tip).encode_size() + self.ack.encode_size()
240 }
241}
242
243#[derive(Clone, Debug, PartialEq, Eq, Hash)]
245pub struct Certificate<V: Variant, D: Digest> {
246 pub item: Item<D>,
248 pub signature: V::Signature,
250}
251
252impl<V: Variant, D: Digest> Write for Certificate<V, D> {
253 fn write(&self, writer: &mut impl BufMut) {
254 self.item.write(writer);
255 self.signature.write(writer);
256 }
257}
258
259impl<V: Variant, D: Digest> Read for Certificate<V, D> {
260 type Cfg = ();
261
262 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
263 let item = Item::read(reader)?;
264 let signature = V::Signature::read(reader)?;
265 Ok(Self { item, signature })
266 }
267}
268
269impl<V: Variant, D: Digest> EncodeSize for Certificate<V, D> {
270 fn encode_size(&self) -> usize {
271 self.item.encode_size() + self.signature.encode_size()
272 }
273}
274
275impl<V: Variant, D: Digest> Certificate<V, D> {
276 pub fn verify(&self, namespace: &[u8], identity: &V::Public) -> bool {
281 ops::verify_message::<V>(
282 identity,
283 Some(ack_namespace(namespace).as_ref()),
284 self.item.encode().as_ref(),
285 &self.signature,
286 )
287 .is_ok()
288 }
289}
290
291#[derive(Clone, Debug, PartialEq)]
295pub enum Activity<V: Variant, D: Digest> {
296 Ack(Ack<V, D>),
298
299 Certified(Certificate<V, D>),
301
302 Tip(Index),
304}
305
306impl<V: Variant, D: Digest> Write for Activity<V, D> {
307 fn write(&self, writer: &mut impl BufMut) {
308 match self {
309 Activity::Ack(ack) => {
310 0u8.write(writer);
311 ack.write(writer);
312 }
313 Activity::Certified(certificate) => {
314 1u8.write(writer);
315 certificate.write(writer);
316 }
317 Activity::Tip(index) => {
318 2u8.write(writer);
319 UInt(*index).write(writer);
320 }
321 }
322 }
323}
324
325impl<V: Variant, D: Digest> Read for Activity<V, D> {
326 type Cfg = ();
327
328 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
329 match u8::read(reader)? {
330 0 => Ok(Activity::Ack(Ack::read(reader)?)),
331 1 => Ok(Activity::Certified(Certificate::read(reader)?)),
332 2 => Ok(Activity::Tip(UInt::read(reader)?.into())),
333 _ => Err(CodecError::Invalid(
334 "consensus::aggregation::Activity",
335 "Invalid type",
336 )),
337 }
338 }
339}
340
341impl<V: Variant, D: Digest> EncodeSize for Activity<V, D> {
342 fn encode_size(&self) -> usize {
343 1 + match self {
344 Activity::Ack(ack) => ack.encode_size(),
345 Activity::Certified(certificate) => certificate.encode_size(),
346 Activity::Tip(index) => UInt(*index).encode_size(),
347 }
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354 use bytes::BytesMut;
355 use commonware_codec::{DecodeExt, Encode};
356 use commonware_cryptography::{
357 bls12381::{
358 dkg::ops::{self, evaluate_all},
359 primitives::{ops::sign_message, variant::MinSig},
360 },
361 Hasher, Sha256,
362 };
363 use commonware_runtime::deterministic;
364
365 #[test]
366 fn test_ack_namespace() {
367 let namespace = b"test_namespace";
368 let expected = [namespace, ACK_SUFFIX].concat();
369 assert_eq!(ack_namespace(namespace), expected);
370 }
371
372 #[test]
373 fn test_codec() {
374 let namespace = b"test";
375 let mut context = deterministic::Context::default();
376 let (public, shares) = ops::generate_shares::<_, MinSig>(&mut context, None, 4, 3);
377 let polynomial = evaluate_all::<MinSig>(&public, 4);
378 let item = Item {
379 index: 100,
380 digest: Sha256::hash(b"test_item"),
381 };
382
383 let restored_item = Item::decode(item.encode()).unwrap();
385 assert_eq!(item, restored_item);
386
387 let ack: Ack<MinSig, _> = Ack::sign(namespace, 1, &shares[0], item.clone());
389 assert!(ack.verify(namespace, &polynomial));
390 assert!(!ack.verify(b"wrong", &polynomial));
391
392 let restored_ack: Ack<MinSig, <Sha256 as Hasher>::Digest> =
393 Ack::decode(ack.encode()).unwrap();
394 assert_eq!(ack, restored_ack);
395
396 let tip_ack = TipAck { ack, tip: 42 };
398 let restored: TipAck<MinSig, <Sha256 as Hasher>::Digest> =
399 TipAck::decode(tip_ack.encode()).unwrap();
400 assert_eq!(tip_ack, restored);
401
402 let activity_ack = Activity::Ack(Ack::sign(namespace, 1, &shares[0], item.clone()));
404 let restored_activity_ack: Activity<MinSig, <Sha256 as Hasher>::Digest> =
405 Activity::decode(activity_ack.encode()).unwrap();
406 assert_eq!(activity_ack, restored_activity_ack);
407
408 let signature = sign_message::<MinSig>(&shares[0].private, Some(b"test"), b"message");
410 let activity_certified = Activity::Certified(Certificate { item, signature });
411 let restored_activity_certified: Activity<MinSig, <Sha256 as Hasher>::Digest> =
412 Activity::decode(activity_certified.encode()).unwrap();
413 assert_eq!(activity_certified, restored_activity_certified);
414
415 let activity_tip = Activity::Tip(123);
417 let restored_activity_tip: Activity<MinSig, <Sha256 as Hasher>::Digest> =
418 Activity::decode(activity_tip.encode()).unwrap();
419 assert_eq!(activity_tip, restored_activity_tip);
420 }
421
422 #[test]
423 fn test_activity_invalid_enum() {
424 let mut buf = BytesMut::new();
425 3u8.write(&mut buf); let result = Activity::<MinSig, <Sha256 as Hasher>::Digest>::decode(&buf[..]);
428 assert!(matches!(
429 result,
430 Err(CodecError::Invalid(
431 "consensus::aggregation::Activity",
432 "Invalid type"
433 ))
434 ));
435 }
436}