holger-server-lib 0.6.7

Holger server library: config, wiring, gRPC service, Rust API
//! Default audit backend: append-only Arrow IPC stream segments.
//!
//! Each [`ArrowIpcAuditLog`] instance opens **one fresh segment file** under the
//! configured directory and only ever *appends* record batches to it — the file
//! is never rewritten or truncated, so the log is append-only by construction.
//! A process restart opens a new segment; nothing existing is touched. Reading
//! back ([`ArrowIpcAuditLog::read_dir`]) replays every segment in the directory.
//!
//! One event = one single-row batch, flushed to the OS on each `record`, so a
//! crash loses at most the in-flight call. Audit volume is low relative to
//! artifact serving, so the per-row batch overhead is irrelevant here.

use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use arrow_array::cast::AsArray;
use arrow_array::types::TimestampNanosecondType;
use arrow_array::{
    ArrayRef, RecordBatch, StringArray, TimestampNanosecondArray, UInt16Array, UInt64Array,
};
use arrow_ipc::reader::StreamReader;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};

use super::{AuditAction, AuditError, AuditEvent, AuditLog};

/// Disambiguates segment file names when several logs are created in the same
/// process within the same nanosecond (e.g. in tests).
///
/// Process-wide counter whose current value is embedded as the final `<seq>`
/// component of each segment name. Only uniqueness of the fetched values
/// matters (no other memory is synchronized through it), so `Relaxed`
/// ordering is sufficient.
static SEGMENT_SEQ: AtomicU64 = AtomicU64::new(0);

/// Arrow schema of an audit record: one non-nullable column per
/// [`AuditEvent`] field.
///
/// The column order here is canonical. The writer builds every batch in this
/// order, and [`ArrowIpcAuditLog::read_dir`] decodes the columns by position.
pub fn audit_schema() -> SchemaRef {
    // All columns except the timestamp and the two integers are required
    // UTF-8 strings; build those through one helper to keep the list readable.
    let text = |name: &str| Field::new(name, DataType::Utf8, false);

    let fields = vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
        text("ident"),
        text("action"),
        text("repo"),
        text("artifact"),
        text("source_ip"),
        Field::new("status", DataType::UInt16, false),
        Field::new("bytes", DataType::UInt64, false),
        text("detail"),
    ];
    Arc::new(Schema::new(fields))
}

/// Append-only Arrow IPC audit log. Cloneable handles are not provided directly;
/// share via `Arc<dyn AuditLog>` (see [`super::default_audit_log`]).
///
/// Each instance owns exactly one segment file. The file is chosen when the
/// instance is constructed, and every recorded event is appended to it for
/// the rest of the instance's life.
pub struct ArrowIpcAuditLog {
    /// Schema every appended batch is built against (see [`audit_schema`]).
    schema: SchemaRef,
    /// The active segment writer. `Mutex` because the server shares one log
    /// across all async request handlers and `record` takes `&self`.
    writer: Mutex<StreamWriter<BufWriter<File>>>,
    /// Path of the segment file that `writer` appends to.
    segment: PathBuf,
}

impl ArrowIpcAuditLog {
    /// Open a new append-only segment under `dir` (created if missing).
    pub fn new(dir: impl AsRef<Path>) -> Result<Self, AuditError> {
        let dir = dir.as_ref();
        fs::create_dir_all(dir)?;

        let schema = audit_schema();
        let seq = SEGMENT_SEQ.fetch_add(1, Ordering::Relaxed);
        let segment = dir.join(format!(
            "audit-{}-{}-{}.arrows",
            super::now_nanos(),
            std::process::id(),
            seq
        ));

        // `create_new` would reject an existing name; the nanos+pid+seq key makes
        // a collision effectively impossible, and a plain create keeps the open
        // robust if the clock is coarse.
        let file = File::create(&segment)?;
        let writer = StreamWriter::try_new(BufWriter::new(file), &schema)?;

        Ok(Self {
            schema,
            writer: Mutex::new(writer),
            segment,
        })
    }

    /// Path of the segment this instance is appending to.
    pub fn segment_path(&self) -> &Path {
        &self.segment
    }

    /// Encode a single event as a one-row [`RecordBatch`].
    fn batch(&self, event: &AuditEvent) -> Result<RecordBatch, AuditError> {
        let cols: Vec<ArrayRef> = vec![
            Arc::new(TimestampNanosecondArray::from(vec![event.ts_nanos])),
            Arc::new(StringArray::from(vec![event.ident.as_str()])),
            Arc::new(StringArray::from(vec![event.action.as_str()])),
            Arc::new(StringArray::from(vec![event.repo.as_str()])),
            Arc::new(StringArray::from(vec![event.artifact.as_str()])),
            Arc::new(StringArray::from(vec![event.source_ip.as_str()])),
            Arc::new(UInt16Array::from(vec![event.status])),
            Arc::new(UInt64Array::from(vec![event.bytes])),
            Arc::new(StringArray::from(vec![event.detail.as_str()])),
        ];
        Ok(RecordBatch::try_new(self.schema.clone(), cols)?)
    }

    /// Replay every `audit-*.arrows` segment in `dir`, oldest-named first,
    /// returning all events in file then in-file order. Missing dir = empty.
    pub fn read_dir(dir: impl AsRef<Path>) -> Result<Vec<AuditEvent>, AuditError> {
        let dir = dir.as_ref();
        if !dir.exists() {
            return Ok(Vec::new());
        }

        let mut segments: Vec<PathBuf> = fs::read_dir(dir)?
            .filter_map(|e| e.ok().map(|e| e.path()))
            .filter(|p| {
                p.file_name()
                    .and_then(|n| n.to_str())
                    .map(|n| n.starts_with("audit-") && n.ends_with(".arrows"))
                    .unwrap_or(false)
            })
            .collect();
        // The nanos-prefixed names sort into creation order.
        segments.sort();

        let mut events = Vec::new();
        for seg in segments {
            let file = File::open(&seg)?;
            let reader = StreamReader::try_new(file, None)?;
            for batch in reader {
                let batch = batch?;
                decode_batch(&batch, &mut events)?;
            }
        }
        Ok(events)
    }
}

impl AuditLog for ArrowIpcAuditLog {
    /// Append `event` to the active segment as a one-row batch and push it
    /// through to the OS before returning.
    ///
    /// The batch is encoded before the writer lock is taken, so the critical
    /// section covers only the IPC write and the buffer flush.
    ///
    /// # Errors
    ///
    /// Fails if the batch cannot be built, the writer mutex is poisoned, or
    /// the IPC write or buffer flush fails.
    fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
        let row = self.batch(&event)?;
        let mut guard = match self.writer.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(AuditError::Other("audit writer mutex poisoned".into())),
        };
        guard.write(&row)?;
        // Drain the BufWriter into the File so the event has reached the OS
        // by the time `record` returns.
        guard.get_mut().flush()?;
        Ok(())
    }

    /// Push any bytes still buffered for the active segment through to the OS.
    ///
    /// # Errors
    ///
    /// Fails if the writer mutex is poisoned or the flush fails.
    fn flush(&self) -> Result<(), AuditError> {
        let mut guard = match self.writer.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(AuditError::Other("audit writer mutex poisoned".into())),
        };
        guard.get_mut().flush()?;
        Ok(())
    }
}

impl Drop for ArrowIpcAuditLog {
    /// Best-effort termination of the segment.
    ///
    /// Appends the IPC end-of-stream marker so the file is a cleanly finished
    /// stream. `drop` cannot report failures, so an error from `finish` is
    /// deliberately discarded. A poisoned writer is left untouched.
    fn drop(&mut self) {
        // `&mut self` already guarantees exclusive access, so the writer can
        // be reached through `get_mut` without taking the lock.
        if let Ok(stream) = self.writer.get_mut() {
            let _ = stream.finish();
        }
    }
}

/// Decode every row of `batch` into `out`, validating the column layout.
///
/// Columns are read by position in [`audit_schema`] order; extra trailing
/// columns are ignored. A batch with too few columns, or with a column of an
/// unexpected Arrow type, produces an error instead of a panic. This way a
/// single foreign or corrupted `audit-*.arrows` file cannot crash the reader.
///
/// # Errors
///
/// Returns [`AuditError::Other`] on a column-count or column-type mismatch.
/// `out` is left unchanged in that case.
fn decode_batch(batch: &RecordBatch, out: &mut Vec<AuditEvent>) -> Result<(), AuditError> {
    /// Number of columns in [`audit_schema`].
    const AUDIT_COLUMNS: usize = 9;

    // `RecordBatch::column` panics on an out-of-range index, so check the
    // width before indexing positionally below.
    if batch.num_columns() < AUDIT_COLUMNS {
        return Err(AuditError::Other("audit segment column count mismatch".into()));
    }
    let schema_err = || AuditError::Other("audit segment column type mismatch".into());

    let ts = batch
        .column(0)
        .as_primitive_opt::<TimestampNanosecondType>()
        .ok_or_else(schema_err)?;
    let ident = batch.column(1).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let action = batch.column(2).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let repo = batch.column(3).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let artifact = batch.column(4).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let source_ip = batch.column(5).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let status = batch
        .column(6)
        .as_any()
        .downcast_ref::<UInt16Array>()
        .ok_or_else(schema_err)?;
    let bytes = batch
        .column(7)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(schema_err)?;
    let detail = batch.column(8).as_string_opt::<i32>().ok_or_else(schema_err)?;

    // All columns validated; nothing below can fail, so `out` is only touched
    // on success.
    out.reserve(batch.num_rows());
    out.extend((0..batch.num_rows()).map(|i| AuditEvent {
        ts_nanos: ts.value(i),
        ident: ident.value(i).to_string(),
        action: AuditAction::from_wire(action.value(i)),
        repo: repo.value(i).to_string(),
        artifact: artifact.value(i).to_string(),
        source_ip: source_ip.value(i).to_string(),
        status: status.value(i),
        bytes: bytes.value(i),
        detail: detail.value(i).to_string(),
    }));
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Events written through one log instance come back identical, in order.
    #[test]
    fn roundtrips_events_through_a_segment() {
        let dir = tempfile::tempdir().unwrap();
        let want = vec![
            AuditEvent::new("alice", AuditAction::Upload, "crates", "serde@1.0.200", "10.0.0.1:4444", 200, 1234),
            AuditEvent::new("anonymous", AuditAction::Download, "crates", "serde@1.0.200", "10.0.0.2:5555", 200, 1234)
                .with_detail("application/octet-stream"),
            AuditEvent::new("bob", AuditAction::Upload, "crates", "evil@9", "10.0.0.3:6666", 403, 0)
                .with_detail("read-only"),
        ];

        {
            let log = ArrowIpcAuditLog::new(dir.path()).unwrap();
            for e in &want {
                log.record(e.clone()).unwrap();
            }
            // dropped here → stream finished
        }

        let got = ArrowIpcAuditLog::read_dir(dir.path()).unwrap();
        assert_eq!(got, want, "events must roundtrip byte-for-byte");
    }

    /// Reopening never touches an earlier segment; both runs are replayed.
    #[test]
    fn is_append_only_across_reopen() {
        let dir = tempfile::tempdir().unwrap();

        {
            let log = ArrowIpcAuditLog::new(dir.path()).unwrap();
            log.record(AuditEvent::new("alice", AuditAction::Upload, "r", "a@1", "ip:1", 200, 1)).unwrap();
        }
        // A second instance opens a NEW segment and must not disturb the first.
        {
            let log = ArrowIpcAuditLog::new(dir.path()).unwrap();
            log.record(AuditEvent::new("bob", AuditAction::Download, "r", "a@1", "ip:2", 200, 1)).unwrap();
        }

        let segments = fs::read_dir(dir.path()).unwrap().count();
        assert_eq!(segments, 2, "each run writes its own append-only segment");

        let got = ArrowIpcAuditLog::read_dir(dir.path()).unwrap();
        assert_eq!(got.len(), 2);
        let idents: Vec<_> = got.iter().map(|e| e.ident.as_str()).collect();
        assert!(idents.contains(&"alice") && idents.contains(&"bob"));
    }

    /// A directory that was never created reads as an empty history.
    #[test]
    fn read_dir_on_missing_dir_is_empty() {
        // A child of a fresh tempdir is guaranteed absent on every platform,
        // unlike a hard-coded absolute path.
        let root = tempfile::tempdir().unwrap();
        let missing = root.path().join("never-created");
        let got = ArrowIpcAuditLog::read_dir(&missing).unwrap();
        assert!(got.is_empty());
    }

    /// Files that are not `audit-*.arrows` segments are skipped, not parsed.
    #[test]
    fn read_dir_ignores_non_segment_files() {
        let dir = tempfile::tempdir().unwrap();
        fs::write(dir.path().join("README.txt"), b"not a segment").unwrap();
        fs::write(dir.path().join("audit-notes.txt"), b"wrong suffix").unwrap();
        fs::write(dir.path().join("other.arrows"), b"wrong prefix").unwrap();

        {
            let log = ArrowIpcAuditLog::new(dir.path()).unwrap();
            log.record(AuditEvent::new("alice", AuditAction::Upload, "r", "a@1", "ip:1", 200, 1)).unwrap();
        }

        let got = ArrowIpcAuditLog::read_dir(dir.path()).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].ident, "alice");
    }
}