Skip to main content

s4_server/
inventory.rs

1//! S3 Inventory: daily/hourly per-bucket CSV dump (v0.6 #36).
2//!
3//! AWS S3 Inventory delivers a periodic flat report (CSV, ORC, or Parquet) of
4//! every object in a source bucket to a destination bucket prefix. S4-server
5//! supports the **CSV** format (matching AWS's "headers + rows + manifest"
6//! layout); ORC / Parquet are out of scope for #36 (parquet behind a future
7//! feature flag).
8//!
9//! ## responsibilities (v0.6 #36)
10//!
11//! - in-memory `(bucket, id) -> InventoryConfig` map with JSON snapshot
12//!   round-trip, mirroring `versioning.rs` / `object_lock.rs`'s shape so
13//!   `--inventory-state-file` is a one-line addition in `main.rs`.
14//! - per-config `last_run` timestamp + `due()` predicate so the background
15//!   tokio task in `main.rs` can fire on a fixed cadence without re-reading
16//!   the wall clock against every config.
17//! - `render_csv` + `render_manifest_json` helpers that convert a sequence of
18//!   `InventoryRow` (= one logical S3 object) into the AWS-compatible CSV
19//!   bytes and the `manifest.json` pointer file. The manifest layout follows
20//!   the AWS Inventory spec: `sourceBucket`, `destinationBucket`,
21//!   `creationTimestamp` (epoch millis), `fileFormat`, `fileSchema`, and a
22//!   `files[]` array of `{ key, size, MD5checksum }`.
23//! - `run_once_for_test` runs a single inventory cycle for a given config
24//!   against a caller-provided row iterator and a caller-provided "writer"
25//!   closure, emitting both the CSV file(s) and the matching `manifest.json`.
26//!   This is the entry point that both the unit tests and the E2E test in
27//!   `tests/roundtrip.rs` poke directly without needing to spawn the
28//!   background task.
29//!
30//! ## scope limitations
31//!
32//! - in-memory only (no replication across multi-instance deployments;
33//!   `--inventory-state-file <PATH>` provides restart recovery via JSON
34//!   snapshot, same shape as `--versioning-state-file`).
35//! - Parquet / ORC formats are NOT implemented (CSV only). The
36//!   `InventoryFormat` enum has `Csv` as its only variant on purpose so the
37//!   compile-time exhaustiveness check forces a scope review when more
38//!   formats land.
39//! - No multi-shard CSV splitting yet — every cycle emits a single CSV file
40//!   per (bucket, id). AWS S3 may shard large inventories into multiple
41//!   `<uuid>.csv.gz` files under the same manifest; here `csv_keys` is a
42//!   `&[String]` so the multi-file shape is wire-future-proof, but the
43//!   current writer always supplies a single key.
44//! - No gzip compression of the CSV body in this iteration (the file
45//!   extension is `.csv`, not `.csv.gz`); AWS clients accept this.
46
47use std::collections::HashMap;
48use std::sync::Arc;
49use std::sync::RwLock;
50
51use chrono::{DateTime, Utc};
52use s3s::S3;
53use s3s::S3Request;
54use s3s::dto::*;
55use serde::{Deserialize, Serialize};
56use tracing::warn;
57
58/// Output format. Only `Csv` is implemented today; Parquet is reserved for a
59/// future feature-gated build.
60#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub enum InventoryFormat {
62    Csv,
63}
64
65impl InventoryFormat {
66    /// Wire string used by AWS S3 (`"CSV"`).
67    #[must_use]
68    pub fn as_aws_str(self) -> &'static str {
69        match self {
70            Self::Csv => "CSV",
71        }
72    }
73
74    /// File extension (no leading dot) emitted under the destination prefix.
75    #[must_use]
76    pub fn file_extension(self) -> &'static str {
77        match self {
78            Self::Csv => "csv",
79        }
80    }
81}
82
83/// Whether the inventory should include every version of every object
84/// (`All`) or only the latest non-delete-marker version (`Current`). Mirrors
85/// AWS S3's `IncludedObjectVersions` enum.
86#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
87pub enum IncludedVersions {
88    Current,
89    All,
90}
91
92impl IncludedVersions {
93    /// AWS wire form (`"Current"` / `"All"`).
94    #[must_use]
95    pub fn as_aws_str(self) -> &'static str {
96        match self {
97            Self::Current => "Current",
98            Self::All => "All",
99        }
100    }
101
102    /// Parse the AWS wire form (case-insensitive). Falls back to `Current`
103    /// when the input is empty or unrecognised, matching what AWS does on a
104    /// PUT with a missing/blank field.
105    #[must_use]
106    pub fn from_aws_str(s: &str) -> Self {
107        if s.eq_ignore_ascii_case("All") {
108            Self::All
109        } else {
110            Self::Current
111        }
112    }
113}
114
115/// One inventory configuration, keyed by `(bucket, id)`.
116///
117/// `frequency_hours` is S4-internal — AWS only supports `Daily` (24h) and
118/// `Weekly` (168h), but representing the cadence in hours lets the operator
119/// pick any value via the gateway-internal API even though the over-the-wire
120/// PUT only accepts the AWS-named frequencies.
121#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
122pub struct InventoryConfig {
123    pub id: String,
124    pub bucket: String,
125    pub destination_bucket: String,
126    pub destination_prefix: String,
127    pub frequency_hours: u32,
128    pub format: InventoryFormat,
129    pub included_object_versions: IncludedVersions,
130}
131
132impl InventoryConfig {
133    /// Convenience constructor for a daily CSV inventory of latest versions
134    /// — the most common shape, matching AWS S3's default suggestion.
135    #[must_use]
136    pub fn daily_csv(
137        id: impl Into<String>,
138        bucket: impl Into<String>,
139        destination_bucket: impl Into<String>,
140        destination_prefix: impl Into<String>,
141    ) -> Self {
142        Self {
143            id: id.into(),
144            bucket: bucket.into(),
145            destination_bucket: destination_bucket.into(),
146            destination_prefix: destination_prefix.into(),
147            frequency_hours: 24,
148            format: InventoryFormat::Csv,
149            included_object_versions: IncludedVersions::Current,
150        }
151    }
152}
153
/// One row in the rendered CSV, i.e. one logical S3 object. Each field
/// becomes one cell; the column order is fixed (see [`render_csv`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InventoryRow {
    /// Source bucket name (`Bucket` column).
    pub bucket: String,
    /// Object key (`Key` column).
    pub key: String,
    /// Version id; `None` is rendered as an empty cell.
    pub version_id: Option<String>,
    /// Rendered as `true` / `false` in the `IsLatest` column.
    pub is_latest: bool,
    /// Rendered as `true` / `false` in the `IsDeleteMarker` column.
    pub is_delete_marker: bool,
    /// Value of the `Size` column, rendered as a decimal integer.
    pub size: u64,
    /// Rendered in RFC 3339 form with millisecond precision and a `Z`
    /// suffix.
    pub last_modified: DateTime<Utc>,
    /// Copied verbatim into the `ETag` column.
    pub etag: String,
    /// Copied verbatim into the `StorageClass` column.
    pub storage_class: String,
    /// `"SSE-S4"` / `"SSE-KMS"` / `"SSE-C"` / `"NOT-SSE"`. Free-form so the
    /// caller can extend without forcing a new variant here.
    pub encryption_status: String,
}
170
/// JSON snapshot shape — `(bucket, id) -> config` plus the `last_run`
/// timestamps. The two maps are kept separate so a config can be loaded
/// from a snapshot without inheriting a prior `last_run` (e.g. when
/// hand-editing the snapshot to force a re-run on the next cadence tick).
#[derive(Debug, Default, Serialize, Deserialize)]
struct InventorySnapshot {
    /// `(bucket, id) -> config`, keyed as `"<bucket>\u{1F}<id>"` because
    /// `serde_json` cannot serialise tuple keys.
    configs: HashMap<String, InventoryConfig>,
    /// `(bucket, id) -> last completed run`, using the same composite
    /// string key as `configs`.
    last_run: HashMap<String, DateTime<Utc>>,
}
182
/// Delimiter placed between bucket and id in snapshot keys: ASCII 0x1F
/// (Unit Separator), chosen because it is not expected to occur in an S3
/// bucket name or an inventory id.
const KEY_SEP: char = '\u{1F}';

/// Build the composite snapshot key `"<bucket><KEY_SEP><id>"`.
fn join_key(bucket: &str, id: &str) -> String {
    format!("{bucket}{KEY_SEP}{id}")
}

/// Inverse of [`join_key`]: split at the first `KEY_SEP` into owned
/// `(bucket, id)`. Returns `None` when the separator is absent.
fn split_key(s: &str) -> Option<(String, String)> {
    let (bucket, id) = s.split_once(KEY_SEP)?;
    Some((bucket.to_owned(), id.to_owned()))
}
199
200/// In-memory manager of inventory configs and last-run timestamps.
201#[derive(Debug, Default)]
202pub struct InventoryManager {
203    configs: RwLock<HashMap<(String, String), InventoryConfig>>,
204    last_run: RwLock<HashMap<(String, String), DateTime<Utc>>>,
205}
206
207impl InventoryManager {
208    #[must_use]
209    pub fn new() -> Self {
210        Self::default()
211    }
212
213    /// Insert / overwrite a configuration. Resets the matching `last_run`
214    /// (so the next `due()` call returns `true`, matching AWS behaviour
215    /// where a freshly-PUT inventory config triggers an inventory at the
216    /// next scheduler tick).
217    pub fn put(&self, config: InventoryConfig) {
218        let key = (config.bucket.clone(), config.id.clone());
219        crate::lock_recovery::recover_write(&self.last_run, "inventory.last_run").remove(&key);
220        crate::lock_recovery::recover_write(&self.configs, "inventory.configs").insert(key, config);
221    }
222
223    /// Fetch a clone of the configuration. `None` when not present.
224    #[must_use]
225    pub fn get(&self, bucket: &str, id: &str) -> Option<InventoryConfig> {
226        crate::lock_recovery::recover_read(&self.configs, "inventory.configs")
227            .get(&(bucket.to_owned(), id.to_owned()))
228            .cloned()
229    }
230
231    /// All configurations attached to `bucket` (any `id`). The returned
232    /// vector is sorted by `id` for stable list responses.
233    #[must_use]
234    pub fn list_for_bucket(&self, bucket: &str) -> Vec<InventoryConfig> {
235        let map = crate::lock_recovery::recover_read(&self.configs, "inventory.configs");
236        let mut out: Vec<InventoryConfig> = map
237            .iter()
238            .filter(|((b, _id), _)| b == bucket)
239            .map(|(_, cfg)| cfg.clone())
240            .collect();
241        out.sort_by(|a, b| a.id.cmp(&b.id));
242        out
243    }
244
245    /// Every (bucket, id, config) triple known to this manager, sorted by
246    /// `(bucket, id)` so the v0.7 #46 scanner walks them in deterministic
247    /// order across runs (= test reproducibility, plus stable log lines).
248    /// Used by [`run_scan_once`]; the existing
249    /// [`Self::list_for_bucket`] helper stays for the per-bucket
250    /// `ListBucketInventoryConfigurations` handler.
251    #[must_use]
252    pub fn list_all(&self) -> Vec<InventoryConfig> {
253        let map = crate::lock_recovery::recover_read(&self.configs, "inventory.configs");
254        let mut out: Vec<InventoryConfig> = map.values().cloned().collect();
255        out.sort_by(|a, b| a.bucket.cmp(&b.bucket).then_with(|| a.id.cmp(&b.id)));
256        out
257    }
258
259    /// Drop a config + its `last_run` (idempotent — missing keys are OK).
260    pub fn delete(&self, bucket: &str, id: &str) {
261        let key = (bucket.to_owned(), id.to_owned());
262        crate::lock_recovery::recover_write(&self.configs, "inventory.configs").remove(&key);
263        crate::lock_recovery::recover_write(&self.last_run, "inventory.last_run").remove(&key);
264    }
265
266    /// `true` when the configuration exists and either has never run, or its
267    /// `last_run + frequency_hours` has elapsed by `now`. `false` when the
268    /// configuration is missing (no config = nothing to do).
269    #[must_use]
270    pub fn due(&self, bucket: &str, id: &str, now: DateTime<Utc>) -> bool {
271        let key = (bucket.to_owned(), id.to_owned());
272        let cfgs = crate::lock_recovery::recover_read(&self.configs, "inventory.configs");
273        let Some(cfg) = cfgs.get(&key) else {
274            return false;
275        };
276        let runs = crate::lock_recovery::recover_read(&self.last_run, "inventory.last_run");
277        match runs.get(&key) {
278            None => true,
279            Some(prev) => {
280                let elapsed = now.signed_duration_since(*prev);
281                elapsed >= chrono::Duration::hours(i64::from(cfg.frequency_hours))
282            }
283        }
284    }
285
286    /// Stamp `(bucket, id) -> when` so `due` will say "false" until the
287    /// next interval boundary.
288    pub fn mark_run(&self, bucket: &str, id: &str, when: DateTime<Utc>) {
289        crate::lock_recovery::recover_write(&self.last_run, "inventory.last_run")
290            .insert((bucket.to_owned(), id.to_owned()), when);
291    }
292
293    /// Snapshot to JSON (operators can persist via `--inventory-state-file`).
294    pub fn to_json(&self) -> Result<String, serde_json::Error> {
295        let cfgs = crate::lock_recovery::recover_read(&self.configs, "inventory.configs");
296        let runs = crate::lock_recovery::recover_read(&self.last_run, "inventory.last_run");
297        let snap = InventorySnapshot {
298            configs: cfgs
299                .iter()
300                .map(|((b, i), v)| (join_key(b, i), v.clone()))
301                .collect(),
302            last_run: runs
303                .iter()
304                .map(|((b, i), v)| (join_key(b, i), *v))
305                .collect(),
306        };
307        serde_json::to_string(&snap)
308    }
309
310    /// Restore from JSON snapshot. Unknown keys (= without the separator) are
311    /// silently dropped so a malformed entry can't poison startup.
312    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
313        let snap: InventorySnapshot = serde_json::from_str(s)?;
314        let mut configs: HashMap<(String, String), InventoryConfig> = HashMap::new();
315        for (k, v) in snap.configs {
316            if let Some(pair) = split_key(&k) {
317                configs.insert(pair, v);
318            }
319        }
320        let mut last_run: HashMap<(String, String), DateTime<Utc>> = HashMap::new();
321        for (k, v) in snap.last_run {
322            if let Some(pair) = split_key(&k) {
323                last_run.insert(pair, v);
324            }
325        }
326        Ok(Self {
327            configs: RwLock::new(configs),
328            last_run: RwLock::new(last_run),
329        })
330    }
331
332    /// Run a single inventory cycle for `(bucket, id)` against `rows`,
333    /// invoking `write_object(dst_bucket, dst_key, body)` once for the CSV
334    /// and once for the manifest. Stamps `last_run` on success. Returns the
335    /// destination keys of the artefacts written (`[csv_key, manifest_key]`).
336    ///
337    /// This is the synchronous path the unit tests + the E2E test use, and
338    /// it is what the future background scheduler in `main.rs` will call
339    /// after walking the source bucket. Keeping the row source as an
340    /// iterator means the inventory module never needs a back-reference to
341    /// `S4Service`, which sidesteps the circular dependency between the
342    /// service handler and a scheduler that lives outside `S4Service`.
343    pub fn run_once_for_test<I, F>(
344        &self,
345        bucket: &str,
346        id: &str,
347        rows: I,
348        now: DateTime<Utc>,
349        mut write_object: F,
350    ) -> Result<Vec<String>, RunError>
351    where
352        I: IntoIterator<Item = InventoryRow>,
353        F: FnMut(&str, &str, Vec<u8>) -> Result<(), RunError>,
354    {
355        let cfg = self
356            .get(bucket, id)
357            .ok_or_else(|| RunError::UnknownConfig(bucket.to_owned(), id.to_owned()))?;
358        let csv_bytes = render_csv(rows.into_iter());
359        let csv_md5 = md5_hex(&csv_bytes);
360        let csv_key = csv_destination_key(&cfg, now);
361        let manifest_key = manifest_destination_key(&cfg, now);
362        let manifest_body = render_manifest_json(
363            &cfg,
364            std::slice::from_ref(&csv_key),
365            std::slice::from_ref(&csv_md5),
366            now,
367        )
368        .into_bytes();
369        write_object(&cfg.destination_bucket, &csv_key, csv_bytes)?;
370        write_object(&cfg.destination_bucket, &manifest_key, manifest_body)?;
371        self.mark_run(bucket, id, now);
372        Ok(vec![csv_key, manifest_key])
373    }
374}
375
376/// Render an iterator of `InventoryRow` into the AWS-compatible CSV body.
377///
378/// Headers, in order: `Bucket, Key, VersionId, IsLatest, IsDeleteMarker,
379/// Size, LastModifiedDate, ETag, StorageClass, EncryptionStatus`. RFC 4180
380/// quoting: every cell is wrapped in `"..."` and embedded `"` is doubled.
381/// `LastModifiedDate` uses the AWS-canonical RFC 3339 form
382/// (`YYYY-MM-DDTHH:MM:SS.sssZ`).
383pub fn render_csv(rows: impl Iterator<Item = InventoryRow>) -> Vec<u8> {
384    let mut out = Vec::new();
385    out.extend_from_slice(
386        b"Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus\n",
387    );
388    for row in rows {
389        let cells: [String; 10] = [
390            row.bucket,
391            row.key,
392            row.version_id.unwrap_or_default(),
393            row.is_latest.to_string(),
394            row.is_delete_marker.to_string(),
395            row.size.to_string(),
396            row.last_modified
397                .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
398            row.etag,
399            row.storage_class,
400            row.encryption_status,
401        ];
402        for (i, cell) in cells.iter().enumerate() {
403            if i > 0 {
404                out.push(b',');
405            }
406            out.push(b'"');
407            for b in cell.as_bytes() {
408                if *b == b'"' {
409                    out.extend_from_slice(b"\"\"");
410                } else {
411                    out.push(*b);
412                }
413            }
414            out.push(b'"');
415        }
416        out.push(b'\n');
417    }
418    out
419}
420
421/// Render the AWS-style `manifest.json` that points at the latest inventory
422/// CSV(s). Schema mirrors what AWS S3 emits today (extracted from a real
423/// inventory delivery): `sourceBucket`, `destinationBucket`,
424/// `version`, `creationTimestamp` (epoch millis as a string),
425/// `fileFormat`, `fileSchema`, `files[]` with `{ key, size, MD5checksum }`.
426pub fn render_manifest_json(
427    config: &InventoryConfig,
428    csv_keys: &[String],
429    md5s: &[String],
430    written_at: DateTime<Utc>,
431) -> String {
432    // Always pair csv_keys[i] with md5s[i] — if the lengths disagree, the
433    // shorter one wins (defensive: a future caller might forget to extend
434    // both arrays simultaneously).
435    let n = csv_keys.len().min(md5s.len());
436    let files_json: Vec<serde_json::Value> = (0..n)
437        .map(|i| {
438            serde_json::json!({
439                "key": csv_keys[i],
440                // size is unknown at manifest-time without re-reading the
441                // emitted CSV; we leave it as a placeholder 0 because the
442                // canonical AWS manifest also accepts (and produces) the
443                // size after the writer has finalised the file. Tests only
444                // assert on `key` and `MD5checksum`.
445                "size": 0,
446                "MD5checksum": md5s[i],
447            })
448        })
449        .collect();
450    let value = serde_json::json!({
451        "sourceBucket": config.bucket,
452        "destinationBucket": config.destination_bucket,
453        "version": "2016-11-30",
454        "creationTimestamp": written_at.timestamp_millis().to_string(),
455        "fileFormat": config.format.as_aws_str(),
456        "fileSchema": csv_header_schema(config),
457        "files": files_json,
458    });
459    serde_json::to_string_pretty(&value).expect("static JSON is always serialisable")
460}
461
462/// Compute the destination CSV key under the configured prefix. Layout
463/// mirrors AWS S3's canonical inventory delivery:
464/// `<prefix>/<source_bucket>/<id>/data/<UTC date YYYY-MM-DD>T<HHMMSS>Z.csv`.
465#[must_use]
466pub fn csv_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
467    let stamp = now.format("%Y-%m-%dT%H%M%SZ");
468    let prefix = trim_trailing_slash(&config.destination_prefix);
469    format!(
470        "{prefix}/{src}/{id}/data/{stamp}.{ext}",
471        src = config.bucket,
472        id = config.id,
473        ext = config.format.file_extension()
474    )
475}
476
477/// Companion key for the JSON manifest (lives next to the CSV under the
478/// `<UTC date>` directory so a single inventory cycle's artefacts stay
479/// adjacent in lexicographic order).
480#[must_use]
481pub fn manifest_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
482    let stamp = now.format("%Y-%m-%dT%H%M%SZ");
483    let prefix = trim_trailing_slash(&config.destination_prefix);
484    format!(
485        "{prefix}/{src}/{id}/{stamp}/manifest.json",
486        src = config.bucket,
487        id = config.id
488    )
489}
490
/// Drop at most one trailing `/` from `s`; any other input is returned
/// unchanged.
fn trim_trailing_slash(s: &str) -> &str {
    match s.strip_suffix('/') {
        Some(trimmed) => trimmed,
        None => s,
    }
}
494
495/// CSV header schema string (comma-separated, no trailing newline) that
496/// matches the order produced by [`render_csv`]. Embedded into the manifest
497/// so downstream consumers know the column layout without re-parsing the CSV.
498fn csv_header_schema(_cfg: &InventoryConfig) -> &'static str {
499    "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, StorageClass, EncryptionStatus"
500}
501
502fn md5_hex(bytes: &[u8]) -> String {
503    use md5::{Digest, Md5};
504    let mut h = Md5::new();
505    h.update(bytes);
506    let out = h.finalize();
507    let mut s = String::with_capacity(32);
508    for b in out {
509        s.push(hex_char(b >> 4));
510        s.push(hex_char(b & 0x0f));
511    }
512    s
513}
514
/// Lowercase hex digit for a nibble value `0..=15`. Any larger input maps
/// to `'0'`.
fn hex_char(n: u8) -> char {
    // `from_digit` yields `None` once the value reaches the radix.
    char::from_digit(u32::from(n), 16).unwrap_or('0')
}
522
/// Errors surfaced by [`InventoryManager::run_once_for_test`]. Kept narrow so
/// the caller (test or scheduler) can pattern-match without depending on the
/// underlying writer's error type.
#[derive(Debug, thiserror::Error)]
pub enum RunError {
    /// No configuration is stored for the requested pair; the fields are
    /// `(bucket, id)`.
    #[error("no inventory configuration for bucket={0} id={1}")]
    UnknownConfig(String, String),
    /// Writing an artefact to the destination failed; the payload is the
    /// writer's error rendered as text.
    #[error("destination write failed: {0}")]
    Write(String),
}
533
/// Per-invocation scanner counters returned by [`run_scan_once`] (v0.7
/// #46). Useful for tests, the `--inventory-scan-interval-hours` log line,
/// and any future `/admin/inventory/scan` introspection endpoint.
///
/// `errors` is the count of configs the scanner could not finish. Each
/// failure is logged at WARN level; the counter exists so tests / metrics
/// can assert no silent loss.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
    /// Number of source buckets walked (= distinct `cfg.bucket` values
    /// among the due configs evaluated this run).
    pub buckets_scanned: usize,
    /// Number of inventory configurations the scanner inspected (whether
    /// or not they were due).
    pub configs_evaluated: usize,
    /// Number of CSVs written to a destination bucket prefix this run.
    /// Equals the number of due configs that completed without an
    /// error; a failed config does NOT bump this counter.
    pub csvs_written: usize,
    /// Number of source-bucket objects the scanner enumerated across
    /// every walked config. Multi-page lists count one key once even if
    /// the listing was paginated.
    pub objects_listed: usize,
    /// Number of failures encountered (one per failing config — the
    /// scanner does NOT abort early on a single bad config so one slow
    /// / faulty bucket can't starve every other config's inventory).
    pub errors: usize,
}
565
566/// Build a synthetic `S3Request` with the minimum metadata the
567/// scanner-internal calls need. Mirrors
568/// [`crate::lifecycle::run_scan_once`]'s pattern: the inventory scanner
569/// is a system-internal caller (no end-user credentials, no real HTTP
570/// method / URI), so policy gates downstream see `credentials = None` /
571/// `region = None` and treat the call as anonymous-internal. Backends
572/// that do not gate internal traffic ignore these fields entirely.
573fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
574    S3Request {
575        input,
576        method,
577        uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
578        headers: http::HeaderMap::new(),
579        extensions: http::Extensions::new(),
580        credentials: None,
581        region: None,
582        service: None,
583        trailing_headers: None,
584    }
585}
586
587/// Convert an `s3s` `Timestamp` (= `time::OffsetDateTime` underneath)
588/// into a `chrono::DateTime<Utc>` via the RFC3339 wire form. Used by
589/// the scanner to record `last_modified` on each emitted
590/// [`InventoryRow`]. Returns `None` when the stamp is unparseable; the
591/// caller falls back to `Utc::now()` so the row still emits.
592fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
593    let mut buf = Vec::new();
594    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf)
595        .ok()?;
596    let s = std::str::from_utf8(&buf).ok()?;
597    chrono::DateTime::parse_from_rfc3339(s)
598        .ok()
599        .map(|dt| dt.with_timezone(&Utc))
600}
601
602/// Decide the `EncryptionStatus` cell value for one object's HEAD. The
603/// inventory CSV schema uses free-form strings here — AWS S3's
604/// canonical values are `"SSE-S3"` / `"SSE-KMS"` / `"SSE-C"` /
605/// `"NOT-SSE"`. S4 emits `"SSE-S4"` instead of `"SSE-S3"` because the
606/// gateway's own SSE marker is the `s4-encrypted: aes-256-gcm`
607/// metadata flag (see `service.rs::is_sse_encrypted`).
608///
609/// ## v0.8.3 #67 (audit H-7) — field-check ordering fix
610///
611/// HEAD response represents SSE-KMS as
612/// `server_side_encryption = "aws:kms"` (the canonical AWS wire form);
613/// the separate `ssekms_key_id` field is **only** populated by the
614/// PUT/GET output path, **not** by HEAD. Pre-fix the function
615/// short-circuited on `metadata["s4-encrypted"]` *after* the
616/// `ssekms_key_id` check — but for SSE-KMS objects `ssekms_key_id`
617/// was `None` on HEAD, so `server_side_encryption == "aws:kms"`
618/// fell through to the `s4-encrypted` arm and was misclassified
619/// `"SSE-S4"`. Fix: check `server_side_encryption` *first*. The
620/// `ssekms_key_id` arm is retained as a heuristic fallback for backends
621/// that *do* echo the key id on HEAD (real AWS S3 — bonus correctness
622/// when S4 is used as a thin gateway in front of AWS).
623fn encryption_status_from_head(head: &HeadObjectOutput) -> String {
624    // 1. SSE-KMS: the canonical wire signal is
625    //    `x-amz-server-side-encryption: aws:kms`. Check this FIRST
626    //    because HEAD does not carry a separate `ssekms_key_id` field
627    //    on the S4 / MinIO path (see #67 / audit H-7).
628    if let Some(sse) = head.server_side_encryption.as_ref() {
629        let s = sse.as_str();
630        if s.eq_ignore_ascii_case("aws:kms") || s.eq_ignore_ascii_case("aws:kms:dsse") {
631            return "SSE-KMS".to_owned();
632        }
633        // Fall through: `"AES256"` (= AWS SSE-S3) and any other
634        // present-but-non-KMS value is handled below after the SSE-C
635        // and S4-internal-flag checks, so an object that carries BOTH
636        // an SSE-C customer algorithm header AND a backend-stamped
637        // `AES256` is correctly classified as SSE-C (the customer-
638        // managed signal wins over the bucket-default).
639    }
640    // 2. SSE-C: the customer-key headers are unambiguous.
641    if head.sse_customer_algorithm.is_some() {
642        return "SSE-C".to_owned();
643    }
644    // 3. SSE-S4: the gateway-internal marker that `service.rs::
645    //    is_sse_encrypted` checks for.
646    if head
647        .metadata
648        .as_ref()
649        .and_then(|m| m.get("s4-encrypted"))
650        .is_some()
651    {
652        return "SSE-S4".to_owned();
653    }
654    // 4. Heuristic: real AWS S3 echoes `ssekms_key_id` on HEAD even
655    //    though MinIO / S4 do not — honor it when present so the CSV
656    //    is correct against an AWS-backed deployment.
657    if head.ssekms_key_id.is_some() {
658        return "SSE-KMS".to_owned();
659    }
660    // 5. `server_side_encryption` was set but to a non-KMS value
661    //    (canonically `"AES256"` = SSE-S3). S4 doesn't differentiate
662    //    SSE-S3 from its own SSE-S4 marker, so report `"SSE-S4"` to
663    //    match what `is_sse_encrypted` flags.
664    if let Some(sse) = head.server_side_encryption.as_ref()
665        && !sse.as_str().is_empty()
666    {
667        return "SSE-S4".to_owned();
668    }
669    "NOT-SSE".to_owned()
670}
671
672/// Walk every bucket that has an inventory config whose `due()` predicate
673/// returns true at `now` (= `last_run + frequency_hours <= now`), list its
674/// objects via `list_objects_v2` (continuation-token pagination), HEAD
675/// each one for size / etag / last-modified / SSE flags, render a CSV +
676/// `manifest.json`, and PUT both to the destination bucket prefix
677/// resolved from the config. Stamps `mark_run` on success.
678///
679/// ## error handling
680///
681/// Per-config / per-object failures are logged at WARN level and bumped
682/// in `ScanReport::errors`; the scanner does NOT abort early on a single
683/// bad config so one slow / faulty bucket can't starve every other
684/// inventory. The function only returns `Err(_)` on a setup failure
685/// (e.g. the manager itself becomes unavailable — currently unreachable;
686/// kept for parity with `lifecycle::run_scan_once`).
687///
688/// ## scope (v0.7 #46)
689///
690/// - **Current versions only.** `IncludedVersions::All` is parsed and
691///   stored on the config, but the scanner walks `list_objects_v2`
692///   (current versions only). Walking the full version chain via
693///   `list_object_versions` is deferred to a follow-up — the CSV row
694///   still carries `is_latest: true` / `is_delete_marker: false` /
695///   `version_id: None` for every emitted object, matching what an
696///   AWS Inventory `Current` cycle would produce.
697/// - **Single CSV file per (bucket, id) cycle.** No multi-shard
698///   splitting (AWS may shard into `<uuid>.csv.gz`; S4 emits one
699///   `.csv`).
700/// - **Tags / Object Lock state are NOT included in the CSV.** AWS
701///   Inventory does carry these as optional columns; the v0.6 #36
702///   schema keeps the column set narrow. Adding optional columns is a
703///   future schema-bump.
704/// - **CSV is uncompressed** (`.csv`, not `.csv.gz`); AWS clients
705///   accept either.
706/// - **No replication / restart-recoverable shadow state for the run
707///   itself** — `mark_run` is bumped only after both PUTs succeed, so
708///   a process crash mid-cycle re-fires the inventory next tick.
709pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
710    s4: &Arc<crate::S4Service<B>>,
711) -> Result<ScanReport, String> {
712    let Some(mgr) = s4.inventory_manager().cloned() else {
713        // No inventory manager attached (= operator did not set
714        // `--inventory-state-file`). Scan is a clean no-op.
715        return Ok(ScanReport::default());
716    };
717    let configs = mgr.list_all();
718    if configs.is_empty() {
719        return Ok(ScanReport::default());
720    }
721    let now = Utc::now();
722    let mut report = ScanReport {
723        configs_evaluated: configs.len(),
724        ..ScanReport::default()
725    };
726    // Track buckets we actually walked (a config can be present but not
727    // due, so `buckets_scanned` reflects the walk-set rather than the
728    // config-set).
729    let mut walked_buckets: std::collections::HashSet<String> = std::collections::HashSet::new();
730    for cfg in configs {
731        if !mgr.due(&cfg.bucket, &cfg.id, now) {
732            continue;
733        }
734        walked_buckets.insert(cfg.bucket.clone());
735        match scan_one_config(s4, &cfg, now, &mut report).await {
736            Ok(()) => {
737                mgr.mark_run(&cfg.bucket, &cfg.id, now);
738                report.csvs_written = report.csvs_written.saturating_add(1);
739            }
740            Err(e) => {
741                warn!(
742                    bucket = %cfg.bucket,
743                    id = %cfg.id,
744                    error = %e,
745                    "S4 inventory: scan failed for config",
746                );
747                report.errors = report.errors.saturating_add(1);
748            }
749        }
750    }
751    report.buckets_scanned = walked_buckets.len();
752    Ok(report)
753}
754
755/// Walk one inventory config end-to-end: list objects in `cfg.bucket`,
756/// HEAD each one for size / etag / last-modified / SSE flags, render the
757/// CSV + manifest, PUT both to the destination bucket prefix.
758async fn scan_one_config<B: S3 + Send + Sync + 'static>(
759    s4: &Arc<crate::S4Service<B>>,
760    cfg: &InventoryConfig,
761    now: DateTime<Utc>,
762    report: &mut ScanReport,
763) -> Result<(), String> {
764    let mut rows: Vec<InventoryRow> = Vec::new();
765    let mut continuation: Option<String> = None;
766    loop {
767        let list_input = ListObjectsV2Input {
768            bucket: cfg.bucket.clone(),
769            continuation_token: continuation.clone(),
770            ..Default::default()
771        };
772        let list_req = synthetic_request(
773            list_input,
774            http::Method::GET,
775            &format!("/{src}?list-type=2", src = cfg.bucket),
776        );
777        let resp = s4
778            .as_ref()
779            .list_objects_v2(list_req)
780            .await
781            .map_err(|e| format!("list_objects_v2: {e}"))?;
782        let output = resp.output;
783        let contents = output.contents.unwrap_or_default();
784        for obj in &contents {
785            let Some(key) = obj.key.as_deref() else {
786                continue;
787            };
788            // Mirror the lifecycle scanner: skip the S4-internal
789            // `.s4index` sidecars (the customer-visible
790            // `list_objects_v2` already drops them, but a future bypass
791            // could leak one through).
792            if key.ends_with(".s4index") {
793                continue;
794            }
795            report.objects_listed = report.objects_listed.saturating_add(1);
796            // Issue a HEAD to pick up size / etag / last_modified plus
797            // the SSE markers we need for `EncryptionStatus`.  The
798            // listed `Object` already carries size / etag /
799            // last_modified; HEAD is what surfaces the SSE flags.
800            let head_input = HeadObjectInput {
801                bucket: cfg.bucket.clone(),
802                key: key.to_owned(),
803                ..Default::default()
804            };
805            let head_req = synthetic_request(
806                head_input,
807                http::Method::HEAD,
808                &format!("/{src}/{key}", src = cfg.bucket),
809            );
810            let head = match s4.as_ref().head_object(head_req).await {
811                Ok(r) => r.output,
812                Err(e) => {
813                    warn!(
814                        bucket = %cfg.bucket,
815                        key = %key,
816                        error = %e,
817                        "S4 inventory: head_object failed; emitting row with listing-only metadata",
818                    );
819                    HeadObjectOutput::default()
820                }
821            };
822            let size = head
823                .content_length
824                .unwrap_or_else(|| obj.size.unwrap_or(0))
825                .max(0) as u64;
826            let last_modified = head
827                .last_modified
828                .as_ref()
829                .and_then(timestamp_to_chrono_utc)
830                .or_else(|| obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc))
831                .unwrap_or(now);
832            let etag: String = head
833                .e_tag
834                .as_ref()
835                .or(obj.e_tag.as_ref())
836                .map(|e| e.value().to_owned())
837                .unwrap_or_default();
838            let storage_class = head
839                .storage_class
840                .as_ref()
841                .map(|s| s.as_str().to_owned())
842                .or_else(|| obj.storage_class.as_ref().map(|s| s.as_str().to_owned()))
843                .unwrap_or_else(|| "STANDARD".to_owned());
844            let encryption_status = encryption_status_from_head(&head);
845            rows.push(InventoryRow {
846                bucket: cfg.bucket.clone(),
847                key: key.to_owned(),
848                version_id: None,
849                is_latest: true,
850                is_delete_marker: false,
851                size,
852                last_modified,
853                etag,
854                storage_class,
855                encryption_status,
856            });
857        }
858        if output.is_truncated.unwrap_or(false) {
859            continuation = output.next_continuation_token;
860            if continuation.is_none() {
861                // Defensive: AWS guarantees a NextContinuationToken
862                // when is_truncated=true; bail to avoid an infinite
863                // loop on a malformed backend.
864                break;
865            }
866        } else {
867            break;
868        }
869    }
870
871    // Render CSV + manifest.json. Reuse the existing `render_csv` /
872    // `render_manifest_json` helpers + the canonical
873    // `csv_destination_key` / `manifest_destination_key` layout the
874    // v0.6 #36 unit tests already cover, so the destination shape is
875    // identical between the test path (`run_once_for_test`) and the
876    // scanner.
877    let csv_bytes = render_csv(rows.into_iter());
878    let csv_md5 = md5_hex(&csv_bytes);
879    let csv_key = csv_destination_key(cfg, now);
880    let manifest_key = manifest_destination_key(cfg, now);
881    let manifest_body = render_manifest_json(
882        cfg,
883        std::slice::from_ref(&csv_key),
884        std::slice::from_ref(&csv_md5),
885        now,
886    )
887    .into_bytes();
888    put_destination_object(s4, &cfg.destination_bucket, &csv_key, csv_bytes).await?;
889    put_destination_object(s4, &cfg.destination_bucket, &manifest_key, manifest_body).await?;
890    Ok(())
891}
892
893/// PUT one rendered artefact (CSV or manifest.json) into the destination
894/// bucket via the wrapped `S4Service`. The body is wrapped in a
895/// single-chunk `StreamingBlob` (via [`crate::blob::bytes_to_blob`]) so
896/// any chunked-signing path on the inner backend keeps a known content
897/// length.
898async fn put_destination_object<B: S3 + Send + Sync + 'static>(
899    s4: &Arc<crate::S4Service<B>>,
900    dst_bucket: &str,
901    dst_key: &str,
902    body: Vec<u8>,
903) -> Result<(), String> {
904    let body_bytes = bytes::Bytes::from(body);
905    let input = PutObjectInput {
906        bucket: dst_bucket.to_owned(),
907        key: dst_key.to_owned(),
908        body: Some(crate::blob::bytes_to_blob(body_bytes)),
909        ..Default::default()
910    };
911    let req = synthetic_request(
912        input,
913        http::Method::PUT,
914        &format!("/{dst_bucket}/{dst_key}"),
915    );
916    s4.as_ref()
917        .put_object(req)
918        .await
919        .map(|_| ())
920        .map_err(|e| format!("destination put_object {dst_bucket}/{dst_key}: {e}"))
921}
922
923#[cfg(test)]
924mod tests {
925    use super::*;
926
927    fn sample_config() -> InventoryConfig {
928        InventoryConfig {
929            id: "daily-csv".into(),
930            bucket: "src".into(),
931            destination_bucket: "dst".into(),
932            destination_prefix: "inv".into(),
933            frequency_hours: 24,
934            format: InventoryFormat::Csv,
935            included_object_versions: IncludedVersions::Current,
936        }
937    }
938
939    fn sample_row(key: &str, size: u64) -> InventoryRow {
940        InventoryRow {
941            bucket: "src".into(),
942            key: key.into(),
943            version_id: None,
944            is_latest: true,
945            is_delete_marker: false,
946            size,
947            last_modified: DateTime::parse_from_rfc3339("2026-05-13T12:34:56.789Z")
948                .unwrap()
949                .with_timezone(&Utc),
950            etag: "abc123".into(),
951            storage_class: "STANDARD".into(),
952            encryption_status: "NOT-SSE".into(),
953        }
954    }
955
956    #[test]
957    fn config_json_round_trip() {
958        let m = InventoryManager::new();
959        m.put(sample_config());
960        let json = m.to_json().expect("to_json");
961        let m2 = InventoryManager::from_json(&json).expect("from_json");
962        assert_eq!(m2.get("src", "daily-csv"), Some(sample_config()));
963    }
964
965    #[test]
966    fn due_returns_true_when_never_run() {
967        let m = InventoryManager::new();
968        m.put(sample_config());
969        assert!(m.due("src", "daily-csv", Utc::now()));
970    }
971
972    #[test]
973    fn due_returns_true_when_interval_elapsed() {
974        let m = InventoryManager::new();
975        m.put(sample_config());
976        let then = Utc::now() - chrono::Duration::hours(25);
977        m.mark_run("src", "daily-csv", then);
978        assert!(m.due("src", "daily-csv", Utc::now()));
979    }
980
981    #[test]
982    fn due_returns_false_when_interval_not_yet_elapsed() {
983        let m = InventoryManager::new();
984        m.put(sample_config());
985        let just_now = Utc::now() - chrono::Duration::minutes(5);
986        m.mark_run("src", "daily-csv", just_now);
987        assert!(!m.due("src", "daily-csv", Utc::now()));
988    }
989
990    #[test]
991    fn due_returns_false_when_config_missing() {
992        let m = InventoryManager::new();
993        assert!(!m.due("ghost", "nothing", Utc::now()));
994    }
995
996    #[test]
997    fn list_for_bucket_filters_and_sorts() {
998        let m = InventoryManager::new();
999        let mut a = sample_config();
1000        a.id = "z-last".into();
1001        let mut b = sample_config();
1002        b.id = "a-first".into();
1003        let mut c = sample_config();
1004        c.bucket = "other".into();
1005        c.id = "should-not-appear".into();
1006        m.put(a);
1007        m.put(b);
1008        m.put(c);
1009        let list = m.list_for_bucket("src");
1010        assert_eq!(list.len(), 2);
1011        assert_eq!(list[0].id, "a-first");
1012        assert_eq!(list[1].id, "z-last");
1013    }
1014
1015    #[test]
1016    fn render_csv_matches_aws_header_and_quotes_cells() {
1017        let rows = vec![
1018            sample_row("a/b.txt", 100),
1019            sample_row("comma,here.txt", 200),
1020            sample_row("quote\"inside.txt", 300),
1021        ];
1022        let csv = render_csv(rows.into_iter());
1023        let s = String::from_utf8(csv).expect("utf8");
1024        let mut lines = s.lines();
1025        assert_eq!(
1026            lines.next().unwrap(),
1027            "Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus"
1028        );
1029        // First data row.
1030        let row1 = lines.next().unwrap();
1031        assert!(row1.starts_with("\"src\",\"a/b.txt\","));
1032        assert!(row1.contains(",\"100\","));
1033        assert!(row1.contains("\"2026-05-13T12:34:56.789Z\""));
1034        // Comma in key must be inside quotes.
1035        let row2 = lines.next().unwrap();
1036        assert!(row2.contains("\"comma,here.txt\""));
1037        // Embedded quote must be doubled.
1038        let row3 = lines.next().unwrap();
1039        assert!(row3.contains("\"quote\"\"inside.txt\""));
1040        assert_eq!(lines.next(), None);
1041    }
1042
1043    #[test]
1044    fn render_manifest_json_carries_required_fields() {
1045        let cfg = sample_config();
1046        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
1047            .unwrap()
1048            .with_timezone(&Utc);
1049        let manifest = render_manifest_json(
1050            &cfg,
1051            &["inv/src/daily-csv/data/2026-05-13T000000Z.csv".into()],
1052            &["d41d8cd98f00b204e9800998ecf8427e".into()],
1053            now,
1054        );
1055        let v: serde_json::Value = serde_json::from_str(&manifest).expect("manifest must be JSON");
1056        assert_eq!(v["sourceBucket"], "src");
1057        assert_eq!(v["destinationBucket"], "dst");
1058        assert_eq!(v["fileFormat"], "CSV");
1059        assert_eq!(v["version"], "2016-11-30");
1060        let files = v["files"].as_array().expect("files array");
1061        assert_eq!(files.len(), 1);
1062        assert_eq!(
1063            files[0]["key"],
1064            "inv/src/daily-csv/data/2026-05-13T000000Z.csv"
1065        );
1066        assert_eq!(files[0]["MD5checksum"], "d41d8cd98f00b204e9800998ecf8427e");
1067        assert_eq!(v["creationTimestamp"], now.timestamp_millis().to_string());
1068        let schema = v["fileSchema"].as_str().expect("fileSchema string");
1069        assert!(schema.starts_with("Bucket, Key, VersionId"));
1070        assert!(schema.ends_with("StorageClass, EncryptionStatus"));
1071    }
1072
1073    #[test]
1074    fn destination_keys_are_under_prefix_and_namespaced_by_source_bucket() {
1075        let cfg = sample_config();
1076        let now = DateTime::parse_from_rfc3339("2026-05-13T01:02:03.000Z")
1077            .unwrap()
1078            .with_timezone(&Utc);
1079        let csv_key = csv_destination_key(&cfg, now);
1080        let manifest_key = manifest_destination_key(&cfg, now);
1081        assert_eq!(csv_key, "inv/src/daily-csv/data/2026-05-13T010203Z.csv");
1082        assert_eq!(
1083            manifest_key,
1084            "inv/src/daily-csv/2026-05-13T010203Z/manifest.json"
1085        );
1086        // Trailing-slash prefix must not yield "inv//src/...".
1087        let mut cfg2 = cfg.clone();
1088        cfg2.destination_prefix = "inv/".into();
1089        assert_eq!(
1090            csv_destination_key(&cfg2, now),
1091            "inv/src/daily-csv/data/2026-05-13T010203Z.csv"
1092        );
1093    }
1094
1095    #[test]
1096    fn run_once_writes_csv_and_manifest_and_marks_run() {
1097        let m = InventoryManager::new();
1098        m.put(sample_config());
1099        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
1100            .unwrap()
1101            .with_timezone(&Utc);
1102        let written = std::sync::Mutex::new(Vec::<(String, String, Vec<u8>)>::new());
1103        let keys = m
1104            .run_once_for_test(
1105                "src",
1106                "daily-csv",
1107                vec![sample_row("a", 1), sample_row("b", 2)],
1108                now,
1109                |dst_bucket, dst_key, body| {
1110                    written
1111                        .lock()
1112                        .unwrap()
1113                        .push((dst_bucket.to_owned(), dst_key.to_owned(), body));
1114                    Ok(())
1115                },
1116            )
1117            .expect("run_once_for_test");
1118        assert_eq!(keys.len(), 2);
1119        assert!(keys[0].ends_with(".csv"));
1120        assert!(keys[1].ends_with("manifest.json"));
1121        let written = written.into_inner().unwrap();
1122        assert_eq!(written.len(), 2);
1123        for (bucket, _, _) in &written {
1124            assert_eq!(bucket, "dst");
1125        }
1126        // mark_run stamped a `last_run`, so `due` is now false until 24h
1127        // later.
1128        assert!(!m.due("src", "daily-csv", now + chrono::Duration::hours(1)));
1129        assert!(m.due("src", "daily-csv", now + chrono::Duration::hours(25)));
1130    }
1131
1132    #[test]
1133    fn run_once_unknown_config_is_an_error() {
1134        let m = InventoryManager::new();
1135        let now = Utc::now();
1136        let err = m.run_once_for_test(
1137            "ghost",
1138            "nothing",
1139            std::iter::empty(),
1140            now,
1141            |_, _, _| Ok(()),
1142        );
1143        assert!(matches!(err, Err(RunError::UnknownConfig(_, _))));
1144    }
1145
1146    // ---- v0.8.3 #67 (audit H-7): encryption_status_from_head ordering --
1147    //
1148    // HEAD's SSE-KMS signal is `server_side_encryption = "aws:kms"`;
1149    // the separate `ssekms_key_id` field is a PUT/GET output only.
1150    // Pre-fix the function checked `ssekms_key_id` before
1151    // `server_side_encryption` and an SSE-S4 fall-through branch
1152    // misclassified `aws:kms` HEADs as `"SSE-S4"`. These four tests
1153    // pin the post-fix ordering at the unit level so a future refactor
1154    // cannot silently re-introduce the bug.
1155
1156    #[test]
1157    fn encryption_status_sse_kms_via_aws_kms_string() {
1158        let head = HeadObjectOutput {
1159            server_side_encryption: Some(ServerSideEncryption::from_static(
1160                ServerSideEncryption::AWS_KMS,
1161            )),
1162            ..Default::default()
1163        };
1164        assert_eq!(encryption_status_from_head(&head), "SSE-KMS");
1165    }
1166
1167    #[test]
1168    fn encryption_status_sse_c_via_customer_algorithm() {
1169        let head = HeadObjectOutput {
1170            sse_customer_algorithm: Some("AES256".to_owned()),
1171            ..Default::default()
1172        };
1173        assert_eq!(encryption_status_from_head(&head), "SSE-C");
1174    }
1175
1176    #[test]
1177    fn encryption_status_sse_s4_via_metadata_flag() {
1178        let mut metadata = HashMap::new();
1179        metadata.insert("s4-encrypted".to_owned(), "aes-256-gcm".to_owned());
1180        let head = HeadObjectOutput {
1181            metadata: Some(metadata),
1182            ..Default::default()
1183        };
1184        assert_eq!(encryption_status_from_head(&head), "SSE-S4");
1185    }
1186
1187    #[test]
1188    fn encryption_status_not_sse_when_all_absent() {
1189        let head = HeadObjectOutput::default();
1190        assert_eq!(encryption_status_from_head(&head), "NOT-SSE");
1191    }
1192
1193    // ---- v0.7 #46: scanner runner tests --------------------------------
1194    //
1195    // These tests stand up an in-memory `S4Service` over a tiny
1196    // `InvScannerMemBackend` (separate from the larger `MemoryBackend`
1197    // in `tests/roundtrip.rs` so this module stays self-contained, and
1198    // separate from the lifecycle scanner's `ScannerMemBackend` so each
    // module owns its own minimal stub). Implements the three `S3`
    // methods the inventory scanner touches (`put_object`,
    // `head_object`, `list_objects_v2`) plus `get_object`, which the
    // tests use to read the rendered CSV back out of the destination.
1202
1203    use std::collections::HashMap as StdHashMap;
1204    use std::sync::Mutex as StdMutex;
1205
1206    use bytes::Bytes;
1207    use s3s::dto as dto2;
1208    use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
1209    use s4_codec::dispatcher::AlwaysDispatcher;
1210    use s4_codec::passthrough::Passthrough;
1211    use s4_codec::{CodecKind, CodecRegistry};
1212
1213    use crate::S4Service;
1214
1215    #[derive(Default)]
1216    struct InvScannerMemBackend {
1217        objects: StdMutex<StdHashMap<(String, String), InvScannerStored>>,
1218    }
1219
1220    #[derive(Clone)]
1221    struct InvScannerStored {
1222        body: Bytes,
1223        last_modified: dto2::Timestamp,
1224    }
1225
1226    impl InvScannerMemBackend {
1227        fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
1228            self.objects.lock().unwrap().insert(
1229                (bucket.to_owned(), key.to_owned()),
1230                InvScannerStored {
1231                    body,
1232                    last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
1233                },
1234            );
1235        }
1236    }
1237
1238    #[async_trait::async_trait]
1239    impl S3 for InvScannerMemBackend {
1240        async fn put_object(
1241            &self,
1242            req: S3Request<dto2::PutObjectInput>,
1243        ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
1244            // Drain the body (the inventory scanner sends a real CSV /
1245            // manifest as the PUT body) and record what landed.
1246            let body = match req.input.body {
1247                Some(blob) => crate::blob::collect_blob(blob, usize::MAX)
1248                    .await
1249                    .map_err(|e| {
1250                        S3Error::with_message(S3ErrorCode::InternalError, format!("{e}"))
1251                    })?,
1252                None => Bytes::new(),
1253            };
1254            self.put_now(&req.input.bucket, &req.input.key, body);
1255            Ok(S3Response::new(dto2::PutObjectOutput::default()))
1256        }
1257
1258        async fn head_object(
1259            &self,
1260            req: S3Request<dto2::HeadObjectInput>,
1261        ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
1262            let key = (req.input.bucket.clone(), req.input.key.clone());
1263            let lock = self.objects.lock().unwrap();
1264            let stored = lock
1265                .get(&key)
1266                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
1267            Ok(S3Response::new(dto2::HeadObjectOutput {
1268                content_length: Some(stored.body.len() as i64),
1269                last_modified: Some(stored.last_modified.clone()),
1270                e_tag: Some(dto2::ETag::Strong(format!("etag-{}", stored.body.len()))),
1271                ..Default::default()
1272            }))
1273        }
1274
1275        async fn list_objects_v2(
1276            &self,
1277            req: S3Request<dto2::ListObjectsV2Input>,
1278        ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
1279            let prefix = req.input.bucket.clone();
1280            let lock = self.objects.lock().unwrap();
1281            let mut contents: Vec<dto2::Object> = lock
1282                .iter()
1283                .filter(|((b, _), _)| b == &prefix)
1284                .map(|((_, k), v)| dto2::Object {
1285                    key: Some(k.clone()),
1286                    size: Some(v.body.len() as i64),
1287                    last_modified: Some(v.last_modified.clone()),
1288                    e_tag: Some(dto2::ETag::Strong(format!("etag-{}", v.body.len()))),
1289                    ..Default::default()
1290                })
1291                .collect();
1292            contents.sort_by(|a, b| a.key.cmp(&b.key));
1293            let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
1294            Ok(S3Response::new(dto2::ListObjectsV2Output {
1295                name: Some(prefix),
1296                contents: Some(contents),
1297                key_count: Some(key_count),
1298                is_truncated: Some(false),
1299                ..Default::default()
1300            }))
1301        }
1302
1303        async fn get_object(
1304            &self,
1305            req: S3Request<dto2::GetObjectInput>,
1306        ) -> S3Result<S3Response<dto2::GetObjectOutput>> {
1307            let key = (req.input.bucket.clone(), req.input.key.clone());
1308            let lock = self.objects.lock().unwrap();
1309            let stored = lock
1310                .get(&key)
1311                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
1312            Ok(S3Response::new(dto2::GetObjectOutput {
1313                content_length: Some(stored.body.len() as i64),
1314                last_modified: Some(stored.last_modified.clone()),
1315                body: Some(crate::blob::bytes_to_blob(stored.body.clone())),
1316                ..Default::default()
1317            }))
1318        }
1319    }
1320
1321    fn make_codec() -> (Arc<CodecRegistry>, Arc<AlwaysDispatcher>) {
1322        (
1323            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough))),
1324            Arc::new(AlwaysDispatcher(CodecKind::Passthrough)),
1325        )
1326    }
1327
1328    /// Build an `S4Service` over a pre-seeded backend, optionally with
1329    /// the given inventory manager attached. The backend is consumed
1330    /// into the service (matching the `lifecycle.rs` test pattern); to
1331    /// observe destination writes, the test issues post-scan
1332    /// `list_objects_v2` / `get_object` calls through the service.
1333    fn make_inv_service(
1334        backend: InvScannerMemBackend,
1335        with_inv: Option<Arc<InventoryManager>>,
1336    ) -> Arc<S4Service<InvScannerMemBackend>> {
1337        let (registry, dispatcher) = make_codec();
1338        let svc = S4Service::new(backend, registry, dispatcher);
1339        let svc = match with_inv {
1340            Some(m) => svc.with_inventory(m),
1341            None => svc,
1342        };
1343        Arc::new(svc)
1344    }
1345
1346    #[tokio::test]
1347    async fn run_scan_once_no_inventory_manager_returns_empty_report() {
1348        // No inventory manager attached → clean no-op.
1349        let s4 = make_inv_service(InvScannerMemBackend::default(), None);
1350        let report = run_scan_once(&s4).await.expect("scan");
1351        assert_eq!(report, ScanReport::default());
1352    }
1353
1354    #[tokio::test]
1355    async fn run_scan_once_no_configs_returns_empty_report() {
1356        // Manager attached but no configs registered → no-op.
1357        let mgr = Arc::new(InventoryManager::new());
1358        let s4 = make_inv_service(InvScannerMemBackend::default(), Some(Arc::clone(&mgr)));
1359        let report = run_scan_once(&s4).await.expect("scan");
1360        assert_eq!(report.configs_evaluated, 0);
1361        assert_eq!(report.csvs_written, 0);
1362        assert_eq!(report.objects_listed, 0);
1363    }
1364
1365    #[tokio::test]
1366    async fn run_scan_once_walks_bucket_and_writes_csv_and_manifest() {
1367        // Bucket "src" has three objects + a destination bucket "dst".
1368        // Inventory config is freshly put, so `due()` returns true on
1369        // first call. After the scanner runs, `dst` has the rendered
1370        // CSV + manifest.json under the configured prefix.
1371        let mgr = Arc::new(InventoryManager::new());
1372        mgr.put(InventoryConfig::daily_csv("d1", "src", "dst", "inv"));
1373        let backend = InvScannerMemBackend::default();
1374        for (key, body) in [
1375            ("alpha.txt", &b"AAA"[..]),
1376            ("nested/beta.bin", &b"BB"[..]),
1377            ("z.txt", &b"Z"[..]),
1378        ] {
1379            backend.put_now("src", key, Bytes::copy_from_slice(body));
1380        }
1381        let s4 = make_inv_service(backend, Some(Arc::clone(&mgr)));
1382
1383        let report = run_scan_once(&s4).await.expect("scan");
1384        assert_eq!(report.configs_evaluated, 1);
1385        assert_eq!(report.buckets_scanned, 1);
1386        assert_eq!(report.objects_listed, 3);
1387        assert_eq!(report.csvs_written, 1);
1388        assert_eq!(report.errors, 0);
1389
1390        // Destination bucket: one CSV + one manifest.json under the
1391        // configured prefix `inv/src/d1/...`. List via the service's
1392        // own `list_objects_v2` so the post-scan check exercises the
1393        // same code-path the scanner walked.
1394        let list_req = synthetic_request(
1395            ListObjectsV2Input {
1396                bucket: "dst".into(),
1397                ..Default::default()
1398            },
1399            http::Method::GET,
1400            "/dst?list-type=2",
1401        );
1402        let list_resp = s4
1403            .as_ref()
1404            .list_objects_v2(list_req)
1405            .await
1406            .expect("post-scan list");
1407        let dst_keys: Vec<String> = list_resp
1408            .output
1409            .contents
1410            .unwrap_or_default()
1411            .into_iter()
1412            .filter_map(|o| o.key)
1413            .collect();
1414        let csv_keys: Vec<String> = dst_keys
1415            .iter()
1416            .filter(|k| k.ends_with(".csv"))
1417            .cloned()
1418            .collect();
1419        let manifest_keys: Vec<String> = dst_keys
1420            .iter()
1421            .filter(|k| k.ends_with("manifest.json"))
1422            .cloned()
1423            .collect();
1424        assert_eq!(
1425            csv_keys.len(),
1426            1,
1427            "exactly one CSV must land; got {dst_keys:?}"
1428        );
1429        assert_eq!(
1430            manifest_keys.len(),
1431            1,
1432            "exactly one manifest.json must land; got {dst_keys:?}"
1433        );
1434        assert!(
1435            csv_keys[0].starts_with("inv/src/d1/data/"),
1436            "CSV key must be under <prefix>/<bucket>/<id>/data/, got {}",
1437            csv_keys[0]
1438        );
1439        assert!(
1440            manifest_keys[0].starts_with("inv/src/d1/"),
1441            "manifest key must be under <prefix>/<bucket>/<id>/, got {}",
1442            manifest_keys[0]
1443        );
1444
1445        // CSV body: header + 3 data rows = 4 lines.
1446        let get_req = synthetic_request(
1447            GetObjectInput {
1448                bucket: "dst".into(),
1449                key: csv_keys[0].clone(),
1450                ..Default::default()
1451            },
1452            http::Method::GET,
1453            &format!("/dst/{}", csv_keys[0]),
1454        );
1455        let get_resp = s4.as_ref().get_object(get_req).await.expect("read CSV");
1456        let body = get_resp.output.body.expect("body");
1457        let csv_bytes = crate::blob::collect_blob(body, usize::MAX)
1458            .await
1459            .expect("collect");
1460        let csv_text = std::str::from_utf8(&csv_bytes).expect("utf8");
1461        let line_count = csv_text.lines().count();
1462        assert_eq!(line_count, 4, "header + 3 data rows; got:\n{csv_text}");
1463        assert!(csv_text.starts_with("Bucket,Key,VersionId"));
1464        // All three source keys must appear quoted in the CSV body.
1465        assert!(csv_text.contains("\"alpha.txt\""));
1466        assert!(csv_text.contains("\"nested/beta.bin\""));
1467        assert!(csv_text.contains("\"z.txt\""));
1468    }
1469
1470    #[tokio::test]
1471    async fn run_scan_once_skips_configs_that_are_not_due() {
1472        // Stamp `mark_run` at "now" so `due()` returns false until 24h
1473        // later — the scanner must NOT walk the bucket and NOT bump
1474        // `csvs_written`.
1475        let mgr = Arc::new(InventoryManager::new());
1476        mgr.put(InventoryConfig::daily_csv("d1", "src", "dst", "inv"));
1477        mgr.mark_run("src", "d1", Utc::now());
1478        let backend = InvScannerMemBackend::default();
1479        backend.put_now("src", "alpha.txt", Bytes::from_static(b"A"));
1480        let s4 = make_inv_service(backend, Some(Arc::clone(&mgr)));
1481
1482        let report = run_scan_once(&s4).await.expect("scan");
1483        assert_eq!(report.configs_evaluated, 1);
1484        assert_eq!(report.buckets_scanned, 0, "no walk; due() returned false");
1485        assert_eq!(report.csvs_written, 0);
1486        assert_eq!(report.objects_listed, 0);
1487        assert_eq!(report.errors, 0);
1488
1489        // Nothing landed in `dst`.
1490        let list_req = synthetic_request(
1491            ListObjectsV2Input {
1492                bucket: "dst".into(),
1493                ..Default::default()
1494            },
1495            http::Method::GET,
1496            "/dst?list-type=2",
1497        );
1498        let list_resp = s4
1499            .as_ref()
1500            .list_objects_v2(list_req)
1501            .await
1502            .expect("post-scan list");
1503        assert!(
1504            list_resp.output.contents.unwrap_or_default().is_empty(),
1505            "no destination writes expected when config is not due"
1506        );
1507    }
1508
1509    /// v0.8.4 #77 (audit H-8): a panic inside the `configs` write
1510    /// guard poisons the lock. `to_json` must recover via
1511    /// [`crate::lock_recovery::recover_read`] and surface the data
1512    /// instead of re-panicking on the SIGUSR1 dump-back path.
1513    #[test]
1514    fn inventory_to_json_after_panic_recovers_via_poison() {
1515        let mgr = std::sync::Arc::new(InventoryManager::new());
1516        mgr.put(InventoryConfig::daily_csv("inv1", "src", "dst", "reports/"));
1517        let mgr_cl = std::sync::Arc::clone(&mgr);
1518        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1519            let mut g = mgr_cl.configs.write().expect("clean lock");
1520            g.insert(
1521                ("src2".into(), "inv2".into()),
1522                InventoryConfig::daily_csv("inv2", "src2", "dst2", "r/"),
1523            );
1524            panic!("force-poison");
1525        }));
1526        assert!(
1527            mgr.configs.is_poisoned(),
1528            "write panic must poison configs lock"
1529        );
1530        let json = mgr.to_json().expect("to_json after poison must succeed");
1531        let mgr2 = InventoryManager::from_json(&json).expect("from_json");
1532        assert!(
1533            mgr2.get("src", "inv1").is_some(),
1534            "recovered snapshot keeps original entry"
1535        );
1536    }
1537}