Skip to main content

s4_server/
access_log.rs

1//! S3-style access-log emission (v0.4 #20).
2//!
//! Writes one line per completed request in the AWS S3 server access log
//! format (close enough for `awk` / `goaccess` / standard log analyzers
//! to parse). Output is buffered and flushed periodically into `.log`
//! files in a local directory. The planned destination is **another S3
//! bucket** (the convention AWS itself uses for S3 server access logs),
//! reached via the same backend the gateway is fronting; until that lands,
//! ship the files onward with a separate log shipper. No new outbound
//! dependencies.
9//!
10//! ## Format
11//!
12//! `bucket-owner bucket [time] remote-ip requester request-id operation
13//!  key request-uri http-status error-code bytes-sent object-size
14//!  total-time turn-around-time referer user-agent version-id
15//!  host-id sig-version cipher-suite auth-type host-header tls-version
16//!  access-point-arn acl-required`
17//!
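//! Most fields are stubbed (`-`) for the v0.4 release, and the
//! sig-version / auth-type / tls-version columns are fixed placeholders
//! (`SigV4`, `AuthHeader`, `TLSv1.3`) rather than per-request values. The
//! load-bearing columns are time, remote-ip, requester, operation, key,
//! status, bytes-sent, object-size, total-time, user-agent.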
21//!
22//! ## Operator config
23//!
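//! `--access-log /var/log/s4` (a local directory) enables emission; each
//! flush writes a new `YYYY-MM-DD-HH-NNNN.log` file there (UTC hour plus a
//! batch counter). `s3://` destinations such as
//! `s3://logs-bucket/prefix/{date}/` (with `{date}` expanding to
//! `YYYY-MM-DD-HH`, hourly rollover) are planned but currently rejected.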
26//!
27//! ## Implementation note
28//!
29//! We deliberately don't compress the access-log objects — they're text
30//! and S4's own bucket-policy enforcement may want to read them raw. If
31//! you want them squished, point S4 at *another* S4 instance or front
32//! the log bucket with a separate gateway.
33
34use std::collections::VecDeque;
35use std::sync::Arc;
36use std::time::SystemTime;
37
38use bytes::Bytes;
39use tokio::sync::Mutex;
40
41use crate::audit_log::{
42    AuditHmacKey, PREV_TAIL_COMMENT_PREFIX, chain_step, genesis_prev, hex_encode,
43};
44
/// Per-request structured fields collected at handler completion. The
/// emitter renders this into the on-the-wire S3 access-log format on
/// flush; every `Option` field that is `None` is written as `-`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessLogEntry {
    /// Timestamp for the `[time]` column, rendered in UTC at whole-second
    /// precision.
    pub time: SystemTime,
    /// Bucket name (`bucket` column).
    pub bucket: String,
    /// Client address (`remote-ip` column).
    pub remote_ip: Option<String>,
    /// Requester identity (`requester` column), e.g. an access key id.
    pub requester: Option<String>,
    /// Operation name such as `REST.PUT.OBJECT` (`operation` column).
    pub operation: &'static str,
    /// Object key (`key` column).
    pub key: Option<String>,
    /// Request line such as `PUT /b/k HTTP/1.1`, written inside double
    /// quotes (`request-uri` column).
    pub request_uri: String,
    /// HTTP status code (`http-status` column).
    pub http_status: u16,
    /// Error code (`error-code` column).
    pub error_code: Option<String>,
    /// `bytes-sent` column.
    pub bytes_sent: u64,
    /// `object-size` column.
    pub object_size: u64,
    /// Request duration in milliseconds (`total-time` column).
    pub total_time_ms: u64,
    /// User agent, written inside double quotes (`user-agent` column).
    pub user_agent: Option<String>,
}
64
65/// Operator-configured destination: a local directory where hourly
66/// rotated `.log` files are written. v0.4 scope — `s3://` destination
67/// is a post-v0.4 follow-up; for now ship the entries to local disk and
68/// let a separate log-shipper (filebeat / fluent-bit / vector) push them
69/// to wherever they need to go.
70#[derive(Debug, Clone)]
71pub struct AccessLogDest {
72    pub dir: std::path::PathBuf,
73}
74
75impl AccessLogDest {
76    pub fn parse(s: &str) -> Result<Self, String> {
77        if let Some(stripped) = s.strip_prefix("s3://") {
78            return Err(format!(
79                "v0.4 ships local-directory access-log only; got s3:// destination ({stripped:?}). \
80                 Use a local path or pipe via filebeat / vector to S3."
81            ));
82        }
83        let dir = std::path::PathBuf::from(s);
84        Ok(Self { dir })
85    }
86
87    /// Compose the file path for a flush at `now`. One file per hour
88    /// + a batch counter so high-volume hours don't single-file-balloon.
89    pub fn path_for(&self, now: SystemTime, batch: u64) -> std::path::PathBuf {
90        let secs = now
91            .duration_since(SystemTime::UNIX_EPOCH)
92            .map(|d| d.as_secs())
93            .unwrap_or(0);
94        let (y, mo, d, h) = unix_to_ymdh(secs as i64);
95        self.dir
96            .join(format!("{y:04}-{mo:02}-{d:02}-{h:02}-{batch:04}.log"))
97    }
98}
99
100/// Buffered emitter. Per-handler call sites push entries via
101/// [`AccessLog::record`]; a background task drains the buffer and writes
102/// one S3 object per flush window.
103pub struct AccessLog {
104    dest: AccessLogDest,
105    buf: Arc<Mutex<VecDeque<AccessLogEntry>>>,
106    flush_every_secs: u64,
107    max_entries_before_flush: usize,
108    batch_counter: Arc<std::sync::atomic::AtomicU64>,
109    /// v0.5 #31: optional HMAC-SHA256 key. When `Some(...)`, the
110    /// flusher appends a hex HMAC column to every line and emits a
111    /// `# prev_file_tail=<hex>` comment at the top of each rotated
112    /// batch file so the chain extends across rotations.
113    hmac_key: Option<Arc<AuditHmacKey>>,
114    /// Running chain state — the last HMAC the flusher emitted (or
115    /// the genesis seed if nothing has been emitted yet). Updated
116    /// in-place at the end of each flush batch.
117    chain_state: Arc<Mutex<ChainState>>,
118}
119
120#[derive(Debug, Clone)]
121struct ChainState {
122    last_hmac: [u8; 32],
123    /// True once at least one batch has been written, so the next
124    /// batch knows it must emit a `# prev_file_tail=` comment.
125    primed: bool,
126}
127
128impl Default for ChainState {
129    fn default() -> Self {
130        Self {
131            last_hmac: genesis_prev(),
132            primed: false,
133        }
134    }
135}
136
137impl AccessLog {
138    pub fn new(dest: AccessLogDest) -> Self {
139        Self {
140            dest,
141            buf: Arc::new(Mutex::new(VecDeque::new())),
142            flush_every_secs: 60,
143            max_entries_before_flush: 5_000,
144            batch_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
145            hmac_key: None,
146            chain_state: Arc::new(Mutex::new(ChainState::default())),
147        }
148    }
149
150    /// v0.5 #31: turn on tamper-evident HMAC chaining. Every emitted
151    /// line gets a trailing hex HMAC column, and each new batch file
152    /// starts with a `# prev_file_tail=<hex>` comment so the chain
153    /// extends across rotations. Without this builder, lines are
154    /// emitted exactly as before (back-compat with v0.4 #20 readers).
155    #[must_use]
156    pub fn with_hmac_key(mut self, key: Arc<AuditHmacKey>) -> Self {
157        self.hmac_key = Some(key);
158        self
159    }
160
161    pub async fn record(&self, entry: AccessLogEntry) {
162        let mut buf = self.buf.lock().await;
163        buf.push_back(entry);
164        if buf.len() >= self.max_entries_before_flush {
165            // Wake the flusher early — it polls on `flush_every_secs`,
166            // but a burst should land sooner. We do this by leaving the
167            // entries queued; the flusher loop checks size on every tick.
168        }
169    }
170
171    /// Spawn the background flusher. Drains the buffer every
172    /// `flush_every_secs` (default 60) and appends to the per-hour file
173    /// in `dest.dir`. Returns the tokio JoinHandle so the caller can
174    /// abort on shutdown if needed.
175    pub fn spawn_flusher(&self) -> tokio::task::JoinHandle<()> {
176        let dest = self.dest.clone();
177        let buf = Arc::clone(&self.buf);
178        let interval = self.flush_every_secs;
179        let counter = Arc::clone(&self.batch_counter);
180        let hmac_key = self.hmac_key.clone();
181        let chain_state = Arc::clone(&self.chain_state);
182        if let Err(e) = std::fs::create_dir_all(&dest.dir) {
183            tracing::warn!(
184                "S4 access log: could not create dir {}: {e}",
185                dest.dir.display()
186            );
187        }
188        tokio::spawn(async move {
189            let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval));
190            loop {
191                tick.tick().await;
192                let drained: Vec<AccessLogEntry> = {
193                    let mut b = buf.lock().await;
194                    if b.is_empty() {
195                        continue;
196                    }
197                    b.drain(..).collect()
198                };
199                let now = SystemTime::now();
200                let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
201                let path = dest.path_for(now, batch);
202                let body = if let Some(key) = hmac_key.as_ref() {
203                    let mut state = chain_state.lock().await;
204                    let (rendered, new_last) = render_lines_chained(&drained, key, &state);
205                    state.last_hmac = new_last;
206                    state.primed = true;
207                    rendered
208                } else {
209                    render_lines(&drained)
210                };
211                let body_bytes: Bytes = Bytes::from(body);
212                let path_clone = path.clone();
213                let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
214                    use std::io::Write;
215                    let mut f = std::fs::OpenOptions::new()
216                        .create(true)
217                        .append(true)
218                        .open(&path_clone)?;
219                    f.write_all(&body_bytes)
220                })
221                .await;
222                match res {
223                    Ok(Ok(())) => {}
224                    Ok(Err(e)) => {
225                        tracing::warn!("S4 access log write failed at {}: {e}", path.display());
226                    }
227                    Err(e) => {
228                        tracing::warn!("S4 access log task join failed: {e}");
229                    }
230                }
231            }
232        })
233    }
234}
235
236/// Render `entries` with a trailing HMAC column on each line, plus a
237/// `# prev_file_tail=<hex>` preamble when `state.primed` is true (i.e.
238/// this is not the very first batch). Returns the rendered text and
239/// the final chain HMAC, which the caller must persist back to the
240/// shared state.
241///
242/// Each line's HMAC is computed over `prev_hmac || line_no_hmac`,
243/// where `line_no_hmac` is the bytes of the line WITHOUT the trailing
244/// HMAC column AND WITHOUT the trailing newline. The producer then
245/// appends ` <hex>\n` to land on the wire format the verifier expects.
246fn render_lines_chained(
247    entries: &[AccessLogEntry],
248    key: &AuditHmacKey,
249    state: &ChainState,
250) -> (String, [u8; 32]) {
251    // Reserve a generous budget: ~256 chars per base line + 65 for
252    // " <hex>\n", plus 80 for the preamble.
253    let mut out = String::with_capacity(entries.len() * 320 + 80);
254    if state.primed {
255        out.push_str(PREV_TAIL_COMMENT_PREFIX);
256        out.push_str(&hex_encode(&state.last_hmac));
257        out.push('\n');
258    }
259    let base = render_lines(entries);
260    let mut prev = state.last_hmac;
261    for raw_line in base.split_inclusive('\n') {
262        let line = raw_line.trim_end_matches('\n');
263        if line.is_empty() {
264            continue;
265        }
266        let mac = chain_step(key, &prev, line.as_bytes());
267        out.push_str(line);
268        out.push(' ');
269        out.push_str(&hex_encode(&mac));
270        out.push('\n');
271        prev = mac;
272    }
273    (out, prev)
274}
275
276/// Public wrapper for ease of `Arc<AccessLog>` plumbing in S4Service.
277pub type SharedAccessLog = Arc<AccessLog>;
278
279fn render_lines(entries: &[AccessLogEntry]) -> String {
280    let mut out = String::with_capacity(entries.len() * 256);
281    for e in entries {
282        let ts = unix_secs(e.time);
283        let (y, mo, d, h, mi, se) = unix_to_ymdhms(ts);
284        out.push_str(&format!(
285            "- {bucket} [{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{se:02}Z] {ip} {req} - {op} {key} \"{uri}\" {status} {err} {bytes_sent} {obj_size} {total_ms} - - \"{ua}\" - - SigV4 - AuthHeader - TLSv1.3 - -\n",
286            bucket = e.bucket,
287            ip = e.remote_ip.as_deref().unwrap_or("-"),
288            req = e.requester.as_deref().unwrap_or("-"),
289            op = e.operation,
290            key = e.key.as_deref().unwrap_or("-"),
291            uri = e.request_uri,
292            status = e.http_status,
293            err = e.error_code.as_deref().unwrap_or("-"),
294            bytes_sent = e.bytes_sent,
295            obj_size = e.object_size,
296            total_ms = e.total_time_ms,
297            ua = e.user_agent.as_deref().unwrap_or("-"),
298        ));
299    }
300    out
301}
302
/// Whole seconds elapsed since the Unix epoch at `t`; any instant before
/// the epoch clamps to `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
308
/// UTC calendar breakdown of `secs` (Unix seconds), truncated to the hour:
/// `(year, month 1..=12, day 1..=31, hour 0..=23)`.
fn unix_to_ymdh(secs: i64) -> (i64, u32, u32, u32) {
    let (year, month, day, hour, ..) = unix_to_ymdhms(secs);
    (year, month, day, hour)
}

/// UTC calendar breakdown of `secs` (Unix seconds):
/// `(year, month, day, hour, minute, second)`.
///
/// Negative inputs are handled: the day number and time of day use
/// Euclidean division, so `-1` maps to 1969-12-31 23:59:59. The date part
/// is Howard Hinnant's public-domain `civil_from_days`, which counts in
/// 400-year eras of 146 097 days whose years begin on 1 March.
fn unix_to_ymdhms(secs: i64) -> (i64, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: i64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT: i64 = 719_468;

    let day_number = secs.div_euclid(SECS_PER_DAY);
    let sec_of_day = secs.rem_euclid(SECS_PER_DAY);
    let hour = (sec_of_day / 3600) as u32;
    let minute = ((sec_of_day % 3600) / 60) as u32;
    let second = (sec_of_day % 60) as u32;

    let shifted = day_number + EPOCH_SHIFT;
    // Floor division, so days before 0000-03-01 land in a negative era.
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted - era * DAYS_PER_ERA; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    // March-based day of year: 0 = 1 March.
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // March-based month index: 0 = March … 11 = February.
    let month_index = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let civil_month = if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    };
    let month = civil_month as u32;
    let march_year = year_of_era + era * 400;
    // January and February belong to the following civil year.
    let year = if month <= 2 { march_year + 1 } else { march_year };
    (year, month, day, hour, minute, second)
}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed timestamp shared by the fixtures: 2023-11-14 22:13:20 UTC.
    const FIXTURE_SECS: u64 = 1_700_000_000;

    #[test]
    fn parse_dest_local_dir() {
        let d = AccessLogDest::parse("/var/log/s4").unwrap();
        assert_eq!(d.dir, std::path::PathBuf::from("/var/log/s4"));
    }

    #[test]
    fn parse_dest_rejects_s3_url_until_phase_b() {
        let err = AccessLogDest::parse("s3://logs/access/").unwrap_err();
        assert!(err.contains("local-directory access-log only"));
    }

    #[test]
    fn path_for_uses_hourly_naming() {
        let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
        let now = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(FIXTURE_SECS);
        let p = d.path_for(now, 7);
        assert_eq!(p.parent(), Some(std::path::Path::new("/tmp/s4-test")));
        // UTC hour of the fixture plus the zero-padded batch number.
        assert_eq!(
            p.file_name(),
            Some(std::ffi::OsStr::new("2023-11-14-22-0007.log"))
        );
    }

    #[test]
    fn unix_to_ymdh_known_value() {
        // 2026-05-19 00:00:00 UTC = 1_779_148_800 s (20_592 days * 86_400).
        assert_eq!(unix_to_ymdh(1_779_148_800), (2026, 5, 19, 0));
    }

    #[test]
    fn unix_to_ymdhms_known_values() {
        assert_eq!(unix_to_ymdhms(0), (1970, 1, 1, 0, 0, 0));
        assert_eq!(
            unix_to_ymdhms(FIXTURE_SECS as i64),
            (2023, 11, 14, 22, 13, 20)
        );
        // Leap day.
        assert_eq!(unix_to_ymdhms(1_709_164_800), (2024, 2, 29, 0, 0, 0));
        // One second before the epoch.
        assert_eq!(unix_to_ymdhms(-1), (1969, 12, 31, 23, 59, 59));
    }

    fn sample_entry(bucket: &str) -> AccessLogEntry {
        AccessLogEntry {
            time: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(FIXTURE_SECS),
            bucket: bucket.into(),
            remote_ip: Some("10.0.0.1".into()),
            requester: Some("AKIATEST".into()),
            operation: "REST.PUT.OBJECT",
            key: Some("k".into()),
            request_uri: "PUT /b/k HTTP/1.1".into(),
            http_status: 200,
            error_code: None,
            bytes_sent: 0,
            object_size: 4096,
            total_time_ms: 12,
            user_agent: Some("aws-cli/2.0".into()),
        }
    }

    #[test]
    fn chained_render_produces_verifiable_output() {
        use std::str::FromStr;

        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();
        let entries = vec![sample_entry("b1"), sample_entry("b2"), sample_entry("b3")];
        let state = ChainState::default();
        let (text, _last) = render_lines_chained(&entries, &key, &state);
        // No prev_file_tail comment on the first batch.
        assert!(!text.starts_with("# prev_file_tail="));
        // Each line ends with " <64 hex>\n"
        for raw in text.split_inclusive('\n') {
            let line = raw.trim_end_matches('\n');
            if line.is_empty() {
                continue;
            }
            assert!(line.len() > 65);
            let suf = &line[line.len() - 65..];
            assert!(suf.starts_with(' '));
            assert!(suf[1..].chars().all(|c| c.is_ascii_hexdigit()));
        }
        // Verifier is happy.
        let report =
            verify_audit_bytes(std::path::Path::new("<mem>"), text.as_bytes(), &key).unwrap();
        assert!(report.first_break.is_none());
        assert_eq!(report.ok_lines, 3);
    }

    #[test]
    fn second_batch_emits_prev_file_tail_and_chains() {
        use std::str::FromStr;

        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();

        // First batch.
        let entries1 = vec![sample_entry("b1")];
        let mut state = ChainState::default();
        let (text1, last1) = render_lines_chained(&entries1, &key, &state);
        state.last_hmac = last1;
        state.primed = true;

        // Second batch — must start with # prev_file_tail= and verify
        // when fed independently to the verifier.
        let entries2 = vec![sample_entry("b2")];
        let (text2, _) = render_lines_chained(&entries2, &key, &state);
        assert!(text2.starts_with("# prev_file_tail="));
        let report =
            verify_audit_bytes(std::path::Path::new("<mem>"), text2.as_bytes(), &key).unwrap();
        assert!(report.first_break.is_none(), "second batch must verify");
        assert_eq!(report.ok_lines, 1);
        // First batch verifies on its own too.
        let r1 =
            verify_audit_bytes(std::path::Path::new("<mem>"), text1.as_bytes(), &key).unwrap();
        assert!(r1.first_break.is_none());
    }

    #[test]
    fn render_one_entry() {
        let line = render_lines(&[sample_entry("b")]);
        assert_eq!(
            line,
            "- b [2023-11-14T22:13:20Z] 10.0.0.1 AKIATEST - REST.PUT.OBJECT k \
             \"PUT /b/k HTTP/1.1\" 200 - 0 4096 12 - - \"aws-cli/2.0\" - - SigV4 - \
             AuthHeader - TLSv1.3 - -\n"
        );
    }
}