1use std::any::Any;
11use std::io;
12use std::io::Write;
13use std::thread::JoinHandle;
14
15use anyhow::Context as _;
16use crossbeam_channel::bounded;
17use crossbeam_channel::unbounded;
18use crossbeam_channel::Receiver;
19use crossbeam_channel::Sender;
20
21use crate::Dimensions;
22
23pub trait SuperConsoleOutput: Send + Sync + 'static {
24 fn should_render(&mut self) -> bool;
27
28 fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()>;
31
32 fn terminal_size(&self) -> anyhow::Result<Dimensions> {
34 Ok(crossterm::terminal::size()?.into())
35 }
36
37 fn finalize(self: Box<Self>) -> anyhow::Result<()>;
40
41 fn as_any(&self) -> &dyn Any;
43
44 fn as_any_mut(&mut self) -> &mut dyn Any;
46}
47
48pub struct BlockingSuperConsoleOutput {
49 stream: Box<dyn Write + Send + 'static + Sync>,
51}
52
53impl BlockingSuperConsoleOutput {
54 pub fn new(stream: Box<dyn Write + Send + 'static + Sync>) -> Self {
55 Self { stream }
56 }
57}
58
59impl SuperConsoleOutput for BlockingSuperConsoleOutput {
60 fn should_render(&mut self) -> bool {
61 true
62 }
63
64 fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()> {
65 self.stream.write_all(&buffer)?;
66 self.stream.flush()?;
67
68 Ok(())
69 }
70
71 fn finalize(self: Box<Self>) -> anyhow::Result<()> {
72 Ok(())
73 }
74
75 fn as_any(&self) -> &dyn Any {
76 self
77 }
78
79 fn as_any_mut(&mut self) -> &mut dyn Any {
80 self
81 }
82}
83
84pub(crate) struct NonBlockingSuperConsoleOutput {
90 sender: Sender<Vec<u8>>,
92 errors: Receiver<io::Error>,
94 handle: JoinHandle<()>,
97}
98
99impl NonBlockingSuperConsoleOutput {
100 pub fn new(stream: Box<dyn Write + Send + 'static + Sync>) -> anyhow::Result<Self> {
101 Self::new_for_writer(stream)
102 }
103
104 fn new_for_writer(mut stream: Box<dyn Write + Send + 'static + Sync>) -> anyhow::Result<Self> {
105 let (sender, receiver) = bounded::<Vec<u8>>(1);
106 let (error_sender, errors) = unbounded::<io::Error>();
107
108 let handle = std::thread::Builder::new()
109 .name("superconsole-io".to_owned())
110 .spawn(move || {
111 for frame in receiver.into_iter() {
112 match stream.write_all(&frame).and_then(|()| stream.flush()) {
113 Ok(()) => {}
114 Err(e) => {
115 let _ignored = error_sender.try_send(e);
118 }
119 }
120 }
121 })
122 .context("Error spawning Superconsole I/O thread")?;
123
124 Ok(Self {
125 sender,
126 errors,
127 handle,
128 })
129 }
130}
131
132impl SuperConsoleOutput for NonBlockingSuperConsoleOutput {
133 fn should_render(&mut self) -> bool {
137 !self.errors.is_empty() || !self.sender.is_full()
138 }
139
140 fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()> {
143 if let Ok(err) = self.errors.try_recv() {
144 return Err(anyhow::Error::from(err).context("Superconsole I/O thread errored"));
145 }
146
147 self.sender
148 .send(buffer)
149 .context("Superconsole I/O thread has crashed")?;
150
151 Ok(())
152 }
153
154 fn finalize(self: Box<Self>) -> anyhow::Result<()> {
156 let Self {
157 sender,
158 errors,
159 handle,
160 } = *self;
161 drop(sender);
162
163 let res = match errors.into_iter().next() {
164 Some(err) => Err(anyhow::Error::from(err).context("Superconsole I/O thread errored")),
165 None => Ok(()),
166 };
167
168 match handle.join() {
169 Ok(()) => {}
170 Err(panic) => std::panic::resume_unwind(panic),
171 }
172
173 res
174 }
175
176 fn as_any(&self) -> &dyn Any {
177 self
178 }
179
180 fn as_any_mut(&mut self) -> &mut dyn Any {
181 self
182 }
183}
184
185#[cfg(test)]
186mod test {
187 use crossbeam_channel::Receiver;
188
189 use super::*;
190
191 #[derive(Clone)]
194 struct TestWriter {
195 sender: Sender<()>,
196 }
197
198 impl TestWriter {
199 pub fn new() -> (Self, Receiver<()>) {
200 let (sender, receiver) = bounded(0);
201 (Self { sender }, receiver)
202 }
203 }
204
205 impl Write for TestWriter {
206 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
207 self.sender
208 .send(())
209 .map_err(|_| io::Error::new(io::ErrorKind::Other, "not writable"))?;
210
211 Ok(buf.len())
212 }
213
214 fn flush(&mut self) -> io::Result<()> {
215 Ok(())
216 }
217 }
218
219 fn msg() -> Vec<u8> {
220 vec![1]
222 }
223
224 #[test]
225 fn test_non_blocking_output_errors_on_next_output() -> anyhow::Result<()> {
226 let (writer, drain) = TestWriter::new();
227
228 let mut output = NonBlockingSuperConsoleOutput::new_for_writer(Box::new(writer))?;
229
230 assert!(output.should_render());
232 output.output(msg())?;
233
234 output.output(msg())?;
236
237 assert!(!output.should_render());
239 drop(drain);
240
241 while !output.should_render() {
243 std::thread::yield_now();
244 continue;
245 }
246
247 assert!(output.output(vec![]).is_err());
249 assert!(Box::new(output).finalize().is_err());
250
251 Ok(())
252 }
253}