hapi_rs/
server.rs

1use std::{
2    collections::HashMap,
3    ffi::{CString, OsStr, OsString},
4    net::SocketAddrV4,
5    num::NonZeroU64,
6    path::{Path, PathBuf},
7    process::{Child, Command, Stdio},
8    thread,
9    time::Duration,
10};
11
12use log::{debug, error, warn};
13use temp_env;
14
15use crate::{
16    errors::{ErrorContext, HapiError, Result},
17    ffi::{self, ThriftServerOptions, enums::StatusVerbosity},
18    session::UninitializedSession,
19    utils,
20};
21
22pub use crate::ffi::raw::ThriftSharedMemoryBufferType;
23
/// Which Houdini license types the engine server is allowed to check out.
///
/// The [`Display`](std::fmt::Display) implementation renders each variant as the
/// `--check-licenses=… [--skip-licenses=…]` option string that
/// `ServerOptions::with_license_preference` stores in `HOUDINI_PLUGIN_LIC_OPT`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LicensePreference {
    /// Check Houdini-Engine, Houdini-Escape and Houdini-Fx; nothing is skipped.
    AnyAvailable,
    /// Check Houdini-Engine only; Houdini-Escape and Houdini-Fx are skipped.
    HoudiniEngineOnly,
    /// Check Houdini-Engine and Houdini-Escape; Houdini-Fx is skipped.
    HoudiniEngineAndCore,
}

impl std::fmt::Display for LicensePreference {
    /// Writes the option string for this preference verbatim.
    /// Width and alignment specifiers on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let option_string = match self {
            Self::AnyAvailable => "--check-licenses=Houdini-Engine,Houdini-Escape,Houdini-Fx",
            Self::HoudiniEngineOnly => {
                "--check-licenses=Houdini-Engine --skip-licenses=Houdini-Escape,Houdini-Fx"
            }
            Self::HoudiniEngineAndCore => {
                "--check-licenses=Houdini-Engine,Houdini-Escape --skip-licenses=Houdini-Fx"
            }
        };
        f.write_str(option_string)
    }
}
50
51#[derive(Clone, Debug)]
52pub struct ThriftSharedMemoryTransport {
53    pub memory_name: String,
54    pub buffer_type: ThriftSharedMemoryBufferType,
55    pub buffer_size: i64,
56}
57
/// Settings for a Thrift session that talks to the server over a TCP socket.
#[derive(Debug, Clone)]
pub struct ThriftSocketTransport {
    /// IPv4 address and port of the server. The port is used when starting the
    /// server; both host and port are used when connecting.
    pub address: SocketAddrV4,
}
62
/// Settings for a Thrift session that talks to the server over a named pipe.
#[derive(Debug, Clone)]
pub struct ThriftPipeTransport {
    /// Path (name) of the pipe; converted to a C string when the server is
    /// started or a session connects.
    pub pipe_path: PathBuf,
}
67
68#[derive(Clone, Debug)]
69pub enum ThriftTransport {
70    SharedMemory(ThriftSharedMemoryTransport),
71    Pipe(ThriftPipeTransport),
72    Socket(ThriftSocketTransport),
73}
74
75pub struct ThriftSharedMemoryTransportBuilder {
76    memory_name: String,
77    buffer_type: ThriftSharedMemoryBufferType,
78    buffer_size: i64,
79}
80
81impl Default for ThriftSharedMemoryTransportBuilder {
82    fn default() -> Self {
83        Self {
84            memory_name: format!("shared-memory-{}", utils::random_string(16)),
85            buffer_type: ThriftSharedMemoryBufferType::Buffer,
86            buffer_size: 1024, // MB
87        }
88    }
89}
90
91impl ThriftSharedMemoryTransportBuilder {
92    pub fn with_memory_name(mut self, name: impl Into<String>) -> Self {
93        self.memory_name = name.into();
94        self
95    }
96    pub fn with_buffer_type(mut self, buffer_type: ThriftSharedMemoryBufferType) -> Self {
97        self.buffer_type = buffer_type;
98        self
99    }
100    pub fn with_buffer_size(mut self, buffer_size: NonZeroU64) -> Self {
101        self.buffer_size = match buffer_size.get().try_into() {
102            Ok(size) => size,
103            Err(_) => {
104                // When u64 can't fit into i64, use default of 1024 MB
105                warn!(
106                    "ThriftSharedMemoryTransport buffer size is too large, using default of 1024"
107                );
108                1024
109            }
110        };
111        self
112    }
113    pub fn build(self) -> ThriftSharedMemoryTransport {
114        ThriftSharedMemoryTransport {
115            memory_name: self.memory_name,
116            buffer_type: self.buffer_type,
117            buffer_size: self.buffer_size,
118        }
119    }
120}
121
122// TODO: rename ServerConfiguration
123#[derive(Clone, Debug)]
124pub struct ServerOptions {
125    pub thrift_transport: ThriftTransport,
126    pub auto_close: bool,
127    pub verbosity: StatusVerbosity,
128    pub log_file: Option<CString>,
129    pub env_variables: Option<HashMap<OsString, OsString>>,
130    pub license_preference: Option<LicensePreference>,
131    pub connection_count: i32,
132    pub server_ready_timeout: Option<u32>,
133    pub(crate) connection_retry_interval: Option<Duration>,
134}
135
136impl Default for ServerOptions {
137    fn default() -> Self {
138        Self {
139            thrift_transport: ThriftTransport::SharedMemory(
140                ThriftSharedMemoryTransportBuilder::default().build(),
141            ),
142            auto_close: true,
143            verbosity: StatusVerbosity::Statusverbosity0,
144            log_file: None,
145            env_variables: None,
146            license_preference: None,
147            connection_count: 0,
148            server_ready_timeout: None,
149            connection_retry_interval: Some(Duration::from_secs(10)),
150        }
151    }
152}
153
154impl ServerOptions {
155    /// Create options for a shared-memory transport with a random name.
156    pub fn shared_memory_with_defaults() -> Self {
157        Self::default().with_thrift_transport(ThriftTransport::SharedMemory(
158            ThriftSharedMemoryTransportBuilder::default().build(),
159        ))
160    }
161
162    /// Create options for a named pipe transport.
163    pub fn pipe_with_defaults() -> Self {
164        Self::default().with_thrift_transport(ThriftTransport::Pipe(ThriftPipeTransport {
165            pipe_path: PathBuf::from(format!("hapi-pipe-{}", utils::random_string(16))),
166        }))
167    }
168
169    /// Create options for a socket transport.
170    pub fn socket_with_defaults(address: SocketAddrV4) -> Self {
171        Self::default()
172            .with_thrift_transport(ThriftTransport::Socket(ThriftSocketTransport { address }))
173    }
174
175    pub fn with_thrift_transport(mut self, transport: ThriftTransport) -> Self {
176        self.thrift_transport = transport;
177        self
178    }
179
180    /// Set a connection timeout used when establishing Thrift sessions.
181    pub fn with_connection_timeout(mut self, timeout: Option<Duration>) -> Self {
182        self.connection_retry_interval = timeout;
183        self
184    }
185
186    /// Set the license preference for the server.
187    /// For more information, see https://www.sidefx.com/docs/houdini//licensing/system.html
188    /// Default is No preference, the server decides which license to check out.
189    pub fn with_license_preference(mut self, license_preference: LicensePreference) -> Self {
190        self.license_preference.replace(license_preference);
191
192        self.env_variables.get_or_insert_default().insert(
193            OsString::from("HOUDINI_PLUGIN_LIC_OPT"),
194            OsString::from(license_preference.to_string()),
195        );
196
197        self
198    }
199
200    /// Set the log file for the server.
201    /// BUG: HARS 21.0.685 has a bug where the log file is always created in the working directory
202    pub fn with_log_file(mut self, file: impl AsRef<Path>) -> Self {
203        self.log_file = Some(utils::path_to_cstring(file).expect("Path to CString failed"));
204        self
205    }
206
207    /// Set **real** environment variables before the server starts.
208    /// Unlike [`crate::session::Session::set_server_var`], where the variables are set in the session after the
209    /// server starts.
210    pub fn with_env_variables<'a, I, K, V>(mut self, variables: I) -> Self
211    where
212        I: Iterator<Item = &'a (K, V)>,
213        K: Into<OsString> + Clone + 'a,
214        V: Into<OsString> + Clone + 'a,
215    {
216        self.env_variables = Some(
217            variables
218                .map(|(k, v)| (k.clone().into(), v.clone().into()))
219                .collect(),
220        );
221        self
222    }
223
224    /// Automatically close the server when the last connection drops.
225    pub fn with_auto_close(mut self, auto_close: bool) -> Self {
226        self.auto_close = auto_close;
227        self
228    }
229
230    /// Set the verbosity level for the server.
231    pub fn with_verbosity(mut self, verbosity: StatusVerbosity) -> Self {
232        self.verbosity = verbosity;
233        self
234    }
235
236    pub fn with_connection_count(mut self, connection_count: i32) -> Self {
237        // BUG: HARS 21.0.* has a bug where the connection count is not respected.
238        // If connection_count is > 0, there is a bug in HARS which prevents session creation.
239        // However, async attribute access requires a connection count > 0 according to SESI support, otherwise HARS crashes too.
240        self.connection_count = connection_count;
241        self
242    }
243
244    /// Set the timeout for the server to be ready in ms
245    /// This is the timeout for the server to initialize and be ready to accept connections.
246    pub fn with_server_ready_timeout(mut self, timeout: u32) -> Self {
247        self.server_ready_timeout.replace(timeout);
248        self
249    }
250
251    pub(crate) fn session_info(&self) -> crate::ffi::SessionInfo {
252        let mut session_info =
253            crate::ffi::SessionInfo::default().with_connection_count(self.connection_count);
254
255        if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
256            session_info.set_shared_memory_buffer_type(transport.buffer_type);
257            session_info.set_shared_memory_buffer_size(transport.buffer_size);
258        }
259
260        session_info
261    }
262
263    pub(crate) fn thrift_options(&self) -> crate::ffi::ThriftServerOptions {
264        let mut options = ThriftServerOptions::default()
265            .with_auto_close(self.auto_close)
266            .with_verbosity(self.verbosity);
267
268        if let ThriftTransport::SharedMemory(transport) = &self.thrift_transport {
269            options.set_shared_memory_buffer_type(transport.buffer_type);
270            options.set_shared_memory_buffer_size(transport.buffer_size);
271        }
272        if let Some(timeout) = self.server_ready_timeout {
273            options.set_timeout_ms(timeout as f32);
274        }
275
276        options
277    }
278}
279
280fn call_with_temp_environment<R, T, F>(variables: Option<&[(T, T)]>, f: F) -> Result<R>
281where
282    T: AsRef<OsStr>,
283    F: FnOnce() -> Result<R>,
284{
285    if let Some(env_variables) = variables {
286        let env_variables: Vec<(&OsStr, Option<&OsStr>)> = env_variables
287            .iter()
288            .map(|(k, v)| (k.as_ref(), Some(v.as_ref())))
289            .collect::<Vec<_>>();
290        temp_env::with_vars(env_variables.as_slice(), f)
291    } else {
292        f()
293    }
294}
295
296/// Connect to the Thrift pipe server and return an uninitialized session.
297pub fn connect_to_pipe_server(
298    server_options: ServerOptions,
299    pid: Option<u32>,
300) -> Result<UninitializedSession> {
301    let ThriftTransport::Pipe(ThriftPipeTransport { pipe_path }) = &server_options.thrift_transport
302    else {
303        return Err(HapiError::Internal(
304            "ServerOptions is not configured for pipe transport".to_owned(),
305        ));
306    };
307    let pipe_name = utils::path_to_cstring(pipe_path)?;
308    debug!("Connecting to pipe server: {:?}", pipe_path.display());
309    let handle = try_connect_with_timeout(
310        server_options.connection_retry_interval,
311        Duration::from_millis(100),
312        || ffi::new_thrift_piped_session(&pipe_name, &server_options.session_info().0),
313    )?;
314    Ok(UninitializedSession {
315        session_handle: handle,
316        server_options: Some(server_options),
317        server_pid: pid,
318    })
319}
320
321/// Connect to the Thrift shared memory server and return an uninitialized session.
322pub fn connect_to_memory_server(
323    server_options: ServerOptions,
324    pid: Option<u32>,
325) -> Result<UninitializedSession> {
326    let ThriftTransport::SharedMemory(ThriftSharedMemoryTransport { memory_name, .. }) =
327        &server_options.thrift_transport
328    else {
329        return Err(HapiError::Internal(
330            "ServerOptions is not configured for shared memory transport".to_owned(),
331        ));
332    };
333    let mem_name_cstr = CString::new(memory_name.clone())?;
334    debug!("Connecting to shared memory server: {:?}", memory_name);
335    let handle = try_connect_with_timeout(
336        server_options.connection_retry_interval,
337        Duration::from_millis(100),
338        || ffi::new_thrift_shared_memory_session(&mem_name_cstr, &server_options.session_info().0),
339    )?;
340    Ok(UninitializedSession {
341        session_handle: handle,
342        server_options: Some(server_options),
343        server_pid: pid,
344    })
345}
346
347fn try_connect_with_timeout<F: Fn() -> Result<crate::ffi::raw::HAPI_Session>>(
348    timeout: Option<Duration>,
349    wait_ms: Duration,
350    f: F,
351) -> Result<crate::ffi::raw::HAPI_Session> {
352    debug!("Trying to connect to server with timeout: {:?}", timeout);
353    let mut waited = Duration::from_secs(0);
354    let mut last_error = None;
355    let handle = loop {
356        match f() {
357            Ok(handle) => break handle,
358            Err(e) => {
359                error!("Error while trying to connect to server: {:?}", e);
360                last_error.replace(e);
361                thread::sleep(wait_ms);
362                waited += wait_ms;
363            }
364        }
365        if let Some(timeout) = timeout
366            && waited > timeout
367        {
368            // last_error is guaranteed to be Some() because we break out of the loop if we get a result.
369            return Err(last_error.unwrap()).context(format!(
370                "Could not connect to server within timeout: {timeout:?}"
371            ));
372        }
373    };
374    Ok(handle)
375}
376
377/// Connect to the Thrift socket server and return an uninitialized session.
378pub fn connect_to_socket_server(
379    server_options: ServerOptions,
380    pid: Option<u32>,
381) -> Result<UninitializedSession> {
382    let ThriftTransport::Socket(ThriftSocketTransport { address }) =
383        &server_options.thrift_transport
384    else {
385        return Err(HapiError::Internal(
386            "ServerOptions is not configured for socket transport".to_owned(),
387        ));
388    };
389    debug!("Connecting to socket server: {:?}", address);
390    let host = CString::new(address.ip().to_string())
391        .map_err(HapiError::from)
392        .context("Converting SocketAddr to CString")?;
393    let handle = try_connect_with_timeout(
394        server_options.connection_retry_interval,
395        Duration::from_millis(100),
396        || {
397            ffi::new_thrift_socket_session(
398                address.port() as i32,
399                &host,
400                &server_options.session_info().0,
401            )
402        },
403    )?;
404    Ok(UninitializedSession {
405        session_handle: handle,
406        server_options: Some(server_options),
407        server_pid: pid,
408    })
409}
410
411pub fn start_engine_server(server_options: &ServerOptions) -> Result<u32> {
412    let env_variables = server_options.env_variables.as_ref().map(|env_variables| {
413        env_variables
414            .iter()
415            .map(|(k, v)| (k.as_os_str(), v.as_os_str()))
416            .collect::<Vec<_>>()
417    });
418    match &server_options.thrift_transport {
419        ThriftTransport::SharedMemory(transport) => {
420            debug!(
421                "Starting shared memory server name: {}",
422                transport.memory_name
423            );
424            let memory_name = CString::new(transport.memory_name.clone())?;
425            ffi::clear_connection_error()?;
426            call_with_temp_environment(env_variables.as_deref(), || {
427                ffi::start_thrift_shared_memory_server(
428                    &memory_name,
429                    &server_options.thrift_options().0,
430                    server_options.log_file.as_deref(),
431                )
432                .with_context(|| {
433                    format!(
434                        "Failed to start shared memory server: {}",
435                        transport.memory_name
436                    )
437                })
438            })
439        }
440        ThriftTransport::Pipe(transport) => {
441            debug!("Starting named pipe server: {:?}", transport.pipe_path);
442            let pipe_name = utils::path_to_cstring(&transport.pipe_path)?;
443            ffi::clear_connection_error()?;
444            call_with_temp_environment(env_variables.as_deref(), || {
445                ffi::start_thrift_pipe_server(
446                    &pipe_name,
447                    &server_options.thrift_options().0,
448                    server_options.log_file.as_deref(),
449                )
450                .with_context(|| format!("Failed to start pipe server: {:?}", transport.pipe_path))
451            })
452        }
453        ThriftTransport::Socket(transport) => {
454            debug!(
455                "Starting socket server on port: {}",
456                transport.address.port()
457            );
458            ffi::clear_connection_error()?;
459            call_with_temp_environment(env_variables.as_deref(), || {
460                ffi::start_thrift_socket_server(
461                    transport.address.port() as i32,
462                    &server_options.thrift_options().0,
463                    server_options.log_file.as_deref(),
464                )
465            })
466        }
467    }
468}
469
470/// Start an interactive Houdini session with engine server embedded.
471pub fn start_houdini_server(
472    pipe_name: impl AsRef<str>,
473    houdini_executable: impl AsRef<Path>,
474    fx_license: bool,
475    env_variables: Option<&[(String, String)]>,
476) -> Result<Child> {
477    let mut command = Command::new(houdini_executable.as_ref());
478    call_with_temp_environment(env_variables, move || {
479        command
480            .arg(format!("-hess=pipe:{}", pipe_name.as_ref()))
481            .arg(if fx_license {
482                "-force-fx-license"
483            } else {
484                "-core"
485            })
486            .stdin(Stdio::null())
487            .stdout(Stdio::null())
488            .stderr(Stdio::null())
489            .spawn()
490            .map_err(HapiError::from)
491    })
492}