1use std::{
2 collections::HashMap,
3 ffi::{CString, OsStr, OsString},
4 net::SocketAddrV4,
5 num::NonZeroU64,
6 path::{Path, PathBuf},
7 process::{Child, Command, Stdio},
8 thread,
9 time::Duration,
10};
11
12use log::{debug, error, warn};
13use temp_env;
14
15use crate::{
16 errors::{ErrorContext, HapiError, Result},
17 ffi::{self, ThriftServerOptions, enums::StatusVerbosity},
18 session::UninitializedSession,
19 utils,
20};
21
22pub use crate::ffi::raw::ThriftSharedMemoryBufferType;
23
24#[derive(Copy, Clone, Debug, PartialEq, Eq)]
25pub enum LicensePreference {
26 AnyAvailable,
27 HoudiniEngineOnly,
28 HoudiniEngineAndCore,
29}
30
31impl std::fmt::Display for LicensePreference {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 write!(
34 f,
35 "{}",
36 match self {
37 LicensePreference::AnyAvailable => {
38 "--check-licenses=Houdini-Engine,Houdini-Escape,Houdini-Fx"
39 }
40 LicensePreference::HoudiniEngineOnly => {
41 "--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
42 }
43 LicensePreference::HoudiniEngineAndCore => {
44 "--check-licenses=Houdini-Engine,Houdini-Escape --skip-licenses=Houdini-Fx"
45 }
46 }
47 )
48 }
49}
50
51#[derive(Clone, Debug)]
52pub struct ThriftSharedMemoryTransport {
53 pub memory_name: String,
54 pub buffer_type: ThriftSharedMemoryBufferType,
55 pub buffer_size: i64,
56}
57
58#[derive(Clone, Debug)]
59pub struct ThriftSocketTransport {
60 pub address: SocketAddrV4,
61}
62
63#[derive(Clone, Debug)]
64pub struct ThriftPipeTransport {
65 pub pipe_path: PathBuf,
66}
67
68#[derive(Clone, Debug)]
69pub enum ThriftTransport {
70 SharedMemory(ThriftSharedMemoryTransport),
71 Pipe(ThriftPipeTransport),
72 Socket(ThriftSocketTransport),
73}
74
75pub struct ThriftSharedMemoryTransportBuilder {
76 memory_name: String,
77 buffer_type: ThriftSharedMemoryBufferType,
78 buffer_size: i64,
79}
80
81impl Default for ThriftSharedMemoryTransportBuilder {
82 fn default() -> Self {
83 Self {
84 memory_name: format!("shared-memory-{}", utils::random_string(16)),
85 buffer_type: ThriftSharedMemoryBufferType::Buffer,
86 buffer_size: 1024, }
88 }
89}
90
91impl ThriftSharedMemoryTransportBuilder {
92 #[must_use]
93 pub fn with_memory_name(mut self, name: impl Into<String>) -> Self {
94 self.memory_name = name.into();
95 self
96 }
97 #[must_use]
98 pub fn with_buffer_type(mut self, buffer_type: ThriftSharedMemoryBufferType) -> Self {
99 self.buffer_type = buffer_type;
100 self
101 }
102 #[must_use]
103 pub fn with_buffer_size(mut self, buffer_size: NonZeroU64) -> Self {
104 self.buffer_size = if let Ok(size) = buffer_size.get().try_into() {
105 size
106 } else {
107 warn!("ThriftSharedMemoryTransport buffer size is too large, using default of 1024");
109 1024
110 };
111 self
112 }
113 #[must_use]
114 pub fn build(self) -> ThriftSharedMemoryTransport {
115 ThriftSharedMemoryTransport {
116 memory_name: self.memory_name,
117 buffer_type: self.buffer_type,
118 buffer_size: self.buffer_size,
119 }
120 }
121}
122
123#[derive(Clone, Debug)]
125pub struct ServerOptions {
126 pub thrift_transport: ThriftTransport,
127 pub auto_close: bool,
128 pub verbosity: StatusVerbosity,
129 pub log_file: Option<CString>,
130 pub env_variables: Option<HashMap<OsString, OsString>>,
131 pub license_preference: Option<LicensePreference>,
132 pub connection_count: i32,
133 pub server_ready_timeout: Option<u32>,
134 pub(crate) connection_retry_interval: Option<Duration>,
135}
136
137impl Default for ServerOptions {
138 fn default() -> Self {
139 Self {
140 thrift_transport: ThriftTransport::SharedMemory(
141 ThriftSharedMemoryTransportBuilder::default().build(),
142 ),
143 auto_close: true,
144 verbosity: StatusVerbosity::Statusverbosity0,
145 log_file: None,
146 env_variables: None,
147 license_preference: None,
148 connection_count: 0,
149 server_ready_timeout: None,
150 connection_retry_interval: Some(Duration::from_secs(10)),
151 }
152 }
153}
154
155impl ServerOptions {
156 #[must_use]
158 pub fn shared_memory_with_defaults() -> Self {
159 Self::default().with_thrift_transport(ThriftTransport::SharedMemory(
160 ThriftSharedMemoryTransportBuilder::default().build(),
161 ))
162 }
163
164 #[must_use]
166 pub fn pipe_with_defaults() -> Self {
167 Self::default().with_thrift_transport(ThriftTransport::Pipe(ThriftPipeTransport {
168 pipe_path: PathBuf::from(format!("hapi-pipe-{}", utils::random_string(16))),
169 }))
170 }
171
172 #[must_use]
174 pub fn socket_with_defaults(address: SocketAddrV4) -> Self {
175 Self::default()
176 .with_thrift_transport(ThriftTransport::Socket(ThriftSocketTransport { address }))
177 }
178
179 #[must_use]
180 pub fn with_thrift_transport(mut self, transport: ThriftTransport) -> Self {
181 self.thrift_transport = transport;
182 self
183 }
184
185 #[must_use]
187 pub fn with_connection_timeout(mut self, timeout: Option<Duration>) -> Self {
188 self.connection_retry_interval = timeout;
189 self
190 }
191
192 #[must_use]
196 pub fn with_license_preference(mut self, license_preference: LicensePreference) -> Self {
197 self.license_preference.replace(license_preference);
198
199 self.env_variables.get_or_insert_default().insert(
200 OsString::from("HOUDINI_PLUGIN_LIC_OPT"),
201 OsString::from(license_preference.to_string()),
202 );
203
204 self
205 }
206
207 #[must_use]
213 pub fn with_log_file(mut self, file: impl AsRef<Path>) -> Self {
214 self.log_file = Some(utils::path_to_cstring(file).expect("Path to CString failed"));
215 self
216 }
217
218 #[must_use]
222 pub fn with_env_variables<'a, I, K, V>(mut self, variables: I) -> Self
223 where
224 I: Iterator<Item = &'a (K, V)>,
225 K: Into<OsString> + Clone + 'a,
226 V: Into<OsString> + Clone + 'a,
227 {
228 self.env_variables = Some(
229 variables
230 .map(|(k, v)| (k.clone().into(), v.clone().into()))
231 .collect(),
232 );
233 self
234 }
235
236 #[must_use]
238 pub fn with_auto_close(mut self, auto_close: bool) -> Self {
239 self.auto_close = auto_close;
240 self
241 }
242
243 #[must_use]
245 pub fn with_verbosity(mut self, verbosity: StatusVerbosity) -> Self {
246 self.verbosity = verbosity;
247 self
248 }
249
250 #[must_use]
251 #[cfg(feature = "async-cooking")]
252 pub fn with_connection_count(mut self, connection_count: i32) -> Self {
253 self.connection_count = connection_count;
257 self
258 }
259
260 #[must_use]
263 pub fn with_server_ready_timeout(mut self, timeout: u32) -> Self {
264 self.server_ready_timeout.replace(timeout);
265 self
266 }
267
268 pub(crate) fn session_info(&self) -> crate::ffi::SessionInfo {
269 let mut session_info =
270 crate::ffi::SessionInfo::default().with_connection_count(self.connection_count);
271
272 if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
273 session_info.set_shared_memory_buffer_type(transport.buffer_type);
274 session_info.set_shared_memory_buffer_size(transport.buffer_size);
275 }
276
277 session_info
278 }
279
280 pub(crate) fn thrift_options(&self) -> crate::ffi::ThriftServerOptions {
281 let mut options = ThriftServerOptions::default()
282 .with_auto_close(self.auto_close)
283 .with_verbosity(self.verbosity);
284
285 if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
286 options.set_shared_memory_buffer_type(transport.buffer_type);
287 options.set_shared_memory_buffer_size(transport.buffer_size);
288 }
289 if let Some(timeout) = self.server_ready_timeout {
290 #[allow(clippy::cast_precision_loss)]
291 options.set_timeout_ms(timeout as f32);
292 }
293
294 options
295 }
296}
297
298fn call_with_temp_environment<R, T, F>(variables: Option<&[(T, T)]>, f: F) -> Result<R>
299where
300 T: AsRef<OsStr>,
301 F: FnOnce() -> Result<R>,
302{
303 if let Some(env_variables) = variables {
304 let env_variables: Vec<(&OsStr, Option<&OsStr>)> = env_variables
305 .iter()
306 .map(|(k, v)| (k.as_ref(), Some(v.as_ref())))
307 .collect::<Vec<_>>();
308 temp_env::with_vars(env_variables.as_slice(), f)
309 } else {
310 f()
311 }
312}
313
314pub fn connect_to_pipe_server(
316 server_options: ServerOptions,
317 pid: Option<u32>,
318) -> Result<UninitializedSession> {
319 let ThriftTransport::Pipe(ThriftPipeTransport { pipe_path }) = &server_options.thrift_transport
320 else {
321 return Err(HapiError::Internal(
322 "ServerOptions is not configured for pipe transport".to_owned(),
323 ));
324 };
325 let pipe_name = utils::path_to_cstring(pipe_path)?;
326 debug!("Connecting to pipe server: {:?}", pipe_path.display());
327 let handle = try_connect_with_timeout(
328 server_options.connection_retry_interval,
329 Duration::from_millis(100),
330 || ffi::new_thrift_piped_session(&pipe_name, &server_options.session_info().0),
331 )?;
332 Ok(UninitializedSession {
333 session_handle: handle,
334 server_options: Some(server_options),
335 server_pid: pid,
336 })
337}
338
339pub fn connect_to_memory_server(
341 server_options: ServerOptions,
342 pid: Option<u32>,
343) -> Result<UninitializedSession> {
344 let ThriftTransport::SharedMemory(ThriftSharedMemoryTransport { memory_name, .. }) =
345 &server_options.thrift_transport
346 else {
347 return Err(HapiError::Internal(
348 "ServerOptions is not configured for shared memory transport".to_owned(),
349 ));
350 };
351 let mem_name_cstr = CString::new(memory_name.clone())?;
352 debug!("Connecting to shared memory server: {memory_name:?}");
353 let handle = try_connect_with_timeout(
354 server_options.connection_retry_interval,
355 Duration::from_millis(100),
356 || ffi::new_thrift_shared_memory_session(&mem_name_cstr, &server_options.session_info().0),
357 )?;
358 Ok(UninitializedSession {
359 session_handle: handle,
360 server_options: Some(server_options),
361 server_pid: pid,
362 })
363}
364
365fn try_connect_with_timeout<F: Fn() -> Result<crate::ffi::raw::HAPI_Session>>(
366 timeout: Option<Duration>,
367 wait_ms: Duration,
368 f: F,
369) -> Result<crate::ffi::raw::HAPI_Session> {
370 debug!("Trying to connect to server with timeout: {timeout:?}");
371 let mut waited = Duration::from_secs(0);
372 let mut last_error = None;
373 let handle = loop {
374 match f() {
375 Ok(handle) => break handle,
376 Err(e) => {
377 error!("Error while trying to connect to server: {e:?}");
378 last_error.replace(e);
379 thread::sleep(wait_ms);
380 waited += wait_ms;
381 }
382 }
383 if let Some(timeout) = timeout
384 && waited > timeout
385 {
386 return Err(last_error.unwrap()).context(format!(
388 "Could not connect to server within timeout: {timeout:?}"
389 ));
390 }
391 };
392 Ok(handle)
393}
394
395pub fn connect_to_socket_server(
397 server_options: ServerOptions,
398 pid: Option<u32>,
399) -> Result<UninitializedSession> {
400 let ThriftTransport::Socket(ThriftSocketTransport { address }) =
401 &server_options.thrift_transport
402 else {
403 return Err(HapiError::Internal(
404 "ServerOptions is not configured for socket transport".to_owned(),
405 ));
406 };
407 debug!("Connecting to socket server: {address:?}");
408 let host = CString::new(address.ip().to_string())
409 .map_err(HapiError::from)
410 .context("Converting SocketAddr to CString")?;
411 let handle = try_connect_with_timeout(
412 server_options.connection_retry_interval,
413 Duration::from_millis(100),
414 || {
415 ffi::new_thrift_socket_session(
416 i32::from(address.port()),
417 &host,
418 &server_options.session_info().0,
419 )
420 },
421 )?;
422 Ok(UninitializedSession {
423 session_handle: handle,
424 server_options: Some(server_options),
425 server_pid: pid,
426 })
427}
428
429pub fn start_engine_server(server_options: &ServerOptions) -> Result<u32> {
430 let env_variables = server_options.env_variables.as_ref().map(|env_variables| {
431 env_variables
432 .iter()
433 .map(|(k, v)| (k.as_os_str(), v.as_os_str()))
434 .collect::<Vec<_>>()
435 });
436 match &server_options.thrift_transport {
437 ThriftTransport::SharedMemory(transport) => {
438 debug!(
439 "Starting shared memory server name: {}",
440 transport.memory_name
441 );
442 let memory_name = CString::new(transport.memory_name.clone())?;
443 ffi::clear_connection_error()?;
444 call_with_temp_environment(env_variables.as_deref(), || {
445 ffi::start_thrift_shared_memory_server(
446 &memory_name,
447 &server_options.thrift_options().0,
448 server_options.log_file.as_deref(),
449 )
450 .with_context(|| {
451 format!(
452 "Failed to start shared memory server: {}",
453 transport.memory_name
454 )
455 })
456 })
457 }
458 ThriftTransport::Pipe(transport) => {
459 debug!(
460 "Starting named pipe server: {}",
461 transport.pipe_path.display()
462 );
463 let pipe_name = utils::path_to_cstring(&transport.pipe_path)?;
464 ffi::clear_connection_error()?;
465 call_with_temp_environment(env_variables.as_deref(), || {
466 ffi::start_thrift_pipe_server(
467 &pipe_name,
468 &server_options.thrift_options().0,
469 server_options.log_file.as_deref(),
470 )
471 .with_context(|| {
472 format!(
473 "Failed to start pipe server: {}",
474 transport.pipe_path.display()
475 )
476 })
477 })
478 }
479 ThriftTransport::Socket(transport) => {
480 debug!(
481 "Starting socket server on port: {}",
482 transport.address.port()
483 );
484 ffi::clear_connection_error()?;
485 call_with_temp_environment(env_variables.as_deref(), || {
486 ffi::start_thrift_socket_server(
487 i32::from(transport.address.port()),
488 &server_options.thrift_options().0,
489 server_options.log_file.as_deref(),
490 )
491 })
492 }
493 }
494}
495
496pub fn start_houdini_server(
498 pipe_name: impl AsRef<str>,
499 houdini_executable: impl AsRef<Path>,
500 fx_license: bool,
501 env_variables: Option<&[(String, String)]>,
502) -> Result<Child> {
503 let mut command = Command::new(houdini_executable.as_ref());
504 call_with_temp_environment(env_variables, move || {
505 command
506 .arg(format!("-hess=pipe:{}", pipe_name.as_ref()))
507 .arg(if fx_license {
508 "-force-fx-license"
509 } else {
510 "-core"
511 })
512 .stdin(Stdio::null())
513 .stdout(Stdio::null())
514 .stderr(Stdio::null())
515 .spawn()
516 .map_err(HapiError::from)
517 })
518}
519
520#[cfg(test)]
521mod tests {
522 use super::*;
523 use std::{ffi::OsString, net::Ipv4Addr, num::NonZeroU64};
524
525 use crate::ffi::enums::StatusVerbosity;
526
527 #[test]
528 fn license_preference_display_strings() {
529 assert_eq!(
530 LicensePreference::AnyAvailable.to_string(),
531 "--check-licenses=Houdini-Engine,Houdini-Escape,Houdini-Fx"
532 );
533 assert_eq!(
534 LicensePreference::HoudiniEngineOnly.to_string(),
535 "--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
536 );
537 assert_eq!(
538 LicensePreference::HoudiniEngineAndCore.to_string(),
539 "--check-licenses=Houdini-Engine,Houdini-Escape --skip-licenses=Houdini-Fx"
540 );
541 }
542
543 #[test]
544 fn shared_memory_transport_builder_applies_options() {
545 let transport = ThriftSharedMemoryTransportBuilder::default()
546 .with_memory_name("test-memory")
547 .with_buffer_type(ThriftSharedMemoryBufferType::RingBuffer)
548 .with_buffer_size(NonZeroU64::new(512).unwrap())
549 .build();
550
551 assert_eq!(transport.memory_name, "test-memory");
552 assert_eq!(transport.buffer_type, ThriftSharedMemoryBufferType::RingBuffer);
553 assert_eq!(transport.buffer_size, 512);
554 }
555
556 #[test]
557 fn shared_memory_transport_builder_clamps_oversized_buffer() {
558 let transport = ThriftSharedMemoryTransportBuilder::default()
559 .with_buffer_size(NonZeroU64::new(i64::MAX as u64 + 1).unwrap())
560 .build();
561
562 assert_eq!(transport.buffer_size, 1024);
563 }
564
565 #[test]
566 fn server_options_shared_memory_maps_to_session_and_thrift_options() {
567 let transport = ThriftSharedMemoryTransportBuilder::default()
568 .with_buffer_type(ThriftSharedMemoryBufferType::RingBuffer)
569 .with_buffer_size(NonZeroU64::new(256).unwrap())
570 .build();
571 let options = ServerOptions::default()
572 .with_auto_close(false)
573 .with_verbosity(StatusVerbosity::Statusverbosity2)
574 .with_server_ready_timeout(5_000)
575 .with_thrift_transport(ThriftTransport::SharedMemory(transport.clone()));
576
577 let session_info = options.session_info();
578 assert_eq!(
579 session_info.shared_memory_buffer_type(),
580 ThriftSharedMemoryBufferType::RingBuffer
581 );
582 assert_eq!(session_info.shared_memory_buffer_size(), 256);
583
584 let thrift_options = options.thrift_options();
585 assert!(!thrift_options.auto_close());
586 assert_eq!(thrift_options.verbosity(), StatusVerbosity::Statusverbosity2);
587 assert_eq!(
588 thrift_options.shared_memory_buffer_type(),
589 ThriftSharedMemoryBufferType::RingBuffer
590 );
591 assert_eq!(thrift_options.shared_memory_buffer_size(), 256);
592 assert_eq!(thrift_options.timeout_ms(), 5_000.0);
593 }
594
595 #[test]
596 fn server_options_license_preference_sets_plugin_env() {
597 let options =
598 ServerOptions::default().with_license_preference(LicensePreference::HoudiniEngineOnly);
599 let env = options.env_variables.expect("env map");
600 assert_eq!(
601 env.get(&OsString::from("HOUDINI_PLUGIN_LIC_OPT")),
602 Some(&OsString::from(
603 "--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
604 ))
605 );
606 }
607
608 #[test]
609 fn socket_with_defaults_preserves_address() {
610 let address = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 12_345);
611 let options = ServerOptions::socket_with_defaults(address);
612 let ThriftTransport::Socket(ThriftSocketTransport { address: actual }) =
613 options.thrift_transport
614 else {
615 panic!("expected socket transport");
616 };
617 assert_eq!(actual, address);
618 }
619
620 #[test]
621 fn connect_rejects_mismatched_transport_without_calling_hapi() {
622 let memory_options = ServerOptions::shared_memory_with_defaults();
623 let pipe_options = ServerOptions::pipe_with_defaults();
624 let socket_options =
625 ServerOptions::socket_with_defaults(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9_999));
626
627 assert!(connect_to_memory_server(pipe_options.clone(), None).is_err());
628 assert!(connect_to_pipe_server(memory_options.clone(), None).is_err());
629 assert!(connect_to_socket_server(memory_options, None).is_err());
630 assert!(connect_to_memory_server(socket_options.clone(), None).is_err());
631 assert!(connect_to_pipe_server(socket_options, None).is_err());
632 }
633}