1use std::collections::VecDeque;
35use std::sync::Arc;
36use std::time::SystemTime;
37
38use bytes::Bytes;
39use tokio::sync::Mutex;
40
/// One served request, captured for the S3-style server access log.
///
/// Optional fields that are `None` are rendered as `-` in the log line.
#[derive(Debug, Clone)]
pub struct AccessLogEntry {
    /// Request time; rendered as a UTC `[YYYY-MM-DDTHH:MM:SSZ]` stamp.
    pub time: std::time::SystemTime,
    /// Target bucket name.
    pub bucket: String,
    /// Client address, if known.
    pub remote_ip: Option<String>,
    /// Requester identity (presumably an access key id), if any.
    pub requester: Option<String>,
    /// S3 operation name such as `"REST.PUT.OBJECT"`.
    pub operation: &'static str,
    /// Object key, for operations that address a single object.
    pub key: Option<String>,
    /// Request line such as `"PUT /b/k HTTP/1.1"`; written inside double quotes.
    pub request_uri: String,
    /// HTTP status code returned to the client.
    pub http_status: u16,
    /// Error code for failed requests.
    pub error_code: Option<String>,
    /// Number of response bytes sent.
    pub bytes_sent: u64,
    /// Size of the object involved in the request, in bytes.
    pub object_size: u64,
    /// Total request time in milliseconds.
    pub total_time_ms: u64,
    /// Client user agent; written inside double quotes.
    pub user_agent: Option<String>,
}
60
/// Where access-log files are written: a local directory.
///
/// Construct with `AccessLogDest::parse`; per-batch file names come from
/// `AccessLogDest::path_for`.
#[derive(Debug, Clone)]
pub struct AccessLogDest {
    /// Directory in which access-log files are created.
    pub dir: std::path::PathBuf,
}
70
71impl AccessLogDest {
72 pub fn parse(s: &str) -> Result<Self, String> {
73 if let Some(stripped) = s.strip_prefix("s3://") {
74 return Err(format!(
75 "v0.4 ships local-directory access-log only; got s3:// destination ({stripped:?}). \
76 Use a local path or pipe via filebeat / vector to S3."
77 ));
78 }
79 let dir = std::path::PathBuf::from(s);
80 Ok(Self { dir })
81 }
82
83 pub fn path_for(&self, now: SystemTime, batch: u64) -> std::path::PathBuf {
86 let secs = now
87 .duration_since(SystemTime::UNIX_EPOCH)
88 .map(|d| d.as_secs())
89 .unwrap_or(0);
90 let (y, mo, d, h) = unix_to_ymdh(secs as i64);
91 self.dir
92 .join(format!("{y:04}-{mo:02}-{d:02}-{h:02}-{batch:04}.log"))
93 }
94}
95
96pub struct AccessLog {
100 dest: AccessLogDest,
101 buf: Arc<Mutex<VecDeque<AccessLogEntry>>>,
102 flush_every_secs: u64,
103 max_entries_before_flush: usize,
104 batch_counter: Arc<std::sync::atomic::AtomicU64>,
105}
106
107impl AccessLog {
108 pub fn new(dest: AccessLogDest) -> Self {
109 Self {
110 dest,
111 buf: Arc::new(Mutex::new(VecDeque::new())),
112 flush_every_secs: 60,
113 max_entries_before_flush: 5_000,
114 batch_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
115 }
116 }
117
118 pub async fn record(&self, entry: AccessLogEntry) {
119 let mut buf = self.buf.lock().await;
120 buf.push_back(entry);
121 if buf.len() >= self.max_entries_before_flush {
122 }
126 }
127
128 pub fn spawn_flusher(&self) -> tokio::task::JoinHandle<()> {
133 let dest = self.dest.clone();
134 let buf = Arc::clone(&self.buf);
135 let interval = self.flush_every_secs;
136 let counter = Arc::clone(&self.batch_counter);
137 if let Err(e) = std::fs::create_dir_all(&dest.dir) {
138 tracing::warn!(
139 "S4 access log: could not create dir {}: {e}",
140 dest.dir.display()
141 );
142 }
143 tokio::spawn(async move {
144 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval));
145 loop {
146 tick.tick().await;
147 let drained: Vec<AccessLogEntry> = {
148 let mut b = buf.lock().await;
149 if b.is_empty() {
150 continue;
151 }
152 b.drain(..).collect()
153 };
154 let now = SystemTime::now();
155 let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
156 let path = dest.path_for(now, batch);
157 let body = render_lines(&drained);
158 let body_bytes: Bytes = Bytes::from(body);
159 let path_clone = path.clone();
160 let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
161 use std::io::Write;
162 let mut f = std::fs::OpenOptions::new()
163 .create(true)
164 .append(true)
165 .open(&path_clone)?;
166 f.write_all(&body_bytes)
167 })
168 .await;
169 match res {
170 Ok(Ok(())) => {}
171 Ok(Err(e)) => {
172 tracing::warn!("S4 access log write failed at {}: {e}", path.display());
173 }
174 Err(e) => {
175 tracing::warn!("S4 access log task join failed: {e}");
176 }
177 }
178 }
179 })
180 }
181}
182
183pub type SharedAccessLog = Arc<AccessLog>;
185
186fn render_lines(entries: &[AccessLogEntry]) -> String {
187 let mut out = String::with_capacity(entries.len() * 256);
188 for e in entries {
189 let ts = unix_secs(e.time);
190 let (y, mo, d, h, mi, se) = unix_to_ymdhms(ts);
191 out.push_str(&format!(
192 "- {bucket} [{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{se:02}Z] {ip} {req} - {op} {key} \"{uri}\" {status} {err} {bytes_sent} {obj_size} {total_ms} - - \"{ua}\" - - SigV4 - AuthHeader - TLSv1.3 - -\n",
193 bucket = e.bucket,
194 ip = e.remote_ip.as_deref().unwrap_or("-"),
195 req = e.requester.as_deref().unwrap_or("-"),
196 op = e.operation,
197 key = e.key.as_deref().unwrap_or("-"),
198 uri = e.request_uri,
199 status = e.http_status,
200 err = e.error_code.as_deref().unwrap_or("-"),
201 bytes_sent = e.bytes_sent,
202 obj_size = e.object_size,
203 total_ms = e.total_time_ms,
204 ua = e.user_agent.as_deref().unwrap_or("-"),
205 ));
206 }
207 out
208}
209
/// Whole seconds between the Unix epoch and `t`; instants before the epoch yield `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
215
216fn unix_to_ymdh(secs: i64) -> (i64, u32, u32, u32) {
218 let (y, mo, d, h, _mi, _se) = unix_to_ymdhms(secs);
219 (y, mo, d, h)
220}
221
/// Splits a Unix timestamp (seconds, may be negative) into proleptic-Gregorian
/// UTC `(year, month, day, hour, minute, second)`.
///
/// The date part is Howard Hinnant's `civil_from_days`. It counts in 400-year
/// eras of 146 097 days, with years starting on 1 March so the leap day falls
/// at the end of the year.
fn unix_to_ymdhms(secs: i64) -> (i64, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: i64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT: i64 = 719_468;

    let day_index = secs.div_euclid(SECS_PER_DAY);
    let sec_of_day = secs.rem_euclid(SECS_PER_DAY);
    let hour = (sec_of_day / 3600) as u32;
    let minute = (sec_of_day % 3600 / 60) as u32;
    let second = (sec_of_day % 60) as u32;

    let shifted = day_index + EPOCH_SHIFT;
    // Floor division, so days before year 0 land in negative eras.
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA);
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month counted from March: 0 = March, ..., 11 = February.
    let march_month = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * march_month + 2) / 5 + 1) as u32;
    let month = (if march_month < 10 {
        march_month + 3
    } else {
        march_month - 9
    }) as u32;
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + i64::from(month <= 2);
    (year, month, day, hour, minute, second)
}
241
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::time::Duration;

    #[test]
    fn parse_dest_local_dir() {
        let d = AccessLogDest::parse("/var/log/s4").unwrap();
        assert_eq!(d.dir, PathBuf::from("/var/log/s4"));
    }

    #[test]
    fn parse_dest_rejects_s3_url_until_phase_b() {
        let err = AccessLogDest::parse("s3://logs/access/").unwrap_err();
        assert!(err.contains("local-directory access-log only"));
    }

    #[test]
    fn path_for_uses_hourly_naming() {
        let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
        // 1_700_000_000 = 2023-11-14T22:13:20Z.
        let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
        // Compare whole paths so the check does not depend on the platform separator.
        assert_eq!(
            d.path_for(now, 7),
            PathBuf::from("/tmp/s4-test").join("2023-11-14-22-0007.log")
        );
    }

    #[test]
    fn path_for_widens_large_batch_numbers() {
        let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
        let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
        assert_eq!(
            d.path_for(now, 12_345),
            PathBuf::from("/tmp/s4-test").join("2023-11-14-22-12345.log")
        );
    }

    #[test]
    fn unix_to_ymdh_known_value() {
        // 1_779_148_800 = 2026-05-19T00:00:00Z.
        assert_eq!(unix_to_ymdh(1_779_148_800), (2026, 5, 19, 0));
    }

    #[test]
    fn unix_to_ymdhms_edge_dates() {
        assert_eq!(unix_to_ymdhms(0), (1970, 1, 1, 0, 0, 0));
        // Pre-epoch seconds must floor, not truncate toward zero.
        assert_eq!(unix_to_ymdhms(-1), (1969, 12, 31, 23, 59, 59));
        // 2000 is a leap year (divisible by 400).
        assert_eq!(unix_to_ymdhms(951_782_400), (2000, 2, 29, 0, 0, 0));
        // 2100 is not a leap year: the day after Feb 28 is Mar 1.
        assert_eq!(unix_to_ymdhms(4_107_542_400), (2100, 3, 1, 0, 0, 0));
        assert_eq!(unix_to_ymdhms(1_700_000_000), (2023, 11, 14, 22, 13, 20));
    }

    #[test]
    fn render_one_entry() {
        let e = AccessLogEntry {
            time: SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000),
            bucket: "b".into(),
            remote_ip: Some("10.0.0.1".into()),
            requester: Some("AKIATEST".into()),
            operation: "REST.PUT.OBJECT",
            key: Some("k".into()),
            request_uri: "PUT /b/k HTTP/1.1".into(),
            http_status: 200,
            error_code: None,
            bytes_sent: 0,
            object_size: 4096,
            total_time_ms: 12,
            user_agent: Some("aws-cli/2.0".into()),
        };
        let line = render_lines(&[e]);
        assert_eq!(
            line,
            "- b [2023-11-14T22:13:20Z] 10.0.0.1 AKIATEST - REST.PUT.OBJECT k \
             \"PUT /b/k HTTP/1.1\" 200 - 0 4096 12 - - \"aws-cli/2.0\" - - SigV4 - \
             AuthHeader - TLSv1.3 - -\n"
        );
    }
}