athena-observability 3.18.0

Portable Athena observability primitives, metrics state, and logging sinks
Documentation
//! Optional Linux file sink for gateway request/operation logs and `api_key_auth_log` (NDJSON).

use std::fs::OpenOptions;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use serde::Serialize;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;

/// File name, relative to the configured log directory, that gateway request lines are appended to.
pub const GATEWAY_REQUEST_LOG_FILENAME: &str = "gateway_request.log";
/// File name, relative to the configured log directory, that gateway operation lines are appended to.
pub const GATEWAY_OPERATION_LOG_FILENAME: &str = "gateway_operation.log";
/// File name, relative to the configured log directory, that api-key auth lines are appended to.
pub const API_KEY_AUTH_LOG_FILENAME: &str = "api_key_auth.log";

/// Capacity (in lines) of the bounded channel between producers and the file worker.
const CHANNEL_CAP: usize = 50_000;
/// Period of the worker's flush timer; buffered lines are written out on each tick.
const BATCH_FLUSH_INTERVAL: Duration = Duration::from_millis(100);
/// Number of buffered lines that triggers a flush without waiting for the timer.
const BATCH_MAX_LINES: usize = 256;

/// Selects which of the three log files a buffered line is appended to.
#[derive(Clone, Copy, Debug)]
enum LineKind {
    /// Line destined for the gateway request log file.
    Request,
    /// Line destined for the gateway operation log file.
    Operation,
    /// Line destined for the api-key auth log file.
    ApiKey,
}

/// One queued log line travelling from a producer to the file worker.
#[derive(Clone, Debug)]
struct BufferedLine {
    /// Destination log file for this line.
    kind: LineKind,
    /// Raw bytes appended to the file as-is; the enqueue methods in this file
    /// store serialized JSON followed by a single `\n`.
    payload: Arc<[u8]>,
}

/// Cloneable handle used to queue log lines for the background file worker.
#[derive(Clone)]
pub struct LinuxGatewayFileLog {
    /// Sending half of the bounded channel that the worker task drains.
    tx: mpsc::Sender<BufferedLine>,
}

impl LinuxGatewayFileLog {
    /// Spawns the async file sink against `log_dir`.
    pub fn spawn(log_dir: PathBuf) -> Self {
        let (tx, rx) = mpsc::channel(CHANNEL_CAP);
        tokio::spawn(run_linux_file_log_worker(rx, log_dir));
        Self { tx }
    }

    /// Builds the sink only when Linux file logging is enabled and usable.
    pub fn try_init<P>(enabled: bool, log_dir: P) -> Option<Arc<Self>>
    where
        P: Into<PathBuf>,
    {
        if !cfg!(target_os = "linux") || !enabled {
            return None;
        }

        let dir: PathBuf = log_dir.into();
        match probe_log_dir(&dir) {
            Ok(()) => Some(Arc::new(Self::spawn(dir))),
            Err(err) => {
                tracing::warn!(
                    target: "athena_rs::linux_gateway_file_log",
                    path = %dir.display(),
                    error = %err,
                    "gateway Linux file logging disabled: directory not usable"
                );
                None
            }
        }
    }

    fn try_send_line(&self, kind: LineKind, payload: Arc<[u8]>) -> Result<(), BufferedLine> {
        self.tx
            .try_send(BufferedLine { kind, payload })
            .map_err(|err| match err {
                TrySendError::Full(line) | TrySendError::Closed(line) => line,
            })
    }

    /// Enqueues one gateway request row.
    pub fn try_enqueue_request<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
        match serde_json::to_vec(entry) {
            Ok(mut vec) => {
                vec.push(b'\n');
                match self.try_send_line(LineKind::Request, vec.into()) {
                    Ok(()) => Ok(()),
                    Err(line) => {
                        tracing::warn!(
                            target: "athena_rs::linux_gateway_file_log",
                            "gateway request log file channel full; dropping line"
                        );
                        drop(line);
                        Err(())
                    }
                }
            }
            Err(err) => {
                tracing::warn!(
                    target: "athena_rs::linux_gateway_file_log",
                    error = %err,
                    "failed to serialize gateway request log line"
                );
                Err(())
            }
        }
    }

    /// Enqueues one gateway operation row.
    pub fn try_enqueue_operation<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
        match serde_json::to_vec(entry) {
            Ok(mut vec) => {
                vec.push(b'\n');
                match self.try_send_line(LineKind::Operation, vec.into()) {
                    Ok(()) => Ok(()),
                    Err(line) => {
                        tracing::warn!(
                            target: "athena_rs::linux_gateway_file_log",
                            "gateway operation log file channel full; dropping line"
                        );
                        drop(line);
                        Err(())
                    }
                }
            }
            Err(err) => {
                tracing::warn!(
                    target: "athena_rs::linux_gateway_file_log",
                    error = %err,
                    "failed to serialize gateway operation log line"
                );
                Err(())
            }
        }
    }

    /// Enqueues one api-key auth row.
    pub fn try_enqueue_api_key_auth<T: Serialize>(&self, entry: &T) -> Result<(), ()> {
        match serde_json::to_vec(entry) {
            Ok(mut vec) => {
                vec.push(b'\n');
                match self.try_send_line(LineKind::ApiKey, vec.into()) {
                    Ok(()) => Ok(()),
                    Err(line) => {
                        tracing::warn!(
                            target: "athena_rs::linux_gateway_file_log",
                            "api_key_auth log file channel full; dropping line"
                        );
                        drop(line);
                        Err(())
                    }
                }
            }
            Err(err) => {
                tracing::warn!(
                    target: "athena_rs::linux_gateway_file_log",
                    error = %err,
                    "failed to serialize api_key_auth log line"
                );
                Err(())
            }
        }
    }
}

/// Checks that `dir` can be used for log files.
///
/// Creates `dir` (and any missing parents), then appends a single newline to a
/// `.athena_write_probe` file inside it to prove the directory is writable.
/// The probe file is removed afterwards on a best-effort basis.
///
/// # Errors
///
/// Returns the I/O error from creating the directory, opening the probe file,
/// or writing to it. A failure to delete the probe file is ignored.
fn probe_log_dir(dir: &Path) -> std::io::Result<()> {
    std::fs::create_dir_all(dir)?;

    let marker = dir.join(".athena_write_probe");
    // The handle lives only inside the closure, so it is closed before removal.
    OpenOptions::new()
        .append(true)
        .create(true)
        .open(&marker)
        .and_then(|mut handle| handle.write_all(b"\n"))?;

    let _ = std::fs::remove_file(marker);
    Ok(())
}

async fn run_linux_file_log_worker(mut rx: mpsc::Receiver<BufferedLine>, log_dir: PathBuf) {
    let request_path = log_dir.join(GATEWAY_REQUEST_LOG_FILENAME);
    let operation_path = log_dir.join(GATEWAY_OPERATION_LOG_FILENAME);
    let api_key_path = log_dir.join(API_KEY_AUTH_LOG_FILENAME);

    let mut tick = tokio::time::interval(BATCH_FLUSH_INTERVAL);
    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    let mut batch: Vec<BufferedLine> = Vec::with_capacity(BATCH_MAX_LINES);

    loop {
        tokio::select! {
            biased;
            msg = rx.recv() => {
                match msg {
                    Some(line) => {
                        batch.push(line);
                        if batch.len() >= BATCH_MAX_LINES {
                            flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
                        }
                    }
                    None => {
                        flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
                        return;
                    }
                }
            }
            _ = tick.tick() => {
                flush_batch_to_disk(&request_path, &operation_path, &api_key_path, &mut batch).await;
            }
        }
    }
}

async fn flush_batch_to_disk(
    request_path: &Path,
    operation_path: &Path,
    api_key_path: &Path,
    batch: &mut Vec<BufferedLine>,
) {
    if batch.is_empty() {
        return;
    }

    let drained: Vec<BufferedLine> = std::mem::take(batch);
    let req_path = request_path.to_path_buf();
    let op_path = operation_path.to_path_buf();
    let key_path = api_key_path.to_path_buf();

    let result = tokio::task::spawn_blocking(move || {
        let mut request_buf: Vec<u8> = Vec::new();
        let mut operation_buf: Vec<u8> = Vec::new();
        let mut api_key_buf: Vec<u8> = Vec::new();

        for line in drained {
            match line.kind {
                LineKind::Request => request_buf.extend_from_slice(&line.payload),
                LineKind::Operation => operation_buf.extend_from_slice(&line.payload),
                LineKind::ApiKey => api_key_buf.extend_from_slice(&line.payload),
            }
        }

        if !request_buf.is_empty() {
            append_file(&req_path, &request_buf)?;
        }
        if !operation_buf.is_empty() {
            append_file(&op_path, &operation_buf)?;
        }
        if !api_key_buf.is_empty() {
            append_file(&key_path, &api_key_buf)?;
        }
        Ok::<(), std::io::Error>(())
    })
    .await;

    match result {
        Ok(Ok(())) => {}
        Ok(Err(err)) => {
            tracing::warn!(
                target: "athena_rs::linux_gateway_file_log",
                error = %err,
                "failed to append gateway Linux file logs"
            );
        }
        Err(err) => {
            tracing::warn!(
                target: "athena_rs::linux_gateway_file_log",
                error = %err,
                "Linux file log flush task failed"
            );
        }
    }
}

/// Appends `bytes` to the file at `path`, creating the file if it is missing.
///
/// # Errors
///
/// Returns the I/O error from opening the file or from writing to it.
fn append_file(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    OpenOptions::new()
        .append(true)
        .create(true)
        .open(path)
        .and_then(|mut sink| sink.write_all(bytes))
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::time::Instant;

    /// Builds a scratch path under the system temp dir, made unique by the
    /// process id and the current time in nanoseconds. The path is not created.
    fn scratch_dir(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "athena_linux_flog_{}_{}_{}",
            name,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0)
        ))
    }

    /// The probe must fail when the log directory would have to be created
    /// inside a read-only parent.
    #[test]
    #[cfg(unix)]
    fn probe_rejects_non_writable_parent() {
        let tmp = scratch_dir("probe");
        let nested = tmp.join("nested");
        fs::create_dir_all(&nested).expect("mkdir");
        let mut perms = fs::metadata(&nested).expect("meta").permissions();
        perms.set_readonly(true);
        fs::set_permissions(&nested, perms).expect("chmod");

        // A privileged user can still write into a read-only directory, in
        // which case the scenario cannot be reproduced; detect that with a
        // plain filesystem call that does not go through the code under test.
        let permissions_enforced = fs::create_dir(nested.join("canary")).is_err();

        let outcome = probe_log_dir(&nested.join("impossible"));

        // Clean up before asserting so a failure does not leave files behind.
        let _ = fs::remove_dir_all(&tmp);

        if permissions_enforced {
            assert!(outcome.is_err());
        } else {
            eprintln!("skipping assertion: directory permissions are not enforced for this user");
        }
    }

    /// One enqueued row ends up as exactly one NDJSON line in the request log.
    #[tokio::test]
    async fn round_trip_ndjson_line() {
        let tmp = scratch_dir("rt");
        fs::create_dir_all(&tmp).expect("mkdir");
        probe_log_dir(&tmp).expect("probe");

        let sink = LinuxGatewayFileLog::spawn(tmp.clone());
        #[derive(Serialize)]
        struct Row {
            k: i32,
        }
        sink.try_enqueue_request(&Row { k: 1 }).expect("enqueue");

        // Poll with a generous deadline instead of a single fixed sleep so the
        // test is fast when the flush is prompt and tolerant when it is slow.
        let path = tmp.join(GATEWAY_REQUEST_LOG_FILENAME);
        let deadline = Instant::now() + Duration::from_secs(5);
        let text = loop {
            let text = fs::read_to_string(&path).unwrap_or_default();
            if !text.is_empty() || Instant::now() >= deadline {
                break text;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        };

        // Clean up before asserting so a failure does not leave files behind.
        let _ = fs::remove_dir_all(&tmp);

        assert_eq!(text, "{\"k\":1}\n");
    }
}