use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt::{self, Debug};
use std::time::{Duration, SystemTime};
use std::{fs::File, io::Read};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::TimeDelta;
use futures::io::{Error as IoError, ErrorKind as IoErrorKind};
use futures::stream::TryStreamExt;
use http::{HeaderMap, HeaderValue, Response as HttpResponse, StatusCode, header};
use reqwest::{Body, Certificate, Client as AsyncClient, Request, Response};
use secrecy::{ExposeSecret, SecretString};
use tokio_util::codec;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{Level, debug, enabled, error, event, info, instrument, trace, warn};
use openstack_sdk_auth_core::{
Auth, AuthError, AuthPluginRegistration, AuthToken, OpenStackAuthType,
authtoken::AuthTokenError,
authtoken_scope::AuthTokenScope,
types::{AuthResponse, Project, ServiceEndpoints},
};
use openstack_sdk_auth_applicationcredential as _;
#[cfg(feature = "keystone_ng")]
use openstack_sdk_auth_federation as _;
#[cfg(feature = "keystone_ng")]
use openstack_sdk_auth_jwt as _;
use openstack_sdk_auth_oidcaccesstoken as _;
#[cfg(feature = "passkey")]
use openstack_sdk_auth_passkey as _;
use openstack_sdk_auth_password as _;
use openstack_sdk_auth_receipt as token_receipt;
use openstack_sdk_auth_token as token_auth;
use openstack_sdk_auth_totp as _;
use openstack_sdk_auth_websso as _;
use crate::auth::authtoken::{AuthType, build_token_info_endpoint};
use openstack_sdk_core::api::{
self, RestClient,
query::{self, RawQueryAsync},
};
use openstack_sdk_core::auth::{
AuthState,
auth_helper::{AuthHelper, Dialoguer, Noop},
gather_auth_data,
};
use openstack_sdk_core::catalog::{Catalog, CatalogError, ServiceEndpoint};
use openstack_sdk_core::config::{CloudConfig, ConfigFile, get_config_identity_hash};
use openstack_sdk_core::error::{OpenStackError, OpenStackResult, RestError};
use openstack_sdk_core::state;
use openstack_sdk_core::types::{ApiVersion, BoxedAsyncRead, ServiceType};
use openstack_sdk_core::utils::expand_tilde;
/// Asynchronous client session for a single OpenStack cloud.
///
/// Bundles the HTTP client, the cloud configuration, the authorization that
/// is currently in effect, the service catalog and the authorization state
/// that the methods of this type operate on.
#[derive(Clone)]
pub struct AsyncOpenStack {
/// HTTP client used to execute every request of the session.
client: reqwest::Client,
/// Cloud configuration the session was created from.
config: CloudConfig,
/// Authorization currently attached to outgoing requests.
auth: Auth,
/// Service catalog used to resolve service endpoints.
catalog: Catalog,
/// Authorization state; authorizations are stored in it per scope.
state: state::State,
}
impl Debug for AsyncOpenStack {
    /// Render the session as an `OpenStack` struct exposing only the service
    /// catalog (under the `service_endpoints` field); the configuration and
    /// the authorization are not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("OpenStack");
        repr.field("service_endpoints", &self.catalog);
        repr.finish()
    }
}
#[async_trait]
impl api::RestClient for AsyncOpenStack {
    type Error = RestError;

    /// Resolve the endpoint of `service_type` from the session catalog.
    ///
    /// The lookup is narrowed by the optional API `version` and by the
    /// `region_name` and `interface` settings of the cloud configuration.
    ///
    /// # Errors
    ///
    /// Any error reported by the catalog lookup, converted into
    /// [`api::ApiError`].
    fn get_service_endpoint(
        &self,
        service_type: &ServiceType,
        version: Option<&ApiVersion>,
    ) -> Result<&ServiceEndpoint, api::ApiError<Self::Error>> {
        Ok(self.catalog.get_service_endpoint(
            service_type.to_string(),
            version,
            self.config.region_name.as_ref(),
            self.config.interface.as_ref(),
        )?)
    }

    /// Return the project of the current token authorization.
    ///
    /// `None` is returned when the session is not authorized with a token,
    /// when no token information is attached to it, or when the token
    /// information carries no project.
    fn get_current_project(&self) -> Option<Project> {
        match &self.auth {
            // Borrow the token information and clone only the project instead
            // of cloning the complete authorization response first.
            Auth::AuthToken(token) => token
                .auth_info
                .as_ref()
                .and_then(|info| info.token.project.clone()),
            _ => None,
        }
    }
}
#[async_trait]
impl api::AsyncClient for AsyncOpenStack {
/// Execute `request` with the in-memory `body`, attaching the session's
/// current authorization, and return the response with its body read into
/// memory.
async fn rest_async(
&self,
request: http::request::Builder,
body: Vec<u8>,
) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
self.rest_with_auth_async(request, body, &self.auth).await
}
/// Execute `request` taking the request body from the `body` reader,
/// attaching the session's current authorization, and return the response
/// with its body read into memory.
async fn rest_read_body_async(
&self,
request: http::request::Builder,
body: BoxedAsyncRead,
) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
self.rest_with_auth_read_body_async(request, body, &self.auth)
.await
}
/// Execute `request` with the in-memory `body`, attaching the session's
/// current authorization, and return the response headers together with a
/// reader over the response body.
async fn download_async(
&self,
request: http::request::Builder,
body: Vec<u8>,
) -> Result<(HeaderMap, BoxedAsyncRead), api::ApiError<<Self as api::RestClient>::Error>> {
self.download_with_auth_async(request, body, &self.auth)
.await
}
}
impl AsyncOpenStack {
fn new_impl(config: &CloudConfig, auth: Auth) -> OpenStackResult<Self> {
let mut client_builder = AsyncClient::builder();
if let Some(cacert) = &config.cacert {
let mut buf = Vec::new();
File::open(expand_tilde(cacert).unwrap_or(cacert.into()))
.map_err(|e| OpenStackError::IOWithPath {
source: e,
path: cacert.into(),
})?
.read_to_end(&mut buf)
.map_err(|e| OpenStackError::IOWithPath {
source: e,
path: cacert.into(),
})?;
for cert in Certificate::from_pem_bundle(&buf)? {
client_builder = client_builder.add_root_certificate(cert);
}
}
if let Some(false) = &config.verify {
warn!(
"SSL Verification is disabled! Please consider using `cacert` for adding custom certificate instead."
);
client_builder = client_builder.danger_accept_invalid_certs(true);
}
client_builder = client_builder.pool_max_idle_per_host(10);
client_builder = client_builder.pool_idle_timeout(Duration::from_secs(30));
client_builder = client_builder.timeout(Duration::from_secs(
config
.options
.get("api_timeout")
.and_then(|val| val.clone().into_uint().ok())
.unwrap_or(30),
));
client_builder = client_builder.connect_timeout(Duration::from_secs(5));
client_builder = client_builder.tcp_keepalive(Duration::from_secs(60));
client_builder = client_builder.gzip(true);
client_builder = client_builder.deflate(true);
let mut session = AsyncOpenStack {
client: client_builder.build()?,
config: config.clone(),
auth,
catalog: Catalog::default(),
state: state::State::new(),
};
let auth_data = session
.config
.auth
.as_ref()
.ok_or(AuthTokenError::MissingAuthData)?;
let identity_service_url = auth_data
.auth_url
.as_ref()
.ok_or(AuthTokenError::MissingAuthUrl)?;
session.catalog.register_catalog_endpoint(
"identity",
identity_service_url,
config.region_name.as_ref(),
Some("public"),
)?;
session.catalog.configure(config)?;
session
.state
.set_auth_hash_key(get_config_identity_hash(config))
.enable_auth_cache(ConfigFile::new()?.is_auth_cache_enabled());
Ok(session)
}
/// Connect to the cloud described by `config` without user interaction.
///
/// Builds the session, runs version discovery for the identity service and
/// then authorizes with the scope derived from the configuration
/// (non-interactive, without forcing a new authorization).
///
/// # Errors
///
/// Returns the error of whichever step fails first: session construction,
/// identity endpoint discovery or authorization.
#[instrument(name = "connect", level = "trace", skip(config))]
pub async fn new(config: &CloudConfig) -> OpenStackResult<Self> {
    let mut cloud = Self::new_impl(config, Auth::None)?;
    cloud
        .discover_service_endpoint(&ServiceType::Identity)
        .await?;
    cloud.authorize(None, false, false).await?;
    Ok(cloud)
}
/// Connect to the cloud described by `config`, using `auth_helper` while
/// authorizing.
///
/// Builds the session, runs version discovery for the identity service and
/// then authorizes with the scope derived from the configuration. When
/// `renew_auth` is `true` it is forwarded to the authorization step to skip
/// reuse of an already stored authorization.
///
/// # Errors
///
/// Returns the error of whichever step fails first: session construction,
/// identity endpoint discovery or authorization.
#[instrument(name = "connect", level = "trace", skip(config, auth_helper))]
pub async fn new_with_authentication_helper<A>(
    config: &CloudConfig,
    auth_helper: &mut A,
    renew_auth: bool,
) -> OpenStackResult<Self>
where
    A: AuthHelper + Sync + Send,
{
    let mut cloud = Self::new_impl(config, Auth::None)?;
    cloud
        .discover_service_endpoint(&ServiceType::Identity)
        .await?;
    cloud
        .authorize_with_auth_helper(None, auth_helper, renew_auth)
        .await?;
    Ok(cloud)
}
/// Connect to the cloud described by `config` using a default `Dialoguer`
/// authentication helper.
///
/// Equivalent to calling `new_with_authentication_helper` with
/// `Dialoguer::default()`; `renew_auth` is passed through unchanged.
#[instrument(name = "connect", level = "trace", skip(config))]
#[deprecated(
since = "0.22.0",
note = "please use `new_with_authentication_helper` instead"
)]
pub async fn new_interactive(config: &CloudConfig, renew_auth: bool) -> OpenStackResult<Self> {
Self::new_with_authentication_helper(config, &mut Dialoguer::default(), renew_auth).await
}
/// Make `auth` the authorization of the session.
///
/// Unless `skip_cache_update` is set, a token authorization is additionally
/// stored in the authorization state. It is stored under the unscoped key
/// when the token information reports an application credential, and under
/// the token's own scope otherwise.
fn set_auth(&mut self, auth: Auth, skip_cache_update: bool) -> &mut Self {
    self.auth = auth;
    if skip_cache_update {
        return self;
    }
    if let Auth::AuthToken(token) = &self.auth {
        let via_application_credential = token
            .auth_info
            .as_ref()
            .is_some_and(|info| info.token.application_credential.is_some());
        let cache_scope = if via_application_credential {
            AuthTokenScope::Unscoped
        } else {
            token.get_scope()
        };
        self.state.set_scope_auth(&cache_scope, token);
    }
    self
}
/// Authorize the session for `scope` (or for the scope derived from the
/// configuration when `scope` is `None`).
///
/// With `interactive` set a default `Dialoguer` helper is used, otherwise a
/// default `Noop` helper; `renew_auth` is passed through to
/// `authorize_with_auth_helper`.
///
/// # Errors
///
/// Propagates any error of `authorize_with_auth_helper`.
pub async fn authorize(
    &mut self,
    scope: Option<AuthTokenScope>,
    interactive: bool,
    renew_auth: bool,
) -> Result<(), OpenStackError> {
    if !interactive {
        let mut helper = Noop::default();
        return self
            .authorize_with_auth_helper(scope, &mut helper, renew_auth)
            .await;
    }
    let mut helper = Dialoguer::default();
    self.authorize_with_auth_helper(scope, &mut helper, renew_auth)
        .await
}
/// Obtain an authorization for `scope` by authenticating with the token of
/// the existing `auth` through the token plugin against the identity v3
/// endpoint.
///
/// # Errors
///
/// Fails when the identity endpoint cannot be resolved or when the plugin
/// reports an authentication error.
async fn reauth(
    &self,
    auth: &AuthToken,
    scope: &AuthTokenScope,
) -> Result<Auth, OpenStackError> {
    let identity_version = ApiVersion::from((3, 0));
    let identity = self.get_service_endpoint(&ServiceType::Identity, Some(&identity_version))?;
    let rescoped = token_auth::PLUGIN
        .auth(
            &self.client,
            identity.url(),
            HashMap::from([("token".into(), auth.token.clone())]),
            Some(scope),
            None,
        )
        .await?;
    Ok(rescoped)
}
/// Authorize the session for the requested scope, using `auth_helper` when
/// authentication data has to be gathered.
///
/// The visible flow is:
///
/// 1. The requested scope is `scope`, or the scope derived from the cloud
///    configuration when `scope` is `None`.
/// 2. When the state already holds an authorization for that scope and
///    `renew_auth` is `false`, it is reused.
/// 3. Otherwise, when the state holds any valid authorization and a new
///    login is not forced (`renew_auth`, or the configured auth type being
///    `V3ApplicationCredential`), that token is exchanged for the requested
///    scope via `reauth`.
/// 4. Otherwise a registered authentication plugin supporting the configured
///    auth type performs a new login; an `AuthError::AuthReceipt` answer is
///    continued through the receipt plugin.
/// 5. Afterwards the token is checked against the requested scope (and
///    re-scoped when it does not match) and the catalog is updated from the
///    token information.
///
/// # Errors
///
/// Returns an error when the scope or auth type cannot be derived from the
/// configuration, when no plugin supports the auth type
/// (`AuthTokenError::IdentityMethod`), when authentication fails, when the
/// resulting authorization is not a token
/// (`AuthError::AuthTokenNotInResponse`), when the final token has no token
/// information (`OpenStackError::NoAuth`), or when the catalog cannot be
/// processed.
pub async fn authorize_with_auth_helper<A>(
&mut self,
scope: Option<AuthTokenScope>,
auth_helper: &mut A,
renew_auth: bool,
) -> Result<(), OpenStackError>
where
A: AuthHelper + Sync + Send,
{
// Explicit scope wins; otherwise fall back to the scope from the configuration.
let requested_scope =
scope.map_or_else(|| AuthTokenScope::try_from(&self.config), |v| Ok(v.clone()))?;
if let (Some(auth), false) = (self.state.get_scope_auth(&requested_scope), renew_auth) {
trace!("Auth already available");
// The authorization came from the state, so the state is not updated again.
self.set_auth(Auth::AuthToken(Box::new(auth.clone())), true);
} else {
let auth_type = AuthType::from_cloud_config(&self.config)?;
// Application credential auth always performs a new login instead of
// re-scoping an already available token.
let mut force_new_auth = renew_auth;
if let AuthType::V3ApplicationCredential = auth_type {
force_new_auth = true;
}
if let (Some(available_auth), false) = (self.state.get_any_valid_auth(), force_new_auth)
{
trace!("Valid Auth is available for reauthz: {:?}", available_auth);
let token_auth = self.reauth(&available_auth, &requested_scope).await?;
self.set_auth(token_auth.clone(), false);
} else {
trace!("No Auth already available. Proceeding with new login");
let auth_type = auth_type.as_str();
// Pick the first registered plugin that supports the configured auth type.
if let Some(authenticator) = inventory::iter::<AuthPluginRegistration>
.into_iter()
.find(|x| x.method.get_supported_auth_methods().contains(&auth_type))
.map(|x| x.method)
{
// The configured `auth_methods` are handed to the plugin as hints.
let auth_hints = self
.config
.auth_methods
.as_ref()
.map(|methods| serde_json::json!({"auth_methods": methods}));
match authenticator
.auth(
&self.client,
self.get_service_endpoint(
&ServiceType::Identity,
Some(&ApiVersion::from(authenticator.api_version())),
)?
.url(),
gather_auth_data(
&authenticator.requirements(auth_hints.as_ref())?,
&self.config,
auth_helper,
)
.await?,
Some(&requested_scope),
auth_hints.as_ref(),
)
.await
{
Ok(token_auth) => {
self.set_auth(token_auth.clone(), false);
}
// The plugin answered with a receipt: continue the authentication
// through the receipt plugin, passing the serialized receipt as hints.
Err(AuthError::AuthReceipt(receipt)) => {
let auth_hints = serde_json::to_value(&receipt)?;
let token_auth = token_receipt::PLUGIN
.auth(
&self.client,
self.get_service_endpoint(
&ServiceType::Identity,
Some(&ApiVersion::from(authenticator.api_version())),
)?
.url(),
gather_auth_data(
&token_receipt::PLUGIN.requirements(Some(&auth_hints))?,
&self.config,
auth_helper,
)
.await?,
Some(&requested_scope),
Some(&auth_hints),
)
.await?;
self.set_auth(token_auth.clone(), false);
}
Err(other) => {
return Err(other.into());
}
}
} else {
return Err(AuthTokenError::IdentityMethod {
auth_type: auth_type.into(),
})?;
}
}
}
if let Auth::AuthToken(token_auth) = &self.auth {
if token_auth.auth_info.is_none() {
// Token without token information: fetch it and store the resolved
// token in the state under the scope reported by the token information.
// NOTE(review): `resolved_token` is only written to the state; `self.auth`
// (and therefore `token_auth` below) still has no `auth_info`. For an
// unscoped request this appears to end in `OpenStackError::NoAuth` at the
// end of this method - confirm whether `self.auth` should be updated here.
let mut resolved_token = token_auth.clone();
let token_info = self.fetch_token_info(token_auth.token.clone()).await?;
resolved_token.auth_info = Some(token_info.clone());
let scope = AuthTokenScope::from(&token_info);
self.state.set_scope_auth(&scope, &resolved_token);
}
// Re-scope when a specific scope was requested and the token information
// is missing or reports a different scope.
if requested_scope != AuthTokenScope::Unscoped
&& !token_auth
.auth_info
.as_ref()
.map(AuthTokenScope::from)
.is_some_and(|scope| requested_scope == scope)
{
let token_auth = self.reauth(token_auth, &requested_scope).await?;
self.set_auth(token_auth.clone(), false);
} else {
// Otherwise the token is (also) stored under the unscoped key.
self.state
.set_scope_auth(&AuthTokenScope::Unscoped, token_auth);
}
} else {
return Err(AuthError::AuthTokenNotInResponse)?;
}
// Update the catalog from the final token: project id first, then the
// service endpoints listed in the token catalog.
if let Auth::AuthToken(token_data) = &self.auth {
match &token_data.auth_info {
Some(auth_data) => {
if let Some(project) = &auth_data.token.project {
self.catalog.set_project_id(project.id.clone());
self.catalog.configure(&self.config)?;
}
if let Some(endpoints) = &auth_data.token.catalog {
self.catalog.process_catalog_endpoints(endpoints)?;
} else {
error!("No catalog information");
}
}
_ => return Err(OpenStackError::NoAuth),
}
}
Ok(())
}
/// Run API version discovery for the endpoint of `service_type`.
///
/// Nothing is done (and `Ok(())` is returned) when the catalog has no
/// endpoint for the service or when discovery is not allowed for it.
/// Otherwise the endpoint URL, and then each of its parent paths, is queried
/// with `GET` until a response other than `404 Not Found` is accepted by the
/// catalog as a discovery document. At most 10 URLs are tried.
///
/// # Errors
///
/// Returns `OpenStackError::Discovery` when the root path was tried without
/// success or when the attempt limit is exhausted, and a catalog error when
/// the endpoint URL cannot be used as a base URL.
#[instrument(skip(self))]
pub async fn discover_service_endpoint(
    &mut self,
    service_type: &ServiceType,
) -> Result<(), OpenStackError> {
    let Ok(endpoint) = self.catalog.get_service_endpoint(
        service_type.to_string(),
        None,
        self.config.region_name.as_ref(),
        self.config.interface.as_ref(),
    ) else {
        return Ok(());
    };
    if !self.catalog.discovery_allowed(service_type.to_string()) {
        return Ok(());
    }

    info!("Performing `{}` endpoint version discovery", service_type);
    let orig_url = endpoint.url().clone();
    let mut probe_url = orig_url.clone();
    // Normalize the starting URL so that it ends with exactly one trailing slash.
    probe_url
        .path_segments_mut()
        .map_err(|_| CatalogError::cannot_be_base(&orig_url))?
        .pop_if_empty()
        .push("");

    for _attempt in 0..10 {
        let req = http::Request::builder()
            .method(http::Method::GET)
            .uri(query::url_to_http_uri(probe_url.clone())?);
        match self.rest_with_auth_async(req, Vec::new(), &self.auth).await {
            Ok(rsp) => {
                let discovered = rsp.status() != StatusCode::NOT_FOUND
                    && self
                        .catalog
                        .process_endpoint_discovery(
                            service_type,
                            &probe_url,
                            rsp.body(),
                            self.config.region_name.as_ref(),
                            self.config.interface.as_ref(),
                        )
                        .is_ok();
                if discovered {
                    debug!(
                        "Finished service version discovery at {}",
                        probe_url.as_str()
                    );
                    debug!("catalog {:?}", self.catalog);
                    return Ok(());
                }
            }
            Err(err) => {
                error!(
                    "Error querying {} for the version discovery. It is most likely a misconfiguration on the cloud side. {}",
                    probe_url.as_str(),
                    err
                );
            }
        }

        // The root was the last candidate: nothing left to try.
        if probe_url.path() == "/" {
            return Err(OpenStackError::Discovery {
                service: service_type.to_string(),
                url: orig_url.into(),
                msg: match service_type {
                    ServiceType::Identity => "Service is not working.".into(),
                    _ => "No Version document found. Either service is not supporting version discovery, or API is not working".into(),
                },
            });
        }
        // Move one path segment up for the next attempt.
        probe_url
            .path_segments_mut()
            .map_err(|_| CatalogError::cannot_be_base(&orig_url))?
            .pop();
    }

    Err(OpenStackError::Discovery {
        service: service_type.to_string(),
        url: orig_url.into(),
        msg: "Unknown".into(),
    })
}
/// Return the token catalog as reported by the session's service catalog,
/// or `None` when the catalog does not provide one.
pub fn get_token_catalog(&self) -> Option<Vec<ServiceEndpoints>> {
self.catalog.get_token_catalog()
}
/// Return a copy of the token information of the current authorization.
///
/// `None` is returned when the session is not authorized with a token or
/// when the token has no information attached.
pub fn get_auth_info(&self) -> Option<AuthResponse> {
    match &self.auth {
        Auth::AuthToken(token) => token.auth_info.clone(),
        _ => None,
    }
}
/// Return the state of the current token authorization, evaluated by the
/// token with the optional `offset`.
///
/// `None` is returned when the session is not authorized with a token.
pub fn get_auth_state(&self, offset: Option<TimeDelta>) -> Option<AuthState> {
    match &self.auth {
        Auth::AuthToken(token) => Some(token.get_state(offset)),
        _ => None,
    }
}
/// Return a copy of the secret token of the current authorization, or
/// `None` when the session is not authorized with a token.
pub fn get_auth_token(&self) -> Option<SecretString> {
    match &self.auth {
        Auth::AuthToken(token) => Some(token.token.clone()),
        _ => None,
    }
}
/// Query the token information of `token` and deserialize the JSON response
/// body into an `AuthResponse`.
///
/// # Errors
///
/// Fails when the token information endpoint cannot be built, when the
/// request fails, or when the response body is not a valid `AuthResponse`.
pub async fn fetch_token_info(
    &self,
    token: SecretString,
) -> Result<AuthResponse, OpenStackError> {
    let endpoint = build_token_info_endpoint(token.expose_secret())?;
    let response = endpoint.raw_query_async(self).await?;
    Ok(serde_json::from_slice::<AuthResponse>(response.body())?)
}
/// Execute `request` with the session's HTTP client and log the outcome.
///
/// When `TRACE` logging is enabled and the request is sent as
/// `application/json`, the in-memory request body is traced with every
/// sensitive value of the configuration replaced by `<CENSORED>`. After the
/// response headers arrive, an `http_request` event is emitted with the URL,
/// the method, the status, the `x-openstack-request-id` header and the
/// elapsed time in milliseconds.
///
/// # Errors
///
/// Returns the `reqwest::Error` reported by the HTTP client.
#[instrument(name="request", skip_all, fields(http.uri = request.url().as_str(), http.method = request.method().as_str(), openstack.ver=request.headers().get("openstack-api-version").map(|v| v.to_str().unwrap_or(""))))]
async fn execute_request(&self, request: Request) -> Result<Response, reqwest::Error> {
    info!("Sending request {:?}", request);
    let url = request.url().clone();
    let method = request.method().clone();
    if enabled!(Level::TRACE)
        && request.headers().get(header::CONTENT_TYPE)
            == Some(&HeaderValue::from_static("application/json"))
    {
        // Only in-memory bodies that are valid UTF-8 can be traced.
        let body_text = request
            .body()
            .and_then(|body| body.as_bytes())
            .and_then(|bytes| std::str::from_utf8(bytes).ok());
        if let Some(text) = body_text {
            let censored = self
                .config
                .get_sensitive_values()
                .iter()
                .fold(text.to_owned(), |sanitized, &secret| {
                    sanitized.replace(secret, "<CENSORED>")
                });
            trace!("Request Body: {:?}", censored);
        }
    }
    // A monotonic clock is used so that wall-clock adjustments cannot distort
    // (or zero out) the measured duration.
    let start = std::time::Instant::now();
    let rsp = self.client.execute(request).await?;
    let elapsed = start.elapsed();
    event!(
        name: "http_request",
        Level::INFO,
        url=url.as_str(),
        duration_ms=elapsed.as_millis(),
        status=rsp.status().as_u16(),
        method=method.as_str(),
        request_id=rsp.headers().get("x-openstack-request-id").map(|v| v.to_str().unwrap_or("")),
        "Request completed with status {}",
        rsp.status(),
    );
    Ok(rsp)
}
async fn rest_with_auth_async(
&self,
mut request: http::request::Builder,
body: Vec<u8>,
auth: &Auth,
) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
use futures_util::TryFutureExt;
let call = || async {
if let Some(headers) = request.headers_mut() {
auth.set_header(headers)?;
}
let http_request = request.body(body)?;
let request = http_request.try_into()?;
let rsp = self.execute_request(request).await?;
let mut http_rsp = HttpResponse::builder()
.status(rsp.status())
.version(rsp.version());
if let Some(headers) = http_rsp.headers_mut() {
headers.extend(rsp.headers().clone())
}
Ok(http_rsp.body(rsp.bytes().await?)?)
};
call().map_err(api::ApiError::client).await
}
async fn rest_with_auth_read_body_async(
&self,
mut request: http::request::Builder,
body_read: BoxedAsyncRead,
auth: &Auth,
) -> Result<HttpResponse<Bytes>, api::ApiError<<Self as api::RestClient>::Error>> {
use futures_util::TryFutureExt;
let call = || async {
if let Some(headers) = request.headers_mut() {
auth.set_header(headers)?;
}
let stream = codec::FramedRead::new(body_read.compat(), codec::BytesCodec::new())
.map_ok(|b| b.freeze());
let http_request = request.body(Body::wrap_stream(stream))?;
let request = http_request.try_into()?;
let rsp = self.execute_request(request).await?;
let mut http_rsp = HttpResponse::builder()
.status(rsp.status())
.version(rsp.version());
if let Some(headers) = http_rsp.headers_mut() {
headers.extend(rsp.headers().clone())
}
Ok(http_rsp.body(rsp.bytes().await?)?)
};
call().map_err(api::ApiError::client).await
}
/// Send `request` with the in-memory `body`, authorized by `auth`, and
/// return the response headers together with a reader that streams the
/// response body.
///
/// Errors of the body stream are converted into I/O errors: timeouts become
/// `ErrorKind::TimedOut`, everything else `ErrorKind::Other`.
///
/// # Errors
///
/// Any failure before the body is streamed (setting the auth header,
/// building or converting the request, executing it) is wrapped with
/// `api::ApiError::client`.
async fn download_with_auth_async(
    &self,
    mut request: http::request::Builder,
    body: Vec<u8>,
    auth: &Auth,
) -> Result<(HeaderMap, BoxedAsyncRead), api::ApiError<<Self as api::RestClient>::Error>> {
    use futures_util::TryFutureExt;
    let call = || async {
        if let Some(headers) = request.headers_mut() {
            auth.set_header(headers)?;
        }
        let http_request = request.body(body)?;
        let request = http_request.try_into()?;
        let rsp = self.execute_request(request).await?;
        // Clone the complete header map. Re-inserting the entries one by one
        // with `insert` keeps only the last value of a repeated header, which
        // silently drops data and disagrees with the other request helpers.
        let headers = rsp.headers().clone();
        let boxed_async_read = BoxedAsyncRead::new(
            rsp.bytes_stream()
                .map_err(|orig| {
                    let kind = if orig.is_timeout() {
                        IoErrorKind::TimedOut
                    } else {
                        IoErrorKind::Other
                    };
                    IoError::new(kind, orig)
                })
                .into_async_read(),
        );
        Ok((headers, boxed_async_read))
    };
    call().map_err(api::ApiError::client).await
}
}