openstack_sdk/
openstack_async.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12//
13// SPDX-License-Identifier: Apache-2.0
14
15//! Asynchronous OpenStack client
16
17use async_trait::async_trait;
18use bytes::Bytes;
19use chrono::TimeDelta;
20use futures::io::{Error as IoError, ErrorKind as IoErrorKind};
21use futures::stream::TryStreamExt;
22use http::{header, HeaderMap, HeaderValue, Response as HttpResponse, StatusCode};
23use reqwest::{Body, Certificate, Client as AsyncClient, Request, Response};
24use std::convert::TryInto;
25use std::fmt::{self, Debug};
26use std::time::{Duration, SystemTime};
27use std::{fs::File, io::Read};
28use tokio_util::codec;
29use tokio_util::compat::FuturesAsyncReadCompatExt;
30use tracing::{debug, enabled, error, event, info, instrument, trace, warn, Level};
31
32use crate::api;
33use crate::api::query;
34#[cfg(feature = "keystone_ng")]
35use crate::api::query::QueryAsync;
36use crate::api::query::RawQueryAsync;
37use crate::api::RestClient;
38use crate::auth::{
39    self,
40    auth_helper::{AuthHelper, Dialoguer, Noop},
41    authtoken::{self, AuthTokenError, AuthType},
42    Auth, AuthError, AuthState,
43};
44use crate::catalog::{Catalog, CatalogError, ServiceEndpoint};
45use crate::config::CloudConfig;
46use crate::config::{get_config_identity_hash, ConfigFile};
47use crate::error::{OpenStackError, OpenStackResult, RestError};
48use crate::state;
49use crate::types::identity::v3::{AuthReceiptResponse, AuthResponse, Project, ServiceEndpoints};
50use crate::types::{ApiVersion, BoxedAsyncRead, ServiceType};
51use crate::utils::expand_tilde;
52
53/// Asynchronous client for the OpenStack API for a single user
54///
55/// Separate Identity (not the scope) should use separate instances of this.
56/// ```rust
57/// use openstack_sdk::api::{paged, Pagination, QueryAsync};
58/// use openstack_sdk::{AsyncOpenStack, config::ConfigFile, OpenStackError};
59/// use openstack_sdk::types::ServiceType;
60/// use openstack_sdk::api::compute::v2::flavor::list;
61///
62/// async fn list_flavors() -> Result<(), OpenStackError> {
63///     // Get the builder for the listing Flavors Endpoint
64///     let mut ep_builder = list::Request::builder();
65///     // Set the `min_disk` query param
66///     ep_builder.min_disk("15");
67///     let ep = ep_builder.build().unwrap();
68///
69///     let cfg = ConfigFile::new().unwrap();
70///     // Get connection config from clouds.yaml/secure.yaml
71///     let profile = cfg.get_cloud_config("devstack").unwrap().unwrap();
72///     // Establish connection
73///     let mut session = AsyncOpenStack::new(&profile).await?;
74///
75///     // Invoke service discovery when desired.
76///     session.discover_service_endpoint(&ServiceType::Compute).await?;
77///
78///     // Execute the call with pagination limiting maximum amount of entries to 1000
79///     let data: Vec<serde_json::Value> = paged(ep, Pagination::Limit(1000))
80///         .query_async(&session)
81///         .await.unwrap();
82///
83///     println!("Data = {:?}", data);
84///     Ok(())
85/// }
86/// ```
87#[derive(Clone)]
88pub struct AsyncOpenStack {
89    /// The client to use for API calls.
90    client: reqwest::Client,
91    /// Cloud configuration
92    config: CloudConfig,
93    /// The authentication information to use when communicating with OpenStack.
94    auth: Auth,
95    /// Endpoints catalog
96    catalog: Catalog,
97    /// Session state.
98    ///
99    /// In order to save authentication roundtrips save/load authentication
100    /// information in the file (similar to how other cli tools are doing)
101    /// and check auth expiration upon load.
102    state: state::State,
103}
104
105impl Debug for AsyncOpenStack {
106    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107        f.debug_struct("OpenStack")
108            .field("service_endpoints", &self.catalog)
109            .finish()
110    }
111}
112
113#[async_trait]
114impl api::RestClient for AsyncOpenStack {
115    type Error = RestError;
116
117    /// Get service endpoint from the catalog
118    fn get_service_endpoint(
119        &self,
120        service_type: &ServiceType,
121        version: Option<&ApiVersion>,
122    ) -> Result<&ServiceEndpoint, api::ApiError<Self::Error>> {
123        Ok(self
124            .catalog
125            .get_service_endpoint(service_type.to_string(), version, None::<String>)?)
126    }
127
128    /// Get project id from the current scope
129    fn get_current_project(&self) -> Option<Project> {
130        if let Auth::AuthToken(token) = &self.auth {
131            return token.auth_info.clone().and_then(|x| x.token.project);
132        }
133        None
134    }
135}
136
137#[async_trait]
138impl api::AsyncClient for AsyncOpenStack {
139    // Perform REST request
140    async fn rest_async(
141        &self,
142        request: http::request::Builder,
143        body: Vec<u8>,
144    ) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
145        self.rest_with_auth_async(request, body, &self.auth).await
146    }
147
148    /// Perform REST request with the body read from AsyncRead
149    async fn rest_read_body_async(
150        &self,
151        request: http::request::Builder,
152        body: BoxedAsyncRead,
153    ) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
154        self.rest_with_auth_read_body_async(request, body, &self.auth)
155            .await
156    }
157
158    /// Download result of HTTP operation.
159    async fn download_async(
160        &self,
161        request: http::request::Builder,
162        body: Vec<u8>,
163    ) -> Result<(HeaderMap, BoxedAsyncRead), api::ApiError<<Self as api::RestClient>::Error>> {
164        self.download_with_auth_async(request, body, &self.auth)
165            .await
166    }
167}
168
169impl AsyncOpenStack {
170    /// Basic constructor
171    fn new_impl(config: &CloudConfig, auth: Auth) -> OpenStackResult<Self> {
172        let mut client_builder = AsyncClient::builder();
173
174        if let Some(cacert) = &config.cacert {
175            let mut buf = Vec::new();
176            File::open(expand_tilde(cacert).unwrap_or(cacert.into()))
177                .map_err(|e| OpenStackError::IOWithPath {
178                    source: e,
179                    path: cacert.into(),
180                })?
181                .read_to_end(&mut buf)
182                .map_err(|e| OpenStackError::IOWithPath {
183                    source: e,
184                    path: cacert.into(),
185                })?;
186            for cert in Certificate::from_pem_bundle(&buf)? {
187                client_builder = client_builder.add_root_certificate(cert);
188            }
189        }
190        if let Some(false) = &config.verify {
191            warn!(
192                "SSL Verification is disabled! Please consider using `cacert` for adding custom certificate instead."
193            );
194            client_builder = client_builder.danger_accept_invalid_certs(true);
195        }
196        client_builder = client_builder.pool_max_idle_per_host(10);
197        client_builder = client_builder.pool_idle_timeout(Duration::from_secs(30));
198        client_builder = client_builder.timeout(Duration::from_secs(
199            config
200                .options
201                .get("api_timeout")
202                .and_then(|val| val.clone().into_uint().ok())
203                .unwrap_or(30),
204        ));
205        client_builder = client_builder.connect_timeout(Duration::from_secs(5));
206        client_builder = client_builder.tcp_keepalive(Duration::from_secs(60));
207        client_builder = client_builder.gzip(true);
208        client_builder = client_builder.deflate(true);
209
210        let mut session = AsyncOpenStack {
211            client: client_builder.build()?,
212            config: config.clone(),
213            auth,
214            catalog: Catalog::default(),
215            state: state::State::new(),
216        };
217
218        let auth_data = session
219            .config
220            .auth
221            .as_ref()
222            .ok_or(AuthTokenError::MissingAuthData)?;
223
224        let identity_service_url = auth_data
225            .auth_url
226            .as_ref()
227            .ok_or(AuthTokenError::MissingAuthUrl)?;
228
229        session.catalog.register_catalog_endpoint(
230            "identity",
231            identity_service_url,
232            config.region_name.as_ref(),
233            Some("public"),
234        )?;
235
236        session.catalog.configure(config)?;
237
238        session
239            .state
240            .set_auth_hash_key(get_config_identity_hash(config))
241            .enable_auth_cache(ConfigFile::new()?.is_auth_cache_enabled());
242
243        Ok(session)
244    }
245
246    /// Create a new OpenStack API session from CloudConfig
247    #[instrument(name = "connect", level = "trace", skip(config))]
248    pub async fn new(config: &CloudConfig) -> OpenStackResult<Self> {
249        let mut session = Self::new_impl(config, Auth::None)?;
250
251        // Ensure we resolve identity endpoint using version discovery
252        session
253            .discover_service_endpoint(&ServiceType::Identity)
254            .await?;
255
256        session.authorize(None, false, false).await?;
257
258        Ok(session)
259    }
260
261    /// Create a new OpenStack API session from CloudConfig
262    #[instrument(name = "connect", level = "trace", skip(config, auth_helper))]
263    pub async fn new_with_authentication_helper(
264        config: &CloudConfig,
265        auth_helper: &mut impl AuthHelper,
266        renew_auth: bool,
267    ) -> OpenStackResult<Self> {
268        let mut session = Self::new_impl(config, Auth::None)?;
269
270        // Ensure we resolve identity endpoint using version discovery
271        session
272            .discover_service_endpoint(&ServiceType::Identity)
273            .await?;
274
275        session
276            .authorize_with_auth_helper(None, auth_helper, renew_auth)
277            .await?;
278
279        Ok(session)
280    }
281
282    /// Create a new OpenStack API session from CloudConfig
283    #[instrument(name = "connect", level = "trace", skip(config))]
284    #[deprecated(
285        since = "0.22.0",
286        note = "please use `new_with_authentication_helper` instead"
287    )]
288    pub async fn new_interactive(config: &CloudConfig, renew_auth: bool) -> OpenStackResult<Self> {
289        Self::new_with_authentication_helper(config, &mut Dialoguer::default(), renew_auth).await
290    }
291
292    /// Set the authorization to be used by the client
293    fn set_auth(&mut self, auth: auth::Auth, skip_cache_update: bool) -> &mut Self {
294        self.auth = auth;
295        if !skip_cache_update {
296            if let Auth::AuthToken(auth) = &self.auth {
297                // For app creds we should save auth as unscoped since:
298                // - on request it is disallowed to specify scope
299                // - response contain fixed scope
300                // With this it is not possible to find auth in the cache if we use the real
301                // scope
302                let scope = match &auth.auth_info {
303                    Some(info) => {
304                        if info.token.application_credential.is_some() {
305                            authtoken::AuthTokenScope::Unscoped
306                        } else {
307                            auth.get_scope()
308                        }
309                    }
310                    _ => auth.get_scope(),
311                };
312                self.state.set_scope_auth(&scope, auth);
313            }
314        }
315        self
316    }
317
318    /// Set TokenAuth as current authorization
319    fn set_token_auth(&mut self, token: String, token_info: Option<AuthResponse>) -> &mut Self {
320        let token_auth = authtoken::AuthToken {
321            token,
322            auth_info: token_info,
323        };
324        self.set_auth(Auth::AuthToken(Box::new(token_auth.clone())), false);
325        self
326    }
327
328    /// Authorize against the cloud using provided credentials and get the session token.
329    pub async fn authorize(
330        &mut self,
331        scope: Option<authtoken::AuthTokenScope>,
332        interactive: bool,
333        renew_auth: bool,
334    ) -> Result<(), OpenStackError> {
335        if interactive {
336            self.authorize_with_auth_helper(scope, &mut Dialoguer::default(), renew_auth)
337                .await
338        } else {
339            self.authorize_with_auth_helper(scope, &mut Noop::default(), renew_auth)
340                .await
341        }
342    }
343
344    /// Authorize against the cloud using provided credentials and get the session token with the
345    /// auth helper that may be invoked to interactively ask for the credentials.
346    pub async fn authorize_with_auth_helper<A>(
347        &mut self,
348        scope: Option<authtoken::AuthTokenScope>,
349        auth_helper: &mut A,
350        renew_auth: bool,
351    ) -> Result<(), OpenStackError>
352    where
353        A: AuthHelper,
354    {
355        let requested_scope = scope.map_or_else(
356            || authtoken::AuthTokenScope::try_from(&self.config),
357            |v| Ok(v.clone()),
358        )?;
359
360        if let (Some(auth), false) = (self.state.get_scope_auth(&requested_scope), renew_auth) {
361            // Valid authorization is already available and no renewal is required
362            trace!("Auth already available");
363            self.set_auth(auth::Auth::AuthToken(Box::new(auth.clone())), true);
364        } else {
365            // No valid authorization data is available in the state or
366            // renewal is requested
367            let auth_type = AuthType::from_cloud_config(&self.config)?;
368            let mut force_new_auth = renew_auth;
369            if let AuthType::V3ApplicationCredential = auth_type {
370                // application_credentials token can not be used to get new token without again
371                // supplying application credentials (bug in Keystone?)
372                // So for AppCred we just force a brand new auth
373                force_new_auth = true;
374            }
375            let mut rsp;
376            if let (Some(available_auth), false) = (self.state.get_any_valid_auth(), force_new_auth)
377            {
378                // State contain valid authentication for different scope/unscoped. It is possible
379                // to request new authz using this other auth
380                trace!("Valid Auth is available for reauthz: {:?}", available_auth);
381                let auth_ep = authtoken::build_reauth_request(&available_auth, &requested_scope)?;
382                rsp = auth_ep.raw_query_async_ll(self, Some(false)).await?;
383                // Check whether re-auth was successful
384                api::check_response_error::<Self>(&rsp, None)?;
385            } else {
386                // No auth/authz information available. Proceed with new auth
387                trace!("No Auth already available. Proceeding with new login");
388
389                match auth_type {
390                    AuthType::V3ApplicationCredential
391                    | AuthType::V3Password
392                    | AuthType::V3Token
393                    | AuthType::V3Totp
394                    | AuthType::V3Multifactor => {
395                        let identity =
396                            authtoken::build_identity_data_from_config(&self.config, auth_helper)
397                                .await?;
398                        let auth_ep = authtoken::build_auth_request_with_identity_and_scope(
399                            &identity,
400                            &requested_scope,
401                        )?;
402                        rsp = auth_ep.raw_query_async_ll(self, Some(false)).await?;
403
404                        // Handle the MFA
405                        if let StatusCode::UNAUTHORIZED = rsp.status() {
406                            if let Some(receipt) = rsp.headers().get("openstack-auth-receipt") {
407                                let receipt_data: AuthReceiptResponse =
408                                    serde_json::from_slice(rsp.body())?;
409                                let auth_endpoint = authtoken::build_auth_request_from_receipt(
410                                    &self.config,
411                                    receipt.clone(),
412                                    &receipt_data,
413                                    &requested_scope,
414                                    auth_helper,
415                                )
416                                .await?;
417                                rsp = auth_endpoint.raw_query_async(self).await?;
418                            }
419                        }
420                        api::check_response_error::<Self>(&rsp, None)?;
421                    }
422                    AuthType::V3OidcAccessToken => {
423                        let auth_ep =
424                            auth::v3oidcaccesstoken::get_auth_ep(&self.config, auth_helper).await?;
425                        rsp = auth_ep.raw_query_async(self).await?;
426
427                        let token = rsp
428                            .headers()
429                            .get("x-subject-token")
430                            .ok_or(AuthError::AuthTokenNotInResponse)?
431                            .to_str()
432                            .map_err(|_| AuthError::AuthTokenNotString)?;
433
434                        // Set retrieved token as current auth
435                        let token_info: AuthResponse = serde_json::from_slice(rsp.body())?;
436                        let token_auth = authtoken::AuthToken {
437                            token: token.to_string(),
438                            auth_info: Some(token_info),
439                        };
440                        self.set_auth(Auth::AuthToken(Box::new(token_auth.clone())), false);
441
442                        // And now time to rescope the token
443                        let auth_ep =
444                            authtoken::build_reauth_request(&token_auth, &requested_scope)?;
445                        rsp = auth_ep.raw_query_async(self).await?;
446                    }
447                    AuthType::V3WebSso => {
448                        let auth_url = auth::v3websso::get_auth_url(&self.config)?;
449                        let identity_ep = self.get_service_endpoint(
450                            &ServiceType::Identity,
451                            Some(&ApiVersion::new(3, 0)),
452                        )?;
453                        let mut url = identity_ep.build_request_url(&auth_url)?;
454
455                        let mut token_auth = auth::v3websso::get_token_auth(&mut url).await?;
456
457                        // Set retrieved token as current auth
458                        self.set_auth(auth::Auth::AuthToken(Box::new(token_auth.clone())), true);
459
460                        // Get the token info (for the expiration)
461                        let token_info = self.fetch_token_info(token_auth.token.clone()).await?;
462                        token_auth.auth_info = Some(token_info.clone());
463                        let scope = authtoken::AuthTokenScope::from(&token_info);
464
465                        // Save unscoped token in the cache
466                        self.state.set_scope_auth(&scope, &token_auth);
467
468                        // And now time to rescope the token
469                        let auth_ep =
470                            authtoken::build_reauth_request(&token_auth, &requested_scope)?;
471                        rsp = auth_ep.raw_query_async(self).await?;
472                    }
473
474                    #[cfg(feature = "keystone_ng")]
475                    AuthType::V4Federation => {
476                        // Construct request for initializing authentication (POST call to keystone
477                        // `/federation/identity_providers/{idp_id}/auth`) to get the IDP url
478                        // client would need to contact.
479                        // TODO: If we know the scope we can request it from the very beginning
480                        // saving 1 call.
481                        let callback_addr = std::net::SocketAddr::from(([127, 0, 0, 1], 8050));
482                        let init_auth_ep =
483                            auth::v4federation::get_auth_ep(&self.config, callback_addr.port())?;
484                        let auth_info: auth::v4federation::FederationAuthRequestResponse =
485                            init_auth_ep.query_async(self).await?;
486
487                        // Perform the magic directing user's browser at the IDP url and waiting
488                        // for the callback to be invoked with the authorization code
489                        let oauth2_code =
490                            auth::v4federation::get_auth_code(&auth_info.auth_url, callback_addr)
491                                .await?;
492
493                        // Construct the request to Keystone to finish the authorization exchanging
494                        // received authorization code for the (unscoped) token
495                        let mut oidc_callback_builder =
496                            auth::v4federation::OauthCallbackRequestBuilder::default();
497                        if let (Some(code), Some(state)) = (oauth2_code.code, oauth2_code.state) {
498                            oidc_callback_builder.code(code.clone());
499                            oidc_callback_builder.state(state.clone());
500                            let oidc_callback_ep = oidc_callback_builder
501                                .build()
502                                .map_err(auth::v4federation::FederationError::from)?;
503
504                            rsp = oidc_callback_ep.raw_query_async(self).await?;
505                        } else {
506                            return Err(OpenStackError::NoAuth);
507                        }
508                    }
509                    #[cfg(feature = "keystone_ng")]
510                    AuthType::V4Jwt => {
511                        let auth_ep = auth::v4jwt::get_auth_ep(&self.config, auth_helper).await?;
512                        rsp = auth_ep.raw_query_async(self).await?;
513                        api::check_response_error::<Self>(&rsp, None)?;
514
515                        let token_info: AuthResponse = serde_json::from_slice(rsp.body())?;
516                        let received_scope = authtoken::AuthTokenScope::from(&token_info);
517                        tracing::debug!(
518                            "Requested: {:?}, received: {:?}",
519                            requested_scope,
520                            received_scope
521                        );
522                        let token = rsp
523                            .headers()
524                            .get("x-subject-token")
525                            .ok_or(AuthError::AuthTokenNotInResponse)?
526                            .to_str()
527                            .map_err(|_| AuthError::AuthTokenNotString)?;
528
529                        // Set retrieved token as current auth
530                        let token_auth = authtoken::AuthToken {
531                            token: token.to_string(),
532                            auth_info: Some(token_info),
533                        };
534                        self.set_auth(Auth::AuthToken(Box::new(token_auth.clone())), false);
535
536                        if requested_scope != authtoken::AuthTokenScope::Unscoped
537                            && requested_scope != received_scope
538                        {
539                            // TODO: check if rescope is necessary and use v4 api
540                            // And now time to rescope the token
541                            let auth_ep =
542                                authtoken::build_reauth_request(&token_auth, &requested_scope)?;
543                            rsp = auth_ep.raw_query_async(self).await?;
544                        } else {
545                            // Client may not specify the target scope expecting the mapping to set
546                            // the proper token. Save the auth as unscope (similarly to the AppCred
547                            // handling).
548                            self.state
549                                .set_scope_auth(&authtoken::AuthTokenScope::Unscoped, &token_auth);
550                        }
551                    }
552
553                    #[cfg(feature = "passkey")]
554                    AuthType::V4Passkey => {
555                        let auth_ep =
556                            auth::v4passkey::get_init_auth_ep(&self.config, auth_helper).await?;
557                        let req: auth::v4passkey::PasskeyAuthenticationStartResponse =
558                            auth_ep.query_async(self).await?;
559                        use webauthn_authenticator_rs::prelude::Url;
560                        use webauthn_authenticator_rs::WebauthnAuthenticator;
561                        let mut auth = WebauthnAuthenticator::new(
562                            webauthn_authenticator_rs::mozilla::MozillaAuthenticator::new(),
563                        );
564                        let passkey_auth = auth
565                            .do_authentication(
566                                Url::parse("http://localhost:8080")?,
567                                req.try_into()?,
568                            )
569                            .map_err(auth::v4passkey::PasskeyError::from)?;
570                        let finish_ep = auth::v4passkey::get_finish_auth_ep(
571                            &self.config,
572                            passkey_auth,
573                            auth_helper,
574                        )
575                        .await?;
576                        rsp = finish_ep.raw_query_async(self).await?;
577
578                        let token = rsp
579                            .headers()
580                            .get("x-subject-token")
581                            .ok_or(AuthError::AuthTokenNotInResponse)?
582                            .to_str()
583                            .map_err(|_| AuthError::AuthTokenNotString)?;
584
585                        // Set retrieved token as current auth
586                        let token_info: AuthResponse = serde_json::from_slice(rsp.body())?;
587                        let token_auth = authtoken::AuthToken {
588                            token: token.to_string(),
589                            auth_info: Some(token_info),
590                        };
591                        self.set_auth(Auth::AuthToken(Box::new(token_auth.clone())), false);
592
593                        // And now time to rescope the token
594                        let auth_ep =
595                            authtoken::build_reauth_request(&token_auth, &requested_scope)?;
596                        rsp = auth_ep.raw_query_async(self).await?;
597                    }
598                }
599            };
600
601            let data: AuthResponse = serde_json::from_slice(rsp.body())?;
602            debug!("Auth token is {:?}", data);
603
604            let token = rsp
605                .headers()
606                .get("x-subject-token")
607                .ok_or(AuthError::AuthTokenNotInResponse)?
608                .to_str()
609                .map_err(|_| AuthError::AuthTokenNotString)?;
610
611            self.set_token_auth(token.into(), Some(data));
612        }
613
614        if let auth::Auth::AuthToken(token_data) = &self.auth {
615            match &token_data.auth_info {
616                Some(auth_data) => {
617                    if let Some(project) = &auth_data.token.project {
618                        self.catalog.set_project_id(project.id.clone());
619                        // Reconfigure catalog since we know now the project_id
620                        self.catalog.configure(&self.config)?;
621                    }
622                    if let Some(endpoints) = &auth_data.token.catalog {
623                        self.catalog
624                            .process_catalog_endpoints(endpoints, Some("public"))?;
625                    } else {
626                        error!("No catalog information");
627                    }
628                }
629                _ => return Err(OpenStackError::NoAuth),
630            }
631        }
632        // TODO: without AuthToken authorization we may want to read catalog separately
633        Ok(())
634    }
635
636    /// Perform version discovery of a service
637    #[instrument(skip(self))]
638    pub async fn discover_service_endpoint(
639        &mut self,
640        service_type: &ServiceType,
641    ) -> Result<(), OpenStackError> {
642        if let Ok(ep) = self.catalog.get_service_endpoint(
643            service_type.to_string(),
644            None,
645            self.config.region_name.as_ref(),
646        ) {
647            if self.catalog.discovery_allowed(service_type.to_string()) {
648                info!("Performing `{}` endpoint version discovery", service_type);
649
650                let orig_url = ep.url().clone();
651                let mut try_url = ep.url().clone();
652                // Version discovery document must logically end with "/" since API url goes even
653                // deeper.
654                try_url
655                    .path_segments_mut()
656                    .map_err(|_| CatalogError::cannot_be_base(ep.url()))?
657                    .pop_if_empty()
658                    .push("");
659                let mut max_depth = 10;
660                loop {
661                    let req = http::Request::builder()
662                        .method(http::Method::GET)
663                        .uri(query::url_to_http_uri(try_url.clone())?);
664
665                    match self.rest_with_auth_async(req, Vec::new(), &self.auth).await {
666                        Ok(rsp) => {
667                            if rsp.status() != StatusCode::NOT_FOUND
668                                && self
669                                    .catalog
670                                    .process_endpoint_discovery(
671                                        service_type,
672                                        &try_url,
673                                        rsp.body(),
674                                        self.config.region_name.as_ref(),
675                                    )
676                                    .is_ok()
677                            {
678                                debug!(
679                                    "Finished service version discovery at {}",
680                                    try_url.as_str()
681                                );
682                                return Ok(());
683                            }
684                        }
685                        Err(err) => {
686                            error!(
687                                "Error querying {} for the version discovery. It is most likely a misconfiguration on the cloud side. {}",
688                                try_url.as_str(),
689                                err
690                            );
691                        }
692                    };
693                    if try_url.path() != "/" {
694                        // We are not at the root yet and have not found a
695                        // valid version document so far, try one level up
696                        try_url
697                            .path_segments_mut()
698                            .map_err(|_| CatalogError::cannot_be_base(&orig_url))?
699                            .pop();
700                    } else {
701                        return Err(OpenStackError::Discovery {
702                            service: service_type.to_string(),
703                            url: orig_url.into(),
704                            msg: match service_type {
705                                ServiceType::Identity => "Service is not working.".into(),
706                                _ => "No Version document found. Either service is not supporting version discovery, or API is not working".into(),
707                            }
708                        });
709                    }
710
711                    max_depth -= 1;
712                    if max_depth == 0 {
713                        break;
714                    }
715                }
716                return Err(OpenStackError::Discovery {
717                    service: service_type.to_string(),
718                    url: orig_url.into(),
719                    msg: "Unknown".into(),
720                });
721            }
722            return Ok(());
723        }
724        Ok(())
725    }
726
    // TODO(gtema): rename to `get_catalog`
    /// Return catalog information given in the token.
    ///
    /// Thin accessor: the call is forwarded to `get_token_catalog` of the client's
    /// catalog and its result is returned unchanged.
    pub fn get_token_catalog(&self) -> Option<Vec<ServiceEndpoints>> {
        self.catalog.get_token_catalog()
    }
732
733    /// Return current authentication information
734    pub fn get_auth_info(&self) -> Option<AuthResponse> {
735        if let Auth::AuthToken(token) = &self.auth {
736            return token.auth_info.clone();
737        }
738        None
739    }
740
741    /// Return current authentication status
742    ///
743    /// Offset can be used to calculate imminent expiration.
744    pub fn get_auth_state(&self, offset: Option<TimeDelta>) -> Option<AuthState> {
745        if let Auth::AuthToken(token) = &self.auth {
746            return Some(token.get_state(offset));
747        }
748        None
749    }
750
751    /// Return current authentication token
752    pub fn get_auth_token(&self) -> Option<String> {
753        if let Auth::AuthToken(token) = &self.auth {
754            return Some(token.token.clone());
755        }
756        None
757    }
758
759    /// Perform token introspection call
760    pub async fn fetch_token_info<S: AsRef<str>>(
761        &self,
762        token: S,
763    ) -> Result<AuthResponse, OpenStackError> {
764        let auth_ep = auth::authtoken::build_token_info_endpoint(token)?;
765        let rsp = auth_ep.raw_query_async(self).await?;
766        let data: AuthResponse = serde_json::from_slice(rsp.body())?;
767        Ok(data)
768    }
769
770    /// Perform HTTP request with given request and return raw response.
771    #[instrument(name="request", skip_all, fields(http.uri = request.url().as_str(), http.method = request.method().as_str(), openstack.ver=request.headers().get("openstack-api-version").map(|v| v.to_str().unwrap_or(""))))]
772    async fn execute_request(&self, request: Request) -> Result<Response, reqwest::Error> {
773        info!("Sending request {:?}", request);
774        let url = request.url().clone();
775        let method = request.method().clone();
776
777        if enabled!(Level::TRACE)
778            && request.headers().get(header::CONTENT_TYPE)
779                == Some(&HeaderValue::from_static("application/json"))
780        {
781            // Body may contain sensitive info - censor it but only when trace is on
782            request
783                .body()
784                .and_then(|body| body.as_bytes())
785                .and_then(|bytes| String::from_utf8(bytes.to_vec()).ok())
786                .inspect(|rq| {
787                    let censored = self
788                        .config
789                        .get_sensitive_values()
790                        .iter()
791                        .fold(rq.clone(), |sanitized, &secret| {
792                            sanitized.replace(secret, "<CENSORED>")
793                        });
794                    trace!("Request Body: {:?}", censored);
795                });
796        }
797
798        let start = SystemTime::now();
799        let rsp = self.client.execute(request).await?;
800        let elapsed = SystemTime::now().duration_since(start).unwrap_or_default();
801        event!(
802            name: "http_request",
803            Level::INFO,
804            url=url.as_str(),
805            duration_ms=elapsed.as_millis(),
806            status=rsp.status().as_u16(),
807            method=method.as_str(),
808            request_id=rsp.headers().get("x-openstack-request-id").map(|v| v.to_str().unwrap_or("")),
809            "Request completed with status {}",
810            rsp.status(),
811        );
812        Ok(rsp)
813    }
814
815    /// Perform a REST query with a given auth.
816    async fn rest_with_auth_async(
817        &self,
818        mut request: http::request::Builder,
819        body: Vec<u8>,
820        auth: &Auth,
821    ) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
822        use futures_util::TryFutureExt;
823        let call = || async {
824            if let Some(headers) = request.headers_mut() {
825                auth.set_header(headers)?;
826            }
827            let http_request = request.body(body)?;
828            let request = http_request.try_into()?;
829
830            let rsp = self.execute_request(request).await?;
831
832            let mut http_rsp = HttpResponse::builder()
833                .status(rsp.status())
834                .version(rsp.version());
835
836            if let Some(headers) = http_rsp.headers_mut() {
837                headers.extend(rsp.headers().clone())
838            }
839
840            Ok(http_rsp.body(rsp.bytes().await?)?)
841        };
842        call().map_err(api::ApiError::client).await
843    }
844
845    /// Perform a REST query with a given auth.
846    async fn rest_with_auth_read_body_async(
847        &self,
848        mut request: http::request::Builder,
849        body_read: BoxedAsyncRead,
850        auth: &Auth,
851    ) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
852        use futures_util::TryFutureExt;
853        let call = || async {
854            if let Some(headers) = request.headers_mut() {
855                auth.set_header(headers)?;
856            }
857            let stream = codec::FramedRead::new(body_read.compat(), codec::BytesCodec::new())
858                .map_ok(|b| b.freeze());
859            let http_request = request.body(Body::wrap_stream(stream))?;
860            let request = http_request.try_into()?;
861
862            let rsp = self.execute_request(request).await?;
863
864            let mut http_rsp = HttpResponse::builder()
865                .status(rsp.status())
866                .version(rsp.version());
867
868            if let Some(headers) = http_rsp.headers_mut() {
869                headers.extend(rsp.headers().clone())
870            }
871
872            Ok(http_rsp.body(rsp.bytes().await?)?)
873        };
874        call().map_err(api::ApiError::client).await
875    }
876
877    /// Perform a REST query with a given auth and return AsyncRead of the body.
878    async fn download_with_auth_async(
879        &self,
880        mut request: http::request::Builder,
881        body: Vec<u8>,
882        auth: &Auth,
883    ) -> Result<(HeaderMap, BoxedAsyncRead), api::ApiError<<Self as api::RestClient>::Error>> {
884        use futures_util::TryFutureExt;
885        let call = || async {
886            if let Some(headers) = request.headers_mut() {
887                auth.set_header(headers)?;
888            }
889            let http_request = request.body(body)?;
890            let request = http_request.try_into()?;
891            let rsp = self.execute_request(request).await?;
892
893            let mut headers = HeaderMap::new();
894            for (key, value) in rsp.headers() {
895                headers.insert(key, value.clone());
896            }
897
898            let boxed_async_read = BoxedAsyncRead::new(
899                rsp.bytes_stream()
900                    .map_err(|orig| {
901                        let kind = if orig.is_timeout() {
902                            IoErrorKind::TimedOut
903                        } else {
904                            IoErrorKind::Other
905                        };
906                        IoError::new(kind, orig)
907                    })
908                    .into_async_read(),
909            );
910            Ok((headers, boxed_async_read))
911        };
912        call().map_err(api::ApiError::client).await
913    }
914}