use std::collections::VecDeque;
use std::sync::Arc;
use std::time::SystemTime;
use bytes::Bytes;
use tokio::sync::Mutex;
use crate::audit_log::{
AuditHmacKey, PREV_TAIL_COMMENT_PREFIX, chain_step, genesis_prev, hex_encode,
};
/// One request's worth of data for the access log.
///
/// Each entry is rendered by `render_lines` as a single space-separated text
/// record. Optional fields that are `None` are rendered as `-`.
#[derive(Debug, Clone)]
pub struct AccessLogEntry {
/// Time of the request; rendered in the record as `YYYY-MM-DDTHH:MM:SSZ`
/// with whole-second resolution.
pub time: SystemTime,
/// Bucket name column of the record.
pub bucket: String,
/// Client address column; `-` when `None`.
pub remote_ip: Option<String>,
/// Requester identity column; `-` when `None`.
pub requester: Option<String>,
/// Operation name column (the tests use `"REST.PUT.OBJECT"`).
pub operation: &'static str,
/// Object key column; `-` when `None`.
pub key: Option<String>,
/// Request line; rendered inside double quotes.
pub request_uri: String,
/// HTTP status code column.
pub http_status: u16,
/// Error code column; `-` when `None`.
pub error_code: Option<String>,
/// Bytes-sent column.
pub bytes_sent: u64,
/// Object-size column.
pub object_size: u64,
/// Total request time column, in milliseconds going by the field name.
pub total_time_ms: u64,
/// User agent; rendered inside double quotes, `-` when `None`.
pub user_agent: Option<String>,
}
/// Destination for flushed access-log batches: a directory on the local
/// filesystem.
#[derive(Debug, Clone)]
pub struct AccessLogDest {
/// Directory that batch files are written into.
pub dir: std::path::PathBuf,
}
impl AccessLogDest {
    /// Parses a destination string into a local-directory destination.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when `s` starts with `s3://`; only
    /// local directories are accepted. Any other string is taken verbatim as
    /// a filesystem path.
    pub fn parse(s: &str) -> Result<Self, String> {
        match s.strip_prefix("s3://") {
            Some(stripped) => Err(format!(
                "v0.4 ships local-directory access-log only; got s3:// destination ({stripped:?}). \
                 Use a local path or pipe via filebeat / vector to S3."
            )),
            None => Ok(Self {
                dir: std::path::PathBuf::from(s),
            }),
        }
    }

    /// Builds the file path for batch number `batch` flushed at `now`.
    ///
    /// The file name is `YYYY-MM-DD-HH-NNNN.log`, where the date and hour are
    /// the UTC calendar fields of `now` and `NNNN` is `batch` zero-padded to
    /// at least four digits. A `now` before the Unix epoch is treated as the
    /// epoch itself.
    pub fn path_for(&self, now: SystemTime, batch: u64) -> std::path::PathBuf {
        let secs = match now.duration_since(SystemTime::UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_secs(),
            Err(_) => 0,
        };
        let (y, mo, d, h) = unix_to_ymdh(secs as i64);
        let file_name = format!("{y:04}-{mo:02}-{d:02}-{h:02}-{batch:04}.log");
        self.dir.join(file_name)
    }
}
/// In-memory access-log buffer plus the settings used by its background
/// flusher task (see `spawn_flusher`).
pub struct AccessLog {
/// Directory destination that flushed batches are written to.
dest: AccessLogDest,
/// Entries queued by `record` until the flusher drains them.
buf: Arc<Mutex<VecDeque<AccessLogEntry>>>,
/// Tick period of the flusher task, in seconds.
flush_every_secs: u64,
/// Queue-length threshold compared in `record`; the code in this file takes
/// no action when it is reached.
max_entries_before_flush: usize,
/// Incremented once per flushed batch; supplies the numeric file-name suffix.
batch_counter: Arc<std::sync::atomic::AtomicU64>,
/// When set, flushed lines are rendered with a chained HMAC suffix.
hmac_key: Option<Arc<AuditHmacKey>>,
/// Running tail of the HMAC chain, carried from one flushed batch to the next.
chain_state: Arc<Mutex<ChainState>>,
}
/// Running state of the HMAC line chain across flushed batches.
#[derive(Debug, Clone)]
struct ChainState {
/// MAC used as the "previous" input for the next chained line: the genesis
/// value initially, afterwards the MAC of the last line rendered.
last_hmac: [u8; 32],
/// `false` until the first chained batch has been rendered; once `true`,
/// each rendered batch starts with a previous-tail comment line.
primed: bool,
}
impl Default for ChainState {
    /// Starts an un-primed chain whose tail is the genesis value returned by
    /// `genesis_prev`.
    fn default() -> Self {
        let last_hmac = genesis_prev();
        Self {
            last_hmac,
            primed: false,
        }
    }
}
impl AccessLog {
pub fn new(dest: AccessLogDest) -> Self {
Self {
dest,
buf: Arc::new(Mutex::new(VecDeque::new())),
flush_every_secs: 60,
max_entries_before_flush: 5_000,
batch_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
hmac_key: None,
chain_state: Arc::new(Mutex::new(ChainState::default())),
}
}
#[must_use]
pub fn with_hmac_key(mut self, key: Arc<AuditHmacKey>) -> Self {
self.hmac_key = Some(key);
self
}
pub async fn record(&self, entry: AccessLogEntry) {
let mut buf = self.buf.lock().await;
buf.push_back(entry);
if buf.len() >= self.max_entries_before_flush {
}
}
pub fn spawn_flusher(&self) -> tokio::task::JoinHandle<()> {
let dest = self.dest.clone();
let buf = Arc::clone(&self.buf);
let interval = self.flush_every_secs;
let counter = Arc::clone(&self.batch_counter);
let hmac_key = self.hmac_key.clone();
let chain_state = Arc::clone(&self.chain_state);
if let Err(e) = std::fs::create_dir_all(&dest.dir) {
tracing::warn!(
"S4 access log: could not create dir {}: {e}",
dest.dir.display()
);
}
tokio::spawn(async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval));
loop {
tick.tick().await;
let drained: Vec<AccessLogEntry> = {
let mut b = buf.lock().await;
if b.is_empty() {
continue;
}
b.drain(..).collect()
};
let now = SystemTime::now();
let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let path = dest.path_for(now, batch);
let body = if let Some(key) = hmac_key.as_ref() {
let mut state = chain_state.lock().await;
let (rendered, new_last) = render_lines_chained(&drained, key, &state);
state.last_hmac = new_last;
state.primed = true;
rendered
} else {
render_lines(&drained)
};
let body_bytes: Bytes = Bytes::from(body);
let path_clone = path.clone();
let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
use std::io::Write;
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path_clone)?;
f.write_all(&body_bytes)
})
.await;
match res {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!("S4 access log write failed at {}: {e}", path.display());
}
Err(e) => {
tracing::warn!("S4 access log task join failed: {e}");
}
}
}
})
}
}
/// Renders `entries` with a chained MAC appended to every line.
///
/// When `state.primed` is set, the output starts with a comment line made of
/// `PREV_TAIL_COMMENT_PREFIX` followed by the hex of `state.last_hmac`. Each
/// non-empty line produced by `render_lines` is then emitted as
/// `<line> <hex mac>`, where the MAC comes from `chain_step` over the previous
/// MAC (initially `state.last_hmac`) and the line bytes.
///
/// Returns the rendered text together with the MAC of the last line, or
/// `state.last_hmac` unchanged when no line was emitted.
fn render_lines_chained(
    entries: &[AccessLogEntry],
    key: &AuditHmacKey,
    state: &ChainState,
) -> (String, [u8; 32]) {
    let mut out = String::with_capacity(entries.len() * 320 + 80);

    if state.primed {
        out.push_str(PREV_TAIL_COMMENT_PREFIX);
        out.push_str(&hex_encode(&state.last_hmac));
        out.push('\n');
    }

    let plain = render_lines(entries);
    // Thread the previous MAC through the lines as the fold accumulator.
    let tail = plain
        .split('\n')
        .filter(|line| !line.is_empty())
        .fold(state.last_hmac, |prev, line| {
            let mac = chain_step(key, &prev, line.as_bytes());
            out.push_str(line);
            out.push(' ');
            out.push_str(&hex_encode(&mac));
            out.push('\n');
            mac
        });

    (out, tail)
}
/// Reference-counted handle to an [`AccessLog`].
pub type SharedAccessLog = Arc<AccessLog>;
/// Renders `entries` as newline-terminated access-log records, one per entry.
///
/// Each record is a single line of space-separated columns; the request line
/// and the user agent are wrapped in double quotes, and optional fields that
/// are `None` are written as `-`.
///
/// Caller-supplied text (bucket, address, requester, key, request line, error
/// code, user agent) is passed through [`escape_log_field`] so that a value
/// can neither start a new record nor shift or split a column. Without this,
/// an embedded line feed would produce an extra, attacker-chosen record.
fn render_lines(entries: &[AccessLogEntry]) -> String {
    use std::fmt::Write as _;

    let mut out = String::with_capacity(entries.len() * 256);
    for e in entries {
        let (y, mo, d, h, mi, se) = unix_to_ymdhms(unix_secs(e.time));
        // Formatting into a `String` cannot fail, so the result is discarded.
        let _ = write!(
            out,
            "- {bucket} [{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{se:02}Z] {ip} {req} - {op} {key} \"{uri}\" {status} {err} {bytes_sent} {obj_size} {total_ms} - - \"{ua}\" - - SigV4 - AuthHeader - TLSv1.3 - -\n",
            bucket = escape_log_field(&e.bucket, false),
            ip = escape_log_field(e.remote_ip.as_deref().unwrap_or("-"), false),
            req = escape_log_field(e.requester.as_deref().unwrap_or("-"), false),
            // `operation` is a `&'static str` chosen by the program, not by the client.
            op = e.operation,
            key = escape_log_field(e.key.as_deref().unwrap_or("-"), false),
            uri = escape_log_field(&e.request_uri, true),
            status = e.http_status,
            err = escape_log_field(e.error_code.as_deref().unwrap_or("-"), false),
            bytes_sent = e.bytes_sent,
            obj_size = e.object_size,
            total_ms = e.total_time_ms,
            ua = escape_log_field(e.user_agent.as_deref().unwrap_or("-"), true),
        );
    }
    out
}

/// Percent-encodes the characters of `raw` that would break record framing.
///
/// Control characters (including CR and LF) are always encoded. In a `quoted`
/// column the double quote is encoded as well; in an unquoted column the
/// space is. Every UTF-8 byte of an encoded character is written as `%XX`
/// with upper-case hex. `%` itself is left alone so that request lines that
/// are already percent-encoded keep their appearance; the encoding therefore
/// protects framing but is not meant to be reversed.
///
/// Returns the input borrowed when nothing needs encoding.
fn escape_log_field(raw: &str, quoted: bool) -> std::borrow::Cow<'_, str> {
    use std::fmt::Write as _;

    fn needs_escape(c: char, quoted: bool) -> bool {
        c.is_control() || if quoted { c == '"' } else { c == ' ' }
    }

    if !raw.chars().any(|c| needs_escape(c, quoted)) {
        return std::borrow::Cow::Borrowed(raw);
    }

    let mut escaped = String::with_capacity(raw.len() + 8);
    let mut utf8 = [0u8; 4];
    for c in raw.chars() {
        if needs_escape(c, quoted) {
            for byte in c.encode_utf8(&mut utf8).bytes() {
                // Formatting into a `String` cannot fail.
                let _ = write!(escaped, "%{byte:02X}");
            }
        } else {
            escaped.push(c);
        }
    }
    std::borrow::Cow::Owned(escaped)
}
/// Whole seconds between the Unix epoch and `t`.
///
/// Sub-second precision is truncated. A `t` earlier than the epoch yields 0.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
/// Year, month, day and hour (UTC) for a Unix timestamp in seconds.
///
/// Same as `unix_to_ymdhms` with the minute and second dropped.
fn unix_to_ymdh(secs: i64) -> (i64, u32, u32, u32) {
    let civil = unix_to_ymdhms(secs);
    (civil.0, civil.1, civil.2, civil.3)
}
/// Splits a Unix timestamp (seconds) into proleptic-Gregorian UTC fields
/// `(year, month, day, hour, minute, second)`.
///
/// Month and day are 1-based. Negative timestamps are handled: the day count
/// is floored, so the time of day is always non-negative.
fn unix_to_ymdhms(secs: i64) -> (i64, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: i64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097; // one 400-year Gregorian cycle

    // Time of day, derived from whole minutes since midnight.
    let secs_of_day = secs.rem_euclid(SECS_PER_DAY);
    let mins_of_day = secs_of_day / 60;
    let hour = (mins_of_day / 60) as u32;
    let minute = (mins_of_day % 60) as u32;
    let second = (secs_of_day % 60) as u32;

    // Re-base the day count on 0000-03-01 so the leap day, when there is one,
    // is the last day of the shifted year.
    let shifted_days = secs.div_euclid(SECS_PER_DAY) + 719_468;
    let era = shifted_days.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted_days.rem_euclid(DAYS_PER_ERA);
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March .. 11 = February).
    let march_month = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * march_month + 2) / 5 + 1) as u32;
    let month = if march_month < 10 {
        (march_month + 3) as u32
    } else {
        (march_month - 9) as u32
    };

    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + i64::from(month <= 2);

    (year, month, day, hour, minute, second)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a successful PUT at 2023-11-14T22:13:20Z against `bucket`.
    fn sample_entry(bucket: &str) -> AccessLogEntry {
        AccessLogEntry {
            time: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000),
            bucket: bucket.into(),
            remote_ip: Some("10.0.0.1".into()),
            requester: Some("AKIATEST".into()),
            operation: "REST.PUT.OBJECT",
            key: Some("k".into()),
            request_uri: "PUT /b/k HTTP/1.1".into(),
            http_status: 200,
            error_code: None,
            bytes_sent: 0,
            object_size: 4096,
            total_time_ms: 12,
            user_agent: Some("aws-cli/2.0".into()),
        }
    }

    #[test]
    fn parse_dest_local_dir() {
        let d = AccessLogDest::parse("/var/log/s4").unwrap();
        assert_eq!(d.dir, std::path::PathBuf::from("/var/log/s4"));
    }

    #[test]
    fn parse_dest_rejects_s3_url_until_phase_b() {
        let err = AccessLogDest::parse("s3://logs/access/").unwrap_err();
        assert!(err.contains("local-directory access-log only"));
    }

    #[test]
    fn path_for_uses_hourly_naming() {
        let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
        let now = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
        let p = d.path_for(now, 7);
        // Compare components rather than the joined string so the check does
        // not depend on the platform's path separator.
        assert_eq!(p.parent(), Some(d.dir.as_path()));
        // 1_700_000_000 is 2023-11-14T22:13:20Z, so the hour bucket is 22.
        assert_eq!(
            p.file_name().and_then(|n| n.to_str()),
            Some("2023-11-14-22-0007.log")
        );
    }

    #[test]
    fn unix_to_ymdh_known_value() {
        // 1_779_148_800 is exactly 2026-05-19T00:00:00Z.
        assert_eq!(unix_to_ymdh(1_779_148_800), (2026, 5, 19, 0));
    }

    #[test]
    fn unix_to_ymdhms_boundaries() {
        assert_eq!(unix_to_ymdhms(0), (1970, 1, 1, 0, 0, 0));
        assert_eq!(unix_to_ymdhms(1_700_000_000), (2023, 11, 14, 22, 13, 20));
        // Leap day of a year divisible by 400.
        assert_eq!(unix_to_ymdhms(951_782_400), (2000, 2, 29, 0, 0, 0));
        // One second before the epoch exercises the negative branch.
        assert_eq!(unix_to_ymdhms(-1), (1969, 12, 31, 23, 59, 59));
    }

    #[test]
    fn chained_render_produces_verifiable_output() {
        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        use std::str::FromStr;

        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();
        let entries = vec![sample_entry("b1"), sample_entry("b2"), sample_entry("b3")];
        let state = ChainState::default();
        let (text, _last) = render_lines_chained(&entries, &key, &state);

        // An un-primed chain must not emit the previous-tail header.
        assert!(!text.starts_with("# prev_file_tail="));

        // Every line ends in a space followed by 64 hex digits.
        for raw in text.split_inclusive('\n') {
            let line = raw.trim_end_matches('\n');
            if line.is_empty() {
                continue;
            }
            assert!(line.len() > 65);
            let suf = &line[line.len() - 65..];
            assert!(suf.starts_with(' '));
            assert!(suf[1..].chars().all(|c| c.is_ascii_hexdigit()));
        }

        let report =
            verify_audit_bytes(std::path::Path::new("<mem>"), text.as_bytes(), &key).unwrap();
        assert!(report.first_break.is_none());
        assert_eq!(report.ok_lines, 3);
    }

    #[test]
    fn second_batch_emits_prev_file_tail_and_chains() {
        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        use std::str::FromStr;

        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();

        let entries1 = vec![sample_entry("b1")];
        let mut state = ChainState::default();
        let (text1, last1) = render_lines_chained(&entries1, &key, &state);
        state.last_hmac = last1;
        state.primed = true;

        let entries2 = vec![sample_entry("b2")];
        let (text2, _) = render_lines_chained(&entries2, &key, &state);
        assert!(text2.starts_with("# prev_file_tail="));

        let report =
            verify_audit_bytes(std::path::Path::new("<mem>"), text2.as_bytes(), &key).unwrap();
        assert!(report.first_break.is_none(), "second batch must verify");
        assert_eq!(report.ok_lines, 1);

        let r1 =
            verify_audit_bytes(std::path::Path::new("<mem>"), text1.as_bytes(), &key).unwrap();
        assert!(r1.first_break.is_none());
    }

    #[test]
    fn render_one_entry() {
        let line = render_lines(&[sample_entry("b")]);
        // Pin the leading columns, including the exact timestamp.
        assert!(line.starts_with(
            "- b [2023-11-14T22:13:20Z] 10.0.0.1 AKIATEST - REST.PUT.OBJECT k "
        ));
        assert!(line.contains("REST.PUT.OBJECT"));
        assert!(line.contains("10.0.0.1"));
        assert!(line.contains("AKIATEST"));
        assert!(line.contains("\"aws-cli/2.0\""));
        assert!(line.ends_with('\n'));
    }
}