Skip to main content

s4_server/
access_log.rs

1//! S3-style access-log emission (v0.4 #20).
2//!
//! Writes one line per completed request in the AWS S3 server access log
//! format (close enough for `awk` / `goaccess` / standard log analyzers
//! to parse). Output is buffered in memory and flushed periodically to
//! hourly-named `.log` files in a local directory. v0.4 does not write to
//! S3 directly: AWS's own convention is to deliver server access logs to
//! another bucket, and operators who want that can forward the files with
//! an external log shipper (filebeat / fluent-bit / vector). No new outbound
//! dependencies.
9//!
10//! ## Format
11//!
12//! `bucket-owner bucket [time] remote-ip requester request-id operation
13//!  key request-uri http-status error-code bytes-sent object-size
14//!  total-time turn-around-time referer user-agent version-id
15//!  host-id sig-version cipher-suite auth-type host-header tls-version
16//!  access-point-arn acl-required`
17//!
18//! Most fields are stubbed (`-`) for the v0.4 release; the load-bearing
19//! columns are time, remote-ip, requester, operation, key, status,
20//! bytes-sent, object-size, total-time, user-agent.
21//!
22//! ## Operator config
23//!
//! `--access-log /path/to/dir` enables emission. Files are named
//! `YYYY-MM-DD-HH-NNNN.log` (UTC hour plus a zero-padded batch number), so
//! rollover is hourly. `s3://` destinations are rejected in v0.4.
26//!
27//! ## Implementation note
28//!
29//! We deliberately don't compress the access-log objects — they're text
30//! and S4's own bucket-policy enforcement may want to read them raw. If
31//! you want them squished, point S4 at *another* S4 instance or front
32//! the log bucket with a separate gateway.
33
34use std::collections::VecDeque;
35use std::sync::Arc;
36use std::time::SystemTime;
37
38use bytes::Bytes;
39use tokio::sync::Mutex;
40
/// Structured fields for one completed request, captured when the handler
/// finishes. At flush time the emitter (`render_lines`) turns a batch of these
/// into S3 access-log text, one line per entry.
#[derive(Debug, Clone)]
pub struct AccessLogEntry {
    /// When the request happened; rendered to whole seconds, UTC.
    pub time: SystemTime,
    /// Name of the bucket the request targeted.
    pub bucket: String,
    /// Client address, when known (`-` in the log otherwise).
    pub remote_ip: Option<String>,
    /// Authenticated principal (e.g. an access key id), when there is one.
    pub requester: Option<String>,
    /// Operation label, e.g. `REST.PUT.OBJECT`.
    pub operation: &'static str,
    /// Object key, for operations that address a single object.
    pub key: Option<String>,
    /// Request line, e.g. `PUT /b/k HTTP/1.1`; logged inside double quotes.
    pub request_uri: String,
    /// HTTP status code of the response.
    pub http_status: u16,
    /// Error code for failed requests, if any.
    pub error_code: Option<String>,
    /// Bytes sent back to the client.
    pub bytes_sent: u64,
    /// Size of the object involved in the request.
    pub object_size: u64,
    /// Total request handling time, in milliseconds.
    pub total_time_ms: u64,
    /// Client `User-Agent`, when sent; logged inside double quotes.
    pub user_agent: Option<String>,
}
60
/// Where access-log files go: a local directory that receives hourly-named
/// `.log` files (see [`AccessLogDest::path_for`]).
///
/// Writing straight to an `s3://` destination is a post-v0.4 follow-up and
/// [`AccessLogDest::parse`] rejects such URLs today. Until then, forward the
/// files with a separate log shipper (filebeat / fluent-bit / vector).
#[derive(Debug, Clone)]
pub struct AccessLogDest {
    /// Directory the flusher writes into; `AccessLog::spawn_flusher` tries to
    /// create it (best effort) before starting.
    pub dir: std::path::PathBuf,
}
70
71impl AccessLogDest {
72    pub fn parse(s: &str) -> Result<Self, String> {
73        if let Some(stripped) = s.strip_prefix("s3://") {
74            return Err(format!(
75                "v0.4 ships local-directory access-log only; got s3:// destination ({stripped:?}). \
76                 Use a local path or pipe via filebeat / vector to S3."
77            ));
78        }
79        let dir = std::path::PathBuf::from(s);
80        Ok(Self { dir })
81    }
82
83    /// Compose the file path for a flush at `now`. One file per hour
84    /// + a batch counter so high-volume hours don't single-file-balloon.
85    pub fn path_for(&self, now: SystemTime, batch: u64) -> std::path::PathBuf {
86        let secs = now
87            .duration_since(SystemTime::UNIX_EPOCH)
88            .map(|d| d.as_secs())
89            .unwrap_or(0);
90        let (y, mo, d, h) = unix_to_ymdh(secs as i64);
91        self.dir
92            .join(format!("{y:04}-{mo:02}-{d:02}-{h:02}-{batch:04}.log"))
93    }
94}
95
96/// Buffered emitter. Per-handler call sites push entries via
97/// [`AccessLog::record`]; a background task drains the buffer and writes
98/// one S3 object per flush window.
99pub struct AccessLog {
100    dest: AccessLogDest,
101    buf: Arc<Mutex<VecDeque<AccessLogEntry>>>,
102    flush_every_secs: u64,
103    max_entries_before_flush: usize,
104    batch_counter: Arc<std::sync::atomic::AtomicU64>,
105}
106
107impl AccessLog {
108    pub fn new(dest: AccessLogDest) -> Self {
109        Self {
110            dest,
111            buf: Arc::new(Mutex::new(VecDeque::new())),
112            flush_every_secs: 60,
113            max_entries_before_flush: 5_000,
114            batch_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
115        }
116    }
117
118    pub async fn record(&self, entry: AccessLogEntry) {
119        let mut buf = self.buf.lock().await;
120        buf.push_back(entry);
121        if buf.len() >= self.max_entries_before_flush {
122            // Wake the flusher early — it polls on `flush_every_secs`,
123            // but a burst should land sooner. We do this by leaving the
124            // entries queued; the flusher loop checks size on every tick.
125        }
126    }
127
128    /// Spawn the background flusher. Drains the buffer every
129    /// `flush_every_secs` (default 60) and appends to the per-hour file
130    /// in `dest.dir`. Returns the tokio JoinHandle so the caller can
131    /// abort on shutdown if needed.
132    pub fn spawn_flusher(&self) -> tokio::task::JoinHandle<()> {
133        let dest = self.dest.clone();
134        let buf = Arc::clone(&self.buf);
135        let interval = self.flush_every_secs;
136        let counter = Arc::clone(&self.batch_counter);
137        if let Err(e) = std::fs::create_dir_all(&dest.dir) {
138            tracing::warn!(
139                "S4 access log: could not create dir {}: {e}",
140                dest.dir.display()
141            );
142        }
143        tokio::spawn(async move {
144            let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval));
145            loop {
146                tick.tick().await;
147                let drained: Vec<AccessLogEntry> = {
148                    let mut b = buf.lock().await;
149                    if b.is_empty() {
150                        continue;
151                    }
152                    b.drain(..).collect()
153                };
154                let now = SystemTime::now();
155                let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
156                let path = dest.path_for(now, batch);
157                let body = render_lines(&drained);
158                let body_bytes: Bytes = Bytes::from(body);
159                let path_clone = path.clone();
160                let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
161                    use std::io::Write;
162                    let mut f = std::fs::OpenOptions::new()
163                        .create(true)
164                        .append(true)
165                        .open(&path_clone)?;
166                    f.write_all(&body_bytes)
167                })
168                .await;
169                match res {
170                    Ok(Ok(())) => {}
171                    Ok(Err(e)) => {
172                        tracing::warn!("S4 access log write failed at {}: {e}", path.display());
173                    }
174                    Err(e) => {
175                        tracing::warn!("S4 access log task join failed: {e}");
176                    }
177                }
178            }
179        })
180    }
181}
182
183/// Public wrapper for ease of `Arc<AccessLog>` plumbing in S4Service.
184pub type SharedAccessLog = Arc<AccessLog>;
185
186fn render_lines(entries: &[AccessLogEntry]) -> String {
187    let mut out = String::with_capacity(entries.len() * 256);
188    for e in entries {
189        let ts = unix_secs(e.time);
190        let (y, mo, d, h, mi, se) = unix_to_ymdhms(ts);
191        out.push_str(&format!(
192            "- {bucket} [{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{se:02}Z] {ip} {req} - {op} {key} \"{uri}\" {status} {err} {bytes_sent} {obj_size} {total_ms} - - \"{ua}\" - - SigV4 - AuthHeader - TLSv1.3 - -\n",
193            bucket = e.bucket,
194            ip = e.remote_ip.as_deref().unwrap_or("-"),
195            req = e.requester.as_deref().unwrap_or("-"),
196            op = e.operation,
197            key = e.key.as_deref().unwrap_or("-"),
198            uri = e.request_uri,
199            status = e.http_status,
200            err = e.error_code.as_deref().unwrap_or("-"),
201            bytes_sent = e.bytes_sent,
202            obj_size = e.object_size,
203            total_ms = e.total_time_ms,
204            ua = e.user_agent.as_deref().unwrap_or("-"),
205        ));
206    }
207    out
208}
209
/// Whole seconds since the Unix epoch for `t`; instants before the epoch
/// (where `duration_since` fails) map to `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
215
216/// Civil from unix seconds → (year, month, day, hour). UTC.
217fn unix_to_ymdh(secs: i64) -> (i64, u32, u32, u32) {
218    let (y, mo, d, h, _mi, _se) = unix_to_ymdhms(secs);
219    (y, mo, d, h)
220}
221
/// Break unix seconds into UTC civil time `(year, month, day, hour, minute, second)`.
///
/// Months and days are 1-based. Pre-1970 (negative) inputs are supported: the
/// time-of-day split uses Euclidean division, so it always lands in `0..86_400`.
/// The date part is Howard Hinnant's public-domain `civil_from_days`, which
/// counts days from 0000-03-01 in 400-year eras so that each leap day falls at
/// the end of a computed year.
fn unix_to_ymdhms(secs: i64) -> (i64, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: i64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097; // days in 400 Gregorian years
    const EPOCH_SHIFT: i64 = 719_468; // days from 0000-03-01 to 1970-01-01

    let day_number = secs.div_euclid(SECS_PER_DAY);
    let secs_of_day = secs.rem_euclid(SECS_PER_DAY);
    let hour = (secs_of_day / 3_600) as u32;
    let minute = ((secs_of_day / 60) % 60) as u32;
    let second = (secs_of_day % 60) as u32;

    let shifted = day_number + EPOCH_SHIFT;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted - era * DAYS_PER_ERA; // 0..=146_096
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // 0..=399
    // 0..=365, counted from March 1st.
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March … 11 = February
    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let month = (if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    }) as u32;
    // January and February belong to the next civil year in the March-based count.
    let year = era * 400 + year_of_era + i64::from(month <= 2);

    (year, month, day, hour, minute, second)
}
241
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_dest_local_dir() {
        let d = AccessLogDest::parse("/var/log/s4").unwrap();
        assert_eq!(d.dir, std::path::PathBuf::from("/var/log/s4"));
    }

    #[test]
    fn parse_dest_rejects_s3_url_until_phase_b() {
        let err = AccessLogDest::parse("s3://logs/access/").unwrap_err();
        assert!(err.contains("local-directory access-log only"));
    }

    #[test]
    fn path_for_uses_hourly_naming() {
        let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
        // 1_700_000_000 s = 2023-11-14 22:13:20 UTC.
        let now = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
        let p = d.path_for(now, 7);
        // Compare path components rather than strings so the test is
        // independent of the platform's path separator.
        assert_eq!(p.parent(), Some(std::path::Path::new("/tmp/s4-test")));
        assert_eq!(
            p.file_name(),
            Some(std::ffi::OsStr::new("2023-11-14-22-0007.log"))
        );
    }

    #[test]
    fn unix_to_ymdh_known_value() {
        // 1_779_148_800 s = 2026-05-19 00:00:00 UTC (exactly 20_592 days after the epoch).
        assert_eq!(unix_to_ymdh(1_779_148_800), (2026, 5, 19, 0));
    }

    #[test]
    fn unix_to_ymdhms_edge_dates() {
        assert_eq!(unix_to_ymdhms(0), (1970, 1, 1, 0, 0, 0));
        assert_eq!(unix_to_ymdhms(-1), (1969, 12, 31, 23, 59, 59));
        // Leap day in a year divisible by 400.
        assert_eq!(unix_to_ymdhms(951_782_400), (2000, 2, 29, 0, 0, 0));
    }

    #[test]
    fn render_one_entry() {
        let e = AccessLogEntry {
            time: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000),
            bucket: "b".into(),
            remote_ip: Some("10.0.0.1".into()),
            requester: Some("AKIATEST".into()),
            operation: "REST.PUT.OBJECT",
            key: Some("k".into()),
            request_uri: "PUT /b/k HTTP/1.1".into(),
            http_status: 200,
            error_code: None,
            bytes_sent: 0,
            object_size: 4096,
            total_time_ms: 12,
            user_agent: Some("aws-cli/2.0".into()),
        };
        let line = render_lines(&[e]);
        assert!(line.starts_with(
            "- b [2023-11-14T22:13:20Z] 10.0.0.1 AKIATEST - REST.PUT.OBJECT k "
        ));
        assert!(line.contains("REST.PUT.OBJECT"));
        assert!(line.contains("10.0.0.1"));
        assert!(line.contains("AKIATEST"));
        assert!(line.contains("\"aws-cli/2.0\""));
        assert!(line.ends_with('\n'));
    }
}