use crate::error::{ErrorWithContext, Result, ResultExt};
use crate::output::timestamp::Timestamp;
use crate::output::{self, Printer};
use crate::token::{SerialTokenizer, Token};
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use std::thread::{self, ScopedJoinHandle};
pub struct MainLoop<'a> {
options: output::Options,
prefix_length: usize,
timestamp: Arc<Mutex<Timestamp>>,
loops: Vec<StreamLoop<'a>>,
}
impl<'a> MainLoop<'a> {
pub fn new(options: output::Options) -> Self {
Self {
options,
prefix_length: 0,
timestamp: Arc::new(Mutex::new(Timestamp::new())),
loops: vec![],
}
}
pub fn add_stream(
&mut self,
input: &'a mut (dyn Read + Send),
output: &'a mut (dyn Write + Send),
prefix: &str,
) {
self.prefix_length = std::cmp::max(self.prefix_length, prefix.len());
let mut options = self.options.clone();
options.prefix = prefix.to_string();
self.loops.push(StreamLoop::new(
input,
output,
self.timestamp.clone(),
options,
));
}
pub fn run(self) -> Result<()> {
thread::scope(|s| {
let threads = self
.loops
.into_iter()
.map(|mut l| s.spawn(move || l.loop_stream()))
.collect::<Vec<ScopedJoinHandle<Result<()>>>>();
for t in threads {
t.join()
.expect("Thread reading tokens unexpectedly panicked")?;
}
let timestamp_prefix = output::timestamp::create_prefix(
&self.timestamp,
self.options.show_delta,
self.options.microseconds,
);
println!(
"{}{}{}: \u{23f1} End",
timestamp_prefix,
if self.prefix_length > 0 { " " } else { "" },
"-".repeat(self.prefix_length)
);
Ok(())
})
}
}
struct StreamLoop<'a> {
tokenizer: SerialTokenizer<'a>,
printer: Printer<'a>,
}
impl<'a> StreamLoop<'a> {
fn new(
input_stream: &'a mut (dyn Read + Send),
output_stream: &'a mut (dyn Write + Send),
timestamp: Arc<Mutex<Timestamp>>,
output_options: output::Options,
) -> Self {
Self {
tokenizer: SerialTokenizer::new(input_stream),
printer: Printer::new(output_stream, timestamp, output_options),
}
}
fn loop_stream(&mut self) -> Result<()> {
loop {
match self.tokenizer.next() {
Ok(token) => {
self.printer
.print(&token)
.error_context("Error writing to stdout")?;
if token == Token::EndOfFile {
break;
}
}
Err(error) => {
return Err(ErrorWithContext::wrap("Error reading input", error));
}
}
}
Ok(())
}
}