1use async_trait::async_trait;
18use bytes::Bytes;
19use chrono::TimeDelta;
20use futures::io::{Error as IoError, ErrorKind as IoErrorKind};
21use futures::stream::TryStreamExt;
22use http::{header, HeaderMap, HeaderValue, Response as HttpResponse, StatusCode};
23use reqwest::{Body, Certificate, Client as AsyncClient, Request, Response};
24use std::convert::TryInto;
25use std::fmt::{self, Debug};
26use std::time::{Duration, SystemTime};
27use std::{fs::File, io::Read};
28use tokio_util::codec;
29use tokio_util::compat::FuturesAsyncReadCompatExt;
30use tracing::{debug, enabled, error, event, info, instrument, trace, warn, Level};
31
32use crate::api;
33use crate::api::query;
34#[cfg(feature = "keystone_ng")]
35use crate::api::query::QueryAsync;
36use crate::api::query::RawQueryAsync;
37use crate::api::RestClient;
38use crate::auth::{
39 self,
40 auth_helper::{AuthHelper, Dialoguer, Noop},
41 authtoken::{self, AuthTokenError, AuthType},
42 Auth, AuthError, AuthState,
43};
44use crate::catalog::{Catalog, CatalogError, ServiceEndpoint};
45use crate::config::CloudConfig;
46use crate::config::{get_config_identity_hash, ConfigFile};
47use crate::error::{OpenStackError, OpenStackResult, RestError};
48use crate::state;
49use crate::types::identity::v3::{AuthReceiptResponse, AuthResponse, Project, ServiceEndpoints};
50use crate::types::{ApiVersion, BoxedAsyncRead, ServiceType};
51use crate::utils::expand_tilde;
52
53#[derive(Clone)]
88pub struct AsyncOpenStack {
89 client: reqwest::Client,
91 config: CloudConfig,
93 auth: Auth,
95 catalog: Catalog,
97 state: state::State,
103}
104
105impl Debug for AsyncOpenStack {
106 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107 f.debug_struct("OpenStack")
108 .field("service_endpoints", &self.catalog)
109 .finish()
110 }
111}
112
113#[async_trait]
114impl api::RestClient for AsyncOpenStack {
115 type Error = RestError;
116
117 fn get_service_endpoint(
119 &self,
120 service_type: &ServiceType,
121 version: Option<&ApiVersion>,
122 ) -> Result<&ServiceEndpoint, api::ApiError<Self::Error>> {
123 Ok(self
124 .catalog
125 .get_service_endpoint(service_type.to_string(), version, None::<String>)?)
126 }
127
128 fn get_current_project(&self) -> Option<Project> {
130 if let Auth::AuthToken(token) = &self.auth {
131 return token.auth_info.clone().and_then(|x| x.token.project);
132 }
133 None
134 }
135}
136
137#[async_trait]
138impl api::AsyncClient for AsyncOpenStack {
139 async fn rest_async(
141 &self,
142 request: http::request::Builder,
143 body: Vec<u8>,
144 ) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
145 self.rest_with_auth_async(request, body, &self.auth).await
146 }
147
148 async fn rest_read_body_async(
150 &self,
151 request: http::request::Builder,
152 body: BoxedAsyncRead,
153 ) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
154 self.rest_with_auth_read_body_async(request, body, &self.auth)
155 .await
156 }
157
158 async fn download_async(
160 &self,
161 request: http::request::Builder,
162 body: Vec<u8>,
163 ) -> Result<(HeaderMap, BoxedAsyncRead), api::ApiError<<Self as api::RestClient>::Error>> {
164 self.download_with_auth_async(request, body, &self.auth)
165 .await
166 }
167}
168
169impl AsyncOpenStack {
170 fn new_impl(config: &CloudConfig, auth: Auth) -> OpenStackResult<Self> {
172 let mut client_builder = AsyncClient::builder();
173
174 if let Some(cacert) = &config.cacert {
175 let mut buf = Vec::new();
176 File::open(expand_tilde(cacert).unwrap_or(cacert.into()))
177 .map_err(|e| OpenStackError::IOWithPath {
178 source: e,
179 path: cacert.into(),
180 })?
181 .read_to_end(&mut buf)
182 .map_err(|e| OpenStackError::IOWithPath {
183 source: e,
184 path: cacert.into(),
185 })?;
186 for cert in Certificate::from_pem_bundle(&buf)? {
187 client_builder = client_builder.add_root_certificate(cert);
188 }
189 }
190 if let Some(false) = &config.verify {
191 warn!(
192 "SSL Verification is disabled! Please consider using `cacert` for adding custom certificate instead."
193 );
194 client_builder = client_builder.danger_accept_invalid_certs(true);
195 }
196 client_builder = client_builder.pool_max_idle_per_host(10);
197 client_builder = client_builder.pool_idle_timeout(Duration::from_secs(30));
198 client_builder = client_builder.timeout(Duration::from_secs(
199 config
200 .options
201 .get("api_timeout")
202 .and_then(|val| val.clone().into_uint().ok())
203 .unwrap_or(30),
204 ));
205 client_builder = client_builder.connect_timeout(Duration::from_secs(5));
206 client_builder = client_builder.tcp_keepalive(Duration::from_secs(60));
207 client_builder = client_builder.gzip(true);
208 client_builder = client_builder.deflate(true);
209
210 let mut session = AsyncOpenStack {
211 client: client_builder.build()?,
212 config: config.clone(),
213 auth,
214 catalog: Catalog::default(),
215 state: state::State::new(),
216 };
217
218 let auth_data = session
219 .config
220 .auth
221 .as_ref()
222 .ok_or(AuthTokenError::MissingAuthData)?;
223
224 let identity_service_url = auth_data
225 .auth_url
226 .as_ref()
227 .ok_or(AuthTokenError::MissingAuthUrl)?;
228
229 session.catalog.register_catalog_endpoint(
230 "identity",
231 identity_service_url,
232 config.region_name.as_ref(),
233 Some("public"),
234 )?;
235
236 session.catalog.configure(config)?;
237
238 session
239 .state
240 .set_auth_hash_key(get_config_identity_hash(config))
241 .enable_auth_cache(ConfigFile::new()?.is_auth_cache_enabled());
242
243 Ok(session)
244 }
245
246 #[instrument(name = "connect", level = "trace", skip(config))]
248 pub async fn new(config: &CloudConfig) -> OpenStackResult<Self> {
249 let mut session = Self::new_impl(config, Auth::None)?;
250
251 session
253 .discover_service_endpoint(&ServiceType::Identity)
254 .await?;
255
256 session.authorize(None, false, false).await?;
257
258 Ok(session)
259 }
260
261 #[instrument(name = "connect", level = "trace", skip(config, auth_helper))]
263 pub async fn new_with_authentication_helper(
264 config: &CloudConfig,
265 auth_helper: &mut impl AuthHelper,
266 renew_auth: bool,
267 ) -> OpenStackResult<Self> {
268 let mut session = Self::new_impl(config, Auth::None)?;
269
270 session
272 .discover_service_endpoint(&ServiceType::Identity)
273 .await?;
274
275 session
276 .authorize_with_auth_helper(None, auth_helper, renew_auth)
277 .await?;
278
279 Ok(session)
280 }
281
282 #[instrument(name = "connect", level = "trace", skip(config))]
284 #[deprecated(
285 since = "0.22.0",
286 note = "please use `new_with_authentication_helper` instead"
287 )]
288 pub async fn new_interactive(config: &CloudConfig, renew_auth: bool) -> OpenStackResult<Self> {
289 Self::new_with_authentication_helper(config, &mut Dialoguer::default(), renew_auth).await
290 }
291
292 fn set_auth(&mut self, auth: auth::Auth, skip_cache_update: bool) -> &mut Self {
294 self.auth = auth;
295 if !skip_cache_update {
296 if let Auth::AuthToken(auth) = &self.auth {
297 let scope = match &auth.auth_info {
303 Some(info) => {
304 if info.token.application_credential.is_some() {
305 authtoken::AuthTokenScope::Unscoped
306 } else {
307 auth.get_scope()
308 }
309 }
310 _ => auth.get_scope(),
311 };
312 self.state.set_scope_auth(&scope, auth);
313 }
314 }
315 self
316 }
317
318 fn set_token_auth(&mut self, token: String, token_info: Option<AuthResponse>) -> &mut Self {
320 let token_auth = authtoken::AuthToken {
321 token,
322 auth_info: token_info,
323 };
324 self.set_auth(Auth::AuthToken(Box::new(token_auth.clone())), false);
325 self
326 }
327
328 pub async fn authorize(
330 &mut self,
331 scope: Option<authtoken::AuthTokenScope>,
332 interactive: bool,
333 renew_auth: bool,
334 ) -> Result<(), OpenStackError> {
335 if interactive {
336 self.authorize_with_auth_helper(scope, &mut Dialoguer::default(), renew_auth)
337 .await
338 } else {
339 self.authorize_with_auth_helper(scope, &mut Noop::default(), renew_auth)
340 .await
341 }
342 }
343
344 pub async fn authorize_with_auth_helper<A>(
347 &mut self,
348 scope: Option<authtoken::AuthTokenScope>,
349 auth_helper: &mut A,
350 renew_auth: bool,
351 ) -> Result<(), OpenStackError>
352 where
353 A: AuthHelper,
354 {
355 let requested_scope = scope.map_or_else(
356 || authtoken::AuthTokenScope::try_from(&self.config),
357 |v| Ok(v.clone()),
358 )?;
359
360 if let (Some(auth), false) = (self.state.get_scope_auth(&requested_scope), renew_auth) {
361 trace!("Auth already available");
363 self.set_auth(auth::Auth::AuthToken(Box::new(auth.clone())), true);
364 } else {
365 let auth_type = AuthType::from_cloud_config(&self.config)?;
368 let mut force_new_auth = renew_auth;
369 if let AuthType::V3ApplicationCredential = auth_type {
370 force_new_auth = true;
374 }
375 let mut rsp;
376 if let (Some(available_auth), false) = (self.state.get_any_valid_auth(), force_new_auth)
377 {
378 trace!("Valid Auth is available for reauthz: {:?}", available_auth);
381 let auth_ep = authtoken::build_reauth_request(&available_auth, &requested_scope)?;
382 rsp = auth_ep.raw_query_async_ll(self, Some(false)).await?;
383 api::check_response_error::<Self>(&rsp, None)?;
385 } else {
386 trace!("No Auth already available. Proceeding with new login");
388
389 match auth_type {
390 AuthType::V3ApplicationCredential
391 | AuthType::V3Password
392 | AuthType::V3Token
393 | AuthType::V3Totp
394 | AuthType::V3Multifactor => {
395 let identity =
396 authtoken::build_identity_data_from_config(&self.config, auth_helper)
397 .await?;
398 let auth_ep = authtoken::build_auth_request_with_identity_and_scope(
399 &identity,
400 &requested_scope,
401 )?;
402 rsp = auth_ep.raw_query_async_ll(self, Some(false)).await?;
403
404 if let StatusCode::UNAUTHORIZED = rsp.status() {
406 if let Some(receipt) = rsp.headers().get("openstack-auth-receipt") {
407 let receipt_data: AuthReceiptResponse =
408 serde_json::from_slice(rsp.body())?;
409 let auth_endpoint = authtoken::build_auth_request_from_receipt(
410 &self.config,
411 receipt.clone(),
412 &receipt_data,
413 &requested_scope,
414 auth_helper,
415 )
416 .await?;
417 rsp = auth_endpoint.raw_query_async(self).await?;
418 }
419 }
420 api::check_response_error::<Self>(&rsp, None)?;
421 }
422 AuthType::V3OidcAccessToken => {
423 let auth_ep =
424 auth::v3oidcaccesstoken::get_auth_ep(&self.config, auth_helper).await?;
425 rsp = auth_ep.raw_query_async(self).await?;
426
427 let token = rsp
428 .headers()
429 .get("x-subject-token")
430 .ok_or(AuthError::AuthTokenNotInResponse)?
431 .to_str()
432 .map_err(|_| AuthError::AuthTokenNotString)?;
433
434 let token_info: AuthResponse = serde_json::from_slice(rsp.body())?;
436 let token_auth = authtoken::AuthToken {
437 token: token.to_string(),
438 auth_info: Some(token_info),
439 };
440 self.set_auth(Auth::AuthToken(Box::new(token_auth.clone())), false);
441
442 let auth_ep =
444 authtoken::build_reauth_request(&token_auth, &requested_scope)?;
445 rsp = auth_ep.raw_query_async(self).await?;
446 }
447 AuthType::V3WebSso => {
448 let auth_url = auth::v3websso::get_auth_url(&self.config)?;
449 let identity_ep = self.get_service_endpoint(
450 &ServiceType::Identity,
451 Some(&ApiVersion::new(3, 0)),
452 )?;
453 let mut url = identity_ep.build_request_url(&auth_url)?;
454
455 let mut token_auth = auth::v3websso::get_token_auth(&mut url).await?;
456
457 self.set_auth(auth::Auth::AuthToken(Box::new(token_auth.clone())), true);
459
460 let token_info = self.fetch_token_info(token_auth.token.clone()).await?;
462 token_auth.auth_info = Some(token_info.clone());
463 let scope = authtoken::AuthTokenScope::from(&token_info);
464
465 self.state.set_scope_auth(&scope, &token_auth);
467
468 let auth_ep =
470 authtoken::build_reauth_request(&token_auth, &requested_scope)?;
471 rsp = auth_ep.raw_query_async(self).await?;
472 }
473
474 #[cfg(feature = "keystone_ng")]
475 AuthType::V4Federation => {
476 let callback_addr = std::net::SocketAddr::from(([127, 0, 0, 1], 8050));
482 let init_auth_ep =
483 auth::v4federation::get_auth_ep(&self.config, callback_addr.port())?;
484 let auth_info: auth::v4federation::FederationAuthRequestResponse =
485 init_auth_ep.query_async(self).await?;
486
487 let oauth2_code =
490 auth::v4federation::get_auth_code(&auth_info.auth_url, callback_addr)
491 .await?;
492
493 let mut oidc_callback_builder =
496 auth::v4federation::OauthCallbackRequestBuilder::default();
497 if let (Some(code), Some(state)) = (oauth2_code.code, oauth2_code.state) {
498 oidc_callback_builder.code(code.clone());
499 oidc_callback_builder.state(state.clone());
500 let oidc_callback_ep = oidc_callback_builder
501 .build()
502 .map_err(auth::v4federation::FederationError::from)?;
503
504 rsp = oidc_callback_ep.raw_query_async(self).await?;
505 } else {
506 return Err(OpenStackError::NoAuth);
507 }
508 }
509 #[cfg(feature = "keystone_ng")]
510 AuthType::V4Jwt => {
511 let auth_ep = auth::v4jwt::get_auth_ep(&self.config, auth_helper).await?;
512 rsp = auth_ep.raw_query_async(self).await?;
513 api::check_response_error::<Self>(&rsp, None)?;
514
515 let token_info: AuthResponse = serde_json::from_slice(rsp.body())?;
516 let received_scope = authtoken::AuthTokenScope::from(&token_info);
517 tracing::debug!(
518 "Requested: {:?}, received: {:?}",
519 requested_scope,
520 received_scope
521 );
522 let token = rsp
523 .headers()
524 .get("x-subject-token")
525 .ok_or(AuthError::AuthTokenNotInResponse)?
526 .to_str()
527 .map_err(|_| AuthError::AuthTokenNotString)?;
528
529 let token_auth = authtoken::AuthToken {
531 token: token.to_string(),
532 auth_info: Some(token_info),
533 };
534 self.set_auth(Auth::AuthToken(Box::new(token_auth.clone())), false);
535
536 if requested_scope != authtoken::AuthTokenScope::Unscoped
537 && requested_scope != received_scope
538 {
539 let auth_ep =
542 authtoken::build_reauth_request(&token_auth, &requested_scope)?;
543 rsp = auth_ep.raw_query_async(self).await?;
544 } else {
545 self.state
549 .set_scope_auth(&authtoken::AuthTokenScope::Unscoped, &token_auth);
550 }
551 }
552
553 #[cfg(feature = "passkey")]
554 AuthType::V4Passkey => {
555 let auth_ep =
556 auth::v4passkey::get_init_auth_ep(&self.config, auth_helper).await?;
557 let req: auth::v4passkey::PasskeyAuthenticationStartResponse =
558 auth_ep.query_async(self).await?;
559 use webauthn_authenticator_rs::prelude::Url;
560 use webauthn_authenticator_rs::WebauthnAuthenticator;
561 let mut auth = WebauthnAuthenticator::new(
562 webauthn_authenticator_rs::mozilla::MozillaAuthenticator::new(),
563 );
564 let passkey_auth = auth
565 .do_authentication(
566 Url::parse("http://localhost:8080")?,
567 req.try_into()?,
568 )
569 .map_err(auth::v4passkey::PasskeyError::from)?;
570 let finish_ep = auth::v4passkey::get_finish_auth_ep(
571 &self.config,
572 passkey_auth,
573 auth_helper,
574 )
575 .await?;
576 rsp = finish_ep.raw_query_async(self).await?;
577
578 let token = rsp
579 .headers()
580 .get("x-subject-token")
581 .ok_or(AuthError::AuthTokenNotInResponse)?
582 .to_str()
583 .map_err(|_| AuthError::AuthTokenNotString)?;
584
585 let token_info: AuthResponse = serde_json::from_slice(rsp.body())?;
587 let token_auth = authtoken::AuthToken {
588 token: token.to_string(),
589 auth_info: Some(token_info),
590 };
591 self.set_auth(Auth::AuthToken(Box::new(token_auth.clone())), false);
592
593 let auth_ep =
595 authtoken::build_reauth_request(&token_auth, &requested_scope)?;
596 rsp = auth_ep.raw_query_async(self).await?;
597 }
598 }
599 };
600
601 let data: AuthResponse = serde_json::from_slice(rsp.body())?;
602 debug!("Auth token is {:?}", data);
603
604 let token = rsp
605 .headers()
606 .get("x-subject-token")
607 .ok_or(AuthError::AuthTokenNotInResponse)?
608 .to_str()
609 .map_err(|_| AuthError::AuthTokenNotString)?;
610
611 self.set_token_auth(token.into(), Some(data));
612 }
613
614 if let auth::Auth::AuthToken(token_data) = &self.auth {
615 match &token_data.auth_info {
616 Some(auth_data) => {
617 if let Some(project) = &auth_data.token.project {
618 self.catalog.set_project_id(project.id.clone());
619 self.catalog.configure(&self.config)?;
621 }
622 if let Some(endpoints) = &auth_data.token.catalog {
623 self.catalog
624 .process_catalog_endpoints(endpoints, Some("public"))?;
625 } else {
626 error!("No catalog information");
627 }
628 }
629 _ => return Err(OpenStackError::NoAuth),
630 }
631 }
632 Ok(())
634 }
635
636 #[instrument(skip(self))]
638 pub async fn discover_service_endpoint(
639 &mut self,
640 service_type: &ServiceType,
641 ) -> Result<(), OpenStackError> {
642 if let Ok(ep) = self.catalog.get_service_endpoint(
643 service_type.to_string(),
644 None,
645 self.config.region_name.as_ref(),
646 ) {
647 if self.catalog.discovery_allowed(service_type.to_string()) {
648 info!("Performing `{}` endpoint version discovery", service_type);
649
650 let orig_url = ep.url().clone();
651 let mut try_url = ep.url().clone();
652 try_url
655 .path_segments_mut()
656 .map_err(|_| CatalogError::cannot_be_base(ep.url()))?
657 .pop_if_empty()
658 .push("");
659 let mut max_depth = 10;
660 loop {
661 let req = http::Request::builder()
662 .method(http::Method::GET)
663 .uri(query::url_to_http_uri(try_url.clone())?);
664
665 match self.rest_with_auth_async(req, Vec::new(), &self.auth).await {
666 Ok(rsp) => {
667 if rsp.status() != StatusCode::NOT_FOUND
668 && self
669 .catalog
670 .process_endpoint_discovery(
671 service_type,
672 &try_url,
673 rsp.body(),
674 self.config.region_name.as_ref(),
675 )
676 .is_ok()
677 {
678 debug!(
679 "Finished service version discovery at {}",
680 try_url.as_str()
681 );
682 return Ok(());
683 }
684 }
685 Err(err) => {
686 error!(
687 "Error querying {} for the version discovery. It is most likely a misconfiguration on the cloud side. {}",
688 try_url.as_str(),
689 err
690 );
691 }
692 };
693 if try_url.path() != "/" {
694 try_url
697 .path_segments_mut()
698 .map_err(|_| CatalogError::cannot_be_base(&orig_url))?
699 .pop();
700 } else {
701 return Err(OpenStackError::Discovery {
702 service: service_type.to_string(),
703 url: orig_url.into(),
704 msg: match service_type {
705 ServiceType::Identity => "Service is not working.".into(),
706 _ => "No Version document found. Either service is not supporting version discovery, or API is not working".into(),
707 }
708 });
709 }
710
711 max_depth -= 1;
712 if max_depth == 0 {
713 break;
714 }
715 }
716 return Err(OpenStackError::Discovery {
717 service: service_type.to_string(),
718 url: orig_url.into(),
719 msg: "Unknown".into(),
720 });
721 }
722 return Ok(());
723 }
724 Ok(())
725 }
726
727 pub fn get_token_catalog(&self) -> Option<Vec<ServiceEndpoints>> {
730 self.catalog.get_token_catalog()
731 }
732
733 pub fn get_auth_info(&self) -> Option<AuthResponse> {
735 if let Auth::AuthToken(token) = &self.auth {
736 return token.auth_info.clone();
737 }
738 None
739 }
740
741 pub fn get_auth_state(&self, offset: Option<TimeDelta>) -> Option<AuthState> {
745 if let Auth::AuthToken(token) = &self.auth {
746 return Some(token.get_state(offset));
747 }
748 None
749 }
750
751 pub fn get_auth_token(&self) -> Option<String> {
753 if let Auth::AuthToken(token) = &self.auth {
754 return Some(token.token.clone());
755 }
756 None
757 }
758
759 pub async fn fetch_token_info<S: AsRef<str>>(
761 &self,
762 token: S,
763 ) -> Result<AuthResponse, OpenStackError> {
764 let auth_ep = auth::authtoken::build_token_info_endpoint(token)?;
765 let rsp = auth_ep.raw_query_async(self).await?;
766 let data: AuthResponse = serde_json::from_slice(rsp.body())?;
767 Ok(data)
768 }
769
770 #[instrument(name="request", skip_all, fields(http.uri = request.url().as_str(), http.method = request.method().as_str(), openstack.ver=request.headers().get("openstack-api-version").map(|v| v.to_str().unwrap_or(""))))]
772 async fn execute_request(&self, request: Request) -> Result<Response, reqwest::Error> {
773 info!("Sending request {:?}", request);
774 let url = request.url().clone();
775 let method = request.method().clone();
776
777 if enabled!(Level::TRACE)
778 && request.headers().get(header::CONTENT_TYPE)
779 == Some(&HeaderValue::from_static("application/json"))
780 {
781 request
783 .body()
784 .and_then(|body| body.as_bytes())
785 .and_then(|bytes| String::from_utf8(bytes.to_vec()).ok())
786 .inspect(|rq| {
787 let censored = self
788 .config
789 .get_sensitive_values()
790 .iter()
791 .fold(rq.clone(), |sanitized, &secret| {
792 sanitized.replace(secret, "<CENSORED>")
793 });
794 trace!("Request Body: {:?}", censored);
795 });
796 }
797
798 let start = SystemTime::now();
799 let rsp = self.client.execute(request).await?;
800 let elapsed = SystemTime::now().duration_since(start).unwrap_or_default();
801 event!(
802 name: "http_request",
803 Level::INFO,
804 url=url.as_str(),
805 duration_ms=elapsed.as_millis(),
806 status=rsp.status().as_u16(),
807 method=method.as_str(),
808 request_id=rsp.headers().get("x-openstack-request-id").map(|v| v.to_str().unwrap_or("")),
809 "Request completed with status {}",
810 rsp.status(),
811 );
812 Ok(rsp)
813 }
814
815 async fn rest_with_auth_async(
817 &self,
818 mut request: http::request::Builder,
819 body: Vec<u8>,
820 auth: &Auth,
821 ) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
822 use futures_util::TryFutureExt;
823 let call = || async {
824 if let Some(headers) = request.headers_mut() {
825 auth.set_header(headers)?;
826 }
827 let http_request = request.body(body)?;
828 let request = http_request.try_into()?;
829
830 let rsp = self.execute_request(request).await?;
831
832 let mut http_rsp = HttpResponse::builder()
833 .status(rsp.status())
834 .version(rsp.version());
835
836 if let Some(headers) = http_rsp.headers_mut() {
837 headers.extend(rsp.headers().clone())
838 }
839
840 Ok(http_rsp.body(rsp.bytes().await?)?)
841 };
842 call().map_err(api::ApiError::client).await
843 }
844
845 async fn rest_with_auth_read_body_async(
847 &self,
848 mut request: http::request::Builder,
849 body_read: BoxedAsyncRead,
850 auth: &Auth,
851 ) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
852 use futures_util::TryFutureExt;
853 let call = || async {
854 if let Some(headers) = request.headers_mut() {
855 auth.set_header(headers)?;
856 }
857 let stream = codec::FramedRead::new(body_read.compat(), codec::BytesCodec::new())
858 .map_ok(|b| b.freeze());
859 let http_request = request.body(Body::wrap_stream(stream))?;
860 let request = http_request.try_into()?;
861
862 let rsp = self.execute_request(request).await?;
863
864 let mut http_rsp = HttpResponse::builder()
865 .status(rsp.status())
866 .version(rsp.version());
867
868 if let Some(headers) = http_rsp.headers_mut() {
869 headers.extend(rsp.headers().clone())
870 }
871
872 Ok(http_rsp.body(rsp.bytes().await?)?)
873 };
874 call().map_err(api::ApiError::client).await
875 }
876
877 async fn download_with_auth_async(
879 &self,
880 mut request: http::request::Builder,
881 body: Vec<u8>,
882 auth: &Auth,
883 ) -> Result<(HeaderMap, BoxedAsyncRead), api::ApiError<<Self as api::RestClient>::Error>> {
884 use futures_util::TryFutureExt;
885 let call = || async {
886 if let Some(headers) = request.headers_mut() {
887 auth.set_header(headers)?;
888 }
889 let http_request = request.body(body)?;
890 let request = http_request.try_into()?;
891 let rsp = self.execute_request(request).await?;
892
893 let mut headers = HeaderMap::new();
894 for (key, value) in rsp.headers() {
895 headers.insert(key, value.clone());
896 }
897
898 let boxed_async_read = BoxedAsyncRead::new(
899 rsp.bytes_stream()
900 .map_err(|orig| {
901 let kind = if orig.is_timeout() {
902 IoErrorKind::TimedOut
903 } else {
904 IoErrorKind::Other
905 };
906 IoError::new(kind, orig)
907 })
908 .into_async_read(),
909 );
910 Ok((headers, boxed_async_read))
911 };
912 call().map_err(api::ApiError::client).await
913 }
914}