s4-server 0.8.0

S4 — Squished S3 — GPU-accelerated transparent compression S3-compatible storage gateway (cargo install s4-server installs the `s4` binary).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
//! S3-style access-log emission (v0.4 #20).
//!
//! Writes one line per completed request in the AWS S3 server access log
//! format (close enough for `awk` / `goaccess` / standard log analyzers
//! to parse). Output is buffered and flushed periodically. For now the
//! destination is a **local directory**; `s3://` destinations (the
//! convention AWS itself uses for S3 server access logs) are rejected
//! by [`AccessLogDest::parse`]. No new outbound dependencies.
//!
//! ## Format
//!
//! `bucket-owner bucket [time] remote-ip requester request-id operation
//!  key request-uri http-status error-code bytes-sent object-size
//!  total-time turn-around-time referer user-agent version-id
//!  host-id sig-version cipher-suite auth-type host-header tls-version
//!  access-point-arn acl-required`
//!
//! Most fields are stubbed (`-`) for the v0.4 release; the load-bearing
//! columns are time, remote-ip, requester, operation, key, status,
//! bytes-sent, object-size, total-time, user-agent.
//!
//! ## Operator config
//!
//! `--access-log <local-dir>` enables emission. Each flush writes a file
//! named `YYYY-MM-DD-HH-NNNN.log` (UTC hour plus a batch counter) into
//! that directory; `s3://...` destinations are not supported yet.
//!
//! ## Implementation note
//!
//! We deliberately don't compress the access-log objects — they're text
//! and S4's own bucket-policy enforcement may want to read them raw. If
//! you want them squished, point S4 at *another* S4 instance or front
//! the log bucket with a separate gateway.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::SystemTime;

use bytes::Bytes;
use tokio::sync::Mutex;

use crate::audit_log::{
    AuditHmacKey, PREV_TAIL_COMMENT_PREFIX, chain_step, genesis_prev, hex_encode,
};

/// Per-request structured fields collected at handler completion. The
/// emitter renders this into the on-the-wire S3 access-log format on
/// flush.
///
/// Every `Option` field that is `None` is rendered as `-`.
#[derive(Debug, Clone)]
pub struct AccessLogEntry {
    /// Request timestamp; rendered in UTC with second precision inside
    /// the bracketed time column.
    pub time: SystemTime,
    /// Bucket column (second column of the line).
    pub bucket: String,
    /// Remote-ip column.
    pub remote_ip: Option<String>,
    /// Requester column (e.g. an access-key id).
    pub requester: Option<String>,
    /// Operation column, e.g. `REST.PUT.OBJECT`.
    pub operation: &'static str,
    /// Key column.
    pub key: Option<String>,
    /// Request-uri column; the emitter wraps it in double quotes.
    pub request_uri: String,
    /// Http-status column.
    pub http_status: u16,
    /// Error-code column.
    pub error_code: Option<String>,
    /// Bytes-sent column.
    pub bytes_sent: u64,
    /// Object-size column.
    pub object_size: u64,
    /// Total-time column; written verbatim (milliseconds, going by the
    /// field name).
    pub total_time_ms: u64,
    /// User-agent column; the emitter wraps it in double quotes.
    pub user_agent: Option<String>,
}

/// Operator-configured destination: a local directory where hourly
/// rotated `.log` files are written. v0.4 scope — `s3://` destination
/// is a post-v0.4 follow-up; for now ship the entries to local disk and
/// let a separate log-shipper (filebeat / fluent-bit / vector) push them
/// to wherever they need to go.
#[derive(Debug, Clone)]
pub struct AccessLogDest {
    /// Directory that receives the batch files. It is taken verbatim
    /// from the operator string; the flusher attempts to create it on
    /// start-up.
    pub dir: std::path::PathBuf,
}

impl AccessLogDest {
    /// Build a destination from the operator-supplied string.
    ///
    /// Anything that does not start with `s3://` is taken verbatim as a
    /// local directory path; no existence check happens here.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when `s` starts with `s3://`,
    /// because only local directories are supported.
    pub fn parse(s: &str) -> Result<Self, String> {
        match s.strip_prefix("s3://") {
            Some(stripped) => Err(format!(
                "v0.4 ships local-directory access-log only; got s3:// destination ({stripped:?}). \
                 Use a local path or pipe via filebeat / vector to S3."
            )),
            None => Ok(Self {
                dir: std::path::PathBuf::from(s),
            }),
        }
    }

    /// File path for a flush happening at `now`.
    ///
    /// The name is `YYYY-MM-DD-HH-NNNN.log`: the UTC hour of `now`
    /// followed by the zero-padded `batch` counter, so a busy hour is
    /// spread over several files. A `now` earlier than the Unix epoch
    /// is treated as the epoch itself.
    pub fn path_for(&self, now: SystemTime, batch: u64) -> std::path::PathBuf {
        let epoch_secs = match now.duration_since(SystemTime::UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        };
        let (year, month, day, hour) = unix_to_ymdh(epoch_secs as i64);
        let file_name = format!("{year:04}-{month:02}-{day:02}-{hour:02}-{batch:04}.log");
        self.dir.join(file_name)
    }
}

/// Buffered emitter. Per-handler call sites push entries via
/// [`AccessLog::record`]; a background task (started with
/// [`AccessLog::spawn_flusher`]) drains the buffer and writes each
/// drained batch to a file in the destination directory.
pub struct AccessLog {
    /// Where batch files are written.
    dest: AccessLogDest,
    /// Entries recorded but not yet drained by the flusher.
    buf: Arc<Mutex<VecDeque<AccessLogEntry>>>,
    /// Flusher period in seconds ([`AccessLog::new`] sets 60).
    flush_every_secs: u64,
    /// Queue length at which an early flush is wanted
    /// ([`AccessLog::new`] sets 5 000).
    max_entries_before_flush: usize,
    /// Number of batches flushed so far; used as the batch suffix of
    /// the next file name.
    batch_counter: Arc<std::sync::atomic::AtomicU64>,
    /// v0.5 #31: optional HMAC-SHA256 key. When `Some(...)`, the
    /// flusher appends a hex HMAC column to every line and emits a
    /// `# prev_file_tail=<hex>` comment at the top of each rotated
    /// batch file so the chain extends across rotations.
    hmac_key: Option<Arc<AuditHmacKey>>,
    /// Running chain state — the last HMAC the flusher emitted (or
    /// the genesis seed if nothing has been emitted yet). Updated
    /// in-place at the end of each flush batch.
    chain_state: Arc<Mutex<ChainState>>,
}

/// HMAC-chain bookkeeping carried from one flushed batch to the next.
#[derive(Debug, Clone)]
struct ChainState {
    /// HMAC of the most recently rendered line, or the genesis seed
    /// while nothing has been rendered yet. It seeds the first line of
    /// the next batch.
    last_hmac: [u8; 32],
    /// True once at least one batch has been written, so the next
    /// batch knows it must emit a `# prev_file_tail=` comment.
    primed: bool,
}

impl Default for ChainState {
    fn default() -> Self {
        Self {
            last_hmac: genesis_prev(),
            primed: false,
        }
    }
}

impl AccessLog {
    /// Create an emitter writing to `dest` with the default tuning: a
    /// timed flush every 60 seconds, an early flush once 5 000 entries
    /// are queued, and HMAC chaining disabled.
    pub fn new(dest: AccessLogDest) -> Self {
        Self {
            dest,
            buf: Arc::new(Mutex::new(VecDeque::new())),
            flush_every_secs: 60,
            max_entries_before_flush: 5_000,
            batch_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            hmac_key: None,
            chain_state: Arc::new(Mutex::new(ChainState::default())),
        }
    }

    /// v0.5 #31: turn on tamper-evident HMAC chaining. Every emitted
    /// line gets a trailing hex HMAC column, and each new batch file
    /// starts with a `# prev_file_tail=<hex>` comment so the chain
    /// extends across rotations. Without this builder, lines are
    /// emitted exactly as before (back-compat with v0.4 #20 readers).
    #[must_use]
    pub fn with_hmac_key(mut self, key: Arc<AuditHmacKey>) -> Self {
        self.hmac_key = Some(key);
        self
    }

    /// Queue `entry` for the next flush.
    ///
    /// Nothing is written here. The background flusher polls the queue
    /// once per second and drains it early as soon as it holds
    /// `max_entries_before_flush` entries, so a burst does not sit in
    /// memory until the next timed flush.
    pub async fn record(&self, entry: AccessLogEntry) {
        self.buf.lock().await.push_back(entry);
    }

    /// Spawn the background flusher and return its tokio `JoinHandle`
    /// so the caller can abort it on shutdown if needed.
    ///
    /// The task wakes once per second. It drains the buffer when a
    /// timed flush is due (every `flush_every_secs`, default 60, with
    /// the first one due immediately) or when the buffer has reached
    /// `max_entries_before_flush`. Each drained batch gets the next
    /// batch number and is written to the file named by
    /// [`AccessLogDest::path_for`]. Write failures are logged with
    /// `tracing::warn!` and the batch is dropped.
    ///
    /// The destination directory is created up front (best effort; a
    /// failure is only logged).
    pub fn spawn_flusher(&self) -> tokio::task::JoinHandle<()> {
        // How often the task wakes to look at the queue length.
        const POLL_SECS: u64 = 1;

        let dest = self.dest.clone();
        let buf = Arc::clone(&self.buf);
        // Number of polls between two timed flushes. Clamped to at
        // least one so a zero period cannot underflow the countdown.
        let polls_per_timed_flush = (self.flush_every_secs / POLL_SECS).max(1);
        let max_entries = self.max_entries_before_flush;
        let counter = Arc::clone(&self.batch_counter);
        let hmac_key = self.hmac_key.clone();
        let chain_state = Arc::clone(&self.chain_state);
        if let Err(e) = std::fs::create_dir_all(&dest.dir) {
            tracing::warn!(
                "S4 access log: could not create dir {}: {e}",
                dest.dir.display()
            );
        }
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(std::time::Duration::from_secs(POLL_SECS));
            // Zero means "a timed flush is due on this poll"; the first
            // poll completes immediately, like the first interval tick.
            let mut polls_until_timed_flush: u64 = 0;
            loop {
                tick.tick().await;
                let timer_due = polls_until_timed_flush == 0;
                polls_until_timed_flush = if timer_due {
                    polls_per_timed_flush - 1
                } else {
                    polls_until_timed_flush - 1
                };
                let drained: Vec<AccessLogEntry> = {
                    let mut b = buf.lock().await;
                    // Between timed flushes only a full queue is drained.
                    if b.is_empty() || (!timer_due && b.len() < max_entries) {
                        continue;
                    }
                    b.drain(..).collect()
                };
                let now = SystemTime::now();
                let batch = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                let path = dest.path_for(now, batch);
                let body = if let Some(key) = hmac_key.as_ref() {
                    let mut state = chain_state.lock().await;
                    let (rendered, new_last) = render_lines_chained(&drained, key, &state);
                    state.last_hmac = new_last;
                    state.primed = true;
                    rendered
                } else {
                    render_lines(&drained)
                };
                let body_bytes: Bytes = Bytes::from(body);
                let path_clone = path.clone();
                // File I/O is blocking, so keep it off the async workers.
                let res = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
                    use std::io::Write;
                    let mut f = std::fs::OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(&path_clone)?;
                    f.write_all(&body_bytes)
                })
                .await;
                match res {
                    Ok(Ok(())) => {}
                    Ok(Err(e)) => {
                        tracing::warn!("S4 access log write failed at {}: {e}", path.display());
                    }
                    Err(e) => {
                        tracing::warn!("S4 access log task join failed: {e}");
                    }
                }
            }
        })
    }
}

/// Render `entries` as HMAC-chained access-log text.
///
/// When `state.primed` is true (i.e. this is not the very first batch)
/// the output starts with a `# prev_file_tail=<hex>` preamble holding
/// `state.last_hmac`. Every log line then gets a trailing ` <hex>`
/// column. Returns the rendered text together with the HMAC of the
/// last line, which the caller must store back into the shared state.
/// With no lines to render, the returned HMAC is `state.last_hmac`.
///
/// Each line's HMAC is computed over `prev_hmac || line_no_hmac`,
/// where `line_no_hmac` is the line's bytes WITHOUT the HMAC column and
/// WITHOUT the trailing newline; ` <hex>\n` is appended afterwards to
/// land on the wire format the verifier expects.
fn render_lines_chained(
    entries: &[AccessLogEntry],
    key: &AuditHmacKey,
    state: &ChainState,
) -> (String, [u8; 32]) {
    // Budget: ~256 chars per plain line + 65 for " <hex>\n", plus 80
    // for the preamble.
    let mut text = String::with_capacity(entries.len() * 320 + 80);
    if state.primed {
        text.push_str(PREV_TAIL_COMMENT_PREFIX);
        text.push_str(&hex_encode(&state.last_hmac));
        text.push('\n');
    }

    let plain = render_lines(entries);
    // Thread the running HMAC through the lines; blank segments (such
    // as the one after the final newline) are not part of the chain.
    let tail = plain
        .split('\n')
        .filter(|segment| !segment.is_empty())
        .fold(state.last_hmac, |prev, segment| {
            let mac = chain_step(key, &prev, segment.as_bytes());
            text.push_str(segment);
            text.push(' ');
            text.push_str(&hex_encode(&mac));
            text.push('\n');
            mac
        });
    (text, tail)
}

/// Shared-ownership handle to an [`AccessLog`]; a public alias that
/// eases `Arc<AccessLog>` plumbing in S4Service.
pub type SharedAccessLog = Arc<AccessLog>;

/// Make a request-derived value safe to embed in a single log line.
///
/// Control characters (including `\r` and `\n`) and `"` are replaced by
/// the `%XX` escapes of their UTF-8 bytes, so a hostile key, URI or
/// user-agent can neither start a forged line nor close a quoted
/// column early. In unquoted columns (`quoted == false`) spaces are
/// escaped as well, and an empty value becomes `-`, because either
/// would shift every later column. `%` itself is left alone so values
/// that are already percent-encoded are not double-encoded.
///
/// Values that need no escaping are returned borrowed.
fn sanitize_field(raw: &str, quoted: bool) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;
    use std::fmt::Write as _;

    let must_escape = |c: char| c.is_control() || c == '"' || (!quoted && c == ' ');
    if raw.is_empty() && !quoted {
        return Cow::Borrowed("-");
    }
    if !raw.chars().any(must_escape) {
        return Cow::Borrowed(raw);
    }
    let mut escaped = String::with_capacity(raw.len() + 8);
    let mut utf8 = [0u8; 4];
    for c in raw.chars() {
        if must_escape(c) {
            for byte in c.encode_utf8(&mut utf8).bytes() {
                // Writing into a `String` cannot fail.
                let _ = write!(escaped, "%{byte:02X}");
            }
        } else {
            escaped.push(c);
        }
    }
    Cow::Owned(escaped)
}

/// Render `entries` as plain (un-chained) access-log text, one
/// `\n`-terminated line per entry, in the column order described in
/// the module docs.
///
/// Timestamps are UTC with second precision. Missing optional fields
/// are written as `-`. Every request-derived string goes through
/// [`sanitize_field`], so each entry always yields exactly one line
/// with a fixed number of columns. Columns the gateway does not track
/// are emitted as constants.
fn render_lines(entries: &[AccessLogEntry]) -> String {
    use std::fmt::Write as _;

    let mut out = String::with_capacity(entries.len() * 256);
    for e in entries {
        let ts = unix_secs(e.time);
        let (y, mo, d, h, mi, se) = unix_to_ymdhms(ts);
        // Writing into a `String` cannot fail, so the result is ignored.
        let _ = write!(
            out,
            "- {bucket} [{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{se:02}Z] {ip} {req} - {op} {key} \"{uri}\" {status} {err} {bytes_sent} {obj_size} {total_ms} - - \"{ua}\" - - SigV4 - AuthHeader - TLSv1.3 - -\n",
            bucket = sanitize_field(&e.bucket, false),
            ip = sanitize_field(e.remote_ip.as_deref().unwrap_or("-"), false),
            req = sanitize_field(e.requester.as_deref().unwrap_or("-"), false),
            op = e.operation,
            key = sanitize_field(e.key.as_deref().unwrap_or("-"), false),
            uri = sanitize_field(&e.request_uri, true),
            status = e.http_status,
            err = sanitize_field(e.error_code.as_deref().unwrap_or("-"), false),
            bytes_sent = e.bytes_sent,
            obj_size = e.object_size,
            total_ms = e.total_time_ms,
            ua = sanitize_field(e.user_agent.as_deref().unwrap_or("-"), true),
        );
    }
    out
}

/// Whole seconds elapsed between the Unix epoch and `t`.
///
/// Sub-second precision is dropped, and a `t` earlier than the epoch
/// yields 0.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}

/// Unix seconds → UTC civil `(year, month, day, hour)`.
///
/// Thin projection of [`unix_to_ymdhms`] that drops the minute and
/// second components.
fn unix_to_ymdh(secs: i64) -> (i64, u32, u32, u32) {
    let civil = unix_to_ymdhms(secs);
    (civil.0, civil.1, civil.2, civil.3)
}

/// Unix seconds → UTC civil `(year, month, day, hour, minute, second)`
/// in the proleptic Gregorian calendar.
///
/// Negative inputs (instants before 1970) are handled: the day/time
/// split uses Euclidean division, so the time-of-day is never negative.
fn unix_to_ymdhms(secs: i64) -> (i64, u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: i64 = 86_400;
    // Length of one 400-year Gregorian cycle in days.
    const DAYS_PER_ERA: i64 = 146_097;

    let day_number = secs.div_euclid(SECS_PER_DAY);
    let sec_of_day = secs.rem_euclid(SECS_PER_DAY);
    let hour = (sec_of_day / 3600) as u32;
    let minute = (sec_of_day % 3600 / 60) as u32;
    let second = (sec_of_day % 60) as u32;

    // Hinnant civil-from-days (public domain). Days are re-based onto
    // 0000-03-01 so the leap day falls at the end of the internal year.
    let shifted = day_number + 719_468;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA);
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March … 11 = February).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let month = (if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    }) as u32;
    // January and February belong to the next civil year.
    let year = year_of_era + era * 400 + i64::from(month <= 2);
    (year, month, day, hour, minute, second)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_dest_local_dir() {
        let d = AccessLogDest::parse("/var/log/s4").unwrap();
        assert_eq!(d.dir, std::path::PathBuf::from("/var/log/s4"));
    }

    #[test]
    fn parse_dest_rejects_s3_url_until_phase_b() {
        let err = AccessLogDest::parse("s3://logs/access/").unwrap_err();
        assert!(err.contains("local-directory access-log only"));
    }

    #[test]
    fn path_for_uses_hourly_naming() {
        let d = AccessLogDest::parse("/tmp/s4-test").unwrap();
        // 1_700_000_000 s = 2023-11-14 22:13:20 UTC.
        let now = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
        let p = d.path_for(now, 7);
        let s = p.to_string_lossy();
        assert!(s.starts_with("/tmp/s4-test/"));
        assert!(s.ends_with("2023-11-14-22-0007.log"));
    }

    #[test]
    fn unix_to_ymdh_known_value() {
        // 1_779_148_800 s = day 20_592 since the epoch
        //                 = 2026-05-19 00:00:00 UTC.
        assert_eq!(unix_to_ymdh(1_779_148_800), (2026, 5, 19, 0));
        // Epoch itself, and a mid-day instant (2023-11-14 22:13:20 UTC).
        assert_eq!(unix_to_ymdh(0), (1970, 1, 1, 0));
        assert_eq!(unix_to_ymdh(1_700_000_000), (2023, 11, 14, 22));
    }

    /// A successful PUT at 2023-11-14 22:13:20 UTC against `bucket`.
    fn sample_entry(bucket: &str) -> AccessLogEntry {
        AccessLogEntry {
            time: SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000),
            bucket: bucket.into(),
            remote_ip: Some("10.0.0.1".into()),
            requester: Some("AKIATEST".into()),
            operation: "REST.PUT.OBJECT",
            key: Some("k".into()),
            request_uri: "PUT /b/k HTTP/1.1".into(),
            http_status: 200,
            error_code: None,
            bytes_sent: 0,
            object_size: 4096,
            total_time_ms: 12,
            user_agent: Some("aws-cli/2.0".into()),
        }
    }

    #[test]
    fn chained_render_produces_verifiable_output() {
        use std::str::FromStr;

        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();
        let entries = vec![sample_entry("b1"), sample_entry("b2"), sample_entry("b3")];
        let state = ChainState::default();
        let (text, _last) = render_lines_chained(&entries, &key, &state);
        // No prev_file_tail comment on the first batch.
        assert!(!text.starts_with("# prev_file_tail="));
        // Each line ends with " <64 hex>\n"
        for raw in text.split_inclusive('\n') {
            let line = raw.trim_end_matches('\n');
            if line.is_empty() {
                continue;
            }
            assert!(line.len() > 65);
            let suf = &line[line.len() - 65..];
            assert!(suf.starts_with(' '));
            assert!(suf[1..].chars().all(|c| c.is_ascii_hexdigit()));
        }
        // Verifier is happy.
        let report =
            verify_audit_bytes(std::path::Path::new("<mem>"), text.as_bytes(), &key).unwrap();
        assert!(report.first_break.is_none());
        assert_eq!(report.ok_lines, 3);
    }

    #[test]
    fn second_batch_emits_prev_file_tail_and_chains() {
        use std::str::FromStr;

        use crate::audit_log::{AuditHmacKey, verify_audit_bytes};
        let key = AuditHmacKey::from_str("raw:0123456789abcdef0123456789abcdef").unwrap();

        // First batch.
        let entries1 = vec![sample_entry("b1")];
        let mut state = ChainState::default();
        let (text1, last1) = render_lines_chained(&entries1, &key, &state);
        state.last_hmac = last1;
        state.primed = true;

        // Second batch — must start with # prev_file_tail= and verify
        // when fed independently to the verifier.
        let entries2 = vec![sample_entry("b2")];
        let (text2, _) = render_lines_chained(&entries2, &key, &state);
        assert!(text2.starts_with("# prev_file_tail="));
        let report =
            verify_audit_bytes(std::path::Path::new("<mem>"), text2.as_bytes(), &key).unwrap();
        assert!(report.first_break.is_none(), "second batch must verify");
        assert_eq!(report.ok_lines, 1);
        // First batch verifies on its own too.
        let r1 =
            verify_audit_bytes(std::path::Path::new("<mem>"), text1.as_bytes(), &key).unwrap();
        assert!(r1.first_break.is_none());
    }

    #[test]
    fn render_one_entry() {
        let line = render_lines(&[sample_entry("b")]);
        assert!(line.contains("REST.PUT.OBJECT"));
        assert!(line.contains("10.0.0.1"));
        assert!(line.contains("AKIATEST"));
        assert!(line.contains("\"aws-cli/2.0\""));
        assert!(line.ends_with('\n'));
    }
}