janus_client/
lib.rs

1//! A [DAP](https://datatracker.ietf.org/doc/draft-ietf-ppm-dap/) client
2//!
3//! This library implements the client role of the DAP-PPM protocol. It uploads measurements to two
4//! DAP aggregator servers which in turn compute a statistical aggregate over data from many
5//! clients, while preserving the privacy of each client's data.
6//!
7//! # Examples
8//!
9//! ```no_run
10//! use url::Url;
11//! use prio::vdaf::prio3::Prio3Histogram;
12//! use janus_messages::{Duration, TaskId};
13//! use std::str::FromStr;
14//!
15//! #[tokio::main]
16//! async fn main() {
17//!     let leader_url = Url::parse("https://leader.example.com/").unwrap();
18//!     let helper_url = Url::parse("https://helper.example.com/").unwrap();
19//!     let vdaf = Prio3Histogram::new_histogram(
20//!         2,
21//!         12,
22//!         4
23//!     ).unwrap();
24//!     let taskid = "rc0jgm1MHH6Q7fcI4ZdNUxas9DAYLcJFK5CL7xUl-gU";
25//!     let task = TaskId::from_str(taskid).unwrap();
26//!
27//!     let client = janus_client::Client::new(
28//!         task,
29//!         leader_url,
30//!         helper_url,
31//!         Duration::from_seconds(300),
32//!         vdaf
33//!     )
34//!     .await
35//!     .unwrap();
36//!     client.upload(&5).await.unwrap();
37//! }
38//! ```
39
40#![cfg_attr(docsrs, feature(doc_cfg))]
41
42use backoff::ExponentialBackoff;
43#[cfg(feature = "ohttp")]
44use bhttp::{ControlData, Message, Mode};
45use educe::Educe;
46#[cfg(feature = "ohttp")]
47use http::{header::ACCEPT, HeaderValue};
48use http::{header::CONTENT_TYPE, StatusCode};
49use itertools::Itertools;
50use janus_core::{
51    hpke::{self, is_hpke_config_supported, HpkeApplicationInfo, Label},
52    http::{cached_resource::CachedResource, HttpErrorResponse},
53    retries::{http_request_exponential_backoff, retry_http_request},
54    time::{Clock, RealClock, TimeExt},
55    url_ensure_trailing_slash,
56};
57use janus_messages::{
58    Duration, HpkeConfig, HpkeConfigList, InputShareAad, PlaintextInputShare, Report, ReportId,
59    ReportMetadata, Role, TaskId, Time,
60};
61#[cfg(feature = "ohttp")]
62use ohttp::{ClientRequest, KeyConfig};
63use prio::{codec::Encode, vdaf};
64use rand::random;
65#[cfg(feature = "ohttp")]
66use std::io::Cursor;
67use std::{convert::Infallible, fmt::Debug, sync::Arc, time::SystemTimeError};
68use tokio::{sync::Mutex, try_join};
69use url::Url;
70
71#[cfg(test)]
72mod tests;
73
74#[derive(Debug, thiserror::Error)]
75pub enum Error {
76    #[error("invalid parameter {0}")]
77    InvalidParameter(&'static str),
78    #[error("HTTP client error: {0}")]
79    HttpClient(#[from] reqwest::Error),
80    #[error("codec error: {0}")]
81    Codec(#[from] prio::codec::CodecError),
82    #[error("HTTP response status {0}")]
83    Http(Box<HttpErrorResponse>),
84    #[error("URL parse: {0}")]
85    Url(#[from] url::ParseError),
86    #[error("VDAF error: {0}")]
87    Vdaf(#[from] prio::vdaf::VdafError),
88    #[error("HPKE error: {0}")]
89    Hpke(#[from] janus_core::hpke::Error),
90    #[error("Cached resource error: {0}")]
91    CachedResource(#[from] janus_core::http::cached_resource::Error),
92    #[error("unexpected server response {0}")]
93    UnexpectedServerResponse(&'static str),
94    #[error("time conversion error: {0}")]
95    TimeConversion(#[from] SystemTimeError),
96    #[cfg(feature = "ohttp")]
97    #[error("OHTTP error: {0}")]
98    Ohttp(#[from] ohttp::Error),
99    #[cfg(feature = "ohttp")]
100    #[error("BHTTP error: {0}")]
101    Bhttp(#[from] bhttp::Error),
102    #[cfg(feature = "ohttp")]
103    #[error("No supported key configurations advertised by OHTTP gateway")]
104    OhttpNoSupportedKeyConfigs(Box<Vec<KeyConfig>>),
105}
106
107impl From<Infallible> for Error {
108    fn from(value: Infallible) -> Self {
109        match value {}
110    }
111}
112
113impl From<Result<HttpErrorResponse, reqwest::Error>> for Error {
114    fn from(result: Result<HttpErrorResponse, reqwest::Error>) -> Self {
115        match result {
116            Ok(http_error_response) => Error::Http(Box::new(http_error_response)),
117            Err(error) => error.into(),
118        }
119    }
120}
121
122static CLIENT_USER_AGENT: &str = concat!(
123    env!("CARGO_PKG_NAME"),
124    "/",
125    env!("CARGO_PKG_VERSION"),
126    "/",
127    "client"
128);
129
130#[cfg(feature = "ohttp")]
131const OHTTP_KEYS_MEDIA_TYPE: &str = "application/ohttp-keys";
132#[cfg(feature = "ohttp")]
133const OHTTP_REQUEST_MEDIA_TYPE: &str = "message/ohttp-req";
134#[cfg(feature = "ohttp")]
135const OHTTP_RESPONSE_MEDIA_TYPE: &str = "message/ohttp-res";
136
137/// The DAP client's view of task parameters.
138#[derive(Clone, Educe)]
139#[educe(Debug)]
140struct ClientParameters {
141    /// Unique identifier for the task.
142    task_id: TaskId,
143    /// URL relative to which the Leader's API endpoints are found.
144    #[educe(Debug(method(std::fmt::Display::fmt)))]
145    leader_aggregator_endpoint: Url,
146    /// URL relative to which the Helper's API endpoints are found.
147    #[educe(Debug(method(std::fmt::Display::fmt)))]
148    helper_aggregator_endpoint: Url,
149    /// The time precision of the task. This value is shared by all parties in the protocol, and is
150    /// used to compute report timestamps.
151    time_precision: Duration,
152    /// Parameters to use when retrying HTTP requests.
153    http_request_retry_parameters: ExponentialBackoff,
154}
155
156impl ClientParameters {
157    /// Creates a new set of client task parameters.
158    pub fn new(
159        task_id: TaskId,
160        leader_aggregator_endpoint: Url,
161        helper_aggregator_endpoint: Url,
162        time_precision: Duration,
163    ) -> Self {
164        Self {
165            task_id,
166            leader_aggregator_endpoint: url_ensure_trailing_slash(leader_aggregator_endpoint),
167            helper_aggregator_endpoint: url_ensure_trailing_slash(helper_aggregator_endpoint),
168            time_precision,
169            http_request_retry_parameters: http_request_exponential_backoff(),
170        }
171    }
172
173    /// The URL relative to which the API endpoints for the aggregator may be found, if the role is
174    /// an aggregator, or an error otherwise.
175    fn aggregator_endpoint(&self, role: &Role) -> Result<&Url, Error> {
176        match role {
177            Role::Leader => Ok(&self.leader_aggregator_endpoint),
178            Role::Helper => Ok(&self.helper_aggregator_endpoint),
179            _ => Err(Error::InvalidParameter("role is not an aggregator")),
180        }
181    }
182
183    /// URL from which the HPKE configuration for the server filling `role` may be fetched, per
184    /// the [DAP specification][1].
185    ///
186    /// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-hpke-configuration-request
187    fn hpke_config_endpoint(&self, role: &Role) -> Result<Url, Error> {
188        Ok(self.aggregator_endpoint(role)?.join("hpke_config")?)
189    }
190
191    // URI to which reports may be uploaded for the provided task.
192    fn reports_resource_uri(&self, task_id: &TaskId) -> Result<Url, Error> {
193        Ok(self
194            .leader_aggregator_endpoint
195            .join(&format!("tasks/{task_id}/reports"))?)
196    }
197}
198
199/// Fetches OHTTP HPKE key configurations for the provided OHTTP config.
200#[tracing::instrument(err)]
201#[cfg(feature = "ohttp")]
202async fn ohttp_key_configs(
203    http_request_retry_parameters: ExponentialBackoff,
204    ohttp_config: &OhttpConfig,
205    http_client: &reqwest::Client,
206) -> Result<Vec<KeyConfig>, Error> {
207    // TODO(#3159): store/fetch OHTTP key configs in a cache-control aware persistent cache.
208    let keys_response = retry_http_request(http_request_retry_parameters, || async {
209        http_client
210            .get(ohttp_config.key_configs.clone())
211            .header(ACCEPT, OHTTP_KEYS_MEDIA_TYPE)
212            .send()
213            .await
214    })
215    .await?;
216
217    if !keys_response.status().is_success() {
218        return Err(Error::Http(Box::new(HttpErrorResponse::from(
219            keys_response.status(),
220        ))));
221    }
222
223    if keys_response
224        .headers()
225        .get(CONTENT_TYPE)
226        .map(HeaderValue::as_bytes)
227        != Some(OHTTP_KEYS_MEDIA_TYPE.as_bytes())
228    {
229        return Err(Error::UnexpectedServerResponse(
230            "content type wrong for OHTTP keys",
231        ));
232    }
233
234    Ok(KeyConfig::decode_list(keys_response.body().as_ref())?)
235}
236
237/// Construct a [`reqwest::Client`] suitable for use in a DAP [`Client`].
238pub fn default_http_client() -> Result<reqwest::Client, Error> {
239    Ok(reqwest::Client::builder()
240        // Clients wishing to override these timeouts may provide their own
241        // values using ClientBuilder::with_http_client.
242        .timeout(std::time::Duration::from_secs(30))
243        .connect_timeout(std::time::Duration::from_secs(10))
244        .user_agent(CLIENT_USER_AGENT)
245        .build()?)
246}
247
248/// Configuration for using Oblivious HTTP (RFC 9458).
249#[derive(Clone, Debug)]
250#[cfg_attr(docsrs, doc(cfg(feature = "ohttp")))]
251#[cfg(feature = "ohttp")]
252pub struct OhttpConfig {
253    /// Endpoint from which OHTTP gateway key configurations may be fetched. The key configurations
254    /// must be in the format specified by [RFC 9458, section 3][1].
255    ///
256    /// [1]: https://datatracker.ietf.org/doc/html/rfc9458#name-key-configuration
257    pub key_configs: Url,
258
259    /// The OHTTP relay which will relay encapsulated messages to the gateway.
260    pub relay: Url,
261}
262
263/// Builder for configuring a [`Client`].
264pub struct ClientBuilder<V: vdaf::Client<16>> {
265    parameters: ClientParameters,
266    vdaf: V,
267    leader_hpke_config: Option<HpkeConfig>,
268    helper_hpke_config: Option<HpkeConfig>,
269    #[cfg(feature = "ohttp")]
270    ohttp_config: Option<OhttpConfig>,
271    http_client: Option<reqwest::Client>,
272}
273
274impl<V: vdaf::Client<16>> ClientBuilder<V> {
275    /// Construct a [`ClientBuilder`] from its required DAP task parameters.
276    pub fn new(
277        task_id: TaskId,
278        leader_aggregator_endpoint: Url,
279        helper_aggregator_endpoint: Url,
280        time_precision: Duration,
281        vdaf: V,
282    ) -> Self {
283        Self {
284            parameters: ClientParameters::new(
285                task_id,
286                leader_aggregator_endpoint,
287                helper_aggregator_endpoint,
288                time_precision,
289            ),
290            vdaf,
291            leader_hpke_config: None,
292            helper_hpke_config: None,
293            #[cfg(feature = "ohttp")]
294            ohttp_config: None,
295            http_client: None,
296        }
297    }
298
299    /// Finalize construction of a [`Client`]. This will fetch HPKE configurations from each
300    /// aggregator via HTTPS.
301    pub async fn build(self) -> Result<Client<V>, Error> {
302        let http_client = if let Some(http_client) = self.http_client {
303            http_client
304        } else {
305            default_http_client()?
306        };
307
308        let fetch_hpke_config = async |hpke_config, role| match hpke_config {
309            Some(hpke_config) => Ok(HpkeConfiguration::new_static(hpke_config)),
310            None => HpkeConfiguration::new(&self.parameters, role, &http_client).await,
311        };
312
313        let (leader_hpke_config, helper_hpke_config) = tokio::try_join!(
314            fetch_hpke_config(self.leader_hpke_config, &Role::Leader),
315            fetch_hpke_config(self.helper_hpke_config, &Role::Helper),
316        )?;
317
318        #[cfg(feature = "ohttp")]
319        let ohttp_config = if let Some(ohttp_config) = self.ohttp_config {
320            let key_configs = ohttp_key_configs(
321                self.parameters.http_request_retry_parameters.clone(),
322                &ohttp_config,
323                &http_client,
324            )
325            .await?;
326            Some((ohttp_config, key_configs))
327        } else {
328            None
329        };
330
331        Ok(Client {
332            #[cfg(feature = "ohttp")]
333            ohttp_config,
334            parameters: self.parameters,
335            vdaf: self.vdaf,
336            http_client,
337            leader_hpke_config: Arc::new(Mutex::new(leader_hpke_config)),
338            helper_hpke_config: Arc::new(Mutex::new(helper_hpke_config)),
339        })
340    }
341
342    /// Finalize construction of a [`Client`], and provide aggregator HPKE configurations through an
343    /// out-of-band mechanism.
344    ///
345    /// # Notes
346    ///
347    /// This method is not compatible with OHTTP . Use [`ClientBuilder::with_ohttp_config`] and then
348    /// [`ClientBuilder::build`] to provide OHTTP configuration.
349    #[deprecated(
350        note = "Use `ClientBuilder::with_leader_hpke_config`, `ClientBuilder::with_helper_hpke_config` and `ClientBuilder::build` instead"
351    )]
352    pub fn build_with_hpke_configs(
353        self,
354        leader_hpke_config: HpkeConfig,
355        helper_hpke_config: HpkeConfig,
356    ) -> Result<Client<V>, Error> {
357        let http_client = if let Some(http_client) = self.http_client {
358            http_client
359        } else {
360            default_http_client()?
361        };
362        Ok(Client {
363            parameters: self.parameters,
364            vdaf: self.vdaf,
365            #[cfg(feature = "ohttp")]
366            ohttp_config: None,
367            http_client,
368            leader_hpke_config: Arc::new(Mutex::new(HpkeConfiguration::new_static(
369                leader_hpke_config,
370            ))),
371            helper_hpke_config: Arc::new(Mutex::new(HpkeConfiguration::new_static(
372                helper_hpke_config,
373            ))),
374        })
375    }
376
377    /// Override the HTTPS client configuration to be used.
378    pub fn with_http_client(mut self, http_client: reqwest::Client) -> Self {
379        self.http_client = Some(http_client);
380        self
381    }
382
383    /// Override the exponential backoff parameters used when retrying HTTPS requests.
384    pub fn with_backoff(mut self, http_request_retry_parameters: ExponentialBackoff) -> Self {
385        self.parameters.http_request_retry_parameters = http_request_retry_parameters;
386        self
387    }
388
389    /// Set the leader HPKE configuration to be used, preventing the client from fetching it from
390    /// the aggregator over HTTPS.
391    pub fn with_leader_hpke_config(mut self, hpke_config: HpkeConfig) -> Self {
392        self.leader_hpke_config = Some(hpke_config);
393        self
394    }
395
396    /// Set the helper HPKE configuration to be used, preventing the client from fetching it from
397    /// the aggregator over HTTPS.
398    pub fn with_helper_hpke_config(mut self, hpke_config: HpkeConfig) -> Self {
399        self.helper_hpke_config = Some(hpke_config);
400        self
401    }
402
403    /// Set the OHTTP configuration to be used when uploading reports, but not when fetching DAP
404    /// HPKE configurations.
405    ///
406    /// # Examples
407    ///
408    /// ```no_run
409    /// # use url::Url;
410    /// # use prio::vdaf::prio3::Prio3Count;
411    /// # use janus_messages::{Duration, TaskId};
412    /// # use rand::random;
413    /// # use std::str::FromStr;
414    ///
415    /// #[tokio::main]
416    /// async fn main() {
417    ///     let task_id = random();
418    ///
419    ///     let client = janus_client::Client::builder(
420    ///         task_id,
421    ///         Url::parse("https://leader.example.com/").unwrap(),
422    ///         Url::parse("https://helper.example.com/").unwrap(),
423    ///         Duration::from_seconds(1),
424    ///         Prio3Count::new_count(2).unwrap(),
425    ///     )
426    ///     .with_ohttp_config(janus_client::OhttpConfig {
427    ///         key_configs: Url::parse("https://ohttp-keys.example.com").unwrap(),
428    ///         relay: Url::parse("https://ohttp-relay.example.com").unwrap(),
429    ///     })
430    ///     .build()
431    ///     .await
432    ///     .unwrap();
433    ///
434    ///     client.upload(&true).await.unwrap();
435    /// }
436    /// ```
437    #[cfg(feature = "ohttp")]
438    #[cfg_attr(docsrs, doc(cfg(feature = "ohttp")))]
439    pub fn with_ohttp_config(mut self, ohttp_config: OhttpConfig) -> Self {
440        self.ohttp_config = Some(ohttp_config);
441        self
442    }
443}
444
445/// A DAP client.
446#[derive(Clone, Debug)]
447pub struct Client<V: vdaf::Client<16>> {
448    parameters: ClientParameters,
449    vdaf: V,
450    #[cfg(feature = "ohttp")]
451    ohttp_config: Option<(OhttpConfig, Vec<KeyConfig>)>,
452    http_client: reqwest::Client,
453    leader_hpke_config: Arc<Mutex<HpkeConfiguration>>,
454    helper_hpke_config: Arc<Mutex<HpkeConfiguration>>,
455}
456
457impl<V: vdaf::Client<16>> Client<V> {
458    /// Construct a new client from the required set of DAP task parameters.
459    pub async fn new(
460        task_id: TaskId,
461        leader_aggregator_endpoint: Url,
462        helper_aggregator_endpoint: Url,
463        time_precision: Duration,
464        vdaf: V,
465    ) -> Result<Self, Error> {
466        ClientBuilder::new(
467            task_id,
468            leader_aggregator_endpoint,
469            helper_aggregator_endpoint,
470            time_precision,
471            vdaf,
472        )
473        .build()
474        .await
475    }
476
477    /// Construct a new client, and provide the aggregator HPKE configurations through an
478    /// out-of-band means.
479    ///
480    /// # Notes
481    ///
482    /// This method is not compatible with OHTTP. Use [`ClientBuilder::with_ohttp_config`] and then
483    /// [`ClientBuilder::build`] to provide OHTTP configuration.
484    #[deprecated(
485        note = "Use `ClientBuilder::with_leader_hpke_config`, `ClientBuilder::with_helper_hpke_config` and `ClientBuilder::build` instead"
486    )]
487    pub fn with_hpke_configs(
488        task_id: TaskId,
489        leader_aggregator_endpoint: Url,
490        helper_aggregator_endpoint: Url,
491        time_precision: Duration,
492        vdaf: V,
493        leader_hpke_config: HpkeConfig,
494        helper_hpke_config: HpkeConfig,
495    ) -> Result<Self, Error> {
496        #[allow(deprecated)]
497        ClientBuilder::new(
498            task_id,
499            leader_aggregator_endpoint,
500            helper_aggregator_endpoint,
501            time_precision,
502            vdaf,
503        )
504        .build_with_hpke_configs(leader_hpke_config, helper_hpke_config)
505    }
506
507    /// Creates a [`ClientBuilder`] for further configuration from the required set of DAP task
508    /// parameters.
509    pub fn builder(
510        task_id: TaskId,
511        leader_aggregator_endpoint: Url,
512        helper_aggregator_endpoint: Url,
513        time_precision: Duration,
514        vdaf: V,
515    ) -> ClientBuilder<V> {
516        ClientBuilder::new(
517            task_id,
518            leader_aggregator_endpoint,
519            helper_aggregator_endpoint,
520            time_precision,
521            vdaf,
522        )
523    }
524
525    /// Shard a measurement, encrypt its shares, and construct a [`janus_messages::Report`] to be
526    /// uploaded.
527    fn prepare_report(
528        &self,
529        measurement: &V::Measurement,
530        time: &Time,
531        leader_hpke_config: &HpkeConfig,
532        helper_hpke_config: &HpkeConfig,
533    ) -> Result<Report, Error> {
534        let report_id: ReportId = random();
535        let (public_share, input_shares) = self.vdaf.shard(measurement, report_id.as_ref())?;
536        assert_eq!(input_shares.len(), 2); // DAP only supports VDAFs using two aggregators.
537
538        let time = time
539            .to_batch_interval_start(&self.parameters.time_precision)
540            .map_err(|_| Error::InvalidParameter("couldn't round time down to time_precision"))?;
541        let report_metadata = ReportMetadata::new(report_id, time);
542        let encoded_public_share = public_share.get_encoded()?;
543
544        let (leader_encrypted_input_share, helper_encrypted_input_share) = [
545            (leader_hpke_config, &Role::Leader),
546            (helper_hpke_config, &Role::Helper),
547        ]
548        .into_iter()
549        .zip(input_shares)
550        .map(|((hpke_config, receiver_role), input_share)| {
551            hpke::seal(
552                hpke_config,
553                &HpkeApplicationInfo::new(&Label::InputShare, &Role::Client, receiver_role),
554                &PlaintextInputShare::new(
555                    Vec::new(), // No extensions supported yet.
556                    input_share.get_encoded()?,
557                )
558                .get_encoded()?,
559                &InputShareAad::new(
560                    self.parameters.task_id,
561                    report_metadata.clone(),
562                    encoded_public_share.clone(),
563                )
564                .get_encoded()?,
565            )
566            .map_err(Error::Hpke)
567        })
568        .collect_tuple()
569        .expect("iterator to yield two items"); // expect safety: iterator contains two items.
570
571        Ok(Report::new(
572            report_metadata,
573            encoded_public_share,
574            leader_encrypted_input_share?,
575            helper_encrypted_input_share?,
576        ))
577    }
578
579    /// Upload a [`Report`] to the leader, per the [DAP specification][1]. The provided measurement
580    /// is sharded into two shares and then uploaded to the leader.
581    ///
582    /// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-uploading-reports
583    #[tracing::instrument(skip(measurement), err)]
584    pub async fn upload(&self, measurement: &V::Measurement) -> Result<(), Error> {
585        self.upload_with_time(measurement, Clock::now(&RealClock::default()))
586            .await
587    }
588
589    /// Upload a [`Report`] to the leader, per the [DAP specification][1], and override the report's
590    /// timestamp. The provided measurement is sharded into two shares and then uploaded to the
591    /// leader.
592    ///
593    /// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-uploading-reports
594    ///
595    /// ```no_run
596    /// # use janus_client::{Client, Error};
597    /// # use janus_messages::Duration;
598    /// # use prio::vdaf::prio3::Prio3;
599    /// # use rand::random;
600    /// #
601    /// # async fn test() -> Result<(), Error> {
602    /// # let measurement = true;
603    /// # let timestamp = 1_700_000_000;
604    /// # let vdaf = Prio3::new_count(2).unwrap();
605    /// let client = Client::new(
606    ///     random(),
607    ///     "https://example.com/".parse().unwrap(),
608    ///     "https://example.net/".parse().unwrap(),
609    ///     Duration::from_seconds(3600),
610    ///     vdaf,
611    /// ).await?;
612    /// client.upload_with_time(&measurement, std::time::SystemTime::now()).await?;
613    /// client.upload_with_time(&measurement, janus_messages::Time::from_seconds_since_epoch(timestamp)).await?;
614    /// # Ok(())
615    /// # }
616    /// ```
617    #[tracing::instrument(skip(measurement), err)]
618    pub async fn upload_with_time<T>(
619        &self,
620        measurement: &V::Measurement,
621        time: T,
622    ) -> Result<(), Error>
623    where
624        T: TryInto<Time> + Debug,
625        Error: From<<T as TryInto<Time>>::Error>,
626    {
627        let mut leader_hpke_config = self.leader_hpke_config.lock().await;
628        let mut helper_hpke_config = self.helper_hpke_config.lock().await;
629        let (leader_hpke_config, helper_hpke_config) =
630            try_join!(leader_hpke_config.get(), helper_hpke_config.get())?;
631
632        let report = self
633            .prepare_report(
634                measurement,
635                &time.try_into()?,
636                leader_hpke_config,
637                helper_hpke_config,
638            )?
639            .get_encoded()?;
640        let upload_endpoint = self
641            .parameters
642            .reports_resource_uri(&self.parameters.task_id)?;
643
644        #[cfg(feature = "ohttp")]
645        let upload_status = self.upload_with_ohttp(&upload_endpoint, &report).await?;
646        #[cfg(not(feature = "ohttp"))]
647        let upload_status = self.put_report(&upload_endpoint, &report).await?;
648
649        if !upload_status.is_success() {
650            return Err(Error::Http(Box::new(HttpErrorResponse::from(
651                upload_status,
652            ))));
653        }
654
655        Ok(())
656    }
657
658    async fn put_report(
659        &self,
660        upload_endpoint: &Url,
661        request_body: &[u8],
662    ) -> Result<StatusCode, Error> {
663        Ok(retry_http_request(
664            self.parameters.http_request_retry_parameters.clone(),
665            || async {
666                self.http_client
667                    .put(upload_endpoint.clone())
668                    .header(CONTENT_TYPE, Report::MEDIA_TYPE)
669                    .body(request_body.to_vec())
670                    .send()
671                    .await
672            },
673        )
674        .await?
675        .status())
676    }
677
678    /// Send a DAP upload request via OHTTP, if the client is configured to use it, or directly if
679    /// not. This is only intended for DAP uploads and so does not handle response bodies.
680    #[cfg(feature = "ohttp")]
681    #[tracing::instrument(skip(self, request_body), err)]
682    async fn upload_with_ohttp(
683        &self,
684        upload_endpoint: &Url,
685        request_body: &[u8],
686    ) -> Result<StatusCode, Error> {
687        let (ohttp_config, key_configs) =
688            if let Some((ohttp_config, key_configs)) = &self.ohttp_config {
689                (ohttp_config, key_configs)
690            } else {
691                return self.put_report(upload_endpoint, request_body).await;
692            };
693
694        // Construct a Message representing the upload request...
695        let mut message = Message::request(
696            "PUT".into(),
697            upload_endpoint.scheme().into(),
698            upload_endpoint.authority().into(),
699            upload_endpoint.path().into(),
700        );
701        message.put_header(CONTENT_TYPE.as_str(), Report::MEDIA_TYPE);
702        message.write_content(request_body);
703
704        // ...get the BHTTP encoding of the message...
705        let mut request_buf = Vec::new();
706        message.write_bhttp(Mode::KnownLength, &mut request_buf)?;
707
708        // ...and OHTTP encapsulate it to the gateway key config.
709        let ohttp_request = key_configs
710            .iter()
711            .cloned()
712            .find_map(|mut key_config| ClientRequest::from_config(&mut key_config).ok())
713            .ok_or_else(|| Error::OhttpNoSupportedKeyConfigs(Box::new(key_configs.to_vec())))?;
714
715        let (encapsulated_request, ohttp_response) = ohttp_request.encapsulate(&request_buf)?;
716
717        let relay_response = retry_http_request(
718            self.parameters.http_request_retry_parameters.clone(),
719            || async {
720                self.http_client
721                    .post(ohttp_config.relay.clone())
722                    .header(CONTENT_TYPE, OHTTP_REQUEST_MEDIA_TYPE)
723                    .header(ACCEPT, OHTTP_RESPONSE_MEDIA_TYPE)
724                    .body(encapsulated_request.clone())
725                    .send()
726                    .await
727            },
728        )
729        .await?;
730
731        // Check whether request to the OHTTP relay was successful, and if so, decapsulate that
732        // response to get the DAP aggregator's response.
733        if !relay_response.status().is_success() {
734            return Err(Error::Http(Box::new(HttpErrorResponse::from(
735                relay_response.status(),
736            ))));
737        }
738
739        if relay_response
740            .headers()
741            .get(CONTENT_TYPE)
742            .map(HeaderValue::as_bytes)
743            != Some(OHTTP_RESPONSE_MEDIA_TYPE.as_bytes())
744        {
745            return Err(Error::UnexpectedServerResponse(
746                "content type wrong for OHTTP response",
747            ));
748        }
749
750        let decapsulated_response = ohttp_response.decapsulate(relay_response.body().as_ref())?;
751        let message = Message::read_bhttp(&mut Cursor::new(&decapsulated_response))?;
752        let status = if let ControlData::Response(status) = message.control() {
753            StatusCode::from_u16((*status).into()).map_err(|_| {
754                Error::UnexpectedServerResponse(
755                    "status in decapsulated response is not valid HTTP status",
756                )
757            })?
758        } else {
759            return Err(Error::UnexpectedServerResponse(
760                "decapsulated response control data is not a response",
761            ));
762        };
763
764        Ok(status)
765    }
766}
767
768/// An HPKE configuration advertised by an aggregator.
769#[derive(Debug, Clone)]
770pub(crate) struct HpkeConfiguration {
771    hpke_config_list: CachedResource<HpkeConfigList>,
772}
773
774impl HpkeConfiguration {
775    pub(crate) async fn new(
776        client_parameters: &ClientParameters,
777        aggregator_role: &Role,
778        http_client: &reqwest::Client,
779    ) -> Result<Self, Error> {
780        let mut hpke_config_url = client_parameters.hpke_config_endpoint(aggregator_role)?;
781        hpke_config_url.set_query(Some(&format!("task_id={}", client_parameters.task_id)));
782
783        Ok(Self {
784            hpke_config_list: CachedResource::new(
785                hpke_config_url,
786                HpkeConfigList::MEDIA_TYPE,
787                http_client,
788                client_parameters.http_request_retry_parameters.clone(),
789            )
790            .await?,
791        })
792    }
793
794    pub(crate) fn new_static(hpke_configuration: HpkeConfig) -> Self {
795        Self {
796            hpke_config_list: CachedResource::Static(HpkeConfigList::new(vec![hpke_configuration])),
797        }
798    }
799
800    pub(crate) async fn get(&mut self) -> Result<&HpkeConfig, Error> {
801        let hpke_config_list = self.hpke_config_list.resource().await?;
802
803        if hpke_config_list.hpke_configs().is_empty() {
804            return Err(Error::UnexpectedServerResponse(
805                "aggregator provided empty HpkeConfigList",
806            ));
807        }
808
809        // Take the first supported HpkeConfig from the list. Return the first error otherwise.
810        let mut first_error = None;
811        for config in hpke_config_list.hpke_configs() {
812            match is_hpke_config_supported(config) {
813                Ok(()) => return Ok(config),
814                Err(e) => {
815                    if first_error.is_none() {
816                        first_error = Some(e);
817                    }
818                }
819            }
820        }
821        // Unwrap safety: we checked that the list is nonempty, and if we fell through to here, we must
822        // have seen at least one error.
823        Err(first_error.unwrap().into())
824    }
825}