1use std::collections::VecDeque;
35use std::sync::Arc;
36use std::time::SystemTime;
37
38use bytes::Bytes;
39use tokio::sync::Mutex;
40
41use crate::audit_log::{
42 AuditHmacKey, PREV_TAIL_COMMENT_PREFIX, chain_step, genesis_prev, hex_encode,
43};
44
/// One request's worth of access-log data, queued by `AccessLog::record` and
/// turned into a single text line by `render_lines`.
///
/// Optional fields are rendered as `-` when they are `None`.
#[derive(Debug, Clone)]
pub struct AccessLogEntry {
    /// Timestamp rendered (as whole seconds since the Unix epoch) in the
    /// bracketed time column.
    pub time: SystemTime,
    /// Bucket name; rendered as the second column of the line.
    pub bucket: String,
    /// Remote address column.
    pub remote_ip: Option<String>,
    /// Requester column (the tests use an access-key id here).
    pub requester: Option<String>,
    /// Operation column, e.g. `REST.PUT.OBJECT`.
    pub operation: &'static str,
    /// Object key column.
    pub key: Option<String>,
    /// Request line; rendered inside double quotes.
    pub request_uri: String,
    /// HTTP status column.
    pub http_status: u16,
    /// Error-code column.
    pub error_code: Option<String>,
    /// Bytes-sent column.
    pub bytes_sent: u64,
    /// Object-size column.
    pub object_size: u64,
    /// Total-time column; the field name indicates milliseconds.
    pub total_time_ms: u64,
    /// User agent; rendered inside double quotes.
    pub user_agent: Option<String>,
}
64
/// Destination for access-log batch files.
///
/// Built with `AccessLogDest::parse`; `AccessLogDest::path_for` derives the
/// per-batch file path inside `dir`.
#[derive(Debug, Clone)]
pub struct AccessLogDest {
    /// Directory that batch files are written into.
    pub dir: std::path::PathBuf,
}
74
75impl AccessLogDest {
76 pub fn parse(s: &str) -> Result<Self, String> {
77 if let Some(stripped) = s.strip_prefix("s3://") {
78 return Err(format!(
79 "v0.4 ships local-directory access-log only; got s3:// destination ({stripped:?}). \
80 Use a local path or pipe via filebeat / vector to S3."
81 ));
82 }
83 let dir = std::path::PathBuf::from(s);
84 Ok(Self { dir })
85 }
86
87 pub fn path_for(&self, now: SystemTime, batch: u64) -> std::path::PathBuf {
90 let secs = now
91 .duration_since(SystemTime::UNIX_EPOCH)
92 .map(|d| d.as_secs())
93 .unwrap_or(0);
94 let (y, mo, d, h) = unix_to_ymdh(secs as i64);
95 self.dir
96 .join(format!("{y:04}-{mo:02}-{d:02}-{h:02}-{batch:04}.log"))
97 }
98}
99
100pub struct AccessLog {
104 dest: AccessLogDest,
105 buf: Arc<Mutex<VecDeque<AccessLogEntry>>>,
106 flush_every_secs: u64,
107 max_entries_before_flush: usize,
108 batch_counter: Arc<std::sync::atomic::AtomicU64>,
109 hmac_key: Option<Arc<AuditHmacKey>>,
114 chain_state: Arc<Mutex<ChainState>>,
118}
119
120#[derive(Debug, Clone)]
121struct ChainState {
122 last_hmac: [u8; 32],
123 primed: bool,
126}
127
128impl Default for ChainState {
129 fn default() -> Self {
130 Self {
131 last_hmac: genesis_prev(),
132 primed: false,
133 }
134 }
135}
136
137impl AccessLog {
138 pub fn new(dest: AccessLogDest) -> Self {
139 Self {
140 dest,
141 buf: Arc::new(Mutex::new(VecDeque::new())),
142 flush_every_secs: 60,
143 max_entries_before_flush: 5_000,
144 batch_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
145 hmac_key: None,
146 chain_state: Arc::new(Mutex::new(ChainState::default())),
147 }
148 }
149
150 #[must_use]
156 pub fn with_hmac_key(mut self, key: Arc<AuditHmacKey>) -> Self {
157 self.hmac_key = Some(key);
158 self
159 }
160
161 pub async fn record(&self, entry: AccessLogEntry) {
162 let mut buf = self.buf.lock().await;
163 buf.push_back(entry);
164 if buf.len() >= self.max_entries_before_flush {
165 }
169 }
170
171 pub fn spawn_flusher(&self) -> tokio::task::JoinHandle<()> {
176 let dest = self.dest.clone();
177 let buf = Arc::clone(&self.buf);
178 let interval = self.flush_every_secs;
179 let counter = Arc::clone(&self.batch_counter);
180 let hmac_key = self.hmac_key.clone();
181 let chain_state = Arc::clone(&self.chain_state);
182 if let Err(e) = std::fs::create_dir_all(&dest.dir) {
183 tracing::warn!(
184 "S4 access log: could not create dir {}: {e}",
185 dest.dir.display()
186 );
187 }
188 tokio::spawn(async move {
189 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval));
190 loop {
191 tick.tick().await;
192 let drained: Vec<AccessLogEntry> = {
193 let mut b = buf.lock().await;
194 if b.is_empty() {
195 continue;
196 }
197 b.drain(..).collect()
198 };
199 let now = SystemTime::now();
200 let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
201 let path = dest.path_for(now, batch);
202 let body = if let Some(key) = hmac_key.as_ref() {
203 let mut state = chain_state.lock().await;
204 let (rendered, new_last) = render_lines_chained(&drained, key, &state);
205 state.last_hmac = new_last;
206 state.primed = true;
207 rendered
208 } else {
209 render_lines(&drained)
210 };
211 let body_bytes: Bytes = Bytes::from(body);
212 let path_clone = path.clone();
213 let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
214 use std::io::Write;
215 let mut f = std::fs::OpenOptions::new()
216 .create(true)
217 .append(true)
218 .open(&path_clone)?;
219 f.write_all(&body_bytes)
220 })
221 .await;
222 match res {
223 Ok(Ok(())) => {}
224 Ok(Err(e)) => {
225 tracing::warn!("S4 access log write failed at {}: {e}", path.display());
226 }
227 Err(e) => {
228 tracing::warn!("S4 access log task join failed: {e}");
229 }
230 }
231 }
232 })
233 }
234}
235
236fn render_lines_chained(
247 entries: &[AccessLogEntry],
248 key: &AuditHmacKey,
249 state: &ChainState,
250) -> (String, [u8; 32]) {
251 let mut out = String::with_capacity(entries.len() * 320 + 80);
254 if state.primed {
255 out.push_str(PREV_TAIL_COMMENT_PREFIX);
256 out.push_str(&hex_encode(&state.last_hmac));
257 out.push('\n');
258 }
259 let base = render_lines(entries);
260 let mut prev = state.last_hmac;
261 for raw_line in base.split_inclusive('\n') {
262 let line = raw_line.trim_end_matches('\n');
263 if line.is_empty() {
264 continue;
265 }
266 let mac = chain_step(key, &prev, line.as_bytes());
267 out.push_str(line);
268 out.push(' ');
269 out.push_str(&hex_encode(&mac));
270 out.push('\n');
271 prev = mac;
272 }
273 (out, prev)
274}
275
/// Reference-counted handle to an [`AccessLog`].
pub type SharedAccessLog = Arc<AccessLog>;
278
279fn render_lines(entries: &[AccessLogEntry]) -> String {
280 let mut out = String::with_capacity(entries.len() * 256);
281 for e in entries {
282 let ts = unix_secs(e.time);
283 let (y, mo, d, h, mi, se) = unix_to_ymdhms(ts);
284 out.push_str(&format!(
285 "- {bucket} [{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{se:02}Z] {ip} {req} - {op} {key} \"{uri}\" {status} {err} {bytes_sent} {obj_size} {total_ms} - - \"{ua}\" - - SigV4 - AuthHeader - TLSv1.3 - -\n",
286 bucket = e.bucket,
287 ip = e.remote_ip.as_deref().unwrap_or("-"),
288 req = e.requester.as_deref().unwrap_or("-"),
289 op = e.operation,
290 key = e.key.as_deref().unwrap_or("-"),
291 uri = e.request_uri,
292 status = e.http_status,
293 err = e.error_code.as_deref().unwrap_or("-"),
294 bytes_sent = e.bytes_sent,
295 obj_size = e.object_size,
296 total_ms = e.total_time_ms,
297 ua = e.user_agent.as_deref().unwrap_or("-"),
298 ));
299 }
300 out
301}
302
/// Returns `t` as whole seconds since the Unix epoch.
///
/// Sub-second precision is discarded, and a time earlier than the epoch
/// yields `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
308
309fn unix_to_ymdh(secs: i64) -> (i64, u32, u32, u32) {
311 let (y, mo, d, h, _mi, _se) = unix_to_ymdhms(secs);
312 (y, mo, d, h)
313}
314
/// Converts Unix seconds to `(year, month, day, hour, minute, second)` in the
/// proleptic Gregorian calendar, with no time-zone offset applied.
///
/// Negative inputs (before 1970-01-01) are handled: the day count and the
/// second-of-day are split with Euclidean division, so the time-of-day part
/// is always non-negative. Months and days are 1-based.
fn unix_to_ymdhms(secs: i64) -> (i64, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: i64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097; // one 400-year Gregorian cycle
    const EPOCH_SHIFT_DAYS: i64 = 719_468; // days from 0000-03-01 to 1970-01-01

    // Time of day.
    let day_number = secs.div_euclid(SECS_PER_DAY);
    let sec_of_day = secs.rem_euclid(SECS_PER_DAY);
    let hour = (sec_of_day / 3600) as u32;
    let minute = (sec_of_day % 3600 / 60) as u32;
    let second = (sec_of_day % 60) as u32;

    // Calendar date, computed on a year that starts on 1 March so the leap
    // day is the last day of the (shifted) year.
    let shifted = day_number + EPOCH_SHIFT_DAYS;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA);
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36524 - day_of_era / 146_096) / 365;
    let march_based_year = year_of_era + era * 400;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * march_based_month + 2) / 5 + 1) as u32;
    let month = if march_based_month < 10 {
        (march_based_month + 3) as u32
    } else {
        (march_based_month - 9) as u32
    };
    // January and February belong to the next civil year.
    let year = march_based_year + i64::from(month <= 2);

    (year, month, day, hour, minute, second)
}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain path is accepted verbatim as the log directory.
    #[test]
    fn parse_dest_local_dir() {
        let d = AccessLogDest::parse("/var/log/s4").unwrap();
        assert_eq!(d.dir, std::path::PathBuf::from("/var/log/s4"));
    }

    /// `s3://` destinations are rejected with an explanatory message.
    #[test]
    fn parse_dest_rejects_s3_url_until_phase_b() {
        let err = AccessLogDest::parse("s3://logs/access/").unwrap_err();
        assert!(err.contains("local-directory access-log only"));
    }

    /// The file name encodes date, hour and the zero-padded batch number.
    #[test]
    fn path_for_uses_hourly_naming() {
        let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
        // 1_700_000_000 is 2023-11-14T22:13:20Z.
        let now = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
        let p = d.path_for(now, 7);
        assert_eq!(
            p,
            std::path::Path::new("/tmp/s4-test").join("2023-11-14-22-0007.log")
        );
    }

    /// Pins the exact calendar fields instead of only range-checking them.
    #[test]
    fn unix_to_ymdh_known_value() {
        // 1_779_148_800 is 2026-05-19T00:00:00Z.
        assert_eq!(unix_to_ymdh(1_779_148_800), (2026, 5, 19, 0));
    }

    /// Epoch, pre-epoch and leap-day boundaries of the calendar conversion.
    #[test]
    fn unix_to_ymdhms_boundaries() {
        assert_eq!(unix_to_ymdhms(0), (1970, 1, 1, 0, 0, 0));
        assert_eq!(unix_to_ymdhms(-1), (1969, 12, 31, 23, 59, 59));
        assert_eq!(unix_to_ymdhms(951_782_400), (2000, 2, 29, 0, 0, 0));
        assert_eq!(unix_to_ymdhms(1_700_000_000), (2023, 11, 14, 22, 13, 20));
    }

    /// Shared fixture: a successful PUT recorded at 2023-11-14T22:13:20Z.
    fn sample_entry(bucket: &str) -> AccessLogEntry {
        AccessLogEntry {
            time: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000),
            bucket: bucket.into(),
            remote_ip: Some("10.0.0.1".into()),
            requester: Some("AKIATEST".into()),
            operation: "REST.PUT.OBJECT",
            key: Some("k".into()),
            request_uri: "PUT /b/k HTTP/1.1".into(),
            http_status: 200,
            error_code: None,
            bytes_sent: 0,
            object_size: 4096,
            total_time_ms: 12,
            user_agent: Some("aws-cli/2.0".into()),
        }
    }

    /// A first (unprimed) batch has no header, every line carries a 64-hex
    /// MAC suffix, and the whole text verifies.
    #[test]
    fn chained_render_produces_verifiable_output() {
        use std::str::FromStr;

        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();
        let entries = vec![sample_entry("b1"), sample_entry("b2"), sample_entry("b3")];
        let state = ChainState::default();
        let (text, _last) = render_lines_chained(&entries, &key, &state);
        assert!(!text.starts_with("# prev_file_tail="));
        for raw in text.split_inclusive('\n') {
            let line = raw.trim_end_matches('\n');
            if line.is_empty() {
                continue;
            }
            // One separating space plus 64 hex digits.
            assert!(line.len() > 65);
            let suf = &line[line.len() - 65..];
            assert!(suf.starts_with(' '));
            assert!(suf[1..].chars().all(|c| c.is_ascii_hexdigit()));
        }
        let report =
            verify_audit_bytes(std::path::Path::new("<mem>"), text.as_bytes(), &key).unwrap();
        assert!(report.first_break.is_none());
        assert_eq!(report.ok_lines, 3);
    }

    /// A primed second batch starts with a header that carries the first
    /// batch's tail MAC, and both batches verify.
    #[test]
    fn second_batch_emits_prev_file_tail_and_chains() {
        use std::str::FromStr;

        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();

        let entries1 = vec![sample_entry("b1")];
        let mut state = ChainState::default();
        let (text1, last1) = render_lines_chained(&entries1, &key, &state);
        state.last_hmac = last1;
        state.primed = true;

        let entries2 = vec![sample_entry("b2")];
        let (text2, _) = render_lines_chained(&entries2, &key, &state);
        assert!(text2.starts_with("# prev_file_tail="));

        // The header must link to the tail of the first batch specifically.
        let mut expected_header = String::new();
        expected_header.push_str(PREV_TAIL_COMMENT_PREFIX);
        expected_header.push_str(&hex_encode(&last1));
        expected_header.push('\n');
        assert!(
            text2.starts_with(expected_header.as_str()),
            "header must carry the first batch's tail MAC"
        );

        let report =
            verify_audit_bytes(std::path::Path::new("<mem>"), text2.as_bytes(), &key).unwrap();
        assert!(report.first_break.is_none(), "second batch must verify");
        assert_eq!(report.ok_lines, 1);
        let r1 =
            verify_audit_bytes(std::path::Path::new("<mem>"), text1.as_bytes(), &key).unwrap();
        assert!(r1.first_break.is_none());
    }

    /// A single entry renders to one newline-terminated line with the fields
    /// in their expected columns.
    #[test]
    fn render_one_entry() {
        let line = render_lines(&[sample_entry("b")]);
        assert!(line.contains("REST.PUT.OBJECT"));
        assert!(line.contains("10.0.0.1"));
        assert!(line.contains("AKIATEST"));
        assert!(line.contains("\"aws-cli/2.0\""));
        assert!(line.ends_with('\n'));
        assert_eq!(line.matches('\n').count(), 1);
        assert!(line.starts_with(
            "- b [2023-11-14T22:13:20Z] 10.0.0.1 AKIATEST - REST.PUT.OBJECT k \
             \"PUT /b/k HTTP/1.1\" 200 - 0 4096 12 - - \"aws-cli/2.0\" "
        ));
    }
}