1#![cfg_attr(docsrs, feature(doc_cfg))]
41
42use backoff::ExponentialBackoff;
43#[cfg(feature = "ohttp")]
44use bhttp::{ControlData, Message, Mode};
45use educe::Educe;
46#[cfg(feature = "ohttp")]
47use http::{header::ACCEPT, HeaderValue};
48use http::{header::CONTENT_TYPE, StatusCode};
49use itertools::Itertools;
50use janus_core::{
51 hpke::{self, is_hpke_config_supported, HpkeApplicationInfo, Label},
52 http::{cached_resource::CachedResource, HttpErrorResponse},
53 retries::{http_request_exponential_backoff, retry_http_request},
54 time::{Clock, RealClock, TimeExt},
55 url_ensure_trailing_slash,
56};
57use janus_messages::{
58 Duration, HpkeConfig, HpkeConfigList, InputShareAad, PlaintextInputShare, Report, ReportId,
59 ReportMetadata, Role, TaskId, Time,
60};
61#[cfg(feature = "ohttp")]
62use ohttp::{ClientRequest, KeyConfig};
63use prio::{codec::Encode, vdaf};
64use rand::random;
65#[cfg(feature = "ohttp")]
66use std::io::Cursor;
67use std::{convert::Infallible, fmt::Debug, sync::Arc, time::SystemTimeError};
68use tokio::{sync::Mutex, try_join};
69use url::Url;
70
71#[cfg(test)]
72mod tests;
73
74#[derive(Debug, thiserror::Error)]
75pub enum Error {
76 #[error("invalid parameter {0}")]
77 InvalidParameter(&'static str),
78 #[error("HTTP client error: {0}")]
79 HttpClient(#[from] reqwest::Error),
80 #[error("codec error: {0}")]
81 Codec(#[from] prio::codec::CodecError),
82 #[error("HTTP response status {0}")]
83 Http(Box<HttpErrorResponse>),
84 #[error("URL parse: {0}")]
85 Url(#[from] url::ParseError),
86 #[error("VDAF error: {0}")]
87 Vdaf(#[from] prio::vdaf::VdafError),
88 #[error("HPKE error: {0}")]
89 Hpke(#[from] janus_core::hpke::Error),
90 #[error("Cached resource error: {0}")]
91 CachedResource(#[from] janus_core::http::cached_resource::Error),
92 #[error("unexpected server response {0}")]
93 UnexpectedServerResponse(&'static str),
94 #[error("time conversion error: {0}")]
95 TimeConversion(#[from] SystemTimeError),
96 #[cfg(feature = "ohttp")]
97 #[error("OHTTP error: {0}")]
98 Ohttp(#[from] ohttp::Error),
99 #[cfg(feature = "ohttp")]
100 #[error("BHTTP error: {0}")]
101 Bhttp(#[from] bhttp::Error),
102 #[cfg(feature = "ohttp")]
103 #[error("No supported key configurations advertised by OHTTP gateway")]
104 OhttpNoSupportedKeyConfigs(Box<Vec<KeyConfig>>),
105}
106
107impl From<Infallible> for Error {
108 fn from(value: Infallible) -> Self {
109 match value {}
110 }
111}
112
113impl From<Result<HttpErrorResponse, reqwest::Error>> for Error {
114 fn from(result: Result<HttpErrorResponse, reqwest::Error>) -> Self {
115 match result {
116 Ok(http_error_response) => Error::Http(Box::new(http_error_response)),
117 Err(error) => error.into(),
118 }
119 }
120}
121
122static CLIENT_USER_AGENT: &str = concat!(
123 env!("CARGO_PKG_NAME"),
124 "/",
125 env!("CARGO_PKG_VERSION"),
126 "/",
127 "client"
128);
129
130#[cfg(feature = "ohttp")]
131const OHTTP_KEYS_MEDIA_TYPE: &str = "application/ohttp-keys";
132#[cfg(feature = "ohttp")]
133const OHTTP_REQUEST_MEDIA_TYPE: &str = "message/ohttp-req";
134#[cfg(feature = "ohttp")]
135const OHTTP_RESPONSE_MEDIA_TYPE: &str = "message/ohttp-res";
136
137#[derive(Clone, Educe)]
139#[educe(Debug)]
140struct ClientParameters {
141 task_id: TaskId,
143 #[educe(Debug(method(std::fmt::Display::fmt)))]
145 leader_aggregator_endpoint: Url,
146 #[educe(Debug(method(std::fmt::Display::fmt)))]
148 helper_aggregator_endpoint: Url,
149 time_precision: Duration,
152 http_request_retry_parameters: ExponentialBackoff,
154}
155
156impl ClientParameters {
157 pub fn new(
159 task_id: TaskId,
160 leader_aggregator_endpoint: Url,
161 helper_aggregator_endpoint: Url,
162 time_precision: Duration,
163 ) -> Self {
164 Self {
165 task_id,
166 leader_aggregator_endpoint: url_ensure_trailing_slash(leader_aggregator_endpoint),
167 helper_aggregator_endpoint: url_ensure_trailing_slash(helper_aggregator_endpoint),
168 time_precision,
169 http_request_retry_parameters: http_request_exponential_backoff(),
170 }
171 }
172
173 fn aggregator_endpoint(&self, role: &Role) -> Result<&Url, Error> {
176 match role {
177 Role::Leader => Ok(&self.leader_aggregator_endpoint),
178 Role::Helper => Ok(&self.helper_aggregator_endpoint),
179 _ => Err(Error::InvalidParameter("role is not an aggregator")),
180 }
181 }
182
183 fn hpke_config_endpoint(&self, role: &Role) -> Result<Url, Error> {
188 Ok(self.aggregator_endpoint(role)?.join("hpke_config")?)
189 }
190
191 fn reports_resource_uri(&self, task_id: &TaskId) -> Result<Url, Error> {
193 Ok(self
194 .leader_aggregator_endpoint
195 .join(&format!("tasks/{task_id}/reports"))?)
196 }
197}
198
199#[tracing::instrument(err)]
201#[cfg(feature = "ohttp")]
202async fn ohttp_key_configs(
203 http_request_retry_parameters: ExponentialBackoff,
204 ohttp_config: &OhttpConfig,
205 http_client: &reqwest::Client,
206) -> Result<Vec<KeyConfig>, Error> {
207 let keys_response = retry_http_request(http_request_retry_parameters, || async {
209 http_client
210 .get(ohttp_config.key_configs.clone())
211 .header(ACCEPT, OHTTP_KEYS_MEDIA_TYPE)
212 .send()
213 .await
214 })
215 .await?;
216
217 if !keys_response.status().is_success() {
218 return Err(Error::Http(Box::new(HttpErrorResponse::from(
219 keys_response.status(),
220 ))));
221 }
222
223 if keys_response
224 .headers()
225 .get(CONTENT_TYPE)
226 .map(HeaderValue::as_bytes)
227 != Some(OHTTP_KEYS_MEDIA_TYPE.as_bytes())
228 {
229 return Err(Error::UnexpectedServerResponse(
230 "content type wrong for OHTTP keys",
231 ));
232 }
233
234 Ok(KeyConfig::decode_list(keys_response.body().as_ref())?)
235}
236
237pub fn default_http_client() -> Result<reqwest::Client, Error> {
239 Ok(reqwest::Client::builder()
240 .timeout(std::time::Duration::from_secs(30))
243 .connect_timeout(std::time::Duration::from_secs(10))
244 .user_agent(CLIENT_USER_AGENT)
245 .build()?)
246}
247
248#[derive(Clone, Debug)]
250#[cfg_attr(docsrs, doc(cfg(feature = "ohttp")))]
251#[cfg(feature = "ohttp")]
252pub struct OhttpConfig {
253 pub key_configs: Url,
258
259 pub relay: Url,
261}
262
263pub struct ClientBuilder<V: vdaf::Client<16>> {
265 parameters: ClientParameters,
266 vdaf: V,
267 leader_hpke_config: Option<HpkeConfig>,
268 helper_hpke_config: Option<HpkeConfig>,
269 #[cfg(feature = "ohttp")]
270 ohttp_config: Option<OhttpConfig>,
271 http_client: Option<reqwest::Client>,
272}
273
274impl<V: vdaf::Client<16>> ClientBuilder<V> {
275 pub fn new(
277 task_id: TaskId,
278 leader_aggregator_endpoint: Url,
279 helper_aggregator_endpoint: Url,
280 time_precision: Duration,
281 vdaf: V,
282 ) -> Self {
283 Self {
284 parameters: ClientParameters::new(
285 task_id,
286 leader_aggregator_endpoint,
287 helper_aggregator_endpoint,
288 time_precision,
289 ),
290 vdaf,
291 leader_hpke_config: None,
292 helper_hpke_config: None,
293 #[cfg(feature = "ohttp")]
294 ohttp_config: None,
295 http_client: None,
296 }
297 }
298
299 pub async fn build(self) -> Result<Client<V>, Error> {
302 let http_client = if let Some(http_client) = self.http_client {
303 http_client
304 } else {
305 default_http_client()?
306 };
307
308 let fetch_hpke_config = async |hpke_config, role| match hpke_config {
309 Some(hpke_config) => Ok(HpkeConfiguration::new_static(hpke_config)),
310 None => HpkeConfiguration::new(&self.parameters, role, &http_client).await,
311 };
312
313 let (leader_hpke_config, helper_hpke_config) = tokio::try_join!(
314 fetch_hpke_config(self.leader_hpke_config, &Role::Leader),
315 fetch_hpke_config(self.helper_hpke_config, &Role::Helper),
316 )?;
317
318 #[cfg(feature = "ohttp")]
319 let ohttp_config = if let Some(ohttp_config) = self.ohttp_config {
320 let key_configs = ohttp_key_configs(
321 self.parameters.http_request_retry_parameters.clone(),
322 &ohttp_config,
323 &http_client,
324 )
325 .await?;
326 Some((ohttp_config, key_configs))
327 } else {
328 None
329 };
330
331 Ok(Client {
332 #[cfg(feature = "ohttp")]
333 ohttp_config,
334 parameters: self.parameters,
335 vdaf: self.vdaf,
336 http_client,
337 leader_hpke_config: Arc::new(Mutex::new(leader_hpke_config)),
338 helper_hpke_config: Arc::new(Mutex::new(helper_hpke_config)),
339 })
340 }
341
342 #[deprecated(
350 note = "Use `ClientBuilder::with_leader_hpke_config`, `ClientBuilder::with_helper_hpke_config` and `ClientBuilder::build` instead"
351 )]
352 pub fn build_with_hpke_configs(
353 self,
354 leader_hpke_config: HpkeConfig,
355 helper_hpke_config: HpkeConfig,
356 ) -> Result<Client<V>, Error> {
357 let http_client = if let Some(http_client) = self.http_client {
358 http_client
359 } else {
360 default_http_client()?
361 };
362 Ok(Client {
363 parameters: self.parameters,
364 vdaf: self.vdaf,
365 #[cfg(feature = "ohttp")]
366 ohttp_config: None,
367 http_client,
368 leader_hpke_config: Arc::new(Mutex::new(HpkeConfiguration::new_static(
369 leader_hpke_config,
370 ))),
371 helper_hpke_config: Arc::new(Mutex::new(HpkeConfiguration::new_static(
372 helper_hpke_config,
373 ))),
374 })
375 }
376
377 pub fn with_http_client(mut self, http_client: reqwest::Client) -> Self {
379 self.http_client = Some(http_client);
380 self
381 }
382
383 pub fn with_backoff(mut self, http_request_retry_parameters: ExponentialBackoff) -> Self {
385 self.parameters.http_request_retry_parameters = http_request_retry_parameters;
386 self
387 }
388
389 pub fn with_leader_hpke_config(mut self, hpke_config: HpkeConfig) -> Self {
392 self.leader_hpke_config = Some(hpke_config);
393 self
394 }
395
396 pub fn with_helper_hpke_config(mut self, hpke_config: HpkeConfig) -> Self {
399 self.helper_hpke_config = Some(hpke_config);
400 self
401 }
402
403 #[cfg(feature = "ohttp")]
438 #[cfg_attr(docsrs, doc(cfg(feature = "ohttp")))]
439 pub fn with_ohttp_config(mut self, ohttp_config: OhttpConfig) -> Self {
440 self.ohttp_config = Some(ohttp_config);
441 self
442 }
443}
444
445#[derive(Clone, Debug)]
447pub struct Client<V: vdaf::Client<16>> {
448 parameters: ClientParameters,
449 vdaf: V,
450 #[cfg(feature = "ohttp")]
451 ohttp_config: Option<(OhttpConfig, Vec<KeyConfig>)>,
452 http_client: reqwest::Client,
453 leader_hpke_config: Arc<Mutex<HpkeConfiguration>>,
454 helper_hpke_config: Arc<Mutex<HpkeConfiguration>>,
455}
456
457impl<V: vdaf::Client<16>> Client<V> {
458 pub async fn new(
460 task_id: TaskId,
461 leader_aggregator_endpoint: Url,
462 helper_aggregator_endpoint: Url,
463 time_precision: Duration,
464 vdaf: V,
465 ) -> Result<Self, Error> {
466 ClientBuilder::new(
467 task_id,
468 leader_aggregator_endpoint,
469 helper_aggregator_endpoint,
470 time_precision,
471 vdaf,
472 )
473 .build()
474 .await
475 }
476
477 #[deprecated(
485 note = "Use `ClientBuilder::with_leader_hpke_config`, `ClientBuilder::with_helper_hpke_config` and `ClientBuilder::build` instead"
486 )]
487 pub fn with_hpke_configs(
488 task_id: TaskId,
489 leader_aggregator_endpoint: Url,
490 helper_aggregator_endpoint: Url,
491 time_precision: Duration,
492 vdaf: V,
493 leader_hpke_config: HpkeConfig,
494 helper_hpke_config: HpkeConfig,
495 ) -> Result<Self, Error> {
496 #[allow(deprecated)]
497 ClientBuilder::new(
498 task_id,
499 leader_aggregator_endpoint,
500 helper_aggregator_endpoint,
501 time_precision,
502 vdaf,
503 )
504 .build_with_hpke_configs(leader_hpke_config, helper_hpke_config)
505 }
506
507 pub fn builder(
510 task_id: TaskId,
511 leader_aggregator_endpoint: Url,
512 helper_aggregator_endpoint: Url,
513 time_precision: Duration,
514 vdaf: V,
515 ) -> ClientBuilder<V> {
516 ClientBuilder::new(
517 task_id,
518 leader_aggregator_endpoint,
519 helper_aggregator_endpoint,
520 time_precision,
521 vdaf,
522 )
523 }
524
525 fn prepare_report(
528 &self,
529 measurement: &V::Measurement,
530 time: &Time,
531 leader_hpke_config: &HpkeConfig,
532 helper_hpke_config: &HpkeConfig,
533 ) -> Result<Report, Error> {
534 let report_id: ReportId = random();
535 let (public_share, input_shares) = self.vdaf.shard(measurement, report_id.as_ref())?;
536 assert_eq!(input_shares.len(), 2); let time = time
539 .to_batch_interval_start(&self.parameters.time_precision)
540 .map_err(|_| Error::InvalidParameter("couldn't round time down to time_precision"))?;
541 let report_metadata = ReportMetadata::new(report_id, time);
542 let encoded_public_share = public_share.get_encoded()?;
543
544 let (leader_encrypted_input_share, helper_encrypted_input_share) = [
545 (leader_hpke_config, &Role::Leader),
546 (helper_hpke_config, &Role::Helper),
547 ]
548 .into_iter()
549 .zip(input_shares)
550 .map(|((hpke_config, receiver_role), input_share)| {
551 hpke::seal(
552 hpke_config,
553 &HpkeApplicationInfo::new(&Label::InputShare, &Role::Client, receiver_role),
554 &PlaintextInputShare::new(
555 Vec::new(), input_share.get_encoded()?,
557 )
558 .get_encoded()?,
559 &InputShareAad::new(
560 self.parameters.task_id,
561 report_metadata.clone(),
562 encoded_public_share.clone(),
563 )
564 .get_encoded()?,
565 )
566 .map_err(Error::Hpke)
567 })
568 .collect_tuple()
569 .expect("iterator to yield two items"); Ok(Report::new(
572 report_metadata,
573 encoded_public_share,
574 leader_encrypted_input_share?,
575 helper_encrypted_input_share?,
576 ))
577 }
578
579 #[tracing::instrument(skip(measurement), err)]
584 pub async fn upload(&self, measurement: &V::Measurement) -> Result<(), Error> {
585 self.upload_with_time(measurement, Clock::now(&RealClock::default()))
586 .await
587 }
588
589 #[tracing::instrument(skip(measurement), err)]
618 pub async fn upload_with_time<T>(
619 &self,
620 measurement: &V::Measurement,
621 time: T,
622 ) -> Result<(), Error>
623 where
624 T: TryInto<Time> + Debug,
625 Error: From<<T as TryInto<Time>>::Error>,
626 {
627 let mut leader_hpke_config = self.leader_hpke_config.lock().await;
628 let mut helper_hpke_config = self.helper_hpke_config.lock().await;
629 let (leader_hpke_config, helper_hpke_config) =
630 try_join!(leader_hpke_config.get(), helper_hpke_config.get())?;
631
632 let report = self
633 .prepare_report(
634 measurement,
635 &time.try_into()?,
636 leader_hpke_config,
637 helper_hpke_config,
638 )?
639 .get_encoded()?;
640 let upload_endpoint = self
641 .parameters
642 .reports_resource_uri(&self.parameters.task_id)?;
643
644 #[cfg(feature = "ohttp")]
645 let upload_status = self.upload_with_ohttp(&upload_endpoint, &report).await?;
646 #[cfg(not(feature = "ohttp"))]
647 let upload_status = self.put_report(&upload_endpoint, &report).await?;
648
649 if !upload_status.is_success() {
650 return Err(Error::Http(Box::new(HttpErrorResponse::from(
651 upload_status,
652 ))));
653 }
654
655 Ok(())
656 }
657
658 async fn put_report(
659 &self,
660 upload_endpoint: &Url,
661 request_body: &[u8],
662 ) -> Result<StatusCode, Error> {
663 Ok(retry_http_request(
664 self.parameters.http_request_retry_parameters.clone(),
665 || async {
666 self.http_client
667 .put(upload_endpoint.clone())
668 .header(CONTENT_TYPE, Report::MEDIA_TYPE)
669 .body(request_body.to_vec())
670 .send()
671 .await
672 },
673 )
674 .await?
675 .status())
676 }
677
678 #[cfg(feature = "ohttp")]
681 #[tracing::instrument(skip(self, request_body), err)]
682 async fn upload_with_ohttp(
683 &self,
684 upload_endpoint: &Url,
685 request_body: &[u8],
686 ) -> Result<StatusCode, Error> {
687 let (ohttp_config, key_configs) =
688 if let Some((ohttp_config, key_configs)) = &self.ohttp_config {
689 (ohttp_config, key_configs)
690 } else {
691 return self.put_report(upload_endpoint, request_body).await;
692 };
693
694 let mut message = Message::request(
696 "PUT".into(),
697 upload_endpoint.scheme().into(),
698 upload_endpoint.authority().into(),
699 upload_endpoint.path().into(),
700 );
701 message.put_header(CONTENT_TYPE.as_str(), Report::MEDIA_TYPE);
702 message.write_content(request_body);
703
704 let mut request_buf = Vec::new();
706 message.write_bhttp(Mode::KnownLength, &mut request_buf)?;
707
708 let ohttp_request = key_configs
710 .iter()
711 .cloned()
712 .find_map(|mut key_config| ClientRequest::from_config(&mut key_config).ok())
713 .ok_or_else(|| Error::OhttpNoSupportedKeyConfigs(Box::new(key_configs.to_vec())))?;
714
715 let (encapsulated_request, ohttp_response) = ohttp_request.encapsulate(&request_buf)?;
716
717 let relay_response = retry_http_request(
718 self.parameters.http_request_retry_parameters.clone(),
719 || async {
720 self.http_client
721 .post(ohttp_config.relay.clone())
722 .header(CONTENT_TYPE, OHTTP_REQUEST_MEDIA_TYPE)
723 .header(ACCEPT, OHTTP_RESPONSE_MEDIA_TYPE)
724 .body(encapsulated_request.clone())
725 .send()
726 .await
727 },
728 )
729 .await?;
730
731 if !relay_response.status().is_success() {
734 return Err(Error::Http(Box::new(HttpErrorResponse::from(
735 relay_response.status(),
736 ))));
737 }
738
739 if relay_response
740 .headers()
741 .get(CONTENT_TYPE)
742 .map(HeaderValue::as_bytes)
743 != Some(OHTTP_RESPONSE_MEDIA_TYPE.as_bytes())
744 {
745 return Err(Error::UnexpectedServerResponse(
746 "content type wrong for OHTTP response",
747 ));
748 }
749
750 let decapsulated_response = ohttp_response.decapsulate(relay_response.body().as_ref())?;
751 let message = Message::read_bhttp(&mut Cursor::new(&decapsulated_response))?;
752 let status = if let ControlData::Response(status) = message.control() {
753 StatusCode::from_u16((*status).into()).map_err(|_| {
754 Error::UnexpectedServerResponse(
755 "status in decapsulated response is not valid HTTP status",
756 )
757 })?
758 } else {
759 return Err(Error::UnexpectedServerResponse(
760 "decapsulated response control data is not a response",
761 ));
762 };
763
764 Ok(status)
765 }
766}
767
768#[derive(Debug, Clone)]
770pub(crate) struct HpkeConfiguration {
771 hpke_config_list: CachedResource<HpkeConfigList>,
772}
773
774impl HpkeConfiguration {
775 pub(crate) async fn new(
776 client_parameters: &ClientParameters,
777 aggregator_role: &Role,
778 http_client: &reqwest::Client,
779 ) -> Result<Self, Error> {
780 let mut hpke_config_url = client_parameters.hpke_config_endpoint(aggregator_role)?;
781 hpke_config_url.set_query(Some(&format!("task_id={}", client_parameters.task_id)));
782
783 Ok(Self {
784 hpke_config_list: CachedResource::new(
785 hpke_config_url,
786 HpkeConfigList::MEDIA_TYPE,
787 http_client,
788 client_parameters.http_request_retry_parameters.clone(),
789 )
790 .await?,
791 })
792 }
793
794 pub(crate) fn new_static(hpke_configuration: HpkeConfig) -> Self {
795 Self {
796 hpke_config_list: CachedResource::Static(HpkeConfigList::new(vec![hpke_configuration])),
797 }
798 }
799
800 pub(crate) async fn get(&mut self) -> Result<&HpkeConfig, Error> {
801 let hpke_config_list = self.hpke_config_list.resource().await?;
802
803 if hpke_config_list.hpke_configs().is_empty() {
804 return Err(Error::UnexpectedServerResponse(
805 "aggregator provided empty HpkeConfigList",
806 ));
807 }
808
809 let mut first_error = None;
811 for config in hpke_config_list.hpke_configs() {
812 match is_hpke_config_supported(config) {
813 Ok(()) => return Ok(config),
814 Err(e) => {
815 if first_error.is_none() {
816 first_error = Some(e);
817 }
818 }
819 }
820 }
821 Err(first_error.unwrap().into())
824 }
825}