1use std::collections::HashMap;
48use std::sync::RwLock;
49
50use chrono::{DateTime, Utc};
51use serde::{Deserialize, Serialize};
52
53#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
56pub enum InventoryFormat {
57 Csv,
58}
59
60impl InventoryFormat {
61 #[must_use]
63 pub fn as_aws_str(self) -> &'static str {
64 match self {
65 Self::Csv => "CSV",
66 }
67 }
68
69 #[must_use]
71 pub fn file_extension(self) -> &'static str {
72 match self {
73 Self::Csv => "csv",
74 }
75 }
76}
77
78#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
82pub enum IncludedVersions {
83 Current,
84 All,
85}
86
87impl IncludedVersions {
88 #[must_use]
90 pub fn as_aws_str(self) -> &'static str {
91 match self {
92 Self::Current => "Current",
93 Self::All => "All",
94 }
95 }
96
97 #[must_use]
101 pub fn from_aws_str(s: &str) -> Self {
102 if s.eq_ignore_ascii_case("All") {
103 Self::All
104 } else {
105 Self::Current
106 }
107 }
108}
109
110#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
117pub struct InventoryConfig {
118 pub id: String,
119 pub bucket: String,
120 pub destination_bucket: String,
121 pub destination_prefix: String,
122 pub frequency_hours: u32,
123 pub format: InventoryFormat,
124 pub included_object_versions: IncludedVersions,
125}
126
127impl InventoryConfig {
128 #[must_use]
131 pub fn daily_csv(
132 id: impl Into<String>,
133 bucket: impl Into<String>,
134 destination_bucket: impl Into<String>,
135 destination_prefix: impl Into<String>,
136 ) -> Self {
137 Self {
138 id: id.into(),
139 bucket: bucket.into(),
140 destination_bucket: destination_bucket.into(),
141 destination_prefix: destination_prefix.into(),
142 frequency_hours: 24,
143 format: InventoryFormat::Csv,
144 included_object_versions: IncludedVersions::Current,
145 }
146 }
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
151pub struct InventoryRow {
152 pub bucket: String,
153 pub key: String,
154 pub version_id: Option<String>,
155 pub is_latest: bool,
156 pub is_delete_marker: bool,
157 pub size: u64,
158 pub last_modified: DateTime<Utc>,
159 pub etag: String,
160 pub storage_class: String,
161 pub encryption_status: String,
164}
165
166#[derive(Debug, Default, Serialize, Deserialize)]
171struct InventorySnapshot {
172 configs: HashMap<String, InventoryConfig>,
175 last_run: HashMap<String, DateTime<Utc>>,
176}
177
/// Separator used to flatten `(bucket, id)` map keys into one string for the JSON
/// snapshot (see `join_key` / `split_key`).
///
/// It is the ASCII "unit separator" control character (0x1F). `split_key` splits
/// at the first occurrence, so only the bucket half must be free of it. S3 bucket
/// names cannot contain control characters.
const KEY_SEP: char = '\x1F';
181
182fn join_key(bucket: &str, id: &str) -> String {
183 let mut s = String::with_capacity(bucket.len() + 1 + id.len());
184 s.push_str(bucket);
185 s.push(KEY_SEP);
186 s.push_str(id);
187 s
188}
189
190fn split_key(s: &str) -> Option<(String, String)> {
191 s.split_once(KEY_SEP)
192 .map(|(b, i)| (b.to_owned(), i.to_owned()))
193}
194
195#[derive(Debug, Default)]
197pub struct InventoryManager {
198 configs: RwLock<HashMap<(String, String), InventoryConfig>>,
199 last_run: RwLock<HashMap<(String, String), DateTime<Utc>>>,
200}
201
202impl InventoryManager {
203 #[must_use]
204 pub fn new() -> Self {
205 Self::default()
206 }
207
208 pub fn put(&self, config: InventoryConfig) {
213 let key = (config.bucket.clone(), config.id.clone());
214 self.last_run
215 .write()
216 .expect("inventory last_run RwLock poisoned")
217 .remove(&key);
218 self.configs
219 .write()
220 .expect("inventory configs RwLock poisoned")
221 .insert(key, config);
222 }
223
224 #[must_use]
226 pub fn get(&self, bucket: &str, id: &str) -> Option<InventoryConfig> {
227 self.configs
228 .read()
229 .expect("inventory configs RwLock poisoned")
230 .get(&(bucket.to_owned(), id.to_owned()))
231 .cloned()
232 }
233
234 #[must_use]
237 pub fn list_for_bucket(&self, bucket: &str) -> Vec<InventoryConfig> {
238 let map = self.configs.read().expect("inventory configs RwLock poisoned");
239 let mut out: Vec<InventoryConfig> = map
240 .iter()
241 .filter(|((b, _id), _)| b == bucket)
242 .map(|(_, cfg)| cfg.clone())
243 .collect();
244 out.sort_by(|a, b| a.id.cmp(&b.id));
245 out
246 }
247
248 pub fn delete(&self, bucket: &str, id: &str) {
250 let key = (bucket.to_owned(), id.to_owned());
251 self.configs
252 .write()
253 .expect("inventory configs RwLock poisoned")
254 .remove(&key);
255 self.last_run
256 .write()
257 .expect("inventory last_run RwLock poisoned")
258 .remove(&key);
259 }
260
261 #[must_use]
265 pub fn due(&self, bucket: &str, id: &str, now: DateTime<Utc>) -> bool {
266 let key = (bucket.to_owned(), id.to_owned());
267 let cfgs = self.configs.read().expect("inventory configs RwLock poisoned");
268 let Some(cfg) = cfgs.get(&key) else {
269 return false;
270 };
271 let runs = self.last_run.read().expect("inventory last_run RwLock poisoned");
272 match runs.get(&key) {
273 None => true,
274 Some(prev) => {
275 let elapsed = now.signed_duration_since(*prev);
276 elapsed >= chrono::Duration::hours(i64::from(cfg.frequency_hours))
277 }
278 }
279 }
280
281 pub fn mark_run(&self, bucket: &str, id: &str, when: DateTime<Utc>) {
284 self.last_run
285 .write()
286 .expect("inventory last_run RwLock poisoned")
287 .insert((bucket.to_owned(), id.to_owned()), when);
288 }
289
290 pub fn to_json(&self) -> Result<String, serde_json::Error> {
292 let cfgs = self.configs.read().expect("inventory configs RwLock poisoned");
293 let runs = self.last_run.read().expect("inventory last_run RwLock poisoned");
294 let snap = InventorySnapshot {
295 configs: cfgs
296 .iter()
297 .map(|((b, i), v)| (join_key(b, i), v.clone()))
298 .collect(),
299 last_run: runs
300 .iter()
301 .map(|((b, i), v)| (join_key(b, i), *v))
302 .collect(),
303 };
304 serde_json::to_string(&snap)
305 }
306
307 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
310 let snap: InventorySnapshot = serde_json::from_str(s)?;
311 let mut configs: HashMap<(String, String), InventoryConfig> = HashMap::new();
312 for (k, v) in snap.configs {
313 if let Some(pair) = split_key(&k) {
314 configs.insert(pair, v);
315 }
316 }
317 let mut last_run: HashMap<(String, String), DateTime<Utc>> = HashMap::new();
318 for (k, v) in snap.last_run {
319 if let Some(pair) = split_key(&k) {
320 last_run.insert(pair, v);
321 }
322 }
323 Ok(Self {
324 configs: RwLock::new(configs),
325 last_run: RwLock::new(last_run),
326 })
327 }
328
329 pub fn run_once_for_test<I, F>(
341 &self,
342 bucket: &str,
343 id: &str,
344 rows: I,
345 now: DateTime<Utc>,
346 mut write_object: F,
347 ) -> Result<Vec<String>, RunError>
348 where
349 I: IntoIterator<Item = InventoryRow>,
350 F: FnMut(&str, &str, Vec<u8>) -> Result<(), RunError>,
351 {
352 let cfg = self
353 .get(bucket, id)
354 .ok_or_else(|| RunError::UnknownConfig(bucket.to_owned(), id.to_owned()))?;
355 let csv_bytes = render_csv(rows.into_iter());
356 let csv_md5 = md5_hex(&csv_bytes);
357 let csv_key = csv_destination_key(&cfg, now);
358 let manifest_key = manifest_destination_key(&cfg, now);
359 let manifest_body = render_manifest_json(
360 &cfg,
361 std::slice::from_ref(&csv_key),
362 std::slice::from_ref(&csv_md5),
363 now,
364 )
365 .into_bytes();
366 write_object(&cfg.destination_bucket, &csv_key, csv_bytes)?;
367 write_object(&cfg.destination_bucket, &manifest_key, manifest_body)?;
368 self.mark_run(bucket, id, now);
369 Ok(vec![csv_key, manifest_key])
370 }
371}
372
373pub fn render_csv(rows: impl Iterator<Item = InventoryRow>) -> Vec<u8> {
381 let mut out = Vec::new();
382 out.extend_from_slice(
383 b"Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus\n",
384 );
385 for row in rows {
386 let cells: [String; 10] = [
387 row.bucket,
388 row.key,
389 row.version_id.unwrap_or_default(),
390 row.is_latest.to_string(),
391 row.is_delete_marker.to_string(),
392 row.size.to_string(),
393 row.last_modified
394 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
395 row.etag,
396 row.storage_class,
397 row.encryption_status,
398 ];
399 for (i, cell) in cells.iter().enumerate() {
400 if i > 0 {
401 out.push(b',');
402 }
403 out.push(b'"');
404 for b in cell.as_bytes() {
405 if *b == b'"' {
406 out.extend_from_slice(b"\"\"");
407 } else {
408 out.push(*b);
409 }
410 }
411 out.push(b'"');
412 }
413 out.push(b'\n');
414 }
415 out
416}
417
418pub fn render_manifest_json(
424 config: &InventoryConfig,
425 csv_keys: &[String],
426 md5s: &[String],
427 written_at: DateTime<Utc>,
428) -> String {
429 let n = csv_keys.len().min(md5s.len());
433 let files_json: Vec<serde_json::Value> = (0..n)
434 .map(|i| {
435 serde_json::json!({
436 "key": csv_keys[i],
437 "size": 0,
443 "MD5checksum": md5s[i],
444 })
445 })
446 .collect();
447 let value = serde_json::json!({
448 "sourceBucket": config.bucket,
449 "destinationBucket": config.destination_bucket,
450 "version": "2016-11-30",
451 "creationTimestamp": written_at.timestamp_millis().to_string(),
452 "fileFormat": config.format.as_aws_str(),
453 "fileSchema": csv_header_schema(config),
454 "files": files_json,
455 });
456 serde_json::to_string_pretty(&value).expect("static JSON is always serialisable")
457}
458
459#[must_use]
463pub fn csv_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
464 let stamp = now.format("%Y-%m-%dT%H%M%SZ");
465 let prefix = trim_trailing_slash(&config.destination_prefix);
466 format!(
467 "{prefix}/{src}/{id}/data/{stamp}.{ext}",
468 src = config.bucket,
469 id = config.id,
470 ext = config.format.file_extension()
471 )
472}
473
474#[must_use]
478pub fn manifest_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
479 let stamp = now.format("%Y-%m-%dT%H%M%SZ");
480 let prefix = trim_trailing_slash(&config.destination_prefix);
481 format!(
482 "{prefix}/{src}/{id}/{stamp}/manifest.json",
483 src = config.bucket,
484 id = config.id
485 )
486}
487
/// Returns `s` without any trailing `/` characters.
///
/// All of them are stripped, not just one, so a prefix such as `"inv//"` joins
/// to `inv/...` rather than `inv//...`. A string of only slashes becomes `""`.
fn trim_trailing_slash(s: &str) -> &str {
    s.trim_end_matches('/')
}
491
492fn csv_header_schema(_cfg: &InventoryConfig) -> &'static str {
496 "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, StorageClass, EncryptionStatus"
497}
498
499fn md5_hex(bytes: &[u8]) -> String {
500 use md5::{Digest, Md5};
501 let mut h = Md5::new();
502 h.update(bytes);
503 let out = h.finalize();
504 let mut s = String::with_capacity(32);
505 for b in out {
506 s.push(hex_char(b >> 4));
507 s.push(hex_char(b & 0x0f));
508 }
509 s
510}
511
/// Maps a nibble (`0..=15`) to its lowercase hexadecimal digit.
///
/// Out-of-range inputs yield `'0'`.
fn hex_char(n: u8) -> char {
    // `from_digit` produces lowercase letters for 10..=15 and `None` for values
    // outside the radix.
    char::from_digit(u32::from(n), 16).unwrap_or('0')
}
519
520#[derive(Debug, thiserror::Error)]
524pub enum RunError {
525 #[error("no inventory configuration for bucket={0} id={1}")]
526 UnknownConfig(String, String),
527 #[error("destination write failed: {0}")]
528 Write(String),
529}
530
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline config: daily CSV inventory of bucket `src` into `dst` under prefix `inv`.
    fn sample_config() -> InventoryConfig {
        InventoryConfig {
            id: "daily-csv".into(),
            bucket: "src".into(),
            destination_bucket: "dst".into(),
            destination_prefix: "inv".into(),
            frequency_hours: 24,
            format: InventoryFormat::Csv,
            included_object_versions: IncludedVersions::Current,
        }
    }

    /// A current, non-delete-marker row in bucket `src` with a fixed timestamp.
    fn sample_row(key: &str, size: u64) -> InventoryRow {
        InventoryRow {
            bucket: "src".into(),
            key: key.into(),
            version_id: None,
            is_latest: true,
            is_delete_marker: false,
            size,
            last_modified: DateTime::parse_from_rfc3339("2026-05-13T12:34:56.789Z")
                .unwrap()
                .with_timezone(&Utc),
            etag: "abc123".into(),
            storage_class: "STANDARD".into(),
            encryption_status: "NOT-SSE".into(),
        }
    }

    // A config survives to_json -> from_json unchanged.
    #[test]
    fn config_json_round_trip() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let json = m.to_json().expect("to_json");
        let m2 = InventoryManager::from_json(&json).expect("from_json");
        assert_eq!(m2.get("src", "daily-csv"), Some(sample_config()));
    }

    // A freshly registered config with no recorded run is due.
    #[test]
    fn due_returns_true_when_never_run() {
        let m = InventoryManager::new();
        m.put(sample_config());
        assert!(m.due("src", "daily-csv", Utc::now()));
    }

    // 25h since the last run exceeds the 24h frequency.
    #[test]
    fn due_returns_true_when_interval_elapsed() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let then = Utc::now() - chrono::Duration::hours(25);
        m.mark_run("src", "daily-csv", then);
        assert!(m.due("src", "daily-csv", Utc::now()));
    }

    // 5 minutes since the last run is well inside the 24h frequency.
    #[test]
    fn due_returns_false_when_interval_not_yet_elapsed() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let just_now = Utc::now() - chrono::Duration::minutes(5);
        m.mark_run("src", "daily-csv", just_now);
        assert!(!m.due("src", "daily-csv", Utc::now()));
    }

    // Unknown configs are never due.
    #[test]
    fn due_returns_false_when_config_missing() {
        let m = InventoryManager::new();
        assert!(!m.due("ghost", "nothing", Utc::now()));
    }

    // Only configs for the requested bucket are listed, ordered by id.
    #[test]
    fn list_for_bucket_filters_and_sorts() {
        let m = InventoryManager::new();
        let mut a = sample_config();
        a.id = "z-last".into();
        let mut b = sample_config();
        b.id = "a-first".into();
        let mut c = sample_config();
        c.bucket = "other".into();
        c.id = "should-not-appear".into();
        m.put(a);
        m.put(b);
        m.put(c);
        let list = m.list_for_bucket("src");
        assert_eq!(list.len(), 2);
        assert_eq!(list[0].id, "a-first");
        assert_eq!(list[1].id, "z-last");
    }

    // Header line is unquoted; every data cell is quoted and embedded quotes are doubled.
    #[test]
    fn render_csv_matches_aws_header_and_quotes_cells() {
        let rows = vec![
            sample_row("a/b.txt", 100),
            sample_row("comma,here.txt", 200),
            sample_row("quote\"inside.txt", 300),
        ];
        let csv = render_csv(rows.into_iter());
        let s = String::from_utf8(csv).expect("utf8");
        let mut lines = s.lines();
        assert_eq!(
            lines.next().unwrap(),
            "Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus"
        );
        let row1 = lines.next().unwrap();
        // Plain cells are quoted; the timestamp carries millisecond precision and a `Z` suffix.
        assert!(row1.starts_with("\"src\",\"a/b.txt\","));
        assert!(row1.contains(",\"100\","));
        assert!(row1.contains("\"2026-05-13T12:34:56.789Z\""));
        let row2 = lines.next().unwrap();
        // A comma inside a key stays inside one quoted cell.
        assert!(row2.contains("\"comma,here.txt\""));
        let row3 = lines.next().unwrap();
        // An embedded double quote is escaped by doubling it.
        assert!(row3.contains("\"quote\"\"inside.txt\""));
        assert_eq!(lines.next(), None);
    }

    // The manifest parses as JSON and carries the fields a consumer reads.
    #[test]
    fn render_manifest_json_carries_required_fields() {
        let cfg = sample_config();
        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let manifest = render_manifest_json(
            &cfg,
            &["inv/src/daily-csv/data/2026-05-13T000000Z.csv".into()],
            &["d41d8cd98f00b204e9800998ecf8427e".into()],
            now,
        );
        let v: serde_json::Value = serde_json::from_str(&manifest).expect("manifest must be JSON");
        assert_eq!(v["sourceBucket"], "src");
        assert_eq!(v["destinationBucket"], "dst");
        assert_eq!(v["fileFormat"], "CSV");
        assert_eq!(v["version"], "2016-11-30");
        let files = v["files"].as_array().expect("files array");
        assert_eq!(files.len(), 1);
        assert_eq!(
            files[0]["key"],
            "inv/src/daily-csv/data/2026-05-13T000000Z.csv"
        );
        assert_eq!(files[0]["MD5checksum"], "d41d8cd98f00b204e9800998ecf8427e");
        // creationTimestamp is milliseconds since the epoch, serialised as a string.
        assert_eq!(
            v["creationTimestamp"],
            now.timestamp_millis().to_string()
        );
        let schema = v["fileSchema"].as_str().expect("fileSchema string");
        assert!(schema.starts_with("Bucket, Key, VersionId"));
        assert!(schema.ends_with("StorageClass, EncryptionStatus"));
    }

    // Keys live under {prefix}/{source_bucket}/{config_id}/...
    #[test]
    fn destination_keys_are_under_prefix_and_namespaced_by_source_bucket() {
        let cfg = sample_config();
        let now = DateTime::parse_from_rfc3339("2026-05-13T01:02:03.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let csv_key = csv_destination_key(&cfg, now);
        let manifest_key = manifest_destination_key(&cfg, now);
        assert_eq!(csv_key, "inv/src/daily-csv/data/2026-05-13T010203Z.csv");
        assert_eq!(
            manifest_key,
            "inv/src/daily-csv/2026-05-13T010203Z/manifest.json"
        );
        let mut cfg2 = cfg.clone();
        // A trailing slash on the prefix must not produce `inv//...`.
        cfg2.destination_prefix = "inv/".into();
        assert_eq!(
            csv_destination_key(&cfg2, now),
            "inv/src/daily-csv/data/2026-05-13T010203Z.csv"
        );
    }

    // A successful run writes CSV then manifest to the destination bucket and records the run.
    #[test]
    fn run_once_writes_csv_and_manifest_and_marks_run() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let written = std::sync::Mutex::new(Vec::<(String, String, Vec<u8>)>::new());
        let keys = m
            .run_once_for_test(
                "src",
                "daily-csv",
                vec![sample_row("a", 1), sample_row("b", 2)],
                now,
                |dst_bucket, dst_key, body| {
                    written
                        .lock()
                        .unwrap()
                        .push((dst_bucket.to_owned(), dst_key.to_owned(), body));
                    Ok(())
                },
            )
            .expect("run_once_for_test");
        assert_eq!(keys.len(), 2);
        assert!(keys[0].ends_with(".csv"));
        assert!(keys[1].ends_with("manifest.json"));
        let written = written.into_inner().unwrap();
        assert_eq!(written.len(), 2);
        for (bucket, _, _) in &written {
            assert_eq!(bucket, "dst");
        }
        assert!(!m.due("src", "daily-csv", now + chrono::Duration::hours(1)));
        // The run was recorded at `now`: not due an hour later, due again after 24h.
        assert!(m.due("src", "daily-csv", now + chrono::Duration::hours(25)));
    }

    // Running an unregistered config fails with UnknownConfig before any write.
    #[test]
    fn run_once_unknown_config_is_an_error() {
        let m = InventoryManager::new();
        let now = Utc::now();
        let err = m.run_once_for_test(
            "ghost",
            "nothing",
            std::iter::empty(),
            now,
            |_, _, _| Ok(()),
        );
        assert!(matches!(err, Err(RunError::UnknownConfig(_, _))));
    }
}