Skip to main content

videostream/
host.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2025 Au-Zone Technologies
3
4use crate::Error;
5use std::{
6    ffi::{CStr, CString},
7    io,
8    os::unix::prelude::OsStrExt,
9    path::{Path, PathBuf},
10};
11use videostream_sys as ffi;
12
/// The Host structure provides the frame sharing functionality.  Only a single
/// host can own frames, while a host can have many Client subscribers to those
/// frames.
///
/// A host is created with a socket path, which it owns exclusively and on
/// which clients connect in order to receive frames.
///
/// # Examples
///
/// ```no_run
/// use videostream::host::Host;
///
/// # fn main() -> Result<(), videostream::Error> {
/// let host = Host::new("/tmp/video.sock")?;
/// println!("Host listening on: {:?}", host.path()?);
/// # Ok(())
/// # }
/// ```
pub struct Host {
    /// Raw library handle returned by `vsl_host_init`; it is handed to
    /// `vsl_host_release` when the `Host` is dropped.
    ptr: *mut ffi::VSLHost,
}
34
35impl std::fmt::Debug for Host {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        let path = self
38            .path()
39            .unwrap_or_else(|_| PathBuf::from("<invalid_path>"));
40        f.debug_struct("Host").field("path", &path).finish()
41    }
42}
43
44impl Host {
45    /// Creates a new Host and creates a socket at the specified path on which
46    /// it will listen for client connections.
47    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
48        let path_str_c = CString::new(path.as_ref().as_os_str().as_bytes())?;
49        let ptr = vsl!(vsl_host_init(path_str_c.as_ptr()));
50        if ptr.is_null() {
51            let err = io::Error::last_os_error();
52            return Err(err.into());
53        }
54
55        Ok(Host { ptr })
56    }
57
58    pub fn path(&self) -> Result<PathBuf, Error> {
59        let path_str_c = vsl!(vsl_host_path(self.ptr));
60        if path_str_c.is_null() {
61            return Err(Error::NullPointer);
62        }
63
64        let path_str = unsafe { CStr::from_ptr(path_str_c).to_str()? };
65        Ok(PathBuf::from(path_str))
66    }
67
68    /// Polls the host's socket connections for activity.
69    ///
70    /// Waits for socket activity (new connections or client messages) using poll().
71    /// Should be called in a loop before [`Host::process`]. The `wait` parameter
72    /// controls timeout behavior:
73    /// - `> 0`: Poll waits up to this duration in milliseconds
74    /// - `= 0`: Returns immediately
75    /// - `< 0`: Waits indefinitely
76    ///
77    /// # Arguments
78    ///
79    /// * `wait` - Timeout in milliseconds
80    ///
81    /// # Returns
82    ///
83    /// Returns the number of sockets with activity, 0 on timeout, or an error.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`Error::Io`] if the underlying poll() call fails.
88    ///
89    /// # Example
90    ///
91    /// ```no_run
92    /// use videostream::host::Host;
93    ///
94    /// let host = Host::new("/tmp/video.sock")?;
95    /// loop {
96    ///     match host.poll(1000) {
97    ///         Ok(n) if n > 0 => {
98    ///             host.process()?;
99    ///         }
100    ///         Ok(_) => {} // timeout
101    ///         Err(e) => eprintln!("Poll error: {}", e),
102    ///     }
103    /// }
104    /// # Ok::<(), videostream::Error>(())
105    /// ```
106    pub fn poll(&self, wait: i64) -> Result<i32, Error> {
107        let ret = vsl!(vsl_host_poll(self.ptr, wait));
108        if ret < 0 {
109            let err = io::Error::last_os_error();
110            return Err(err.into());
111        }
112        Ok(ret)
113    }
114
115    /// Processes host tasks: expires old frames and services one client connection.
116    ///
117    /// First expires frames past their lifetime, then services the first available
118    /// connection (accepting new clients or processing client messages). Should be
119    /// called in a loop, typically after [`Host::poll`] indicates activity.
120    ///
121    /// # Returns
122    ///
123    /// Returns `Ok(())` on success.
124    ///
125    /// # Errors
126    ///
127    /// Returns [`Error::Io`] if processing fails. Common errors include `ETIMEDOUT`
128    /// if no activity is available.
129    ///
130    /// # Example
131    ///
132    /// ```no_run
133    /// use videostream::host::Host;
134    ///
135    /// let host = Host::new("/tmp/video.sock")?;
136    /// loop {
137    ///     if host.poll(1000)? > 0 {
138    ///         host.process()?;
139    ///     }
140    /// }
141    /// # Ok::<(), videostream::Error>(())
142    /// ```
143    pub fn process(&self) -> Result<(), Error> {
144        let ret = vsl!(vsl_host_process(self.ptr));
145        if ret < 0 {
146            let err = io::Error::last_os_error();
147            return Err(err.into());
148        }
149        Ok(())
150    }
151
152    /// Services a single client socket.
153    ///
154    /// Processes messages from a specific client socket. Does not accept new
155    /// connections - use [`Host::process`] for that. Useful when you need to
156    /// track errors for individual clients.
157    ///
158    /// # Arguments
159    ///
160    /// * `sock` - The client socket file descriptor to service
161    ///
162    /// # Returns
163    ///
164    /// Returns `Ok(())` on success.
165    ///
166    /// # Errors
167    ///
168    /// Returns [`Error::Io`] on failure. Common errors include `EPIPE` if the
169    /// client has disconnected.
170    ///
171    /// # Example
172    ///
173    /// ```no_run
174    /// use videostream::host::Host;
175    ///
176    /// let host = Host::new("/tmp/video.sock")?;
177    /// let sockets = host.sockets()?;
178    ///
179    /// // Service each client socket individually
180    /// for sock in &sockets[1..] { // Skip listening socket
181    ///     if let Err(e) = host.service(*sock) {
182    ///         eprintln!("Error servicing socket {}: {}", sock, e);
183    ///     }
184    /// }
185    /// # Ok::<(), videostream::Error>(())
186    /// ```
187    pub fn service(&self, sock: i32) -> Result<(), Error> {
188        let ret = vsl!(vsl_host_service(self.ptr, sock));
189        if ret < 0 {
190            let err = io::Error::last_os_error();
191            return Err(err.into());
192        }
193        Ok(())
194    }
195
196    /// Requests a copy of the sockets managed by the host.
197    ///
198    /// Returns socket file descriptors for the host's listening socket and all
199    /// connected client sockets. The first socket is always the listening socket.
200    /// The array should be refreshed frequently as sockets may become stale.
201    ///
202    /// Thread-safe: allows one thread to use sockets for messaging while another
203    /// polls for reads.
204    ///
205    /// # Returns
206    ///
207    /// Returns a vector of socket file descriptors. The first entry is the
208    /// listening socket, followed by client sockets.
209    ///
210    /// # Errors
211    ///
212    /// Returns [`Error::Io`] if the operation fails.
213    ///
214    /// # Example
215    ///
216    /// ```no_run
217    /// use videostream::host::Host;
218    ///
219    /// let host = Host::new("/tmp/video.sock")?;
220    /// let sockets = host.sockets()?;
221    /// println!("Listening socket: {}", sockets[0]);
222    /// println!("Number of clients: {}", sockets.len() - 1);
223    /// # Ok::<(), videostream::Error>(())
224    /// ```
225    pub fn sockets(&self) -> Result<Vec<i32>, Error> {
226        // First call to get the required size
227        let mut max_sockets: usize = 0;
228        let _ret = vsl!(vsl_host_sockets(
229            self.ptr,
230            0,
231            std::ptr::null_mut(),
232            &mut max_sockets as *mut usize
233        ));
234
235        if max_sockets == 0 {
236            return Ok(Vec::new());
237        }
238
239        // Allocate buffer and get actual sockets
240        let mut sockets = vec![0i32; max_sockets];
241        let ret = vsl!(vsl_host_sockets(
242            self.ptr,
243            max_sockets,
244            sockets.as_mut_ptr(),
245            std::ptr::null_mut()
246        ));
247
248        if ret < 0 {
249            let err = io::Error::last_os_error();
250            return Err(err.into());
251        }
252
253        Ok(sockets)
254    }
255
256    /// Posts a frame to all connected clients.
257    ///
258    /// Transfers ownership of the frame to the host. The frame is broadcast to all
259    /// connected clients and will be automatically released when it expires. Do not
260    /// use frames after posting them to the host, as ownership has been transferred
261    /// and the host will manage their lifecycle.
262    ///
263    /// # Arguments
264    ///
265    /// * `frame` - Frame to post (ownership transferred to host)
266    /// * `expires` - Expiration time in nanoseconds (absolute, from [`crate::timestamp`])
267    /// * `duration` - Frame duration in nanoseconds (-1 if unknown)
268    /// * `pts` - Presentation timestamp in nanoseconds (-1 if unknown)
269    /// * `dts` - Decode timestamp in nanoseconds (-1 if unknown)
270    ///
271    /// # Returns
272    ///
273    /// Returns `Ok(())` on success.
274    ///
275    /// # Errors
276    ///
277    /// Returns [`Error::Io`] if posting fails.
278    ///
279    /// # Example
280    ///
281    /// ```no_run
282    /// use videostream::{host::Host, frame::Frame, timestamp};
283    ///
284    /// let host = Host::new("/tmp/video.sock")?;
285    /// let frame = Frame::new(1920, 1080, 1920 * 2, "YUYV")?;
286    /// frame.alloc(None)?;
287    ///
288    /// let now = timestamp()?;
289    /// let expires = now + 1_000_000_000; // 1 second
290    /// host.post(frame, expires, -1, -1, -1)?;
291    /// # Ok::<(), videostream::Error>(())
292    /// ```
293    pub fn post(
294        &self,
295        frame: crate::frame::Frame,
296        expires: i64,
297        duration: i64,
298        pts: i64,
299        dts: i64,
300    ) -> Result<(), Error> {
301        let frame_ptr = frame.get_ptr();
302
303        let ret = vsl!(vsl_host_post(
304            self.ptr, frame_ptr, expires, duration, pts, dts
305        ));
306        if ret < 0 {
307            let err = io::Error::last_os_error();
308            return Err(err.into());
309        }
310
311        // Only transfer ownership after successful posting
312        std::mem::forget(frame);
313        Ok(())
314    }
315
316    /// Drops a frame from the host.
317    ///
318    /// Removes the host association of the frame and returns ownership to the
319    /// caller. Can be used to cancel a previously posted frame before it expires.
320    ///
321    /// # Arguments
322    ///
323    /// * `frame` - Frame to drop from host (must be owned by this host)
324    ///
325    /// # Returns
326    ///
327    /// Returns `Ok(())` on success.
328    ///
329    /// # Errors
330    ///
331    /// Returns [`Error::Io`] if the operation fails.
332    ///
333    /// # Example
334    ///
335    /// ```no_run
336    /// use videostream::{host::Host, frame::Frame};
337    ///
338    /// let host = Host::new("/tmp/video.sock")?;
339    /// let frame = Frame::new(640, 480, 0, "RGB3")?;
340    /// frame.alloc(None)?;
341    ///
342    /// // Cancel the frame before posting it
343    /// host.drop_frame(&frame)?;
344    /// // If you want to post the frame, do so after cancellation is not needed
345    /// // host.post(frame, expires, -1, -1, -1)?;
346    /// # Ok::<(), videostream::Error>(())
347    /// ```
348    pub fn drop_frame(&self, frame: &crate::frame::Frame) -> Result<(), Error> {
349        let ret = vsl!(vsl_host_drop(self.ptr, frame.get_ptr()));
350        if ret < 0 {
351            let err = io::Error::last_os_error();
352            return Err(err.into());
353        }
354        Ok(())
355    }
356}
357
358impl Drop for Host {
359    fn drop(&mut self) {
360        if let Ok(lib) = ffi::init() {
361            unsafe {
362                lib.vsl_host_release(self.ptr);
363            }
364        }
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a socket path under `/tmp` for the named test.
    ///
    /// The process ID and the current thread ID are embedded in the file name
    /// so that tests running in parallel do not share a socket path.
    fn test_socket_path(name: &str) -> PathBuf {
        let pid = std::process::id();
        let tid = std::thread::current().id();
        let location = format!("/tmp/vsl_host_{}_{}_{:?}.sock", name, pid, tid);
        PathBuf::from(location)
    }

    #[test]
    fn test_host() {
        let path = test_socket_path("basic");
        let host = Host::new(&path).unwrap();

        // The host must report the path it was created with.
        assert_eq!(path, host.path().unwrap());

        // Something must exist at the path. Rust has no `is_socket`, so the
        // best available check is to rule out the other file kinds.
        assert!(path.exists());
        assert!(!path.is_file());
        assert!(!path.is_dir());
        assert!(!path.is_symlink());

        // FIXME: the library currently unlinks an existing socket instead of
        // reporting a conflict. Once proper cleanup and error handling for an
        // already-present socket are in place, creating a second host at the
        // same path should fail and this check can be enabled:
        //
        // let host2 = Host::new(&path);
        // assert!(host2.is_err());
    }

    #[test]
    fn test_host_sockets() {
        let host = Host::new(test_socket_path("sockets")).unwrap();

        // A fresh host has no clients but must still expose its listener.
        let fds = host.sockets().unwrap();
        assert!(
            !fds.is_empty(),
            "Expected at least 1 socket (listening socket)"
        );

        // The listener comes first and must be a valid descriptor.
        let listener = fds[0];
        assert!(listener >= 0, "Listening socket FD should be >= 0");
    }

    #[test]
    fn test_host_poll_timeout() {
        let host = Host::new(test_socket_path("poll")).unwrap();

        // With a zero timeout and no clients, poll reports no activity.
        assert_eq!(
            host.poll(0).unwrap(),
            0,
            "Poll with 0 timeout should return 0"
        );
    }

    #[test]
    fn test_host_process() {
        let host = Host::new(test_socket_path("process")).unwrap();

        // With no activity pending, either success or an I/O error (such as
        // a timeout) is acceptable; any other error kind is a failure.
        if let Err(e) = host.process() {
            if !matches!(&e, Error::Io(_)) {
                panic!("Unexpected error: {}", e);
            }
        }
    }

    #[test]
    fn test_host_drop_frame() {
        let host = Host::new(test_socket_path("drop_frame")).unwrap();

        let frame = crate::frame::Frame::new(640, 480, 0, "RGB3").unwrap();
        frame.alloc(None).unwrap();

        // The frame was never posted, so whether the C API accepts or rejects
        // the request depends on how it tracks frames. Only the call itself
        // is exercised; its outcome is intentionally ignored.
        let _ = host.drop_frame(&frame);
    }

    #[test]
    fn test_host_debug() {
        let host = Host::new(test_socket_path("debug")).unwrap();
        let debug_str = format!("{:?}", host);

        // The output names the type and includes the socket path, which
        // contains the test name.
        assert!(debug_str.contains("Host"));
        assert!(debug_str.contains("debug"));
    }
}
467}