use std::any::Any;
use std::io;
use std::io::Write;
use std::thread::JoinHandle;
use anyhow::Context as _;
use crossbeam_channel::bounded;
use crossbeam_channel::unbounded;
use crossbeam_channel::Receiver;
use crossbeam_channel::Sender;
use crate::Dimensions;
/// A destination for rendered console frames.
///
/// Implementors must be `Send + Sync + 'static`. This file provides a blocking implementation
/// that writes on the calling thread and a non-blocking one that hands frames to a writer thread.
pub trait SuperConsoleOutput: Send + Sync + 'static {
    /// Reports whether a frame should be rendered at this moment.
    ///
    /// Returning `false` lets an implementation decline a frame; the non-blocking output in this
    /// file does so while its frame channel is full. Presumably consulted by the renderer before
    /// each frame (the caller is not part of this file).
    fn should_render(&mut self) -> bool;

    /// Emits the bytes of one frame.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation fails to write the frame, or has a previously
    /// recorded write failure to report.
    fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()>;

    /// Returns the dimensions of the terminal.
    ///
    /// The default implementation queries `crossterm::terminal::size()` and converts the result
    /// into [`Dimensions`].
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `crossterm`.
    fn terminal_size(&self) -> anyhow::Result<Dimensions> {
        let size = crossterm::terminal::size()?;
        Ok(size.into())
    }

    /// Consumes the output once no further frames will be sent.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation has an outstanding failure to report.
    fn finalize(self: Box<Self>) -> anyhow::Result<()>;

    /// Exposes this output as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;

    /// Exposes this output as `&mut dyn Any` so callers can downcast to the concrete type.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
/// An output that owns a boxed writer and is written to directly by its `SuperConsoleOutput`
/// implementation.
pub struct BlockingSuperConsoleOutput {
    /// The writer that receives every frame.
    stream: Box<dyn Write + Send + 'static + Sync>,
}

impl BlockingSuperConsoleOutput {
    /// Wraps `stream` so it can be used as a console output.
    pub fn new(stream: Box<dyn Write + Send + 'static + Sync>) -> Self {
        BlockingSuperConsoleOutput { stream }
    }
}
impl SuperConsoleOutput for BlockingSuperConsoleOutput {
    /// Always `true`: this output never declines a frame.
    fn should_render(&mut self) -> bool {
        true
    }

    /// Writes the whole frame to the stream and flushes it before returning.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the write, or from the flush if the write succeeded.
    fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()> {
        // The flush is only attempted when the write itself succeeded.
        self.stream
            .write_all(&buffer)
            .and_then(|()| self.stream.flush())?;
        Ok(())
    }

    /// Nothing to clean up; every frame was already flushed by `output`.
    fn finalize(self: Box<Self>) -> anyhow::Result<()> {
        Ok(())
    }

    /// Returns `self` as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns `self` as `&mut dyn Any` for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
pub(crate) struct NonBlockingSuperConsoleOutput {
sender: Sender<Vec<u8>>,
errors: Receiver<io::Error>,
handle: JoinHandle<()>,
}
impl NonBlockingSuperConsoleOutput {
pub fn new(stream: Box<dyn Write + Send + 'static + Sync>) -> anyhow::Result<Self> {
Self::new_for_writer(stream)
}
fn new_for_writer(mut stream: Box<dyn Write + Send + 'static + Sync>) -> anyhow::Result<Self> {
let (sender, receiver) = bounded::<Vec<u8>>(1);
let (error_sender, errors) = unbounded::<io::Error>();
let handle = std::thread::Builder::new()
.name("superconsole-io".to_owned())
.spawn(move || {
for frame in receiver.into_iter() {
match stream.write_all(&frame).and_then(|()| stream.flush()) {
Ok(()) => {}
Err(e) => {
let _ignored = error_sender.try_send(e);
}
}
}
})
.context("Error spawning Superconsole I/O thread")?;
Ok(Self {
sender,
errors,
handle,
})
}
}
impl SuperConsoleOutput for NonBlockingSuperConsoleOutput {
fn should_render(&mut self) -> bool {
!self.errors.is_empty() || !self.sender.is_full()
}
fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()> {
if let Ok(err) = self.errors.try_recv() {
return Err(anyhow::Error::from(err).context("Superconsole I/O thread errored"));
}
self.sender
.send(buffer)
.context("Superconsole I/O thread has crashed")?;
Ok(())
}
fn finalize(self: Box<Self>) -> anyhow::Result<()> {
let Self {
sender,
errors,
handle,
} = *self;
drop(sender);
let res = match errors.into_iter().next() {
Some(err) => Err(anyhow::Error::from(err).context("Superconsole I/O thread errored")),
None => Ok(()),
};
match handle.join() {
Ok(()) => {}
Err(panic) => std::panic::resume_unwind(panic),
}
res
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
#[cfg(test)]
mod test {
    use crossbeam_channel::Receiver;

    use super::*;

    /// A writer whose every `write` sends a unit value on a zero-capacity channel, so the test
    /// decides whether a write completes (by receiving) or fails (by dropping the receiver).
    #[derive(Clone)]
    struct TestWriter {
        sender: Sender<()>,
    }

    impl TestWriter {
        /// Returns the writer together with the receiving end that controls it.
        pub fn new() -> (Self, Receiver<()>) {
            let (sender, drain) = bounded(0);
            (TestWriter { sender }, drain)
        }
    }

    impl Write for TestWriter {
        /// Reports the whole buffer as written once the send succeeds; fails with
        /// "not writable" if the send fails.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            match self.sender.send(()) {
                Ok(()) => Ok(buf.len()),
                Err(_) => Err(io::Error::new(io::ErrorKind::Other, "not writable")),
            }
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// A one-byte frame.
    fn msg() -> Vec<u8> {
        vec![1]
    }

    /// A write failure on the writer thread must surface on the next `output` call and again
    /// from `finalize`.
    #[test]
    fn test_non_blocking_output_errors_on_next_output() -> anyhow::Result<()> {
        let (writer, drain) = TestWriter::new();
        let mut output = NonBlockingSuperConsoleOutput::new_for_writer(Box::new(writer))?;

        assert!(output.should_render());

        // Nothing ever receives from `drain`, so the writer thread stalls on the first frame
        // while the second one fills the single-slot frame channel.
        for _ in 0..2 {
            output.output(msg())?;
        }
        assert!(!output.should_render());

        // Dropping the receiver makes the stalled write fail; spin until that becomes visible.
        drop(drain);
        while !output.should_render() {
            std::thread::yield_now();
        }

        assert!(output.output(Vec::new()).is_err());

        let boxed: Box<NonBlockingSuperConsoleOutput> = Box::new(output);
        assert!(boxed.finalize().is_err());

        Ok(())
    }
}