1use std::collections::VecDeque;
35use std::sync::Arc;
36use std::time::SystemTime;
37
38use bytes::Bytes;
39use tokio::sync::Mutex;
40
41use crate::audit_log::{
42 AuditHmacKey, EOF_HMAC_COMMENT_PREFIX, PREV_TAIL_COMMENT_PREFIX, chain_step, compute_eof_hmac,
43 genesis_prev, hex_encode,
44};
45
/// Data captured for one request; `render_lines` turns each entry into a
/// single access-log text line.
///
/// Optional fields that are `None` are rendered as `-`.
#[derive(Debug, Clone)]
pub struct AccessLogEntry {
    /// Request timestamp; rendered as a UTC `YYYY-MM-DDTHH:MM:SSZ` field
    /// (times before the Unix epoch render as the epoch itself).
    pub time: SystemTime,
    /// Bucket name, rendered as the second field of the line.
    pub bucket: String,
    /// Client address, if known.
    pub remote_ip: Option<String>,
    /// Requester identity, if known (presumably an access-key id — confirm
    /// against the caller).
    pub requester: Option<String>,
    /// Operation label, e.g. `REST.PUT.OBJECT`.
    pub operation: &'static str,
    /// Object key, if the request addressed one.
    pub key: Option<String>,
    /// Request line, rendered inside double quotes.
    pub request_uri: String,
    /// HTTP status code of the response.
    pub http_status: u16,
    /// Error code, if the request failed.
    pub error_code: Option<String>,
    /// Number of bytes sent (rendered as a plain integer).
    pub bytes_sent: u64,
    /// Object size (rendered as a plain integer).
    pub object_size: u64,
    /// Total request time; milliseconds per the field name.
    pub total_time_ms: u64,
    /// Client user agent, rendered inside double quotes.
    pub user_agent: Option<String>,
}
65
/// Where access-log files are written: a local directory.
#[derive(Debug, Clone)]
pub struct AccessLogDest {
    /// Directory that receives the per-batch `.log` files.
    pub dir: std::path::PathBuf,
}
75
76impl AccessLogDest {
77 pub fn parse(s: &str) -> Result<Self, String> {
78 if let Some(stripped) = s.strip_prefix("s3://") {
79 return Err(format!(
80 "v0.4 ships local-directory access-log only; got s3:// destination ({stripped:?}). \
81 Use a local path or pipe via filebeat / vector to S3."
82 ));
83 }
84 let dir = std::path::PathBuf::from(s);
85 Ok(Self { dir })
86 }
87
88 pub fn path_for(&self, now: SystemTime, batch: u64) -> std::path::PathBuf {
91 let secs = now
92 .duration_since(SystemTime::UNIX_EPOCH)
93 .map(|d| d.as_secs())
94 .unwrap_or(0);
95 let (y, mo, d, h) = unix_to_ymdh(secs as i64);
96 self.dir
97 .join(format!("{y:04}-{mo:02}-{d:02}-{h:02}-{batch:04}.log"))
98 }
99}
100
/// Buffered access-log writer.
///
/// `record` queues entries in memory; the task started by `spawn_flusher`
/// drains the queue on a timer and appends each drained batch to its own
/// file under `dest`. When an HMAC key is configured, lines are HMAC-chained
/// and every file ends with an EOF marker line.
pub struct AccessLog {
    /// Directory the batch files are written into.
    dest: AccessLogDest,
    /// Entries recorded but not yet drained; shared with the flusher task.
    buf: Arc<Mutex<VecDeque<AccessLogEntry>>>,
    /// Period of the flusher's timer, in seconds.
    flush_every_secs: u64,
    /// Buffer-length threshold compared against in `record`.
    max_entries_before_flush: usize,
    /// Counter that supplies the batch number used in each file name.
    batch_counter: Arc<std::sync::atomic::AtomicU64>,
    /// HMAC key; `None` selects plain (unchained) output.
    hmac_key: Option<Arc<AuditHmacKey>>,
    /// Chain tail carried from one rendered batch to the next.
    chain_state: Arc<Mutex<ChainState>>,
    /// Chain tail of the most recent chained batch whose file write succeeded.
    last_emitted_hmac: Arc<std::sync::Mutex<Option<[u8; 32]>>>,
    /// Path of the most recent chained batch file that was written successfully.
    last_emitted_path: Arc<std::sync::Mutex<Option<std::path::PathBuf>>>,
}
132
/// Running state of the HMAC chain, carried from one batch to the next.
#[derive(Debug, Clone)]
struct ChainState {
    /// HMAC the next rendered line is chained onto; starts as `genesis_prev()`.
    last_hmac: [u8; 32],
    /// `true` once a batch has been rendered; the next batch then begins with
    /// a `PREV_TAIL_COMMENT_PREFIX` line carrying `last_hmac`.
    primed: bool,
}
140
141impl Default for ChainState {
142 fn default() -> Self {
143 Self {
144 last_hmac: genesis_prev(),
145 primed: false,
146 }
147 }
148}
149
150impl AccessLog {
151 pub fn new(dest: AccessLogDest) -> Self {
152 Self {
153 dest,
154 buf: Arc::new(Mutex::new(VecDeque::new())),
155 flush_every_secs: 60,
156 max_entries_before_flush: 5_000,
157 batch_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
158 hmac_key: None,
159 chain_state: Arc::new(Mutex::new(ChainState::default())),
160 last_emitted_hmac: Arc::new(std::sync::Mutex::new(None)),
161 last_emitted_path: Arc::new(std::sync::Mutex::new(None)),
162 }
163 }
164
165 #[must_use]
171 pub fn with_hmac_key(mut self, key: Arc<AuditHmacKey>) -> Self {
172 self.hmac_key = Some(key);
173 self
174 }
175
    /// Appends `entry` to the in-memory buffer, waiting for the buffer lock.
    ///
    /// Nothing is written to disk here; the buffer is drained by the task
    /// started with `spawn_flusher`.
    pub async fn record(&self, entry: AccessLogEntry) {
        let mut buf = self.buf.lock().await;
        buf.push_back(entry);
        if buf.len() >= self.max_entries_before_flush {
            // NOTE(review): this branch is currently a no-op. Reaching the
            // threshold does not trigger an early flush and does not bound
            // the buffer; entries wait for the flusher's next tick.
        }
    }
185
    /// Starts the background task that periodically drains the buffer and
    /// appends each drained batch to its own log file.
    ///
    /// The destination directory is created (best effort) before the task is
    /// spawned. The task loops forever; the returned handle can be used to
    /// abort or await it. `tokio::spawn` is called here, so this presumably
    /// must run inside a Tokio runtime — confirm at the call site.
    ///
    /// Write failures are logged with `tracing::warn!` and the drained
    /// entries of that batch are not re-queued.
    pub fn spawn_flusher(&self) -> tokio::task::JoinHandle<()> {
        // Clone everything the task needs so it owns its state and does not
        // borrow `self`.
        let dest = self.dest.clone();
        let buf = Arc::clone(&self.buf);
        let interval = self.flush_every_secs;
        let counter = Arc::clone(&self.batch_counter);
        let hmac_key = self.hmac_key.clone();
        let chain_state = Arc::clone(&self.chain_state);
        let last_emitted_hmac = Arc::clone(&self.last_emitted_hmac);
        let last_emitted_path = Arc::clone(&self.last_emitted_path);
        // Best effort: a failure is only logged, and the task is still spawned.
        if let Err(e) = std::fs::create_dir_all(&dest.dir) {
            tracing::warn!(
                "S4 access log: could not create dir {}: {e}",
                dest.dir.display()
            );
        }
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval));
            loop {
                tick.tick().await;
                // Take the whole buffer under the lock, then release the lock
                // before rendering and writing.
                let drained: Vec<AccessLogEntry> = {
                    let mut b = buf.lock().await;
                    if b.is_empty() {
                        // Nothing to write: no file is created and the batch
                        // counter is not advanced.
                        continue;
                    }
                    b.drain(..).collect()
                };
                let now = SystemTime::now();
                // Each non-empty batch takes the next counter value for its file name.
                let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                let path = dest.path_for(now, batch);
                // `new_last_for_drop` is `Some(chain tail)` only in keyed mode.
                let (body, new_last_for_drop) = if let Some(key) = hmac_key.as_ref() {
                    let mut state = chain_state.lock().await;
                    let (rendered, new_last) = render_lines_chained(&drained, key, &state);
                    // The chain state advances here, before the write is
                    // attempted; a failed write below does not roll it back.
                    state.last_hmac = new_last;
                    state.primed = true;
                    let mut with_marker = rendered;
                    // Terminate the file with an EOF marker derived from the chain tail.
                    let eof = compute_eof_hmac(key, &new_last);
                    with_marker.push_str(EOF_HMAC_COMMENT_PREFIX);
                    with_marker.push_str(&hex_encode(&eof));
                    with_marker.push('\n');
                    (with_marker, Some(new_last))
                } else {
                    (render_lines(&drained), None)
                };
                let body_bytes: Bytes = Bytes::from(body);
                let path_clone = path.clone();
                // File I/O is blocking, so it runs on the blocking thread pool.
                let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
                    use std::io::Write;
                    let mut f = std::fs::OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(&path_clone)?;
                    f.write_all(&body_bytes)
                })
                .await;
                match res {
                    Ok(Ok(())) => {
                        // Record the tail and path only after a successful
                        // write, and only in keyed mode.
                        if let Some(h) = new_last_for_drop {
                            if let Ok(mut g) = last_emitted_hmac.lock() {
                                *g = Some(h);
                            }
                            if let Ok(mut g) = last_emitted_path.lock() {
                                *g = Some(path.clone());
                            }
                        }
                    }
                    Ok(Err(e)) => {
                        tracing::warn!("S4 access log write failed at {}: {e}", path.display());
                    }
                    Err(e) => {
                        tracing::warn!("S4 access log task join failed: {e}");
                    }
                }
            }
        })
    }
280
281 fn drop_emit_eof_marker(&mut self) {
291 let pending: Vec<AccessLogEntry> = match self.buf.try_lock() {
297 Ok(mut b) => b.drain(..).collect(),
298 Err(_) => Vec::new(),
299 };
300 let Some(key) = self.hmac_key.clone() else {
301 return;
305 };
306 if pending.is_empty() {
307 return;
312 }
313 let mut state = ChainState::default();
320 if let Ok(s) = self.chain_state.try_lock() {
321 state = s.clone();
322 } else if let Ok(g) = self.last_emitted_hmac.lock()
323 && let Some(h) = *g
324 {
325 state.last_hmac = h;
326 state.primed = true;
327 }
328 let now = SystemTime::now();
329 let batch = self
330 .batch_counter
331 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
332 let path = self.dest.path_for(now, batch);
333 let (rendered, new_last) = render_lines_chained(&pending, &key, &state);
334 let mut with_marker = rendered;
335 let eof = compute_eof_hmac(&key, &new_last);
336 with_marker.push_str(EOF_HMAC_COMMENT_PREFIX);
337 with_marker.push_str(&hex_encode(&eof));
338 with_marker.push('\n');
339 if let Err(e) = std::fs::create_dir_all(&self.dest.dir) {
340 tracing::warn!(
341 "S4 access log Drop: could not ensure dir {}: {e}",
342 self.dest.dir.display()
343 );
344 return;
345 }
346 let res = std::fs::OpenOptions::new()
347 .create(true)
348 .append(true)
349 .open(&path)
350 .and_then(|mut f| {
351 use std::io::Write;
352 f.write_all(with_marker.as_bytes())
353 });
354 if let Err(e) = res {
355 tracing::warn!(
356 "S4 access log Drop: failed to flush + EOF marker to {}: {e}",
357 path.display()
358 );
359 } else if let Ok(mut g) = self.last_emitted_path.lock() {
360 *g = Some(path);
361 }
362 }
363}
364
365impl Drop for AccessLog {
366 fn drop(&mut self) {
367 self.drop_emit_eof_marker();
376 }
377}
378
379fn render_lines_chained(
390 entries: &[AccessLogEntry],
391 key: &AuditHmacKey,
392 state: &ChainState,
393) -> (String, [u8; 32]) {
394 let mut out = String::with_capacity(entries.len() * 320 + 80);
397 if state.primed {
398 out.push_str(PREV_TAIL_COMMENT_PREFIX);
399 out.push_str(&hex_encode(&state.last_hmac));
400 out.push('\n');
401 }
402 let base = render_lines(entries);
403 let mut prev = state.last_hmac;
404 for raw_line in base.split_inclusive('\n') {
405 let line = raw_line.trim_end_matches('\n');
406 if line.is_empty() {
407 continue;
408 }
409 let mac = chain_step(key, &prev, line.as_bytes());
410 out.push_str(line);
411 out.push(' ');
412 out.push_str(&hex_encode(&mac));
413 out.push('\n');
414 prev = mac;
415 }
416 (out, prev)
417}
418
/// Reference-counted handle to an `AccessLog`, for sharing one log between owners.
pub type SharedAccessLog = Arc<AccessLog>;
421
422fn render_lines(entries: &[AccessLogEntry]) -> String {
423 let mut out = String::with_capacity(entries.len() * 256);
424 for e in entries {
425 let ts = unix_secs(e.time);
426 let (y, mo, d, h, mi, se) = unix_to_ymdhms(ts);
427 out.push_str(&format!(
428 "- {bucket} [{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{se:02}Z] {ip} {req} - {op} {key} \"{uri}\" {status} {err} {bytes_sent} {obj_size} {total_ms} - - \"{ua}\" - - SigV4 - AuthHeader - TLSv1.3 - -\n",
429 bucket = e.bucket,
430 ip = e.remote_ip.as_deref().unwrap_or("-"),
431 req = e.requester.as_deref().unwrap_or("-"),
432 op = e.operation,
433 key = e.key.as_deref().unwrap_or("-"),
434 uri = e.request_uri,
435 status = e.http_status,
436 err = e.error_code.as_deref().unwrap_or("-"),
437 bytes_sent = e.bytes_sent,
438 obj_size = e.object_size,
439 total_ms = e.total_time_ms,
440 ua = e.user_agent.as_deref().unwrap_or("-"),
441 ));
442 }
443 out
444}
445
/// Whole seconds between the Unix epoch and `t`; `0` when `t` is before the epoch.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
451
452fn unix_to_ymdh(secs: i64) -> (i64, u32, u32, u32) {
454 let (y, mo, d, h, _mi, _se) = unix_to_ymdhms(secs);
455 (y, mo, d, h)
456}
457
/// Converts a Unix timestamp (seconds, may be negative) into UTC
/// `(year, month, day, hour, minute, second)` in the proleptic Gregorian
/// calendar. Month and day are 1-based.
fn unix_to_ymdhms(secs: i64) -> (i64, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: i64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097; // one 400-year Gregorian cycle

    // Euclidean split so that pre-epoch timestamps still yield a
    // non-negative time of day.
    let days = secs.div_euclid(SECS_PER_DAY);
    let secs_of_day = secs.rem_euclid(SECS_PER_DAY);
    let hour = (secs_of_day / 3600) as u32;
    let minute = (secs_of_day % 3600 / 60) as u32;
    let second = (secs_of_day % 60) as u32;

    // Shift the day count so it starts on 0000-03-01; counting years from
    // March puts the leap day at the end of the year.
    let shifted = days + 719_468;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA);
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // `march_month` is 0 for March through 11 for February.
    let march_month = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * march_month + 2) / 5 + 1) as u32;
    let month_number = if march_month < 10 {
        march_month + 3
    } else {
        march_month - 9
    };
    let month = month_number as u32;

    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + i64::from(month <= 2);
    (year, month, day, hour, minute, second)
}
477
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain path is accepted verbatim as the destination directory.
    #[test]
    fn parse_dest_local_dir() {
        let d = AccessLogDest::parse("/var/log/s4").unwrap();
        assert_eq!(d.dir, std::path::PathBuf::from("/var/log/s4"));
    }

    /// `s3://` destinations are rejected with an explanatory message.
    #[test]
    fn parse_dest_rejects_s3_url_until_phase_b() {
        let err = AccessLogDest::parse("s3://logs/access/").unwrap_err();
        assert!(err.contains("local-directory access-log only"));
    }

    /// File names are `YYYY-MM-DD-HH-<batch>.log` in UTC.
    #[test]
    fn path_for_uses_hourly_naming() {
        let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
        // 1_700_000_000 is 2023-11-14T22:13:20Z.
        let now = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
        let p = d.path_for(now, 7);
        let s = p.to_string_lossy();
        assert!(s.starts_with("/tmp/s4-test/"));
        assert!(s.ends_with("-0007.log"));
        // Pin the full name so a broken date conversion cannot pass unnoticed.
        assert_eq!(
            p.file_name().and_then(|n| n.to_str()),
            Some("2023-11-14-22-0007.log")
        );
    }

    /// Pins an exact conversion: 1_779_148_800 is 2026-05-19T00:00:00Z.
    #[test]
    fn unix_to_ymdh_known_value() {
        assert_eq!(unix_to_ymdh(1_779_148_800), (2026, 5, 19, 0));
    }

    /// Fixture: a successful PUT at 2023-11-14T22:13:20Z in `bucket`.
    fn sample_entry(bucket: &str) -> AccessLogEntry {
        AccessLogEntry {
            time: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000),
            bucket: bucket.into(),
            remote_ip: Some("10.0.0.1".into()),
            requester: Some("AKIATEST".into()),
            operation: "REST.PUT.OBJECT",
            key: Some("k".into()),
            request_uri: "PUT /b/k HTTP/1.1".into(),
            http_status: 200,
            error_code: None,
            bytes_sent: 0,
            object_size: 4096,
            total_time_ms: 12,
            user_agent: Some("aws-cli/2.0".into()),
        }
    }

    /// A first (unprimed) batch has no prev-tail header, every line ends in
    /// a space plus 64 hex digits, and the text verifies as a chain.
    #[test]
    fn chained_render_produces_verifiable_output() {
        use std::str::FromStr;

        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();
        let entries = vec![sample_entry("b1"), sample_entry("b2"), sample_entry("b3")];
        let state = ChainState::default();
        let (text, _last) = render_lines_chained(&entries, &key, &state);
        assert!(!text.starts_with("# prev_file_tail="));
        for raw in text.split_inclusive('\n') {
            let line = raw.trim_end_matches('\n');
            if line.is_empty() {
                continue;
            }
            // 65 = one separating space + 64 hex digits of a 32-byte HMAC.
            assert!(line.len() > 65);
            let suf = &line[line.len() - 65..];
            assert!(suf.starts_with(' '));
            assert!(suf[1..].chars().all(|c| c.is_ascii_hexdigit()));
        }
        let report = verify_audit_bytes(
            std::path::Path::new("<mem>"),
            text.as_bytes(),
            &key,
            crate::audit_log::VerifyOptions::default(),
        )
        .unwrap();
        assert!(report.first_break.is_none());
        assert_eq!(report.ok_lines, 3);
    }

    /// A primed second batch starts with the prev-tail header and both
    /// batches verify on their own.
    #[test]
    fn second_batch_emits_prev_file_tail_and_chains() {
        use std::str::FromStr;

        use crate::audit_log::{AuditHmacKey, VerifyOptions, verify_audit_bytes};
        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();

        let entries1 = vec![sample_entry("b1")];
        let mut state = ChainState::default();
        let (text1, last1) = render_lines_chained(&entries1, &key, &state);
        // Advance the chain the same way the flusher does after a batch.
        state.last_hmac = last1;
        state.primed = true;

        let entries2 = vec![sample_entry("b2")];
        let (text2, _) = render_lines_chained(&entries2, &key, &state);
        assert!(text2.starts_with("# prev_file_tail="));
        let report = verify_audit_bytes(
            std::path::Path::new("<mem>"),
            text2.as_bytes(),
            &key,
            VerifyOptions::default(),
        )
        .unwrap();
        assert!(report.first_break.is_none(), "second batch must verify");
        assert_eq!(report.ok_lines, 1);
        let r1 = verify_audit_bytes(
            std::path::Path::new("<mem>"),
            text1.as_bytes(),
            &key,
            VerifyOptions::default(),
        )
        .unwrap();
        assert!(r1.first_break.is_none());
    }

    /// Plain rendering includes the key fields and is newline-terminated.
    #[test]
    fn render_one_entry() {
        let line = render_lines(&[sample_entry("b")]);
        assert!(line.contains("REST.PUT.OBJECT"));
        assert!(line.contains("10.0.0.1"));
        assert!(line.contains("AKIATEST"));
        assert!(line.contains("\"aws-cli/2.0\""));
        assert!(line.ends_with('\n'));
    }
}