janus_client/
lib.rs

1//! A [DAP](https://datatracker.ietf.org/doc/draft-ietf-ppm-dap/) client
2//!
3//! This library implements the client role of the DAP-PPM protocol. It uploads measurements to two
4//! DAP aggregator servers which in turn compute a statistical aggregate over data from many
5//! clients, while preserving the privacy of each client's data.
6//!
7//! # Examples
8//!
9//! ```no_run
10//! use url::Url;
11//! use prio::vdaf::prio3::Prio3Histogram;
12//! use janus_messages::{Duration, TaskId};
13//! use std::str::FromStr;
14//!
15//! #[tokio::main]
16//! async fn main() {
17//!     let leader_url = Url::parse("https://leader.example.com/").unwrap();
18//!     let helper_url = Url::parse("https://helper.example.com/").unwrap();
19//!     let vdaf = Prio3Histogram::new_histogram(
20//!         2,
21//!         12,
22//!         4
23//!     ).unwrap();
24//!     let taskid = "rc0jgm1MHH6Q7fcI4ZdNUxas9DAYLcJFK5CL7xUl-gU";
25//!     let task = TaskId::from_str(taskid).unwrap();
26//!
27//!     let client = janus_client::Client::new(
28//!         task,
29//!         leader_url,
30//!         helper_url,
31//!         Duration::from_seconds(300),
32//!         vdaf
33//!     )
34//!     .await
35//!     .unwrap();
36//!     client.upload(&5).await.unwrap();
37//! }
38//! ```
39
40#![cfg_attr(docsrs, feature(doc_cfg))]
41
42use backoff::ExponentialBackoff;
43#[cfg(feature = "ohttp")]
44use bhttp::{ControlData, Message, Mode};
45use educe::Educe;
46#[cfg(feature = "ohttp")]
47use http::{header::ACCEPT, HeaderValue};
48use http::{header::CONTENT_TYPE, StatusCode};
49use itertools::Itertools;
50use janus_core::{
51    hpke::{self, is_hpke_config_supported, HpkeApplicationInfo, Label},
52    http::HttpErrorResponse,
53    retries::{http_request_exponential_backoff, retry_http_request},
54    time::{Clock, RealClock, TimeExt},
55    url_ensure_trailing_slash,
56};
57use janus_messages::{
58    Duration, HpkeConfig, HpkeConfigList, InputShareAad, PlaintextInputShare, Report, ReportId,
59    ReportMetadata, Role, TaskId, Time,
60};
61#[cfg(feature = "ohttp")]
62use ohttp::{ClientRequest, KeyConfig};
63use prio::{
64    codec::{Decode, Encode},
65    vdaf,
66};
67use rand::random;
68#[cfg(feature = "ohttp")]
69use std::io::Cursor;
70use std::{convert::Infallible, fmt::Debug, time::SystemTimeError};
71use url::Url;
72
73#[cfg(test)]
74mod tests;
75
76#[derive(Debug, thiserror::Error)]
77pub enum Error {
78    #[error("invalid parameter {0}")]
79    InvalidParameter(&'static str),
80    #[error("HTTP client error: {0}")]
81    HttpClient(#[from] reqwest::Error),
82    #[error("codec error: {0}")]
83    Codec(#[from] prio::codec::CodecError),
84    #[error("HTTP response status {0}")]
85    Http(Box<HttpErrorResponse>),
86    #[error("URL parse: {0}")]
87    Url(#[from] url::ParseError),
88    #[error("VDAF error: {0}")]
89    Vdaf(#[from] prio::vdaf::VdafError),
90    #[error("HPKE error: {0}")]
91    Hpke(#[from] janus_core::hpke::Error),
92    #[error("unexpected server response {0}")]
93    UnexpectedServerResponse(&'static str),
94    #[error("time conversion error: {0}")]
95    TimeConversion(#[from] SystemTimeError),
96    #[cfg(feature = "ohttp")]
97    #[error("OHTTP error: {0}")]
98    Ohttp(#[from] ohttp::Error),
99    #[cfg(feature = "ohttp")]
100    #[error("BHTTP error: {0}")]
101    Bhttp(#[from] bhttp::Error),
102    #[cfg(feature = "ohttp")]
103    #[error("No supported key configurations advertised by OHTTP gateway")]
104    OhttpNoSupportedKeyConfigs(Box<Vec<KeyConfig>>),
105}
106
107impl From<Infallible> for Error {
108    fn from(value: Infallible) -> Self {
109        match value {}
110    }
111}
112
113impl From<Result<HttpErrorResponse, reqwest::Error>> for Error {
114    fn from(result: Result<HttpErrorResponse, reqwest::Error>) -> Self {
115        match result {
116            Ok(http_error_response) => Error::Http(Box::new(http_error_response)),
117            Err(error) => error.into(),
118        }
119    }
120}
121
122static CLIENT_USER_AGENT: &str = concat!(
123    env!("CARGO_PKG_NAME"),
124    "/",
125    env!("CARGO_PKG_VERSION"),
126    "/",
127    "client"
128);
129
130#[cfg(feature = "ohttp")]
131const OHTTP_KEYS_MEDIA_TYPE: &str = "application/ohttp-keys";
132#[cfg(feature = "ohttp")]
133const OHTTP_REQUEST_MEDIA_TYPE: &str = "message/ohttp-req";
134#[cfg(feature = "ohttp")]
135const OHTTP_RESPONSE_MEDIA_TYPE: &str = "message/ohttp-res";
136
137/// The DAP client's view of task parameters.
138#[derive(Clone, Educe)]
139#[educe(Debug)]
140struct ClientParameters {
141    /// Unique identifier for the task.
142    task_id: TaskId,
143    /// URL relative to which the Leader's API endpoints are found.
144    #[educe(Debug(method(std::fmt::Display::fmt)))]
145    leader_aggregator_endpoint: Url,
146    /// URL relative to which the Helper's API endpoints are found.
147    #[educe(Debug(method(std::fmt::Display::fmt)))]
148    helper_aggregator_endpoint: Url,
149    /// The time precision of the task. This value is shared by all parties in the protocol, and is
150    /// used to compute report timestamps.
151    time_precision: Duration,
152    /// Parameters to use when retrying HTTP requests.
153    http_request_retry_parameters: ExponentialBackoff,
154}
155
156impl ClientParameters {
157    /// Creates a new set of client task parameters.
158    pub fn new(
159        task_id: TaskId,
160        leader_aggregator_endpoint: Url,
161        helper_aggregator_endpoint: Url,
162        time_precision: Duration,
163    ) -> Self {
164        Self {
165            task_id,
166            leader_aggregator_endpoint: url_ensure_trailing_slash(leader_aggregator_endpoint),
167            helper_aggregator_endpoint: url_ensure_trailing_slash(helper_aggregator_endpoint),
168            time_precision,
169            http_request_retry_parameters: http_request_exponential_backoff(),
170        }
171    }
172
173    /// The URL relative to which the API endpoints for the aggregator may be found, if the role is
174    /// an aggregator, or an error otherwise.
175    fn aggregator_endpoint(&self, role: &Role) -> Result<&Url, Error> {
176        match role {
177            Role::Leader => Ok(&self.leader_aggregator_endpoint),
178            Role::Helper => Ok(&self.helper_aggregator_endpoint),
179            _ => Err(Error::InvalidParameter("role is not an aggregator")),
180        }
181    }
182
183    /// URL from which the HPKE configuration for the server filling `role` may be fetched, per
184    /// the [DAP specification][1].
185    ///
186    /// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-hpke-configuration-request
187    fn hpke_config_endpoint(&self, role: &Role) -> Result<Url, Error> {
188        Ok(self.aggregator_endpoint(role)?.join("hpke_config")?)
189    }
190
191    // URI to which reports may be uploaded for the provided task.
192    fn reports_resource_uri(&self, task_id: &TaskId) -> Result<Url, Error> {
193        Ok(self
194            .leader_aggregator_endpoint
195            .join(&format!("tasks/{task_id}/reports"))?)
196    }
197}
198
199/// Fetches HPKE configuration from the specified aggregator using the aggregator endpoints in the
200/// provided [`ClientParameters`].
201#[tracing::instrument(err)]
202async fn aggregator_hpke_config(
203    hpke_config: Option<HpkeConfig>,
204    client_parameters: &ClientParameters,
205    aggregator_role: &Role,
206    http_client: &reqwest::Client,
207) -> Result<HpkeConfig, Error> {
208    if let Some(hpke_config) = hpke_config {
209        return Ok(hpke_config);
210    }
211
212    let mut request_url = client_parameters.hpke_config_endpoint(aggregator_role)?;
213    request_url.set_query(Some(&format!("task_id={}", client_parameters.task_id)));
214    let hpke_config_response = retry_http_request(
215        client_parameters.http_request_retry_parameters.clone(),
216        || async { http_client.get(request_url.clone()).send().await },
217    )
218    .await?;
219    let status = hpke_config_response.status();
220    if !status.is_success() {
221        return Err(Error::Http(Box::new(HttpErrorResponse::from(status))));
222    }
223
224    let hpke_configs = HpkeConfigList::get_decoded(hpke_config_response.body())?;
225
226    if hpke_configs.hpke_configs().is_empty() {
227        return Err(Error::UnexpectedServerResponse(
228            "aggregator provided empty HpkeConfigList",
229        ));
230    }
231
232    // Take the first supported HpkeConfig from the list. Return the first error otherwise.
233    let mut first_error = None;
234    for config in hpke_configs.hpke_configs() {
235        match is_hpke_config_supported(config) {
236            Ok(()) => return Ok(config.clone()),
237            Err(e) => {
238                if first_error.is_none() {
239                    first_error = Some(e);
240                }
241            }
242        }
243    }
244    // Unwrap safety: we checked that the list is nonempty, and if we fell through to here, we must
245    // have seen at least one error.
246    Err(first_error.unwrap().into())
247}
248
249/// Fetches OHTTP HPKE key configurations for the provided OHTTP config.
250#[tracing::instrument(err)]
251#[cfg(feature = "ohttp")]
252async fn ohttp_key_configs(
253    http_request_retry_parameters: ExponentialBackoff,
254    ohttp_config: &OhttpConfig,
255    http_client: &reqwest::Client,
256) -> Result<Vec<KeyConfig>, Error> {
257    // TODO(#3159): store/fetch OHTTP key configs in a cache-control aware persistent cache.
258    let keys_response = retry_http_request(http_request_retry_parameters, || async {
259        http_client
260            .get(ohttp_config.key_configs.clone())
261            .header(ACCEPT, OHTTP_KEYS_MEDIA_TYPE)
262            .send()
263            .await
264    })
265    .await?;
266
267    if !keys_response.status().is_success() {
268        return Err(Error::Http(Box::new(HttpErrorResponse::from(
269            keys_response.status(),
270        ))));
271    }
272
273    if keys_response
274        .headers()
275        .get(CONTENT_TYPE)
276        .map(HeaderValue::as_bytes)
277        != Some(OHTTP_KEYS_MEDIA_TYPE.as_bytes())
278    {
279        return Err(Error::UnexpectedServerResponse(
280            "content type wrong for OHTTP keys",
281        ));
282    }
283
284    Ok(KeyConfig::decode_list(keys_response.body().as_ref())?)
285}
286
287/// Construct a [`reqwest::Client`] suitable for use in a DAP [`Client`].
288pub fn default_http_client() -> Result<reqwest::Client, Error> {
289    Ok(reqwest::Client::builder()
290        // Clients wishing to override these timeouts may provide their own
291        // values using ClientBuilder::with_http_client.
292        .timeout(std::time::Duration::from_secs(30))
293        .connect_timeout(std::time::Duration::from_secs(10))
294        .user_agent(CLIENT_USER_AGENT)
295        .build()?)
296}
297
298/// Configuration for using Oblivious HTTP (RFC 9458).
299#[derive(Clone, Debug)]
300#[cfg_attr(docsrs, doc(cfg(feature = "ohttp")))]
301#[cfg(feature = "ohttp")]
302pub struct OhttpConfig {
303    /// Endpoint from which OHTTP gateway key configurations may be fetched. The key configurations
304    /// must be in the format specified by [RFC 9458, section 3][1].
305    ///
306    /// [1]: https://datatracker.ietf.org/doc/html/rfc9458#name-key-configuration
307    pub key_configs: Url,
308
309    /// The OHTTP relay which will relay encapsulated messages to the gateway.
310    pub relay: Url,
311}
312
313/// Builder for configuring a [`Client`].
314pub struct ClientBuilder<V: vdaf::Client<16>> {
315    parameters: ClientParameters,
316    vdaf: V,
317    leader_hpke_config: Option<HpkeConfig>,
318    helper_hpke_config: Option<HpkeConfig>,
319    #[cfg(feature = "ohttp")]
320    ohttp_config: Option<OhttpConfig>,
321    http_client: Option<reqwest::Client>,
322}
323
324impl<V: vdaf::Client<16>> ClientBuilder<V> {
325    /// Construct a [`ClientBuilder`] from its required DAP task parameters.
326    pub fn new(
327        task_id: TaskId,
328        leader_aggregator_endpoint: Url,
329        helper_aggregator_endpoint: Url,
330        time_precision: Duration,
331        vdaf: V,
332    ) -> Self {
333        Self {
334            parameters: ClientParameters::new(
335                task_id,
336                leader_aggregator_endpoint,
337                helper_aggregator_endpoint,
338                time_precision,
339            ),
340            vdaf,
341            leader_hpke_config: None,
342            helper_hpke_config: None,
343            #[cfg(feature = "ohttp")]
344            ohttp_config: None,
345            http_client: None,
346        }
347    }
348
349    /// Finalize construction of a [`Client`]. This will fetch HPKE configurations from each
350    /// aggregator via HTTPS.
351    pub async fn build(self) -> Result<Client<V>, Error> {
352        let http_client = if let Some(http_client) = self.http_client {
353            http_client
354        } else {
355            default_http_client()?
356        };
357        // TODO(#3159): store/fetch HPKE configurations in a cache-control aware persistent cache
358        let (leader_hpke_config, helper_hpke_config) = tokio::try_join!(
359            aggregator_hpke_config(
360                self.leader_hpke_config,
361                &self.parameters,
362                &Role::Leader,
363                &http_client
364            ),
365            aggregator_hpke_config(
366                self.helper_hpke_config,
367                &self.parameters,
368                &Role::Helper,
369                &http_client
370            ),
371        )?;
372
373        #[cfg(feature = "ohttp")]
374        let ohttp_config = if let Some(ohttp_config) = self.ohttp_config {
375            let key_configs = ohttp_key_configs(
376                self.parameters.http_request_retry_parameters.clone(),
377                &ohttp_config,
378                &http_client,
379            )
380            .await?;
381            Some((ohttp_config, key_configs))
382        } else {
383            None
384        };
385
386        Ok(Client {
387            #[cfg(feature = "ohttp")]
388            ohttp_config,
389            parameters: self.parameters,
390            vdaf: self.vdaf,
391            http_client,
392            leader_hpke_config,
393            helper_hpke_config,
394        })
395    }
396
397    /// Finalize construction of a [`Client`], and provide aggregator HPKE configurations through an
398    /// out-of-band mechanism.
399    ///
400    /// # Notes
401    ///
402    /// This method is not compatible with OHTTP . Use [`ClientBuilder::with_ohttp_config`] and then
403    /// [`ClientBuilder::build`] to provide OHTTP configuration.
404    #[deprecated(
405        note = "Use `ClientBuilder::with_leader_hpke_config`, `ClientBuilder::with_helper_hpke_config` and `ClientBuilder::build` instead"
406    )]
407    pub fn build_with_hpke_configs(
408        self,
409        leader_hpke_config: HpkeConfig,
410        helper_hpke_config: HpkeConfig,
411    ) -> Result<Client<V>, Error> {
412        let http_client = if let Some(http_client) = self.http_client {
413            http_client
414        } else {
415            default_http_client()?
416        };
417        Ok(Client {
418            parameters: self.parameters,
419            vdaf: self.vdaf,
420            #[cfg(feature = "ohttp")]
421            ohttp_config: None,
422            http_client,
423            leader_hpke_config,
424            helper_hpke_config,
425        })
426    }
427
428    /// Override the HTTPS client configuration to be used.
429    pub fn with_http_client(mut self, http_client: reqwest::Client) -> Self {
430        self.http_client = Some(http_client);
431        self
432    }
433
434    /// Override the exponential backoff parameters used when retrying HTTPS requests.
435    pub fn with_backoff(mut self, http_request_retry_parameters: ExponentialBackoff) -> Self {
436        self.parameters.http_request_retry_parameters = http_request_retry_parameters;
437        self
438    }
439
440    /// Set the leader HPKE configuration to be used, preventing the client from fetching it from
441    /// the aggregator over HTTPS.
442    pub fn with_leader_hpke_config(mut self, hpke_config: HpkeConfig) -> Self {
443        self.leader_hpke_config = Some(hpke_config);
444        self
445    }
446
447    /// Set the helper HPKE configuration to be used, preventing the client from fetching it from
448    /// the aggregator over HTTPS.
449    pub fn with_helper_hpke_config(mut self, hpke_config: HpkeConfig) -> Self {
450        self.helper_hpke_config = Some(hpke_config);
451        self
452    }
453
454    /// Set the OHTTP configuration to be used when uploading reports, but not when fetching DAP
455    /// HPKE configurations.
456    ///
457    /// # Examples
458    ///
459    /// ```no_run
460    /// # use url::Url;
461    /// # use prio::vdaf::prio3::Prio3Count;
462    /// # use janus_messages::{Duration, TaskId};
463    /// # use rand::random;
464    /// # use std::str::FromStr;
465    ///
466    /// #[tokio::main]
467    /// async fn main() {
468    ///     let task_id = random();
469    ///
470    ///     let client = janus_client::Client::builder(
471    ///         task_id,
472    ///         Url::parse("https://leader.example.com/").unwrap(),
473    ///         Url::parse("https://helper.example.com/").unwrap(),
474    ///         Duration::from_seconds(1),
475    ///         Prio3Count::new_count(2).unwrap(),
476    ///     )
477    ///     .with_ohttp_config(janus_client::OhttpConfig {
478    ///         key_configs: Url::parse("https://ohttp-keys.example.com").unwrap(),
479    ///         relay: Url::parse("https://ohttp-relay.example.com").unwrap(),
480    ///     })
481    ///     .build()
482    ///     .await
483    ///     .unwrap();
484    ///
485    ///     client.upload(&true).await.unwrap();
486    /// }
487    /// ```
488    #[cfg(feature = "ohttp")]
489    #[cfg_attr(docsrs, doc(cfg(feature = "ohttp")))]
490    pub fn with_ohttp_config(mut self, ohttp_config: OhttpConfig) -> Self {
491        self.ohttp_config = Some(ohttp_config);
492        self
493    }
494}
495
496/// A DAP client.
497#[derive(Clone, Debug)]
498pub struct Client<V: vdaf::Client<16>> {
499    parameters: ClientParameters,
500    vdaf: V,
501    #[cfg(feature = "ohttp")]
502    ohttp_config: Option<(OhttpConfig, Vec<KeyConfig>)>,
503    http_client: reqwest::Client,
504    leader_hpke_config: HpkeConfig,
505    helper_hpke_config: HpkeConfig,
506}
507
508impl<V: vdaf::Client<16>> Client<V> {
509    /// Construct a new client from the required set of DAP task parameters.
510    pub async fn new(
511        task_id: TaskId,
512        leader_aggregator_endpoint: Url,
513        helper_aggregator_endpoint: Url,
514        time_precision: Duration,
515        vdaf: V,
516    ) -> Result<Self, Error> {
517        ClientBuilder::new(
518            task_id,
519            leader_aggregator_endpoint,
520            helper_aggregator_endpoint,
521            time_precision,
522            vdaf,
523        )
524        .build()
525        .await
526    }
527
528    /// Construct a new client, and provide the aggregator HPKE configurations through an
529    /// out-of-band means.
530    ///
531    /// # Notes
532    ///
533    /// This method is not compatible with OHTTP. Use [`ClientBuilder::with_ohttp_config`] and then
534    /// [`ClientBuilder::build`] to provide OHTTP configuration.
535    #[deprecated(
536        note = "Use `ClientBuilder::with_leader_hpke_config`, `ClientBuilder::with_helper_hpke_config` and `ClientBuilder::build` instead"
537    )]
538    pub fn with_hpke_configs(
539        task_id: TaskId,
540        leader_aggregator_endpoint: Url,
541        helper_aggregator_endpoint: Url,
542        time_precision: Duration,
543        vdaf: V,
544        leader_hpke_config: HpkeConfig,
545        helper_hpke_config: HpkeConfig,
546    ) -> Result<Self, Error> {
547        #[allow(deprecated)]
548        ClientBuilder::new(
549            task_id,
550            leader_aggregator_endpoint,
551            helper_aggregator_endpoint,
552            time_precision,
553            vdaf,
554        )
555        .build_with_hpke_configs(leader_hpke_config, helper_hpke_config)
556    }
557
558    /// Creates a [`ClientBuilder`] for further configuration from the required set of DAP task
559    /// parameters.
560    pub fn builder(
561        task_id: TaskId,
562        leader_aggregator_endpoint: Url,
563        helper_aggregator_endpoint: Url,
564        time_precision: Duration,
565        vdaf: V,
566    ) -> ClientBuilder<V> {
567        ClientBuilder::new(
568            task_id,
569            leader_aggregator_endpoint,
570            helper_aggregator_endpoint,
571            time_precision,
572            vdaf,
573        )
574    }
575
576    /// Shard a measurement, encrypt its shares, and construct a [`janus_messages::Report`] to be
577    /// uploaded.
578    fn prepare_report(&self, measurement: &V::Measurement, time: &Time) -> Result<Report, Error> {
579        let report_id: ReportId = random();
580        let (public_share, input_shares) = self.vdaf.shard(measurement, report_id.as_ref())?;
581        assert_eq!(input_shares.len(), 2); // DAP only supports VDAFs using two aggregators.
582
583        let time = time
584            .to_batch_interval_start(&self.parameters.time_precision)
585            .map_err(|_| Error::InvalidParameter("couldn't round time down to time_precision"))?;
586        let report_metadata = ReportMetadata::new(report_id, time);
587        let encoded_public_share = public_share.get_encoded()?;
588
589        let (leader_encrypted_input_share, helper_encrypted_input_share) = [
590            (&self.leader_hpke_config, &Role::Leader),
591            (&self.helper_hpke_config, &Role::Helper),
592        ]
593        .into_iter()
594        .zip(input_shares)
595        .map(|((hpke_config, receiver_role), input_share)| {
596            hpke::seal(
597                hpke_config,
598                &HpkeApplicationInfo::new(&Label::InputShare, &Role::Client, receiver_role),
599                &PlaintextInputShare::new(
600                    Vec::new(), // No extensions supported yet.
601                    input_share.get_encoded()?,
602                )
603                .get_encoded()?,
604                &InputShareAad::new(
605                    self.parameters.task_id,
606                    report_metadata.clone(),
607                    encoded_public_share.clone(),
608                )
609                .get_encoded()?,
610            )
611            .map_err(Error::Hpke)
612        })
613        .collect_tuple()
614        .expect("iterator to yield two items"); // expect safety: iterator contains two items.
615
616        Ok(Report::new(
617            report_metadata,
618            encoded_public_share,
619            leader_encrypted_input_share?,
620            helper_encrypted_input_share?,
621        ))
622    }
623
624    /// Upload a [`Report`] to the leader, per the [DAP specification][1]. The provided measurement
625    /// is sharded into two shares and then uploaded to the leader.
626    ///
627    /// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-uploading-reports
628    #[tracing::instrument(skip(measurement), err)]
629    pub async fn upload(&self, measurement: &V::Measurement) -> Result<(), Error> {
630        self.upload_with_time(measurement, Clock::now(&RealClock::default()))
631            .await
632    }
633
634    /// Upload a [`Report`] to the leader, per the [DAP specification][1], and override the report's
635    /// timestamp. The provided measurement is sharded into two shares and then uploaded to the
636    /// leader.
637    ///
638    /// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-uploading-reports
639    ///
640    /// ```no_run
641    /// # use janus_client::{Client, Error};
642    /// # use janus_messages::Duration;
643    /// # use prio::vdaf::prio3::Prio3;
644    /// # use rand::random;
645    /// #
646    /// # async fn test() -> Result<(), Error> {
647    /// # let measurement = true;
648    /// # let timestamp = 1_700_000_000;
649    /// # let vdaf = Prio3::new_count(2).unwrap();
650    /// let client = Client::new(
651    ///     random(),
652    ///     "https://example.com/".parse().unwrap(),
653    ///     "https://example.net/".parse().unwrap(),
654    ///     Duration::from_seconds(3600),
655    ///     vdaf,
656    /// ).await?;
657    /// client.upload_with_time(&measurement, std::time::SystemTime::now()).await?;
658    /// client.upload_with_time(&measurement, janus_messages::Time::from_seconds_since_epoch(timestamp)).await?;
659    /// # Ok(())
660    /// # }
661    /// ```
662    #[tracing::instrument(skip(measurement), err)]
663    pub async fn upload_with_time<T>(
664        &self,
665        measurement: &V::Measurement,
666        time: T,
667    ) -> Result<(), Error>
668    where
669        T: TryInto<Time> + Debug,
670        Error: From<<T as TryInto<Time>>::Error>,
671    {
672        let report = self
673            .prepare_report(measurement, &time.try_into()?)?
674            .get_encoded()?;
675        let upload_endpoint = self
676            .parameters
677            .reports_resource_uri(&self.parameters.task_id)?;
678
679        #[cfg(feature = "ohttp")]
680        let upload_status = self.upload_with_ohttp(&upload_endpoint, &report).await?;
681        #[cfg(not(feature = "ohttp"))]
682        let upload_status = self.put_report(&upload_endpoint, &report).await?;
683
684        if !upload_status.is_success() {
685            return Err(Error::Http(Box::new(HttpErrorResponse::from(
686                upload_status,
687            ))));
688        }
689
690        Ok(())
691    }
692
693    async fn put_report(
694        &self,
695        upload_endpoint: &Url,
696        request_body: &[u8],
697    ) -> Result<StatusCode, Error> {
698        Ok(retry_http_request(
699            self.parameters.http_request_retry_parameters.clone(),
700            || async {
701                self.http_client
702                    .put(upload_endpoint.clone())
703                    .header(CONTENT_TYPE, Report::MEDIA_TYPE)
704                    .body(request_body.to_vec())
705                    .send()
706                    .await
707            },
708        )
709        .await?
710        .status())
711    }
712
713    /// Send a DAP upload request via OHTTP, if the client is configured to use it, or directly if
714    /// not. This is only intended for DAP uploads and so does not handle response bodies.
715    #[cfg(feature = "ohttp")]
716    #[tracing::instrument(skip(self, request_body), err)]
717    async fn upload_with_ohttp(
718        &self,
719        upload_endpoint: &Url,
720        request_body: &[u8],
721    ) -> Result<StatusCode, Error> {
722        let (ohttp_config, key_configs) =
723            if let Some((ohttp_config, key_configs)) = &self.ohttp_config {
724                (ohttp_config, key_configs)
725            } else {
726                return self.put_report(upload_endpoint, request_body).await;
727            };
728
729        // Construct a Message representing the upload request...
730        let mut message = Message::request(
731            "PUT".into(),
732            upload_endpoint.scheme().into(),
733            upload_endpoint.authority().into(),
734            upload_endpoint.path().into(),
735        );
736        message.put_header(CONTENT_TYPE.as_str(), Report::MEDIA_TYPE);
737        message.write_content(request_body);
738
739        // ...get the BHTTP encoding of the message...
740        let mut request_buf = Vec::new();
741        message.write_bhttp(Mode::KnownLength, &mut request_buf)?;
742
743        // ...and OHTTP encapsulate it to the gateway key config.
744        let ohttp_request = key_configs
745            .iter()
746            .cloned()
747            .find_map(|mut key_config| ClientRequest::from_config(&mut key_config).ok())
748            .ok_or_else(|| Error::OhttpNoSupportedKeyConfigs(Box::new(key_configs.to_vec())))?;
749
750        let (encapsulated_request, ohttp_response) = ohttp_request.encapsulate(&request_buf)?;
751
752        let relay_response = retry_http_request(
753            self.parameters.http_request_retry_parameters.clone(),
754            || async {
755                self.http_client
756                    .post(ohttp_config.relay.clone())
757                    .header(CONTENT_TYPE, OHTTP_REQUEST_MEDIA_TYPE)
758                    .header(ACCEPT, OHTTP_RESPONSE_MEDIA_TYPE)
759                    .body(encapsulated_request.clone())
760                    .send()
761                    .await
762            },
763        )
764        .await?;
765
766        // Check whether request to the OHTTP relay was successful, and if so, decapsulate that
767        // response to get the DAP aggregator's response.
768        if !relay_response.status().is_success() {
769            return Err(Error::Http(Box::new(HttpErrorResponse::from(
770                relay_response.status(),
771            ))));
772        }
773
774        if relay_response
775            .headers()
776            .get(CONTENT_TYPE)
777            .map(HeaderValue::as_bytes)
778            != Some(OHTTP_RESPONSE_MEDIA_TYPE.as_bytes())
779        {
780            return Err(Error::UnexpectedServerResponse(
781                "content type wrong for OHTTP response",
782            ));
783        }
784
785        let decapsulated_response = ohttp_response.decapsulate(relay_response.body().as_ref())?;
786        let message = Message::read_bhttp(&mut Cursor::new(&decapsulated_response))?;
787        let status = if let ControlData::Response(status) = message.control() {
788            StatusCode::from_u16((*status).into()).map_err(|_| {
789                Error::UnexpectedServerResponse(
790                    "status in decapsulated response is not valid HTTP status",
791                )
792            })?
793        } else {
794            return Err(Error::UnexpectedServerResponse(
795                "decapsulated response control data is not a response",
796            ));
797        };
798
799        Ok(status)
800    }
801}