1use bytes::{Buf, BufMut};
4use commonware_codec::{
5 varint::UInt, Encode, EncodeSize, Error as CodecError, Read, ReadExt, Write,
6};
7use commonware_cryptography::{
8 bls12381::primitives::{
9 group::Share,
10 ops,
11 poly::{self, PartialSignature},
12 variant::Variant,
13 },
14 Digest,
15};
16use commonware_utils::union;
17use futures::channel::oneshot;
18use std::hash::Hash;
19
20#[derive(Debug, thiserror::Error)]
22pub enum Error {
23 #[error("Application verify error: {0}")]
26 AppProposeCanceled(oneshot::Canceled),
27
28 #[error("Unable to send message")]
31 UnableToSendMessage,
32
33 #[error("Epoch {0} has no validator {1}")]
36 UnknownValidator(u64, String),
37 #[error("Unknown share at epoch {0}")]
39 UnknownShare(u64),
40
41 #[error("Peer mismatch")]
44 PeerMismatch,
45
46 #[error("Invalid ack signature")]
49 InvalidAckSignature,
50
51 #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
54 AckEpochOutsideBounds(u64, u64, u64),
55 #[error("Non-useful ack index {0}")]
57 AckIndex(u64),
58 #[error("Duplicate ack from sender {0} for index {1}")]
60 AckDuplicate(String, u64),
61 #[error("Ack for index {0} already has a threshold")]
63 AckThresholded(u64),
64}
65
66impl Error {
67 pub fn blockable(&self) -> bool {
69 matches!(self, Error::PeerMismatch | Error::InvalidAckSignature)
70 }
71}
72
73pub type Epoch = u64;
76
77pub type Index = u64;
80
81const ACK_SUFFIX: &[u8] = b"_AGG_ACK";
84
85#[inline]
90fn ack_namespace(namespace: &[u8]) -> Vec<u8> {
91 union(namespace, ACK_SUFFIX)
92}
93
94#[derive(Clone, Debug, PartialEq, Eq, Hash)]
97pub struct Item<D: Digest> {
98 pub index: Index,
100 pub digest: D,
102}
103
104impl<D: Digest> Write for Item<D> {
105 fn write(&self, writer: &mut impl BufMut) {
106 UInt(self.index).write(writer);
107 self.digest.write(writer);
108 }
109}
110
111impl<D: Digest> Read for Item<D> {
112 type Cfg = ();
113
114 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
115 let index = UInt::read(reader)?.into();
116 let digest = D::read(reader)?;
117 Ok(Self { index, digest })
118 }
119}
120
121impl<D: Digest> EncodeSize for Item<D> {
122 fn encode_size(&self) -> usize {
123 UInt(self.index).encode_size() + self.digest.encode_size()
124 }
125}
126
127#[derive(Clone, Debug, PartialEq, Eq, Hash)]
130pub struct Ack<V: Variant, D: Digest> {
131 pub item: Item<D>,
133 pub epoch: Epoch,
135 pub signature: PartialSignature<V>,
137}
138
139impl<V: Variant, D: Digest> Ack<V, D> {
140 pub fn verify(&self, namespace: &[u8], identity: &poly::Public<V>) -> bool {
145 ops::partial_verify_message::<V>(
146 identity,
147 Some(ack_namespace(namespace).as_ref()),
148 self.item.encode().as_ref(),
149 &self.signature,
150 )
151 .is_ok()
152 }
153
154 pub fn sign(namespace: &[u8], epoch: Epoch, share: &Share, item: Item<D>) -> Self {
162 let ack_namespace = ack_namespace(namespace);
163 let signature = ops::partial_sign_message::<V>(
164 share,
165 Some(ack_namespace.as_ref()),
166 item.encode().as_ref(),
167 );
168 Self {
169 item,
170 epoch,
171 signature,
172 }
173 }
174}
175
176impl<V: Variant, D: Digest> Write for Ack<V, D> {
177 fn write(&self, writer: &mut impl BufMut) {
178 self.item.write(writer);
179 UInt(self.epoch).write(writer);
180 self.signature.write(writer);
181 }
182}
183
184impl<V: Variant, D: Digest> Read for Ack<V, D> {
185 type Cfg = ();
186
187 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
188 let item = Item::read(reader)?;
189 let epoch = UInt::read(reader)?.into();
190 let signature = PartialSignature::<V>::read(reader)?;
191 Ok(Self {
192 item,
193 epoch,
194 signature,
195 })
196 }
197}
198
199impl<V: Variant, D: Digest> EncodeSize for Ack<V, D> {
200 fn encode_size(&self) -> usize {
201 self.item.encode_size() + UInt(self.epoch).encode_size() + self.signature.encode_size()
202 }
203}
204
205#[derive(Clone, Debug, PartialEq, Eq, Hash)]
208pub struct TipAck<V: Variant, D: Digest> {
209 pub tip: Index,
211
212 pub ack: Ack<V, D>,
214}
215
216impl<V: Variant, D: Digest> Write for TipAck<V, D> {
217 fn write(&self, writer: &mut impl BufMut) {
218 UInt(self.tip).write(writer);
219 self.ack.write(writer);
220 }
221}
222
223impl<V: Variant, D: Digest> Read for TipAck<V, D> {
224 type Cfg = ();
225
226 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
227 let tip = UInt::read(reader)?.into();
228 let ack = Ack::<V, D>::read(reader)?;
229 Ok(Self { tip, ack })
230 }
231}
232
233impl<V: Variant, D: Digest> EncodeSize for TipAck<V, D> {
234 fn encode_size(&self) -> usize {
235 UInt(self.tip).encode_size() + self.ack.encode_size()
236 }
237}
238
239#[derive(Clone, Debug, PartialEq)]
243pub enum Activity<V: Variant, D: Digest> {
244 Ack(Ack<V, D>),
246
247 Recovered(Item<D>, V::Signature),
249
250 Tip(Index),
252}
253
254impl<V: Variant, D: Digest> Write for Activity<V, D> {
255 fn write(&self, writer: &mut impl BufMut) {
256 match self {
257 Activity::Ack(ack) => {
258 0u8.write(writer);
259 ack.write(writer);
260 }
261 Activity::Recovered(item, signature) => {
262 1u8.write(writer);
263 item.write(writer);
264 signature.write(writer);
265 }
266 Activity::Tip(index) => {
267 2u8.write(writer);
268 UInt(*index).write(writer);
269 }
270 }
271 }
272}
273
274impl<V: Variant, D: Digest> Read for Activity<V, D> {
275 type Cfg = ();
276
277 fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
278 match u8::read(reader)? {
279 0 => Ok(Activity::Ack(Ack::read(reader)?)),
280 1 => Ok(Activity::Recovered(
281 Item::read(reader)?,
282 V::Signature::read(reader)?,
283 )),
284 2 => Ok(Activity::Tip(UInt::read(reader)?.into())),
285 _ => Err(CodecError::Invalid(
286 "consensus::aggregation::Activity",
287 "Invalid type",
288 )),
289 }
290 }
291}
292
293impl<V: Variant, D: Digest> EncodeSize for Activity<V, D> {
294 fn encode_size(&self) -> usize {
295 1 + match self {
296 Activity::Ack(ack) => ack.encode_size(),
297 Activity::Recovered(item, signature) => item.encode_size() + signature.encode_size(),
298 Activity::Tip(index) => UInt(*index).encode_size(),
299 }
300 }
301}
302
303#[cfg(test)]
304mod tests {
305 use super::*;
306 use bytes::BytesMut;
307 use commonware_codec::{DecodeExt, Encode};
308 use commonware_cryptography::{
309 bls12381::{
310 dkg::ops,
311 primitives::{ops::sign_message, variant::MinSig},
312 },
313 sha256,
314 };
315 use commonware_runtime::deterministic;
316
317 #[test]
318 fn test_ack_namespace() {
319 let namespace = b"test_namespace";
320 let expected = [namespace, ACK_SUFFIX].concat();
321 assert_eq!(ack_namespace(namespace), expected);
322 }
323
324 #[test]
325 fn test_codec() {
326 let namespace = b"test";
327 let mut context = deterministic::Context::default();
328 let (public, shares) = ops::generate_shares::<_, MinSig>(&mut context, None, 4, 3);
329 let item = Item {
330 index: 100,
331 digest: sha256::hash(b"test_item"),
332 };
333
334 let restored_item = Item::decode(item.encode()).unwrap();
336 assert_eq!(item, restored_item);
337
338 let ack: Ack<MinSig, _> = Ack::sign(namespace, 1, &shares[0], item.clone());
340 assert!(ack.verify(namespace, &public));
341 assert!(!ack.verify(b"wrong", &public));
342
343 let restored_ack: Ack<MinSig, sha256::Digest> = Ack::decode(ack.encode()).unwrap();
344 assert_eq!(ack, restored_ack);
345
346 let tip_ack = TipAck { ack, tip: 42 };
348 let restored: TipAck<MinSig, sha256::Digest> = TipAck::decode(tip_ack.encode()).unwrap();
349 assert_eq!(tip_ack, restored);
350
351 let activity_ack = Activity::Ack(Ack::sign(namespace, 1, &shares[0], item.clone()));
353 let restored_activity_ack: Activity<MinSig, sha256::Digest> =
354 Activity::decode(activity_ack.encode()).unwrap();
355 assert_eq!(activity_ack, restored_activity_ack);
356
357 let signature = sign_message::<MinSig>(&shares[0].private, Some(b"test"), b"message");
359 let activity_recovered = Activity::Recovered(item, signature);
360 let restored_activity_recovered: Activity<MinSig, sha256::Digest> =
361 Activity::decode(activity_recovered.encode()).unwrap();
362 assert_eq!(activity_recovered, restored_activity_recovered);
363
364 let activity_tip = Activity::Tip(123);
366 let restored_activity_tip: Activity<MinSig, sha256::Digest> =
367 Activity::decode(activity_tip.encode()).unwrap();
368 assert_eq!(activity_tip, restored_activity_tip);
369 }
370
371 #[test]
372 fn test_activity_invalid_enum() {
373 let mut buf = BytesMut::new();
374 3u8.write(&mut buf); let result = Activity::<MinSig, sha256::Digest>::decode(&buf[..]);
377 assert!(matches!(
378 result,
379 Err(CodecError::Invalid(
380 "consensus::aggregation::Activity",
381 "Invalid type"
382 ))
383 ));
384 }
385}