ffmpeg_sidecar/
named_pipes.rs

1//! Cross-platform abstraction over Windows async named pipes and Unix FIFO.
2//!
3//! The primary use-case is streaming multiple outputs from FFmpeg into a Rust program.
4//! For more commentary and end-to-end usage, see `examples/named_pipes.rs`:
5//! <https://github.com/nathanbabcock/ffmpeg-sidecar/blob/main/examples/named_pipes.rs>
6
7use anyhow::Result;
8use std::io::Read;
9
10/// On Windows, prepend the pipe name with `\\.\pipe\`.
11/// On Unix, return the name as-is.
12#[macro_export]
13macro_rules! pipe_name {
14  ($name:expr) => {
15    if cfg!(windows) {
16      concat!(r#"\\.\pipe\"#, $name)
17    } else {
18      $name
19    }
20  };
21}
22
23/// Windows-only; an FFI pointer to a named pipe handle.
24#[cfg(windows)]
25pub struct NamedPipeHandle(*mut winapi::ctypes::c_void);
26
27/// <https://github.com/retep998/winapi-rs/issues/396>
28#[cfg(windows)]
29unsafe impl Send for NamedPipeHandle {}
30
31/// Cross-platform abstraction over Windows async named pipes and Unix FIFO.
32pub struct NamedPipe {
33  /// The name that the pipe was opened with. It will start with `\\.\pipe\` on Windows.
34  pub name: String,
35
36  /// Windows-only; an FFI pointer to a named pipe handle.
37  #[cfg(windows)]
38  pub handle: NamedPipeHandle,
39
40  /// Unix-only; a blocking file handle to the FIFO.
41  #[cfg(unix)]
42  pub file: std::fs::File,
43}
44
45#[cfg(windows)]
46impl NamedPipe {
47  /// On Windows the pipe name must be in the format `\\.\pipe\{pipe_name}`.
48  /// @see <https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createnamedpipew>
49  pub fn new<S: AsRef<str>>(pipe_name: S) -> Result<Self> {
50    use std::ffi::OsStr;
51    use std::os::windows::ffi::OsStrExt;
52    use std::ptr::null_mut;
53    use winapi::um::namedpipeapi::CreateNamedPipeW;
54    use winapi::um::winbase::{PIPE_ACCESS_DUPLEX, PIPE_TYPE_BYTE, PIPE_WAIT};
55
56    let path_wide: Vec<u16> = OsStr::new(pipe_name.as_ref())
57      .encode_wide()
58      .chain(Some(0))
59      .collect();
60
61    let handle = unsafe {
62      CreateNamedPipeW(
63        path_wide.as_ptr(),
64        PIPE_ACCESS_DUPLEX,
65        PIPE_TYPE_BYTE | PIPE_WAIT,
66        1,
67        1024 * 1024 * 64,
68        1024 * 1024 * 64,
69        0, // "A value of zero will result in a default time-out of 50 milliseconds."
70        null_mut(),
71      )
72    };
73
74    if handle == winapi::um::handleapi::INVALID_HANDLE_VALUE {
75      anyhow::bail!("Failed to create named pipe");
76    }
77
78    Ok(Self {
79      handle: NamedPipeHandle(handle),
80      name: pipe_name.as_ref().to_string(),
81    })
82  }
83}
84
85#[cfg(windows)]
86impl Drop for NamedPipe {
87  fn drop(&mut self) {
88    unsafe {
89      winapi::um::handleapi::CloseHandle(self.handle.0);
90    }
91  }
92}
93
94#[cfg(windows)]
95impl Read for NamedPipe {
96  fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
97    use std::io::Error;
98    use std::ptr::null_mut;
99    use winapi::{
100      shared::minwindef::{DWORD, LPVOID},
101      um::fileapi::ReadFile,
102    };
103
104    let mut bytes_read: DWORD = 0;
105    unsafe {
106      let read_status = ReadFile(
107        self.handle.0,
108        buf.as_mut_ptr() as LPVOID,
109        buf.len() as DWORD,
110        &mut bytes_read,
111        null_mut(),
112      );
113      if read_status == 0 {
114        let error = Error::last_os_error();
115        if error.raw_os_error() == Some(109) {
116          // pipe has been closed since last read
117          return Ok(0);
118        } else {
119          return std::io::Result::Err(error);
120        }
121      }
122    };
123
124    Ok(bytes_read as usize)
125  }
126}
127
128// The unix implementation is comparatively quite simple...
129
130#[cfg(unix)]
131impl NamedPipe {
132  pub fn new<S: AsRef<str>>(pipe_name: S) -> Result<Self> {
133    use nix::{fcntl::OFlag, sys::stat, unistd};
134    use std::os::fd::AsRawFd;
135    use std::os::unix::fs::OpenOptionsExt;
136    unistd::mkfifo(pipe_name.as_ref(), stat::Mode::S_IRWXU)?;
137
138    // Open in non-blocking mode so the function completes
139    let file = std::fs::OpenOptions::new()
140      .read(true)
141      .custom_flags(OFlag::O_NONBLOCK.bits())
142      .open(pipe_name.as_ref())?;
143
144    // Switch to blocking mode so it doesn't read too early
145    let fd = AsRawFd::as_raw_fd(&file);
146    nix::fcntl::fcntl(fd, nix::fcntl::FcntlArg::F_SETFL(OFlag::empty()))?;
147
148    Ok(Self {
149      file,
150      name: pipe_name.as_ref().to_string(),
151    })
152  }
153}
154
155#[cfg(unix)]
156impl Read for NamedPipe {
157  fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
158    self.file.read(buf)
159  }
160}
161
162#[cfg(unix)]
163impl Drop for NamedPipe {
164  fn drop(&mut self) {
165    use nix::unistd;
166    use std::path::Path;
167    unistd::unlink(Path::new(&self.name)).ok();
168  }
169}