1use std::{
2 collections::HashMap,
3 ffi::{CString, OsStr, OsString},
4 net::SocketAddrV4,
5 num::NonZeroU64,
6 path::{Path, PathBuf},
7 process::{Child, Command, Stdio},
8 thread,
9 time::Duration,
10};
11
12use log::{debug, error, warn};
13use temp_env;
14
15use crate::{
16 errors::{ErrorContext, HapiError, Result},
17 ffi::{self, ThriftServerOptions, enums::StatusVerbosity},
18 session::UninitializedSession,
19 utils,
20};
21
22pub use crate::ffi::raw::ThriftSharedMemoryBufferType;
23
/// Which Houdini licenses the server is allowed to check out.
///
/// The [`Display`](std::fmt::Display) implementation renders the preference as
/// the `--check-licenses` / `--skip-licenses` option string.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LicensePreference {
    /// Check Houdini-Engine, Houdini-Escape and Houdini-Fx.
    AnyAvailable,
    /// Check Houdini-Engine only; skip Houdini-Escape and Houdini-Fx.
    HoudiniEngineOnly,
    /// Check Houdini-Engine and Houdini-Escape; skip Houdini-Fx.
    HoudiniEngineAndCore,
}

impl std::fmt::Display for LicensePreference {
    /// Writes the license option string that corresponds to this preference.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let flags = match *self {
            Self::AnyAvailable => "--check-licenses=Houdini-Engine,Houdini-Escape,Houdini-Fx",
            Self::HoudiniEngineOnly => {
                "--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
            }
            Self::HoudiniEngineAndCore => {
                "--check-licenses=Houdini-Engine,Houdini-Escape --skip-licenses=Houdini-Fx"
            }
        };
        f.write_str(flags)
    }
}
50
51#[derive(Clone, Debug)]
52pub struct ThriftSharedMemoryTransport {
53 pub memory_name: String,
54 pub buffer_type: ThriftSharedMemoryBufferType,
55 pub buffer_size: i64,
56}
57
/// Settings for a Thrift server reached through a TCP socket.
#[derive(Debug, Clone)]
pub struct ThriftSocketTransport {
    /// IPv4 address and port of the server.
    pub address: SocketAddrV4,
}
62
/// Settings for a Thrift server reached through a named pipe.
#[derive(Debug, Clone)]
pub struct ThriftPipeTransport {
    /// Path (name) of the pipe.
    pub pipe_path: PathBuf,
}
67
68#[derive(Clone, Debug)]
69pub enum ThriftTransport {
70 SharedMemory(ThriftSharedMemoryTransport),
71 Pipe(ThriftPipeTransport),
72 Socket(ThriftSocketTransport),
73}
74
75pub struct ThriftSharedMemoryTransportBuilder {
76 memory_name: String,
77 buffer_type: ThriftSharedMemoryBufferType,
78 buffer_size: i64,
79}
80
81impl Default for ThriftSharedMemoryTransportBuilder {
82 fn default() -> Self {
83 Self {
84 memory_name: format!("shared-memory-{}", utils::random_string(16)),
85 buffer_type: ThriftSharedMemoryBufferType::Buffer,
86 buffer_size: 1024, }
88 }
89}
90
91impl ThriftSharedMemoryTransportBuilder {
92 pub fn with_memory_name(mut self, name: impl Into<String>) -> Self {
93 self.memory_name = name.into();
94 self
95 }
96 pub fn with_buffer_type(mut self, buffer_type: ThriftSharedMemoryBufferType) -> Self {
97 self.buffer_type = buffer_type;
98 self
99 }
100 pub fn with_buffer_size(mut self, buffer_size: NonZeroU64) -> Self {
101 self.buffer_size = match buffer_size.get().try_into() {
102 Ok(size) => size,
103 Err(_) => {
104 warn!(
106 "ThriftSharedMemoryTransport buffer size is too large, using default of 1024"
107 );
108 1024
109 }
110 };
111 self
112 }
113 pub fn build(self) -> ThriftSharedMemoryTransport {
114 ThriftSharedMemoryTransport {
115 memory_name: self.memory_name,
116 buffer_type: self.buffer_type,
117 buffer_size: self.buffer_size,
118 }
119 }
120}
121
122#[derive(Clone, Debug)]
124pub struct ServerOptions {
125 pub thrift_transport: ThriftTransport,
126 pub auto_close: bool,
127 pub verbosity: StatusVerbosity,
128 pub log_file: Option<CString>,
129 pub env_variables: Option<HashMap<OsString, OsString>>,
130 pub license_preference: Option<LicensePreference>,
131 pub connection_count: i32,
132 pub server_ready_timeout: Option<u32>,
133 pub(crate) connection_retry_interval: Option<Duration>,
134}
135
136impl Default for ServerOptions {
137 fn default() -> Self {
138 Self {
139 thrift_transport: ThriftTransport::SharedMemory(
140 ThriftSharedMemoryTransportBuilder::default().build(),
141 ),
142 auto_close: true,
143 verbosity: StatusVerbosity::Statusverbosity0,
144 log_file: None,
145 env_variables: None,
146 license_preference: None,
147 connection_count: 0,
148 server_ready_timeout: None,
149 connection_retry_interval: Some(Duration::from_secs(10)),
150 }
151 }
152}
153
154impl ServerOptions {
155 pub fn shared_memory_with_defaults() -> Self {
157 Self::default().with_thrift_transport(ThriftTransport::SharedMemory(
158 ThriftSharedMemoryTransportBuilder::default().build(),
159 ))
160 }
161
162 pub fn pipe_with_defaults() -> Self {
164 Self::default().with_thrift_transport(ThriftTransport::Pipe(ThriftPipeTransport {
165 pipe_path: PathBuf::from(format!("hapi-pipe-{}", utils::random_string(16))),
166 }))
167 }
168
169 pub fn socket_with_defaults(address: SocketAddrV4) -> Self {
171 Self::default()
172 .with_thrift_transport(ThriftTransport::Socket(ThriftSocketTransport { address }))
173 }
174
175 pub fn with_thrift_transport(mut self, transport: ThriftTransport) -> Self {
176 self.thrift_transport = transport;
177 self
178 }
179
180 pub fn with_connection_timeout(mut self, timeout: Option<Duration>) -> Self {
182 self.connection_retry_interval = timeout;
183 self
184 }
185
186 pub fn with_license_preference(mut self, license_preference: LicensePreference) -> Self {
190 self.license_preference.replace(license_preference);
191
192 self.env_variables.get_or_insert_default().insert(
193 OsString::from("HOUDINI_PLUGIN_LIC_OPT"),
194 OsString::from(license_preference.to_string()),
195 );
196
197 self
198 }
199
200 pub fn with_log_file(mut self, file: impl AsRef<Path>) -> Self {
203 self.log_file = Some(utils::path_to_cstring(file).expect("Path to CString failed"));
204 self
205 }
206
207 pub fn with_env_variables<'a, I, K, V>(mut self, variables: I) -> Self
211 where
212 I: Iterator<Item = &'a (K, V)>,
213 K: Into<OsString> + Clone + 'a,
214 V: Into<OsString> + Clone + 'a,
215 {
216 self.env_variables = Some(
217 variables
218 .map(|(k, v)| (k.clone().into(), v.clone().into()))
219 .collect(),
220 );
221 self
222 }
223
224 pub fn with_auto_close(mut self, auto_close: bool) -> Self {
226 self.auto_close = auto_close;
227 self
228 }
229
230 pub fn with_verbosity(mut self, verbosity: StatusVerbosity) -> Self {
232 self.verbosity = verbosity;
233 self
234 }
235
236 pub fn with_connection_count(mut self, connection_count: i32) -> Self {
237 self.connection_count = connection_count;
241 self
242 }
243
244 pub fn with_server_ready_timeout(mut self, timeout: u32) -> Self {
247 self.server_ready_timeout.replace(timeout);
248 self
249 }
250
251 pub(crate) fn session_info(&self) -> crate::ffi::SessionInfo {
252 let mut session_info =
253 crate::ffi::SessionInfo::default().with_connection_count(self.connection_count);
254
255 if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
256 session_info.set_shared_memory_buffer_type(transport.buffer_type);
257 session_info.set_shared_memory_buffer_size(transport.buffer_size);
258 }
259
260 session_info
261 }
262
263 pub(crate) fn thrift_options(&self) -> crate::ffi::ThriftServerOptions {
264 let mut options = ThriftServerOptions::default()
265 .with_auto_close(self.auto_close)
266 .with_verbosity(self.verbosity);
267
268 if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
269 options.set_shared_memory_buffer_type(transport.buffer_type);
270 options.set_shared_memory_buffer_size(transport.buffer_size);
271 }
272 if let Some(timeout) = self.server_ready_timeout {
273 options.set_timeout_ms(timeout as f32);
274 }
275
276 options
277 }
278}
279
280fn call_with_temp_environment<R, T, F>(variables: Option<&[(T, T)]>, f: F) -> Result<R>
281where
282 T: AsRef<OsStr>,
283 F: FnOnce() -> Result<R>,
284{
285 if let Some(env_variables) = variables {
286 let env_variables: Vec<(&OsStr, Option<&OsStr>)> = env_variables
287 .iter()
288 .map(|(k, v)| (k.as_ref(), Some(v.as_ref())))
289 .collect::<Vec<_>>();
290 temp_env::with_vars(env_variables.as_slice(), f)
291 } else {
292 f()
293 }
294}
295
296pub fn connect_to_pipe_server(
298 server_options: ServerOptions,
299 pid: Option<u32>,
300) -> Result<UninitializedSession> {
301 let ThriftTransport::Pipe(ThriftPipeTransport { pipe_path }) = &server_options.thrift_transport
302 else {
303 return Err(HapiError::Internal(
304 "ServerOptions is not configured for pipe transport".to_owned(),
305 ));
306 };
307 let pipe_name = utils::path_to_cstring(pipe_path)?;
308 debug!("Connecting to pipe server: {:?}", pipe_path.display());
309 let handle = try_connect_with_timeout(
310 server_options.connection_retry_interval,
311 Duration::from_millis(100),
312 || ffi::new_thrift_piped_session(&pipe_name, &server_options.session_info().0),
313 )?;
314 Ok(UninitializedSession {
315 session_handle: handle,
316 server_options: Some(server_options),
317 server_pid: pid,
318 })
319}
320
321pub fn connect_to_memory_server(
323 server_options: ServerOptions,
324 pid: Option<u32>,
325) -> Result<UninitializedSession> {
326 let ThriftTransport::SharedMemory(ThriftSharedMemoryTransport { memory_name, .. }) =
327 &server_options.thrift_transport
328 else {
329 return Err(HapiError::Internal(
330 "ServerOptions is not configured for shared memory transport".to_owned(),
331 ));
332 };
333 let mem_name_cstr = CString::new(memory_name.clone())?;
334 debug!("Connecting to shared memory server: {:?}", memory_name);
335 let handle = try_connect_with_timeout(
336 server_options.connection_retry_interval,
337 Duration::from_millis(100),
338 || ffi::new_thrift_shared_memory_session(&mem_name_cstr, &server_options.session_info().0),
339 )?;
340 Ok(UninitializedSession {
341 session_handle: handle,
342 server_options: Some(server_options),
343 server_pid: pid,
344 })
345}
346
347fn try_connect_with_timeout<F: Fn() -> Result<crate::ffi::raw::HAPI_Session>>(
348 timeout: Option<Duration>,
349 wait_ms: Duration,
350 f: F,
351) -> Result<crate::ffi::raw::HAPI_Session> {
352 debug!("Trying to connect to server with timeout: {:?}", timeout);
353 let mut waited = Duration::from_secs(0);
354 let mut last_error = None;
355 let handle = loop {
356 match f() {
357 Ok(handle) => break handle,
358 Err(e) => {
359 error!("Error while trying to connect to server: {:?}", e);
360 last_error.replace(e);
361 thread::sleep(wait_ms);
362 waited += wait_ms;
363 }
364 }
365 if let Some(timeout) = timeout
366 && waited > timeout
367 {
368 return Err(last_error.unwrap()).context(format!(
370 "Could not connect to server within timeout: {timeout:?}"
371 ));
372 }
373 };
374 Ok(handle)
375}
376
377pub fn connect_to_socket_server(
379 server_options: ServerOptions,
380 pid: Option<u32>,
381) -> Result<UninitializedSession> {
382 let ThriftTransport::Socket(ThriftSocketTransport { address }) =
383 &server_options.thrift_transport
384 else {
385 return Err(HapiError::Internal(
386 "ServerOptions is not configured for socket transport".to_owned(),
387 ));
388 };
389 debug!("Connecting to socket server: {:?}", address);
390 let host = CString::new(address.ip().to_string())
391 .map_err(HapiError::from)
392 .context("Converting SocketAddr to CString")?;
393 let handle = try_connect_with_timeout(
394 server_options.connection_retry_interval,
395 Duration::from_millis(100),
396 || {
397 ffi::new_thrift_socket_session(
398 address.port() as i32,
399 &host,
400 &server_options.session_info().0,
401 )
402 },
403 )?;
404 Ok(UninitializedSession {
405 session_handle: handle,
406 server_options: Some(server_options),
407 server_pid: pid,
408 })
409}
410
411pub fn start_engine_server(server_options: &ServerOptions) -> Result<u32> {
412 let env_variables = server_options.env_variables.as_ref().map(|env_variables| {
413 env_variables
414 .iter()
415 .map(|(k, v)| (k.as_os_str(), v.as_os_str()))
416 .collect::<Vec<_>>()
417 });
418 match &server_options.thrift_transport {
419 ThriftTransport::SharedMemory(transport) => {
420 debug!(
421 "Starting shared memory server name: {}",
422 transport.memory_name
423 );
424 let memory_name = CString::new(transport.memory_name.clone())?;
425 ffi::clear_connection_error()?;
426 call_with_temp_environment(env_variables.as_deref(), || {
427 ffi::start_thrift_shared_memory_server(
428 &memory_name,
429 &server_options.thrift_options().0,
430 server_options.log_file.as_deref(),
431 )
432 .with_context(|| {
433 format!(
434 "Failed to start shared memory server: {}",
435 transport.memory_name
436 )
437 })
438 })
439 }
440 ThriftTransport::Pipe(transport) => {
441 debug!("Starting named pipe server: {:?}", transport.pipe_path);
442 let pipe_name = utils::path_to_cstring(&transport.pipe_path)?;
443 ffi::clear_connection_error()?;
444 call_with_temp_environment(env_variables.as_deref(), || {
445 ffi::start_thrift_pipe_server(
446 &pipe_name,
447 &server_options.thrift_options().0,
448 server_options.log_file.as_deref(),
449 )
450 .with_context(|| format!("Failed to start pipe server: {:?}", transport.pipe_path))
451 })
452 }
453 ThriftTransport::Socket(transport) => {
454 debug!(
455 "Starting socket server on port: {}",
456 transport.address.port()
457 );
458 ffi::clear_connection_error()?;
459 call_with_temp_environment(env_variables.as_deref(), || {
460 ffi::start_thrift_socket_server(
461 transport.address.port() as i32,
462 &server_options.thrift_options().0,
463 server_options.log_file.as_deref(),
464 )
465 })
466 }
467 }
468}
469
470pub fn start_houdini_server(
472 pipe_name: impl AsRef<str>,
473 houdini_executable: impl AsRef<Path>,
474 fx_license: bool,
475 env_variables: Option<&[(String, String)]>,
476) -> Result<Child> {
477 let mut command = Command::new(houdini_executable.as_ref());
478 call_with_temp_environment(env_variables, move || {
479 command
480 .arg(format!("-hess=pipe:{}", pipe_name.as_ref()))
481 .arg(if fx_license {
482 "-force-fx-license"
483 } else {
484 "-core"
485 })
486 .stdin(Stdio::null())
487 .stdout(Stdio::null())
488 .stderr(Stdio::null())
489 .spawn()
490 .map_err(HapiError::from)
491 })
492}