videostream/host.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2025 Au-Zone Technologies
3
4use crate::Error;
5use std::{
6 ffi::{CStr, CString},
7 io,
8 os::unix::prelude::OsStrExt,
9 path::{Path, PathBuf},
10};
11use videostream_sys as ffi;
12
13/// The Host structure provides the frame sharing functionality. Only a single
14/// host can own frames while a host can have many Client subscribers to the
15/// frames.
16///
17/// A host is created with a socket path which it will own exclusively and
18/// allowing clients to connect in order to receive frames.
19///
20/// # Examples
21///
22/// ```no_run
23/// use videostream::host::Host;
24///
25/// # fn main() -> Result<(), videostream::Error> {
26/// let host = Host::new("/tmp/video.sock")?;
27/// println!("Host listening on: {:?}", host.path()?);
28/// # Ok(())
29/// # }
30/// ```
31pub struct Host {
32 ptr: *mut ffi::VSLHost,
33}
34
35impl std::fmt::Debug for Host {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 let path = self
38 .path()
39 .unwrap_or_else(|_| PathBuf::from("<invalid_path>"));
40 f.debug_struct("Host").field("path", &path).finish()
41 }
42}
43
44impl Host {
45 /// Creates a new Host and creates a socket at the specified path on which
46 /// it will listen for client connections.
47 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
48 let path_str_c = CString::new(path.as_ref().as_os_str().as_bytes())?;
49 let ptr = vsl!(vsl_host_init(path_str_c.as_ptr()));
50 if ptr.is_null() {
51 let err = io::Error::last_os_error();
52 return Err(err.into());
53 }
54
55 Ok(Host { ptr })
56 }
57
58 pub fn path(&self) -> Result<PathBuf, Error> {
59 let path_str_c = vsl!(vsl_host_path(self.ptr));
60 if path_str_c.is_null() {
61 return Err(Error::NullPointer);
62 }
63
64 let path_str = unsafe { CStr::from_ptr(path_str_c).to_str()? };
65 Ok(PathBuf::from(path_str))
66 }
67
68 /// Polls the host's socket connections for activity.
69 ///
70 /// Waits for socket activity (new connections or client messages) using poll().
71 /// Should be called in a loop before [`Host::process`]. The `wait` parameter
72 /// controls timeout behavior:
73 /// - `> 0`: Poll waits up to this duration in milliseconds
74 /// - `= 0`: Returns immediately
75 /// - `< 0`: Waits indefinitely
76 ///
77 /// # Arguments
78 ///
79 /// * `wait` - Timeout in milliseconds
80 ///
81 /// # Returns
82 ///
83 /// Returns the number of sockets with activity, 0 on timeout, or an error.
84 ///
85 /// # Errors
86 ///
87 /// Returns [`Error::Io`] if the underlying poll() call fails.
88 ///
89 /// # Example
90 ///
91 /// ```no_run
92 /// use videostream::host::Host;
93 ///
94 /// let host = Host::new("/tmp/video.sock")?;
95 /// loop {
96 /// match host.poll(1000) {
97 /// Ok(n) if n > 0 => {
98 /// host.process()?;
99 /// }
100 /// Ok(_) => {} // timeout
101 /// Err(e) => eprintln!("Poll error: {}", e),
102 /// }
103 /// }
104 /// # Ok::<(), videostream::Error>(())
105 /// ```
106 pub fn poll(&self, wait: i64) -> Result<i32, Error> {
107 let ret = vsl!(vsl_host_poll(self.ptr, wait));
108 if ret < 0 {
109 let err = io::Error::last_os_error();
110 return Err(err.into());
111 }
112 Ok(ret)
113 }
114
115 /// Processes host tasks: expires old frames and services one client connection.
116 ///
117 /// First expires frames past their lifetime, then services the first available
118 /// connection (accepting new clients or processing client messages). Should be
119 /// called in a loop, typically after [`Host::poll`] indicates activity.
120 ///
121 /// # Returns
122 ///
123 /// Returns `Ok(())` on success.
124 ///
125 /// # Errors
126 ///
127 /// Returns [`Error::Io`] if processing fails. Common errors include `ETIMEDOUT`
128 /// if no activity is available.
129 ///
130 /// # Example
131 ///
132 /// ```no_run
133 /// use videostream::host::Host;
134 ///
135 /// let host = Host::new("/tmp/video.sock")?;
136 /// loop {
137 /// if host.poll(1000)? > 0 {
138 /// host.process()?;
139 /// }
140 /// }
141 /// # Ok::<(), videostream::Error>(())
142 /// ```
143 pub fn process(&self) -> Result<(), Error> {
144 let ret = vsl!(vsl_host_process(self.ptr));
145 if ret < 0 {
146 let err = io::Error::last_os_error();
147 return Err(err.into());
148 }
149 Ok(())
150 }
151
152 /// Services a single client socket.
153 ///
154 /// Processes messages from a specific client socket. Does not accept new
155 /// connections - use [`Host::process`] for that. Useful when you need to
156 /// track errors for individual clients.
157 ///
158 /// # Arguments
159 ///
160 /// * `sock` - The client socket file descriptor to service
161 ///
162 /// # Returns
163 ///
164 /// Returns `Ok(())` on success.
165 ///
166 /// # Errors
167 ///
168 /// Returns [`Error::Io`] on failure. Common errors include `EPIPE` if the
169 /// client has disconnected.
170 ///
171 /// # Example
172 ///
173 /// ```no_run
174 /// use videostream::host::Host;
175 ///
176 /// let host = Host::new("/tmp/video.sock")?;
177 /// let sockets = host.sockets()?;
178 ///
179 /// // Service each client socket individually
180 /// for sock in &sockets[1..] { // Skip listening socket
181 /// if let Err(e) = host.service(*sock) {
182 /// eprintln!("Error servicing socket {}: {}", sock, e);
183 /// }
184 /// }
185 /// # Ok::<(), videostream::Error>(())
186 /// ```
187 pub fn service(&self, sock: i32) -> Result<(), Error> {
188 let ret = vsl!(vsl_host_service(self.ptr, sock));
189 if ret < 0 {
190 let err = io::Error::last_os_error();
191 return Err(err.into());
192 }
193 Ok(())
194 }
195
196 /// Requests a copy of the sockets managed by the host.
197 ///
198 /// Returns socket file descriptors for the host's listening socket and all
199 /// connected client sockets. The first socket is always the listening socket.
200 /// The array should be refreshed frequently as sockets may become stale.
201 ///
202 /// Thread-safe: allows one thread to use sockets for messaging while another
203 /// polls for reads.
204 ///
205 /// # Returns
206 ///
207 /// Returns a vector of socket file descriptors. The first entry is the
208 /// listening socket, followed by client sockets.
209 ///
210 /// # Errors
211 ///
212 /// Returns [`Error::Io`] if the operation fails.
213 ///
214 /// # Example
215 ///
216 /// ```no_run
217 /// use videostream::host::Host;
218 ///
219 /// let host = Host::new("/tmp/video.sock")?;
220 /// let sockets = host.sockets()?;
221 /// println!("Listening socket: {}", sockets[0]);
222 /// println!("Number of clients: {}", sockets.len() - 1);
223 /// # Ok::<(), videostream::Error>(())
224 /// ```
225 pub fn sockets(&self) -> Result<Vec<i32>, Error> {
226 // First call to get the required size
227 let mut max_sockets: usize = 0;
228 let _ret = vsl!(vsl_host_sockets(
229 self.ptr,
230 0,
231 std::ptr::null_mut(),
232 &mut max_sockets as *mut usize
233 ));
234
235 if max_sockets == 0 {
236 return Ok(Vec::new());
237 }
238
239 // Allocate buffer and get actual sockets
240 let mut sockets = vec![0i32; max_sockets];
241 let ret = vsl!(vsl_host_sockets(
242 self.ptr,
243 max_sockets,
244 sockets.as_mut_ptr(),
245 std::ptr::null_mut()
246 ));
247
248 if ret < 0 {
249 let err = io::Error::last_os_error();
250 return Err(err.into());
251 }
252
253 Ok(sockets)
254 }
255
256 /// Posts a frame to all connected clients.
257 ///
258 /// Transfers ownership of the frame to the host. The frame is broadcast to all
259 /// connected clients and will be automatically released when it expires. Do not
260 /// use frames after posting them to the host, as ownership has been transferred
261 /// and the host will manage their lifecycle.
262 ///
263 /// # Arguments
264 ///
265 /// * `frame` - Frame to post (ownership transferred to host)
266 /// * `expires` - Expiration time in nanoseconds (absolute, from [`crate::timestamp`])
267 /// * `duration` - Frame duration in nanoseconds (-1 if unknown)
268 /// * `pts` - Presentation timestamp in nanoseconds (-1 if unknown)
269 /// * `dts` - Decode timestamp in nanoseconds (-1 if unknown)
270 ///
271 /// # Returns
272 ///
273 /// Returns `Ok(())` on success.
274 ///
275 /// # Errors
276 ///
277 /// Returns [`Error::Io`] if posting fails.
278 ///
279 /// # Example
280 ///
281 /// ```no_run
282 /// use videostream::{host::Host, frame::Frame, timestamp};
283 ///
284 /// let host = Host::new("/tmp/video.sock")?;
285 /// let frame = Frame::new(1920, 1080, 1920 * 2, "YUYV")?;
286 /// frame.alloc(None)?;
287 ///
288 /// let now = timestamp()?;
289 /// let expires = now + 1_000_000_000; // 1 second
290 /// host.post(frame, expires, -1, -1, -1)?;
291 /// # Ok::<(), videostream::Error>(())
292 /// ```
293 pub fn post(
294 &self,
295 frame: crate::frame::Frame,
296 expires: i64,
297 duration: i64,
298 pts: i64,
299 dts: i64,
300 ) -> Result<(), Error> {
301 let frame_ptr = frame.get_ptr();
302
303 let ret = vsl!(vsl_host_post(
304 self.ptr, frame_ptr, expires, duration, pts, dts
305 ));
306 if ret < 0 {
307 let err = io::Error::last_os_error();
308 return Err(err.into());
309 }
310
311 // Only transfer ownership after successful posting
312 std::mem::forget(frame);
313 Ok(())
314 }
315
316 /// Drops a frame from the host.
317 ///
318 /// Removes the host association of the frame and returns ownership to the
319 /// caller. Can be used to cancel a previously posted frame before it expires.
320 ///
321 /// # Arguments
322 ///
323 /// * `frame` - Frame to drop from host (must be owned by this host)
324 ///
325 /// # Returns
326 ///
327 /// Returns `Ok(())` on success.
328 ///
329 /// # Errors
330 ///
331 /// Returns [`Error::Io`] if the operation fails.
332 ///
333 /// # Example
334 ///
335 /// ```no_run
336 /// use videostream::{host::Host, frame::Frame};
337 ///
338 /// let host = Host::new("/tmp/video.sock")?;
339 /// let frame = Frame::new(640, 480, 0, "RGB3")?;
340 /// frame.alloc(None)?;
341 ///
342 /// // Cancel the frame before posting it
343 /// host.drop_frame(&frame)?;
344 /// // If you want to post the frame, do so after cancellation is not needed
345 /// // host.post(frame, expires, -1, -1, -1)?;
346 /// # Ok::<(), videostream::Error>(())
347 /// ```
348 pub fn drop_frame(&self, frame: &crate::frame::Frame) -> Result<(), Error> {
349 let ret = vsl!(vsl_host_drop(self.ptr, frame.get_ptr()));
350 if ret < 0 {
351 let err = io::Error::last_os_error();
352 return Err(err.into());
353 }
354 Ok(())
355 }
356}
357
358impl Drop for Host {
359 fn drop(&mut self) {
360 if let Ok(lib) = ffi::init() {
361 unsafe {
362 lib.vsl_host_release(self.ptr);
363 }
364 }
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a socket path that is unique to the calling test.
    ///
    /// The process ID and the current thread ID are embedded in the file name
    /// so that tests running in parallel do not share a socket.
    fn test_socket_path(name: &str) -> PathBuf {
        let pid = std::process::id();
        let tid = std::thread::current().id();
        let file_name = format!("/tmp/vsl_host_{}_{}_{:?}.sock", name, pid, tid);
        PathBuf::from(file_name)
    }

    /// A new host reports the path it was created with and leaves a
    /// filesystem entry there.
    #[test]
    fn test_host() {
        let path = test_socket_path("basic");
        let host = Host::new(&path).unwrap();
        assert_eq!(path, host.path().unwrap());
        assert!(path.exists());
        // Rust doesn't provide an is_socket but we at least confirm some things it is
        // not.
        assert!(!path.is_file());
        assert!(!path.is_dir());
        assert!(!path.is_symlink());

        // FIXME: currently the library will unlink old sockets, this should be
        // corrected along with adding proper cleanup and error handling when a
        // socket is already present.
        //
        // Creating a second host at the same path should raise an error.
        // let host2 = Host::new(&path);
        // assert!(host2.is_err());
    }

    /// A host without clients still exposes its listening socket.
    #[test]
    fn test_host_sockets() {
        let sock_path = test_socket_path("sockets");
        let host = Host::new(&sock_path).unwrap();

        // Should have at least the listening socket
        let fds = host.sockets().unwrap();
        assert!(
            !fds.is_empty(),
            "Expected at least 1 socket (listening socket)"
        );

        // The first socket should be the listening socket and be a valid FD
        let listener = fds[0];
        assert!(listener >= 0, "Listening socket FD should be >= 0");
    }

    /// Polling with a zero timeout returns at once with no activity.
    #[test]
    fn test_host_poll_timeout() {
        let sock_path = test_socket_path("poll");
        let host = Host::new(&sock_path).unwrap();

        // Poll with immediate timeout should return 0 (no activity)
        let active = host.poll(0).unwrap();
        assert_eq!(active, 0, "Poll with 0 timeout should return 0");
    }

    /// Processing with no pending activity either succeeds or fails with an
    /// I/O error; any other error is a test failure.
    #[test]
    fn test_host_process() {
        let sock_path = test_socket_path("process");
        let host = Host::new(&sock_path).unwrap();

        // Process should handle timeout when no activity
        if let Err(e) = host.process() {
            // An I/O error is acceptable, anything else is not.
            if !matches!(e, Error::Io(_)) {
                panic!("Unexpected error: {}", e);
            }
        }
    }

    /// Calling `drop_frame` on an allocated frame must not crash.
    #[test]
    fn test_host_drop_frame() {
        let sock_path = test_socket_path("drop_frame");
        let host = Host::new(&sock_path).unwrap();

        let frame = crate::frame::Frame::new(640, 480, 0, "RGB3").unwrap();
        frame.alloc(None).unwrap();

        // Test drop_frame on an allocated frame
        // The behavior depends on the C API - may succeed or fail
        // depending on whether the frame is tracked by the host
        host.drop_frame(&frame).ok();
    }

    /// The Debug representation names the type and includes the socket path.
    #[test]
    fn test_host_debug() {
        let sock_path = test_socket_path("debug");
        let host = Host::new(&sock_path).unwrap();
        let rendered = format!("{:?}", host);

        // Debug output should contain Host and path info
        assert!(rendered.contains("Host"));
        assert!(rendered.contains("debug"));
    }
}
467}