use crate::MetadataClient;
use crate::envelope::{self, EnvelopeBody};
use crate::error::{MetadataError, MetadataResult};
use crate::retry;
use bytes::Bytes;
use serde::de::DeserializeOwned;
pub trait SoapOperation {
const NAME: &'static str;
type Response: DeserializeOwned;
fn render_body(&self) -> MetadataResult<String>;
}
pub(crate) async fn soap_call<O: SoapOperation>(
client: &MetadataClient,
op: &O,
) -> MetadataResult<O::Response> {
let response_local = format!("{}Response", O::NAME);
let body_xml = op.render_body()?;
let inner = call_with_auth_retry(client, O::NAME, &body_xml, &response_local).await?;
let parsed: O::Response = quick_xml::de::from_reader(inner.as_slice())?;
Ok(parsed)
}
async fn call_with_auth_retry(
client: &MetadataClient,
op_name: &str,
body_xml: &str,
response_local: &str,
) -> MetadataResult<Vec<u8>> {
let mut next_token: Option<String> = None;
let mut auth_retried = false;
loop {
let token_str = match next_token.take() {
Some(t) => t,
None => {
let token = client.auth.access_token().await?;
token.into_owned()
}
};
let envelope = envelope::build_envelope(&token_str, op_name, body_xml);
let body = Bytes::from(envelope.into_bytes());
let result = send_with_retries(client, body, response_local).await;
if !auth_retried
&& let Err(MetadataError::Soap { fault, .. }) = &result
&& fault.is_invalid_session()
{
tracing::warn!(
target: "cirrus_metadata::auth",
"INVALID_SESSION_ID fault; invalidating cached token and retrying once",
);
client.auth.invalidate(&token_str).await;
let fresh = client.auth.access_token().await?.into_owned();
if fresh == token_str {
tracing::warn!(
target: "cirrus_metadata::auth",
"auth session returned the same token after invalidate; surfacing fault \
(likely static auth or scope/permission issue)",
);
return result;
}
next_token = Some(fresh);
auth_retried = true;
continue;
}
return result;
}
}
async fn send_with_retries(
client: &MetadataClient,
envelope_bytes: Bytes,
response_local: &str,
) -> MetadataResult<Vec<u8>> {
let url = client.endpoint_url();
let method = reqwest::Method::POST;
let mut attempt: u32 = 0;
loop {
let request = client
.http
.post(&url)
.header(reqwest::header::CONTENT_TYPE, "text/xml; charset=UTF-8")
.header("SOAPAction", "\"\"")
.body(envelope_bytes.clone());
match request.send().await {
Ok(response) => {
let status = response.status().as_u16();
let headers = response.headers().clone();
if retry::should_retry_status(&client.retry_policy, &method, status, attempt) {
let _ = response.bytes().await;
let retry_after = retry::parse_retry_after(&headers);
let delay = retry::compute_delay(&client.retry_policy, attempt, retry_after);
tokio::time::sleep(delay).await;
attempt += 1;
continue;
}
let bytes = response.bytes().await?;
match envelope::parse_envelope(&bytes, response_local) {
Ok(EnvelopeBody::Success(inner)) => return Ok(inner),
Ok(EnvelopeBody::Fault(fault)) => {
return Err(MetadataError::Soap { status, fault });
}
Err(parse_err) => {
if (200..300).contains(&status) {
return Err(MetadataError::InvalidResponse(format!(
"HTTP {status} with non-SOAP body: {parse_err}"
)));
}
return Err(MetadataError::Http4xx5xx {
status,
raw: String::from_utf8_lossy(&bytes).into_owned(),
});
}
}
}
Err(e) => {
let err: MetadataError = e.into();
if retry::should_retry_network(&client.retry_policy, &method, &err, attempt) {
let delay = retry::compute_delay(&client.retry_policy, attempt, None);
tokio::time::sleep(delay).await;
attempt += 1;
continue;
}
return Err(err);
}
}
}
}