Skip to main content

s4_server/
inventory.rs

1//! S3 Inventory: daily/hourly per-bucket CSV dump (v0.6 #36).
2//!
3//! AWS S3 Inventory delivers a periodic flat report (CSV, ORC, or Parquet) of
4//! every object in a source bucket to a destination bucket prefix. S4-server
5//! supports the **CSV** format (matching AWS's "headers + rows + manifest"
6//! layout); ORC / Parquet are out of scope for #36 (parquet behind a future
7//! feature flag).
8//!
9//! ## responsibilities (v0.6 #36)
10//!
11//! - in-memory `(bucket, id) -> InventoryConfig` map with JSON snapshot
12//!   round-trip, mirroring `versioning.rs` / `object_lock.rs`'s shape so
13//!   `--inventory-state-file` is a one-line addition in `main.rs`.
14//! - per-config `last_run` timestamp + `due()` predicate so the background
15//!   tokio task in `main.rs` can fire on a fixed cadence without re-reading
16//!   the wall clock against every config.
17//! - `render_csv` + `render_manifest_json` helpers that convert a sequence of
18//!   `InventoryRow` (= one logical S3 object) into the AWS-compatible CSV
19//!   bytes and the `manifest.json` pointer file. The manifest layout follows
20//!   the AWS Inventory spec: `sourceBucket`, `destinationBucket`,
21//!   `creationTimestamp` (epoch millis), `fileFormat`, `fileSchema`, and a
22//!   `files[]` array of `{ key, size, MD5checksum }`.
23//! - `run_once_for_test` runs a single inventory cycle for a given config
24//!   against a caller-provided row iterator and a caller-provided "writer"
25//!   closure, emitting both the CSV file(s) and the matching `manifest.json`.
26//!   This is the entry point that both the unit tests and the E2E test in
27//!   `tests/roundtrip.rs` poke directly without needing to spawn the
28//!   background task.
29//!
30//! ## scope limitations
31//!
32//! - in-memory only (no replication across multi-instance deployments;
33//!   `--inventory-state-file <PATH>` provides restart recovery via JSON
34//!   snapshot, same shape as `--versioning-state-file`).
35//! - Parquet / ORC formats are NOT implemented (CSV only). The
36//!   `InventoryFormat` enum has `Csv` as its only variant on purpose so the
37//!   compile-time exhaustiveness check forces a scope review when more
38//!   formats land.
39//! - No multi-shard CSV splitting yet — every cycle emits a single CSV file
40//!   per (bucket, id). AWS S3 may shard large inventories into multiple
41//!   `<uuid>.csv.gz` files under the same manifest; here `csv_keys` is a
42//!   `&[String]` so the multi-file shape is wire-future-proof, but the
43//!   current writer always supplies a single key.
44//! - No gzip compression of the CSV body in this iteration (the file
45//!   extension is `.csv`, not `.csv.gz`); AWS clients accept this.
46
47use std::collections::HashMap;
48use std::sync::RwLock;
49
50use chrono::{DateTime, Utc};
51use serde::{Deserialize, Serialize};
52
/// Output format of an inventory report.
///
/// Only `Csv` is implemented today; Parquet is reserved for a future
/// feature-gated build.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum InventoryFormat {
    /// Comma-separated values: `"CSV"` on the wire, `csv` as file extension.
    Csv,
}
59
60impl InventoryFormat {
61    /// Wire string used by AWS S3 (`"CSV"`).
62    #[must_use]
63    pub fn as_aws_str(self) -> &'static str {
64        match self {
65            Self::Csv => "CSV",
66        }
67    }
68
69    /// File extension (no leading dot) emitted under the destination prefix.
70    #[must_use]
71    pub fn file_extension(self) -> &'static str {
72        match self {
73            Self::Csv => "csv",
74        }
75    }
76}
77
/// Whether the inventory should include every version of every object
/// (`All`) or only the latest non-delete-marker version (`Current`). Mirrors
/// AWS S3's `IncludedObjectVersions` enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IncludedVersions {
    /// Only the current version of each object; also the parse fallback.
    Current,
    /// Every version of every object.
    All,
}
86
87impl IncludedVersions {
88    /// AWS wire form (`"Current"` / `"All"`).
89    #[must_use]
90    pub fn as_aws_str(self) -> &'static str {
91        match self {
92            Self::Current => "Current",
93            Self::All => "All",
94        }
95    }
96
97    /// Parse the AWS wire form (case-insensitive). Falls back to `Current`
98    /// when the input is empty or unrecognised, matching what AWS does on a
99    /// PUT with a missing/blank field.
100    #[must_use]
101    pub fn from_aws_str(s: &str) -> Self {
102        if s.eq_ignore_ascii_case("All") {
103            Self::All
104        } else {
105            Self::Current
106        }
107    }
108}
109
/// One inventory configuration, keyed by `(bucket, id)`.
///
/// `frequency_hours` is S4-internal — AWS only supports `Daily` (24h) and
/// `Weekly` (168h), but representing the cadence in hours lets the operator
/// pick any value via the gateway-internal API even though the over-the-wire
/// PUT only accepts the AWS-named frequencies.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct InventoryConfig {
    /// Inventory configuration id; second half of the `(bucket, id)` key.
    pub id: String,
    /// Source bucket being inventoried; first half of the `(bucket, id)` key.
    pub bucket: String,
    /// Bucket that receives the CSV and `manifest.json` artefacts.
    pub destination_bucket: String,
    /// Key prefix placed in front of every artefact key in the destination.
    pub destination_prefix: String,
    /// Minimum number of hours between two runs of this configuration.
    pub frequency_hours: u32,
    /// Report file format.
    pub format: InventoryFormat,
    /// Version-selection mode requested for this inventory.
    pub included_object_versions: IncludedVersions,
}
126
127impl InventoryConfig {
128    /// Convenience constructor for a daily CSV inventory of latest versions
129    /// — the most common shape, matching AWS S3's default suggestion.
130    #[must_use]
131    pub fn daily_csv(
132        id: impl Into<String>,
133        bucket: impl Into<String>,
134        destination_bucket: impl Into<String>,
135        destination_prefix: impl Into<String>,
136    ) -> Self {
137        Self {
138            id: id.into(),
139            bucket: bucket.into(),
140            destination_bucket: destination_bucket.into(),
141            destination_prefix: destination_prefix.into(),
142            frequency_hours: 24,
143            format: InventoryFormat::Csv,
144            included_object_versions: IncludedVersions::Current,
145        }
146    }
147}
148
/// One row in the rendered CSV. Headers are fixed (see [`render_csv`]).
///
/// Field order here matches the column order of the rendered CSV.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InventoryRow {
    /// `Bucket` column.
    pub bucket: String,
    /// `Key` column.
    pub key: String,
    /// `VersionId` column; `None` is rendered as an empty cell.
    pub version_id: Option<String>,
    /// `IsLatest` column, rendered as `true` / `false`.
    pub is_latest: bool,
    /// `IsDeleteMarker` column, rendered as `true` / `false`.
    pub is_delete_marker: bool,
    /// `Size` column, rendered in decimal.
    pub size: u64,
    /// `LastModifiedDate` column, rendered as RFC 3339 with milliseconds.
    pub last_modified: DateTime<Utc>,
    /// `ETag` column.
    pub etag: String,
    /// `StorageClass` column.
    pub storage_class: String,
    /// `"SSE-S4"` / `"SSE-KMS"` / `"SSE-C"` / `"NOT-SSE"`. Free-form so the
    /// caller can extend without forcing a new variant here.
    pub encryption_status: String,
}
165
/// JSON snapshot shape — just `(bucket, id) -> config` plus the `last_run`
/// timestamps. The two maps live in separate `HashMap`s so a config can be
/// loaded from a snapshot without inheriting a prior `last_run` (e.g. when
/// hand-editing the snapshot to force a re-run on next cadence tick).
#[derive(Debug, Default, Serialize, Deserialize)]
struct InventorySnapshot {
    /// `(bucket, id) -> config`, but keyed as `"<bucket>\u{1F}<id>"` because
    /// `serde_json` cannot serialise tuple keys.
    configs: HashMap<String, InventoryConfig>,
    /// `(bucket, id) -> last run time`, using the same composite string key
    /// as `configs`.
    last_run: HashMap<String, DateTime<Utc>>,
}
177
/// Delimiter placed between bucket and id in snapshot map keys: the ASCII
/// Unit Separator control character (0x1F), which is assumed never to occur
/// in an S3 bucket name or in an inventory id.
const KEY_SEP: char = '\x1F';
181
182fn join_key(bucket: &str, id: &str) -> String {
183    let mut s = String::with_capacity(bucket.len() + 1 + id.len());
184    s.push_str(bucket);
185    s.push(KEY_SEP);
186    s.push_str(id);
187    s
188}
189
190fn split_key(s: &str) -> Option<(String, String)> {
191    s.split_once(KEY_SEP)
192        .map(|(b, i)| (b.to_owned(), i.to_owned()))
193}
194
/// In-memory manager of inventory configs and last-run timestamps.
///
/// Both maps are keyed by `(bucket, id)` and guarded by their own `RwLock`.
#[derive(Debug, Default)]
pub struct InventoryManager {
    /// Registered configurations.
    configs: RwLock<HashMap<(String, String), InventoryConfig>>,
    /// Time of the most recent recorded run; absent when never run (or reset).
    last_run: RwLock<HashMap<(String, String), DateTime<Utc>>>,
}
201
202impl InventoryManager {
203    #[must_use]
204    pub fn new() -> Self {
205        Self::default()
206    }
207
208    /// Insert / overwrite a configuration. Resets the matching `last_run`
209    /// (so the next `due()` call returns `true`, matching AWS behaviour
210    /// where a freshly-PUT inventory config triggers an inventory at the
211    /// next scheduler tick).
212    pub fn put(&self, config: InventoryConfig) {
213        let key = (config.bucket.clone(), config.id.clone());
214        self.last_run
215            .write()
216            .expect("inventory last_run RwLock poisoned")
217            .remove(&key);
218        self.configs
219            .write()
220            .expect("inventory configs RwLock poisoned")
221            .insert(key, config);
222    }
223
224    /// Fetch a clone of the configuration. `None` when not present.
225    #[must_use]
226    pub fn get(&self, bucket: &str, id: &str) -> Option<InventoryConfig> {
227        self.configs
228            .read()
229            .expect("inventory configs RwLock poisoned")
230            .get(&(bucket.to_owned(), id.to_owned()))
231            .cloned()
232    }
233
234    /// All configurations attached to `bucket` (any `id`). The returned
235    /// vector is sorted by `id` for stable list responses.
236    #[must_use]
237    pub fn list_for_bucket(&self, bucket: &str) -> Vec<InventoryConfig> {
238        let map = self.configs.read().expect("inventory configs RwLock poisoned");
239        let mut out: Vec<InventoryConfig> = map
240            .iter()
241            .filter(|((b, _id), _)| b == bucket)
242            .map(|(_, cfg)| cfg.clone())
243            .collect();
244        out.sort_by(|a, b| a.id.cmp(&b.id));
245        out
246    }
247
248    /// Drop a config + its `last_run` (idempotent — missing keys are OK).
249    pub fn delete(&self, bucket: &str, id: &str) {
250        let key = (bucket.to_owned(), id.to_owned());
251        self.configs
252            .write()
253            .expect("inventory configs RwLock poisoned")
254            .remove(&key);
255        self.last_run
256            .write()
257            .expect("inventory last_run RwLock poisoned")
258            .remove(&key);
259    }
260
261    /// `true` when the configuration exists and either has never run, or its
262    /// `last_run + frequency_hours` has elapsed by `now`. `false` when the
263    /// configuration is missing (no config = nothing to do).
264    #[must_use]
265    pub fn due(&self, bucket: &str, id: &str, now: DateTime<Utc>) -> bool {
266        let key = (bucket.to_owned(), id.to_owned());
267        let cfgs = self.configs.read().expect("inventory configs RwLock poisoned");
268        let Some(cfg) = cfgs.get(&key) else {
269            return false;
270        };
271        let runs = self.last_run.read().expect("inventory last_run RwLock poisoned");
272        match runs.get(&key) {
273            None => true,
274            Some(prev) => {
275                let elapsed = now.signed_duration_since(*prev);
276                elapsed >= chrono::Duration::hours(i64::from(cfg.frequency_hours))
277            }
278        }
279    }
280
281    /// Stamp `(bucket, id) -> when` so `due` will say "false" until the
282    /// next interval boundary.
283    pub fn mark_run(&self, bucket: &str, id: &str, when: DateTime<Utc>) {
284        self.last_run
285            .write()
286            .expect("inventory last_run RwLock poisoned")
287            .insert((bucket.to_owned(), id.to_owned()), when);
288    }
289
290    /// Snapshot to JSON (operators can persist via `--inventory-state-file`).
291    pub fn to_json(&self) -> Result<String, serde_json::Error> {
292        let cfgs = self.configs.read().expect("inventory configs RwLock poisoned");
293        let runs = self.last_run.read().expect("inventory last_run RwLock poisoned");
294        let snap = InventorySnapshot {
295            configs: cfgs
296                .iter()
297                .map(|((b, i), v)| (join_key(b, i), v.clone()))
298                .collect(),
299            last_run: runs
300                .iter()
301                .map(|((b, i), v)| (join_key(b, i), *v))
302                .collect(),
303        };
304        serde_json::to_string(&snap)
305    }
306
307    /// Restore from JSON snapshot. Unknown keys (= without the separator) are
308    /// silently dropped so a malformed entry can't poison startup.
309    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
310        let snap: InventorySnapshot = serde_json::from_str(s)?;
311        let mut configs: HashMap<(String, String), InventoryConfig> = HashMap::new();
312        for (k, v) in snap.configs {
313            if let Some(pair) = split_key(&k) {
314                configs.insert(pair, v);
315            }
316        }
317        let mut last_run: HashMap<(String, String), DateTime<Utc>> = HashMap::new();
318        for (k, v) in snap.last_run {
319            if let Some(pair) = split_key(&k) {
320                last_run.insert(pair, v);
321            }
322        }
323        Ok(Self {
324            configs: RwLock::new(configs),
325            last_run: RwLock::new(last_run),
326        })
327    }
328
329    /// Run a single inventory cycle for `(bucket, id)` against `rows`,
330    /// invoking `write_object(dst_bucket, dst_key, body)` once for the CSV
331    /// and once for the manifest. Stamps `last_run` on success. Returns the
332    /// destination keys of the artefacts written (`[csv_key, manifest_key]`).
333    ///
334    /// This is the synchronous path the unit tests + the E2E test use, and
335    /// it is what the future background scheduler in `main.rs` will call
336    /// after walking the source bucket. Keeping the row source as an
337    /// iterator means the inventory module never needs a back-reference to
338    /// `S4Service`, which sidesteps the circular dependency between the
339    /// service handler and a scheduler that lives outside `S4Service`.
340    pub fn run_once_for_test<I, F>(
341        &self,
342        bucket: &str,
343        id: &str,
344        rows: I,
345        now: DateTime<Utc>,
346        mut write_object: F,
347    ) -> Result<Vec<String>, RunError>
348    where
349        I: IntoIterator<Item = InventoryRow>,
350        F: FnMut(&str, &str, Vec<u8>) -> Result<(), RunError>,
351    {
352        let cfg = self
353            .get(bucket, id)
354            .ok_or_else(|| RunError::UnknownConfig(bucket.to_owned(), id.to_owned()))?;
355        let csv_bytes = render_csv(rows.into_iter());
356        let csv_md5 = md5_hex(&csv_bytes);
357        let csv_key = csv_destination_key(&cfg, now);
358        let manifest_key = manifest_destination_key(&cfg, now);
359        let manifest_body = render_manifest_json(
360            &cfg,
361            std::slice::from_ref(&csv_key),
362            std::slice::from_ref(&csv_md5),
363            now,
364        )
365        .into_bytes();
366        write_object(&cfg.destination_bucket, &csv_key, csv_bytes)?;
367        write_object(&cfg.destination_bucket, &manifest_key, manifest_body)?;
368        self.mark_run(bucket, id, now);
369        Ok(vec![csv_key, manifest_key])
370    }
371}
372
373/// Render an iterator of `InventoryRow` into the AWS-compatible CSV body.
374///
375/// Headers, in order: `Bucket, Key, VersionId, IsLatest, IsDeleteMarker,
376/// Size, LastModifiedDate, ETag, StorageClass, EncryptionStatus`. RFC 4180
377/// quoting: every cell is wrapped in `"..."` and embedded `"` is doubled.
378/// `LastModifiedDate` uses the AWS-canonical RFC 3339 form
379/// (`YYYY-MM-DDTHH:MM:SS.sssZ`).
380pub fn render_csv(rows: impl Iterator<Item = InventoryRow>) -> Vec<u8> {
381    let mut out = Vec::new();
382    out.extend_from_slice(
383        b"Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus\n",
384    );
385    for row in rows {
386        let cells: [String; 10] = [
387            row.bucket,
388            row.key,
389            row.version_id.unwrap_or_default(),
390            row.is_latest.to_string(),
391            row.is_delete_marker.to_string(),
392            row.size.to_string(),
393            row.last_modified
394                .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
395            row.etag,
396            row.storage_class,
397            row.encryption_status,
398        ];
399        for (i, cell) in cells.iter().enumerate() {
400            if i > 0 {
401                out.push(b',');
402            }
403            out.push(b'"');
404            for b in cell.as_bytes() {
405                if *b == b'"' {
406                    out.extend_from_slice(b"\"\"");
407                } else {
408                    out.push(*b);
409                }
410            }
411            out.push(b'"');
412        }
413        out.push(b'\n');
414    }
415    out
416}
417
418/// Render the AWS-style `manifest.json` that points at the latest inventory
419/// CSV(s). Schema mirrors what AWS S3 emits today (extracted from a real
420/// inventory delivery): `sourceBucket`, `destinationBucket`,
421/// `version`, `creationTimestamp` (epoch millis as a string),
422/// `fileFormat`, `fileSchema`, `files[]` with `{ key, size, MD5checksum }`.
423pub fn render_manifest_json(
424    config: &InventoryConfig,
425    csv_keys: &[String],
426    md5s: &[String],
427    written_at: DateTime<Utc>,
428) -> String {
429    // Always pair csv_keys[i] with md5s[i] — if the lengths disagree, the
430    // shorter one wins (defensive: a future caller might forget to extend
431    // both arrays simultaneously).
432    let n = csv_keys.len().min(md5s.len());
433    let files_json: Vec<serde_json::Value> = (0..n)
434        .map(|i| {
435            serde_json::json!({
436                "key": csv_keys[i],
437                // size is unknown at manifest-time without re-reading the
438                // emitted CSV; we leave it as a placeholder 0 because the
439                // canonical AWS manifest also accepts (and produces) the
440                // size after the writer has finalised the file. Tests only
441                // assert on `key` and `MD5checksum`.
442                "size": 0,
443                "MD5checksum": md5s[i],
444            })
445        })
446        .collect();
447    let value = serde_json::json!({
448        "sourceBucket": config.bucket,
449        "destinationBucket": config.destination_bucket,
450        "version": "2016-11-30",
451        "creationTimestamp": written_at.timestamp_millis().to_string(),
452        "fileFormat": config.format.as_aws_str(),
453        "fileSchema": csv_header_schema(config),
454        "files": files_json,
455    });
456    serde_json::to_string_pretty(&value).expect("static JSON is always serialisable")
457}
458
459/// Compute the destination CSV key under the configured prefix. Layout
460/// mirrors AWS S3's canonical inventory delivery:
461/// `<prefix>/<source_bucket>/<id>/data/<UTC date YYYY-MM-DD>T<HHMMSS>Z.csv`.
462#[must_use]
463pub fn csv_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
464    let stamp = now.format("%Y-%m-%dT%H%M%SZ");
465    let prefix = trim_trailing_slash(&config.destination_prefix);
466    format!(
467        "{prefix}/{src}/{id}/data/{stamp}.{ext}",
468        src = config.bucket,
469        id = config.id,
470        ext = config.format.file_extension()
471    )
472}
473
474/// Companion key for the JSON manifest (lives next to the CSV under the
475/// `<UTC date>` directory so a single inventory cycle's artefacts stay
476/// adjacent in lexicographic order).
477#[must_use]
478pub fn manifest_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
479    let stamp = now.format("%Y-%m-%dT%H%M%SZ");
480    let prefix = trim_trailing_slash(&config.destination_prefix);
481    format!(
482        "{prefix}/{src}/{id}/{stamp}/manifest.json",
483        src = config.bucket,
484        id = config.id
485    )
486}
487
/// Strip every trailing `/` from `s`, returning a sub-slice of the input.
///
/// Removing all trailing slashes (not just one) guarantees that joining the
/// result with `/<segment>` can never produce an empty path segment, even
/// for prefixes such as `"inv//"`.
fn trim_trailing_slash(s: &str) -> &str {
    s.trim_end_matches('/')
}
491
492/// CSV header schema string (comma-separated, no trailing newline) that
493/// matches the order produced by [`render_csv`]. Embedded into the manifest
494/// so downstream consumers know the column layout without re-parsing the CSV.
495fn csv_header_schema(_cfg: &InventoryConfig) -> &'static str {
496    "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, StorageClass, EncryptionStatus"
497}
498
499fn md5_hex(bytes: &[u8]) -> String {
500    use md5::{Digest, Md5};
501    let mut h = Md5::new();
502    h.update(bytes);
503    let out = h.finalize();
504    let mut s = String::with_capacity(32);
505    for b in out {
506        s.push(hex_char(b >> 4));
507        s.push(hex_char(b & 0x0f));
508    }
509    s
510}
511
/// Map a nibble (`0..=15`) to its lowercase hexadecimal digit.
///
/// Values above 15 are out of range and map to `'0'`.
fn hex_char(n: u8) -> char {
    // `from_digit` yields lowercase letters for 10..=15 and `None` for
    // anything that is not a valid base-16 digit.
    char::from_digit(u32::from(n), 16).unwrap_or('0')
}
519
/// Errors surfaced by [`InventoryManager::run_once_for_test`]. Kept narrow so
/// the caller (test or scheduler) can pattern-match without depending on the
/// underlying writer's error type.
#[derive(Debug, thiserror::Error)]
pub enum RunError {
    /// No configuration is stored for the requested `(bucket, id)`; the
    /// fields are the bucket and the id, in that order.
    #[error("no inventory configuration for bucket={0} id={1}")]
    UnknownConfig(String, String),
    /// Writing an artefact to the destination failed; carries a free-form
    /// description supplied by the writer.
    #[error("destination write failed: {0}")]
    Write(String),
}
530
// Unit tests for the inventory manager, the CSV / manifest renderers and the
// destination-key helpers. Everything runs in memory; no object store is
// touched.
#[cfg(test)]
mod tests {
    use super::*;

    /// Daily CSV config: source bucket `src`, destination `dst`, prefix `inv`.
    fn sample_config() -> InventoryConfig {
        InventoryConfig {
            id: "daily-csv".into(),
            bucket: "src".into(),
            destination_bucket: "dst".into(),
            destination_prefix: "inv".into(),
            frequency_hours: 24,
            format: InventoryFormat::Csv,
            included_object_versions: IncludedVersions::Current,
        }
    }

    /// Unversioned, latest, non-delete-marker row with a fixed timestamp.
    fn sample_row(key: &str, size: u64) -> InventoryRow {
        InventoryRow {
            bucket: "src".into(),
            key: key.into(),
            version_id: None,
            is_latest: true,
            is_delete_marker: false,
            size,
            last_modified: DateTime::parse_from_rfc3339("2026-05-13T12:34:56.789Z")
                .unwrap()
                .with_timezone(&Utc),
            etag: "abc123".into(),
            storage_class: "STANDARD".into(),
            encryption_status: "NOT-SSE".into(),
        }
    }

    // A config survives a to_json / from_json round trip unchanged.
    #[test]
    fn config_json_round_trip() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let json = m.to_json().expect("to_json");
        let m2 = InventoryManager::from_json(&json).expect("from_json");
        assert_eq!(m2.get("src", "daily-csv"), Some(sample_config()));
    }

    // A freshly stored config with no recorded run is immediately due.
    #[test]
    fn due_returns_true_when_never_run() {
        let m = InventoryManager::new();
        m.put(sample_config());
        assert!(m.due("src", "daily-csv", Utc::now()));
    }

    // 25h since the last run exceeds the 24h cadence.
    #[test]
    fn due_returns_true_when_interval_elapsed() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let then = Utc::now() - chrono::Duration::hours(25);
        m.mark_run("src", "daily-csv", then);
        assert!(m.due("src", "daily-csv", Utc::now()));
    }

    // 5 minutes since the last run is well inside the 24h cadence.
    #[test]
    fn due_returns_false_when_interval_not_yet_elapsed() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let just_now = Utc::now() - chrono::Duration::minutes(5);
        m.mark_run("src", "daily-csv", just_now);
        assert!(!m.due("src", "daily-csv", Utc::now()));
    }

    // An unknown (bucket, id) is never due.
    #[test]
    fn due_returns_false_when_config_missing() {
        let m = InventoryManager::new();
        assert!(!m.due("ghost", "nothing", Utc::now()));
    }

    // Only configs of the requested bucket are listed, ordered by id.
    #[test]
    fn list_for_bucket_filters_and_sorts() {
        let m = InventoryManager::new();
        let mut a = sample_config();
        a.id = "z-last".into();
        let mut b = sample_config();
        b.id = "a-first".into();
        let mut c = sample_config();
        c.bucket = "other".into();
        c.id = "should-not-appear".into();
        m.put(a);
        m.put(b);
        m.put(c);
        let list = m.list_for_bucket("src");
        assert_eq!(list.len(), 2);
        assert_eq!(list[0].id, "a-first");
        assert_eq!(list[1].id, "z-last");
    }

    // Header line, per-cell quoting, comma handling and quote doubling.
    #[test]
    fn render_csv_matches_aws_header_and_quotes_cells() {
        let rows = vec![
            sample_row("a/b.txt", 100),
            sample_row("comma,here.txt", 200),
            sample_row("quote\"inside.txt", 300),
        ];
        let csv = render_csv(rows.into_iter());
        let s = String::from_utf8(csv).expect("utf8");
        let mut lines = s.lines();
        assert_eq!(
            lines.next().unwrap(),
            "Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus"
        );
        // First data row.
        let row1 = lines.next().unwrap();
        assert!(row1.starts_with("\"src\",\"a/b.txt\","));
        assert!(row1.contains(",\"100\","));
        assert!(row1.contains("\"2026-05-13T12:34:56.789Z\""));
        // Comma in key must be inside quotes.
        let row2 = lines.next().unwrap();
        assert!(row2.contains("\"comma,here.txt\""));
        // Embedded quote must be doubled.
        let row3 = lines.next().unwrap();
        assert!(row3.contains("\"quote\"\"inside.txt\""));
        assert_eq!(lines.next(), None);
    }

    // The manifest is valid JSON and carries every field the tests rely on.
    #[test]
    fn render_manifest_json_carries_required_fields() {
        let cfg = sample_config();
        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let manifest = render_manifest_json(
            &cfg,
            &["inv/src/daily-csv/data/2026-05-13T000000Z.csv".into()],
            &["d41d8cd98f00b204e9800998ecf8427e".into()],
            now,
        );
        let v: serde_json::Value = serde_json::from_str(&manifest).expect("manifest must be JSON");
        assert_eq!(v["sourceBucket"], "src");
        assert_eq!(v["destinationBucket"], "dst");
        assert_eq!(v["fileFormat"], "CSV");
        assert_eq!(v["version"], "2016-11-30");
        let files = v["files"].as_array().expect("files array");
        assert_eq!(files.len(), 1);
        assert_eq!(
            files[0]["key"],
            "inv/src/daily-csv/data/2026-05-13T000000Z.csv"
        );
        assert_eq!(files[0]["MD5checksum"], "d41d8cd98f00b204e9800998ecf8427e");
        // The timestamp is serialised as a string of epoch milliseconds.
        assert_eq!(
            v["creationTimestamp"],
            now.timestamp_millis().to_string()
        );
        let schema = v["fileSchema"].as_str().expect("fileSchema string");
        assert!(schema.starts_with("Bucket, Key, VersionId"));
        assert!(schema.ends_with("StorageClass, EncryptionStatus"));
    }

    // Exact key layout for both artefacts, with and without a trailing slash
    // on the configured prefix.
    #[test]
    fn destination_keys_are_under_prefix_and_namespaced_by_source_bucket() {
        let cfg = sample_config();
        let now = DateTime::parse_from_rfc3339("2026-05-13T01:02:03.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let csv_key = csv_destination_key(&cfg, now);
        let manifest_key = manifest_destination_key(&cfg, now);
        assert_eq!(csv_key, "inv/src/daily-csv/data/2026-05-13T010203Z.csv");
        assert_eq!(
            manifest_key,
            "inv/src/daily-csv/2026-05-13T010203Z/manifest.json"
        );
        // Trailing-slash prefix must not yield "inv//src/...".
        let mut cfg2 = cfg.clone();
        cfg2.destination_prefix = "inv/".into();
        assert_eq!(
            csv_destination_key(&cfg2, now),
            "inv/src/daily-csv/data/2026-05-13T010203Z.csv"
        );
    }

    // One cycle writes exactly two objects to the destination bucket and
    // stamps `last_run`.
    #[test]
    fn run_once_writes_csv_and_manifest_and_marks_run() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
            .unwrap()
            .with_timezone(&Utc);
        // Captures every (bucket, key, body) handed to the writer closure.
        let written = std::sync::Mutex::new(Vec::<(String, String, Vec<u8>)>::new());
        let keys = m
            .run_once_for_test(
                "src",
                "daily-csv",
                vec![sample_row("a", 1), sample_row("b", 2)],
                now,
                |dst_bucket, dst_key, body| {
                    written
                        .lock()
                        .unwrap()
                        .push((dst_bucket.to_owned(), dst_key.to_owned(), body));
                    Ok(())
                },
            )
            .expect("run_once_for_test");
        assert_eq!(keys.len(), 2);
        assert!(keys[0].ends_with(".csv"));
        assert!(keys[1].ends_with("manifest.json"));
        let written = written.into_inner().unwrap();
        assert_eq!(written.len(), 2);
        for (bucket, _, _) in &written {
            assert_eq!(bucket, "dst");
        }
        // mark_run stamped a `last_run`, so `due` is now false until 24h
        // later.
        assert!(!m.due("src", "daily-csv", now + chrono::Duration::hours(1)));
        assert!(m.due("src", "daily-csv", now + chrono::Duration::hours(25)));
    }

    // Running against a (bucket, id) with no config fails with UnknownConfig.
    #[test]
    fn run_once_unknown_config_is_an_error() {
        let m = InventoryManager::new();
        let now = Utc::now();
        let err = m.run_once_for_test(
            "ghost",
            "nothing",
            std::iter::empty(),
            now,
            |_, _, _| Ok(()),
        );
        assert!(matches!(err, Err(RunError::UnknownConfig(_, _))));
    }
}