logup 0.1.0

Logup is a UNIX-style command that can be used to pipe stdout logs to location on disk or in the cloud without the need of an agent, logrotate, systemd or other configuration files
Documentation
mod reader;
mod writer;
mod writer_aws;
mod writer_lines;
mod writer_multi;
mod writer_newrelic;
mod writer_queue;

use crate::reader::AsyncLogReader;
use crate::writer::AsyncLogWriter;
use crate::writer_aws::{AWSArgs, AWSLogsWriter};
use crate::writer_lines::LinesWriter;
use crate::writer_multi::MultiWriter;
use crate::writer_newrelic::{NewRelicArgs, NewRelicWriter};
use crate::writer_queue::QueueWriter;
use clap::Parser;
use std::path::PathBuf;
use std::time::SystemTime;
use tokio::fs::File;
use tokio::task::JoinHandle;

#[derive(Parser)]
#[command(
    version,
    about = "Find examples on https://github.com/lucabrunox/outlog"
)]
pub struct OutlogArgs {
    #[command(flatten)]
    aws: AWSArgs,

    #[command(flatten)]
    newrelic: NewRelicArgs,

    #[arg(
        long,
        default_value_t = 1000000,
        help = "Force flush without newline beyond the given size"
    )]
    max_line_size: usize,

    #[arg(
        long,
        requires = "aws",
        help = "Max logs to keep in memory before dropping the incoming ones",
        default_value = "1000"
    )]
    max_memory_items: usize,

    #[arg(
        long,
        requires = "aws",
        help = "Max retries before dropping a log",
        default_value = "100"
    )]
    max_retries: u32,

    #[arg(help = "Read logs from a file instead of stdin")]
    input_file: Option<PathBuf>,
}

pub async fn run(args: OutlogArgs) {
    let mut handles: Vec<JoinHandle<()>> = vec![];

    {
        let mut reader = match args.input_file {
            Some(path) => Box::new(File::open(path).await.unwrap()),
            None => Box::new(tokio::io::stdin()) as Box<dyn AsyncLogReader + Send>,
        };

        let mut writers: Vec<Box<dyn AsyncLogWriter + Send>> = vec![];
        if let Some((writer, handle)) = AWSLogsWriter::new(&args.aws, args.max_retries)
            .await
            .map(|w| QueueWriter::new(w, args.max_memory_items))
        {
            writers.push(Box::new(writer));
            handles.push(handle);
        }

        if let Some((writer, handle)) =
            NewRelicWriter::new(&args.newrelic).map(|w| QueueWriter::new(w, args.max_memory_items))
        {
            writers.push(Box::new(writer));
            handles.push(handle);
        }

        let mut writer = MultiWriter::new(vec![
            Box::new(tokio::io::stdout()) as Box<dyn AsyncLogWriter + Send>,
            Box::new(LinesWriter::new(
                MultiWriter::new(writers),
                args.max_line_size,
            )),
        ]);
        read_and_write_loop(&mut reader, &mut writer).await;
    }

    // ensure everything went out of scope at this point, so that tasks can exit
    for h in handles {
        // TODO: use a timeout
        let _ = h.await;
    }
}

pub async fn read_and_write_loop(
    reader: &mut impl AsyncLogReader,
    writer: &mut impl AsyncLogWriter,
) {
    let mut buf: [u8; 1024] = [0; 1024];
    let mut time: SystemTime = SystemTime::now();

    loop {
        let size = reader.read_logs(&mut buf, &mut time).await.unwrap();
        if size == 0 {
            break;
        }
        writer.write_logs(time, &buf[..size]).await.unwrap();
    }
}

fn report_err<E>(err: E)
where
    E: std::error::Error,
    E: Send + Sync,
{
    eprintln!("[ERROR] {}", err);
    if let Some(cause) = err.source() {
        eprintln!("Caused by:");
        for (i, e) in std::iter::successors(Some(cause), |e| e.source()).enumerate() {
            eprintln!("   {}: {}", i, e);
        }
    }
}