1use std::collections::VecDeque;
35use std::sync::Arc;
36use std::time::SystemTime;
37
38use bytes::Bytes;
39use tokio::sync::Mutex;
40
41use crate::audit_log::{
42 AuditHmacKey, EOF_HMAC_COMMENT_PREFIX, PREV_TAIL_COMMENT_PREFIX, chain_step, compute_eof_hmac,
43 genesis_prev, hex_encode,
44};
45
/// One request's worth of access-log data; each entry is rendered as a single log line.
///
/// `PartialEq`/`Eq` are derived so entries can be compared directly (for example in
/// tests that round-trip or deduplicate entries).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessLogEntry {
    /// Request timestamp; rendered with second precision and a `Z` suffix.
    pub time: SystemTime,
    /// Bucket the request targeted.
    pub bucket: String,
    /// Client address, rendered as `-` when absent.
    pub remote_ip: Option<String>,
    /// Requester identity, rendered as `-` when absent.
    pub requester: Option<String>,
    /// Operation name (e.g. `REST.PUT.OBJECT`).
    pub operation: &'static str,
    /// Object key, rendered as `-` when absent.
    pub key: Option<String>,
    /// Request line, rendered inside double quotes.
    pub request_uri: String,
    /// HTTP status code returned to the client.
    pub http_status: u16,
    /// Error code, rendered as `-` when absent.
    pub error_code: Option<String>,
    /// Number of bytes sent in the response.
    pub bytes_sent: u64,
    /// Size of the object involved in the request.
    pub object_size: u64,
    /// Total request time in milliseconds.
    pub total_time_ms: u64,
    /// Client user agent, rendered inside double quotes (`-` when absent).
    pub user_agent: Option<String>,
}
65
66#[derive(Debug, Clone)]
72pub struct AccessLogDest {
73 pub dir: std::path::PathBuf,
74}
75
76impl AccessLogDest {
77 pub fn parse(s: &str) -> Result<Self, String> {
78 if let Some(stripped) = s.strip_prefix("s3://") {
79 return Err(format!(
80 "v0.4 ships local-directory access-log only; got s3:// destination ({stripped:?}). \
81 Use a local path or pipe via filebeat / vector to S3."
82 ));
83 }
84 let dir = std::path::PathBuf::from(s);
85 Ok(Self { dir })
86 }
87
88 pub fn path_for(&self, now: SystemTime, batch: u64) -> std::path::PathBuf {
91 let secs = now
92 .duration_since(SystemTime::UNIX_EPOCH)
93 .map(|d| d.as_secs())
94 .unwrap_or(0);
95 let (y, mo, d, h) = unix_to_ymdh(secs as i64);
96 self.dir
97 .join(format!("{y:04}-{mo:02}-{d:02}-{h:02}-{batch:04}.log"))
98 }
99}
100
/// Buffered access-log writer.
///
/// Entries are queued in memory by `record` and written to batch files by the
/// task started with `spawn_flusher`.
pub struct AccessLog {
    /// Directory destination for batch files.
    dest: AccessLogDest,
    /// Pending entries, shared with the flusher task.
    buf: Arc<Mutex<VecDeque<AccessLogEntry>>>,
    /// Seconds between flusher ticks (`new` sets 60).
    flush_every_secs: u64,
    /// Soft cap compared against the buffer length in `record` (`new` sets 5_000).
    // NOTE(review): reaching the cap currently triggers no action.
    max_entries_before_flush: usize,
    /// Counter whose value is embedded in each batch file name.
    batch_counter: Arc<std::sync::atomic::AtomicU64>,
    /// When set, batches are rendered with a per-line HMAC chain and an EOF marker.
    hmac_key: Option<Arc<AuditHmacKey>>,
    /// Running tail of the HMAC chain, carried from one batch to the next.
    chain_state: Arc<Mutex<ChainState>>,
    /// Chain tail recorded after a successful chained batch write; used by the
    /// drop path as a fallback when `chain_state` cannot be locked.
    last_emitted_hmac: Arc<std::sync::Mutex<Option<[u8; 32]>>>,
    /// Path recorded after a successful chained batch write.
    last_emitted_path: Arc<std::sync::Mutex<Option<std::path::PathBuf>>>,
}
132
133#[derive(Debug, Clone)]
134struct ChainState {
135 last_hmac: [u8; 32],
136 primed: bool,
139}
140
141impl Default for ChainState {
142 fn default() -> Self {
143 Self {
144 last_hmac: genesis_prev(),
145 primed: false,
146 }
147 }
148}
149
150impl AccessLog {
151 pub fn new(dest: AccessLogDest) -> Self {
152 Self {
153 dest,
154 buf: Arc::new(Mutex::new(VecDeque::new())),
155 flush_every_secs: 60,
156 max_entries_before_flush: 5_000,
157 batch_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
158 hmac_key: None,
159 chain_state: Arc::new(Mutex::new(ChainState::default())),
160 last_emitted_hmac: Arc::new(std::sync::Mutex::new(None)),
161 last_emitted_path: Arc::new(std::sync::Mutex::new(None)),
162 }
163 }
164
165 #[must_use]
171 pub fn with_hmac_key(mut self, key: Arc<AuditHmacKey>) -> Self {
172 self.hmac_key = Some(key);
173 self
174 }
175
176 pub async fn record(&self, entry: AccessLogEntry) {
177 let mut buf = self.buf.lock().await;
178 buf.push_back(entry);
179 if buf.len() >= self.max_entries_before_flush {
180 }
184 }
185
186 pub fn spawn_flusher(
200 &self,
201 shutdown: Option<Arc<tokio::sync::Notify>>,
202 ) -> tokio::task::JoinHandle<()> {
203 let dest = self.dest.clone();
204 let buf = Arc::clone(&self.buf);
205 let interval = self.flush_every_secs;
206 let counter = Arc::clone(&self.batch_counter);
207 let hmac_key = self.hmac_key.clone();
208 let chain_state = Arc::clone(&self.chain_state);
209 let last_emitted_hmac = Arc::clone(&self.last_emitted_hmac);
210 let last_emitted_path = Arc::clone(&self.last_emitted_path);
211 if let Err(e) = std::fs::create_dir_all(&dest.dir) {
212 tracing::warn!(
213 "S4 access log: could not create dir {}: {e}",
214 dest.dir.display()
215 );
216 }
217 tokio::spawn(async move {
218 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval));
219 loop {
220 tokio::select! {
221 () = async {
222 if let Some(ref n) = shutdown {
223 n.notified().await;
224 } else {
225 std::future::pending::<()>().await;
230 }
231 } => {
232 tracing::info!(
233 "S4 access log flusher shutting down (got cancel signal); draining one last batch"
234 );
235 let drained: Vec<AccessLogEntry> = {
242 let mut b = buf.lock().await;
243 b.drain(..).collect()
244 };
245 if drained.is_empty() {
246 return;
247 }
248 let now = SystemTime::now();
249 let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
250 let path = dest.path_for(now, batch);
251 let (body, _) = if let Some(key) = hmac_key.as_ref() {
252 let mut state = chain_state.lock().await;
253 let (rendered, new_last) =
254 render_lines_chained(&drained, key, &state);
255 state.last_hmac = new_last;
256 state.primed = true;
257 let mut with_marker = rendered;
258 let eof = compute_eof_hmac(key, &new_last);
259 with_marker.push_str(EOF_HMAC_COMMENT_PREFIX);
260 with_marker.push_str(&hex_encode(&eof));
261 with_marker.push('\n');
262 (with_marker, Some(new_last))
263 } else {
264 (render_lines(&drained), None)
265 };
266 let body_bytes: Bytes = Bytes::from(body);
267 let path_clone = path.clone();
268 let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
269 use std::io::Write;
270 let mut f = std::fs::OpenOptions::new()
271 .create(true)
272 .append(true)
273 .open(&path_clone)?;
274 f.write_all(&body_bytes)
275 })
276 .await;
277 match res {
278 Ok(Ok(())) => {}
279 Ok(Err(e)) => {
280 tracing::warn!(
281 "S4 access log final-drain write failed at {}: {e}",
282 path.display()
283 );
284 }
285 Err(e) => {
286 tracing::warn!(
287 "S4 access log final-drain join failed: {e}"
288 );
289 }
290 }
291 return;
292 }
293 _ = tick.tick() => {}
294 }
295 let drained: Vec<AccessLogEntry> = {
296 let mut b = buf.lock().await;
297 if b.is_empty() {
298 continue;
299 }
300 b.drain(..).collect()
301 };
302 let now = SystemTime::now();
303 let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
304 let path = dest.path_for(now, batch);
305 let (body, new_last_for_drop) = if let Some(key) = hmac_key.as_ref() {
306 let mut state = chain_state.lock().await;
307 let (rendered, new_last) = render_lines_chained(&drained, key, &state);
308 state.last_hmac = new_last;
309 state.primed = true;
310 let mut with_marker = rendered;
322 let eof = compute_eof_hmac(key, &new_last);
323 with_marker.push_str(EOF_HMAC_COMMENT_PREFIX);
324 with_marker.push_str(&hex_encode(&eof));
325 with_marker.push('\n');
326 (with_marker, Some(new_last))
327 } else {
328 (render_lines(&drained), None)
329 };
330 let body_bytes: Bytes = Bytes::from(body);
331 let path_clone = path.clone();
332 let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
333 use std::io::Write;
334 let mut f = std::fs::OpenOptions::new()
335 .create(true)
336 .append(true)
337 .open(&path_clone)?;
338 f.write_all(&body_bytes)
339 })
340 .await;
341 match res {
342 Ok(Ok(())) => {
343 if let Some(h) = new_last_for_drop {
348 if let Ok(mut g) = last_emitted_hmac.lock() {
349 *g = Some(h);
350 }
351 if let Ok(mut g) = last_emitted_path.lock() {
352 *g = Some(path.clone());
353 }
354 }
355 }
356 Ok(Err(e)) => {
357 tracing::warn!("S4 access log write failed at {}: {e}", path.display());
358 }
359 Err(e) => {
360 tracing::warn!("S4 access log task join failed: {e}");
361 }
362 }
363 }
364 })
365 }
366
367 fn drop_emit_eof_marker(&mut self) {
377 let pending: Vec<AccessLogEntry> = match self.buf.try_lock() {
383 Ok(mut b) => b.drain(..).collect(),
384 Err(_) => Vec::new(),
385 };
386 let Some(key) = self.hmac_key.clone() else {
387 return;
391 };
392 if pending.is_empty() {
393 return;
398 }
399 let mut state = ChainState::default();
406 if let Ok(s) = self.chain_state.try_lock() {
407 state = s.clone();
408 } else if let Ok(g) = self.last_emitted_hmac.lock()
409 && let Some(h) = *g
410 {
411 state.last_hmac = h;
412 state.primed = true;
413 }
414 let now = SystemTime::now();
415 let batch = self
416 .batch_counter
417 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
418 let path = self.dest.path_for(now, batch);
419 let (rendered, new_last) = render_lines_chained(&pending, &key, &state);
420 let mut with_marker = rendered;
421 let eof = compute_eof_hmac(&key, &new_last);
422 with_marker.push_str(EOF_HMAC_COMMENT_PREFIX);
423 with_marker.push_str(&hex_encode(&eof));
424 with_marker.push('\n');
425 if let Err(e) = std::fs::create_dir_all(&self.dest.dir) {
426 tracing::warn!(
427 "S4 access log Drop: could not ensure dir {}: {e}",
428 self.dest.dir.display()
429 );
430 return;
431 }
432 let res = std::fs::OpenOptions::new()
433 .create(true)
434 .append(true)
435 .open(&path)
436 .and_then(|mut f| {
437 use std::io::Write;
438 f.write_all(with_marker.as_bytes())
439 });
440 if let Err(e) = res {
441 tracing::warn!(
442 "S4 access log Drop: failed to flush + EOF marker to {}: {e}",
443 path.display()
444 );
445 } else if let Ok(mut g) = self.last_emitted_path.lock() {
446 *g = Some(path);
447 }
448 }
449}
450
451impl Drop for AccessLog {
452 fn drop(&mut self) {
453 self.drop_emit_eof_marker();
462 }
463}
464
465fn render_lines_chained(
476 entries: &[AccessLogEntry],
477 key: &AuditHmacKey,
478 state: &ChainState,
479) -> (String, [u8; 32]) {
480 let mut out = String::with_capacity(entries.len() * 320 + 80);
483 if state.primed {
484 out.push_str(PREV_TAIL_COMMENT_PREFIX);
485 out.push_str(&hex_encode(&state.last_hmac));
486 out.push('\n');
487 }
488 let base = render_lines(entries);
489 let mut prev = state.last_hmac;
490 for raw_line in base.split_inclusive('\n') {
491 let line = raw_line.trim_end_matches('\n');
492 if line.is_empty() {
493 continue;
494 }
495 let mac = chain_step(key, &prev, line.as_bytes());
496 out.push_str(line);
497 out.push(' ');
498 out.push_str(&hex_encode(&mac));
499 out.push('\n');
500 prev = mac;
501 }
502 (out, prev)
503}
504
/// Reference-counted handle to an [`AccessLog`].
pub type SharedAccessLog = Arc<AccessLog>;
507
508fn render_lines(entries: &[AccessLogEntry]) -> String {
509 let mut out = String::with_capacity(entries.len() * 256);
510 for e in entries {
511 let ts = unix_secs(e.time);
512 let (y, mo, d, h, mi, se) = unix_to_ymdhms(ts);
513 out.push_str(&format!(
514 "- {bucket} [{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{se:02}Z] {ip} {req} - {op} {key} \"{uri}\" {status} {err} {bytes_sent} {obj_size} {total_ms} - - \"{ua}\" - - SigV4 - AuthHeader - TLSv1.3 - -\n",
515 bucket = e.bucket,
516 ip = e.remote_ip.as_deref().unwrap_or("-"),
517 req = e.requester.as_deref().unwrap_or("-"),
518 op = e.operation,
519 key = e.key.as_deref().unwrap_or("-"),
520 uri = e.request_uri,
521 status = e.http_status,
522 err = e.error_code.as_deref().unwrap_or("-"),
523 bytes_sent = e.bytes_sent,
524 obj_size = e.object_size,
525 total_ms = e.total_time_ms,
526 ua = e.user_agent.as_deref().unwrap_or("-"),
527 ));
528 }
529 out
530}
531
/// Whole seconds between the Unix epoch and `t`; times before the epoch yield 0.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
537
538fn unix_to_ymdh(secs: i64) -> (i64, u32, u32, u32) {
540 let (y, mo, d, h, _mi, _se) = unix_to_ymdhms(secs);
541 (y, mo, d, h)
542}
543
/// Converts Unix seconds to `(year, month, day, hour, minute, second)`.
///
/// Negative inputs are handled: the day count is floored, so the
/// time-of-day part is always non-negative. The date part works on days
/// counted from 0000-03-01, grouped into 400-year eras of 146_097 days.
fn unix_to_ymdhms(secs: i64) -> (i64, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: i64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097;

    let day_number = secs.div_euclid(SECS_PER_DAY);
    let secs_of_day = secs.rem_euclid(SECS_PER_DAY);
    let hour = (secs_of_day / 3600) as u32;
    let minute = (secs_of_day % 3600 / 60) as u32;
    let second = (secs_of_day % 60) as u32;

    // Shift the origin from 1970-01-01 to 0000-03-01.
    let shifted = day_number + 719_468;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA);
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146_096) / 365;
    // Day within the March-based year.
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index with March = 0.
    let march_month = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * march_month + 2) / 5 + 1) as u32;
    let month = if march_month < 10 {
        march_month + 3
    } else {
        march_month - 9
    } as u32;
    // January and February belong to the following civil year.
    let mut year = year_of_era + era * 400;
    if month <= 2 {
        year += 1;
    }
    (year, month, day, hour, minute, second)
}
563
564#[cfg(test)]
565mod tests {
566 use super::*;
567
/// A plain path is accepted verbatim as the destination directory.
#[test]
fn parse_dest_local_dir() {
    let parsed = AccessLogDest::parse("/var/log/s4").unwrap();
    let expected = std::path::PathBuf::from("/var/log/s4");
    assert_eq!(parsed.dir, expected);
}
573
/// An `s3://` destination is rejected with the local-directory-only message.
#[test]
fn parse_dest_rejects_s3_url_until_phase_b() {
    let outcome = AccessLogDest::parse("s3://logs/access/");
    let message = outcome.unwrap_err();
    assert!(message.contains("local-directory access-log only"));
}
579
/// The batch file name carries the UTC date and hour plus the zero-padded
/// batch number.
#[test]
fn path_for_uses_hourly_naming() {
    let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
    // 1_700_000_000 is 2023-11-14T22:13:20Z.
    let now = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
    let p = d.path_for(now, 7);
    assert_eq!(
        p,
        std::path::Path::new("/tmp/s4-test").join("2023-11-14-22-0007.log")
    );
    // 47 minutes later falls in the next hour, so the hour component changes.
    let next_hour = now + std::time::Duration::from_secs(47 * 60);
    assert_eq!(
        d.path_for(next_hour, 8),
        std::path::Path::new("/tmp/s4-test").join("2023-11-14-23-0008.log")
    );
}
589
/// Pins the conversion for a known instant: 1_779_148_800 is exactly 20_592
/// days after the epoch, i.e. 2026-05-19T00:00:00Z.
#[test]
fn unix_to_ymdh_known_value() {
    assert_eq!(unix_to_ymdh(1_779_148_800), (2026, 5, 19, 0));
    // One second earlier is still the previous day, in its last hour.
    assert_eq!(unix_to_ymdh(1_779_148_799), (2026, 5, 18, 23));
}
596
597 fn sample_entry(bucket: &str) -> AccessLogEntry {
598 AccessLogEntry {
599 time: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000),
600 bucket: bucket.into(),
601 remote_ip: Some("10.0.0.1".into()),
602 requester: Some("AKIATEST".into()),
603 operation: "REST.PUT.OBJECT",
604 key: Some("k".into()),
605 request_uri: "PUT /b/k HTTP/1.1".into(),
606 http_status: 200,
607 error_code: None,
608 bytes_sent: 0,
609 object_size: 4096,
610 total_time_ms: 12,
611 user_agent: Some("aws-cli/2.0".into()),
612 }
613 }
614
/// A first (unprimed) batch has no previous-tail header, every line ends in
/// a space plus 64 hex digits, and the text passes chain verification.
#[test]
fn chained_render_produces_verifiable_output() {
    use std::str::FromStr;

    use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
    let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();
    let batch: Vec<AccessLogEntry> = ["b1", "b2", "b3"]
        .iter()
        .copied()
        .map(sample_entry)
        .collect();
    let fresh = ChainState::default();
    let (text, _tail) = render_lines_chained(&batch, &key, &fresh);
    assert!(!text.starts_with("# prev_file_tail="));

    for line in text.split('\n').filter(|l| !l.is_empty()) {
        assert!(line.len() > 65);
        // The trailing 65 bytes are the separator space plus the hex MAC.
        let (_, suffix) = line.split_at(line.len() - 65);
        assert!(suffix.starts_with(' '));
        assert!(suffix[1..].chars().all(|c| c.is_ascii_hexdigit()));
    }

    let report = verify_audit_bytes(
        std::path::Path::new("<mem>"),
        text.as_bytes(),
        &key,
        crate::audit_log::VerifyOptions::default(),
    )
    .unwrap();
    assert!(report.first_break.is_none());
    assert_eq!(report.ok_lines, 3);
}
648
/// A batch rendered from a primed state starts with the previous-tail header
/// and verifies on its own; the first batch verifies as well.
#[test]
fn second_batch_emits_prev_file_tail_and_chains() {
    use std::str::FromStr;

    use crate::audit_log::{AuditHmacKey, VerifyOptions, verify_audit_bytes};
    let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();
    let in_memory = std::path::Path::new("<mem>");

    // First batch: rendered from the genesis state.
    let first_entries = vec![sample_entry("b1")];
    let genesis = ChainState::default();
    let (first_text, first_tail) = render_lines_chained(&first_entries, &key, &genesis);

    // Second batch: rendered from a state primed with the first batch's tail.
    let primed = ChainState {
        last_hmac: first_tail,
        primed: true,
    };
    let second_entries = vec![sample_entry("b2")];
    let (second_text, _) = render_lines_chained(&second_entries, &key, &primed);
    assert!(second_text.starts_with("# prev_file_tail="));

    let second_report = verify_audit_bytes(
        in_memory,
        second_text.as_bytes(),
        &key,
        VerifyOptions::default(),
    )
    .unwrap();
    assert!(second_report.first_break.is_none(), "second batch must verify");
    assert_eq!(second_report.ok_lines, 1);

    let first_report = verify_audit_bytes(
        in_memory,
        first_text.as_bytes(),
        &key,
        VerifyOptions::default(),
    )
    .unwrap();
    assert!(first_report.first_break.is_none());
}
687
/// Pins the complete rendered line for one entry, reusing the shared
/// `sample_entry` fixture instead of a hand-copied duplicate.
#[test]
fn render_one_entry() {
    let line = render_lines(&[sample_entry("b")]);
    assert_eq!(
        line,
        "- b [2023-11-14T22:13:20Z] 10.0.0.1 AKIATEST - REST.PUT.OBJECT k \"PUT /b/k HTTP/1.1\" 200 - 0 4096 12 - - \"aws-cli/2.0\" - - SigV4 - AuthHeader - TLSv1.3 - -\n"
    );
    assert!(line.contains("REST.PUT.OBJECT"));
    assert!(line.contains("10.0.0.1"));
    assert!(line.contains("AKIATEST"));
    assert!(line.contains("\"aws-cli/2.0\""));
    assert!(line.ends_with('\n'));
}
712}