janus_messages/
lib.rs

1//! Messages defined by the [Distributed Aggregation Protocol][dap] with serialization and
2//! deserialization support.
3//!
4//! [dap]: https://datatracker.ietf.org/doc/draft-ietf-ppm-dap/
5
6use self::query_type::{FixedSize, QueryType, TimeInterval};
7use anyhow::anyhow;
8use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD, Engine};
9use educe::Educe;
10use num_enum::{FromPrimitive, IntoPrimitive, TryFromPrimitive};
11use prio::{
12    codec::{
13        decode_u16_items, decode_u32_items, encode_u16_items, encode_u32_items, CodecError, Decode,
14        Encode,
15    },
16    topology::ping_pong::PingPongMessage,
17};
18use rand::{distributions::Standard, prelude::Distribution, Rng};
19use serde::{
20    de::{self, Visitor},
21    Deserialize, Serialize, Serializer,
22};
23use std::{
24    fmt::{self, Debug, Display, Formatter},
25    io::{Cursor, Read},
26    num::TryFromIntError,
27    str,
28    str::FromStr,
29    time::{SystemTime, SystemTimeError},
30};
31
32pub use prio::codec;
33
34pub mod problem_type;
35pub mod query_type;
36pub mod taskprov;
37#[cfg(test)]
38mod tests;
39
/// Errors returned by functions and methods in this module
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An invalid parameter was passed.
    #[error("{0}")]
    InvalidParameter(&'static str),
    /// An illegal arithmetic operation on a [`Time`] or [`Duration`].
    #[error("{0}")]
    IllegalTimeArithmetic(&'static str),
    /// A string could not be decoded as base64; wraps the underlying [`base64::DecodeError`].
    #[error("base64 decode failure: {0}")]
    Base64Decode(#[from] base64::DecodeError),
}
52
/// Wire representation of a URL: ASCII-encoded bytes whose length is between 1 and 2^16 - 1
/// (both inclusive).
#[derive(Clone, Eq, PartialEq)]
pub struct Url(Vec<u8>);

impl Url {
    /// Largest number of bytes a [`Url`] may hold (2^16 - 1).
    const MAX_LEN: usize = u16::MAX as usize;
}
61
62impl Encode for Url {
63    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
64        encode_u16_items(bytes, &(), &self.0)
65    }
66
67    fn encoded_len(&self) -> Option<usize> {
68        Some(2 + self.0.len())
69    }
70}
71
72impl Decode for Url {
73    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
74        Url::try_from(decode_u16_items(&(), bytes)?.as_ref())
75    }
76}
77
78impl Debug for Url {
79    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
80        write!(
81            f,
82            "{}",
83            str::from_utf8(&self.0).map_err(|_| std::fmt::Error)?
84        )
85    }
86}
87
88impl Display for Url {
89    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
90        write!(
91            f,
92            "{}",
93            str::from_utf8(&self.0).map_err(|_| std::fmt::Error)?
94        )
95    }
96}
97
98impl TryFrom<&[u8]> for Url {
99    type Error = CodecError;
100
101    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
102        if value.is_empty() {
103            Err(CodecError::Other(
104                anyhow!("Url must be at least 1 byte long").into(),
105            ))
106        } else if value.len() > Url::MAX_LEN {
107            Err(CodecError::Other(
108                anyhow!("Url must be less than {} bytes long", Url::MAX_LEN).into(),
109            ))
110        } else if !value.iter().all(|i: &u8| i.is_ascii()) {
111            Err(CodecError::Other(
112                anyhow!("Url must be ASCII encoded").into(),
113            ))
114        } else {
115            Ok(Self(Vec::from(value)))
116        }
117    }
118}
119
120impl TryFrom<&Url> for url::Url {
121    type Error = url::ParseError;
122
123    fn try_from(value: &Url) -> Result<Self, Self::Error> {
124        // Unwrap safety: this type can't be constructed without being validated
125        // as consisting only of ASCII.
126        url::Url::parse(str::from_utf8(&value.0).unwrap())
127    }
128}
129
130/// DAP protocol message representing a duration with a resolution of seconds.
131#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
132pub struct Duration(u64);
133
134impl Duration {
135    pub const ZERO: Duration = Duration::from_seconds(0);
136
137    /// Create a duration representing the provided number of seconds.
138    pub const fn from_seconds(seconds: u64) -> Self {
139        Self(seconds)
140    }
141
142    /// Get the number of seconds this duration represents.
143    pub fn as_seconds(&self) -> u64 {
144        self.0
145    }
146}
147
148impl Encode for Duration {
149    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
150        self.0.encode(bytes)
151    }
152
153    fn encoded_len(&self) -> Option<usize> {
154        self.0.encoded_len()
155    }
156}
157
158impl Decode for Duration {
159    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
160        Ok(Self(u64::decode(bytes)?))
161    }
162}
163
164impl Display for Duration {
165    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
166        write!(f, "{} seconds", self.0)
167    }
168}
169
170/// DAP protocol message representing an instant in time with a resolution of seconds.
171#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
172pub struct Time(u64);
173
174impl Time {
175    /// Construct a [`Time`] representing the instant that is a given number of seconds after
176    /// January 1st, 1970, at 0:00:00 UTC (i.e., the instant with the Unix timestamp of
177    /// `timestamp`).
178    pub const fn from_seconds_since_epoch(timestamp: u64) -> Self {
179        Self(timestamp)
180    }
181
182    /// Get the number of seconds from January 1st, 1970, at 0:00:00 UTC to the instant represented
183    /// by this [`Time`] (i.e., the Unix timestamp for the instant it represents).
184    pub fn as_seconds_since_epoch(&self) -> u64 {
185        self.0
186    }
187}
188
189impl Display for Time {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        write!(f, "{}", self.0)
192    }
193}
194
195impl Encode for Time {
196    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
197        self.0.encode(bytes)
198    }
199
200    fn encoded_len(&self) -> Option<usize> {
201        self.0.encoded_len()
202    }
203}
204
205impl Decode for Time {
206    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
207        Ok(Self(u64::decode(bytes)?))
208    }
209}
210
211impl TryFrom<SystemTime> for Time {
212    type Error = SystemTimeError;
213
214    fn try_from(time: SystemTime) -> Result<Self, Self::Error> {
215        let duration = time.duration_since(SystemTime::UNIX_EPOCH)?;
216        Ok(Time::from_seconds_since_epoch(duration.as_secs()))
217    }
218}
219
220/// DAP protocol message representing a half-open interval of time with a resolution of seconds;
221/// the start of the interval is included while the end of the interval is excluded.
222#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
223pub struct Interval {
224    /// The start of the interval.
225    start: Time,
226    /// The length of the interval.
227    duration: Duration,
228}
229
230impl Interval {
231    pub const EMPTY: Self = Self {
232        start: Time::from_seconds_since_epoch(0),
233        duration: Duration::ZERO,
234    };
235
236    /// Create a new [`Interval`] from the provided start and duration. Returns an error if the end
237    /// of the interval cannot be represented as a [`Time`].
238    pub fn new(start: Time, duration: Duration) -> Result<Self, Error> {
239        start
240            .0
241            .checked_add(duration.0)
242            .ok_or(Error::IllegalTimeArithmetic("duration overflows time"))?;
243
244        Ok(Self { start, duration })
245    }
246
247    /// Returns a [`Time`] representing the included start of this interval.
248    pub fn start(&self) -> &Time {
249        &self.start
250    }
251
252    /// Get the duration of this interval.
253    pub fn duration(&self) -> &Duration {
254        &self.duration
255    }
256}
257
258impl Encode for Interval {
259    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
260        self.start.encode(bytes)?;
261        self.duration.encode(bytes)
262    }
263
264    fn encoded_len(&self) -> Option<usize> {
265        Some(self.start.encoded_len()? + self.duration.encoded_len()?)
266    }
267}
268
269impl Decode for Interval {
270    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
271        let start = Time::decode(bytes)?;
272        let duration = Duration::decode(bytes)?;
273
274        Self::new(start, duration).map_err(|e| CodecError::Other(Box::new(e)))
275    }
276}
277
278impl Display for Interval {
279    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
280        write!(f, "start: {} duration: {}", self.start, self.duration)
281    }
282}
283
/// DAP protocol message representing an ID uniquely identifying a batch, for fixed-size tasks.
///
/// Wraps a fixed-size byte array of [`BatchId::LEN`] bytes.
#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct BatchId([u8; Self::LEN]);

impl BatchId {
    /// LEN is the length of a batch ID in bytes.
    pub const LEN: usize = 32;
}
292
293impl From<[u8; Self::LEN]> for BatchId {
294    fn from(batch_id: [u8; Self::LEN]) -> Self {
295        Self(batch_id)
296    }
297}
298
299impl TryFrom<&[u8]> for BatchId {
300    type Error = Error;
301
302    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
303        Ok(Self(value.try_into().map_err(|_| {
304            Error::InvalidParameter("byte slice has incorrect length for BatchId")
305        })?))
306    }
307}
308
309impl FromStr for BatchId {
310    type Err = Error;
311
312    fn from_str(s: &str) -> Result<Self, Self::Err> {
313        Self::try_from(URL_SAFE_NO_PAD.decode(s)?.as_ref())
314    }
315}
316
317impl AsRef<[u8; Self::LEN]> for BatchId {
318    fn as_ref(&self) -> &[u8; Self::LEN] {
319        &self.0
320    }
321}
322
323impl Debug for BatchId {
324    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
325        write!(
326            f,
327            "BatchId({})",
328            Base64Display::new(&self.0, &URL_SAFE_NO_PAD)
329        )
330    }
331}
332
333impl Display for BatchId {
334    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
335        write!(f, "{}", Base64Display::new(&self.0, &URL_SAFE_NO_PAD))
336    }
337}
338
339impl Encode for BatchId {
340    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
341        bytes.extend_from_slice(&self.0);
342        Ok(())
343    }
344
345    fn encoded_len(&self) -> Option<usize> {
346        Some(Self::LEN)
347    }
348}
349
350impl Decode for BatchId {
351    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
352        let mut batch_id = [0; Self::LEN];
353        bytes.read_exact(&mut batch_id)?;
354        Ok(Self(batch_id))
355    }
356}
357
358impl Distribution<BatchId> for Standard {
359    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> BatchId {
360        BatchId(rng.gen())
361    }
362}
363
/// DAP protocol message representing an ID uniquely identifying a client report.
///
/// Wraps a fixed-size byte array of [`ReportId::LEN`] bytes.
#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReportId([u8; Self::LEN]);

impl ReportId {
    /// LEN is the length of a report ID in bytes.
    pub const LEN: usize = 16;
}
372
373impl From<[u8; Self::LEN]> for ReportId {
374    fn from(report_id: [u8; Self::LEN]) -> Self {
375        Self(report_id)
376    }
377}
378
379impl TryFrom<&[u8]> for ReportId {
380    type Error = Error;
381
382    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
383        Ok(Self(value.try_into().map_err(|_| {
384            Error::InvalidParameter("byte slice has incorrect length for ReportId")
385        })?))
386    }
387}
388
389impl AsRef<[u8; Self::LEN]> for ReportId {
390    fn as_ref(&self) -> &[u8; Self::LEN] {
391        &self.0
392    }
393}
394
395impl Debug for ReportId {
396    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
397        write!(
398            f,
399            "ReportId({})",
400            Base64Display::new(&self.0, &URL_SAFE_NO_PAD)
401        )
402    }
403}
404
405impl Display for ReportId {
406    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
407        write!(f, "{}", Base64Display::new(&self.0, &URL_SAFE_NO_PAD))
408    }
409}
410
411impl Encode for ReportId {
412    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
413        bytes.extend_from_slice(&self.0);
414        Ok(())
415    }
416
417    fn encoded_len(&self) -> Option<usize> {
418        Some(Self::LEN)
419    }
420}
421
422impl Decode for ReportId {
423    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
424        let mut report_id = [0; Self::LEN];
425        bytes.read_exact(&mut report_id)?;
426        Ok(Self(report_id))
427    }
428}
429
430impl FromStr for ReportId {
431    type Err = Error;
432
433    fn from_str(s: &str) -> Result<Self, Self::Err> {
434        Self::try_from(URL_SAFE_NO_PAD.decode(s)?.as_ref())
435    }
436}
437
438impl Distribution<ReportId> for Standard {
439    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> ReportId {
440        ReportId(rng.gen())
441    }
442}
443
/// Checksum over DAP report IDs, defined in §4.4.4.3.
///
/// Wraps a fixed-size byte array of [`ReportIdChecksum::LEN`] bytes; the derived `Default` is
/// the all-zero array.
#[derive(Copy, Clone, Debug, Default, Hash, PartialEq, Eq)]
pub struct ReportIdChecksum([u8; Self::LEN]);

impl ReportIdChecksum {
    /// LEN is the length of a report ID checksum in bytes.
    pub const LEN: usize = 32;
}
452
453impl From<[u8; Self::LEN]> for ReportIdChecksum {
454    fn from(checksum: [u8; Self::LEN]) -> Self {
455        Self(checksum)
456    }
457}
458
459impl TryFrom<&[u8]> for ReportIdChecksum {
460    type Error = Error;
461
462    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
463        Ok(Self(value.try_into().map_err(|_| {
464            Error::InvalidParameter("byte slice has incorrect length for ReportIdChecksum")
465        })?))
466    }
467}
468
469impl AsRef<[u8]> for ReportIdChecksum {
470    fn as_ref(&self) -> &[u8] {
471        &self.0
472    }
473}
474impl AsMut<[u8]> for ReportIdChecksum {
475    fn as_mut(&mut self) -> &mut [u8] {
476        &mut self.0
477    }
478}
479
480impl Display for ReportIdChecksum {
481    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
482        write!(f, "{}", hex::encode(self.0))
483    }
484}
485
486impl Encode for ReportIdChecksum {
487    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
488        bytes.extend_from_slice(&self.0);
489        Ok(())
490    }
491
492    fn encoded_len(&self) -> Option<usize> {
493        Some(Self::LEN)
494    }
495}
496
497impl Decode for ReportIdChecksum {
498    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
499        let mut checksum = Self::default();
500        bytes.read_exact(&mut checksum.0)?;
501
502        Ok(checksum)
503    }
504}
505
#[cfg(feature = "test-util")]
impl Distribution<ReportIdChecksum> for Standard {
    /// Draws a [`ReportIdChecksum`] whose bytes are generated by `rng`. Only compiled with the
    /// `test-util` feature.
    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> ReportIdChecksum {
        let bytes: [u8; ReportIdChecksum::LEN] = rng.gen();
        ReportIdChecksum(bytes)
    }
}
512
513/// DAP protocol message representing the different roles that participants can adopt.
514#[derive(Copy, Clone, Debug, PartialEq, Eq, TryFromPrimitive, Serialize, Deserialize)]
515#[repr(u8)]
516pub enum Role {
517    Collector = 0,
518    Client = 1,
519    Leader = 2,
520    Helper = 3,
521}
522
523impl Role {
524    /// True if this [`Role`] is one of the aggregators.
525    pub fn is_aggregator(&self) -> bool {
526        matches!(self, Role::Leader | Role::Helper)
527    }
528
529    /// Returns a VDAF aggregator ID if this [`Role`] is one of the aggregators, or `None` if the
530    /// role is not an aggregator. This is also used in [draft-wang-ppm-dap-taskprov-04][1] and earlier
531    /// to index into the `aggregator_endpoints` array.
532    ///
533    /// [1]: https://www.ietf.org/archive/id/draft-wang-ppm-dap-taskprov-04.html#section-3-4
534    pub fn index(&self) -> Option<usize> {
535        match self {
536            Role::Leader => Some(0),
537            Role::Helper => Some(1),
538            _ => None,
539        }
540    }
541
542    pub fn as_str(&self) -> &'static str {
543        match self {
544            Self::Collector => "collector",
545            Self::Client => "client",
546            Self::Leader => "leader",
547            Self::Helper => "helper",
548        }
549    }
550}
551
/// Error produced when a string does not name a known [`Role`]; carries the rejected string.
#[derive(Debug, thiserror::Error)]
#[error("unknown role {0}")]
pub struct RoleParseError(String);
555
556impl FromStr for Role {
557    type Err = RoleParseError;
558
559    fn from_str(s: &str) -> Result<Self, Self::Err> {
560        match s {
561            "collector" => Ok(Self::Collector),
562            "client" => Ok(Self::Client),
563            "leader" => Ok(Self::Leader),
564            "helper" => Ok(Self::Helper),
565            _ => Err(RoleParseError(s.to_owned())),
566        }
567    }
568}
569
570impl Encode for Role {
571    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
572        (*self as u8).encode(bytes)
573    }
574
575    fn encoded_len(&self) -> Option<usize> {
576        Some(1)
577    }
578}
579
580impl Decode for Role {
581    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
582        let val = u8::decode(bytes)?;
583        Self::try_from(val)
584            .map_err(|_| CodecError::Other(anyhow!("unexpected Role value {val}").into()))
585    }
586}
587
588impl Display for Role {
589    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
590        write!(f, "{}", self.as_str())
591    }
592}
593
/// DAP protocol message representing an identifier for an HPKE config.
///
/// A thin wrapper around a single `u8`.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct HpkeConfigId(u8);
597
598impl Display for HpkeConfigId {
599    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
600        write!(f, "{}", self.0)
601    }
602}
603
604impl Encode for HpkeConfigId {
605    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
606        self.0.encode(bytes)
607    }
608
609    fn encoded_len(&self) -> Option<usize> {
610        self.0.encoded_len()
611    }
612}
613
614impl Decode for HpkeConfigId {
615    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
616        Ok(Self(u8::decode(bytes)?))
617    }
618}
619
620impl From<u8> for HpkeConfigId {
621    fn from(value: u8) -> HpkeConfigId {
622        HpkeConfigId(value)
623    }
624}
625
626impl From<HpkeConfigId> for u8 {
627    fn from(id: HpkeConfigId) -> u8 {
628        id.0
629    }
630}
631
632impl Distribution<HpkeConfigId> for Standard {
633    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> HpkeConfigId {
634        HpkeConfigId(rng.gen())
635    }
636}
637
/// DAP protocol message representing an identifier for a DAP task.
///
/// Wraps a fixed-size byte array of [`TaskId::LEN`] bytes.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct TaskId([u8; Self::LEN]);

impl TaskId {
    /// LEN is the length of a task ID in bytes.
    pub const LEN: usize = 32;
}
646
647impl Debug for TaskId {
648    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
649        write!(
650            f,
651            "TaskId({})",
652            Base64Display::new(&self.0, &URL_SAFE_NO_PAD)
653        )
654    }
655}
656
657impl Display for TaskId {
658    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
659        write!(f, "{}", Base64Display::new(&self.0, &URL_SAFE_NO_PAD))
660    }
661}
662
663impl Encode for TaskId {
664    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
665        bytes.extend_from_slice(&self.0);
666        Ok(())
667    }
668
669    fn encoded_len(&self) -> Option<usize> {
670        Some(Self::LEN)
671    }
672}
673
674impl Decode for TaskId {
675    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
676        let mut decoded = [0u8; Self::LEN];
677        bytes.read_exact(&mut decoded)?;
678        Ok(Self(decoded))
679    }
680}
681
682impl From<[u8; Self::LEN]> for TaskId {
683    fn from(task_id: [u8; Self::LEN]) -> Self {
684        Self(task_id)
685    }
686}
687
688impl TryFrom<&[u8]> for TaskId {
689    type Error = Error;
690
691    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
692        Ok(Self(value.try_into().map_err(|_| {
693            Error::InvalidParameter("byte slice has incorrect length for TaskId")
694        })?))
695    }
696}
697
698impl AsRef<[u8; Self::LEN]> for TaskId {
699    fn as_ref(&self) -> &[u8; Self::LEN] {
700        &self.0
701    }
702}
703
704impl FromStr for TaskId {
705    type Err = Error;
706
707    fn from_str(s: &str) -> Result<Self, Self::Err> {
708        Self::try_from(URL_SAFE_NO_PAD.decode(s)?.as_ref())
709    }
710}
711
712impl Distribution<TaskId> for Standard {
713    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> TaskId {
714        TaskId(rng.gen())
715    }
716}
717
718/// This customized implementation serializes a [`TaskId`] as a base64url-encoded string, instead
719/// of as a byte array. This is more compact and ergonomic when serialized to YAML, and aligns with
720/// other uses of base64url encoding in DAP.
721impl Serialize for TaskId {
722    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
723    where
724        S: Serializer,
725    {
726        let encoded = URL_SAFE_NO_PAD.encode(self.as_ref());
727        serializer.serialize_str(&encoded)
728    }
729}
730
731struct TaskIdVisitor;
732
733impl Visitor<'_> for TaskIdVisitor {
734    type Value = TaskId;
735
736    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
737        formatter.write_str("a base64url-encoded string that decodes to 32 bytes")
738    }
739
740    fn visit_str<E>(self, value: &str) -> Result<TaskId, E>
741    where
742        E: de::Error,
743    {
744        let decoded = URL_SAFE_NO_PAD
745            .decode(value)
746            .map_err(|_| E::custom("invalid base64url value"))?;
747
748        TaskId::try_from(decoded.as_slice()).map_err(|e| E::custom(e))
749    }
750}
751
752/// This customized implementation deserializes a [`TaskId`] as a base64url-encoded string, instead
753/// of as a byte array. This is more compact and ergonomic when serialized to YAML, and aligns with
754/// other uses of base64url encoding in DAP.
755impl<'de> Deserialize<'de> for TaskId {
756    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
757    where
758        D: serde::Deserializer<'de>,
759    {
760        deserializer.deserialize_str(TaskIdVisitor)
761    }
762}
763
/// DAP protocol message representing an HPKE key encapsulation mechanism.
///
/// Each variant maps to a `u16` code point; values without a dedicated variant are kept in
/// [`HpkeKemId::Other`] rather than rejected.
#[derive(
    Clone, Copy, Debug, PartialEq, Eq, FromPrimitive, IntoPrimitive, Serialize, Deserialize, Hash,
)]
#[repr(u16)]
#[non_exhaustive]
pub enum HpkeKemId {
    /// NIST P-256 keys and HKDF-SHA256.
    P256HkdfSha256 = 0x0010,
    /// NIST P-384 keys and HKDF-SHA384.
    P384HkdfSha384 = 0x0011,
    /// NIST P-521 keys and HKDF-SHA512.
    P521HkdfSha512 = 0x0012,
    /// X25519 keys and HKDF-SHA256.
    X25519HkdfSha256 = 0x0020,
    /// X448 keys and HKDF-SHA512.
    X448HkdfSha512 = 0x0021,
    /// Unrecognized algorithm identifiers.
    #[num_enum(catch_all)]
    Other(u16),
}
785
786impl Encode for HpkeKemId {
787    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
788        u16::from(*self).encode(bytes)
789    }
790
791    fn encoded_len(&self) -> Option<usize> {
792        Some(2)
793    }
794}
795
796impl Decode for HpkeKemId {
797    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
798        let val = u16::decode(bytes)?;
799        Ok(Self::from(val))
800    }
801}
802
/// DAP protocol message representing an HPKE key derivation function.
///
/// Each variant maps to a `u16` code point; values without a dedicated variant are kept in
/// [`HpkeKdfId::Other`] rather than rejected.
#[derive(
    Clone, Copy, Debug, PartialEq, Eq, FromPrimitive, IntoPrimitive, Serialize, Deserialize, Hash,
)]
#[repr(u16)]
#[non_exhaustive]
pub enum HpkeKdfId {
    /// HMAC Key Derivation Function SHA256.
    HkdfSha256 = 0x0001,
    /// HMAC Key Derivation Function SHA384.
    HkdfSha384 = 0x0002,
    /// HMAC Key Derivation Function SHA512.
    HkdfSha512 = 0x0003,
    /// Unrecognized algorithm identifiers.
    #[num_enum(catch_all)]
    Other(u16),
}
820
821impl Encode for HpkeKdfId {
822    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
823        u16::from(*self).encode(bytes)
824    }
825
826    fn encoded_len(&self) -> Option<usize> {
827        Some(2)
828    }
829}
830
831impl Decode for HpkeKdfId {
832    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
833        let val = u16::decode(bytes)?;
834        Ok(Self::from(val))
835    }
836}
837
/// DAP protocol message representing an HPKE AEAD.
///
/// Each variant maps to a `u16` code point; values without a dedicated variant are kept in
/// [`HpkeAeadId::Other`] rather than rejected.
#[derive(
    Clone, Copy, Debug, PartialEq, Eq, FromPrimitive, IntoPrimitive, Serialize, Deserialize, Hash,
)]
#[repr(u16)]
#[non_exhaustive]
pub enum HpkeAeadId {
    /// AES-128-GCM.
    Aes128Gcm = 0x0001,
    /// AES-256-GCM.
    Aes256Gcm = 0x0002,
    /// ChaCha20Poly1305.
    ChaCha20Poly1305 = 0x0003,
    /// Unrecognized algorithm identifiers.
    #[num_enum(catch_all)]
    Other(u16),
}
855
856impl Encode for HpkeAeadId {
857    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
858        u16::from(*self).encode(bytes)
859    }
860
861    fn encoded_len(&self) -> Option<usize> {
862        Some(2)
863    }
864}
865
866impl Decode for HpkeAeadId {
867    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
868        let val = u16::decode(bytes)?;
869        Ok(Self::from(val))
870    }
871}
872
873/// DAP protocol message representing an arbitrary extension included in a client report.
874#[derive(Clone, Debug, PartialEq, Eq)]
875pub struct Extension {
876    extension_type: ExtensionType,
877    extension_data: Vec<u8>,
878}
879
880impl Extension {
881    /// Construct an extension from its type and payload.
882    pub fn new(extension_type: ExtensionType, extension_data: Vec<u8>) -> Extension {
883        Extension {
884            extension_type,
885            extension_data,
886        }
887    }
888
889    /// Returns the type of this extension.
890    pub fn extension_type(&self) -> &ExtensionType {
891        &self.extension_type
892    }
893
894    /// Returns the unparsed data representing this extension.
895    pub fn extension_data(&self) -> &[u8] {
896        &self.extension_data
897    }
898}
899
900impl Encode for Extension {
901    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
902        self.extension_type.encode(bytes)?;
903        encode_u16_items(bytes, &(), &self.extension_data)
904    }
905
906    fn encoded_len(&self) -> Option<usize> {
907        // Type, length prefix, and extension data.
908        Some(self.extension_type.encoded_len()? + 2 + self.extension_data.len())
909    }
910}
911
912impl Decode for Extension {
913    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
914        let extension_type = ExtensionType::decode(bytes)?;
915        let extension_data = decode_u16_items(&(), bytes)?;
916
917        Ok(Self {
918            extension_type,
919            extension_data,
920        })
921    }
922}
923
/// DAP protocol message representing the type of an extension included in a client report.
///
/// Each variant has a `u16` discriminant; converting from a `u16` with no matching variant
/// fails (the conversion is derived with `TryFromPrimitive`).
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, TryFromPrimitive)]
#[repr(u16)]
#[non_exhaustive]
pub enum ExtensionType {
    /// Code point 0. NOTE(review): the name suggests a placeholder ("to be determined");
    /// confirm against the protocol specification.
    Tbd = 0,
    /// Code point 0xFF00. NOTE(review): presumably the taskprov extension handled by the
    /// `taskprov` module; confirm.
    Taskprov = 0xFF00,
}
932
933impl Encode for ExtensionType {
934    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
935        (*self as u16).encode(bytes)
936    }
937
938    fn encoded_len(&self) -> Option<usize> {
939        Some(2)
940    }
941}
942
943impl Decode for ExtensionType {
944    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
945        let val = u16::decode(bytes)?;
946        Self::try_from(val)
947            .map_err(|_| CodecError::Other(anyhow!("unexpected ExtensionType value {val}").into()))
948    }
949}
950
951/// DAP protocol message representing an HPKE ciphertext.
952#[derive(Clone, Educe, Eq, PartialEq)]
953#[educe(Debug)]
954pub struct HpkeCiphertext {
955    /// An identifier of the HPKE configuration used to seal the message.
956    config_id: HpkeConfigId,
957    /// An encapsulated HPKE key.
958    #[educe(Debug(ignore))]
959    encapsulated_key: Vec<u8>,
960    /// An HPKE ciphertext.
961    #[educe(Debug(ignore))]
962    payload: Vec<u8>,
963}
964
965impl HpkeCiphertext {
966    /// Construct a HPKE ciphertext message from its components.
967    pub fn new(
968        config_id: HpkeConfigId,
969        encapsulated_key: Vec<u8>,
970        payload: Vec<u8>,
971    ) -> HpkeCiphertext {
972        HpkeCiphertext {
973            config_id,
974            encapsulated_key,
975            payload,
976        }
977    }
978
979    /// Get the configuration identifier associated with this ciphertext.
980    pub fn config_id(&self) -> &HpkeConfigId {
981        &self.config_id
982    }
983
984    /// Get the encapsulated key from this ciphertext message.
985    pub fn encapsulated_key(&self) -> &[u8] {
986        &self.encapsulated_key
987    }
988
989    /// Get the encrypted payload from this ciphertext message.
990    pub fn payload(&self) -> &[u8] {
991        &self.payload
992    }
993}
994
995impl Encode for HpkeCiphertext {
996    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
997        self.config_id.encode(bytes)?;
998        encode_u16_items(bytes, &(), &self.encapsulated_key)?;
999        encode_u32_items(bytes, &(), &self.payload)
1000    }
1001
1002    fn encoded_len(&self) -> Option<usize> {
1003        Some(
1004            self.config_id.encoded_len()?
1005                + 2
1006                + self.encapsulated_key.len()
1007                + 4
1008                + self.payload.len(),
1009        )
1010    }
1011}
1012
1013impl Decode for HpkeCiphertext {
1014    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1015        let config_id = HpkeConfigId::decode(bytes)?;
1016        let encapsulated_key = decode_u16_items(&(), bytes)?;
1017        let payload = decode_u32_items(&(), bytes)?;
1018
1019        Ok(Self {
1020            config_id,
1021            encapsulated_key,
1022            payload,
1023        })
1024    }
1025}
1026
/// DAP protocol message representing an HPKE public key.
///
/// Holds the key as an opaque byte vector; no validation of the key bytes is performed by this
/// type.
// TODO(#230): refactor HpkePublicKey & HpkeConfig to simplify usage
#[derive(Clone, PartialEq, Eq)]
pub struct HpkePublicKey(Vec<u8>);
1031
1032impl From<Vec<u8>> for HpkePublicKey {
1033    fn from(key: Vec<u8>) -> Self {
1034        Self(key)
1035    }
1036}
1037
1038impl AsRef<[u8]> for HpkePublicKey {
1039    fn as_ref(&self) -> &[u8] {
1040        &self.0
1041    }
1042}
1043
1044impl Encode for HpkePublicKey {
1045    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1046        encode_u16_items(bytes, &(), &self.0)
1047    }
1048
1049    fn encoded_len(&self) -> Option<usize> {
1050        Some(2 + self.0.len())
1051    }
1052}
1053
1054impl Decode for HpkePublicKey {
1055    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1056        let key = decode_u16_items(&(), bytes)?;
1057        Ok(Self(key))
1058    }
1059}
1060
1061impl Debug for HpkePublicKey {
1062    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1063        write!(f, "HpkePublicKey({self})")
1064    }
1065}
1066
1067impl Display for HpkePublicKey {
1068    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1069        write!(f, "{}", Base64Display::new(&self.0, &URL_SAFE_NO_PAD))
1070    }
1071}
1072
1073impl FromStr for HpkePublicKey {
1074    type Err = Error;
1075
1076    fn from_str(s: &str) -> Result<Self, Self::Err> {
1077        Ok(Self::from(URL_SAFE_NO_PAD.decode(s)?))
1078    }
1079}
1080
1081/// This customized implementation serializes a [`HpkePublicKey`] as a base64url-encoded string,
1082/// instead of as a byte array. This is more compact and ergonomic when serialized to YAML.
1083impl Serialize for HpkePublicKey {
1084    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1085    where
1086        S: Serializer,
1087    {
1088        let encoded = URL_SAFE_NO_PAD.encode(self.as_ref());
1089        serializer.serialize_str(&encoded)
1090    }
1091}
1092
1093struct HpkePublicKeyVisitor;
1094
1095impl Visitor<'_> for HpkePublicKeyVisitor {
1096    type Value = HpkePublicKey;
1097
1098    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
1099        formatter.write_str("a base64url-encoded string")
1100    }
1101
1102    fn visit_str<E>(self, value: &str) -> Result<HpkePublicKey, E>
1103    where
1104        E: de::Error,
1105    {
1106        let decoded = URL_SAFE_NO_PAD
1107            .decode(value)
1108            .map_err(|_| E::custom("invalid base64url value"))?;
1109        Ok(HpkePublicKey::from(decoded))
1110    }
1111}
1112
1113/// This customized implementation deserializes a [`HpkePublicKey`] as a base64url-encoded string,
1114/// instead of as a byte array. This is more compact and ergonomic when serialized to YAML.
1115impl<'de> Deserialize<'de> for HpkePublicKey {
1116    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1117    where
1118        D: serde::Deserializer<'de>,
1119    {
1120        deserializer.deserialize_str(HpkePublicKeyVisitor)
1121    }
1122}
1123
1124/// DAP protocol message representing an HPKE config.
1125#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1126pub struct HpkeConfig {
1127    id: HpkeConfigId,
1128    kem_id: HpkeKemId,
1129    kdf_id: HpkeKdfId,
1130    aead_id: HpkeAeadId,
1131    public_key: HpkePublicKey,
1132}
1133
1134impl HpkeConfig {
1135    /// Construct a HPKE configuration message from its components.
1136    pub fn new(
1137        id: HpkeConfigId,
1138        kem_id: HpkeKemId,
1139        kdf_id: HpkeKdfId,
1140        aead_id: HpkeAeadId,
1141        public_key: HpkePublicKey,
1142    ) -> HpkeConfig {
1143        HpkeConfig {
1144            id,
1145            kem_id,
1146            kdf_id,
1147            aead_id,
1148            public_key,
1149        }
1150    }
1151
1152    /// Returns the HPKE config ID associated with this HPKE configuration.
1153    pub fn id(&self) -> &HpkeConfigId {
1154        &self.id
1155    }
1156
1157    /// Retrieve the key encapsulation mechanism algorithm identifier associated with this HPKE configuration.
1158    pub fn kem_id(&self) -> &HpkeKemId {
1159        &self.kem_id
1160    }
1161
1162    /// Retrieve the key derivation function algorithm identifier associated with this HPKE configuration.
1163    pub fn kdf_id(&self) -> &HpkeKdfId {
1164        &self.kdf_id
1165    }
1166
1167    /// Retrieve the AEAD algorithm identifier associated with this HPKE configuration.
1168    pub fn aead_id(&self) -> &HpkeAeadId {
1169        &self.aead_id
1170    }
1171
1172    /// Retrieve the public key from this HPKE configuration.
1173    pub fn public_key(&self) -> &HpkePublicKey {
1174        &self.public_key
1175    }
1176}
1177
1178impl Encode for HpkeConfig {
1179    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1180        self.id.encode(bytes)?;
1181        self.kem_id.encode(bytes)?;
1182        self.kdf_id.encode(bytes)?;
1183        self.aead_id.encode(bytes)?;
1184        self.public_key.encode(bytes)
1185    }
1186
1187    fn encoded_len(&self) -> Option<usize> {
1188        Some(
1189            self.id.encoded_len()?
1190                + self.kem_id.encoded_len()?
1191                + self.kdf_id.encoded_len()?
1192                + self.aead_id.encoded_len()?
1193                + self.public_key.encoded_len()?,
1194        )
1195    }
1196}
1197
1198impl Decode for HpkeConfig {
1199    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1200        let id = HpkeConfigId::decode(bytes)?;
1201        let kem_id = HpkeKemId::decode(bytes)?;
1202        let kdf_id = HpkeKdfId::decode(bytes)?;
1203        let aead_id = HpkeAeadId::decode(bytes)?;
1204        let public_key = HpkePublicKey::decode(bytes)?;
1205
1206        Ok(Self {
1207            id,
1208            kem_id,
1209            kdf_id,
1210            aead_id,
1211            public_key,
1212        })
1213    }
1214}
1215
1216/// DAP protocol message representing a list of HPKE configurations.
1217#[derive(Clone, Debug, PartialEq, Eq)]
1218pub struct HpkeConfigList(Vec<HpkeConfig>);
1219
1220impl HpkeConfigList {
1221    /// The media type associated with this protocol message.
1222    pub const MEDIA_TYPE: &'static str = "application/dap-hpke-config-list";
1223
1224    /// Construct an HPKE configuration list.
1225    pub fn new(hpke_configs: Vec<HpkeConfig>) -> Self {
1226        Self(hpke_configs)
1227    }
1228
1229    pub fn hpke_configs(&self) -> &[HpkeConfig] {
1230        &self.0
1231    }
1232}
1233
1234impl Encode for HpkeConfigList {
1235    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1236        encode_u16_items(bytes, &(), &self.0)
1237    }
1238
1239    fn encoded_len(&self) -> Option<usize> {
1240        let mut length = 2;
1241        for hpke_config in self.0.iter() {
1242            length += hpke_config.encoded_len()?;
1243        }
1244        Some(length)
1245    }
1246}
1247
1248impl Decode for HpkeConfigList {
1249    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1250        Ok(Self(decode_u16_items(&(), bytes)?))
1251    }
1252}
1253
1254/// DAP protocol message representing client report metadata.
1255#[derive(Clone, Debug, PartialEq, Eq)]
1256pub struct ReportMetadata {
1257    report_id: ReportId,
1258    time: Time,
1259}
1260
1261impl ReportMetadata {
1262    /// Construct a report's metadata from its components.
1263    pub fn new(report_id: ReportId, time: Time) -> Self {
1264        Self { report_id, time }
1265    }
1266
1267    /// Retrieve the report ID from this report metadata.
1268    pub fn id(&self) -> &ReportId {
1269        &self.report_id
1270    }
1271
1272    /// Retrieve the client timestamp from this report metadata.
1273    pub fn time(&self) -> &Time {
1274        &self.time
1275    }
1276}
1277
1278impl Encode for ReportMetadata {
1279    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1280        self.report_id.encode(bytes)?;
1281        self.time.encode(bytes)
1282    }
1283
1284    fn encoded_len(&self) -> Option<usize> {
1285        Some(self.report_id.encoded_len()? + self.time.encoded_len()?)
1286    }
1287}
1288
1289impl Decode for ReportMetadata {
1290    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1291        let report_id = ReportId::decode(bytes)?;
1292        let time = Time::decode(bytes)?;
1293
1294        Ok(Self { report_id, time })
1295    }
1296}
1297
1298/// DAP protocol message representing the plaintext of an input share.
1299#[derive(Clone, Debug, PartialEq, Eq)]
1300pub struct PlaintextInputShare {
1301    extensions: Vec<Extension>,
1302    payload: Vec<u8>,
1303}
1304
1305impl PlaintextInputShare {
1306    /// Construct a plaintext input share from its components.
1307    pub fn new(extensions: Vec<Extension>, payload: Vec<u8>) -> Self {
1308        Self {
1309            extensions,
1310            payload,
1311        }
1312    }
1313
1314    /// Retrieve the extensions from this plaintext input share.
1315    pub fn extensions(&self) -> &[Extension] {
1316        &self.extensions
1317    }
1318
1319    /// Retrieve the payload from this plaintext input share.
1320    pub fn payload(&self) -> &[u8] {
1321        &self.payload
1322    }
1323}
1324
1325impl Encode for PlaintextInputShare {
1326    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1327        encode_u16_items(bytes, &(), &self.extensions)?;
1328        encode_u32_items(bytes, &(), &self.payload)
1329    }
1330
1331    fn encoded_len(&self) -> Option<usize> {
1332        let mut length = 2;
1333        for extension in self.extensions.iter() {
1334            length += extension.encoded_len()?;
1335        }
1336        length += 4;
1337        length += self.payload.len();
1338        Some(length)
1339    }
1340}
1341
1342impl Decode for PlaintextInputShare {
1343    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1344        let extensions = decode_u16_items(&(), bytes)?;
1345        let payload = decode_u32_items(&(), bytes)?;
1346
1347        Ok(Self {
1348            extensions,
1349            payload,
1350        })
1351    }
1352}
1353
1354/// DAP protocol message representing a client report.
1355#[derive(Clone, Debug, PartialEq, Eq)]
1356pub struct Report {
1357    metadata: ReportMetadata,
1358    public_share: Vec<u8>,
1359    leader_encrypted_input_share: HpkeCiphertext,
1360    helper_encrypted_input_share: HpkeCiphertext,
1361}
1362
1363impl Report {
1364    /// The media type associated with this protocol message.
1365    pub const MEDIA_TYPE: &'static str = "application/dap-report";
1366
1367    /// Construct a report from its components.
1368    pub fn new(
1369        metadata: ReportMetadata,
1370        public_share: Vec<u8>,
1371        leader_encrypted_input_share: HpkeCiphertext,
1372        helper_encrypted_input_share: HpkeCiphertext,
1373    ) -> Self {
1374        Self {
1375            metadata,
1376            public_share,
1377            leader_encrypted_input_share,
1378            helper_encrypted_input_share,
1379        }
1380    }
1381
1382    /// Retrieve the metadata from this report.
1383    pub fn metadata(&self) -> &ReportMetadata {
1384        &self.metadata
1385    }
1386
1387    /// Retrieve the public share from this report.
1388    pub fn public_share(&self) -> &[u8] {
1389        &self.public_share
1390    }
1391
1392    /// Retrieve the encrypted leader input share from this report.
1393    pub fn leader_encrypted_input_share(&self) -> &HpkeCiphertext {
1394        &self.leader_encrypted_input_share
1395    }
1396
1397    /// Retrieve the encrypted helper input share from this report.
1398    pub fn helper_encrypted_input_share(&self) -> &HpkeCiphertext {
1399        &self.helper_encrypted_input_share
1400    }
1401}
1402
1403impl Encode for Report {
1404    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1405        self.metadata.encode(bytes)?;
1406        encode_u32_items(bytes, &(), &self.public_share)?;
1407        self.leader_encrypted_input_share.encode(bytes)?;
1408        self.helper_encrypted_input_share.encode(bytes)
1409    }
1410
1411    fn encoded_len(&self) -> Option<usize> {
1412        let mut length = self.metadata.encoded_len()?;
1413        length += 4;
1414        length += self.public_share.len();
1415        length += self.leader_encrypted_input_share.encoded_len()?;
1416        length += self.helper_encrypted_input_share.encoded_len()?;
1417        Some(length)
1418    }
1419}
1420
1421impl Decode for Report {
1422    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1423        let metadata = ReportMetadata::decode(bytes)?;
1424        let public_share = decode_u32_items(&(), bytes)?;
1425        let leader_encrypted_input_share = HpkeCiphertext::decode(bytes)?;
1426        let helper_encrypted_input_share = HpkeCiphertext::decode(bytes)?;
1427
1428        Ok(Self {
1429            metadata,
1430            public_share,
1431            leader_encrypted_input_share,
1432            helper_encrypted_input_share,
1433        })
1434    }
1435}
1436
1437/// DAP protocol message representing a fixed-size query.
1438#[derive(Clone, Debug, PartialEq, Eq)]
1439pub enum FixedSizeQuery {
1440    ByBatchId { batch_id: BatchId },
1441    CurrentBatch,
1442}
1443
1444impl Encode for FixedSizeQuery {
1445    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1446        match self {
1447            FixedSizeQuery::ByBatchId { batch_id } => {
1448                0u8.encode(bytes)?;
1449                batch_id.encode(bytes)
1450            }
1451            FixedSizeQuery::CurrentBatch => 1u8.encode(bytes),
1452        }
1453    }
1454
1455    fn encoded_len(&self) -> Option<usize> {
1456        match self {
1457            FixedSizeQuery::ByBatchId { batch_id } => Some(1 + batch_id.encoded_len()?),
1458            FixedSizeQuery::CurrentBatch => Some(1),
1459        }
1460    }
1461}
1462
1463impl Decode for FixedSizeQuery {
1464    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1465        let query_type = u8::decode(bytes)?;
1466        match query_type {
1467            0 => {
1468                let batch_id = BatchId::decode(bytes)?;
1469                Ok(FixedSizeQuery::ByBatchId { batch_id })
1470            }
1471            1 => Ok(FixedSizeQuery::CurrentBatch),
1472            _ => Err(CodecError::Other(
1473                anyhow!("unexpected FixedSizeQueryType value {query_type}").into(),
1474            )),
1475        }
1476    }
1477}
1478
1479/// Represents a query for a specific batch identifier, received from a Collector as part of the
1480/// collection flow.
1481#[derive(Clone, Debug, PartialEq, Eq)]
1482pub struct Query<Q: QueryType> {
1483    query_body: Q::QueryBody,
1484}
1485
1486impl<Q: QueryType> Query<Q> {
1487    /// Constructs a new query from its components.
1488    ///
1489    /// This method would typically be used for code which is generic over the query type.
1490    /// Query-type specific code will typically call one of [`Self::new_time_interval`] or
1491    /// [`Self::new_fixed_size`].
1492    pub fn new(query_body: Q::QueryBody) -> Self {
1493        Self { query_body }
1494    }
1495
1496    /// Gets the query body included in this query.
1497    ///
1498    /// This method would typically be used for code which is generic over the query type.
1499    /// Query-type specific code will typically call one of [`Self::batch_interval`] or
1500    /// [`Self::fixed_size_query`].
1501    pub fn query_body(&self) -> &Q::QueryBody {
1502        &self.query_body
1503    }
1504}
1505
1506impl Query<TimeInterval> {
1507    /// Constructs a new query for a time-interval task.
1508    pub fn new_time_interval(batch_interval: Interval) -> Self {
1509        Self::new(batch_interval)
1510    }
1511
1512    /// Gets the batch interval associated with this query.
1513    pub fn batch_interval(&self) -> &Interval {
1514        self.query_body()
1515    }
1516}
1517
1518impl Query<FixedSize> {
1519    /// Constructs a new query for a fixed-size task.
1520    pub fn new_fixed_size(fixed_size_query: FixedSizeQuery) -> Self {
1521        Self::new(fixed_size_query)
1522    }
1523
1524    /// Gets the fixed size query associated with this query.
1525    pub fn fixed_size_query(&self) -> &FixedSizeQuery {
1526        self.query_body()
1527    }
1528}
1529
1530impl<Q: QueryType> Encode for Query<Q> {
1531    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1532        Q::CODE.encode(bytes)?;
1533        self.query_body.encode(bytes)
1534    }
1535
1536    fn encoded_len(&self) -> Option<usize> {
1537        Some(1 + self.query_body.encoded_len()?)
1538    }
1539}
1540
1541impl<Q: QueryType> Decode for Query<Q> {
1542    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1543        query_type::Code::decode_expecting_value(bytes, Q::CODE)?;
1544        let query_body = Q::QueryBody::decode(bytes)?;
1545
1546        Ok(Self { query_body })
1547    }
1548}
1549
1550/// DAP protocol message representing a request from the collector to the leader to provide
1551/// aggregate shares for a given batch.
1552#[derive(Clone, Educe, PartialEq, Eq)]
1553#[educe(Debug)]
1554pub struct CollectionReq<Q: QueryType> {
1555    query: Query<Q>,
1556    #[educe(Debug(ignore))]
1557    aggregation_parameter: Vec<u8>,
1558}
1559
1560impl<Q: QueryType> CollectionReq<Q> {
1561    /// The media type associated with this protocol message.
1562    pub const MEDIA_TYPE: &'static str = "application/dap-collect-req";
1563
1564    /// Constructs a new collect request from its components.
1565    pub fn new(query: Query<Q>, aggregation_parameter: Vec<u8>) -> Self {
1566        Self {
1567            query,
1568            aggregation_parameter,
1569        }
1570    }
1571
1572    /// Gets the query associated with this collect request.
1573    pub fn query(&self) -> &Query<Q> {
1574        &self.query
1575    }
1576
1577    /// Gets the aggregation parameter associated with this collect request.
1578    pub fn aggregation_parameter(&self) -> &[u8] {
1579        &self.aggregation_parameter
1580    }
1581}
1582
1583impl<Q: QueryType> Encode for CollectionReq<Q> {
1584    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1585        self.query.encode(bytes)?;
1586        encode_u32_items(bytes, &(), &self.aggregation_parameter)
1587    }
1588
1589    fn encoded_len(&self) -> Option<usize> {
1590        Some(self.query.encoded_len()? + 4 + self.aggregation_parameter.len())
1591    }
1592}
1593
1594impl<Q: QueryType> Decode for CollectionReq<Q> {
1595    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1596        let query = Query::decode(bytes)?;
1597        let aggregation_parameter = decode_u32_items(&(), bytes)?;
1598
1599        Ok(Self {
1600            query,
1601            aggregation_parameter,
1602        })
1603    }
1604}
1605
1606/// DAP protocol message representing a partial batch selector, identifying a batch of interest in
1607/// cases where some query types can infer the selector.
1608#[derive(Clone, Debug, PartialEq, Eq)]
1609pub struct PartialBatchSelector<Q: QueryType> {
1610    batch_identifier: Q::PartialBatchIdentifier,
1611}
1612
1613impl<Q: QueryType> PartialBatchSelector<Q> {
1614    /// Constructs a new partial batch selector.
1615    ///
1616    /// This method would typically be used for code which is generic over the query type.
1617    /// Query-type specific code will typically call one of [`Self::new_time_interval`] or
1618    /// [`Self::new_fixed_size`].
1619    pub fn new(batch_identifier: Q::PartialBatchIdentifier) -> Self {
1620        Self { batch_identifier }
1621    }
1622
1623    /// Gets the batch identifier associated with this collect response.
1624    ///
1625    /// This method would typically be used for code which is generic over the query type.
1626    /// Query-type specific code will typically call [`Self::batch_id`].
1627    pub fn batch_identifier(&self) -> &Q::PartialBatchIdentifier {
1628        &self.batch_identifier
1629    }
1630}
1631
1632impl PartialBatchSelector<TimeInterval> {
1633    /// Constructs a new partial batch selector for a time-interval task.
1634    pub fn new_time_interval() -> Self {
1635        Self::new(())
1636    }
1637}
1638
1639impl PartialBatchSelector<FixedSize> {
1640    /// Constructs a new partial batch selector for a fixed-size task.
1641    pub fn new_fixed_size(batch_id: BatchId) -> Self {
1642        Self::new(batch_id)
1643    }
1644
1645    /// Gets the batch ID associated with this partial batch selector.
1646    pub fn batch_id(&self) -> &BatchId {
1647        self.batch_identifier()
1648    }
1649}
1650
1651impl<Q: QueryType> Encode for PartialBatchSelector<Q> {
1652    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1653        Q::CODE.encode(bytes)?;
1654        self.batch_identifier.encode(bytes)
1655    }
1656
1657    fn encoded_len(&self) -> Option<usize> {
1658        Some(1 + self.batch_identifier.encoded_len()?)
1659    }
1660}
1661
1662impl<Q: QueryType> Decode for PartialBatchSelector<Q> {
1663    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1664        query_type::Code::decode_expecting_value(bytes, Q::CODE)?;
1665        let batch_identifier = Q::PartialBatchIdentifier::decode(bytes)?;
1666
1667        Ok(Self { batch_identifier })
1668    }
1669}
1670
/// DAP protocol message representing an identifier for a collection.
///
/// This is a newtype over a fixed-size array of [`CollectionJobId::LEN`] bytes.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CollectionJobId([u8; Self::LEN]);

impl CollectionJobId {
    /// LEN is the length of a collection ID in bytes.
    pub const LEN: usize = 16;
}

impl AsRef<[u8; Self::LEN]> for CollectionJobId {
    /// Borrows the identifier's bytes as a fixed-size array.
    fn as_ref(&self) -> &[u8; Self::LEN] {
        let Self(id_bytes) = self;
        id_bytes
    }
}
1685
1686impl TryFrom<&[u8]> for CollectionJobId {
1687    type Error = Error;
1688
1689    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
1690        Ok(Self(value.try_into().map_err(|_| {
1691            Error::InvalidParameter("byte slice has incorrect length for CollectionId")
1692        })?))
1693    }
1694}
1695
1696impl FromStr for CollectionJobId {
1697    type Err = Error;
1698
1699    fn from_str(s: &str) -> Result<Self, Self::Err> {
1700        Self::try_from(URL_SAFE_NO_PAD.decode(s)?.as_ref())
1701    }
1702}
1703
1704impl Debug for CollectionJobId {
1705    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1706        write!(
1707            f,
1708            "CollectionJobId({})",
1709            Base64Display::new(&self.0, &URL_SAFE_NO_PAD)
1710        )
1711    }
1712}
1713
1714impl Display for CollectionJobId {
1715    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1716        write!(f, "{}", Base64Display::new(&self.0, &URL_SAFE_NO_PAD))
1717    }
1718}
1719
1720impl Distribution<CollectionJobId> for Standard {
1721    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> CollectionJobId {
1722        CollectionJobId(rng.gen())
1723    }
1724}
1725
1726/// DAP protocol message representing a leader's response to the collector's request to provide
1727/// aggregate shares for a given query.
1728#[derive(Clone, Debug, PartialEq, Eq)]
1729pub struct Collection<Q: QueryType> {
1730    partial_batch_selector: PartialBatchSelector<Q>,
1731    report_count: u64,
1732    interval: Interval,
1733    leader_encrypted_agg_share: HpkeCiphertext,
1734    helper_encrypted_agg_share: HpkeCiphertext,
1735}
1736
1737impl<Q: QueryType> Collection<Q> {
1738    /// The media type associated with this protocol message.
1739    pub const MEDIA_TYPE: &'static str = "application/dap-collection";
1740
1741    /// Constructs a new collection.
1742    pub fn new(
1743        partial_batch_selector: PartialBatchSelector<Q>,
1744        report_count: u64,
1745        interval: Interval,
1746        leader_encrypted_agg_share: HpkeCiphertext,
1747        helper_encrypted_agg_share: HpkeCiphertext,
1748    ) -> Self {
1749        Self {
1750            partial_batch_selector,
1751            report_count,
1752            interval,
1753            leader_encrypted_agg_share,
1754            helper_encrypted_agg_share,
1755        }
1756    }
1757
1758    /// Retrieves the batch selector associated with this collection.
1759    pub fn partial_batch_selector(&self) -> &PartialBatchSelector<Q> {
1760        &self.partial_batch_selector
1761    }
1762
1763    /// Retrieves the number of reports that were aggregated into this collection.
1764    pub fn report_count(&self) -> u64 {
1765        self.report_count
1766    }
1767
1768    /// Retrieves the interval spanned by the reports aggregated into this collection.
1769    pub fn interval(&self) -> &Interval {
1770        &self.interval
1771    }
1772
1773    /// Retrieves the leader encrypted aggregate share associated with this collection.
1774    pub fn leader_encrypted_aggregate_share(&self) -> &HpkeCiphertext {
1775        &self.leader_encrypted_agg_share
1776    }
1777
1778    /// Retrieves the helper encrypted aggregate share associated with this collection.
1779    pub fn helper_encrypted_aggregate_share(&self) -> &HpkeCiphertext {
1780        &self.helper_encrypted_agg_share
1781    }
1782}
1783
1784impl<Q: QueryType> Encode for Collection<Q> {
1785    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1786        self.partial_batch_selector.encode(bytes)?;
1787        self.report_count.encode(bytes)?;
1788        self.interval.encode(bytes)?;
1789        self.leader_encrypted_agg_share.encode(bytes)?;
1790        self.helper_encrypted_agg_share.encode(bytes)
1791    }
1792
1793    fn encoded_len(&self) -> Option<usize> {
1794        Some(
1795            self.partial_batch_selector.encoded_len()?
1796                + self.report_count.encoded_len()?
1797                + self.interval.encoded_len()?
1798                + self.leader_encrypted_agg_share.encoded_len()?
1799                + self.helper_encrypted_agg_share.encoded_len()?,
1800        )
1801    }
1802}
1803
1804impl<Q: QueryType> Decode for Collection<Q> {
1805    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1806        let partial_batch_selector = PartialBatchSelector::decode(bytes)?;
1807        let report_count = u64::decode(bytes)?;
1808        let interval = Interval::decode(bytes)?;
1809        let leader_encrypted_agg_share = HpkeCiphertext::decode(bytes)?;
1810        let helper_encrypted_agg_share = HpkeCiphertext::decode(bytes)?;
1811
1812        Ok(Self {
1813            partial_batch_selector,
1814            report_count,
1815            interval,
1816            leader_encrypted_agg_share,
1817            helper_encrypted_agg_share,
1818        })
1819    }
1820}
1821
1822/// DAP message representing the additional associated data for an input share encryption operation.
1823#[derive(Clone, Debug, PartialEq, Eq)]
1824pub struct InputShareAad {
1825    task_id: TaskId,
1826    metadata: ReportMetadata,
1827    public_share: Vec<u8>,
1828}
1829
1830impl InputShareAad {
1831    /// Constructs a new input share AAD.
1832    pub fn new(task_id: TaskId, metadata: ReportMetadata, public_share: Vec<u8>) -> Self {
1833        Self {
1834            task_id,
1835            metadata,
1836            public_share,
1837        }
1838    }
1839
1840    /// Retrieves the task ID associated with this input share AAD.
1841    pub fn task_id(&self) -> &TaskId {
1842        &self.task_id
1843    }
1844
1845    /// Retrieves the report metadata associated with this input share AAD.
1846    pub fn metadata(&self) -> &ReportMetadata {
1847        &self.metadata
1848    }
1849
1850    /// Retrieves the public share associated with this input share AAD.
1851    pub fn public_share(&self) -> &[u8] {
1852        &self.public_share
1853    }
1854}
1855
1856impl Encode for InputShareAad {
1857    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1858        self.task_id.encode(bytes)?;
1859        self.metadata.encode(bytes)?;
1860        encode_u32_items(bytes, &(), &self.public_share)
1861    }
1862
1863    fn encoded_len(&self) -> Option<usize> {
1864        Some(
1865            self.task_id.encoded_len()?
1866                + self.metadata.encoded_len()?
1867                + 4
1868                + self.public_share.len(),
1869        )
1870    }
1871}
1872
1873impl Decode for InputShareAad {
1874    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1875        let task_id = TaskId::decode(bytes)?;
1876        let metadata = ReportMetadata::decode(bytes)?;
1877        let public_share = decode_u32_items(&(), bytes)?;
1878
1879        Ok(Self {
1880            task_id,
1881            metadata,
1882            public_share,
1883        })
1884    }
1885}
1886
1887/// DAP message representing the additional associated data for an aggregate share encryption
1888/// operation.
1889#[derive(Clone, Debug, PartialEq, Eq)]
1890pub struct AggregateShareAad<Q: QueryType> {
1891    task_id: TaskId,
1892    aggregation_parameter: Vec<u8>,
1893    batch_selector: BatchSelector<Q>,
1894}
1895
1896impl<Q: QueryType> AggregateShareAad<Q> {
1897    /// Constructs a new aggregate share AAD.
1898    pub fn new(
1899        task_id: TaskId,
1900        aggregation_parameter: Vec<u8>,
1901        batch_selector: BatchSelector<Q>,
1902    ) -> Self {
1903        Self {
1904            task_id,
1905            aggregation_parameter,
1906            batch_selector,
1907        }
1908    }
1909
1910    /// Retrieves the task ID associated with this aggregate share AAD.
1911    pub fn task_id(&self) -> &TaskId {
1912        &self.task_id
1913    }
1914
1915    /// Retrieves the aggregation parameter associated with this aggregate share AAD.
1916    pub fn aggregation_parameter(&self) -> &[u8] {
1917        &self.aggregation_parameter
1918    }
1919
1920    /// Retrieves the batch selector associated with this aggregate share AAD.
1921    pub fn batch_selector(&self) -> &BatchSelector<Q> {
1922        &self.batch_selector
1923    }
1924}
1925
1926impl<Q: QueryType> Encode for AggregateShareAad<Q> {
1927    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1928        self.task_id.encode(bytes)?;
1929        encode_u32_items(bytes, &(), &self.aggregation_parameter)?;
1930        self.batch_selector.encode(bytes)
1931    }
1932
1933    fn encoded_len(&self) -> Option<usize> {
1934        Some(
1935            self.task_id.encoded_len()?
1936                + 4
1937                + self.aggregation_parameter.len()
1938                + self.batch_selector.encoded_len()?,
1939        )
1940    }
1941}
1942
1943impl<Q: QueryType> Decode for AggregateShareAad<Q> {
1944    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
1945        let task_id = TaskId::decode(bytes)?;
1946        let aggregation_parameter = decode_u32_items(&(), bytes)?;
1947        let batch_selector = BatchSelector::decode(bytes)?;
1948
1949        Ok(Self {
1950            task_id,
1951            aggregation_parameter,
1952            batch_selector,
1953        })
1954    }
1955}
1956
1957/// DAP protocol message representing one aggregator's share of a single client report.
1958#[derive(Educe, Clone, PartialEq, Eq)]
1959#[educe(Debug)]
1960pub struct ReportShare {
1961    metadata: ReportMetadata,
1962    #[educe(Debug(ignore))]
1963    public_share: Vec<u8>,
1964    encrypted_input_share: HpkeCiphertext,
1965}
1966
1967impl ReportShare {
1968    /// Constructs a new report share from its components.
1969    pub fn new(
1970        metadata: ReportMetadata,
1971        public_share: Vec<u8>,
1972        encrypted_input_share: HpkeCiphertext,
1973    ) -> Self {
1974        Self {
1975            metadata,
1976            public_share,
1977            encrypted_input_share,
1978        }
1979    }
1980
1981    /// Gets the metadata associated with this report share.
1982    pub fn metadata(&self) -> &ReportMetadata {
1983        &self.metadata
1984    }
1985
1986    /// Gets the public share associated with this report share.
1987    pub fn public_share(&self) -> &[u8] {
1988        &self.public_share
1989    }
1990
1991    /// Gets the encrypted input share associated with this report share.
1992    pub fn encrypted_input_share(&self) -> &HpkeCiphertext {
1993        &self.encrypted_input_share
1994    }
1995}
1996
1997impl Encode for ReportShare {
1998    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
1999        self.metadata.encode(bytes)?;
2000        encode_u32_items(bytes, &(), &self.public_share)?;
2001        self.encrypted_input_share.encode(bytes)
2002    }
2003
2004    fn encoded_len(&self) -> Option<usize> {
2005        Some(
2006            self.metadata.encoded_len()?
2007                + 4
2008                + self.public_share.len()
2009                + self.encrypted_input_share.encoded_len()?,
2010        )
2011    }
2012}
2013
2014impl Decode for ReportShare {
2015    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2016        let metadata = ReportMetadata::decode(bytes)?;
2017        let public_share = decode_u32_items(&(), bytes)?;
2018        let encrypted_input_share = HpkeCiphertext::decode(bytes)?;
2019
2020        Ok(Self {
2021            metadata,
2022            public_share,
2023            encrypted_input_share,
2024        })
2025    }
2026}
2027
/// DAP protocol message representing information required to initialize preparation of a report for
/// aggregation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PrepareInit {
    /// The report share whose preparation is being initialized.
    report_share: ReportShare,
    /// The ping-pong message sent along with the report share.
    message: PingPongMessage,
}
2035
impl PrepareInit {
    /// Constructs a new preparation initialization message from a report share and the
    /// accompanying ping-pong message.
    pub fn new(report_share: ReportShare, message: PingPongMessage) -> Self {
        Self {
            report_share,
            message,
        }
    }

    /// Gets the report share associated with this prep init.
    pub fn report_share(&self) -> &ReportShare {
        &self.report_share
    }

    /// Gets the ping-pong message associated with this prep init.
    pub fn message(&self) -> &PingPongMessage {
        &self.message
    }
}
2055
2056impl Encode for PrepareInit {
2057    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2058        self.report_share.encode(bytes)?;
2059        let encoded_message = self.message.get_encoded()?;
2060        encode_u32_items(bytes, &(), &encoded_message)
2061    }
2062
2063    fn encoded_len(&self) -> Option<usize> {
2064        Some(self.report_share.encoded_len()? + 4 + self.message.encoded_len()?)
2065    }
2066}
2067
2068impl Decode for PrepareInit {
2069    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2070        let report_share = ReportShare::decode(bytes)?;
2071        let message_bytes = decode_u32_items(&(), bytes)?;
2072        let message = PingPongMessage::get_decoded(&message_bytes)?;
2073
2074        Ok(Self {
2075            report_share,
2076            message,
2077        })
2078    }
2079}
2080
/// DAP protocol message representing the response to a preparation step in a VDAF evaluation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PrepareResp {
    /// Identifier of the report this response refers to.
    report_id: ReportId,
    /// Outcome of the preparation step for that report.
    result: PrepareStepResult,
}
2087
impl PrepareResp {
    /// Constructs a new prepare response from its components.
    pub fn new(report_id: ReportId, result: PrepareStepResult) -> Self {
        Self { report_id, result }
    }

    /// Gets the report ID associated with this prepare response.
    pub fn report_id(&self) -> &ReportId {
        &self.report_id
    }

    /// Gets the result associated with this prepare response.
    pub fn result(&self) -> &PrepareStepResult {
        &self.result
    }
}
2104
2105impl Encode for PrepareResp {
2106    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2107        self.report_id.encode(bytes)?;
2108        self.result.encode(bytes)
2109    }
2110
2111    fn encoded_len(&self) -> Option<usize> {
2112        Some(self.report_id.encoded_len()? + self.result.encoded_len()?)
2113    }
2114}
2115
2116impl Decode for PrepareResp {
2117    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2118        let report_id = ReportId::decode(bytes)?;
2119        let result = PrepareStepResult::decode(bytes)?;
2120
2121        Ok(Self { report_id, result })
2122    }
2123}
2124
/// DAP protocol message representing result-type-specific data associated with a preparation step
/// in a VDAF evaluation. Included in a PrepareResp message.
///
/// The `Encode`/`Decode` implementations put a one-byte discriminator in front of each variant:
/// 0 for `Continue`, 1 for `Finished` and 2 for `Reject`.
#[derive(Clone, Educe, PartialEq, Eq)]
#[educe(Debug)]
pub enum PrepareStepResult {
    /// Carries a ping-pong message; the message is left out of `Debug` output.
    Continue {
        #[educe(Debug(ignore))]
        message: PingPongMessage,
    },
    /// Carries no payload beyond the discriminator.
    Finished,
    /// Carries the [`PrepareError`] describing why the report was rejected.
    Reject(PrepareError),
}
2137
2138impl Encode for PrepareStepResult {
2139    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2140        // The encoding includes an implicit discriminator byte, called PrepareStepResult in the
2141        // DAP spec.
2142        match self {
2143            Self::Continue { message: prep_msg } => {
2144                0u8.encode(bytes)?;
2145                let encoded_prep_msg = prep_msg.get_encoded()?;
2146                encode_u32_items(bytes, &(), &encoded_prep_msg)
2147            }
2148            Self::Finished => 1u8.encode(bytes),
2149            Self::Reject(error) => {
2150                2u8.encode(bytes)?;
2151                error.encode(bytes)
2152            }
2153        }
2154    }
2155
2156    fn encoded_len(&self) -> Option<usize> {
2157        match self {
2158            Self::Continue { message: prep_msg } => Some(1 + 4 + prep_msg.encoded_len()?),
2159            Self::Finished => Some(1),
2160            Self::Reject(error) => Some(1 + error.encoded_len()?),
2161        }
2162    }
2163}
2164
2165impl Decode for PrepareStepResult {
2166    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2167        let val = u8::decode(bytes)?;
2168        Ok(match val {
2169            0 => {
2170                let prep_msg_bytes = decode_u32_items(&(), bytes)?;
2171                let prep_msg = PingPongMessage::get_decoded(&prep_msg_bytes)?;
2172                Self::Continue { message: prep_msg }
2173            }
2174            1 => Self::Finished,
2175            2 => Self::Reject(PrepareError::decode(bytes)?),
2176            _ => return Err(CodecError::UnexpectedValue),
2177        })
2178    }
2179}
2180
/// DAP protocol message representing an error while preparing a report share for aggregation.
///
/// The enum is `#[repr(u8)]`; the explicit discriminants below are the values written by its
/// `Encode` implementation, and the derived `TryFrom<u8>` conversion maps raw bytes back to
/// variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq, TryFromPrimitive)]
#[repr(u8)]
pub enum PrepareError {
    BatchCollected = 0,
    ReportReplayed = 1,
    ReportDropped = 2,
    HpkeUnknownConfigId = 3,
    HpkeDecryptError = 4,
    VdafPrepError = 5,
    BatchSaturated = 6,
    TaskExpired = 7,
    InvalidMessage = 8,
    ReportTooEarly = 9,
}
2196
2197impl Encode for PrepareError {
2198    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2199        (*self as u8).encode(bytes)
2200    }
2201
2202    fn encoded_len(&self) -> Option<usize> {
2203        Some(1)
2204    }
2205}
2206
2207impl Decode for PrepareError {
2208    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2209        let val = u8::decode(bytes)?;
2210        Self::try_from(val).map_err(|_| {
2211            CodecError::Other(anyhow!("unexpected ReportShareError value {val}").into())
2212        })
2213    }
2214}
2215
/// DAP protocol message representing a request to continue preparation of a report share for
/// aggregation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PrepareContinue {
    /// Identifier of the report whose preparation is being continued.
    report_id: ReportId,
    /// The ping-pong message sent for this report.
    message: PingPongMessage,
}
2223
impl PrepareContinue {
    /// Constructs a new prepare continue from a report ID and a ping-pong message.
    pub fn new(report_id: ReportId, message: PingPongMessage) -> Self {
        Self { report_id, message }
    }

    /// Gets the report ID associated with this prepare continue.
    pub fn report_id(&self) -> &ReportId {
        &self.report_id
    }

    /// Gets the ping-pong message associated with this prepare continue.
    pub fn message(&self) -> &PingPongMessage {
        &self.message
    }
}
2240
2241impl Encode for PrepareContinue {
2242    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2243        self.report_id.encode(bytes)?;
2244        let encoded_message = self.message.get_encoded()?;
2245        encode_u32_items(bytes, &(), &encoded_message)
2246    }
2247
2248    fn encoded_len(&self) -> Option<usize> {
2249        Some(self.report_id.encoded_len()? + 4 + self.message.encoded_len()?)
2250    }
2251}
2252
2253impl Decode for PrepareContinue {
2254    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2255        let report_id = ReportId::decode(bytes)?;
2256        let message_bytes = decode_u32_items(&(), bytes)?;
2257        let message = PingPongMessage::get_decoded(&message_bytes)?;
2258
2259        Ok(Self { report_id, message })
2260    }
2261}
2262
/// DAP protocol message representing an identifier for an aggregation job.
///
/// Stored as a fixed-size array of [`AggregationJobId::LEN`] bytes.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AggregationJobId([u8; Self::LEN]);
2266
impl AggregationJobId {
    /// LEN is the length of an aggregation job ID in bytes. It sizes the array backing the ID and
    /// the arrays accepted and returned by the conversion impls.
    pub const LEN: usize = 16;
}
2271
2272impl From<[u8; Self::LEN]> for AggregationJobId {
2273    fn from(aggregation_job_id: [u8; Self::LEN]) -> Self {
2274        Self(aggregation_job_id)
2275    }
2276}
2277
2278impl TryFrom<&[u8]> for AggregationJobId {
2279    type Error = Error;
2280
2281    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
2282        Ok(Self(value.try_into().map_err(|_| {
2283            Error::InvalidParameter("byte slice has incorrect length for AggregationJobId")
2284        })?))
2285    }
2286}
2287
2288impl AsRef<[u8; Self::LEN]> for AggregationJobId {
2289    fn as_ref(&self) -> &[u8; Self::LEN] {
2290        &self.0
2291    }
2292}
2293
2294impl FromStr for AggregationJobId {
2295    type Err = Error;
2296
2297    fn from_str(s: &str) -> Result<Self, Self::Err> {
2298        Self::try_from(URL_SAFE_NO_PAD.decode(s)?.as_ref())
2299    }
2300}
2301
2302impl Debug for AggregationJobId {
2303    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2304        write!(
2305            f,
2306            "AggregationJobId({})",
2307            Base64Display::new(&self.0, &URL_SAFE_NO_PAD)
2308        )
2309    }
2310}
2311
2312impl Display for AggregationJobId {
2313    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2314        write!(f, "{}", Base64Display::new(&self.0, &URL_SAFE_NO_PAD))
2315    }
2316}
2317
2318impl Distribution<AggregationJobId> for Standard {
2319    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> AggregationJobId {
2320        AggregationJobId(rng.gen())
2321    }
2322}
2323
/// DAP protocol message representing an aggregation job initialization request from leader to
/// helper.
///
/// The generated `Debug` implementation leaves out the `aggregation_parameter` field.
#[derive(Clone, Educe, PartialEq, Eq)]
#[educe(Debug)]
pub struct AggregationJobInitializeReq<Q: QueryType> {
    /// Aggregation parameter bytes, stored without interpretation; excluded from `Debug` output.
    #[educe(Debug(ignore))]
    aggregation_parameter: Vec<u8>,
    /// Partial batch selector for the query type `Q`.
    partial_batch_selector: PartialBatchSelector<Q>,
    /// One preparation initialization message per report in the job.
    prepare_inits: Vec<PrepareInit>,
}
2334
impl<Q: QueryType> AggregationJobInitializeReq<Q> {
    /// The media type associated with this protocol message.
    pub const MEDIA_TYPE: &'static str = "application/dap-aggregation-job-init-req";

    /// Constructs an aggregation job initialization request from its components.
    pub fn new(
        aggregation_parameter: Vec<u8>,
        partial_batch_selector: PartialBatchSelector<Q>,
        prepare_inits: Vec<PrepareInit>,
    ) -> Self {
        Self {
            aggregation_parameter,
            partial_batch_selector,
            prepare_inits,
        }
    }

    /// Gets the aggregation parameter associated with this aggregation job initialization
    /// request, as a byte slice.
    pub fn aggregation_parameter(&self) -> &[u8] {
        &self.aggregation_parameter
    }

    /// Gets the partial batch selector associated with this aggregation job initialization
    /// request. Despite the method's name, the value returned is a [`PartialBatchSelector`].
    pub fn batch_selector(&self) -> &PartialBatchSelector<Q> {
        &self.partial_batch_selector
    }

    /// Gets the preparation initialization messages associated with this aggregation job
    /// initialization request.
    pub fn prepare_inits(&self) -> &[PrepareInit] {
        &self.prepare_inits
    }
}
2368
2369impl<Q: QueryType> Encode for AggregationJobInitializeReq<Q> {
2370    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2371        encode_u32_items(bytes, &(), &self.aggregation_parameter)?;
2372        self.partial_batch_selector.encode(bytes)?;
2373        encode_u32_items(bytes, &(), &self.prepare_inits)
2374    }
2375
2376    fn encoded_len(&self) -> Option<usize> {
2377        let mut length = 4 + self.aggregation_parameter.len();
2378        length += self.partial_batch_selector.encoded_len()?;
2379        length += 4;
2380        for prepare_init in &self.prepare_inits {
2381            length += prepare_init.encoded_len()?;
2382        }
2383        Some(length)
2384    }
2385}
2386
2387impl<Q: QueryType> Decode for AggregationJobInitializeReq<Q> {
2388    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2389        let aggregation_parameter = decode_u32_items(&(), bytes)?;
2390        let partial_batch_selector = PartialBatchSelector::decode(bytes)?;
2391        let prepare_inits = decode_u32_items(&(), bytes)?;
2392
2393        Ok(Self {
2394            aggregation_parameter,
2395            partial_batch_selector,
2396            prepare_inits,
2397        })
2398    }
2399}
2400
/// Type representing the step of an aggregation job.
///
/// A thin wrapper around a `u16`, convertible to and from `u16` and fallibly from `i32`.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct AggregationJobStep(u16);
2404
impl AggregationJobStep {
    /// Construct a new [`AggregationJobStep`] representing the step after this one.
    ///
    /// NOTE(review): this is a plain `u16` addition, so incrementing the maximum step value
    /// overflows, which panics when overflow checks are enabled and wraps to zero otherwise.
    /// Confirm that callers can never reach `u16::MAX`.
    pub fn increment(&self) -> Self {
        Self(self.0 + 1)
    }
}
2411
2412impl Display for AggregationJobStep {
2413    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
2414        write!(f, "{}", self.0)
2415    }
2416}
2417
2418impl Encode for AggregationJobStep {
2419    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2420        self.0.encode(bytes)
2421    }
2422
2423    fn encoded_len(&self) -> Option<usize> {
2424        self.0.encoded_len()
2425    }
2426}
2427
2428impl Decode for AggregationJobStep {
2429    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2430        Ok(Self(u16::decode(bytes)?))
2431    }
2432}
2433
2434impl From<u16> for AggregationJobStep {
2435    fn from(value: u16) -> Self {
2436        Self(value)
2437    }
2438}
2439
2440impl From<AggregationJobStep> for u16 {
2441    fn from(value: AggregationJobStep) -> Self {
2442        value.0
2443    }
2444}
2445
2446impl TryFrom<i32> for AggregationJobStep {
2447    // This implementation is convenient for converting from the representation of a step in
2448    // PostgreSQL, where the smallest type that can store a u16 is `integer`, which is represented
2449    // as i32 in Rust.
2450
2451    type Error = TryFromIntError;
2452
2453    fn try_from(value: i32) -> Result<Self, Self::Error> {
2454        Ok(AggregationJobStep(u16::try_from(value)?))
2455    }
2456}
2457
/// DAP protocol message representing a request to continue an aggregation job.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AggregationJobContinueReq {
    /// The step of the aggregation job this request belongs to.
    step: AggregationJobStep,
    /// One prepare continue message per report being continued.
    prepare_continues: Vec<PrepareContinue>,
}
2464
impl AggregationJobContinueReq {
    /// The media type associated with this protocol message.
    pub const MEDIA_TYPE: &'static str = "application/dap-aggregation-job-continue-req";

    /// Constructs a new aggregation job continuation request from its components.
    pub fn new(step: AggregationJobStep, prepare_continues: Vec<PrepareContinue>) -> Self {
        Self {
            step,
            prepare_continues,
        }
    }

    /// Gets the step this aggregation job is on.
    pub fn step(&self) -> AggregationJobStep {
        self.step
    }

    /// Gets the prepare continue messages associated with this aggregation job continuation
    /// request.
    pub fn prepare_steps(&self) -> &[PrepareContinue] {
        &self.prepare_continues
    }
}
2487
2488impl Encode for AggregationJobContinueReq {
2489    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2490        self.step.encode(bytes)?;
2491        encode_u32_items(bytes, &(), &self.prepare_continues)
2492    }
2493
2494    fn encoded_len(&self) -> Option<usize> {
2495        let mut length = self.step.encoded_len()?;
2496        length += 4;
2497        for prepare_continue in self.prepare_continues.iter() {
2498            length += prepare_continue.encoded_len()?;
2499        }
2500        Some(length)
2501    }
2502}
2503
2504impl Decode for AggregationJobContinueReq {
2505    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2506        let step = AggregationJobStep::decode(bytes)?;
2507        let prepare_continues = decode_u32_items(&(), bytes)?;
2508        Ok(Self::new(step, prepare_continues))
2509    }
2510}
2511
/// DAP protocol message representing the response to an aggregation job initialization or
/// continuation request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AggregationJobResp {
    /// One prepare response per report covered by the request.
    prepare_resps: Vec<PrepareResp>,
}
2518
impl AggregationJobResp {
    /// The media type associated with this protocol message.
    pub const MEDIA_TYPE: &'static str = "application/dap-aggregation-job-resp";

    /// Constructs a new aggregation job response from its prepare responses.
    pub fn new(prepare_resps: Vec<PrepareResp>) -> Self {
        Self { prepare_resps }
    }

    /// Gets the prepare responses associated with this aggregation job response.
    pub fn prepare_resps(&self) -> &[PrepareResp] {
        &self.prepare_resps
    }
}
2533
2534impl Encode for AggregationJobResp {
2535    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2536        encode_u32_items(bytes, &(), &self.prepare_resps)
2537    }
2538
2539    fn encoded_len(&self) -> Option<usize> {
2540        let mut length = 4;
2541        for prepare_resp in self.prepare_resps.iter() {
2542            length += prepare_resp.encoded_len()?;
2543        }
2544        Some(length)
2545    }
2546}
2547
2548impl Decode for AggregationJobResp {
2549    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2550        let prepare_resps = decode_u32_items(&(), bytes)?;
2551        Ok(Self { prepare_resps })
2552    }
2553}
2554
/// DAP protocol message identifying a batch of interest.
///
/// Generic over the query type `Q`, which determines the type of the batch identifier.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BatchSelector<Q: QueryType> {
    /// Identifier of the selected batch, of the type dictated by `Q`.
    batch_identifier: Q::BatchIdentifier,
}
2560
impl<Q: QueryType> BatchSelector<Q> {
    /// Constructs a new batch selector from a batch identifier.
    ///
    /// This method would typically be used for code which is generic over the query type.
    /// Query-type specific code will typically call one of [`Self::new_time_interval`] or
    /// [`Self::new_fixed_size`].
    pub fn new(batch_identifier: Q::BatchIdentifier) -> Self {
        Self { batch_identifier }
    }

    /// Gets the batch identifier associated with this batch selector.
    ///
    /// This method would typically be used for code which is generic over the query type.
    /// Query-type specific code will typically call one of [`Self::batch_interval`] or
    /// [`Self::batch_id`].
    pub fn batch_identifier(&self) -> &Q::BatchIdentifier {
        &self.batch_identifier
    }
}
2580
impl BatchSelector<TimeInterval> {
    /// Constructs a new batch selector for time-interval tasks, using the given interval as the
    /// batch identifier.
    pub fn new_time_interval(batch_interval: Interval) -> Self {
        Self::new(batch_interval)
    }

    /// Gets the batch interval associated with this batch selector; this is the selector's
    /// batch identifier.
    pub fn batch_interval(&self) -> &Interval {
        self.batch_identifier()
    }
}
2592
impl BatchSelector<FixedSize> {
    /// Constructs a new batch selector for fixed-size tasks, using the given batch ID as the
    /// batch identifier.
    pub fn new_fixed_size(batch_id: BatchId) -> Self {
        Self::new(batch_id)
    }

    /// Gets the batch ID associated with this batch selector; this is the selector's batch
    /// identifier.
    pub fn batch_id(&self) -> &BatchId {
        self.batch_identifier()
    }
}
2604
2605impl<Q: QueryType> Encode for BatchSelector<Q> {
2606    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2607        Q::CODE.encode(bytes)?;
2608        self.batch_identifier.encode(bytes)
2609    }
2610
2611    fn encoded_len(&self) -> Option<usize> {
2612        Some(1 + self.batch_identifier.encoded_len()?)
2613    }
2614}
2615
2616impl<Q: QueryType> Decode for BatchSelector<Q> {
2617    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2618        query_type::Code::decode_expecting_value(bytes, Q::CODE)?;
2619        let batch_identifier = Q::BatchIdentifier::decode(bytes)?;
2620
2621        Ok(Self { batch_identifier })
2622    }
2623}
2624
/// DAP protocol message representing a request from the leader to a helper to provide an
/// encrypted aggregate of its share of data for a given batch interval.
///
/// The generated `Debug` implementation leaves out the `aggregation_parameter` field.
#[derive(Clone, Educe, PartialEq, Eq)]
#[educe(Debug)]
pub struct AggregateShareReq<Q: QueryType> {
    /// Selector identifying the batch being requested.
    batch_selector: BatchSelector<Q>,
    /// Aggregation parameter bytes, stored without interpretation; excluded from `Debug` output.
    #[educe(Debug(ignore))]
    aggregation_parameter: Vec<u8>,
    /// Report count carried in the request.
    report_count: u64,
    /// Report ID checksum carried in the request.
    checksum: ReportIdChecksum,
}
2636
impl<Q: QueryType> AggregateShareReq<Q> {
    /// The media type associated with this protocol message.
    pub const MEDIA_TYPE: &'static str = "application/dap-aggregate-share-req";

    /// Constructs a new aggregate share request from its components. The arguments are stored as
    /// given; no validation is performed here.
    pub fn new(
        batch_selector: BatchSelector<Q>,
        aggregation_parameter: Vec<u8>,
        report_count: u64,
        checksum: ReportIdChecksum,
    ) -> Self {
        Self {
            batch_selector,
            aggregation_parameter,
            report_count,
            checksum,
        }
    }

    /// Gets the batch selector associated with this aggregate share request.
    pub fn batch_selector(&self) -> &BatchSelector<Q> {
        &self.batch_selector
    }

    /// Gets the aggregation parameter associated with this aggregate share request, as a byte
    /// slice.
    pub fn aggregation_parameter(&self) -> &[u8] {
        &self.aggregation_parameter
    }

    /// Gets the report count associated with this aggregate share request.
    pub fn report_count(&self) -> u64 {
        self.report_count
    }

    /// Gets the checksum associated with this aggregate share request.
    pub fn checksum(&self) -> &ReportIdChecksum {
        &self.checksum
    }
}
2676
2677impl<Q: QueryType> Encode for AggregateShareReq<Q> {
2678    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2679        self.batch_selector.encode(bytes)?;
2680        encode_u32_items(bytes, &(), &self.aggregation_parameter)?;
2681        self.report_count.encode(bytes)?;
2682        self.checksum.encode(bytes)
2683    }
2684
2685    fn encoded_len(&self) -> Option<usize> {
2686        Some(
2687            self.batch_selector.encoded_len()?
2688                + 4
2689                + self.aggregation_parameter.len()
2690                + self.report_count.encoded_len()?
2691                + self.checksum.encoded_len()?,
2692        )
2693    }
2694}
2695
2696impl<Q: QueryType> Decode for AggregateShareReq<Q> {
2697    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2698        let batch_selector = BatchSelector::decode(bytes)?;
2699        let aggregation_parameter = decode_u32_items(&(), bytes)?;
2700        let report_count = u64::decode(bytes)?;
2701        let checksum = ReportIdChecksum::decode(bytes)?;
2702
2703        Ok(Self {
2704            batch_selector,
2705            aggregation_parameter,
2706            report_count,
2707            checksum,
2708        })
2709    }
2710}
2711
/// DAP protocol message representing a helper's response to the leader's request to provide an
/// encrypted aggregate of its share of data for a given batch interval.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AggregateShare {
    /// Encrypted aggregate share, carried as an [`HpkeCiphertext`].
    encrypted_aggregate_share: HpkeCiphertext,
}
2718
impl AggregateShare {
    /// The media type associated with this protocol message.
    pub const MEDIA_TYPE: &'static str = "application/dap-aggregate-share";

    /// Constructs a new aggregate share response wrapping the given ciphertext.
    pub fn new(encrypted_aggregate_share: HpkeCiphertext) -> Self {
        Self {
            encrypted_aggregate_share,
        }
    }

    /// Gets the encrypted aggregate share associated with this aggregate share response.
    pub fn encrypted_aggregate_share(&self) -> &HpkeCiphertext {
        &self.encrypted_aggregate_share
    }
}
2735
2736impl Encode for AggregateShare {
2737    fn encode(&self, bytes: &mut Vec<u8>) -> Result<(), CodecError> {
2738        self.encrypted_aggregate_share.encode(bytes)
2739    }
2740
2741    fn encoded_len(&self) -> Option<usize> {
2742        self.encrypted_aggregate_share.encoded_len()
2743    }
2744}
2745
2746impl Decode for AggregateShare {
2747    fn decode(bytes: &mut Cursor<&[u8]>) -> Result<Self, CodecError> {
2748        let encrypted_aggregate_share = HpkeCiphertext::decode(bytes)?;
2749
2750        Ok(Self {
2751            encrypted_aggregate_share,
2752        })
2753    }
2754}
2755
/// Test helper that checks a list of `(value, hex encoding)` pairs.
///
/// For every pair it asserts that the value encodes to exactly the given bytes, that those bytes
/// decode back to an equal value, and that [`Encode::encoded_len`] reports the length of the
/// encoding.
///
/// # Panics
///
/// Panics if encoding, hex decoding or decoding fails, if no length hint is available, or if any
/// of the comparisons does not hold.
#[cfg(test)]
pub(crate) fn roundtrip_encoding<T>(vals_and_encodings: &[(T, &str)])
where
    T: Encode + Decode + Debug + Eq,
{
    /// Newtype whose `Debug` output uses the `{:02x?}` format, so that byte strings show up as
    /// hex in assertion failures.
    #[derive(PartialEq, Eq)]
    struct Wrapper<T>(T);

    impl<T: Debug> Debug for Wrapper<T> {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "{:02x?}", &self.0)
        }
    }

    for (val, hex_encoding) in vals_and_encodings {
        // Encode the value and compare against the expected bytes.
        let mut buffer = Vec::new();
        val.encode(&mut buffer).unwrap();
        let expected = Wrapper(hex::decode(hex_encoding).unwrap());
        let actual = Wrapper(buffer);
        pretty_assertions::assert_eq!(
            actual,
            expected,
            "Couldn't roundtrip (encoded value differs): {val:?}"
        );

        // Decode what was just encoded and compare against the original value.
        let Wrapper(encoded_bytes) = &actual;
        let decoded_val = T::get_decoded(encoded_bytes).unwrap();
        pretty_assertions::assert_eq!(
            &decoded_val,
            val,
            "Couldn't roundtrip (decoded value differs): {val:?}"
        );

        // The length hint must agree with the real encoding.
        pretty_assertions::assert_eq!(
            encoded_bytes.len(),
            val.encoded_len().expect("No encoded length hint"),
            "Encoded length hint is incorrect: {val:?}"
        );
    }
}