lunatic_stdout_capture/
lib.rs

1use std::{
2    any::Any,
3    fmt::{Display, Formatter},
4    io::{stdout, Cursor, IoSlice, IoSliceMut, Read, Seek, SeekFrom, Write},
5    sync::{Arc, Mutex, RwLock},
6};
7
8use wasi_common::{
9    file::{Advice, FdFlags, FileType, Filestat},
10    Error, ErrorExt, SystemTimeSpec, WasiFile,
11};
12
// Shared storage for every captured output stream.
//
// The signature looks scary, but each layer has one job:
// - `Arc`: the same set of streams is shared between all handles.
// - `RwLock`: the vector of streams is rarely extended and often accessed.
// - `Mutex`: gives each stream its own lock, so independent processes can write in parallel.
//   A process is expected to write only to its own stream, so this lock should not see
//   contention.
// - `Cursor<Vec<u8>>`: the in-memory buffer holding the bytes of one stream.
type StdOutVec = Arc<RwLock<Vec<Mutex<Cursor<Vec<u8>>>>>>;
17
/// `StdoutCapture` holds the standard output from multiple processes.
///
/// The most common pattern of usage is to capture together the output from a starting process
/// and all sub-processes. E.g. Hide output of sub-processes during testing.
///
/// Cloning is cheap: the derived `Clone` copies the `Arc`, so a clone refers to the same
/// underlying streams (and the same `index`) as the original.
#[derive(Clone, Debug)]
pub struct StdoutCapture {
    // If true, all captured writes are echoed to stdout. This is used in testing scenarios with
    // the flag `--nocapture` set, because we still need to capture the output to inspect panics.
    echo: bool,
    // Storage for all streams, shared between every handle derived from the same capture.
    writers: StdOutVec,
    // Index (into `writers`) of the stdout currently in use by a process.
    index: usize,
}
31
32impl PartialEq for StdoutCapture {
33    fn eq(&self, other: &Self) -> bool {
34        Arc::ptr_eq(&self.writers, &other.writers) && self.index == other.index
35    }
36}
37
38// Displays content of all processes contained inside `StdoutCapture`.
39impl Display for StdoutCapture {
40    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
41        let streams = RwLock::read(&self.writers).unwrap();
42        // If there is only one process, don't enumerate the output
43        if streams.len() == 1 {
44            write!(f, "{}", self.content()).unwrap();
45        } else {
46            for (i, stream) in streams.iter().enumerate() {
47                writeln!(f, " --- process {i} stdout ---").unwrap();
48                let stream = stream.lock().unwrap();
49                let content = String::from_utf8_lossy(stream.get_ref()).to_string();
50                write!(f, "{content}").unwrap();
51            }
52        }
53        Ok(())
54    }
55}
56
57impl StdoutCapture {
58    // Create a new `StdoutCapture` with one stream inside.
59    pub fn new(echo: bool) -> Self {
60        Self {
61            echo,
62            writers: Arc::new(RwLock::new(vec![Mutex::new(Cursor::new(Vec::new()))])),
63            index: 0,
64        }
65    }
66
67    /// Returns `true` if this is the only reference to the outputs.
68    pub fn only_reference(&self) -> bool {
69        Arc::strong_count(&self.writers) == 1
70    }
71
72    /// Returns a clone of `StdoutCapture` pointing to the next stream
73    pub fn next(&self) -> Self {
74        let index = {
75            let mut writers = RwLock::write(&self.writers).unwrap();
76            // If the stream already exists don't add a new one, e.g. stdout & stderr share the same stream.
77            writers.push(Mutex::new(Cursor::new(Vec::new())));
78            writers.len() - 1
79        };
80        Self {
81            echo: self.echo,
82            writers: self.writers.clone(),
83            index,
84        }
85    }
86
87    /// Returns true if all streams are empty
88    pub fn is_empty(&self) -> bool {
89        let streams = RwLock::read(&self.writers).unwrap();
90        streams.iter().all(|stream| {
91            let stream = stream.lock().unwrap();
92            stream.get_ref().is_empty()
93        })
94    }
95
96    /// Returns stream's content
97    pub fn content(&self) -> String {
98        let streams = RwLock::read(&self.writers).unwrap();
99        let stream = streams[self.index].lock().unwrap();
100        String::from_utf8_lossy(stream.get_ref()).to_string()
101    }
102
103    /// Add string to end of the stream
104    pub fn push_str(&self, content: &str) {
105        let streams = RwLock::read(&self.writers).unwrap();
106        let mut stream = streams[self.index].lock().unwrap();
107        write!(stream, "{content}").unwrap();
108    }
109}
110
111#[wiggle::async_trait]
112impl WasiFile for StdoutCapture {
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116    async fn datasync(&self) -> Result<(), Error> {
117        Ok(())
118    }
119    async fn sync(&self) -> Result<(), Error> {
120        Ok(())
121    }
122    async fn get_filetype(&self) -> Result<FileType, Error> {
123        Ok(FileType::Pipe)
124    }
125    async fn get_fdflags(&self) -> Result<FdFlags, Error> {
126        Ok(FdFlags::APPEND)
127    }
128    async fn set_fdflags(&mut self, _fdflags: FdFlags) -> Result<(), Error> {
129        Err(Error::badf())
130    }
131    async fn get_filestat(&self) -> Result<Filestat, Error> {
132        Ok(Filestat {
133            device_id: 0,
134            inode: 0,
135            filetype: self.get_filetype().await?,
136            nlink: 0,
137            size: 0, // XXX no way to get a size out of a Write :(
138            atim: None,
139            mtim: None,
140            ctim: None,
141        })
142    }
143    async fn set_filestat_size(&self, _size: u64) -> Result<(), Error> {
144        Err(Error::badf())
145    }
146    async fn advise(&self, _offset: u64, _len: u64, _advice: Advice) -> Result<(), Error> {
147        Err(Error::badf())
148    }
149    async fn allocate(&self, _offset: u64, _len: u64) -> Result<(), Error> {
150        Err(Error::badf())
151    }
152    async fn read_vectored<'a>(&self, _bufs: &mut [IoSliceMut<'a>]) -> Result<u64, Error> {
153        Err(Error::badf())
154    }
155    async fn read_vectored_at<'a>(
156        &self,
157        _bufs: &mut [IoSliceMut<'a>],
158        _offset: u64,
159    ) -> Result<u64, Error> {
160        Err(Error::badf())
161    }
162    async fn write_vectored<'a>(&self, bufs: &[IoSlice<'a>]) -> Result<u64, Error> {
163        let streams = RwLock::read(&self.writers).unwrap();
164        let mut stream = streams[self.index].lock().unwrap();
165        let n = stream.write_vectored(bufs)?;
166        // Echo the captured part to stdout
167        if self.echo {
168            stream.seek(SeekFrom::End(-(n as i64)))?;
169            let mut echo = vec![0; n];
170            stream.read_exact(&mut echo)?;
171            stdout().write_all(&echo)?;
172        }
173        Ok(n.try_into()?)
174    }
175    async fn write_vectored_at<'a>(
176        &self,
177        _bufs: &[IoSlice<'a>],
178        _offset: u64,
179    ) -> Result<u64, Error> {
180        Err(Error::badf())
181    }
182    async fn seek(&self, _pos: SeekFrom) -> Result<u64, Error> {
183        Err(Error::badf())
184    }
185    async fn peek(&self, _buf: &mut [u8]) -> Result<u64, Error> {
186        Err(Error::badf())
187    }
188    async fn set_times(
189        &self,
190        _atime: Option<SystemTimeSpec>,
191        _mtime: Option<SystemTimeSpec>,
192    ) -> Result<(), Error> {
193        Err(Error::badf())
194    }
195    fn num_ready_bytes(&self) -> Result<u64, Error> {
196        Ok(0)
197    }
198    fn isatty(&self) -> bool {
199        false
200    }
201    async fn readable(&self) -> Result<(), Error> {
202        Err(Error::badf())
203    }
204    async fn writable(&self) -> Result<(), Error> {
205        Err(Error::badf())
206    }
207
208    async fn sock_accept(&self, _fdflags: FdFlags) -> Result<Box<dyn WasiFile>, Error> {
209        Err(Error::badf())
210    }
211}