holger-server-lib 0.6.7

Holger server library: config, wiring, gRPC service, Rust API
//! Append-only audit log.
//!
//! Records **who** did **what** to **which artifact** from **where**. The
//! default backend ([`ArrowIpcAuditLog`]) persists events as append-only Arrow
//! IPC stream segments on disk — columnar, never rewritten, and directly
//! loadable by any Arrow/Parquet tool for offline review.
//!
//! The [`AuditLog`] trait is the swap point: the backend can be replaced with a
//! Parquet writer, a database sink, an in-memory buffer ([`MemoryAuditLog`],
//! used in tests), or disabled entirely ([`NoopAuditLog`]) without touching the
//! request path. The server holds an `Arc<dyn AuditLog>` and the default wiring
//! installs `ArrowIpcAuditLog`.

mod arrow_ipc;
pub use arrow_ipc::ArrowIpcAuditLog;

use std::fmt;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};

/// The kind of operation an audit record describes.
///
/// Each variant has a stable lowercase wire name (see [`AuditAction::as_str`])
/// which is what ends up in the log column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditAction {
    /// An artifact was downloaded / fetched.
    Download,
    /// An artifact was uploaded / stored.
    Upload,
    /// An artifact was deleted.
    Delete,
    /// A repository or artifact listing was requested.
    List,
    /// A DEV → PROD promotion.
    Promote,
    /// Catch-all for anything not covered by the variants above.
    Other,
}

impl AuditAction {
    /// Stable lowercase wire string for this action (the value written to the
    /// log column).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Download => "download",
            Self::Upload => "upload",
            Self::Delete => "delete",
            Self::List => "list",
            Self::Promote => "promote",
            Self::Other => "other",
        }
    }

    /// Inverse of [`AuditAction::as_str`].
    ///
    /// Any string that is not a known wire name maps to [`AuditAction::Other`],
    /// so a reader never fails on a value written by a newer server.
    pub fn from_wire(s: &str) -> Self {
        // `Other` is deliberately absent from the table: it is the fallback, so
        // both "other" and every unrecognised string end up there.
        const NAMED: [AuditAction; 5] = [
            AuditAction::Download,
            AuditAction::Upload,
            AuditAction::Delete,
            AuditAction::List,
            AuditAction::Promote,
        ];

        NAMED
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .unwrap_or(Self::Other)
    }
}

/// One immutable audit record.
///
/// The fields are the security-relevant 5 W's: time, identity, action, the
/// artifact + repo it touched, and the client source address — plus the result
/// status, the transferred byte count, and an optional free-form detail string.
///
/// All fields are public. [`AuditEvent::new`] stamps `ts_nanos` from the
/// current wall clock and leaves `detail` empty; [`AuditEvent::with_detail`]
/// fills the latter in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuditEvent {
    /// Event time, nanoseconds since the Unix epoch (UTC).
    pub ts_nanos: i64,
    /// Authenticated principal — the OIDC `sub` claim or the mTLS client-cert
    /// CN. `"anonymous"` for unauthenticated (open) reads.
    pub ident: String,
    /// The operation performed.
    pub action: AuditAction,
    /// Repository name the request targeted.
    pub repo: String,
    /// Artifact identifier (`namespace/name@version` or an archive path);
    /// empty when the request had no specific artifact (e.g. a bare listing).
    pub artifact: String,
    /// Client source address (`ip:port`) as seen by the server; empty if the
    /// transport did not expose a peer address.
    pub source_ip: String,
    /// Result status — an HTTP-like code (200 ok, 403 denied, 404 missing,
    /// 500 error).
    pub status: u16,
    /// Bytes transferred (0 when not applicable).
    pub bytes: u64,
    /// Free-form detail — an error message, content type, etc. Empty when no
    /// detail was attached.
    pub detail: String,
}

impl AuditEvent {
    /// Build an event stamped with the current wall-clock time and no detail.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ident: impl Into<String>,
        action: AuditAction,
        repo: impl Into<String>,
        artifact: impl Into<String>,
        source_ip: impl Into<String>,
        status: u16,
        bytes: u64,
    ) -> Self {
        Self {
            ts_nanos: now_nanos(),
            ident: ident.into(),
            action,
            repo: repo.into(),
            artifact: artifact.into(),
            source_ip: source_ip.into(),
            status,
            bytes,
            detail: String::new(),
        }
    }

    /// Attach a detail string (builder style).
    pub fn with_detail(mut self, detail: impl Into<String>) -> Self {
        self.detail = detail.into();
        self
    }
}

/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch (which cannot
/// happen on a sane host). If the nanosecond count no longer fits in an `i64`
/// (from the year 2262 onwards) the result saturates at `i64::MAX` rather than
/// wrapping around to a meaningless, possibly negative, value.
pub fn now_nanos() -> i64 {
    // `Duration::as_nanos` yields a `u128`; clamp it before narrowing so the
    // cast below can never truncate.
    const MAX_NANOS: u128 = i64::MAX as u128;

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos().min(MAX_NANOS) as i64)
        .unwrap_or(0)
}

/// Pluggable audit sink. Implementations MUST be append-only and thread-safe;
/// the server shares a single `Arc<dyn AuditLog>` across all request handlers,
/// so `record` takes `&self` and is expected to be internally synchronised.
///
/// The `Send + Sync` supertraits are what allow that shared trait object to be
/// used from several threads.
pub trait AuditLog: Send + Sync {
    /// Append one event. Should be durable by the time it returns (the default
    /// backend flushes per call).
    ///
    /// The event is passed by value, so the sink takes ownership of it.
    ///
    /// # Errors
    ///
    /// Returns an [`AuditError`] when the implementation fails to store the
    /// event.
    fn record(&self, event: AuditEvent) -> Result<(), AuditError>;

    /// Flush any buffered state to durable storage. Default: no-op.
    ///
    /// The provided implementation always returns `Ok(())`; a backend that
    /// buffers can override it.
    fn flush(&self) -> Result<(), AuditError> {
        Ok(())
    }
}

/// Build the default audit backend — an [`ArrowIpcAuditLog`] writing
/// append-only Arrow IPC segments under `dir` — and hand it back as a shareable
/// `Arc<dyn AuditLog>` trait object.
///
/// # Errors
///
/// Propagates whatever error [`ArrowIpcAuditLog::new`] reports for `dir`.
pub fn default_audit_log(dir: impl AsRef<Path>) -> Result<Arc<dyn AuditLog>, AuditError> {
    let backend = ArrowIpcAuditLog::new(dir)?;
    let shared: Arc<dyn AuditLog> = Arc::new(backend);
    Ok(shared)
}

/// A disabled audit log — records nothing. The default the server falls back to
/// when no audit directory is configured.
///
/// It is a field-less unit struct, so it can be created with a bare
/// `NoopAuditLog` expression.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopAuditLog;

impl AuditLog for NoopAuditLog {
    /// Discards the event and reports success; nothing is stored anywhere.
    /// `flush` is not overridden, so the trait's default no-op is used.
    fn record(&self, _event: AuditEvent) -> Result<(), AuditError> {
        Ok(())
    }
}

/// In-memory audit log — keeps every event in a `Vec`. For tests and for
/// callers that want to inspect/route events themselves.
///
/// Nothing is ever removed from the buffer by this type, so it grows with
/// every recorded event.
#[derive(Debug, Default)]
pub struct MemoryAuditLog {
    /// Recorded events in insertion order. Wrapped in a `Mutex` so that
    /// `record`, which only has `&self`, can still append.
    events: Mutex<Vec<AuditEvent>>,
}

impl MemoryAuditLog {
    /// Create a log with no events in it (same as `MemoryAuditLog::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Return a cloned snapshot of all events recorded so far, oldest first.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn events(&self) -> Vec<AuditEvent> {
        let guard = self.events.lock().expect("audit mutex poisoned");
        // Clone while the lock is held; the guard is released on return.
        guard.clone()
    }
}

impl AuditLog for MemoryAuditLog {
    fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
        self.events.lock().expect("audit mutex poisoned").push(event);
        Ok(())
    }
}

/// Errors from an audit backend.
///
/// `From` conversions exist for [`std::io::Error`] and
/// `arrow_schema::ArrowError`, so both can be propagated with `?` from code
/// returning `Result<_, AuditError>`. The `Display` output prefixes the
/// underlying message with the error category.
#[derive(Debug)]
pub enum AuditError {
    /// Filesystem / IO failure.
    Io(std::io::Error),
    /// Arrow encoding/decoding failure.
    Arrow(arrow_schema::ArrowError),
    /// Anything else (corrupt segment, lock poisoned, …). Carries a
    /// human-readable description.
    Other(String),
}

impl fmt::Display for AuditError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AuditError::Io(e) => write!(f, "audit io error: {e}"),
            AuditError::Arrow(e) => write!(f, "audit arrow error: {e}"),
            AuditError::Other(s) => write!(f, "audit error: {s}"),
        }
    }
}

impl std::error::Error for AuditError {}

impl From<std::io::Error> for AuditError {
    fn from(e: std::io::Error) -> Self {
        AuditError::Io(e)
    }
}

impl From<arrow_schema::ArrowError> for AuditError {
    /// Wrap an Arrow error as [`AuditError::Arrow`], enabling `?` on Arrow
    /// results.
    fn from(arrow_err: arrow_schema::ArrowError) -> Self {
        Self::Arrow(arrow_err)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant survives an `as_str` → `from_wire` round trip, and an
    /// unrecognised wire string falls back to `Other`.
    #[test]
    fn action_wire_roundtrips() {
        let every_action = [
            AuditAction::Download,
            AuditAction::Upload,
            AuditAction::Delete,
            AuditAction::List,
            AuditAction::Promote,
            AuditAction::Other,
        ];
        for action in every_action {
            let wire = action.as_str();
            assert_eq!(AuditAction::from_wire(wire), action);
        }

        // Unknown wire string degrades to Other, never panics.
        assert_eq!(AuditAction::from_wire("teleport"), AuditAction::Other);
    }

    /// The in-memory backend keeps events in the order they were recorded.
    #[test]
    fn memory_log_collects_in_order() {
        let log = MemoryAuditLog::new();

        let upload = AuditEvent::new(
            "alice",
            AuditAction::Upload,
            "crates",
            "serde@1",
            "10.0.0.1:5",
            200,
            42,
        );
        log.record(upload).unwrap();

        let download = AuditEvent::new(
            "anonymous",
            AuditAction::Download,
            "crates",
            "serde@1",
            "10.0.0.2:6",
            200,
            42,
        );
        log.record(download).unwrap();

        let recorded = log.events();
        assert_eq!(recorded.len(), 2);
        assert_eq!(recorded[0].ident, "alice");
        assert_eq!(recorded[0].action, AuditAction::Upload);
        assert_eq!(recorded[1].ident, "anonymous");
    }

    /// The no-op backend accepts events and flushes without error.
    #[test]
    fn noop_log_is_inert() {
        let log = NoopAuditLog;
        let listing = AuditEvent::new("x", AuditAction::List, "r", "", "", 200, 0);
        log.record(listing).unwrap();
        log.flush().unwrap();
    }
}