use std::{
collections::HashMap,
ffi::{CString, OsStr, OsString},
net::SocketAddrV4,
num::NonZeroU64,
path::{Path, PathBuf},
process::{Child, Command, Stdio},
thread,
time::Duration,
};
use log::{debug, error, warn};
use temp_env;
use crate::{
errors::{ErrorContext, HapiError, Result},
ffi::{self, ThriftServerOptions, enums::StatusVerbosity},
session::UninitializedSession,
utils,
};
pub use crate::ffi::raw::ThriftSharedMemoryBufferType;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum LicensePreference {
AnyAvailable,
HoudiniEngineOnly,
HoudiniEngineAndCore,
}
impl std::fmt::Display for LicensePreference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
LicensePreference::AnyAvailable => {
"--check-licenses=Houdini-Engine,Houdini-Escape,Houdini-Fx"
}
LicensePreference::HoudiniEngineOnly => {
"--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
}
LicensePreference::HoudiniEngineAndCore => {
"--check-licenses=Houdini-Engine,Houdini-Escape --skip-licenses=Houdini-Fx"
}
}
)
}
}
#[derive(Clone, Debug)]
pub struct ThriftSharedMemoryTransport {
pub memory_name: String,
pub buffer_type: ThriftSharedMemoryBufferType,
pub buffer_size: i64,
}
#[derive(Clone, Debug)]
pub struct ThriftSocketTransport {
pub address: SocketAddrV4,
}
#[derive(Clone, Debug)]
pub struct ThriftPipeTransport {
pub pipe_path: PathBuf,
}
#[derive(Clone, Debug)]
pub enum ThriftTransport {
SharedMemory(ThriftSharedMemoryTransport),
Pipe(ThriftPipeTransport),
Socket(ThriftSocketTransport),
}
pub struct ThriftSharedMemoryTransportBuilder {
memory_name: String,
buffer_type: ThriftSharedMemoryBufferType,
buffer_size: i64,
}
impl Default for ThriftSharedMemoryTransportBuilder {
fn default() -> Self {
Self {
memory_name: format!("shared-memory-{}", utils::random_string(16)),
buffer_type: ThriftSharedMemoryBufferType::Buffer,
buffer_size: 1024, }
}
}
impl ThriftSharedMemoryTransportBuilder {
#[must_use]
pub fn with_memory_name(mut self, name: impl Into<String>) -> Self {
self.memory_name = name.into();
self
}
#[must_use]
pub fn with_buffer_type(mut self, buffer_type: ThriftSharedMemoryBufferType) -> Self {
self.buffer_type = buffer_type;
self
}
#[must_use]
pub fn with_buffer_size(mut self, buffer_size: NonZeroU64) -> Self {
self.buffer_size = if let Ok(size) = buffer_size.get().try_into() {
size
} else {
warn!("ThriftSharedMemoryTransport buffer size is too large, using default of 1024");
1024
};
self
}
#[must_use]
pub fn build(self) -> ThriftSharedMemoryTransport {
ThriftSharedMemoryTransport {
memory_name: self.memory_name,
buffer_type: self.buffer_type,
buffer_size: self.buffer_size,
}
}
}
#[derive(Clone, Debug)]
pub struct ServerOptions {
pub thrift_transport: ThriftTransport,
pub auto_close: bool,
pub verbosity: StatusVerbosity,
pub log_file: Option<CString>,
pub env_variables: Option<HashMap<OsString, OsString>>,
pub license_preference: Option<LicensePreference>,
pub connection_count: i32,
pub server_ready_timeout: Option<u32>,
pub(crate) connection_retry_interval: Option<Duration>,
}
impl Default for ServerOptions {
fn default() -> Self {
Self {
thrift_transport: ThriftTransport::SharedMemory(
ThriftSharedMemoryTransportBuilder::default().build(),
),
auto_close: true,
verbosity: StatusVerbosity::Statusverbosity0,
log_file: None,
env_variables: None,
license_preference: None,
connection_count: 0,
server_ready_timeout: None,
connection_retry_interval: Some(Duration::from_secs(10)),
}
}
}
impl ServerOptions {
#[must_use]
pub fn shared_memory_with_defaults() -> Self {
Self::default().with_thrift_transport(ThriftTransport::SharedMemory(
ThriftSharedMemoryTransportBuilder::default().build(),
))
}
#[must_use]
pub fn pipe_with_defaults() -> Self {
Self::default().with_thrift_transport(ThriftTransport::Pipe(ThriftPipeTransport {
pipe_path: PathBuf::from(format!("hapi-pipe-{}", utils::random_string(16))),
}))
}
#[must_use]
pub fn socket_with_defaults(address: SocketAddrV4) -> Self {
Self::default()
.with_thrift_transport(ThriftTransport::Socket(ThriftSocketTransport { address }))
}
#[must_use]
pub fn with_thrift_transport(mut self, transport: ThriftTransport) -> Self {
self.thrift_transport = transport;
self
}
#[must_use]
pub fn with_connection_timeout(mut self, timeout: Option<Duration>) -> Self {
self.connection_retry_interval = timeout;
self
}
#[must_use]
pub fn with_license_preference(mut self, license_preference: LicensePreference) -> Self {
self.license_preference.replace(license_preference);
self.env_variables.get_or_insert_default().insert(
OsString::from("HOUDINI_PLUGIN_LIC_OPT"),
OsString::from(license_preference.to_string()),
);
self
}
#[must_use]
pub fn with_log_file(mut self, file: impl AsRef<Path>) -> Self {
self.log_file = Some(utils::path_to_cstring(file).expect("Path to CString failed"));
self
}
#[must_use]
pub fn with_env_variables<'a, I, K, V>(mut self, variables: I) -> Self
where
I: Iterator<Item = &'a (K, V)>,
K: Into<OsString> + Clone + 'a,
V: Into<OsString> + Clone + 'a,
{
self.env_variables = Some(
variables
.map(|(k, v)| (k.clone().into(), v.clone().into()))
.collect(),
);
self
}
#[must_use]
pub fn with_auto_close(mut self, auto_close: bool) -> Self {
self.auto_close = auto_close;
self
}
#[must_use]
pub fn with_verbosity(mut self, verbosity: StatusVerbosity) -> Self {
self.verbosity = verbosity;
self
}
#[must_use]
#[cfg(feature = "async-cooking")]
pub fn with_connection_count(mut self, connection_count: i32) -> Self {
self.connection_count = connection_count;
self
}
#[must_use]
pub fn with_server_ready_timeout(mut self, timeout: u32) -> Self {
self.server_ready_timeout.replace(timeout);
self
}
pub(crate) fn session_info(&self) -> crate::ffi::SessionInfo {
let mut session_info =
crate::ffi::SessionInfo::default().with_connection_count(self.connection_count);
if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
session_info.set_shared_memory_buffer_type(transport.buffer_type);
session_info.set_shared_memory_buffer_size(transport.buffer_size);
}
session_info
}
pub(crate) fn thrift_options(&self) -> crate::ffi::ThriftServerOptions {
let mut options = ThriftServerOptions::default()
.with_auto_close(self.auto_close)
.with_verbosity(self.verbosity);
if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
options.set_shared_memory_buffer_type(transport.buffer_type);
options.set_shared_memory_buffer_size(transport.buffer_size);
}
if let Some(timeout) = self.server_ready_timeout {
#[allow(clippy::cast_precision_loss)]
options.set_timeout_ms(timeout as f32);
}
options
}
}
fn call_with_temp_environment<R, T, F>(variables: Option<&[(T, T)]>, f: F) -> Result<R>
where
T: AsRef<OsStr>,
F: FnOnce() -> Result<R>,
{
if let Some(env_variables) = variables {
let env_variables: Vec<(&OsStr, Option<&OsStr>)> = env_variables
.iter()
.map(|(k, v)| (k.as_ref(), Some(v.as_ref())))
.collect::<Vec<_>>();
temp_env::with_vars(env_variables.as_slice(), f)
} else {
f()
}
}
pub fn connect_to_pipe_server(
server_options: ServerOptions,
pid: Option<u32>,
) -> Result<UninitializedSession> {
let ThriftTransport::Pipe(ThriftPipeTransport { pipe_path }) = &server_options.thrift_transport
else {
return Err(HapiError::Internal(
"ServerOptions is not configured for pipe transport".to_owned(),
));
};
let pipe_name = utils::path_to_cstring(pipe_path)?;
debug!("Connecting to pipe server: {:?}", pipe_path.display());
let handle = try_connect_with_timeout(
server_options.connection_retry_interval,
Duration::from_millis(100),
|| ffi::new_thrift_piped_session(&pipe_name, &server_options.session_info().0),
)?;
Ok(UninitializedSession {
session_handle: handle,
server_options: Some(server_options),
server_pid: pid,
})
}
pub fn connect_to_memory_server(
server_options: ServerOptions,
pid: Option<u32>,
) -> Result<UninitializedSession> {
let ThriftTransport::SharedMemory(ThriftSharedMemoryTransport { memory_name, .. }) =
&server_options.thrift_transport
else {
return Err(HapiError::Internal(
"ServerOptions is not configured for shared memory transport".to_owned(),
));
};
let mem_name_cstr = CString::new(memory_name.clone())?;
debug!("Connecting to shared memory server: {memory_name:?}");
let handle = try_connect_with_timeout(
server_options.connection_retry_interval,
Duration::from_millis(100),
|| ffi::new_thrift_shared_memory_session(&mem_name_cstr, &server_options.session_info().0),
)?;
Ok(UninitializedSession {
session_handle: handle,
server_options: Some(server_options),
server_pid: pid,
})
}
fn try_connect_with_timeout<F: Fn() -> Result<crate::ffi::raw::HAPI_Session>>(
timeout: Option<Duration>,
wait_ms: Duration,
f: F,
) -> Result<crate::ffi::raw::HAPI_Session> {
debug!("Trying to connect to server with timeout: {timeout:?}");
let mut waited = Duration::from_secs(0);
let mut last_error = None;
let handle = loop {
match f() {
Ok(handle) => break handle,
Err(e) => {
error!("Error while trying to connect to server: {e:?}");
last_error.replace(e);
thread::sleep(wait_ms);
waited += wait_ms;
}
}
if let Some(timeout) = timeout
&& waited > timeout
{
return Err(last_error.unwrap()).context(format!(
"Could not connect to server within timeout: {timeout:?}"
));
}
};
Ok(handle)
}
pub fn connect_to_socket_server(
server_options: ServerOptions,
pid: Option<u32>,
) -> Result<UninitializedSession> {
let ThriftTransport::Socket(ThriftSocketTransport { address }) =
&server_options.thrift_transport
else {
return Err(HapiError::Internal(
"ServerOptions is not configured for socket transport".to_owned(),
));
};
debug!("Connecting to socket server: {address:?}");
let host = CString::new(address.ip().to_string())
.map_err(HapiError::from)
.context("Converting SocketAddr to CString")?;
let handle = try_connect_with_timeout(
server_options.connection_retry_interval,
Duration::from_millis(100),
|| {
ffi::new_thrift_socket_session(
i32::from(address.port()),
&host,
&server_options.session_info().0,
)
},
)?;
Ok(UninitializedSession {
session_handle: handle,
server_options: Some(server_options),
server_pid: pid,
})
}
pub fn start_engine_server(server_options: &ServerOptions) -> Result<u32> {
let env_variables = server_options.env_variables.as_ref().map(|env_variables| {
env_variables
.iter()
.map(|(k, v)| (k.as_os_str(), v.as_os_str()))
.collect::<Vec<_>>()
});
match &server_options.thrift_transport {
ThriftTransport::SharedMemory(transport) => {
debug!(
"Starting shared memory server name: {}",
transport.memory_name
);
let memory_name = CString::new(transport.memory_name.clone())?;
ffi::clear_connection_error()?;
call_with_temp_environment(env_variables.as_deref(), || {
ffi::start_thrift_shared_memory_server(
&memory_name,
&server_options.thrift_options().0,
server_options.log_file.as_deref(),
)
.with_context(|| {
format!(
"Failed to start shared memory server: {}",
transport.memory_name
)
})
})
}
ThriftTransport::Pipe(transport) => {
debug!(
"Starting named pipe server: {}",
transport.pipe_path.display()
);
let pipe_name = utils::path_to_cstring(&transport.pipe_path)?;
ffi::clear_connection_error()?;
call_with_temp_environment(env_variables.as_deref(), || {
ffi::start_thrift_pipe_server(
&pipe_name,
&server_options.thrift_options().0,
server_options.log_file.as_deref(),
)
.with_context(|| {
format!(
"Failed to start pipe server: {}",
transport.pipe_path.display()
)
})
})
}
ThriftTransport::Socket(transport) => {
debug!(
"Starting socket server on port: {}",
transport.address.port()
);
ffi::clear_connection_error()?;
call_with_temp_environment(env_variables.as_deref(), || {
ffi::start_thrift_socket_server(
i32::from(transport.address.port()),
&server_options.thrift_options().0,
server_options.log_file.as_deref(),
)
})
}
}
}
pub fn start_houdini_server(
pipe_name: impl AsRef<str>,
houdini_executable: impl AsRef<Path>,
fx_license: bool,
env_variables: Option<&[(String, String)]>,
) -> Result<Child> {
let mut command = Command::new(houdini_executable.as_ref());
call_with_temp_environment(env_variables, move || {
command
.arg(format!("-hess=pipe:{}", pipe_name.as_ref()))
.arg(if fx_license {
"-force-fx-license"
} else {
"-core"
})
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.map_err(HapiError::from)
})
}
#[cfg(test)]
mod tests {
use super::*;
use std::{ffi::OsString, net::Ipv4Addr, num::NonZeroU64};
use crate::ffi::enums::StatusVerbosity;
#[test]
fn license_preference_display_strings() {
assert_eq!(
LicensePreference::AnyAvailable.to_string(),
"--check-licenses=Houdini-Engine,Houdini-Escape,Houdini-Fx"
);
assert_eq!(
LicensePreference::HoudiniEngineOnly.to_string(),
"--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
);
assert_eq!(
LicensePreference::HoudiniEngineAndCore.to_string(),
"--check-licenses=Houdini-Engine,Houdini-Escape --skip-licenses=Houdini-Fx"
);
}
#[test]
fn shared_memory_transport_builder_applies_options() {
let transport = ThriftSharedMemoryTransportBuilder::default()
.with_memory_name("test-memory")
.with_buffer_type(ThriftSharedMemoryBufferType::RingBuffer)
.with_buffer_size(NonZeroU64::new(512).unwrap())
.build();
assert_eq!(transport.memory_name, "test-memory");
assert_eq!(transport.buffer_type, ThriftSharedMemoryBufferType::RingBuffer);
assert_eq!(transport.buffer_size, 512);
}
#[test]
fn shared_memory_transport_builder_clamps_oversized_buffer() {
let transport = ThriftSharedMemoryTransportBuilder::default()
.with_buffer_size(NonZeroU64::new(i64::MAX as u64 + 1).unwrap())
.build();
assert_eq!(transport.buffer_size, 1024);
}
#[test]
fn server_options_shared_memory_maps_to_session_and_thrift_options() {
let transport = ThriftSharedMemoryTransportBuilder::default()
.with_buffer_type(ThriftSharedMemoryBufferType::RingBuffer)
.with_buffer_size(NonZeroU64::new(256).unwrap())
.build();
let options = ServerOptions::default()
.with_auto_close(false)
.with_verbosity(StatusVerbosity::Statusverbosity2)
.with_server_ready_timeout(5_000)
.with_thrift_transport(ThriftTransport::SharedMemory(transport.clone()));
let session_info = options.session_info();
assert_eq!(
session_info.shared_memory_buffer_type(),
ThriftSharedMemoryBufferType::RingBuffer
);
assert_eq!(session_info.shared_memory_buffer_size(), 256);
let thrift_options = options.thrift_options();
assert!(!thrift_options.auto_close());
assert_eq!(thrift_options.verbosity(), StatusVerbosity::Statusverbosity2);
assert_eq!(
thrift_options.shared_memory_buffer_type(),
ThriftSharedMemoryBufferType::RingBuffer
);
assert_eq!(thrift_options.shared_memory_buffer_size(), 256);
assert_eq!(thrift_options.timeout_ms(), 5_000.0);
}
#[test]
fn server_options_license_preference_sets_plugin_env() {
let options =
ServerOptions::default().with_license_preference(LicensePreference::HoudiniEngineOnly);
let env = options.env_variables.expect("env map");
assert_eq!(
env.get(&OsString::from("HOUDINI_PLUGIN_LIC_OPT")),
Some(&OsString::from(
"--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
))
);
}
#[test]
fn socket_with_defaults_preserves_address() {
let address = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 12_345);
let options = ServerOptions::socket_with_defaults(address);
let ThriftTransport::Socket(ThriftSocketTransport { address: actual }) =
options.thrift_transport
else {
panic!("expected socket transport");
};
assert_eq!(actual, address);
}
#[test]
fn connect_rejects_mismatched_transport_without_calling_hapi() {
let memory_options = ServerOptions::shared_memory_with_defaults();
let pipe_options = ServerOptions::pipe_with_defaults();
let socket_options =
ServerOptions::socket_with_defaults(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9_999));
assert!(connect_to_memory_server(pipe_options.clone(), None).is_err());
assert!(connect_to_pipe_server(memory_options.clone(), None).is_err());
assert!(connect_to_socket_server(memory_options, None).is_err());
assert!(connect_to_memory_server(socket_options.clone(), None).is_err());
assert!(connect_to_pipe_server(socket_options, None).is_err());
}
}