use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use arrow_array::cast::AsArray;
use arrow_array::types::TimestampNanosecondType;
use arrow_array::{
ArrayRef, RecordBatch, StringArray, TimestampNanosecondArray, UInt16Array, UInt64Array,
};
use arrow_ipc::reader::StreamReader;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use super::{AuditAction, AuditError, AuditEvent, AuditLog};
/// Process-wide counter read by `ArrowIpcAuditLog::new`: each call takes the next value
/// and embeds it in the segment file name, alongside a timestamp and the process id.
static SEGMENT_SEQ: AtomicU64 = AtomicU64::new(0);
/// Returns the Arrow schema shared by every audit segment.
///
/// The schema has nine non-nullable columns, in this order: `ts` (nanosecond timestamp
/// without a time zone), `ident`, `action`, `repo`, `artifact`, `source_ip` (all UTF-8),
/// `status` (`u16`), `bytes` (`u64`) and `detail` (UTF-8).
pub fn audit_schema() -> SchemaRef {
    // Most columns are required UTF-8 strings; build those through one helper.
    let utf8 = |name: &str| Field::new(name, DataType::Utf8, false);

    let fields = vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
        utf8("ident"),
        utf8("action"),
        utf8("repo"),
        utf8("artifact"),
        utf8("source_ip"),
        Field::new("status", DataType::UInt16, false),
        Field::new("bytes", DataType::UInt64, false),
        utf8("detail"),
    ];

    Arc::new(Schema::new(fields))
}
/// Audit log that writes events to a single Arrow IPC stream segment file.
pub struct ArrowIpcAuditLog {
/// Schema attached to every record batch this log builds.
schema: SchemaRef,
/// IPC stream writer over the buffered segment file; the mutex allows writes through `&self`.
writer: Mutex<StreamWriter<BufWriter<File>>>,
/// Path of the segment file created by this instance.
segment: PathBuf,
}
impl ArrowIpcAuditLog {
    /// Creates a new segment file inside `dir` and wraps it in an Arrow IPC stream writer.
    ///
    /// `dir` is created (including parents) when it does not exist. The segment is named
    /// `audit-<now_nanos>-<pid>-<seq>.arrows`, where `<seq>` comes from a process-wide
    /// counter.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created, if a file with the chosen
    /// segment name already exists or cannot be created, or if the stream writer cannot
    /// be initialised.
    pub fn new(dir: impl AsRef<Path>) -> Result<Self, AuditError> {
        let dir = dir.as_ref();
        fs::create_dir_all(dir)?;
        let schema = audit_schema();
        let seq = SEGMENT_SEQ.fetch_add(1, Ordering::Relaxed);
        let segment = dir.join(format!(
            "audit-{}-{}-{}.arrows",
            super::now_nanos(),
            std::process::id(),
            seq
        ));
        // `File::create` would truncate an existing file. `create_new` fails instead, so a
        // name collision can never erase events that were already recorded.
        let file = fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&segment)?;
        let writer = StreamWriter::try_new(BufWriter::new(file), &schema)?;
        Ok(Self {
            schema,
            writer: Mutex::new(writer),
            segment,
        })
    }

    /// Path of the segment file this log writes to.
    pub fn segment_path(&self) -> &Path {
        &self.segment
    }

    /// Builds a one-row record batch from `event`, with columns in schema order.
    ///
    /// # Errors
    ///
    /// Returns an error if the columns are rejected by `RecordBatch::try_new`.
    fn batch(&self, event: &AuditEvent) -> Result<RecordBatch, AuditError> {
        let cols: Vec<ArrayRef> = vec![
            Arc::new(TimestampNanosecondArray::from(vec![event.ts_nanos])),
            Arc::new(StringArray::from(vec![event.ident.as_str()])),
            Arc::new(StringArray::from(vec![event.action.as_str()])),
            Arc::new(StringArray::from(vec![event.repo.as_str()])),
            Arc::new(StringArray::from(vec![event.artifact.as_str()])),
            Arc::new(StringArray::from(vec![event.source_ip.as_str()])),
            Arc::new(UInt16Array::from(vec![event.status])),
            Arc::new(UInt64Array::from(vec![event.bytes])),
            Arc::new(StringArray::from(vec![event.detail.as_str()])),
        ];
        Ok(RecordBatch::try_new(self.schema.clone(), cols)?)
    }

    /// Reads every event from all `audit-*.arrows` segments found directly in `dir`.
    ///
    /// Segments are visited in sorted path order and their events are appended in the
    /// order they appear in each file. A directory that does not exist yields an empty
    /// vector.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory or one of its entries cannot be read (for any
    /// reason other than the directory not existing), if a segment cannot be opened, or
    /// if a segment cannot be decoded.
    pub fn read_dir(dir: impl AsRef<Path>) -> Result<Vec<AuditEvent>, AuditError> {
        let dir = dir.as_ref();
        // Only "not found" means "no audit log yet". Other failures, such as a permission
        // error, must surface instead of being reported as an empty log.
        let entries = match fs::read_dir(dir) {
            Ok(entries) => entries,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => return Err(e.into()),
        };

        let mut segments: Vec<PathBuf> = Vec::new();
        for entry in entries {
            // Propagate entry errors: skipping them could silently omit whole segments.
            let path = entry?.path();
            let is_segment = path
                .file_name()
                .and_then(|n| n.to_str())
                .map_or(false, |n| n.starts_with("audit-") && n.ends_with(".arrows"));
            if is_segment {
                segments.push(path);
            }
        }
        segments.sort();

        let mut events = Vec::new();
        for seg in segments {
            // Buffer the file so the stream reader's many small reads do not each hit the OS.
            let file = std::io::BufReader::new(File::open(&seg)?);
            let reader = StreamReader::try_new(file, None)?;
            for batch in reader {
                decode_batch(&batch?, &mut events)?;
            }
        }
        Ok(events)
    }
}
impl AuditLog for ArrowIpcAuditLog {
    /// Appends `event` to the segment as a one-row record batch, then flushes the
    /// buffered file writer.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch cannot be built, if the writer mutex is poisoned,
    /// or if writing or flushing fails.
    fn record(&self, event: AuditEvent) -> Result<(), AuditError> {
        // The batch is built before the lock is taken, so the lock only covers the I/O.
        let row = self.batch(&event)?;
        let mut guard = match self.writer.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(AuditError::Other("audit writer mutex poisoned".into())),
        };
        guard.write(&row)?;
        guard.get_mut().flush()?;
        Ok(())
    }

    /// Flushes the buffered file writer underneath the IPC stream writer.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer mutex is poisoned or if the flush fails.
    fn flush(&self) -> Result<(), AuditError> {
        let mut guard = match self.writer.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(AuditError::Other("audit writer mutex poisoned".into())),
        };
        Ok(guard.get_mut().flush()?)
    }
}
impl Drop for ArrowIpcAuditLog {
    /// Best-effort `finish` of the IPC stream when the log is dropped.
    ///
    /// A poisoned mutex or a failing `finish` is ignored, since `drop` cannot report errors.
    fn drop(&mut self) {
        // `&mut self` is exclusive access, so the writer can be reached without locking.
        // Like `lock`, `get_mut` reports a poisoned mutex as `Err`, which is skipped here.
        if let Ok(writer) = self.writer.get_mut() {
            let _ = writer.finish();
        }
    }
}
/// Appends one [`AuditEvent`] per row of `batch` to `out`.
///
/// Columns are read by position, in the order `ts`, `ident`, `action`, `repo`,
/// `artifact`, `source_ip`, `status`, `bytes`, `detail`. Columns beyond the ninth are
/// ignored.
///
/// # Errors
///
/// Returns `AuditError::Other` if the batch has fewer than nine columns or if any of the
/// nine columns has an unexpected array type.
fn decode_batch(batch: &RecordBatch, out: &mut Vec<AuditEvent>) -> Result<(), AuditError> {
    // Number of leading columns this decoder reads (positions 0 through 8).
    const EXPECTED_COLUMNS: usize = 9;

    // `RecordBatch::column` panics on an out-of-range index, so a segment written with a
    // different (narrower) schema must be rejected here rather than crash the reader.
    if batch.num_columns() < EXPECTED_COLUMNS {
        return Err(AuditError::Other(
            "audit segment has fewer columns than the audit schema".into(),
        ));
    }

    let schema_err = || AuditError::Other("audit segment column type mismatch".into());
    let ts = batch
        .column(0)
        .as_primitive_opt::<TimestampNanosecondType>()
        .ok_or_else(schema_err)?;
    let ident = batch.column(1).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let action = batch.column(2).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let repo = batch.column(3).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let artifact = batch.column(4).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let source_ip = batch.column(5).as_string_opt::<i32>().ok_or_else(schema_err)?;
    let status = batch
        .column(6)
        .as_any()
        .downcast_ref::<UInt16Array>()
        .ok_or_else(schema_err)?;
    let bytes = batch
        .column(7)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .ok_or_else(schema_err)?;
    let detail = batch.column(8).as_string_opt::<i32>().ok_or_else(schema_err)?;

    // The row count is known up front, so grow `out` once instead of on every push.
    out.reserve(batch.num_rows());
    for i in 0..batch.num_rows() {
        out.push(AuditEvent {
            ts_nanos: ts.value(i),
            ident: ident.value(i).to_string(),
            action: AuditAction::from_wire(action.value(i)),
            repo: repo.value(i).to_string(),
            artifact: artifact.value(i).to_string(),
            source_ip: source_ip.value(i).to_string(),
            status: status.value(i),
            bytes: bytes.value(i),
            detail: detail.value(i).to_string(),
        });
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a new log in `dir`, records a clone of each event, then drops the log.
    fn write_segment(dir: &Path, events: &[AuditEvent]) {
        let log = ArrowIpcAuditLog::new(dir).unwrap();
        for event in events {
            log.record(event.clone()).unwrap();
        }
    }

    /// Events written to one segment are read back equal and in the same order.
    #[test]
    fn roundtrips_events_through_a_segment() {
        let tmp = tempfile::tempdir().unwrap();
        let upload = AuditEvent::new("alice", AuditAction::Upload, "crates", "serde@1.0.200", "10.0.0.1:4444", 200, 1234);
        let download = AuditEvent::new("anonymous", AuditAction::Download, "crates", "serde@1.0.200", "10.0.0.2:5555", 200, 1234)
            .with_detail("application/octet-stream");
        let denied = AuditEvent::new("bob", AuditAction::Upload, "crates", "evil@9", "10.0.0.3:6666", 403, 0)
            .with_detail("read-only");
        let want = vec![upload, download, denied];

        write_segment(tmp.path(), &want);

        let got = ArrowIpcAuditLog::read_dir(tmp.path()).unwrap();
        assert_eq!(got, want, "events must roundtrip byte-for-byte");
    }

    /// Opening the log twice produces two segment files, and both events are readable.
    #[test]
    fn is_append_only_across_reopen() {
        let tmp = tempfile::tempdir().unwrap();
        let first = AuditEvent::new("alice", AuditAction::Upload, "r", "a@1", "ip:1", 200, 1);
        let second = AuditEvent::new("bob", AuditAction::Download, "r", "a@1", "ip:2", 200, 1);

        write_segment(tmp.path(), &[first]);
        write_segment(tmp.path(), &[second]);

        let file_count = fs::read_dir(tmp.path()).unwrap().count();
        assert_eq!(file_count, 2, "each run writes its own append-only segment");

        let got = ArrowIpcAuditLog::read_dir(tmp.path()).unwrap();
        assert_eq!(got.len(), 2);
        let has_ident = |name: &str| got.iter().any(|e| e.ident.as_str() == name);
        assert!(has_ident("alice") && has_ident("bob"));
    }

    /// Reading a directory that does not exist yields no events rather than an error.
    #[test]
    fn read_dir_on_missing_dir_is_empty() {
        let events = ArrowIpcAuditLog::read_dir("/nonexistent/holger/audit/path").unwrap();
        assert!(events.is_empty());
    }
}