Skip to main content

hapi_rs/
server.rs

1use std::{
2    collections::HashMap,
3    ffi::{CString, OsStr, OsString},
4    net::SocketAddrV4,
5    num::NonZeroU64,
6    path::{Path, PathBuf},
7    process::{Child, Command, Stdio},
8    thread,
9    time::Duration,
10};
11
12use log::{debug, error, warn};
13use temp_env;
14
15use crate::{
16    errors::{ErrorContext, HapiError, Result},
17    ffi::{self, ThriftServerOptions, enums::StatusVerbosity},
18    session::UninitializedSession,
19    utils,
20};
21
22pub use crate::ffi::raw::ThriftSharedMemoryBufferType;
23
24#[derive(Copy, Clone, Debug, PartialEq, Eq)]
25pub enum LicensePreference {
26    AnyAvailable,
27    HoudiniEngineOnly,
28    HoudiniEngineAndCore,
29}
30
31impl std::fmt::Display for LicensePreference {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(
34            f,
35            "{}",
36            match self {
37                LicensePreference::AnyAvailable => {
38                    "--check-licenses=Houdini-Engine,Houdini-Escape,Houdini-Fx"
39                }
40                LicensePreference::HoudiniEngineOnly => {
41                    "--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
42                }
43                LicensePreference::HoudiniEngineAndCore => {
44                    "--check-licenses=Houdini-Engine,Houdini-Escape --skip-licenses=Houdini-Fx"
45                }
46            }
47        )
48    }
49}
50
51#[derive(Clone, Debug)]
52pub struct ThriftSharedMemoryTransport {
53    pub memory_name: String,
54    pub buffer_type: ThriftSharedMemoryBufferType,
55    pub buffer_size: i64,
56}
57
58#[derive(Clone, Debug)]
59pub struct ThriftSocketTransport {
60    pub address: SocketAddrV4,
61}
62
63#[derive(Clone, Debug)]
64pub struct ThriftPipeTransport {
65    pub pipe_path: PathBuf,
66}
67
68#[derive(Clone, Debug)]
69pub enum ThriftTransport {
70    SharedMemory(ThriftSharedMemoryTransport),
71    Pipe(ThriftPipeTransport),
72    Socket(ThriftSocketTransport),
73}
74
75pub struct ThriftSharedMemoryTransportBuilder {
76    memory_name: String,
77    buffer_type: ThriftSharedMemoryBufferType,
78    buffer_size: i64,
79}
80
81impl Default for ThriftSharedMemoryTransportBuilder {
82    fn default() -> Self {
83        Self {
84            memory_name: format!("shared-memory-{}", utils::random_string(16)),
85            buffer_type: ThriftSharedMemoryBufferType::Buffer,
86            buffer_size: 1024, // MB
87        }
88    }
89}
90
91impl ThriftSharedMemoryTransportBuilder {
92    #[must_use]
93    pub fn with_memory_name(mut self, name: impl Into<String>) -> Self {
94        self.memory_name = name.into();
95        self
96    }
97    #[must_use]
98    pub fn with_buffer_type(mut self, buffer_type: ThriftSharedMemoryBufferType) -> Self {
99        self.buffer_type = buffer_type;
100        self
101    }
102    #[must_use]
103    pub fn with_buffer_size(mut self, buffer_size: NonZeroU64) -> Self {
104        self.buffer_size = if let Ok(size) = buffer_size.get().try_into() {
105            size
106        } else {
107            // When u64 can't fit into i64, use default of 1024 MB
108            warn!("ThriftSharedMemoryTransport buffer size is too large, using default of 1024");
109            1024
110        };
111        self
112    }
113    #[must_use]
114    pub fn build(self) -> ThriftSharedMemoryTransport {
115        ThriftSharedMemoryTransport {
116            memory_name: self.memory_name,
117            buffer_type: self.buffer_type,
118            buffer_size: self.buffer_size,
119        }
120    }
121}
122
123// TODO: rename ServerConfiguration
124#[derive(Clone, Debug)]
125pub struct ServerOptions {
126    pub thrift_transport: ThriftTransport,
127    pub auto_close: bool,
128    pub verbosity: StatusVerbosity,
129    pub log_file: Option<CString>,
130    pub env_variables: Option<HashMap<OsString, OsString>>,
131    pub license_preference: Option<LicensePreference>,
132    pub connection_count: i32,
133    pub server_ready_timeout: Option<u32>,
134    pub(crate) connection_retry_interval: Option<Duration>,
135}
136
137impl Default for ServerOptions {
138    fn default() -> Self {
139        Self {
140            thrift_transport: ThriftTransport::SharedMemory(
141                ThriftSharedMemoryTransportBuilder::default().build(),
142            ),
143            auto_close: true,
144            verbosity: StatusVerbosity::Statusverbosity0,
145            log_file: None,
146            env_variables: None,
147            license_preference: None,
148            connection_count: 0,
149            server_ready_timeout: None,
150            connection_retry_interval: Some(Duration::from_secs(10)),
151        }
152    }
153}
154
155impl ServerOptions {
156    /// Create options for a shared-memory transport with a random name.
157    #[must_use]
158    pub fn shared_memory_with_defaults() -> Self {
159        Self::default().with_thrift_transport(ThriftTransport::SharedMemory(
160            ThriftSharedMemoryTransportBuilder::default().build(),
161        ))
162    }
163
164    /// Create options for a named pipe transport.
165    #[must_use]
166    pub fn pipe_with_defaults() -> Self {
167        Self::default().with_thrift_transport(ThriftTransport::Pipe(ThriftPipeTransport {
168            pipe_path: PathBuf::from(format!("hapi-pipe-{}", utils::random_string(16))),
169        }))
170    }
171
172    /// Create options for a socket transport.
173    #[must_use]
174    pub fn socket_with_defaults(address: SocketAddrV4) -> Self {
175        Self::default()
176            .with_thrift_transport(ThriftTransport::Socket(ThriftSocketTransport { address }))
177    }
178
179    #[must_use]
180    pub fn with_thrift_transport(mut self, transport: ThriftTransport) -> Self {
181        self.thrift_transport = transport;
182        self
183    }
184
185    /// Set a connection timeout used when establishing Thrift sessions.
186    #[must_use]
187    pub fn with_connection_timeout(mut self, timeout: Option<Duration>) -> Self {
188        self.connection_retry_interval = timeout;
189        self
190    }
191
192    /// Set the license preference for the server.
193    /// For more information, see <https://www.sidefx.com/docs/houdini//licensing/system.html>
194    /// Default is No preference, the server decides which license to check out.
195    #[must_use]
196    pub fn with_license_preference(mut self, license_preference: LicensePreference) -> Self {
197        self.license_preference.replace(license_preference);
198
199        self.env_variables.get_or_insert_default().insert(
200            OsString::from("HOUDINI_PLUGIN_LIC_OPT"),
201            OsString::from(license_preference.to_string()),
202        );
203
204        self
205    }
206
207    /// Set the log file for the server.
208    /// BUG: HARS 21.0.685 has a bug where the log file is always created in the working directory
209    ///
210    /// # Panics
211    /// Panics if `file` contains an interior null byte and cannot be converted to a C string.
212    #[must_use]
213    pub fn with_log_file(mut self, file: impl AsRef<Path>) -> Self {
214        self.log_file = Some(utils::path_to_cstring(file).expect("Path to CString failed"));
215        self
216    }
217
218    /// Set **real** environment variables before the server starts.
219    /// Unlike [`crate::session::Session::set_server_var`], where the variables are set in the session after the
220    /// server starts.
221    #[must_use]
222    pub fn with_env_variables<'a, I, K, V>(mut self, variables: I) -> Self
223    where
224        I: Iterator<Item = &'a (K, V)>,
225        K: Into<OsString> + Clone + 'a,
226        V: Into<OsString> + Clone + 'a,
227    {
228        self.env_variables = Some(
229            variables
230                .map(|(k, v)| (k.clone().into(), v.clone().into()))
231                .collect(),
232        );
233        self
234    }
235
236    /// Automatically close the server when the last connection drops.
237    #[must_use]
238    pub fn with_auto_close(mut self, auto_close: bool) -> Self {
239        self.auto_close = auto_close;
240        self
241    }
242
243    /// Set the verbosity level for the server.
244    #[must_use]
245    pub fn with_verbosity(mut self, verbosity: StatusVerbosity) -> Self {
246        self.verbosity = verbosity;
247        self
248    }
249
250    #[must_use]
251    #[cfg(feature = "async-cooking")]
252    pub fn with_connection_count(mut self, connection_count: i32) -> Self {
253        // BUG: HARS 21.0.* has a bug where the connection count is not respected.
254        // If connection_count is > 0, there is a bug in HARS which prevents session creation.
255        // However, async attribute access requires a connection count > 0 according to SESI support, otherwise HARS crashes too.
256        self.connection_count = connection_count;
257        self
258    }
259
260    /// Set the timeout for the server to be ready in ms
261    /// This is the timeout for the server to initialize and be ready to accept connections.
262    #[must_use]
263    pub fn with_server_ready_timeout(mut self, timeout: u32) -> Self {
264        self.server_ready_timeout.replace(timeout);
265        self
266    }
267
268    pub(crate) fn session_info(&self) -> crate::ffi::SessionInfo {
269        let mut session_info =
270            crate::ffi::SessionInfo::default().with_connection_count(self.connection_count);
271
272        if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
273            session_info.set_shared_memory_buffer_type(transport.buffer_type);
274            session_info.set_shared_memory_buffer_size(transport.buffer_size);
275        }
276
277        session_info
278    }
279
280    pub(crate) fn thrift_options(&self) -> crate::ffi::ThriftServerOptions {
281        let mut options = ThriftServerOptions::default()
282            .with_auto_close(self.auto_close)
283            .with_verbosity(self.verbosity);
284
285        if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
286            options.set_shared_memory_buffer_type(transport.buffer_type);
287            options.set_shared_memory_buffer_size(transport.buffer_size);
288        }
289        if let Some(timeout) = self.server_ready_timeout {
290            #[allow(clippy::cast_precision_loss)]
291            options.set_timeout_ms(timeout as f32);
292        }
293
294        options
295    }
296}
297
298fn call_with_temp_environment<R, T, F>(variables: Option<&[(T, T)]>, f: F) -> Result<R>
299where
300    T: AsRef<OsStr>,
301    F: FnOnce() -> Result<R>,
302{
303    if let Some(env_variables) = variables {
304        let env_variables: Vec<(&OsStr, Option<&OsStr>)> = env_variables
305            .iter()
306            .map(|(k, v)| (k.as_ref(), Some(v.as_ref())))
307            .collect::<Vec<_>>();
308        temp_env::with_vars(env_variables.as_slice(), f)
309    } else {
310        f()
311    }
312}
313
314/// Connect to the Thrift pipe server and return an uninitialized session.
315pub fn connect_to_pipe_server(
316    server_options: ServerOptions,
317    pid: Option<u32>,
318) -> Result<UninitializedSession> {
319    let ThriftTransport::Pipe(ThriftPipeTransport { pipe_path }) = &server_options.thrift_transport
320    else {
321        return Err(HapiError::Internal(
322            "ServerOptions is not configured for pipe transport".to_owned(),
323        ));
324    };
325    let pipe_name = utils::path_to_cstring(pipe_path)?;
326    debug!("Connecting to pipe server: {:?}", pipe_path.display());
327    let handle = try_connect_with_timeout(
328        server_options.connection_retry_interval,
329        Duration::from_millis(100),
330        || ffi::new_thrift_piped_session(&pipe_name, &server_options.session_info().0),
331    )?;
332    Ok(UninitializedSession {
333        session_handle: handle,
334        server_options: Some(server_options),
335        server_pid: pid,
336    })
337}
338
339/// Connect to the Thrift shared memory server and return an uninitialized session.
340pub fn connect_to_memory_server(
341    server_options: ServerOptions,
342    pid: Option<u32>,
343) -> Result<UninitializedSession> {
344    let ThriftTransport::SharedMemory(ThriftSharedMemoryTransport { memory_name, .. }) =
345        &server_options.thrift_transport
346    else {
347        return Err(HapiError::Internal(
348            "ServerOptions is not configured for shared memory transport".to_owned(),
349        ));
350    };
351    let mem_name_cstr = CString::new(memory_name.clone())?;
352    debug!("Connecting to shared memory server: {memory_name:?}");
353    let handle = try_connect_with_timeout(
354        server_options.connection_retry_interval,
355        Duration::from_millis(100),
356        || ffi::new_thrift_shared_memory_session(&mem_name_cstr, &server_options.session_info().0),
357    )?;
358    Ok(UninitializedSession {
359        session_handle: handle,
360        server_options: Some(server_options),
361        server_pid: pid,
362    })
363}
364
365fn try_connect_with_timeout<F: Fn() -> Result<crate::ffi::raw::HAPI_Session>>(
366    timeout: Option<Duration>,
367    wait_ms: Duration,
368    f: F,
369) -> Result<crate::ffi::raw::HAPI_Session> {
370    debug!("Trying to connect to server with timeout: {timeout:?}");
371    let mut waited = Duration::from_secs(0);
372    let mut last_error = None;
373    let handle = loop {
374        match f() {
375            Ok(handle) => break handle,
376            Err(e) => {
377                error!("Error while trying to connect to server: {e:?}");
378                last_error.replace(e);
379                thread::sleep(wait_ms);
380                waited += wait_ms;
381            }
382        }
383        if let Some(timeout) = timeout
384            && waited > timeout
385        {
386            // last_error is guaranteed to be Some() because we break out of the loop if we get a result.
387            return Err(last_error.unwrap()).context(format!(
388                "Could not connect to server within timeout: {timeout:?}"
389            ));
390        }
391    };
392    Ok(handle)
393}
394
395/// Connect to the Thrift socket server and return an uninitialized session.
396pub fn connect_to_socket_server(
397    server_options: ServerOptions,
398    pid: Option<u32>,
399) -> Result<UninitializedSession> {
400    let ThriftTransport::Socket(ThriftSocketTransport { address }) =
401        &server_options.thrift_transport
402    else {
403        return Err(HapiError::Internal(
404            "ServerOptions is not configured for socket transport".to_owned(),
405        ));
406    };
407    debug!("Connecting to socket server: {address:?}");
408    let host = CString::new(address.ip().to_string())
409        .map_err(HapiError::from)
410        .context("Converting SocketAddr to CString")?;
411    let handle = try_connect_with_timeout(
412        server_options.connection_retry_interval,
413        Duration::from_millis(100),
414        || {
415            ffi::new_thrift_socket_session(
416                i32::from(address.port()),
417                &host,
418                &server_options.session_info().0,
419            )
420        },
421    )?;
422    Ok(UninitializedSession {
423        session_handle: handle,
424        server_options: Some(server_options),
425        server_pid: pid,
426    })
427}
428
429pub fn start_engine_server(server_options: &ServerOptions) -> Result<u32> {
430    let env_variables = server_options.env_variables.as_ref().map(|env_variables| {
431        env_variables
432            .iter()
433            .map(|(k, v)| (k.as_os_str(), v.as_os_str()))
434            .collect::<Vec<_>>()
435    });
436    match &server_options.thrift_transport {
437        ThriftTransport::SharedMemory(transport) => {
438            debug!(
439                "Starting shared memory server name: {}",
440                transport.memory_name
441            );
442            let memory_name = CString::new(transport.memory_name.clone())?;
443            ffi::clear_connection_error()?;
444            call_with_temp_environment(env_variables.as_deref(), || {
445                ffi::start_thrift_shared_memory_server(
446                    &memory_name,
447                    &server_options.thrift_options().0,
448                    server_options.log_file.as_deref(),
449                )
450                .with_context(|| {
451                    format!(
452                        "Failed to start shared memory server: {}",
453                        transport.memory_name
454                    )
455                })
456            })
457        }
458        ThriftTransport::Pipe(transport) => {
459            debug!(
460                "Starting named pipe server: {}",
461                transport.pipe_path.display()
462            );
463            let pipe_name = utils::path_to_cstring(&transport.pipe_path)?;
464            ffi::clear_connection_error()?;
465            call_with_temp_environment(env_variables.as_deref(), || {
466                ffi::start_thrift_pipe_server(
467                    &pipe_name,
468                    &server_options.thrift_options().0,
469                    server_options.log_file.as_deref(),
470                )
471                .with_context(|| {
472                    format!(
473                        "Failed to start pipe server: {}",
474                        transport.pipe_path.display()
475                    )
476                })
477            })
478        }
479        ThriftTransport::Socket(transport) => {
480            debug!(
481                "Starting socket server on port: {}",
482                transport.address.port()
483            );
484            ffi::clear_connection_error()?;
485            call_with_temp_environment(env_variables.as_deref(), || {
486                ffi::start_thrift_socket_server(
487                    i32::from(transport.address.port()),
488                    &server_options.thrift_options().0,
489                    server_options.log_file.as_deref(),
490                )
491            })
492        }
493    }
494}
495
496/// Start an interactive Houdini session with engine server embedded.
497pub fn start_houdini_server(
498    pipe_name: impl AsRef<str>,
499    houdini_executable: impl AsRef<Path>,
500    fx_license: bool,
501    env_variables: Option<&[(String, String)]>,
502) -> Result<Child> {
503    let mut command = Command::new(houdini_executable.as_ref());
504    call_with_temp_environment(env_variables, move || {
505        command
506            .arg(format!("-hess=pipe:{}", pipe_name.as_ref()))
507            .arg(if fx_license {
508                "-force-fx-license"
509            } else {
510                "-core"
511            })
512            .stdin(Stdio::null())
513            .stdout(Stdio::null())
514            .stderr(Stdio::null())
515            .spawn()
516            .map_err(HapiError::from)
517    })
518}
519
520#[cfg(test)]
521mod tests {
522    use super::*;
523    use std::{ffi::OsString, net::Ipv4Addr, num::NonZeroU64};
524
525    use crate::ffi::enums::StatusVerbosity;
526
527    #[test]
528    fn license_preference_display_strings() {
529        assert_eq!(
530            LicensePreference::AnyAvailable.to_string(),
531            "--check-licenses=Houdini-Engine,Houdini-Escape,Houdini-Fx"
532        );
533        assert_eq!(
534            LicensePreference::HoudiniEngineOnly.to_string(),
535            "--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
536        );
537        assert_eq!(
538            LicensePreference::HoudiniEngineAndCore.to_string(),
539            "--check-licenses=Houdini-Engine,Houdini-Escape --skip-licenses=Houdini-Fx"
540        );
541    }
542
543    #[test]
544    fn shared_memory_transport_builder_applies_options() {
545        let transport = ThriftSharedMemoryTransportBuilder::default()
546            .with_memory_name("test-memory")
547            .with_buffer_type(ThriftSharedMemoryBufferType::RingBuffer)
548            .with_buffer_size(NonZeroU64::new(512).unwrap())
549            .build();
550
551        assert_eq!(transport.memory_name, "test-memory");
552        assert_eq!(transport.buffer_type, ThriftSharedMemoryBufferType::RingBuffer);
553        assert_eq!(transport.buffer_size, 512);
554    }
555
556    #[test]
557    fn shared_memory_transport_builder_clamps_oversized_buffer() {
558        let transport = ThriftSharedMemoryTransportBuilder::default()
559            .with_buffer_size(NonZeroU64::new(i64::MAX as u64 + 1).unwrap())
560            .build();
561
562        assert_eq!(transport.buffer_size, 1024);
563    }
564
565    #[test]
566    fn server_options_shared_memory_maps_to_session_and_thrift_options() {
567        let transport = ThriftSharedMemoryTransportBuilder::default()
568            .with_buffer_type(ThriftSharedMemoryBufferType::RingBuffer)
569            .with_buffer_size(NonZeroU64::new(256).unwrap())
570            .build();
571        let options = ServerOptions::default()
572            .with_auto_close(false)
573            .with_verbosity(StatusVerbosity::Statusverbosity2)
574            .with_server_ready_timeout(5_000)
575            .with_thrift_transport(ThriftTransport::SharedMemory(transport.clone()));
576
577        let session_info = options.session_info();
578        assert_eq!(
579            session_info.shared_memory_buffer_type(),
580            ThriftSharedMemoryBufferType::RingBuffer
581        );
582        assert_eq!(session_info.shared_memory_buffer_size(), 256);
583
584        let thrift_options = options.thrift_options();
585        assert!(!thrift_options.auto_close());
586        assert_eq!(thrift_options.verbosity(), StatusVerbosity::Statusverbosity2);
587        assert_eq!(
588            thrift_options.shared_memory_buffer_type(),
589            ThriftSharedMemoryBufferType::RingBuffer
590        );
591        assert_eq!(thrift_options.shared_memory_buffer_size(), 256);
592        assert_eq!(thrift_options.timeout_ms(), 5_000.0);
593    }
594
595    #[test]
596    fn server_options_license_preference_sets_plugin_env() {
597        let options =
598            ServerOptions::default().with_license_preference(LicensePreference::HoudiniEngineOnly);
599        let env = options.env_variables.expect("env map");
600        assert_eq!(
601            env.get(&OsString::from("HOUDINI_PLUGIN_LIC_OPT")),
602            Some(&OsString::from(
603                "--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
604            ))
605        );
606    }
607
608    #[test]
609    fn socket_with_defaults_preserves_address() {
610        let address = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 12_345);
611        let options = ServerOptions::socket_with_defaults(address);
612        let ThriftTransport::Socket(ThriftSocketTransport { address: actual }) =
613            options.thrift_transport
614        else {
615            panic!("expected socket transport");
616        };
617        assert_eq!(actual, address);
618    }
619
620    #[test]
621    fn connect_rejects_mismatched_transport_without_calling_hapi() {
622        let memory_options = ServerOptions::shared_memory_with_defaults();
623        let pipe_options = ServerOptions::pipe_with_defaults();
624        let socket_options =
625            ServerOptions::socket_with_defaults(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9_999));
626
627        assert!(connect_to_memory_server(pipe_options.clone(), None).is_err());
628        assert!(connect_to_pipe_server(memory_options.clone(), None).is_err());
629        assert!(connect_to_socket_server(memory_options, None).is_err());
630        assert!(connect_to_memory_server(socket_options.clone(), None).is_err());
631        assert!(connect_to_pipe_server(socket_options, None).is_err());
632    }
633}