io_mux/
lib.rs

1#![forbid(missing_docs)]
2/*!
3A Mux provides a single receive end and multiple send ends. Data sent to any of the send ends comes
4out the receive end, in order, tagged by the sender.
5
6Each send end works as a file descriptor. For instance, with `io-mux` you can collect stdout and
7stderr from a process, and highlight any error output from stderr, while preserving the relative
8order of data across both stdout and stderr.
9
10Note that reading provides no "EOF" indication; if no further data arrives, it
11will block forever. Avoid reading after the source of the data exits.
12
13# Example
14
15```
16# use std::io::Write;
17# fn main() -> std::io::Result<()> {
18use io_mux::{Mux, TaggedData};
19let mut mux = Mux::new()?;
20
21let (out_tag, out_sender) = mux.make_sender()?;
22let (err_tag, err_sender) = mux.make_sender()?;
23let mut child = std::process::Command::new("sh")
24    .arg("-c")
25    .arg("echo out1 && echo err1 1>&2 && echo out2")
26    .stdout(out_sender)
27    .stderr(err_sender)
28    .spawn()?;
29
30let (done_tag, mut done_sender) = mux.make_sender()?;
31std::thread::spawn(move || match child.wait() {
32    Ok(status) if status.success() => {
        let _ = writeln!(done_sender, "Done");
    }
    Ok(_) => {
        let _ = writeln!(done_sender, "Child process failed");
    }
    Err(e) => {
        let _ = writeln!(done_sender, "Error: {:?}", e);
40    }
41});
42
43let mut done = false;
44while !done {
45    let TaggedData { data, tag } = mux.read()?;
46    if tag == out_tag {
47        print!("out: ");
48    } else if tag == err_tag {
49        print!("err: ");
50    } else if tag == done_tag {
51        done = true;
52    } else {
53        panic!("Unexpected tag");
54    }
55    std::io::stdout().write_all(data)?;
56}
57#     Ok(())
58# }
59```
60
61# async
62
63If you enable the `async` feature, `io-mux` additionally provides an `AsyncMux` type, which allows
64processing data asynchronously.
65
66You may want to use this with [async-process](https://crates.io/crates/async-process) or
67[async-pidfd](https://crates.io/crates/async-pidfd) to concurrently wait on the exit of a process
68and the muxed output and error of that process. Until the process exits, call `AsyncMux::read()` to
69get the next bit of output, awaiting that concurrently with the exit of the process. Once the
70process exits and will thus produce no further output, call `AsyncMux::read_nonblock` until it
71returns `None` to drain the remaining output out of the mux.
72
73# Internals
74
75Internally, `Mux` creates a UNIX datagram socket for the receive end, and a separate UNIX datagram
76socket for each sender. Datagram sockets support `recvfrom`, which provides the address of the
77sender, so `Mux::read` can use the sender address as the tag for the packet received.
78
However, each `recvfrom` call on a datagram socket consumes an entire datagram, discarding any part
that does not fit in the supplied buffer, so `Mux::read` needs to find out the size of the next
datagram before calling `recvfrom`. Linux
81supports directly asking for the next packet size using `recv` with `MSG_PEEK | MSG_TRUNC`. On
82other UNIX systems, we have to repeatedly call `recv` with `MSG_PEEK` and an increasingly large
83buffer, until we receive the entire packet, then make one more call without `MSG_PEEK` to tell the
84OS to discard it.
85
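`Mux::new` and `Mux::new_in` create the UNIX sockets within a temporary directory, which is removed
when the `Mux` is dropped. On Linux, `Mux::new_abstract` instead uses abstract socket addresses,
which need no directory on the filesystem.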
87
88Note that `Mux::read` cannot provide any indication of end-of-file. When using `Mux`, you will need
89to have some other indication that no further output will arrive, such as the exit of the child
90process producing output.
91
92# Portability
93Mux can theoretically run on any UNIX system. However, on some non-Linux systems, when the buffers
94for a UNIX socket fill up, writing to the UNIX socket may return an `ENOBUFS` error rather than
95blocking. Thus, on non-Linux systems, the process writing to a `MuxSender` may encounter an error
96if the receiving process does not process its buffers quickly enough. This does not match the
97behavior of a pipe. As this may result in surprising behavior, by default io-mux does not compile
98on non-Linux systems. If you want to use io-mux on a non-Linux system, and your use case does not
99need the same semantics as a pipe, and *in particular* it will not cause a problem in your use case
100if writing to a `MuxSender` may produce an `ENOBUFS` error if you do not read from the receive end
101quickly enough, then you can compile `io-mux` on non-Linux platforms by enabling the
102`experimental-unix-support` feature of `io-mux`.
103
104If you have another UNIX platform which blocks on writes to a UNIX datagram socket with full
105buffers, as Linux does, then please send a note to the io-mux maintainer to mark support for your
106platform as non-experimental.
107*/
108
// io-mux is built entirely on UNIX datagram sockets, so non-UNIX targets are rejected outright.
#[cfg(not(unix))]
compile_error!("io-mux only runs on UNIX");

// On UNIX platforms other than Linux, writes to a UNIX datagram socket with full buffers may fail
// with `ENOBUFS` instead of blocking (see the "Portability" section of the crate docs). Such
// platforms therefore require an explicit opt-in through the `experimental-unix-support` feature.
#[cfg(all(
    unix,
    not(target_os = "linux"),
    not(feature = "experimental-unix-support")
))]
compile_error!(
    "io-mux support for non-Linux platforms is experimental.
Please read the portability note in the io-mux documentation for more information
and potential caveats, before enabling io-mux's experimental UNIX support."
);
122
123use std::io;
124use std::net::Shutdown;
125use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
126#[cfg(target_os = "linux")]
127use std::os::linux::net::SocketAddrExt;
128use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
129use std::os::unix::net::{SocketAddr, UnixDatagram};
130use std::path::Path;
131use std::process::Stdio;
132
133#[cfg(feature = "async")]
134use async_io::Async;
135use rustix::net::RecvFlags;
136
/// Initial length, in bytes, of the buffer a `Mux` reuses for received datagrams. The buffer is
/// enlarged on demand when an incoming datagram may not fit.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
138
139/// A `Mux` provides a single receive end and multiple send ends. Data sent to any of the send ends
140/// comes out the receive end, in order, tagged by the sender.
141///
142/// `Mux` implements `AsFd` solely to support polling the underlying file descriptor for data to
143/// read. Always use `Mux` to perform the actual read.
144pub struct Mux {
145    receive: UnixDatagram,
146    receive_addr: SocketAddr,
147    tempdir: Option<tempfile::TempDir>,
148    buf: Vec<u8>,
149}
150
151impl AsFd for Mux {
152    fn as_fd(&self) -> BorrowedFd<'_> {
153        self.receive.as_fd()
154    }
155}
156
157impl AsRawFd for Mux {
158    fn as_raw_fd(&self) -> RawFd {
159        self.receive.as_raw_fd()
160    }
161}
162
/// A send end of a `Mux`. You can convert a `MuxSender` to a `std::process::Stdio` for use with a
/// child process, obtain the underlying file descriptor as an `OwnedFd`, or send data using
/// `std::io::Write`.
///
/// Each `write` call sends its entire buffer as one datagram, which the `Mux` returns as one chunk
/// from `read`.
#[derive(Debug)]
pub struct MuxSender(UnixDatagram);

impl AsRawFd for MuxSender {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}

impl IntoRawFd for MuxSender {
    fn into_raw_fd(self) -> RawFd {
        self.0.into_raw_fd()
    }
}

impl AsFd for MuxSender {
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.0.as_fd()
    }
}

impl From<MuxSender> for OwnedFd {
    fn from(sender: MuxSender) -> OwnedFd {
        sender.0.into()
    }
}

impl From<MuxSender> for Stdio {
    fn from(sender: MuxSender) -> Stdio {
        Stdio::from(OwnedFd::from(sender))
    }
}

impl io::Write for MuxSender {
    /// Send `buf` as a single datagram to the `Mux`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.send(buf)
    }

    /// No-op: `write` hands each datagram straight to the socket, so nothing is buffered here.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
207
/// A unique tag associated with a sender.
///
/// Tags compare equal when they hold the same sender address. Addresses of different kinds
/// (pathname, abstract, unnamed) never compare equal. `Tag` implements `Hash` consistently with
/// this equality, so tags can be used as `HashMap`/`HashSet` keys.
#[derive(Clone, Debug)]
pub struct Tag(SocketAddr);

impl PartialEq<Tag> for Tag {
    fn eq(&self, rhs: &Tag) -> bool {
        // Both in the Linux abstract namespace: compare the raw names.
        #[cfg(target_os = "linux")]
        if let (Some(lhs), Some(rhs)) = (self.0.as_abstract_name(), rhs.0.as_abstract_name()) {
            return lhs == rhs;
        }
        // Both filesystem addresses: compare the paths.
        if let (Some(lhs), Some(rhs)) = (self.0.as_pathname(), rhs.0.as_pathname()) {
            return lhs == rhs;
        }
        // Otherwise equal only when both are unnamed; mixed kinds are never equal.
        self.0.is_unnamed() && rhs.0.is_unnamed()
    }
}

impl Eq for Tag {}

impl std::hash::Hash for Tag {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Hash a discriminant for the address kind, followed by the name/path, mirroring `eq`
        // so that equal tags always produce equal hashes.
        #[cfg(target_os = "linux")]
        if let Some(name) = self.0.as_abstract_name() {
            state.write_u8(0);
            std::hash::Hash::hash(name, state);
            return;
        }
        match self.0.as_pathname() {
            Some(path) => {
                state.write_u8(1);
                std::hash::Hash::hash(path, state);
            }
            None => state.write_u8(2),
        }
    }
}
226
227/// Data received through a mux, along with the tag.
228#[derive(Debug, Eq, PartialEq)]
229pub struct TaggedData<'a> {
230    /// Data received, borrowed from the `Mux`.
231    pub data: &'a [u8],
232    /// Tag for the sender of this data.
233    pub tag: Tag,
234}
235
236impl Mux {
237    /// Create a new `Mux`, using Linux abstract sockets.
238    #[cfg(target_os = "linux")]
239    pub fn new_abstract() -> io::Result<Self> {
240        // It should be incredibly unlikely to have a collision, so if we have multiple in a row,
241        // something strange is likely going on, and we might continue to get the same error
242        // indefinitely. Bail after a large number of retries, so that we don't loop forever.
243        for _ in 0..32768 {
244            let receive_addr =
245                SocketAddr::from_abstract_name(format!("io-mux-{:x}", fastrand::u128(..)))?;
246            match Self::new_with_addr(receive_addr, None) {
247                Err(e) if e.kind() == io::ErrorKind::AddrInUse => continue,
248                result => return result,
249            }
250        }
251        Err(io::Error::new(
252            io::ErrorKind::AddrInUse,
253            "couldn't create unique socket name",
254        ))
255    }
256
257    /// Create a new `Mux`.
258    ///
259    /// This will create a temporary directory for all the sockets managed by this `Mux`; dropping
260    /// the `Mux` removes the temporary directory.
261    pub fn new() -> io::Result<Self> {
262        Self::new_with_tempdir(tempfile::tempdir()?)
263    }
264
265    /// Create a new `Mux`, with temporary directory under the specified path.
266    ///
267    /// This will create a temporary directory for all the sockets managed by this `Mux`; dropping
268    /// the `Mux` removes the temporary directory.
269    pub fn new_in<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
270        Self::new_with_tempdir(tempfile::tempdir_in(dir)?)
271    }
272
273    fn new_with_tempdir(tempdir: tempfile::TempDir) -> io::Result<Self> {
274        let receive_addr = SocketAddr::from_pathname(tempdir.path().join("r"))?;
275        Self::new_with_addr(receive_addr, Some(tempdir))
276    }
277
278    fn new_with_addr(
279        receive_addr: SocketAddr,
280        tempdir: Option<tempfile::TempDir>,
281    ) -> io::Result<Self> {
282        let receive = UnixDatagram::bind_addr(&receive_addr)?;
283
284        // Shutdown writing to the receive socket, to help catch possible errors. On some targets,
285        // this generates spurious errors, such as `Socket is not connected` on FreeBSD. We don't
286        // need this shutdown for correctness, so just ignore any errors.
287        let _ = receive.shutdown(Shutdown::Write);
288
289        Ok(Mux {
290            receive,
291            receive_addr,
292            tempdir,
293            buf: vec![0; DEFAULT_BUF_SIZE],
294        })
295    }
296
297    /// Create a new `MuxSender` and associated unique `Tag`. Data sent via the returned
298    /// `MuxSender` will arrive with the corresponding `Tag`.
299    pub fn make_sender(&self) -> io::Result<(Tag, MuxSender)> {
300        if let Some(ref tempdir) = self.tempdir {
301            self.make_sender_with_retry(|n| {
302                SocketAddr::from_pathname(tempdir.path().join(format!("{n:x}")))
303            })
304        } else {
305            #[cfg(target_os = "linux")]
306            return self.make_sender_with_retry(|n| {
307                SocketAddr::from_abstract_name(format!("io-mux-send-{n:x}"))
308            });
309            #[cfg(not(target_os = "linux"))]
310            panic!("Mux without tempdir on non-Linux platform")
311        }
312    }
313
314    fn make_sender_with_retry(
315        &self,
316        make_sender_addr: impl Fn(u128) -> io::Result<SocketAddr>,
317    ) -> io::Result<(Tag, MuxSender)> {
318        // It should be incredibly unlikely to have collisions, but avoid looping forever in case
319        // something strange is going on (e.g. weird seccomp filter).
320        for _ in 0..32768 {
321            let sender_addr = make_sender_addr(fastrand::u128(..))?;
322            let sender = match UnixDatagram::bind_addr(&sender_addr) {
323                Err(e) if e.kind() == io::ErrorKind::AddrInUse => continue,
324                result => result,
325            }?;
326            sender.connect_addr(&self.receive_addr)?;
327            sender.shutdown(Shutdown::Read)?;
328            return Ok((Tag(sender_addr), MuxSender(sender)));
329        }
330        Err(io::Error::new(
331            io::ErrorKind::AddrInUse,
332            "couldn't create unique socket name",
333        ))
334    }
335
336    #[cfg(all(target_os = "linux", not(feature = "test-portable")))]
337    fn recv_from_full(&mut self) -> io::Result<(&[u8], SocketAddr)> {
338        let next_packet_len = rustix::net::recv(
339            &mut self.receive,
340            &mut [],
341            RecvFlags::PEEK | RecvFlags::TRUNC,
342        )?;
343        if next_packet_len > self.buf.len() {
344            self.buf.resize(next_packet_len, 0);
345        }
346        let (bytes, addr) = self.receive.recv_from(&mut self.buf)?;
347        Ok((&self.buf[..bytes], addr))
348    }
349
350    #[cfg(not(all(target_os = "linux", not(feature = "test-portable"))))]
351    fn recv_from_full(&mut self) -> io::Result<(&[u8], SocketAddr)> {
352        loop {
353            let bytes = rustix::net::recv(&mut self.receive, &mut self.buf, RecvFlags::PEEK)?;
354            // If we filled the buffer, we may have truncated output. Retry with a bigger buffer.
355            if bytes == self.buf.len() {
356                let new_len = self.buf.len().saturating_mul(2);
357                self.buf.resize(new_len, 0);
358            } else {
359                // Get the packet address, and clear it by fetching into a zero-sized buffer.
360                let (_, addr) = self.receive.recv_from(&mut [])?;
361                return Ok((&self.buf[..bytes], addr));
362            }
363        }
364    }
365
366    /// Return the next chunk of data, together with its tag.
367    ///
368    /// This reuses a buffer managed by the `Mux`.
369    ///
370    /// Note that this provides no "EOF" indication; if no further data arrives, it will block
371    /// forever. Avoid calling it after the source of the data exits.
372    pub fn read(&mut self) -> io::Result<TaggedData<'_>> {
373        let (data, addr) = self.recv_from_full()?;
374        let tag = Tag(addr);
375        Ok(TaggedData { data, tag })
376    }
377}
378
/// Asynchronous version of `Mux`.
///
/// Wraps a `Mux` in `async_io::Async`, so that `read` can await incoming data.
#[cfg(feature = "async")]
pub struct AsyncMux(Async<Mux>);

#[cfg(feature = "async")]
impl AsyncMux {
    /// Register an already-constructed `Mux` with the async reactor.
    fn from_mux(mux: Mux) -> io::Result<Self> {
        Async::new(mux).map(Self)
    }

    /// Create a new `AsyncMux`, using Linux abstract sockets.
    #[cfg(target_os = "linux")]
    pub fn new_abstract() -> io::Result<Self> {
        Mux::new_abstract().and_then(Self::from_mux)
    }

    /// Create a new `AsyncMux`.
    ///
    /// This will create a temporary directory for all the sockets managed by this `AsyncMux`;
    /// dropping the `AsyncMux` removes the temporary directory.
    pub fn new() -> io::Result<Self> {
        Mux::new().and_then(Self::from_mux)
    }

    /// Create a new `AsyncMux`, with temporary directory under the specified path.
    ///
    /// This will create a temporary directory for all the sockets managed by this `AsyncMux`;
    /// dropping the `AsyncMux` removes the temporary directory.
    pub fn new_in<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
        Mux::new_in(dir).and_then(Self::from_mux)
    }

    /// Create a new `MuxSender` and associated unique `Tag`. Data sent via the returned
    /// `MuxSender` will arrive with the corresponding `Tag`.
    pub fn make_sender(&self) -> io::Result<(Tag, MuxSender)> {
        Mux::make_sender(self.0.get_ref())
    }

    /// Return the next chunk of data, together with its tag.
    ///
    /// This reuses a buffer managed by the `AsyncMux`.
    ///
    /// Note that this provides no "EOF" indication; if no further data arrives, it will block
    /// forever. Avoid calling it after the source of the data exits. Once the source of the data
    /// exits, call `read_nonblock` instead, until it returns None.
    pub async fn read(&mut self) -> io::Result<TaggedData<'_>> {
        self.0.readable().await?;
        // SAFETY: `Async::get_mut` forbids dropping or replacing the wrapped I/O source through
        // the returned reference; it is only used to call `Mux::read`, which merely receives.
        unsafe { self.0.get_mut() }.read()
    }

    /// Return the next chunk of data, together with its tag, if available immediately, or None if
    /// the read would block.
    ///
    /// This reuses a buffer managed by the `AsyncMux`.
    ///
    /// Use this if you know no more data will get sent and you want to drain the remaining data.
    pub fn read_nonblock(&mut self) -> io::Result<Option<TaggedData<'_>>> {
        // SAFETY: as in `read`, the reference is only used for `Mux::read`, which never drops or
        // replaces the socket.
        let mux = unsafe { self.0.get_mut() };
        // NOTE(review): relies on the socket being non-blocking (presumably set up by
        // `Async::new`), so an empty queue surfaces as `WouldBlock`.
        match mux.read() {
            Ok(chunk) => Ok(Some(chunk)),
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(None),
            Err(err) => Err(err),
        }
    }
}
440
#[cfg(test)]
mod test {
    #[cfg(feature = "async")]
    use super::AsyncMux;
    use super::Mux;

    #[test]
    fn test() -> std::io::Result<()> {
        test_with_mux(Mux::new()?)
    }

    #[test]
    fn test_new_in() -> std::io::Result<()> {
        let dir = tempfile::tempdir()?;
        let dir_entries = || -> std::io::Result<usize> {
            Ok(dir.path().read_dir()?.collect::<Result<Vec<_>, _>>()?.len())
        };
        assert_eq!(dir_entries()?, 0);
        let mux = Mux::new_in(dir.path())?;
        // `new_in` creates exactly one temporary directory inside `dir`.
        assert_eq!(dir_entries()?, 1);
        test_with_mux(mux)
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn test_abstract() -> std::io::Result<()> {
        test_with_mux(Mux::new_abstract()?)
    }

    /// Datagrams larger than the initial receive buffer must arrive intact (exercising the buffer
    /// growth path), and a smaller datagram read afterwards must come back with its own length.
    ///
    /// Linux only: the payload size is chosen with Linux's datagram limits in mind.
    #[test]
    #[cfg(target_os = "linux")]
    fn test_large_datagram() -> std::io::Result<()> {
        use std::io::Write;

        let mut mux = Mux::new()?;
        let (tag, mut sender) = mux.make_sender()?;
        // Several times the initial buffer size, so the receive buffer has to grow.
        let large: Vec<u8> = (0..3 * super::DEFAULT_BUF_SIZE + 7)
            .map(|i| (i % 251) as u8) // `i % 251` always fits in a u8.
            .collect();
        assert_eq!(sender.write(&large)?, large.len());
        assert_eq!(sender.write(b"small")?, 5);

        let first = mux.read()?;
        assert_eq!(first.tag, tag);
        assert_eq!(first.data, &large[..]);
        let second = mux.read()?;
        assert_eq!(second.tag, tag);
        assert_eq!(second.data, b"small");
        Ok(())
    }

    /// Run a child that interleaves stdout and stderr, and check that the mux delivers each line
    /// in order with the right tag, followed by the completion message.
    fn test_with_mux(mut mux: Mux) -> std::io::Result<()> {
        let (out_tag, out_sender) = mux.make_sender()?;
        let (err_tag, err_sender) = mux.make_sender()?;
        assert_ne!(out_tag, err_tag);
        let mut child = std::process::Command::new("sh")
            .arg("-c")
            .arg("echo out1 && echo err1 1>&2 && echo out2 && echo err2 1>&2")
            .stdout(out_sender)
            .stderr(err_sender)
            .spawn()?;

        let (done_tag, mut done_sender) = mux.make_sender()?;
        std::thread::spawn(move || {
            use std::io::Write;
            match child.wait() {
                Ok(status) if status.success() => {
                    let _ = writeln!(done_sender, "Done");
                }
                Ok(_) => {
                    let _ = writeln!(done_sender, "Child process failed");
                }
                Err(e) => {
                    let _ = writeln!(done_sender, "Error: {:?}", e);
                }
            }
        });

        let data1 = mux.read()?;
        assert_eq!(data1.tag, out_tag);
        assert_eq!(data1.data, b"out1\n");
        let data2 = mux.read()?;
        assert_eq!(data2.tag, err_tag);
        assert_eq!(data2.data, b"err1\n");
        let data3 = mux.read()?;
        assert_eq!(data3.tag, out_tag);
        assert_eq!(data3.data, b"out2\n");
        let data4 = mux.read()?;
        assert_eq!(data4.tag, err_tag);
        assert_eq!(data4.data, b"err2\n");
        let done = mux.read()?;
        assert_eq!(done.tag, done_tag);
        assert_eq!(done.data, b"Done\n");

        Ok(())
    }

    /// Async counterpart of `test_with_mux`: read concurrently with waiting for the child, then
    /// drain whatever remains with `read_nonblock`.
    #[cfg(feature = "async")]
    fn test_with_async_mux(mut mux: AsyncMux) -> std::io::Result<()> {
        use futures_lite::{FutureExt, future};

        future::block_on(async {
            let (out_tag, out_sender) = mux.make_sender()?;
            let (err_tag, err_sender) = mux.make_sender()?;
            let mut child = async_process::Command::new("sh")
                .arg("-c")
                .arg("echo out1 && echo err1 1>&2 && echo out2 && echo err2 1>&2")
                .stdout(out_sender)
                .stderr(err_sender)
                .spawn()?;
            let mut expected = vec![
                (out_tag.clone(), b"out1\n"),
                (err_tag.clone(), b"err1\n"),
                (out_tag, b"out2\n"),
                (err_tag, b"err2\n"),
            ]
            .into_iter();
            let mut status = None;
            while status.is_none() {
                async {
                    status = Some(child.status().await?);
                    Ok::<(), std::io::Error>(())
                }
                .or(async {
                    let data = mux.read().await?;
                    let (expected_tag, expected_data) = expected.next().unwrap();
                    assert_eq!(data.tag, expected_tag);
                    assert_eq!(data.data, expected_data);
                    Ok(())
                })
                .await?;
            }
            while let Some(data) = mux.read_nonblock()? {
                let (expected_tag, expected_data) = expected.next().unwrap();
                assert_eq!(data.tag, expected_tag);
                assert_eq!(data.data, expected_data);
            }
            assert!(status.unwrap().success());
            assert_eq!(expected.next(), None);
            Ok(())
        })
    }

    #[cfg(feature = "async")]
    #[test]
    fn test_async() -> std::io::Result<()> {
        test_with_async_mux(AsyncMux::new()?)
    }

    #[cfg(all(feature = "async", target_os = "linux"))]
    #[test]
    fn test_abstract_async() -> std::io::Result<()> {
        test_with_async_mux(AsyncMux::new_abstract()?)
    }
}