io_mux/lib.rs
1#![forbid(missing_docs)]
2/*!
3A Mux provides a single receive end and multiple send ends. Data sent to any of the send ends comes
4out the receive end, in order, tagged by the sender.
5
6Each send end works as a file descriptor. For instance, with `io-mux` you can collect stdout and
7stderr from a process, and highlight any error output from stderr, while preserving the relative
8order of data across both stdout and stderr.
9
10Note that reading provides no "EOF" indication; if no further data arrives, it
11will block forever. Avoid reading after the source of the data exits.
12
13# Example
14
15```
16# use std::io::Write;
17# fn main() -> std::io::Result<()> {
18use io_mux::{Mux, TaggedData};
19let mut mux = Mux::new()?;
20
21let (out_tag, out_sender) = mux.make_sender()?;
22let (err_tag, err_sender) = mux.make_sender()?;
23let mut child = std::process::Command::new("sh")
24 .arg("-c")
25 .arg("echo out1 && echo err1 1>&2 && echo out2")
26 .stdout(out_sender)
27 .stderr(err_sender)
28 .spawn()?;
29
30let (done_tag, mut done_sender) = mux.make_sender()?;
31std::thread::spawn(move || match child.wait() {
32 Ok(status) if status.success() => {
33 let _ = write!(done_sender, "Done\n");
34 }
35 Ok(status) => {
36 let _ = write!(done_sender, "Child process failed\n");
37 }
38 Err(e) => {
39 let _ = write!(done_sender, "Error: {:?}\n", e);
40 }
41});
42
43let mut done = false;
44while !done {
45 let TaggedData { data, tag } = mux.read()?;
46 if tag == out_tag {
47 print!("out: ");
48 } else if tag == err_tag {
49 print!("err: ");
50 } else if tag == done_tag {
51 done = true;
52 } else {
53 panic!("Unexpected tag");
54 }
55 std::io::stdout().write_all(data)?;
56}
57# Ok(())
58# }
59```
60
61# async
62
63If you enable the `async` feature, `io-mux` additionally provides an `AsyncMux` type, which allows
64processing data asynchronously.
65
66You may want to use this with [async-process](https://crates.io/crates/async-process) or
67[async-pidfd](https://crates.io/crates/async-pidfd) to concurrently wait on the exit of a process
68and the muxed output and error of that process. Until the process exits, call `AsyncMux::read()` to
69get the next bit of output, awaiting that concurrently with the exit of the process. Once the
70process exits and will thus produce no further output, call `AsyncMux::read_nonblock` until it
71returns `None` to drain the remaining output out of the mux.
72
73# Internals
74
75Internally, `Mux` creates a UNIX datagram socket for the receive end, and a separate UNIX datagram
76socket for each sender. Datagram sockets support `recvfrom`, which provides the address of the
77sender, so `Mux::read` can use the sender address as the tag for the packet received.
78
79However, datagram sockets require reading an entire datagram with each `recvfrom` call, so
80`Mux::read` needs to find out the size of the next datagram before calling `recvfrom`. Linux
81supports directly asking for the next packet size using `recv` with `MSG_PEEK | MSG_TRUNC`. On
82other UNIX systems, we have to repeatedly call `recv` with `MSG_PEEK` and an increasingly large
83buffer, until we receive the entire packet, then make one more call without `MSG_PEEK` to tell the
84OS to discard it.
85
86`Mux` creates UNIX sockets within a temporary directory, removed when dropping the `Mux`.
87
88Note that `Mux::read` cannot provide any indication of end-of-file. When using `Mux`, you will need
89to have some other indication that no further output will arrive, such as the exit of the child
90process producing output.
91
92# Portability
93Mux can theoretically run on any UNIX system. However, on some non-Linux systems, when the buffers
94for a UNIX socket fill up, writing to the UNIX socket may return an `ENOBUFS` error rather than
95blocking. Thus, on non-Linux systems, the process writing to a `MuxSender` may encounter an error
96if the receiving process does not process its buffers quickly enough. This does not match the
97behavior of a pipe. As this may result in surprising behavior, by default io-mux does not compile
98on non-Linux systems. If you want to use io-mux on a non-Linux system, and your use case does not
99need the same semantics as a pipe, and *in particular* it will not cause a problem in your use case
100if writing to a `MuxSender` may produce an `ENOBUFS` error if you do not read from the receive end
101quickly enough, then you can compile `io-mux` on non-Linux platforms by enabling the
102`experimental-unix-support` feature of `io-mux`.
103
104If you have another UNIX platform which blocks on writes to a UNIX datagram socket with full
105buffers, as Linux does, then please send a note to the io-mux maintainer to mark support for your
106platform as non-experimental.
107*/
108
// io-mux is built on UNIX datagram sockets, so refuse to compile on any non-UNIX target.
#[cfg(not(unix))]
compile_error!("io-mux only runs on UNIX");

// On UNIX targets other than Linux, compilation is refused unless the caller has explicitly
// opted in through the `experimental-unix-support` feature. See the "Portability" section of
// the crate documentation for the caveats that opt-in accepts.
#[cfg(all(
    unix,
    not(target_os = "linux"),
    not(feature = "experimental-unix-support")
))]
compile_error!(
    "io-mux support for non-Linux platforms is experimental.
Please read the portability note in the io-mux documentation for more information
and potential caveats, before enabling io-mux's experimental UNIX support."
);
122
123use std::io;
124use std::net::Shutdown;
125use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
126#[cfg(target_os = "linux")]
127use std::os::linux::net::SocketAddrExt;
128use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
129use std::os::unix::net::{SocketAddr, UnixDatagram};
130use std::path::Path;
131use std::process::Stdio;
132
133#[cfg(feature = "async")]
134use async_io::Async;
135use rustix::net::RecvFlags;
136
// Initial length, in bytes, of the receive buffer each `Mux` allocates. The buffer is grown by
// the receive path when an incoming datagram does not fit.
const DEFAULT_BUF_SIZE: usize = 8192;
138
139/// A `Mux` provides a single receive end and multiple send ends. Data sent to any of the send ends
140/// comes out the receive end, in order, tagged by the sender.
141///
142/// `Mux` implements `AsFd` solely to support polling the underlying file descriptor for data to
143/// read. Always use `Mux` to perform the actual read.
144pub struct Mux {
145 receive: UnixDatagram,
146 receive_addr: SocketAddr,
147 tempdir: Option<tempfile::TempDir>,
148 buf: Vec<u8>,
149}
150
151impl AsFd for Mux {
152 fn as_fd(&self) -> BorrowedFd<'_> {
153 self.receive.as_fd()
154 }
155}
156
157impl AsRawFd for Mux {
158 fn as_raw_fd(&self) -> RawFd {
159 self.receive.as_raw_fd()
160 }
161}
162
/// One of the send ends of a `Mux`.
///
/// A `MuxSender` can be turned into a `std::process::Stdio` to hand to a child process, turned
/// into the underlying file descriptor as an `OwnedFd`, or written to directly through
/// `std::io::Write`.
pub struct MuxSender(UnixDatagram);

impl AsRawFd for MuxSender {
    /// Return the raw file descriptor of the sending socket without giving up ownership.
    fn as_raw_fd(&self) -> RawFd {
        let MuxSender(socket) = self;
        socket.as_raw_fd()
    }
}

impl IntoRawFd for MuxSender {
    /// Consume the sender and hand ownership of its raw file descriptor to the caller.
    fn into_raw_fd(self) -> RawFd {
        let MuxSender(socket) = self;
        socket.into_raw_fd()
    }
}

impl AsFd for MuxSender {
    /// Borrow the file descriptor of the sending socket.
    fn as_fd(&self) -> BorrowedFd<'_> {
        let MuxSender(socket) = self;
        socket.as_fd()
    }
}

impl From<MuxSender> for OwnedFd {
    /// Unwrap the sender into the owned file descriptor of its socket.
    fn from(sender: MuxSender) -> OwnedFd {
        let MuxSender(socket) = sender;
        OwnedFd::from(socket)
    }
}

impl From<MuxSender> for Stdio {
    /// Convert the sender into a `Stdio`, going through its owned file descriptor.
    fn from(sender: MuxSender) -> Stdio {
        let fd: OwnedFd = sender.into();
        fd.into()
    }
}

impl io::Write for MuxSender {
    /// Send `buf` on the underlying socket with a single `send` call and return that call's
    /// result.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let MuxSender(socket) = self;
        socket.send(buf)
    }

    /// Nothing is buffered on the sender side, so flushing always succeeds immediately.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
207
/// A unique tag associated with a sender.
///
/// A `Tag` wraps the socket address of its sender. Tags can be compared for equality and
/// hashed, so they can serve as keys in a `HashMap` or `HashSet` when dispatching received data
/// by sender.
#[derive(Clone, Debug)]
pub struct Tag(SocketAddr);

impl PartialEq<Tag> for Tag {
    /// Two tags are equal when both wrap the same abstract name (Linux only), both wrap the same
    /// pathname, or both are unnamed. Addresses of different kinds never compare equal.
    fn eq(&self, rhs: &Tag) -> bool {
        #[cfg(target_os = "linux")]
        if let (Some(lhs), Some(rhs)) = (self.0.as_abstract_name(), rhs.0.as_abstract_name()) {
            return lhs == rhs;
        }
        if let (Some(lhs), Some(rhs)) = (self.0.as_pathname(), rhs.0.as_pathname()) {
            return lhs == rhs;
        }
        self.0.is_unnamed() && rhs.0.is_unnamed()
    }
}

impl Eq for Tag {}

impl std::hash::Hash for Tag {
    /// Hash the same information `eq` compares, so that equal tags always hash identically.
    ///
    /// A one-byte discriminant is fed in first so that an abstract name and a pathname made of
    /// the same bytes (which compare unequal) are also unlikely to collide.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        use std::hash::{Hash, Hasher};

        #[cfg(target_os = "linux")]
        if let Some(name) = self.0.as_abstract_name() {
            state.write_u8(1);
            name.hash(state);
            return;
        }
        if let Some(path) = self.0.as_pathname() {
            state.write_u8(2);
            path.hash(state);
            return;
        }
        // Everything else (notably unnamed addresses) shares a single bucket; `eq` decides.
        state.write_u8(0);
    }
}
226
/// Data received through a mux, along with the tag.
///
/// The `data` slice borrows from the `Mux` that produced it (lifetime `'a`), while the `tag` is
/// an owned value.
#[derive(Debug, Eq, PartialEq)]
pub struct TaggedData<'a> {
    /// Data received, borrowed from the `Mux`.
    pub data: &'a [u8],
    /// Tag for the sender of this data.
    pub tag: Tag,
}
235
236impl Mux {
237 /// Create a new `Mux`, using Linux abstract sockets.
238 #[cfg(target_os = "linux")]
239 pub fn new_abstract() -> io::Result<Self> {
240 // It should be incredibly unlikely to have a collision, so if we have multiple in a row,
241 // something strange is likely going on, and we might continue to get the same error
242 // indefinitely. Bail after a large number of retries, so that we don't loop forever.
243 for _ in 0..32768 {
244 let receive_addr =
245 SocketAddr::from_abstract_name(format!("io-mux-{:x}", fastrand::u128(..)))?;
246 match Self::new_with_addr(receive_addr, None) {
247 Err(e) if e.kind() == io::ErrorKind::AddrInUse => continue,
248 result => return result,
249 }
250 }
251 Err(io::Error::new(
252 io::ErrorKind::AddrInUse,
253 "couldn't create unique socket name",
254 ))
255 }
256
257 /// Create a new `Mux`.
258 ///
259 /// This will create a temporary directory for all the sockets managed by this `Mux`; dropping
260 /// the `Mux` removes the temporary directory.
261 pub fn new() -> io::Result<Self> {
262 Self::new_with_tempdir(tempfile::tempdir()?)
263 }
264
265 /// Create a new `Mux`, with temporary directory under the specified path.
266 ///
267 /// This will create a temporary directory for all the sockets managed by this `Mux`; dropping
268 /// the `Mux` removes the temporary directory.
269 pub fn new_in<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
270 Self::new_with_tempdir(tempfile::tempdir_in(dir)?)
271 }
272
273 fn new_with_tempdir(tempdir: tempfile::TempDir) -> io::Result<Self> {
274 let receive_addr = SocketAddr::from_pathname(tempdir.path().join("r"))?;
275 Self::new_with_addr(receive_addr, Some(tempdir))
276 }
277
278 fn new_with_addr(
279 receive_addr: SocketAddr,
280 tempdir: Option<tempfile::TempDir>,
281 ) -> io::Result<Self> {
282 let receive = UnixDatagram::bind_addr(&receive_addr)?;
283
284 // Shutdown writing to the receive socket, to help catch possible errors. On some targets,
285 // this generates spurious errors, such as `Socket is not connected` on FreeBSD. We don't
286 // need this shutdown for correctness, so just ignore any errors.
287 let _ = receive.shutdown(Shutdown::Write);
288
289 Ok(Mux {
290 receive,
291 receive_addr,
292 tempdir,
293 buf: vec![0; DEFAULT_BUF_SIZE],
294 })
295 }
296
297 /// Create a new `MuxSender` and associated unique `Tag`. Data sent via the returned
298 /// `MuxSender` will arrive with the corresponding `Tag`.
299 pub fn make_sender(&self) -> io::Result<(Tag, MuxSender)> {
300 if let Some(ref tempdir) = self.tempdir {
301 self.make_sender_with_retry(|n| {
302 SocketAddr::from_pathname(tempdir.path().join(format!("{n:x}")))
303 })
304 } else {
305 #[cfg(target_os = "linux")]
306 return self.make_sender_with_retry(|n| {
307 SocketAddr::from_abstract_name(format!("io-mux-send-{n:x}"))
308 });
309 #[cfg(not(target_os = "linux"))]
310 panic!("Mux without tempdir on non-Linux platform")
311 }
312 }
313
314 fn make_sender_with_retry(
315 &self,
316 make_sender_addr: impl Fn(u128) -> io::Result<SocketAddr>,
317 ) -> io::Result<(Tag, MuxSender)> {
318 // It should be incredibly unlikely to have collisions, but avoid looping forever in case
319 // something strange is going on (e.g. weird seccomp filter).
320 for _ in 0..32768 {
321 let sender_addr = make_sender_addr(fastrand::u128(..))?;
322 let sender = match UnixDatagram::bind_addr(&sender_addr) {
323 Err(e) if e.kind() == io::ErrorKind::AddrInUse => continue,
324 result => result,
325 }?;
326 sender.connect_addr(&self.receive_addr)?;
327 sender.shutdown(Shutdown::Read)?;
328 return Ok((Tag(sender_addr), MuxSender(sender)));
329 }
330 Err(io::Error::new(
331 io::ErrorKind::AddrInUse,
332 "couldn't create unique socket name",
333 ))
334 }
335
336 #[cfg(all(target_os = "linux", not(feature = "test-portable")))]
337 fn recv_from_full(&mut self) -> io::Result<(&[u8], SocketAddr)> {
338 let next_packet_len = rustix::net::recv(
339 &mut self.receive,
340 &mut [],
341 RecvFlags::PEEK | RecvFlags::TRUNC,
342 )?;
343 if next_packet_len > self.buf.len() {
344 self.buf.resize(next_packet_len, 0);
345 }
346 let (bytes, addr) = self.receive.recv_from(&mut self.buf)?;
347 Ok((&self.buf[..bytes], addr))
348 }
349
350 #[cfg(not(all(target_os = "linux", not(feature = "test-portable"))))]
351 fn recv_from_full(&mut self) -> io::Result<(&[u8], SocketAddr)> {
352 loop {
353 let bytes = rustix::net::recv(&mut self.receive, &mut self.buf, RecvFlags::PEEK)?;
354 // If we filled the buffer, we may have truncated output. Retry with a bigger buffer.
355 if bytes == self.buf.len() {
356 let new_len = self.buf.len().saturating_mul(2);
357 self.buf.resize(new_len, 0);
358 } else {
359 // Get the packet address, and clear it by fetching into a zero-sized buffer.
360 let (_, addr) = self.receive.recv_from(&mut [])?;
361 return Ok((&self.buf[..bytes], addr));
362 }
363 }
364 }
365
366 /// Return the next chunk of data, together with its tag.
367 ///
368 /// This reuses a buffer managed by the `Mux`.
369 ///
370 /// Note that this provides no "EOF" indication; if no further data arrives, it will block
371 /// forever. Avoid calling it after the source of the data exits.
372 pub fn read(&mut self) -> io::Result<TaggedData<'_>> {
373 let (data, addr) = self.recv_from_full()?;
374 let tag = Tag(addr);
375 Ok(TaggedData { data, tag })
376 }
377}
378
/// Asynchronous version of `Mux`.
#[cfg(feature = "async")]
pub struct AsyncMux(Async<Mux>);

#[cfg(feature = "async")]
impl AsyncMux {
    /// Create a new `AsyncMux`, using Linux abstract sockets.
    #[cfg(target_os = "linux")]
    pub fn new_abstract() -> io::Result<Self> {
        let mux = Mux::new_abstract()?;
        Async::new(mux).map(Self)
    }

    /// Create a new `AsyncMux`.
    ///
    /// A fresh temporary directory is created to hold every socket managed by this `AsyncMux`;
    /// the directory is removed again when the `AsyncMux` is dropped.
    pub fn new() -> io::Result<Self> {
        let mux = Mux::new()?;
        Async::new(mux).map(Self)
    }

    /// Create a new `AsyncMux` whose temporary directory lives under `dir`.
    ///
    /// A fresh temporary directory is created inside `dir` to hold every socket managed by this
    /// `AsyncMux`; that temporary directory is removed again when the `AsyncMux` is dropped.
    pub fn new_in<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
        let mux = Mux::new_in(dir)?;
        Async::new(mux).map(Self)
    }

    /// Create a new `MuxSender` together with its unique `Tag`. Anything sent through the
    /// returned `MuxSender` is delivered by this `AsyncMux` carrying that `Tag`.
    pub fn make_sender(&self) -> io::Result<(Tag, MuxSender)> {
        let mux: &Mux = self.0.get_ref();
        mux.make_sender()
    }

    /// Return the next chunk of data, together with its tag.
    ///
    /// The returned data borrows a buffer owned and reused by the `AsyncMux`.
    ///
    /// Note that this provides no "EOF" indication; if no further data arrives, it will wait
    /// forever. Avoid calling it after the source of the data exits. Once the source of the data
    /// exits, call `read_nonblock` instead, until it returns None.
    pub async fn read(&mut self) -> io::Result<TaggedData<'_>> {
        self.0.readable().await?;
        // SAFETY: the mutable reference is only used to call `Mux::read`; the inner `Mux` is
        // not moved out, replaced, or dropped through it.
        let mux = unsafe { self.0.get_mut() };
        mux.read()
    }

    /// Return the next chunk of data, together with its tag, if available immediately, or None
    /// if the read would block.
    ///
    /// The returned data borrows a buffer owned and reused by the `AsyncMux`.
    ///
    /// Use this if you know no more data will get sent and you want to drain the remaining data.
    pub fn read_nonblock(&mut self) -> io::Result<Option<TaggedData<'_>>> {
        // SAFETY: the mutable reference is only used to call `Mux::read`; the inner `Mux` is
        // not moved out, replaced, or dropped through it.
        let mux = unsafe { self.0.get_mut() };
        match mux.read() {
            Ok(chunk) => Ok(Some(chunk)),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
            Err(e) => Err(e),
        }
    }
}
440
#[cfg(test)]
mod test {
    #[cfg(feature = "async")]
    use super::AsyncMux;
    use super::Mux;

    // Shell script run by the child process in every test: it alternates between writing a line
    // to stdout and a line to stderr.
    const SCRIPT: &str = "echo out1 && echo err1 1>&2 && echo out2 && echo err2 1>&2";

    // A mux backed by the default temporary directory.
    #[test]
    fn test() -> std::io::Result<()> {
        let mux = Mux::new()?;
        test_with_mux(mux)
    }

    // A mux created with `new_in` must place exactly one entry (its own temporary directory)
    // into the directory it was given.
    #[test]
    fn test_new_in() -> std::io::Result<()> {
        let dir = tempfile::tempdir()?;
        let count_entries = || -> std::io::Result<usize> {
            let mut count = 0;
            for entry in dir.path().read_dir()? {
                entry?;
                count += 1;
            }
            Ok(count)
        };
        assert_eq!(count_entries()?, 0);
        let mux = Mux::new_in(dir.path())?;
        assert_eq!(count_entries()?, 1);
        test_with_mux(mux)
    }

    // A mux backed by Linux abstract sockets.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_abstract() -> std::io::Result<()> {
        let mux = Mux::new_abstract()?;
        test_with_mux(mux)
    }

    // Run the script with stdout and stderr attached to two senders, and check that the chunks
    // arrive in order with the right tags, followed by the "Done" marker sent once the child
    // has exited.
    fn test_with_mux(mut mux: Mux) -> std::io::Result<()> {
        let (out_tag, out_sender) = mux.make_sender()?;
        let (err_tag, err_sender) = mux.make_sender()?;
        let mut child = std::process::Command::new("sh")
            .arg("-c")
            .arg(SCRIPT)
            .stdout(out_sender)
            .stderr(err_sender)
            .spawn()?;

        let (done_tag, mut done_sender) = mux.make_sender()?;
        std::thread::spawn(move || {
            use std::io::Write;
            match child.wait() {
                Ok(status) if status.success() => {
                    let _ = write!(done_sender, "Done\n");
                }
                Ok(_) => {
                    let _ = write!(done_sender, "Child process failed\n");
                }
                Err(e) => {
                    let _ = write!(done_sender, "Error: {:?}\n", e);
                }
            }
        });

        let expected = [
            (&out_tag, &b"out1\n"[..]),
            (&err_tag, &b"err1\n"[..]),
            (&out_tag, &b"out2\n"[..]),
            (&err_tag, &b"err2\n"[..]),
            (&done_tag, &b"Done\n"[..]),
        ];
        for (expected_tag, expected_data) in expected {
            let chunk = mux.read()?;
            assert_eq!(&chunk.tag, expected_tag);
            assert_eq!(chunk.data, expected_data);
        }

        Ok(())
    }

    // Async variant: race the child's exit against reads from the mux, then drain whatever is
    // left with `read_nonblock` and check that every expected chunk was seen.
    #[cfg(feature = "async")]
    fn test_with_async_mux(mut mux: AsyncMux) -> std::io::Result<()> {
        use futures_lite::{FutureExt, future};

        future::block_on(async {
            let (out_tag, out_sender) = mux.make_sender()?;
            let (err_tag, err_sender) = mux.make_sender()?;
            let mut child = async_process::Command::new("sh")
                .arg("-c")
                .arg(SCRIPT)
                .stdout(out_sender)
                .stderr(err_sender)
                .spawn()?;
            let mut expected = vec![
                (out_tag.clone(), b"out1\n"),
                (err_tag.clone(), b"err1\n"),
                (out_tag, b"out2\n"),
                (err_tag, b"err2\n"),
            ]
            .into_iter();
            let mut status = None;
            while status.is_none() {
                async {
                    status = Some(child.status().await?);
                    Ok::<(), std::io::Error>(())
                }
                .or(async {
                    let chunk = mux.read().await?;
                    let (expected_tag, expected_data) = expected.next().unwrap();
                    assert_eq!(chunk.tag, expected_tag);
                    assert_eq!(chunk.data, expected_data);
                    Ok(())
                })
                .await?;
            }
            // The child is gone, so no new data can arrive; drain what is still queued.
            while let Some(chunk) = mux.read_nonblock()? {
                let (expected_tag, expected_data) = expected.next().unwrap();
                assert_eq!(chunk.tag, expected_tag);
                assert_eq!(chunk.data, expected_data);
            }
            assert!(status.unwrap().success());
            assert_eq!(expected.next(), None);
            Ok(())
        })
    }

    // Async mux backed by the default temporary directory.
    #[cfg(feature = "async")]
    #[test]
    fn test_async() -> std::io::Result<()> {
        let mux = AsyncMux::new()?;
        test_with_async_mux(mux)
    }

    // Async mux backed by Linux abstract sockets.
    #[cfg(all(feature = "async", target_os = "linux"))]
    #[test]
    fn test_abstract_async() -> std::io::Result<()> {
        let mux = AsyncMux::new_abstract()?;
        test_with_async_mux(mux)
    }
}