tailspin 6.1.0

A log file highlighter
Documentation
#![forbid(unsafe_code)]

mod cli;
mod config;
mod highlighter_builder;
mod io;
mod theme;

use cli::{FullConfig, get_config};
use io::initial_read::{InitialReadCompleteSender, initial_read_complete_channel};
use io::presenter::pager::Pager;
use io::presenter::{Present, Presenter};
use io::reader::{AsyncLineReader, Reader, StreamEvent};
use io::setup::initialize_io;
use io::writer::stdout::BrokenPipe;
use io::writer::{AsyncLineWriter, Writer};
use rayon::iter::ParallelIterator;
use rayon::prelude::IntoParallelRefIterator;
use tailspin::Highlighter;
use tokio::task::JoinHandle;

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
    let FullConfig {
        source,
        target,
        highlighter,
    } = get_config()?;
    let (io, _temp_dir) = initialize_io(source, target).await?;

    let (initial_read_tx, initial_read_rx) = initial_read_complete_channel();

    let process_stream_task = tokio::spawn(process_stream(io.reader, io.writer, highlighter, initial_read_tx));

    if initial_read_rx.receive().await.is_err() {
        return process_stream_task.await?;
    }

    match io.presenter {
        Presenter::StdOut(_) => BrokenPipe::suppress(process_stream_task.await?),
        Presenter::Pager(pager) => race_pager_against_stream(pager, process_stream_task).await,
    }
}

async fn race_pager_against_stream(
    pager: Pager,
    mut process_stream_task: JoinHandle<anyhow::Result<()>>,
) -> anyhow::Result<()> {
    let mut pager_task = tokio::spawn(async move { pager.present().await });

    tokio::select! {
        pager_result = &mut pager_task => {
            abort_and_drain(&mut process_stream_task).await;
            pager_result?
        }
        stream_result = &mut process_stream_task => match stream_result? {
            Ok(()) => pager_task.await?,
            Err(e) => {
                abort_and_drain(&mut pager_task).await;
                Err(e)
            }
        }
    }
}

async fn abort_and_drain<T>(handle: &mut JoinHandle<T>) {
    handle.abort();
    let _drain = handle.await;
}

async fn process_stream(
    mut reader: Reader,
    mut writer: Writer,
    highlighter: Highlighter,
    mut initial_read_complete: InitialReadCompleteSender,
) -> anyhow::Result<()> {
    loop {
        match reader.next().await? {
            StreamEvent::Started => initial_read_complete.send()?,
            StreamEvent::Ended => return Ok(()),
            StreamEvent::Line(line) => write_line(&mut writer, &highlighter, line.as_str()).await?,
            StreamEvent::Lines(lines) => write_lines(&mut writer, &highlighter, lines).await?,
        }
    }
}

async fn write_line(writer: &mut Writer, highlighter: &Highlighter, line: &str) -> anyhow::Result<()> {
    let highlighted = highlighter.apply(line);
    writer.write(&highlighted).await
}

async fn write_lines(writer: &mut Writer, highlighter: &Highlighter, lines: Vec<String>) -> anyhow::Result<()> {
    let highlighted = lines
        .par_iter()
        .map(|line| highlighter.apply(line.as_str()))
        .collect::<Vec<_>>()
        .join("\n");

    writer.write(&highlighted).await
}