A Mux provides a single receive end and multiple send ends. Data sent to any of the send ends comes out the receive end, in order, tagged by the sender.
Each send end works as a file descriptor. For instance, with io-mux you can collect stdout and stderr from a process, and highlight any error output from stderr, while preserving the relative order of data across both stdout and stderr.
Note that reading provides no “EOF” indication; if no further data arrives, it will block forever. Avoid reading after the source of the data exits.
§Example
use std::io::Write; // needed for write! and write_all

use io_mux::{Mux, TaggedData};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut mux = Mux::new()?;
    let (out_tag, out_sender) = mux.make_sender()?;
    let (err_tag, err_sender) = mux.make_sender()?;
    let mut child = std::process::Command::new("sh")
        .arg("-c")
        .arg("echo out1 && echo err1 1>&2 && echo out2")
        .stdout(out_sender)
        .stderr(err_sender)
        .spawn()?;
    // A third sender signals that the child has exited, since the mux
    // itself never reports EOF.
    let (done_tag, mut done_sender) = mux.make_sender()?;
    std::thread::spawn(move || match child.wait() {
        Ok(status) if status.success() => {
            let _ = write!(done_sender, "Done\n");
        }
        Ok(_) => {
            let _ = write!(done_sender, "Child process failed\n");
        }
        Err(e) => {
            let _ = write!(done_sender, "Error: {:?}\n", e);
        }
    });
    let mut done = false;
    while !done {
        let TaggedData { data, tag } = mux.read()?;
        if tag == out_tag {
            print!("out: ");
        } else if tag == err_tag {
            print!("err: ");
        } else if tag == done_tag {
            done = true;
        } else {
            panic!("Unexpected tag");
        }
        std::io::stdout().write_all(data)?;
    }
    Ok(())
}
§async
If you enable the async feature, io-mux additionally provides an AsyncMux type, which allows processing data asynchronously.
You may want to use this with async-process or async-pidfd to concurrently wait on the exit of a process and the muxed output and error of that process. Until the process exits, call AsyncMux::read() to get the next bit of output, awaiting that concurrently with the exit of the process. Once the process exits and will thus produce no further output, call AsyncMux::read_nonblock until it returns None to drain the remaining output out of the mux.
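The drain step can be illustrated without io-mux at all. The following is a rough, standard-library-only sketch of the same idea on a raw UNIX datagram socket. It does not use AsyncMux; the `drain` helper is hypothetical, and treating `WouldBlock` as the analogue of `read_nonblock` returning None is an assumption made for illustration.

```rust
use std::io::{self, ErrorKind};
use std::os::unix::net::UnixDatagram;

/// Hypothetical helper: collect every datagram already queued on `sock`,
/// stopping as soon as a non-blocking read reports that nothing is left.
fn drain(sock: &UnixDatagram) -> io::Result<Vec<Vec<u8>>> {
    sock.set_nonblocking(true)?;
    let mut out = Vec::new();
    let mut buf = [0u8; 4096];
    loop {
        match sock.recv(&mut buf) {
            Ok(n) => out.push(buf[..n].to_vec()),
            // Nothing queued right now: the drain is complete.
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}

fn main() -> io::Result<()> {
    // A connected pair stands in for a sender and the receive end.
    let (tx, rx) = UnixDatagram::pair()?;
    // Output written shortly before the writer finished.
    tx.send(b"late output 1\n")?;
    tx.send(b"late output 2\n")?;
    let rest = drain(&rx)?;
    println!("drained {} datagrams", rest.len());
    Ok(())
}
```

Draining only makes sense once the writer is known to have exited; before that point an empty queue says nothing about whether more output is coming.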
§Internals
Internally, Mux creates a UNIX datagram socket for the receive end, and a separate UNIX datagram socket for each sender. Datagram sockets support recvfrom, which provides the address of the sender, so Mux::read can use the sender address as the tag for the packet received.
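As a minimal sketch of this mechanism, using only the standard library rather than io-mux's actual code: two sender sockets each bind their own path, and the receiver recovers that path from `recv_from`. The function name `tagged_roundtrip`, the scratch directory layout, and the socket file names are all hypothetical.

```rust
use std::io;
use std::os::unix::net::UnixDatagram;
use std::sync::atomic::{AtomicUsize, Ordering};

// Makes each call use its own scratch directory (illustrative only).
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

/// Send three datagrams from two bound senders and return
/// (sender socket file name, payload) pairs in arrival order.
fn tagged_roundtrip() -> io::Result<Vec<(String, Vec<u8>)>> {
    let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    let dir = std::env::temp_dir()
        .join(format!("mux-sketch-{}-{}", std::process::id(), id));
    std::fs::create_dir_all(&dir)?;

    let rx_path = dir.join("rx");
    let receiver = UnixDatagram::bind(&rx_path)?;

    // Each sender binds its own address so the receiver can tell them apart.
    let out = UnixDatagram::bind(dir.join("out"))?;
    let err = UnixDatagram::bind(dir.join("err"))?;
    out.connect(&rx_path)?;
    err.connect(&rx_path)?;

    out.send(b"out1\n")?;
    err.send(b"err1\n")?;
    out.send(b"out2\n")?;

    let mut seen = Vec::new();
    let mut buf = [0u8; 64];
    for _ in 0..3 {
        // recv_from reports which socket sent the datagram.
        let (n, addr) = receiver.recv_from(&mut buf)?;
        let tag = addr
            .as_pathname()
            .and_then(|p| p.file_name())
            .map(|f| f.to_string_lossy().into_owned())
            .unwrap_or_default();
        seen.push((tag, buf[..n].to_vec()));
    }

    std::fs::remove_dir_all(&dir)?;
    Ok(seen)
}

fn main() -> io::Result<()> {
    for (tag, data) in tagged_roundtrip()? {
        print!("{}: {}", tag, String::from_utf8_lossy(&data));
    }
    Ok(())
}
```

Because all three datagrams land in one receive queue, the receiver sees them in the order they were sent, which is what lets the relative order of the two streams survive.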
However, datagram sockets require reading an entire datagram with each recvfrom call, so Mux::read needs to find out the size of the next datagram before calling recvfrom. Linux supports directly asking for the next packet size using recv with MSG_PEEK | MSG_TRUNC. On other UNIX systems, we have to repeatedly call recv with MSG_PEEK and an increasingly large buffer, until we receive the entire packet, then make one more call without MSG_PEEK to tell the OS to discard it.
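The Linux path can be sketched roughly as follows. This is an illustration under stated assumptions, not io-mux's implementation: it is Linux-only, it declares `recv` and the flag values by hand (Linux values from `<sys/socket.h>`) to avoid an external crate, and the helper names `next_datagram_len` and `read_whole_datagram` are hypothetical.

```rust
use std::io;
use std::os::raw::{c_int, c_void};
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixDatagram;

// Linux flag values; other platforms differ in value and semantics.
const MSG_PEEK: c_int = 0x02;
const MSG_TRUNC: c_int = 0x20;

extern "C" {
    // recv(2) from the C library, which Rust's std already links against.
    fn recv(fd: c_int, buf: *mut c_void, len: usize, flags: c_int) -> isize;
}

/// Return the size of the next queued datagram without consuming it.
fn next_datagram_len(sock: &UnixDatagram) -> io::Result<usize> {
    let mut empty = [0u8; 0];
    // SAFETY: the pointer is valid for the zero bytes requested.
    let n = unsafe {
        recv(
            sock.as_raw_fd(),
            empty.as_mut_ptr() as *mut c_void,
            0,
            MSG_PEEK | MSG_TRUNC,
        )
    };
    if n < 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(n as usize)
    }
}

/// Read one whole datagram by sizing the buffer from the peeked length.
fn read_whole_datagram(sock: &UnixDatagram) -> io::Result<Vec<u8>> {
    let len = next_datagram_len(sock)?;
    let mut buf = vec![0u8; len];
    let n = sock.recv(&mut buf)?;
    buf.truncate(n);
    Ok(buf)
}

fn main() -> io::Result<()> {
    let (tx, rx) = UnixDatagram::pair()?;
    tx.send(&[b'x'; 5000])?;
    let data = read_whole_datagram(&rx)?;
    println!("received {} bytes", data.len());
    Ok(())
}
```

Peeking the length first means the buffer is always large enough, so no part of a datagram is silently truncated by the real read.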
Mux creates UNIX sockets within a temporary directory, removed when dropping the Mux.
Note that Mux::read cannot provide any indication of end-of-file. When using Mux, you will need to have some other indication that no further output will arrive, such as the exit of the child process producing output.
§Portability
Mux can theoretically run on any UNIX system. However, on some non-Linux systems, when the buffers for a UNIX socket fill up, writing to the UNIX socket may return an ENOBUFS error rather than blocking. Thus, on non-Linux systems, the process writing to a MuxSender may encounter an error if the receiving process does not process its buffers quickly enough. This does not match the behavior of a pipe. As this may result in surprising behavior, by default io-mux does not compile on non-Linux systems.
If you want to use io-mux on a non-Linux system, and your use case does not need the same semantics as a pipe (in particular, it can tolerate an ENOBUFS error from writing to a MuxSender when you do not read from the receive end quickly enough), then you can compile io-mux on non-Linux platforms by enabling the experimental-unix-support feature of io-mux.
If you have another UNIX platform which blocks on writes to a UNIX datagram socket with full buffers, as Linux does, then please send a note to the io-mux maintainer to mark support for your platform as non-experimental.
Structs§
- AsyncMux: Asynchronous version of Mux.
- Mux: A Mux provides a single receive end and multiple send ends. Data sent to any of the send ends comes out the receive end, in order, tagged by the sender.
- MuxSender: A send end of a Mux. You can convert a MuxSender to a std::process::Stdio for use with a child process, obtain the underlying file descriptor as an OwnedFd, or send data using std::io::Write.
- Tag: A unique tag associated with a sender.
- TaggedData: Data received through a mux, along with the tag.