superconsole/
output.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10use std::any::Any;
11use std::io;
12use std::io::Write;
13use std::thread::JoinHandle;
14
15use anyhow::Context as _;
16use crossbeam_channel::bounded;
17use crossbeam_channel::unbounded;
18use crossbeam_channel::Receiver;
19use crossbeam_channel::Sender;
20
21use crate::Dimensions;
22
23pub trait SuperConsoleOutput: Send + Sync + 'static {
24    /// Called before rendering will occur. This has a chance to prevent rendering by returning
25    /// false.
26    fn should_render(&mut self) -> bool;
27
28    /// Called to produce output. This may be called without should_render if we are finalizing or
29    /// clearing. This should flush if possible.
30    fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()>;
31
32    /// How big is the terminal to write to.
33    fn terminal_size(&self) -> anyhow::Result<Dimensions> {
34        Ok(crossterm::terminal::size()?.into())
35    }
36
37    /// Called when the console has finalized. This must block if necessary. No further output will
38    /// be emitted.
39    fn finalize(self: Box<Self>) -> anyhow::Result<()>;
40
41    /// Get this Output as an Any. This is used for testing.
42    fn as_any(&self) -> &dyn Any;
43
44    /// Get this Output as a mutable Any. This is used for testing.
45    fn as_any_mut(&mut self) -> &mut dyn Any;
46}
47
48pub struct BlockingSuperConsoleOutput {
49    /// Stream to write to.
50    stream: Box<dyn Write + Send + 'static + Sync>,
51}
52
53impl BlockingSuperConsoleOutput {
54    pub fn new(stream: Box<dyn Write + Send + 'static + Sync>) -> Self {
55        Self { stream }
56    }
57}
58
59impl SuperConsoleOutput for BlockingSuperConsoleOutput {
60    fn should_render(&mut self) -> bool {
61        true
62    }
63
64    fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()> {
65        self.stream.write_all(&buffer)?;
66        self.stream.flush()?;
67
68        Ok(())
69    }
70
71    fn finalize(self: Box<Self>) -> anyhow::Result<()> {
72        Ok(())
73    }
74
75    fn as_any(&self) -> &dyn Any {
76        self
77    }
78
79    fn as_any_mut(&mut self) -> &mut dyn Any {
80        self
81    }
82}
83
84/// A non-blocking output for the SuperConsole. This makes a few guarantees:
85///
86/// - Calls to output() after should_render() returned true will not block.
87/// - When finalize() returns, the last frame passed to output() is shown.
88/// - When an error occurs, the next fallible call will return it.
89pub(crate) struct NonBlockingSuperConsoleOutput {
90    /// A channel to send frames for writing.
91    sender: Sender<Vec<u8>>,
92    /// A channel back for errors encountered by the thread doing the writing.
93    errors: Receiver<io::Error>,
94    /// The thread doing the writing. It owns the other end of the aforementioned channels and will
95    /// exit when the data sender is closed.
96    handle: JoinHandle<()>,
97}
98
99impl NonBlockingSuperConsoleOutput {
100    pub fn new(stream: Box<dyn Write + Send + 'static + Sync>) -> anyhow::Result<Self> {
101        Self::new_for_writer(stream)
102    }
103
104    fn new_for_writer(mut stream: Box<dyn Write + Send + 'static + Sync>) -> anyhow::Result<Self> {
105        let (sender, receiver) = bounded::<Vec<u8>>(1);
106        let (error_sender, errors) = unbounded::<io::Error>();
107
108        let handle = std::thread::Builder::new()
109            .name("superconsole-io".to_owned())
110            .spawn(move || {
111                for frame in receiver.into_iter() {
112                    match stream.write_all(&frame).and_then(|()| stream.flush()) {
113                        Ok(()) => {}
114                        Err(e) => {
115                            // This can only fail if the sender disconnected, in which case they'll
116                            // stop sending us data momentarily, so ignore the failure.
117                            let _ignored = error_sender.try_send(e);
118                        }
119                    }
120                }
121            })
122            .context("Error spawning Superconsole I/O thread")?;
123
124        Ok(Self {
125            sender,
126            errors,
127            handle,
128        })
129    }
130}
131
132impl SuperConsoleOutput for NonBlockingSuperConsoleOutput {
133    /// Check if we have free capacity in our channel. Note that if the channel is full, that means
134    /// our writer thread already has 2 buffered frames (one in the channel, one it's currently
135    /// writing out). In this case, refuse to produce further output.
136    fn should_render(&mut self) -> bool {
137        !self.errors.is_empty() || !self.sender.is_full()
138    }
139
140    /// Attempt to send out a frame. If we called should_render, this won't block. If we didn't,
141    /// then it may block.
142    fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()> {
143        if let Ok(err) = self.errors.try_recv() {
144            return Err(anyhow::Error::from(err).context("Superconsole I/O thread errored"));
145        }
146
147        self.sender
148            .send(buffer)
149            .context("Superconsole I/O thread has crashed")?;
150
151        Ok(())
152    }
153
154    /// Notify our writer thread that no further writes are expected. Wait for it to flush.
155    fn finalize(self: Box<Self>) -> anyhow::Result<()> {
156        let Self {
157            sender,
158            errors,
159            handle,
160        } = *self;
161        drop(sender);
162
163        let res = match errors.into_iter().next() {
164            Some(err) => Err(anyhow::Error::from(err).context("Superconsole I/O thread errored")),
165            None => Ok(()),
166        };
167
168        match handle.join() {
169            Ok(()) => {}
170            Err(panic) => std::panic::resume_unwind(panic),
171        }
172
173        res
174    }
175
176    fn as_any(&self) -> &dyn Any {
177        self
178    }
179
180    fn as_any_mut(&mut self) -> &mut dyn Any {
181        self
182    }
183}
184
185#[cfg(test)]
186mod test {
187    use crossbeam_channel::Receiver;
188
189    use super::*;
190
191    /// A test writer that just sends into a channel. Lets us block / unblock the output to test
192    /// for race conditions.
193    #[derive(Clone)]
194    struct TestWriter {
195        sender: Sender<()>,
196    }
197
198    impl TestWriter {
199        pub fn new() -> (Self, Receiver<()>) {
200            let (sender, receiver) = bounded(0);
201            (Self { sender }, receiver)
202        }
203    }
204
205    impl Write for TestWriter {
206        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
207            self.sender
208                .send(())
209                .map_err(|_| io::Error::new(io::ErrorKind::Other, "not writable"))?;
210
211            Ok(buf.len())
212        }
213
214        fn flush(&mut self) -> io::Result<()> {
215            Ok(())
216        }
217    }
218
219    fn msg() -> Vec<u8> {
220        // Not an empty buffer, to ensure write() gets called.
221        vec![1]
222    }
223
224    #[test]
225    fn test_non_blocking_output_errors_on_next_output() -> anyhow::Result<()> {
226        let (writer, drain) = TestWriter::new();
227
228        let mut output = NonBlockingSuperConsoleOutput::new_for_writer(Box::new(writer))?;
229
230        // Send a first message, this will go into write()
231        assert!(output.should_render());
232        output.output(msg())?;
233
234        // Send a second message, this will stay in the channel.
235        output.output(msg())?;
236
237        // Now, kill the output
238        assert!(!output.should_render());
239        drop(drain);
240
241        // We expect that should_render() will eventually return true.
242        while !output.should_render() {
243            std::thread::yield_now();
244            continue;
245        }
246
247        // Likewise, we expect that sending output and finalizing wold fail.
248        assert!(output.output(vec![]).is_err());
249        assert!(Box::new(output).finalize().is_err());
250
251        Ok(())
252    }
253}