vantage-log-writer 0.5.0

Vantage extension for append-only log files (JSONL)
Documentation
use std::collections::HashMap;
use std::path::PathBuf;

use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::warn;

pub(crate) enum WriteOp {
    Append { path: PathBuf, line: String },
}

pub(crate) const WRITE_QUEUE_CAPACITY: usize = 256;

pub(crate) fn spawn(mut rx: mpsc::Receiver<WriteOp>) -> JoinHandle<()> {
    tokio::spawn(async move {
        let mut handles: HashMap<PathBuf, BufWriter<File>> = HashMap::new();
        while let Some(op) = rx.recv().await {
            match op {
                WriteOp::Append { path, line } => {
                    let writer = match get_or_open(&mut handles, &path).await {
                        Ok(w) => w,
                        Err(e) => {
                            warn!(target: "vantage_log_writer", path = %path.display(), error = %e, "open failed");
                            continue;
                        }
                    };
                    if let Err(e) = writer.write_all(line.as_bytes()).await {
                        warn!(target: "vantage_log_writer", path = %path.display(), error = %e, "write failed");
                        handles.remove(&path);
                        continue;
                    }
                    if let Err(e) = writer.flush().await {
                        warn!(target: "vantage_log_writer", path = %path.display(), error = %e, "flush failed");
                        handles.remove(&path);
                    }
                }
            }
        }
        for (path, mut writer) in handles {
            if let Err(e) = writer.flush().await {
                warn!(target: "vantage_log_writer", path = %path.display(), error = %e, "final flush failed");
            }
        }
    })
}

async fn get_or_open<'a>(
    handles: &'a mut HashMap<PathBuf, BufWriter<File>>,
    path: &PathBuf,
) -> std::io::Result<&'a mut BufWriter<File>> {
    if !handles.contains_key(path) {
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await.ok();
        }
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .await?;
        handles.insert(path.clone(), BufWriter::new(file));
    }
    Ok(handles.get_mut(path).unwrap())
}