use crate::buffer::*;
use crate::console_error::ConsoleError;
use parking_lot::Mutex;
use std::fmt;
use std::io::{self, IsTerminal};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, mpsc};
use std::thread::{JoinHandle, spawn};
use tracing::trace;
#[derive(Clone, Copy, Debug, Default)]
pub enum ConsoleStreamType {
Stderr,
#[default]
Stdout,
}
pub struct ConsoleStream {
buffer: Arc<Mutex<Vec<u8>>>,
channel: Option<mpsc::Sender<bool>>,
stream: ConsoleStreamType,
pub(crate) closed: Arc<AtomicBool>,
pub(crate) handle: Option<JoinHandle<()>>,
pub(crate) quiet: Option<Arc<AtomicBool>>,
pub(crate) test_mode: bool,
}
impl ConsoleStream {
fn internal_new(stream: ConsoleStreamType, with_handle: bool) -> Self {
let buffer = Arc::new(Mutex::new(Vec::new()));
let buffer_clone = Arc::clone(&buffer);
let (tx, rx) = mpsc::channel();
let handle = if with_handle {
Some(spawn(move || flush_on_loop(buffer_clone, stream, rx)))
} else {
None
};
Self {
buffer,
channel: Some(tx),
closed: Arc::new(AtomicBool::new(false)),
handle,
stream,
quiet: None,
test_mode: false,
}
}
pub fn new(stream: ConsoleStreamType) -> Self {
Self::internal_new(stream, true)
}
pub fn new_testing(stream: ConsoleStreamType) -> Self {
let mut console = Self::internal_new(stream, false);
console.test_mode = true;
console
}
pub fn empty(stream: ConsoleStreamType) -> Self {
Self {
buffer: Arc::new(Mutex::new(Vec::new())),
channel: None,
closed: Arc::new(AtomicBool::new(false)),
stream,
handle: None,
quiet: None,
test_mode: false,
}
}
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
pub fn is_quiet(&self) -> bool {
self.quiet
.as_ref()
.is_some_and(|quiet| quiet.load(Ordering::Relaxed))
}
pub fn is_terminal(&self) -> bool {
match self.stream {
ConsoleStreamType::Stderr => io::stderr().is_terminal(),
ConsoleStreamType::Stdout => io::stdout().is_terminal(),
}
}
pub fn buffer(&self) -> ConsoleBuffer {
ConsoleBuffer::new(self.buffer.clone(), self.stream)
}
pub fn close(&self) -> Result<(), ConsoleError> {
trace!(
"Closing {} stream",
match self.stream {
ConsoleStreamType::Stderr => "stderr",
ConsoleStreamType::Stdout => "stdout",
}
);
self.closed.store(true, Ordering::Release);
self.flush()?;
if let Some(channel) = &self.channel {
let _ = channel.send(true);
}
Ok(())
}
pub fn flush(&self) -> Result<(), ConsoleError> {
flush(&mut self.buffer.lock(), self.stream).map_err(|error| ConsoleError::FlushFailed {
error: Box::new(error),
})?;
Ok(())
}
pub fn write_raw<F: FnMut(&mut Vec<u8>) -> io::Result<()>>(
&self,
mut op: F,
) -> Result<(), ConsoleError> {
let handle_error = |error: io::Error| ConsoleError::WriteFailed {
error: Box::new(error),
};
if self.test_mode || self.is_closed() {
let mut buffer = Vec::new();
op(&mut buffer).map_err(handle_error)?;
flush(&mut buffer, self.stream).map_err(handle_error)?;
}
else {
let mut buffer = self.buffer.lock();
op(&mut buffer).map_err(handle_error)?;
if buffer.len() >= 1024 {
flush(&mut buffer, self.stream).map_err(handle_error)?;
}
}
Ok(())
}
pub fn write<T: AsRef<[u8]>>(&self, data: T) -> Result<(), ConsoleError> {
let data = data.as_ref();
if data.is_empty() {
return Ok(());
}
self.write_raw(|buffer| {
buffer.extend_from_slice(data);
Ok(())
})
}
pub fn write_line<T: AsRef<[u8]>>(&self, data: T) -> Result<(), ConsoleError> {
let data = data.as_ref();
self.write_raw(|buffer| {
if !data.is_empty() {
buffer.extend_from_slice(data);
}
buffer.push(b'\n');
Ok(())
})
}
pub fn write_line_with_prefix<T: AsRef<str>>(
&self,
data: T,
prefix: &str,
) -> Result<(), ConsoleError> {
let data = data.as_ref();
let lines = data
.lines()
.map(|line| format!("{prefix}{line}"))
.collect::<Vec<_>>()
.join("\n");
self.write_line(lines)
}
pub fn write_newline(&self) -> Result<(), ConsoleError> {
self.write_line("")
}
}
impl Clone for ConsoleStream {
fn clone(&self) -> Self {
Self {
buffer: Arc::clone(&self.buffer),
closed: Arc::clone(&self.closed),
stream: self.stream,
quiet: self.quiet.clone(),
test_mode: self.test_mode,
channel: None,
handle: None,
}
}
}
impl fmt::Debug for ConsoleStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConsoleStream")
.field("buffer", &self.buffer)
.field("closed", &self.closed)
.field("stream", &self.stream)
.field("quiet", &self.quiet)
.field("test_mode", &self.test_mode)
.finish()
}
}