use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use super::super::types::structs;
use super::wire_log;
use log::{warn, debug, trace};
use bytes::Bytes;
use std::future::Future;
use std::pin::Pin;
use std::ffi::OsStr;
use crate::mo;
use crate::types::struct_enum::StructType;
use crate::types::structs::{ManagedObjectReference, ServiceContent};
const LIB_NAME: &str = env!("CARGO_PKG_NAME");
const LIB_VERSION: &str = env!("CARGO_PKG_VERSION");
const RUSTC_VERSION: &str = env!("RUSTC_VERSION");
pub const COMPATIBLE_API_RELEASES: [&str; 4] = ["9.0.0.0", "8.0.3.0", "8.0.2.0", "8.0.1.0"];
pub const API_RELEASE: &str = "9.0.0.0";
const AUTHN_HEADER: &str = "vmware-api-session-id";
const SERVICE_INSTANCE_MOID: &str = "ServiceInstance";
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("MethodFault: {0:?}")]
MethodFault(structs::MethodFault),
#[error("Reqwest error: {0}")]
ReqwestError(#[from] reqwest::Error),
#[error("Parse error: {0}")]
ParseError(String),
#[error("Missing or Invalid session key")]
MissingOrInvalidSessionKey,
#[error("Invalid object type {0} expected: {1}")]
InvalidObjectType(String, String),
#[error("Cannot negotiate compatible API release. Attempted with: {0:?}")]
CannotNegotiateAPIRelease(Vec<String>),
}
pub type VimError = Error;
pub type Result<T> = std::result::Result<T, Error>;
#[must_use]
pub fn is_request_canceled_error(err: &Error) -> bool {
matches!(
err,
Error::MethodFault(f) if f.type_ == Some(StructType::RequestCanceled)
)
}
#[cfg(test)]
mod is_request_canceled_tests {
use super::{is_request_canceled_error, Error};
use crate::types::struct_enum::StructType;
use crate::types::structs::MethodFault;
#[test]
fn detects_request_canceled_fault() {
let err = Error::MethodFault(MethodFault {
fault_cause: None,
fault_message: None,
type_: Some(StructType::RequestCanceled),
extra_fields_: Default::default(),
});
assert!(is_request_canceled_error(&err));
}
#[test]
fn rejects_non_fault_errors() {
assert!(!is_request_canceled_error(&Error::ParseError("x".into())));
}
}
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transport {
Json,
#[cfg(feature = "xml")]
Soap,
}
#[derive(Debug)]
pub enum PropertyValue {
Json(Bytes),
#[cfg(feature = "xml")]
Parsed(crate::types::vim_any::VimAny),
}
pub fn extract_property<T: miniserde::Deserialize + 'static>(pv: PropertyValue) -> Result<T> {
match pv {
PropertyValue::Json(bytes) => {
let text = std::str::from_utf8(&bytes)
.map_err(|e| Error::ParseError(e.to_string()))?;
miniserde::json::from_str(text).map_err(|_| {
Error::ParseError(format!(
"JSON property decode failed for {}",
std::any::type_name::<T>()
))
})
}
#[cfg(feature = "xml")]
PropertyValue::Parsed(any) => any
.into_any()
.downcast::<T>()
.map(|b| *b)
.map_err(|_| {
Error::ParseError(format!(
"SOAP property type mismatch for {}",
std::any::type_name::<T>()
))
}),
}
}
pub trait VimClient: Send + Sync {
fn service_content(&self) -> &ServiceContent;
fn transport(&self) -> Transport;
fn api_release(&self) -> String;
fn invoke<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<Bytes>>;
fn invoke_optional<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<Option<Bytes>>>;
fn invoke_void<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<()>>;
fn fetch_property_raw<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
property: &'a str,
) -> BoxFuture<'a, Result<Option<PropertyValue>>>;
}
pub fn unmarshal<T: miniserde::Deserialize>(transport: Transport, bytes: &[u8]) -> Result<T> {
let text = std::str::from_utf8(bytes)
.map_err(|e| Error::ParseError(e.to_string()))?;
match transport {
Transport::Json => miniserde::json::from_str(text)
.map_err(|_| Error::ParseError(format!(
"JSON deserialization failed for {}", std::any::type_name::<T>()))),
#[cfg(feature = "xml")]
Transport::Soap => {
crate::xml::soap::vim_response_internal(text)
.or_else(|_| crate::xml::de::from_xml_internal(text))
.map_err(|_| Error::ParseError(format!(
"XML deserialization failed for {}", std::any::type_name::<T>())))
},
}
}
pub fn unmarshal_array<U: miniserde::Deserialize>(
transport: Transport,
bytes: &[u8],
) -> Result<Vec<U>> {
let text = std::str::from_utf8(bytes)
.map_err(|e| Error::ParseError(e.to_string()))?;
match transport {
Transport::Json => miniserde::json::from_str(text).map_err(|_| {
Error::ParseError(format!(
"JSON deserialization failed for Vec<{}>",
std::any::type_name::<U>()
))
}),
#[cfg(feature = "xml")]
Transport::Soap => crate::xml::soap::vim_response_list_internal(text).map_err(|_| {
Error::ParseError(format!(
"XML deserialization failed for Vec<{}>",
std::any::type_name::<U>()
))
}),
}
}
pub type VimClientHandle = Arc<dyn VimClient>;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransportMode {
#[default]
Json,
#[cfg(feature = "xml")]
Soap,
#[cfg(feature = "xml")]
Auto,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WireLoggingMode {
#[default]
Off,
Summary,
Detailed,
}
impl WireLoggingMode {
pub const fn is_enabled(self) -> bool {
!matches!(self, Self::Off)
}
pub const fn is_detailed(self) -> bool {
matches!(self, Self::Detailed)
}
}
pub struct ClientBuilder {
server_address: String,
compatible_api_releases: Option<Vec<String>>,
api_release: Option<String>,
http_client: Option<reqwest::Client>,
insecure: Option<bool>,
app_name: Option<String>,
app_version: Option<String>,
user_name: Option<String>,
password: Option<String>,
locale: Option<String>,
transport_mode: TransportMode,
wire_logging: WireLoggingMode,
}
impl ClientBuilder {
pub fn new(server_address: &str) -> Self {
Self {
server_address: server_address.to_string(),
compatible_api_releases: None,
api_release: None,
http_client: None,
insecure: None,
app_name: None,
app_version: None,
user_name: None,
password: None,
locale: None,
transport_mode: TransportMode::default(),
wire_logging: WireLoggingMode::default(),
}
}
pub fn wire_logging(mut self, mode: WireLoggingMode) -> Self {
self.wire_logging = mode;
self
}
pub fn transport(mut self, mode: TransportMode) -> Self {
self.transport_mode = mode;
self
}
pub fn compatible_api_releases(mut self, releases: Vec<&str>) -> Self {
self.compatible_api_releases = Some(releases.iter().map(|s| s.to_string()).collect());
self
}
pub fn api_release(mut self, api_release: &str) -> Self {
self.api_release = Some(api_release.to_string());
self
}
pub fn http_client(mut self, http_client: reqwest::Client) -> Self {
self.http_client = Some(http_client);
self.insecure = None;
self
}
pub fn insecure(mut self, insecure: bool) -> Self {
warn!("!!! WARNING !!! Insecure mode enabled. TLS certificate and hostname verification is disabled. !!! WARNING !!!");
self.insecure = Some(insecure);
self.http_client = None;
self
}
pub fn app_details(mut self, app_name: &str, app_version: &str) -> Self {
self.app_name = Some(app_name.to_string());
self.app_version = Some(app_version.to_string());
self
}
pub fn basic_authn(mut self, user_name: &str, password: &str) -> Self {
self.user_name = Some(user_name.to_string());
self.password = Some(password.to_string());
self
}
pub fn locale(mut self, locale: &str) -> Self {
self.locale = Some(locale.to_string());
self
}
pub async fn build(self) -> Result<Arc<Client>> {
#[cfg(feature = "xml")]
{
match self.transport_mode {
TransportMode::Soap => return Self::build_soap_facade(self).await,
TransportMode::Auto => return Self::build_auto_facade(self).await,
TransportMode::Json => {}
}
}
Self::build_json(self).await
}
async fn build_json(self) -> Result<Arc<Client>> {
let wire_logging = self.wire_logging;
let http_client = match self.http_client {
Some(client) => client,
None => {
let mut builder = reqwest::ClientBuilder::new();
if let Some(insecure) = self.insecure {
builder = builder.danger_accept_invalid_certs(insecure)
.danger_accept_invalid_hostnames(insecure);
}
builder.build()?
},
};
let session_key = Arc::new(RwLock::new(None));
let user_agent = user_agent(self.app_name.as_deref(), self.app_version.as_deref());
let api_release = match self.api_release {
Some(release) => release,
None => {
let releases = self.compatible_api_releases
.unwrap_or_else(|| COMPATIBLE_API_RELEASES.iter().map(|s| s.to_string()).collect());
let spec = HelloSpec {
api_releases: &releases,
};
let path = format!("https://{}/api/vcenter/system?action=hello", self.server_address);
let json_body = miniserde::json::to_string(&spec);
let started = Instant::now();
if wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(wire_logging, "");
let msg = format!(
"wire=json mode={} phase=request kind=negotiate path={} body_bytes={}",
mode_l,
path,
json_body.len()
);
wire_log::log_json_line(wire_logging, "", false, &msg);
}
let req = http_client.post(&path)
.header("Content-Type", "application/json")
.header("User-Agent", &user_agent)
.body(json_body);
let res = match req.send().await {
Ok(r) => r,
Err(e) => {
wire_log::log_json_negotiate_transport_failure(
wire_logging,
&path,
started.elapsed(),
&e,
);
return Err(Error::ReqwestError(e));
}
};
let status = res.status();
let http_err = res.error_for_status_ref().err();
let body = res.text().await.map_err(Error::ReqwestError)?;
let elapsed = started.elapsed();
if wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(wire_logging, "");
let detailed = wire_logging.is_detailed();
let mut msg = format!(
"wire=json mode={} phase=response kind=negotiate path={} status={} body_bytes={} duration_ms={}",
mode_l,
path,
status.as_u16(),
body.len(),
elapsed.as_millis()
);
if detailed {
msg.push_str(&format!(" body={}", body));
}
wire_log::log_json_line(wire_logging, "", detailed, &msg);
}
if let Some(e) = http_err {
return Err(Error::ReqwestError(e));
}
let result: HelloResult = miniserde::json::from_str(&body)
.map_err(|_| Error::ParseError("Failed to parse HelloResult".to_string()))?;
let api_release = result.api_release;
if api_release.is_empty() {
return Err(Error::CannotNegotiateAPIRelease(releases));
}
debug!("Negotiated API release: {}", api_release);
api_release
},
};
let base_url = format!("https://{}/sdk/vim25/{}", self.server_address, api_release);
let bootstrap = Arc::new(JsonClient {
http_client: http_client.clone(),
session_key: session_key.clone(),
api_release: api_release.clone(),
base_url: base_url.clone(),
user_agent: user_agent.clone(),
service_content: None,
wire_logging: wire_logging,
});
let service_instance = mo::ServiceInstance::new(bootstrap.clone(), SERVICE_INSTANCE_MOID);
let content = service_instance.content().await?;
debug!("ServiceInstance content obtained from: {}", content.about.full_name);
trace!("ServiceInstance content: {:?}", content);
let sm_id = content.session_manager.as_ref().map(|moid| moid.value.clone());
let json = Arc::new(JsonClient {
http_client: http_client.clone(),
session_key: session_key.clone(),
api_release: api_release.clone(),
base_url: base_url.clone(),
user_agent: user_agent.clone(),
service_content: Some(content),
wire_logging: wire_logging,
});
if let (Some(ref sm_id), Some(ref user_name), Some(ref password)) = (sm_id, self.user_name, self.password) {
let sm = mo::SessionManager::new(json.clone(), sm_id);
let session = sm.login(user_name, password, self.locale.as_deref()).await?;
debug!("Session created for: {:?}", session.user_name);
}
Ok(Arc::new(Client { inner: json }))
}
#[cfg(feature = "xml")]
async fn build_soap_facade(self) -> Result<Arc<Client>> {
let http_client = match self.http_client {
Some(client) => client,
None => {
let mut builder = reqwest::ClientBuilder::new()
.cookie_store(true);
if let Some(insecure) = self.insecure {
builder = builder.danger_accept_invalid_certs(insecure)
.danger_accept_invalid_hostnames(insecure);
}
builder.build()?
},
};
let ua = user_agent(self.app_name.as_deref(), self.app_version.as_deref());
let api_release = match self.api_release {
Some(release) => release.clone(),
None => API_RELEASE.to_string(),
};
let mut soap = crate::xml::client::SoapClient::new(
http_client,
&self.server_address,
&api_release,
&ua,
self.wire_logging,
);
soap.bootstrap().await?;
let sm_id = soap.service_content().session_manager.as_ref().map(|m| m.value.clone());
let soap = Arc::new(soap);
if let (Some(ref sm_id), Some(ref user_name), Some(ref password)) = (sm_id, self.user_name, self.password) {
let sm = mo::SessionManager::new(soap.clone(), sm_id);
let session = sm.login(user_name, password, self.locale.as_deref()).await?;
debug!("SOAP session created for: {:?}", session.user_name);
}
Ok(Arc::new(Client { inner: soap }))
}
#[cfg(feature = "xml")]
async fn build_auto_facade(self) -> Result<Arc<Client>> {
let wl = self.wire_logging;
let ua = user_agent(self.app_name.as_deref(), self.app_version.as_deref());
let http_client_for_probe = {
let mut builder = reqwest::ClientBuilder::new();
if let Some(insecure) = self.insecure {
builder = builder.danger_accept_invalid_certs(insecure)
.danger_accept_invalid_hostnames(insecure);
}
builder.build()?
};
let releases = self.compatible_api_releases.clone()
.unwrap_or_else(|| COMPATIBLE_API_RELEASES.iter().map(|s| s.to_string()).collect());
let hello_url = format!("https://{}/api/vcenter/system?action=hello", self.server_address);
let spec = HelloSpec { api_releases: &releases };
let json_body = miniserde::json::to_string(&spec);
let started = Instant::now();
if wl.is_enabled() {
let mode_l = wire_log::wire_mode_label(wl, "");
let msg = format!(
"wire=json mode={} phase=request kind=probe path={} body_bytes={}",
mode_l,
hello_url,
json_body.len()
);
wire_log::log_json_line(wl, "", false, &msg);
}
let hello_ok = match http_client_for_probe
.post(&hello_url)
.header("Content-Type", "application/json")
.header("User-Agent", &ua)
.body(json_body)
.send()
.await
{
Ok(res) => {
let status = res.status();
let body = res.text().await.unwrap_or_default();
let elapsed = started.elapsed();
if wl.is_enabled() {
let mode_l = wire_log::wire_mode_label(wl, "");
let detailed = wl.is_detailed();
let mut msg = format!(
"wire=json mode={} phase=response kind=probe path={} status={} body_bytes={} duration_ms={}",
mode_l,
hello_url,
status.as_u16(),
body.len(),
elapsed.as_millis()
);
if detailed {
msg.push_str(&format!(" body={}", body));
}
wire_log::log_json_line(wl, "", detailed, &msg);
}
if status.is_success() {
miniserde::json::from_str::<HelloResult>(&body)
.ok()
.filter(|r| !r.api_release.is_empty())
} else {
None
}
}
Err(e) => {
if wl.is_enabled() {
let mode_l = wire_log::wire_mode_label(wl, "");
let msg = format!(
"wire=json mode={} phase=response kind=probe path={} error=transport body_bytes=0 duration_ms={} detail={}",
mode_l,
hello_url,
started.elapsed().as_millis(),
e
);
wire_log::log_json_line(wl, "", false, &msg);
}
None
}
};
if hello_ok.is_some() {
debug!("Auto mode: Hello API succeeded, using JSON transport");
Self::build_json(self.transport(TransportMode::Json)).await
} else {
debug!("Auto mode: Hello API unavailable, falling back to SOAP");
Self::build_soap_facade(self).await
}
}
}
pub struct Client {
inner: Arc<dyn VimClient>,
}
impl Client {
pub fn service_content(&self) -> &ServiceContent {
self.inner.service_content()
}
pub fn api_release(&self) -> String {
self.inner.api_release()
}
pub async fn fetch_property<T>(&self, obj: ManagedObjectReference, property: &str) -> Result<T>
where
T: miniserde::Deserialize + 'static,
{
let type_name: &str = obj.r#type.as_str();
let id = &obj.value;
let pv_opt = self
.inner
.fetch_property_raw("", type_name, id, property)
.await?;
let pv = pv_opt.ok_or_else(|| {
Error::ParseError(format!("property {property} was empty"))
})?;
extract_property(pv)
}
}
impl VimClient for Client {
fn service_content(&self) -> &ServiceContent {
self.inner.service_content()
}
fn transport(&self) -> Transport {
self.inner.transport()
}
fn api_release(&self) -> String {
self.inner.api_release()
}
fn invoke<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<Bytes>> {
self.inner.invoke(svc, mo_type, mo_id, method_name, params)
}
fn invoke_optional<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<Option<Bytes>>> {
self.inner
.invoke_optional(svc, mo_type, mo_id, method_name, params)
}
fn invoke_void<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<()>> {
self.inner.invoke_void(svc, mo_type, mo_id, method_name, params)
}
fn fetch_property_raw<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
property: &'a str,
) -> BoxFuture<'a, Result<Option<PropertyValue>>> {
self.inner
.fetch_property_raw(svc, mo_type, mo_id, property)
}
}
pub(crate) struct JsonWireCtx<'a> {
pub svc: &'a str,
pub mo_type: &'a str,
pub mo_id: &'a str,
pub name: &'a str,
pub path: &'a str,
pub is_property_get: bool,
}
fn json_method_path(svc: &str, mo_type: &str, mo_id: &str, method_name: &str) -> String {
if svc.is_empty() {
format!("/{mo_type}/{mo_id}/{method_name}")
} else {
format!("/{svc}/{mo_type}/{mo_id}/{method_name}")
}
}
pub(crate) struct JsonClient {
http_client: reqwest::Client,
session_key: Arc<RwLock<Option<String>>>,
api_release: String,
base_url: String,
user_agent: String,
service_content: Option<ServiceContent>,
wire_logging: WireLoggingMode,
}
impl JsonClient {
pub(crate) fn service_content(&self) -> &ServiceContent {
self.service_content.as_ref().expect("JsonClient missing ServiceContent")
}
pub(crate) fn api_release(&self) -> String {
self.api_release.clone()
}
fn get_request(&self, path: &str) -> reqwest::RequestBuilder
{
debug!("GET request: {}", path);
let url = format!("{}{}", self.base_url, path);
self.http_client.get(&url)
}
pub(crate) fn post_bare(&self, path: &str) -> reqwest::RequestBuilder
{
debug!("POST request (void): {}", path);
let url = format!("{}{}", self.base_url, path);
self.http_client.post(&url)
}
fn build_post_request(
&self,
svc: &str,
mo_type: &str,
mo_id: &str,
method_name: &str,
params: Option<&(dyn miniserde::Serialize + Send + Sync)>,
) -> reqwest::RequestBuilder {
let path = if svc.is_empty() {
format!("/{mo_type}/{mo_id}/{method_name}")
} else {
format!("/{svc}/{mo_type}/{mo_id}/{method_name}")
};
match params {
Some(payload) => {
debug!("POST request: {}", path);
let json_body = miniserde::json::to_string(payload);
if self.wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(self.wire_logging, mo_type);
let deny = wire_log::body_logging_note(self.wire_logging, mo_type);
let deny_s = deny.unwrap_or("");
let deny_sep = if deny_s.is_empty() { "" } else { " " };
let mut msg = format!(
"wire=json mode={} phase=request svc=\"{}\" mo={} id={} method={} path={} body_bytes={}{}{}",
mode_l,
svc,
mo_type,
mo_id,
method_name,
path,
json_body.len(),
deny_sep,
deny_s
);
if wire_log::bodies_allowed(self.wire_logging, mo_type) {
msg.push_str(&format!(" body={}", json_body));
}
wire_log::log_json_line(
self.wire_logging,
mo_type,
wire_log::bodies_allowed(self.wire_logging, mo_type),
&msg,
);
} else if log::log_enabled!(log::Level::Trace)
&& !wire_log::suppress_legacy_transport_trace(self.wire_logging)
{
trace!("POST payload: {}", json_body);
}
let url = format!("{}{}", self.base_url, path);
self.http_client
.post(&url)
.header("Content-Type", "application/json")
.body(json_body)
}
None => {
if self.wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(self.wire_logging, mo_type);
let deny = wire_log::body_logging_note(self.wire_logging, mo_type);
let deny_s = deny.unwrap_or("");
let deny_sep = if deny_s.is_empty() { "" } else { " " };
let msg = format!(
"wire=json mode={} phase=request svc=\"{}\" mo={} id={} method={} path={} body_bytes=0{}{}",
mode_l,
svc,
mo_type,
mo_id,
method_name,
path,
deny_sep,
deny_s
);
wire_log::log_json_line(self.wire_logging, mo_type, false, &msg);
}
self.post_bare(&path)
}
}
}
pub(crate) async fn execute_void(
&self,
mut req: reqwest::RequestBuilder,
ctx: Option<JsonWireCtx<'_>>,
started: Instant,
) -> Result<()> {
req = self.prepare(req).await;
let res = match req.send().await {
Ok(r) => r,
Err(e) => {
if self.wire_logging.is_enabled() {
if let Some(c) = ctx.as_ref() {
wire_log::log_json_transport_failure(
self.wire_logging,
c.svc,
c.mo_type,
c.mo_id,
c.name,
c.path,
c.is_property_get,
started.elapsed(),
&e,
);
}
}
return Err(Error::ReqwestError(e));
}
};
let res = self
.process_response(res, Some(started), ctx.as_ref())
.await?;
if self.wire_logging.is_enabled() {
if let Some(c) = ctx.as_ref() {
let mode_l = wire_log::wire_mode_label(self.wire_logging, c.mo_type);
let deny = wire_log::body_logging_note(self.wire_logging, c.mo_type);
let deny_s = deny.unwrap_or("");
let deny_sep = if deny_s.is_empty() { "" } else { " " };
let name_field = format!("method={}", c.name);
let msg = format!(
"wire=json mode={} phase=response svc=\"{}\" mo={} id={} {} path={} status={} body_bytes=0 duration_ms={}{}{}",
mode_l,
c.svc,
c.mo_type,
c.mo_id,
name_field,
c.path,
res.status().as_u16(),
started.elapsed().as_millis(),
deny_sep,
deny_s
);
wire_log::log_json_line(self.wire_logging, c.mo_type, false, &msg);
}
}
Ok(())
}
async fn prepare(&self, mut req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
let session_key = self.session_key.read().await;
if let Some(value) = session_key.as_ref() {
req = req.header(AUTHN_HEADER, value);
}
req = req.header("User-Agent", &self.user_agent);
req
}
async fn process_response(
&self,
res: reqwest::Response,
started: Option<Instant>,
ctx: Option<&JsonWireCtx<'_>>,
) -> Result<reqwest::Response> {
if res.status().is_success() && res.headers().contains_key(AUTHN_HEADER) {
let session_key = res.headers().get(AUTHN_HEADER).unwrap().to_str().map_err(|_| Error::MissingOrInvalidSessionKey)?.to_string();
let mut key_holder = self.session_key.write().await;
*key_holder = Some(session_key);
}
if !res.status().is_success() {
warn!("HTTP error: {}", res.status());
let status = res.status();
let body = res.text().await?;
if let (Some(start), Some(c)) = (started, ctx) {
if self.wire_logging.is_enabled() {
wire_log::log_json_http_error(
self.wire_logging,
c.svc,
c.mo_type,
c.mo_id,
c.name,
c.path,
c.is_property_get,
status,
&body,
start.elapsed(),
);
}
}
let fault: structs::MethodFault = miniserde::json::from_str(&body)
.map_err(|_| Error::ParseError(format!("Failed to parse MethodFault from error response: {}", &body[..body.len().min(200)])))?;
return Err(Error::MethodFault(fault));
}
Ok(res)
}
}
impl VimClient for JsonClient {
fn service_content(&self) -> &ServiceContent {
JsonClient::service_content(self)
}
fn transport(&self) -> Transport {
Transport::Json
}
fn api_release(&self) -> String {
JsonClient::api_release(self)
}
fn invoke<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<Bytes>> {
Box::pin(async move {
let path_str = json_method_path(svc, mo_type, mo_id, method_name);
let ctx = JsonWireCtx {
svc,
mo_type,
mo_id,
name: method_name,
path: path_str.as_str(),
is_property_get: false,
};
let started = Instant::now();
let req = self.build_post_request(svc, mo_type, mo_id, method_name, params);
let req = self.prepare(req).await;
let res = match req.send().await {
Ok(r) => r,
Err(e) => {
wire_log::log_json_transport_failure(
self.wire_logging,
svc,
mo_type,
mo_id,
method_name,
path_str.as_str(),
false,
started.elapsed(),
&e,
);
return Err(Error::ReqwestError(e));
}
};
let res = self
.process_response(res, Some(started), Some(&ctx))
.await?;
let http_status = res.status();
let bytes = res.bytes().await?;
if self.wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(self.wire_logging, mo_type);
let deny = wire_log::body_logging_note(self.wire_logging, mo_type);
let deny_s = deny.unwrap_or("");
let deny_sep = if deny_s.is_empty() { "" } else { " " };
let body_lossy = wire_log::sanitize_utf8(&bytes);
let mut msg = format!(
"wire=json mode={} phase=response svc=\"{}\" mo={} id={} method={} path={} status={} body_bytes={} duration_ms={}{}{}",
mode_l,
svc,
mo_type,
mo_id,
method_name,
path_str,
http_status.as_u16(),
bytes.len(),
started.elapsed().as_millis(),
deny_sep,
deny_s
);
if wire_log::bodies_allowed(self.wire_logging, mo_type) {
msg.push_str(&format!(" body={}", body_lossy));
}
wire_log::log_json_line(
self.wire_logging,
mo_type,
wire_log::bodies_allowed(self.wire_logging, mo_type),
&msg,
);
} else if log::log_enabled!(log::Level::Trace)
&& !wire_log::suppress_legacy_transport_trace(self.wire_logging)
{
let body = String::from_utf8_lossy(&bytes);
trace!(
"JSON response from {}/{}: {}...",
mo_type,
method_name,
&body[..body.len().min(2000)]
);
}
Ok(bytes)
})
}
fn invoke_optional<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<Option<Bytes>>> {
Box::pin(async move {
let path_str = json_method_path(svc, mo_type, mo_id, method_name);
let ctx = JsonWireCtx {
svc,
mo_type,
mo_id,
name: method_name,
path: path_str.as_str(),
is_property_get: false,
};
let started = Instant::now();
let req = self.build_post_request(svc, mo_type, mo_id, method_name, params);
let req = self.prepare(req).await;
let res = match req.send().await {
Ok(r) => r,
Err(e) => {
wire_log::log_json_transport_failure(
self.wire_logging,
svc,
mo_type,
mo_id,
method_name,
path_str.as_str(),
false,
started.elapsed(),
&e,
);
return Err(Error::ReqwestError(e));
}
};
let res = self
.process_response(res, Some(started), Some(&ctx))
.await?;
let http_status = res.status();
let bytes = res.bytes().await?;
if self.wire_logging.is_enabled() && !bytes.is_empty() {
let mode_l = wire_log::wire_mode_label(self.wire_logging, mo_type);
let deny = wire_log::body_logging_note(self.wire_logging, mo_type);
let deny_s = deny.unwrap_or("");
let deny_sep = if deny_s.is_empty() { "" } else { " " };
let body_lossy = wire_log::sanitize_utf8(&bytes);
let mut msg = format!(
"wire=json mode={} phase=response svc=\"{}\" mo={} id={} method={} path={} status={} body_bytes={} duration_ms={}{}{}",
mode_l,
svc,
mo_type,
mo_id,
method_name,
path_str,
http_status.as_u16(),
bytes.len(),
started.elapsed().as_millis(),
deny_sep,
deny_s
);
if wire_log::bodies_allowed(self.wire_logging, mo_type) {
msg.push_str(&format!(" body={}", body_lossy));
}
wire_log::log_json_line(
self.wire_logging,
mo_type,
wire_log::bodies_allowed(self.wire_logging, mo_type),
&msg,
);
} else if log::log_enabled!(log::Level::Trace)
&& !bytes.is_empty()
&& !wire_log::suppress_legacy_transport_trace(self.wire_logging)
{
let body = String::from_utf8_lossy(&bytes);
trace!(
"JSON response from {}/{}: {}...",
mo_type,
method_name,
&body[..body.len().min(2000)]
);
}
if bytes.is_empty() {
Ok(None)
} else {
Ok(Some(bytes))
}
})
}
fn invoke_void<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
method_name: &'a str,
params: Option<&'a (dyn miniserde::Serialize + Send + Sync)>,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let path_str = json_method_path(svc, mo_type, mo_id, method_name);
let ctx = JsonWireCtx {
svc,
mo_type,
mo_id,
name: method_name,
path: path_str.as_str(),
is_property_get: false,
};
let started = Instant::now();
let req = self.build_post_request(svc, mo_type, mo_id, method_name, params);
JsonClient::execute_void(self, req, Some(ctx), started).await
})
}
fn fetch_property_raw<'a>(
&'a self,
svc: &'a str,
mo_type: &'a str,
mo_id: &'a str,
property: &'a str,
) -> BoxFuture<'a, Result<Option<PropertyValue>>> {
Box::pin(async move {
let path = if svc.is_empty() {
format!("/{mo_type}/{mo_id}/{property}")
} else {
format!("/{svc}/{mo_type}/{mo_id}/{property}")
};
let ctx = JsonWireCtx {
svc,
mo_type,
mo_id,
name: property,
path: path.as_str(),
is_property_get: true,
};
let started = Instant::now();
if self.wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(self.wire_logging, mo_type);
let deny = wire_log::body_logging_note(self.wire_logging, mo_type);
let deny_s = deny.unwrap_or("");
let deny_sep = if deny_s.is_empty() { "" } else { " " };
let msg = format!(
"wire=json mode={} phase=request svc=\"{}\" mo={} id={} property={} path={} body_bytes=0{}{}",
mode_l,
svc,
mo_type,
mo_id,
property,
path,
deny_sep,
deny_s
);
wire_log::log_json_line(self.wire_logging, mo_type, false, &msg);
}
let req = self.get_request(&path);
let req = self.prepare(req).await;
let res = match req.send().await {
Ok(r) => r,
Err(e) => {
wire_log::log_json_transport_failure(
self.wire_logging,
svc,
mo_type,
mo_id,
property,
path.as_str(),
true,
started.elapsed(),
&e,
);
return Err(Error::ReqwestError(e));
}
};
let res = self
.process_response(res, Some(started), Some(&ctx))
.await?;
let http_status = res.status();
let bytes = res.bytes().await?;
if self.wire_logging.is_enabled() && !bytes.is_empty() {
let mode_l = wire_log::wire_mode_label(self.wire_logging, mo_type);
let deny = wire_log::body_logging_note(self.wire_logging, mo_type);
let deny_s = deny.unwrap_or("");
let deny_sep = if deny_s.is_empty() { "" } else { " " };
let body_lossy = wire_log::sanitize_utf8(&bytes);
let mut msg = format!(
"wire=json mode={} phase=response svc=\"{}\" mo={} id={} property={} path={} status={} body_bytes={} duration_ms={}{}{}",
mode_l,
svc,
mo_type,
mo_id,
property,
path,
http_status.as_u16(),
bytes.len(),
started.elapsed().as_millis(),
deny_sep,
deny_s
);
if wire_log::bodies_allowed(self.wire_logging, mo_type) {
msg.push_str(&format!(" body={}", body_lossy));
}
wire_log::log_json_line(
self.wire_logging,
mo_type,
wire_log::bodies_allowed(self.wire_logging, mo_type),
&msg,
);
} else if log::log_enabled!(log::Level::Trace)
&& !bytes.is_empty()
&& !wire_log::suppress_legacy_transport_trace(self.wire_logging)
{
let body = String::from_utf8_lossy(&bytes);
trace!(
"JSON fetch_property_raw {}/{}: {}...",
mo_type,
property,
&body[..body.len().min(2000)]
);
}
if bytes.is_empty() {
Ok(None)
} else {
Ok(Some(PropertyValue::Json(bytes)))
}
})
}
}
impl Drop for JsonClient {
fn drop(&mut self) {
debug!("Disposing VIM client.");
let session_key = Arc::clone(&self.session_key);
let http_client = &self.http_client.clone();
let base_url = self.base_url.clone();
let wire_logging = self.wire_logging;
let sm_id = self.service_content.as_ref().and_then(|content| content.session_manager.as_ref().map(|moid| moid.value.clone()));
let sm_id = match sm_id {
Some(id) => id,
None => {
debug!("No session manager found. Skipping logout.");
return;
},
};
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
debug!("Terminating VIM session as needed.");
let key = {
let session_key = session_key.read().await;
session_key.clone()
};
let Some(key) = key else {
debug!("No session key present. Skipping logout.");
return;
};
debug!("Session is present. Sending logout request...");
let path = format!("{base_url}/SessionManager/{moId}/Logout",
base_url = base_url,
moId = sm_id);
if wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(wire_logging, "SessionManager");
let msg = format!(
"wire=json mode={} phase=request kind=logout mo=SessionManager id={} method=Logout path={} body_bytes=0 body_logging=denylisted",
mode_l,
sm_id,
path
);
wire_log::log_json_line(wire_logging, "SessionManager", false, &msg);
}
let req = http_client.post(&path)
.header(AUTHN_HEADER, key);
let started = Instant::now();
match req.send().await {
Ok(resp) => {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
let dur = started.elapsed();
if wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(wire_logging, "SessionManager");
let http_note = if status.is_success() {
""
} else {
" error=http_failure"
};
let msg = format!(
"wire=json mode={} phase=response kind=logout mo=SessionManager id={} method=Logout status={} body_bytes={} duration_ms={} body_logging=denylisted{}",
mode_l,
sm_id,
status.as_u16(),
body.len(),
dur.as_millis(),
http_note
);
wire_log::log_json_line(wire_logging, "SessionManager", false, &msg);
}
if status.is_success() {
debug!("Session logged out successfully");
} else {
match miniserde::json::from_str::<structs::MethodFault>(&body) {
Ok(fault) => warn!("Failed to logout session(HTTP code: {}). MethodFault: {:?}", status, fault),
Err(_) => warn!("Failed to logout session(HTTP code: {}). Cannot parse MethodFault: {}", status, &body[..body.len().min(200)]),
}
}
}
Err(e) => {
if wire_logging.is_enabled() {
let mode_l = wire_log::wire_mode_label(wire_logging, "SessionManager");
let msg = format!(
"wire=json mode={} phase=response kind=logout mo=SessionManager id={} method=Logout error=transport duration_ms={} body_logging=denylisted detail={}",
mode_l,
sm_id,
started.elapsed().as_millis(),
e
);
wire_log::log_json_line(wire_logging, "SessionManager", false, &msg);
}
warn!("Failed to logout session. Cannot execute logout request: {}", e);
}
}
});
});
}
}
fn user_agent(app_name: Option<&str>, app_version: Option<&str>) -> String {
let app_name: String = if app_name.is_some() {
app_name.unwrap().to_string()
} else {
get_executable_name().unwrap_or_else(|| "unknown".to_string())
};
let Some(appv) = app_version else {
return format!(
"{} ({}/{}; {}; {}; rustc/{})",
app_name,
LIB_NAME,
LIB_VERSION,
std::env::consts::OS,
std::env::consts::ARCH,
RUSTC_VERSION
);
};
format!(
"{}/{} ({}/{}; {}; {}; rustc/{})",
app_name,
appv,
LIB_NAME,
LIB_VERSION,
std::env::consts::OS,
std::env::consts::ARCH,
RUSTC_VERSION
)
}
fn get_executable_name() -> Option<String> {
std::env::current_exe()
.ok()
.as_ref()
.and_then(|path| path.file_name())
.and_then(OsStr::to_str)
.map(|s| s.to_owned())
}
struct HelloSpec<'a> {
api_releases: &'a Vec<String>,
}
impl miniserde::Serialize for HelloSpec<'_> {
fn begin(&self) -> miniserde::ser::Fragment<'_> {
use miniserde::ser::Fragment;
Fragment::Map(Box::new(HelloSpecSerializer { data: self, seq: 0 }))
}
}
struct HelloSpecSerializer<'a> {
data: &'a HelloSpec<'a>,
seq: usize,
}
impl miniserde::ser::Map for HelloSpecSerializer<'_> {
fn next(&mut self) -> Option<(std::borrow::Cow<'_, str>, &dyn miniserde::Serialize)> {
let result = match self.seq {
0 => Some((std::borrow::Cow::Borrowed("api_releases"), &self.data.api_releases as &dyn miniserde::Serialize)),
_ => None,
};
self.seq += 1;
result
}
}
impl std::fmt::Debug for HelloSpec<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HelloSpec")
.field("api_releases", &self.api_releases)
.finish()
}
}
struct HelloResult {
api_release: String,
}
miniserde::make_place!(Place);
impl miniserde::Deserialize for HelloResult {
fn begin(out: &mut Option<Self>) -> &mut dyn miniserde::de::Visitor {
Place::new(out)
}
}
impl miniserde::de::Visitor for Place<HelloResult> {
fn map(&mut self) -> miniserde::Result<Box<dyn miniserde::de::Map + '_>> {
Ok(Box::new(HelloResultFields {
api_release: None,
__out: &mut self.out,
}))
}
}
struct HelloResultFields<'a> {
api_release: Option<String>,
__out: &'a mut Option<HelloResult>,
}
impl miniserde::de::Map for HelloResultFields<'_> {
fn key(&mut self, k: &str) -> miniserde::Result<&mut dyn miniserde::de::Visitor> {
match k {
"api_release" => Ok(miniserde::Deserialize::begin(&mut self.api_release)),
_ => Ok(<dyn miniserde::de::Visitor>::ignore()),
}
}
fn finish(&mut self) -> miniserde::Result<()> {
let api_release = self.api_release.take().ok_or(miniserde::Error)?;
*self.__out = Some(HelloResult { api_release });
Ok(())
}
}
impl std::fmt::Debug for HelloResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HelloResult")
.field("api_release", &self.api_release)
.finish()
}
}
#[cfg(test)]
pub(crate) fn test_dead_port_http_client() -> reqwest::Client {
#[cfg(feature = "xml")]
{
return reqwest::Client::builder()
.cookie_store(true)
.connect_timeout(std::time::Duration::from_millis(500))
.timeout(std::time::Duration::from_secs(3))
.build()
.expect("reqwest test client");
}
#[cfg(not(feature = "xml"))]
{
reqwest::Client::builder()
.connect_timeout(std::time::Duration::from_millis(500))
.timeout(std::time::Duration::from_secs(3))
.build()
.expect("reqwest test client")
}
}
#[cfg(test)]
pub(crate) const TEST_WIRE_DEAD_ADDR: &str = "127.0.0.1:65433";
#[cfg(test)]
pub(crate) fn test_minimal_service_content_for_tests() -> ServiceContent {
let json = r#"{
"_typeName": "ServiceContent",
"rootFolder": {"_typeName":"ManagedObjectReference","type":"Folder","value":"root-1"},
"propertyCollector": {"_typeName":"ManagedObjectReference","type":"PropertyCollector","value":"pc-1"},
"viewManager": {"_typeName":"ManagedObjectReference","type":"ViewManager","value":"vmgr-1"},
"about": {
"_typeName":"AboutInfo",
"name":"n",
"fullName":"f",
"vendor":"v",
"version":"1",
"build":"b",
"osType":"o",
"productLineId":"p",
"apiType":"VirtualCenter",
"apiVersion":"1"
}
}"#;
miniserde::json::from_str(json).expect("fixture ServiceContent")
}
#[cfg(test)]
pub(crate) fn test_json_client_wire_transport() -> Arc<JsonClient> {
let base_url = format!("https://{}/sdk/vim25/{}", TEST_WIRE_DEAD_ADDR, API_RELEASE);
Arc::new(JsonClient {
http_client: test_dead_port_http_client(),
session_key: Arc::new(RwLock::new(None)),
api_release: API_RELEASE.to_string(),
base_url,
user_agent: "wire-transport-test".to_string(),
service_content: Some(test_minimal_service_content_for_tests()),
wire_logging: WireLoggingMode::Summary,
})
}
#[cfg(test)]
pub(crate) fn test_service_content_with_session_manager_for_tests() -> ServiceContent {
let json = r#"{
"_typeName": "ServiceContent",
"rootFolder": {"_typeName":"ManagedObjectReference","type":"Folder","value":"root-1"},
"propertyCollector": {"_typeName":"ManagedObjectReference","type":"PropertyCollector","value":"pc-1"},
"viewManager": {"_typeName":"ManagedObjectReference","type":"ViewManager","value":"vmgr-1"},
"sessionManager": {"_typeName":"ManagedObjectReference","type":"SessionManager","value":"sm-wire-test"},
"about": {
"_typeName":"AboutInfo",
"name":"n",
"fullName":"f",
"vendor":"v",
"version":"1",
"build":"b",
"osType":"o",
"productLineId":"p",
"apiType":"VirtualCenter",
"apiVersion":"1"
}
}"#;
miniserde::json::from_str(json).expect("fixture ServiceContent with session manager")
}
#[cfg(test)]
pub(crate) fn test_json_client_http_origin(
http_origin: &str,
session_key: Option<String>,
) -> Arc<JsonClient> {
let base_url = format!("{http_origin}/sdk/vim25/{API_RELEASE}");
Arc::new(JsonClient {
http_client: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.expect("reqwest test client"),
session_key: Arc::new(RwLock::new(session_key)),
api_release: API_RELEASE.to_string(),
base_url,
user_agent: "wire-http-test".to_string(),
service_content: Some(test_service_content_with_session_manager_for_tests()),
wire_logging: WireLoggingMode::Summary,
})
}
#[cfg(test)]
mod wire_logging_transport_tests {
use std::sync::{Mutex, Once};
use std::time::Instant;
use super::super::wire_log;
use super::{
test_dead_port_http_client, test_json_client_wire_transport, ClientBuilder, Error, JsonClient,
VimClient, WireLoggingMode, TEST_WIRE_DEAD_ADDR,
};
use miniserde::Serialize;
const SOAP_WIRE_TARGET: &str = "vim_rs::wire::soap";
static LOG_INIT: Once = Once::new();
static SERIAL: Mutex<()> = Mutex::new(());
static WIRE_LINES: Mutex<Vec<String>> = Mutex::new(Vec::new());
struct CaptureWireLogger;
impl log::Log for CaptureWireLogger {
fn enabled(&self, _: &log::Metadata<'_>) -> bool {
true
}
fn log(&self, record: &log::Record<'_>) {
let t = record.target();
if t == wire_log::TARGET_JSON || t == SOAP_WIRE_TARGET {
WIRE_LINES
.lock()
.expect("wire log capture")
.push(record.args().to_string());
}
}
fn flush(&self) {}
}
fn init_wire_capture() {
LOG_INIT.call_once(|| {
let _ = log::set_logger(&CaptureWireLogger);
log::set_max_level(log::LevelFilter::Trace);
});
}
fn clear_wire_lines() {
WIRE_LINES.lock().expect("wire lines").clear();
}
fn joined_wire_output() -> String {
WIRE_LINES.lock().expect("wire lines").join("\n")
}
struct EmptyJsonObject;
struct EmptyJsonMap;
impl Serialize for EmptyJsonObject {
fn begin(&self) -> miniserde::ser::Fragment<'_> {
miniserde::ser::Fragment::Map(Box::new(EmptyJsonMap))
}
}
impl miniserde::ser::Map for EmptyJsonMap {
fn next(&mut self) -> Option<(std::borrow::Cow<'_, str>, &dyn Serialize)> {
None
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn json_hello_negotiate_and_invoke_paths_log_transport_failure() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let hello_err = ClientBuilder::new(TEST_WIRE_DEAD_ADDR)
.http_client(test_dead_port_http_client())
.wire_logging(WireLoggingMode::Summary)
.build()
.await;
assert!(
matches!(hello_err.as_ref(), Err(Error::ReqwestError(_))),
"expected transport error from hello, got {:?}",
hello_err.as_ref().err()
);
let out = joined_wire_output();
assert!(
out.contains("phase=request kind=negotiate"),
"hello request line missing: {out}"
);
assert!(
out.contains("phase=response kind=negotiate") && out.contains("error=transport"),
"hello transport response line missing: {out}"
);
clear_wire_lines();
let jc = test_json_client_wire_transport();
let empty = EmptyJsonObject;
let invoke_err = jc
.invoke(
"",
"VirtualMachine",
"vm-1",
"RefreshStorageInfo",
Some(&empty as &(dyn Serialize + Send + Sync)),
)
.await;
assert!(matches!(invoke_err, Err(Error::ReqwestError(_))));
let out = joined_wire_output();
assert!(
out.contains("phase=request") && out.contains("method=RefreshStorageInfo"),
"invoke request missing: {out}"
);
assert!(
out.contains("phase=response") && out.contains("error=transport"),
"invoke transport response missing: {out}"
);
clear_wire_lines();
let opt_err = jc
.invoke_optional(
"",
"VirtualMachine",
"vm-1",
"SomeMethod",
Some(&empty as &(dyn Serialize + Send + Sync)),
)
.await;
assert!(matches!(opt_err, Err(Error::ReqwestError(_))));
let out = joined_wire_output();
assert!(out.contains("error=transport"), "invoke_optional: {out}");
clear_wire_lines();
let void_err = jc
.invoke_void("", "VirtualMachine", "vm-1", "Destroy", None)
.await;
assert!(matches!(void_err, Err(Error::ReqwestError(_))));
let out = joined_wire_output();
assert!(
out.contains("phase=request") && out.contains("body_bytes=0"),
"void request: {out}"
);
assert!(out.contains("error=transport"), "void transport: {out}");
clear_wire_lines();
let fetch_err = jc
.fetch_property_raw("", "VirtualMachine", "vm-1", "name")
.await;
assert!(matches!(fetch_err, Err(Error::ReqwestError(_))));
let out = joined_wire_output();
assert!(
out.contains("property=name") && out.contains("error=transport"),
"fetch_property_raw: {out}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn json_execute_void_helper_logs_transport_failure() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let jc = test_json_client_wire_transport();
let path_str = super::json_method_path("", "Folder", "group-d1", "Destroy");
let ctx = super::JsonWireCtx {
svc: "",
mo_type: "Folder",
mo_id: "group-d1",
name: "Destroy",
path: path_str.as_str(),
is_property_get: false,
};
let started = Instant::now();
let req = jc.build_post_request("", "Folder", "group-d1", "Destroy", None);
let req = jc.prepare(req).await;
let err = JsonClient::execute_void(jc.as_ref(), req, Some(ctx), started).await;
assert!(matches!(err, Err(Error::ReqwestError(_))));
let out = joined_wire_output();
assert!(out.contains("method=Destroy") && out.contains("error=transport"), "{out}");
}
#[cfg(feature = "xml")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn json_auto_probe_logs_transport_on_hello_send_failure() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let err = ClientBuilder::new(TEST_WIRE_DEAD_ADDR)
.http_client(test_dead_port_http_client())
.wire_logging(WireLoggingMode::Summary)
.transport(super::TransportMode::Auto)
.build()
.await;
assert!(err.is_err(), "expected auto build to fail");
let out = joined_wire_output();
assert!(
out.contains("kind=probe") && out.contains("error=transport"),
"auto probe transport line missing: {out}"
);
}
fn spawn_http_stub_once(status: u16, body: &[u8]) -> (String, std::thread::JoinHandle<()>) {
use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;
use std::time::Duration;
let listener = TcpListener::bind("127.0.0.1:0").expect("bind http stub");
let port = listener.local_addr().expect("stub addr").port();
let body = body.to_vec();
let h = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("stub accept");
let _ = stream.set_read_timeout(Some(Duration::from_secs(5)));
let mut buf = vec![0u8; 32768];
let _ = stream.read(&mut buf);
let status_text = match status {
200 => "OK",
403 => "Forbidden",
500 => "Internal Server Error",
503 => "Service Unavailable",
_ => "Error",
};
let head = format!(
"HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
status,
status_text,
body.len()
);
let mut out = head.into_bytes();
out.extend_from_slice(&body);
let _ = stream.write_all(&out);
});
(format!("http://127.0.0.1:{port}"), h)
}
const SAMPLE_FAULT_JSON: &str = r#"{"_typeName":"VAppPropertyFault","id":"x","category":"string","label":"l","type":"string","value":"v"}"#;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn json_invoke_non_success_emits_wire_http_error_line() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let (origin, stub) = spawn_http_stub_once(500, SAMPLE_FAULT_JSON.as_bytes());
let jc = super::test_json_client_http_origin(&origin, Some("sess".into()));
let empty = EmptyJsonObject;
let err = jc
.invoke(
"",
"VirtualMachine",
"vm-1",
"SomeMethod",
Some(&empty as &(dyn Serialize + Send + Sync)),
)
.await;
assert!(matches!(err, Err(Error::MethodFault(_))));
stub.join().expect("stub thread");
let out = joined_wire_output();
assert!(out.contains("phase=request") && out.contains("method=SomeMethod"), "{out}");
assert!(
out.contains("status=500") && out.contains("phase=response"),
"expected log_json_http_error-style line: {out}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn json_fetch_property_non_success_emits_wire_http_error_line() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let (origin, stub) = spawn_http_stub_once(403, SAMPLE_FAULT_JSON.as_bytes());
let jc = super::test_json_client_http_origin(&origin, Some("sess".into()));
let err = jc
.fetch_property_raw("", "HostSystem", "host-9", "name")
.await;
assert!(matches!(err, Err(Error::MethodFault(_))));
stub.join().expect("stub thread");
let out = joined_wire_output();
assert!(out.contains("property=name") && out.contains("status=403"), "{out}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn json_drop_logout_emits_wire_lines_on_http_success() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let (origin, stub) = spawn_http_stub_once(200, b"");
let jc = super::test_json_client_http_origin(&origin, Some("sk-drop-ok".into()));
drop(jc);
stub.join().expect("stub thread");
let out = joined_wire_output();
assert!(out.contains("kind=logout") && out.contains("phase=request"), "{out}");
assert!(
out.contains("status=200") && !out.contains("error=http_failure"),
"{out}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn json_drop_logout_emits_wire_lines_on_http_non_success() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let (origin, stub) = spawn_http_stub_once(503, b"x");
let jc = super::test_json_client_http_origin(&origin, Some("sk-drop-bad".into()));
drop(jc);
stub.join().expect("stub thread");
let out = joined_wire_output();
assert!(
out.contains("kind=logout") && out.contains("error=http_failure"),
"{out}"
);
assert!(out.contains("status=503"), "{out}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn json_drop_logout_emits_wire_transport_line() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let origin = format!("http://{}", super::TEST_WIRE_DEAD_ADDR);
let jc = super::test_json_client_http_origin(&origin, Some("sk-drop-tr".into()));
drop(jc);
let out = joined_wire_output();
assert!(
out.contains("kind=logout") && out.contains("error=transport"),
"{out}"
);
}
#[cfg(feature = "xml")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn soap_bootstrap_logs_transport_failure() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
use crate::xml::client::SoapClient;
let mut soap = SoapClient::new(
test_dead_port_http_client(),
TEST_WIRE_DEAD_ADDR,
super::API_RELEASE,
"soap-bootstrap-wire-test",
WireLoggingMode::Summary,
);
let err = soap.bootstrap().await;
assert!(matches!(err, Err(Error::ReqwestError(_))));
let out = joined_wire_output();
assert!(
out.contains("RetrieveServiceContent")
&& out.contains("phase=request")
&& out.contains("error=transport"),
"SOAP bootstrap wire: {out}"
);
}
#[cfg(feature = "xml")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn soap_invoke_logs_transport_failure() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
use crate::xml::client::soap_test_client_with_service_content;
let soap = soap_test_client_with_service_content();
let err = VimClient::invoke(
&soap,
"",
"VirtualMachine",
"vm-wire",
"RefreshStorageInfo",
None,
)
.await;
assert!(matches!(err, Err(Error::ReqwestError(_))));
let out = joined_wire_output();
assert!(
out.contains("RefreshStorageInfo") && out.contains("error=transport"),
"SOAP invoke wire: {out}"
);
}
#[cfg(feature = "xml")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn soap_drop_logout_emits_wire_on_http_success() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let (origin, stub) = spawn_http_stub_once(200, b"");
let endpoint = format!("{}/sdk", origin.trim_end_matches('/'));
let soap = crate::xml::client::soap_test_client_for_logout_drop(
reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.unwrap(),
endpoint,
);
drop(soap);
stub.join().expect("stub thread");
let out = joined_wire_output();
assert!(
out.contains("wire=soap")
&& out.contains("kind=logout")
&& out.contains("status=200")
&& !out.contains("error=http_failure"),
"{out}"
);
}
#[cfg(feature = "xml")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn soap_drop_logout_emits_wire_on_http_non_success() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let (origin, stub) = spawn_http_stub_once(502, b"err");
let endpoint = format!("{}/sdk", origin.trim_end_matches('/'));
let soap = crate::xml::client::soap_test_client_for_logout_drop(
reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.unwrap(),
endpoint,
);
drop(soap);
stub.join().expect("stub thread");
let out = joined_wire_output();
assert!(
out.contains("error=http_failure") && out.contains("status=502"),
"{out}"
);
}
#[cfg(feature = "xml")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn soap_drop_logout_emits_wire_transport_line() {
let _serial = SERIAL.lock().expect("serial");
init_wire_capture();
clear_wire_lines();
let endpoint = format!("http://{}/sdk", super::TEST_WIRE_DEAD_ADDR);
let soap = crate::xml::client::soap_test_client_for_logout_drop(
reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(2))
.build()
.unwrap(),
endpoint,
);
drop(soap);
let out = joined_wire_output();
assert!(
out.contains("kind=logout") && out.contains("error=transport"),
"{out}"
);
}
}