1use std::collections::HashMap;
48use std::sync::Arc;
49use std::sync::RwLock;
50
51use chrono::{DateTime, Utc};
52use s3s::S3;
53use s3s::S3Request;
54use s3s::dto::*;
55use serde::{Deserialize, Serialize};
56use tracing::warn;
57
58#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub enum InventoryFormat {
62 Csv,
63}
64
65impl InventoryFormat {
66 #[must_use]
68 pub fn as_aws_str(self) -> &'static str {
69 match self {
70 Self::Csv => "CSV",
71 }
72 }
73
74 #[must_use]
76 pub fn file_extension(self) -> &'static str {
77 match self {
78 Self::Csv => "csv",
79 }
80 }
81}
82
83#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
87pub enum IncludedVersions {
88 Current,
89 All,
90}
91
92impl IncludedVersions {
93 #[must_use]
95 pub fn as_aws_str(self) -> &'static str {
96 match self {
97 Self::Current => "Current",
98 Self::All => "All",
99 }
100 }
101
102 #[must_use]
106 pub fn from_aws_str(s: &str) -> Self {
107 if s.eq_ignore_ascii_case("All") {
108 Self::All
109 } else {
110 Self::Current
111 }
112 }
113}
114
115#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
122pub struct InventoryConfig {
123 pub id: String,
124 pub bucket: String,
125 pub destination_bucket: String,
126 pub destination_prefix: String,
127 pub frequency_hours: u32,
128 pub format: InventoryFormat,
129 pub included_object_versions: IncludedVersions,
130}
131
132impl InventoryConfig {
133 #[must_use]
136 pub fn daily_csv(
137 id: impl Into<String>,
138 bucket: impl Into<String>,
139 destination_bucket: impl Into<String>,
140 destination_prefix: impl Into<String>,
141 ) -> Self {
142 Self {
143 id: id.into(),
144 bucket: bucket.into(),
145 destination_bucket: destination_bucket.into(),
146 destination_prefix: destination_prefix.into(),
147 frequency_hours: 24,
148 format: InventoryFormat::Csv,
149 included_object_versions: IncludedVersions::Current,
150 }
151 }
152}
153
/// One data row of an inventory CSV.
///
/// `render_csv` emits the fields in declaration order, one quoted cell per field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InventoryRow {
    /// Source bucket name (`Bucket` column).
    pub bucket: String,
    /// Object key (`Key` column).
    pub key: String,
    /// Version identifier (`VersionId` column); `None` renders as an empty cell.
    pub version_id: Option<String>,
    /// `IsLatest` column, rendered as `true` / `false`.
    pub is_latest: bool,
    /// `IsDeleteMarker` column, rendered as `true` / `false`.
    pub is_delete_marker: bool,
    /// `Size` column, rendered as a decimal integer.
    pub size: u64,
    /// `LastModifiedDate` column, rendered as RFC 3339 with millisecond precision and a `Z` suffix.
    pub last_modified: DateTime<Utc>,
    /// `ETag` column.
    pub etag: String,
    /// `StorageClass` column.
    pub storage_class: String,
    /// `EncryptionStatus` column.
    pub encryption_status: String,
}
170
/// Serialisable image of an `InventoryManager`, used by `to_json` / `from_json`.
///
/// Both maps are keyed by the string produced by `join_key(bucket, id)` rather than
/// by a `(bucket, id)` tuple.
#[derive(Debug, Default, Serialize, Deserialize)]
struct InventorySnapshot {
    /// Configurations by joined key.
    configs: HashMap<String, InventoryConfig>,
    /// Last recorded run time by joined key.
    last_run: HashMap<String, DateTime<Utc>>,
}
182
/// Separator placed between bucket and id in snapshot map keys (ASCII unit separator).
const KEY_SEP: char = '\u{1F}';

/// Builds the snapshot map key `bucket` + [`KEY_SEP`] + `id`.
fn join_key(bucket: &str, id: &str) -> String {
    format!("{bucket}{KEY_SEP}{id}")
}

/// Splits a snapshot map key at the first [`KEY_SEP`] into owned `(bucket, id)`.
///
/// Returns `None` when the separator is absent.
fn split_key(s: &str) -> Option<(String, String)> {
    let (bucket, id) = s.split_once(KEY_SEP)?;
    Some((bucket.to_owned(), id.to_owned()))
}
199
200#[derive(Debug, Default)]
202pub struct InventoryManager {
203 configs: RwLock<HashMap<(String, String), InventoryConfig>>,
204 last_run: RwLock<HashMap<(String, String), DateTime<Utc>>>,
205}
206
207impl InventoryManager {
208 #[must_use]
209 pub fn new() -> Self {
210 Self::default()
211 }
212
213 pub fn put(&self, config: InventoryConfig) {
218 let key = (config.bucket.clone(), config.id.clone());
219 self.last_run
220 .write()
221 .expect("inventory last_run RwLock poisoned")
222 .remove(&key);
223 self.configs
224 .write()
225 .expect("inventory configs RwLock poisoned")
226 .insert(key, config);
227 }
228
229 #[must_use]
231 pub fn get(&self, bucket: &str, id: &str) -> Option<InventoryConfig> {
232 self.configs
233 .read()
234 .expect("inventory configs RwLock poisoned")
235 .get(&(bucket.to_owned(), id.to_owned()))
236 .cloned()
237 }
238
239 #[must_use]
242 pub fn list_for_bucket(&self, bucket: &str) -> Vec<InventoryConfig> {
243 let map = self.configs.read().expect("inventory configs RwLock poisoned");
244 let mut out: Vec<InventoryConfig> = map
245 .iter()
246 .filter(|((b, _id), _)| b == bucket)
247 .map(|(_, cfg)| cfg.clone())
248 .collect();
249 out.sort_by(|a, b| a.id.cmp(&b.id));
250 out
251 }
252
253 #[must_use]
260 pub fn list_all(&self) -> Vec<InventoryConfig> {
261 let map = self.configs.read().expect("inventory configs RwLock poisoned");
262 let mut out: Vec<InventoryConfig> = map.values().cloned().collect();
263 out.sort_by(|a, b| a.bucket.cmp(&b.bucket).then_with(|| a.id.cmp(&b.id)));
264 out
265 }
266
267 pub fn delete(&self, bucket: &str, id: &str) {
269 let key = (bucket.to_owned(), id.to_owned());
270 self.configs
271 .write()
272 .expect("inventory configs RwLock poisoned")
273 .remove(&key);
274 self.last_run
275 .write()
276 .expect("inventory last_run RwLock poisoned")
277 .remove(&key);
278 }
279
280 #[must_use]
284 pub fn due(&self, bucket: &str, id: &str, now: DateTime<Utc>) -> bool {
285 let key = (bucket.to_owned(), id.to_owned());
286 let cfgs = self.configs.read().expect("inventory configs RwLock poisoned");
287 let Some(cfg) = cfgs.get(&key) else {
288 return false;
289 };
290 let runs = self.last_run.read().expect("inventory last_run RwLock poisoned");
291 match runs.get(&key) {
292 None => true,
293 Some(prev) => {
294 let elapsed = now.signed_duration_since(*prev);
295 elapsed >= chrono::Duration::hours(i64::from(cfg.frequency_hours))
296 }
297 }
298 }
299
300 pub fn mark_run(&self, bucket: &str, id: &str, when: DateTime<Utc>) {
303 self.last_run
304 .write()
305 .expect("inventory last_run RwLock poisoned")
306 .insert((bucket.to_owned(), id.to_owned()), when);
307 }
308
309 pub fn to_json(&self) -> Result<String, serde_json::Error> {
311 let cfgs = self.configs.read().expect("inventory configs RwLock poisoned");
312 let runs = self.last_run.read().expect("inventory last_run RwLock poisoned");
313 let snap = InventorySnapshot {
314 configs: cfgs
315 .iter()
316 .map(|((b, i), v)| (join_key(b, i), v.clone()))
317 .collect(),
318 last_run: runs
319 .iter()
320 .map(|((b, i), v)| (join_key(b, i), *v))
321 .collect(),
322 };
323 serde_json::to_string(&snap)
324 }
325
326 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
329 let snap: InventorySnapshot = serde_json::from_str(s)?;
330 let mut configs: HashMap<(String, String), InventoryConfig> = HashMap::new();
331 for (k, v) in snap.configs {
332 if let Some(pair) = split_key(&k) {
333 configs.insert(pair, v);
334 }
335 }
336 let mut last_run: HashMap<(String, String), DateTime<Utc>> = HashMap::new();
337 for (k, v) in snap.last_run {
338 if let Some(pair) = split_key(&k) {
339 last_run.insert(pair, v);
340 }
341 }
342 Ok(Self {
343 configs: RwLock::new(configs),
344 last_run: RwLock::new(last_run),
345 })
346 }
347
348 pub fn run_once_for_test<I, F>(
360 &self,
361 bucket: &str,
362 id: &str,
363 rows: I,
364 now: DateTime<Utc>,
365 mut write_object: F,
366 ) -> Result<Vec<String>, RunError>
367 where
368 I: IntoIterator<Item = InventoryRow>,
369 F: FnMut(&str, &str, Vec<u8>) -> Result<(), RunError>,
370 {
371 let cfg = self
372 .get(bucket, id)
373 .ok_or_else(|| RunError::UnknownConfig(bucket.to_owned(), id.to_owned()))?;
374 let csv_bytes = render_csv(rows.into_iter());
375 let csv_md5 = md5_hex(&csv_bytes);
376 let csv_key = csv_destination_key(&cfg, now);
377 let manifest_key = manifest_destination_key(&cfg, now);
378 let manifest_body = render_manifest_json(
379 &cfg,
380 std::slice::from_ref(&csv_key),
381 std::slice::from_ref(&csv_md5),
382 now,
383 )
384 .into_bytes();
385 write_object(&cfg.destination_bucket, &csv_key, csv_bytes)?;
386 write_object(&cfg.destination_bucket, &manifest_key, manifest_body)?;
387 self.mark_run(bucket, id, now);
388 Ok(vec![csv_key, manifest_key])
389 }
390}
391
392pub fn render_csv(rows: impl Iterator<Item = InventoryRow>) -> Vec<u8> {
400 let mut out = Vec::new();
401 out.extend_from_slice(
402 b"Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus\n",
403 );
404 for row in rows {
405 let cells: [String; 10] = [
406 row.bucket,
407 row.key,
408 row.version_id.unwrap_or_default(),
409 row.is_latest.to_string(),
410 row.is_delete_marker.to_string(),
411 row.size.to_string(),
412 row.last_modified
413 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
414 row.etag,
415 row.storage_class,
416 row.encryption_status,
417 ];
418 for (i, cell) in cells.iter().enumerate() {
419 if i > 0 {
420 out.push(b',');
421 }
422 out.push(b'"');
423 for b in cell.as_bytes() {
424 if *b == b'"' {
425 out.extend_from_slice(b"\"\"");
426 } else {
427 out.push(*b);
428 }
429 }
430 out.push(b'"');
431 }
432 out.push(b'\n');
433 }
434 out
435}
436
437pub fn render_manifest_json(
443 config: &InventoryConfig,
444 csv_keys: &[String],
445 md5s: &[String],
446 written_at: DateTime<Utc>,
447) -> String {
448 let n = csv_keys.len().min(md5s.len());
452 let files_json: Vec<serde_json::Value> = (0..n)
453 .map(|i| {
454 serde_json::json!({
455 "key": csv_keys[i],
456 "size": 0,
462 "MD5checksum": md5s[i],
463 })
464 })
465 .collect();
466 let value = serde_json::json!({
467 "sourceBucket": config.bucket,
468 "destinationBucket": config.destination_bucket,
469 "version": "2016-11-30",
470 "creationTimestamp": written_at.timestamp_millis().to_string(),
471 "fileFormat": config.format.as_aws_str(),
472 "fileSchema": csv_header_schema(config),
473 "files": files_json,
474 });
475 serde_json::to_string_pretty(&value).expect("static JSON is always serialisable")
476}
477
478#[must_use]
482pub fn csv_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
483 let stamp = now.format("%Y-%m-%dT%H%M%SZ");
484 let prefix = trim_trailing_slash(&config.destination_prefix);
485 format!(
486 "{prefix}/{src}/{id}/data/{stamp}.{ext}",
487 src = config.bucket,
488 id = config.id,
489 ext = config.format.file_extension()
490 )
491}
492
493#[must_use]
497pub fn manifest_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
498 let stamp = now.format("%Y-%m-%dT%H%M%SZ");
499 let prefix = trim_trailing_slash(&config.destination_prefix);
500 format!(
501 "{prefix}/{src}/{id}/{stamp}/manifest.json",
502 src = config.bucket,
503 id = config.id
504 )
505}
506
/// Removes at most one trailing `/` from `s`; any other input is returned unchanged.
fn trim_trailing_slash(s: &str) -> &str {
    match s.strip_suffix('/') {
        Some(trimmed) => trimmed,
        None => s,
    }
}
510
511fn csv_header_schema(_cfg: &InventoryConfig) -> &'static str {
515 "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, StorageClass, EncryptionStatus"
516}
517
518fn md5_hex(bytes: &[u8]) -> String {
519 use md5::{Digest, Md5};
520 let mut h = Md5::new();
521 h.update(bytes);
522 let out = h.finalize();
523 let mut s = String::with_capacity(32);
524 for b in out {
525 s.push(hex_char(b >> 4));
526 s.push(hex_char(b & 0x0f));
527 }
528 s
529}
530
/// Maps a nibble (`0..=15`) to its lower-case hexadecimal character.
///
/// Values above 15 are not valid nibbles and map to `'0'`.
fn hex_char(n: u8) -> char {
    char::from_digit(u32::from(n), 16).unwrap_or('0')
}
538
/// Errors returned by `InventoryManager::run_once_for_test`.
#[derive(Debug, thiserror::Error)]
pub enum RunError {
    /// No configuration is registered for the given `(bucket, id)` pair.
    #[error("no inventory configuration for bucket={0} id={1}")]
    UnknownConfig(String, String),
    /// Writing to the destination failed; the payload is the error message.
    #[error("destination write failed: {0}")]
    Write(String),
}
549
/// Counters describing one `run_scan_once` pass.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
    /// Number of distinct source buckets for which a scan was attempted
    /// (including scans that ended in an error).
    pub buckets_scanned: usize,
    /// Number of configurations known to the manager at the start of the pass,
    /// whether or not they were due.
    pub configs_evaluated: usize,
    /// Number of configurations whose scan completed successfully.
    pub csvs_written: usize,
    /// Number of listed objects that were turned into inventory rows.
    pub objects_listed: usize,
    /// Number of configurations whose scan failed.
    pub errors: usize,
}
581
582fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
590 S3Request {
591 input,
592 method,
593 uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
594 headers: http::HeaderMap::new(),
595 extensions: http::Extensions::new(),
596 credentials: None,
597 region: None,
598 service: None,
599 trailing_headers: None,
600 }
601}
602
603fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
609 let mut buf = Vec::new();
610 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
611 let s = std::str::from_utf8(&buf).ok()?;
612 chrono::DateTime::parse_from_rfc3339(s)
613 .ok()
614 .map(|dt| dt.with_timezone(&Utc))
615}
616
617fn encryption_status_from_head(head: &HeadObjectOutput) -> String {
624 if head.sse_customer_algorithm.is_some() {
625 return "SSE-C".to_owned();
626 }
627 if head.ssekms_key_id.is_some() {
628 return "SSE-KMS".to_owned();
629 }
630 if let Some(sse) = head.server_side_encryption.as_ref() {
631 let s = sse.as_str();
632 if s.eq_ignore_ascii_case("aws:kms") || s.eq_ignore_ascii_case("aws:kms:dsse") {
633 return "SSE-KMS".to_owned();
634 }
635 if !s.is_empty() {
636 return "SSE-S4".to_owned();
640 }
641 }
642 if head
643 .metadata
644 .as_ref()
645 .and_then(|m| m.get("s4-encrypted"))
646 .is_some()
647 {
648 return "SSE-S4".to_owned();
649 }
650 "NOT-SSE".to_owned()
651}
652
653pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
691 s4: &Arc<crate::S4Service<B>>,
692) -> Result<ScanReport, String> {
693 let Some(mgr) = s4.inventory_manager().cloned() else {
694 return Ok(ScanReport::default());
697 };
698 let configs = mgr.list_all();
699 if configs.is_empty() {
700 return Ok(ScanReport::default());
701 }
702 let now = Utc::now();
703 let mut report = ScanReport {
704 configs_evaluated: configs.len(),
705 ..ScanReport::default()
706 };
707 let mut walked_buckets: std::collections::HashSet<String> = std::collections::HashSet::new();
711 for cfg in configs {
712 if !mgr.due(&cfg.bucket, &cfg.id, now) {
713 continue;
714 }
715 walked_buckets.insert(cfg.bucket.clone());
716 match scan_one_config(s4, &cfg, now, &mut report).await {
717 Ok(()) => {
718 mgr.mark_run(&cfg.bucket, &cfg.id, now);
719 report.csvs_written = report.csvs_written.saturating_add(1);
720 }
721 Err(e) => {
722 warn!(
723 bucket = %cfg.bucket,
724 id = %cfg.id,
725 error = %e,
726 "S4 inventory: scan failed for config",
727 );
728 report.errors = report.errors.saturating_add(1);
729 }
730 }
731 }
732 report.buckets_scanned = walked_buckets.len();
733 Ok(report)
734}
735
/// Scans the source bucket of `cfg` and writes one CSV plus its manifest to the
/// destination bucket.
///
/// Pages through `list_objects_v2`, issues a `head_object` per listed key, and
/// builds one [`InventoryRow`] per object. `report.objects_listed` is incremented
/// for every object that becomes a row. Every row is emitted as the latest,
/// non-delete-marker version with no version id; `cfg.included_object_versions`
/// is not consulted here.
///
/// # Errors
///
/// Returns a message if listing fails or if either destination write fails.
/// A failing `head_object` is not an error: the row is still emitted using the
/// listing's metadata.
async fn scan_one_config<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    cfg: &InventoryConfig,
    now: DateTime<Utc>,
    report: &mut ScanReport,
) -> Result<(), String> {
    // All rows are buffered in memory before the CSV is rendered.
    let mut rows: Vec<InventoryRow> = Vec::new();
    let mut continuation: Option<String> = None;
    loop {
        let list_input = ListObjectsV2Input {
            bucket: cfg.bucket.clone(),
            continuation_token: continuation.clone(),
            ..Default::default()
        };
        let list_req = synthetic_request(
            list_input,
            http::Method::GET,
            &format!("/{src}?list-type=2", src = cfg.bucket),
        );
        let resp = s4
            .as_ref()
            .list_objects_v2(list_req)
            .await
            .map_err(|e| format!("list_objects_v2: {e}"))?;
        let output = resp.output;
        let contents = output.contents.unwrap_or_default();
        for obj in &contents {
            // Entries without a key cannot be inventoried.
            let Some(key) = obj.key.as_deref() else {
                continue;
            };
            // Keys ending in `.s4index` are excluded from the inventory
            // (presumably internal sidecar objects — confirm against the writer).
            if key.ends_with(".s4index") {
                continue;
            }
            report.objects_listed = report.objects_listed.saturating_add(1);
            let head_input = HeadObjectInput {
                bucket: cfg.bucket.clone(),
                key: key.to_owned(),
                ..Default::default()
            };
            let head_req = synthetic_request(
                head_input,
                http::Method::HEAD,
                &format!("/{src}/{key}", src = cfg.bucket),
            );
            // Best effort: on HEAD failure continue with an empty HEAD output so the
            // fallbacks below pick up whatever the listing entry provides.
            let head = match s4.as_ref().head_object(head_req).await {
                Ok(r) => r.output,
                Err(e) => {
                    warn!(
                        bucket = %cfg.bucket,
                        key = %key,
                        error = %e,
                        "S4 inventory: head_object failed; emitting row with listing-only metadata",
                    );
                    HeadObjectOutput::default()
                }
            };
            // Prefer HEAD values, then listing values, then a default. Negative sizes
            // are clamped to zero before the cast.
            let size = head
                .content_length
                .unwrap_or_else(|| obj.size.unwrap_or(0))
                .max(0) as u64;
            let last_modified = head
                .last_modified
                .as_ref()
                .and_then(timestamp_to_chrono_utc)
                .or_else(|| obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc))
                .unwrap_or(now);
            let etag: String = head
                .e_tag
                .as_ref()
                .or(obj.e_tag.as_ref())
                .map(|e| e.value().to_owned())
                .unwrap_or_default();
            let storage_class = head
                .storage_class
                .as_ref()
                .map(|s| s.as_str().to_owned())
                .or_else(|| obj.storage_class.as_ref().map(|s| s.as_str().to_owned()))
                .unwrap_or_else(|| "STANDARD".to_owned());
            let encryption_status = encryption_status_from_head(&head);
            rows.push(InventoryRow {
                bucket: cfg.bucket.clone(),
                key: key.to_owned(),
                version_id: None,
                is_latest: true,
                is_delete_marker: false,
                size,
                last_modified,
                etag,
                storage_class,
                encryption_status,
            });
        }
        if output.is_truncated.unwrap_or(false) {
            continuation = output.next_continuation_token;
            // A truncated page without a continuation token cannot be followed; stop
            // rather than re-requesting the first page forever.
            if continuation.is_none() {
                break;
            }
        } else {
            break;
        }
    }

    let csv_bytes = render_csv(rows.into_iter());
    let csv_md5 = md5_hex(&csv_bytes);
    let csv_key = csv_destination_key(cfg, now);
    let manifest_key = manifest_destination_key(cfg, now);
    let manifest_body = render_manifest_json(
        cfg,
        std::slice::from_ref(&csv_key),
        std::slice::from_ref(&csv_md5),
        now,
    )
    .into_bytes();
    // The CSV is written before the manifest that references it.
    put_destination_object(s4, &cfg.destination_bucket, &csv_key, csv_bytes).await?;
    put_destination_object(s4, &cfg.destination_bucket, &manifest_key, manifest_body).await?;
    Ok(())
}
873
874async fn put_destination_object<B: S3 + Send + Sync + 'static>(
880 s4: &Arc<crate::S4Service<B>>,
881 dst_bucket: &str,
882 dst_key: &str,
883 body: Vec<u8>,
884) -> Result<(), String> {
885 let body_bytes = bytes::Bytes::from(body);
886 let input = PutObjectInput {
887 bucket: dst_bucket.to_owned(),
888 key: dst_key.to_owned(),
889 body: Some(crate::blob::bytes_to_blob(body_bytes)),
890 ..Default::default()
891 };
892 let req = synthetic_request(
893 input,
894 http::Method::PUT,
895 &format!("/{dst_bucket}/{dst_key}"),
896 );
897 s4.as_ref()
898 .put_object(req)
899 .await
900 .map(|_| ())
901 .map_err(|e| format!("destination put_object {dst_bucket}/{dst_key}: {e}"))
902}
903
/// Unit tests for the inventory manager and renderers, plus scanner tests that
/// run `run_scan_once` against an in-memory S3 backend.
#[cfg(test)]
mod tests {
    use super::*;

    /// Daily CSV configuration `src` → `dst` under prefix `inv`, id `daily-csv`.
    fn sample_config() -> InventoryConfig {
        InventoryConfig {
            id: "daily-csv".into(),
            bucket: "src".into(),
            destination_bucket: "dst".into(),
            destination_prefix: "inv".into(),
            frequency_hours: 24,
            format: InventoryFormat::Csv,
            included_object_versions: IncludedVersions::Current,
        }
    }

    /// Row in bucket `src` with fixed metadata; only `key` and `size` vary.
    fn sample_row(key: &str, size: u64) -> InventoryRow {
        InventoryRow {
            bucket: "src".into(),
            key: key.into(),
            version_id: None,
            is_latest: true,
            is_delete_marker: false,
            size,
            last_modified: DateTime::parse_from_rfc3339("2026-05-13T12:34:56.789Z")
                .unwrap()
                .with_timezone(&Utc),
            etag: "abc123".into(),
            storage_class: "STANDARD".into(),
            encryption_status: "NOT-SSE".into(),
        }
    }

    // A configuration survives a to_json / from_json round trip unchanged.
    #[test]
    fn config_json_round_trip() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let json = m.to_json().expect("to_json");
        let m2 = InventoryManager::from_json(&json).expect("from_json");
        assert_eq!(m2.get("src", "daily-csv"), Some(sample_config()));
    }

    #[test]
    fn due_returns_true_when_never_run() {
        let m = InventoryManager::new();
        m.put(sample_config());
        assert!(m.due("src", "daily-csv", Utc::now()));
    }

    #[test]
    fn due_returns_true_when_interval_elapsed() {
        let m = InventoryManager::new();
        m.put(sample_config());
        // 25 h ago exceeds the 24 h frequency.
        let then = Utc::now() - chrono::Duration::hours(25);
        m.mark_run("src", "daily-csv", then);
        assert!(m.due("src", "daily-csv", Utc::now()));
    }

    #[test]
    fn due_returns_false_when_interval_not_yet_elapsed() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let just_now = Utc::now() - chrono::Duration::minutes(5);
        m.mark_run("src", "daily-csv", just_now);
        assert!(!m.due("src", "daily-csv", Utc::now()));
    }

    #[test]
    fn due_returns_false_when_config_missing() {
        let m = InventoryManager::new();
        assert!(!m.due("ghost", "nothing", Utc::now()));
    }

    // Only configs of the requested bucket are returned, ordered by id.
    #[test]
    fn list_for_bucket_filters_and_sorts() {
        let m = InventoryManager::new();
        let mut a = sample_config();
        a.id = "z-last".into();
        let mut b = sample_config();
        b.id = "a-first".into();
        let mut c = sample_config();
        c.bucket = "other".into();
        c.id = "should-not-appear".into();
        m.put(a);
        m.put(b);
        m.put(c);
        let list = m.list_for_bucket("src");
        assert_eq!(list.len(), 2);
        assert_eq!(list[0].id, "a-first");
        assert_eq!(list[1].id, "z-last");
    }

    // Header is unquoted; every data cell is quoted and embedded quotes are doubled.
    #[test]
    fn render_csv_matches_aws_header_and_quotes_cells() {
        let rows = vec![
            sample_row("a/b.txt", 100),
            sample_row("comma,here.txt", 200),
            sample_row("quote\"inside.txt", 300),
        ];
        let csv = render_csv(rows.into_iter());
        let s = String::from_utf8(csv).expect("utf8");
        let mut lines = s.lines();
        assert_eq!(
            lines.next().unwrap(),
            "Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus"
        );
        let row1 = lines.next().unwrap();
        assert!(row1.starts_with("\"src\",\"a/b.txt\","));
        assert!(row1.contains(",\"100\","));
        assert!(row1.contains("\"2026-05-13T12:34:56.789Z\""));
        let row2 = lines.next().unwrap();
        assert!(row2.contains("\"comma,here.txt\""));
        let row3 = lines.next().unwrap();
        assert!(row3.contains("\"quote\"\"inside.txt\""));
        assert_eq!(lines.next(), None);
    }

    #[test]
    fn render_manifest_json_carries_required_fields() {
        let cfg = sample_config();
        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let manifest = render_manifest_json(
            &cfg,
            &["inv/src/daily-csv/data/2026-05-13T000000Z.csv".into()],
            &["d41d8cd98f00b204e9800998ecf8427e".into()],
            now,
        );
        let v: serde_json::Value = serde_json::from_str(&manifest).expect("manifest must be JSON");
        assert_eq!(v["sourceBucket"], "src");
        assert_eq!(v["destinationBucket"], "dst");
        assert_eq!(v["fileFormat"], "CSV");
        assert_eq!(v["version"], "2016-11-30");
        let files = v["files"].as_array().expect("files array");
        assert_eq!(files.len(), 1);
        assert_eq!(
            files[0]["key"],
            "inv/src/daily-csv/data/2026-05-13T000000Z.csv"
        );
        assert_eq!(files[0]["MD5checksum"], "d41d8cd98f00b204e9800998ecf8427e");
        assert_eq!(
            v["creationTimestamp"],
            now.timestamp_millis().to_string()
        );
        let schema = v["fileSchema"].as_str().expect("fileSchema string");
        assert!(schema.starts_with("Bucket, Key, VersionId"));
        assert!(schema.ends_with("StorageClass, EncryptionStatus"));
    }

    #[test]
    fn destination_keys_are_under_prefix_and_namespaced_by_source_bucket() {
        let cfg = sample_config();
        let now = DateTime::parse_from_rfc3339("2026-05-13T01:02:03.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let csv_key = csv_destination_key(&cfg, now);
        let manifest_key = manifest_destination_key(&cfg, now);
        assert_eq!(csv_key, "inv/src/daily-csv/data/2026-05-13T010203Z.csv");
        assert_eq!(
            manifest_key,
            "inv/src/daily-csv/2026-05-13T010203Z/manifest.json"
        );
        // A trailing slash on the prefix must not produce a double slash.
        let mut cfg2 = cfg.clone();
        cfg2.destination_prefix = "inv/".into();
        assert_eq!(
            csv_destination_key(&cfg2, now),
            "inv/src/daily-csv/data/2026-05-13T010203Z.csv"
        );
    }

    #[test]
    fn run_once_writes_csv_and_manifest_and_marks_run() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
            .unwrap()
            .with_timezone(&Utc);
        // Captures every (bucket, key, body) handed to the writer callback.
        let written = std::sync::Mutex::new(Vec::<(String, String, Vec<u8>)>::new());
        let keys = m
            .run_once_for_test(
                "src",
                "daily-csv",
                vec![sample_row("a", 1), sample_row("b", 2)],
                now,
                |dst_bucket, dst_key, body| {
                    written
                        .lock()
                        .unwrap()
                        .push((dst_bucket.to_owned(), dst_key.to_owned(), body));
                    Ok(())
                },
            )
            .expect("run_once_for_test");
        assert_eq!(keys.len(), 2);
        assert!(keys[0].ends_with(".csv"));
        assert!(keys[1].ends_with("manifest.json"));
        let written = written.into_inner().unwrap();
        assert_eq!(written.len(), 2);
        for (bucket, _, _) in &written {
            assert_eq!(bucket, "dst");
        }
        // The run was marked at `now`: not due after 1 h, due again after 25 h.
        assert!(!m.due("src", "daily-csv", now + chrono::Duration::hours(1)));
        assert!(m.due("src", "daily-csv", now + chrono::Duration::hours(25)));
    }

    #[test]
    fn run_once_unknown_config_is_an_error() {
        let m = InventoryManager::new();
        let now = Utc::now();
        let err = m.run_once_for_test(
            "ghost",
            "nothing",
            std::iter::empty(),
            now,
            |_, _, _| Ok(()),
        );
        assert!(matches!(err, Err(RunError::UnknownConfig(_, _))));
    }

    // ---- Scanner tests: in-memory backend and service wiring ----

    use std::collections::HashMap as StdHashMap;
    use std::sync::Mutex as StdMutex;

    use bytes::Bytes;
    use s3s::dto as dto2;
    use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
    use s4_codec::dispatcher::AlwaysDispatcher;
    use s4_codec::passthrough::Passthrough;
    use s4_codec::{CodecKind, CodecRegistry};

    use crate::S4Service;

    /// In-memory S3 backend storing objects keyed by `(bucket, key)`.
    #[derive(Default)]
    struct InvScannerMemBackend {
        objects: StdMutex<StdHashMap<(String, String), InvScannerStored>>,
    }

    /// Stored object: body plus the time it was inserted.
    #[derive(Clone)]
    struct InvScannerStored {
        body: Bytes,
        last_modified: dto2::Timestamp,
    }

    impl InvScannerMemBackend {
        /// Inserts or replaces an object, stamping it with the current system time.
        fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
            self.objects.lock().unwrap().insert(
                (bucket.to_owned(), key.to_owned()),
                InvScannerStored {
                    body,
                    last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
                },
            );
        }
    }

    // Only the four operations below are implemented for the backend.
    #[async_trait::async_trait]
    impl S3 for InvScannerMemBackend {
        async fn put_object(
            &self,
            req: S3Request<dto2::PutObjectInput>,
        ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
            // Drain the streaming body into memory; a missing body stores an empty object.
            let body = match req.input.body {
                Some(blob) => crate::blob::collect_blob(blob, usize::MAX)
                    .await
                    .map_err(|e| {
                        S3Error::with_message(S3ErrorCode::InternalError, format!("{e}"))
                    })?,
                None => Bytes::new(),
            };
            self.put_now(&req.input.bucket, &req.input.key, body);
            Ok(S3Response::new(dto2::PutObjectOutput::default()))
        }

        async fn head_object(
            &self,
            req: S3Request<dto2::HeadObjectInput>,
        ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
            let key = (req.input.bucket.clone(), req.input.key.clone());
            let lock = self.objects.lock().unwrap();
            let stored = lock
                .get(&key)
                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
            Ok(S3Response::new(dto2::HeadObjectOutput {
                content_length: Some(stored.body.len() as i64),
                last_modified: Some(stored.last_modified.clone()),
                // Synthetic ETag derived from the body length.
                e_tag: Some(dto2::ETag::Strong(format!("etag-{}", stored.body.len()))),
                ..Default::default()
            }))
        }

        async fn list_objects_v2(
            &self,
            req: S3Request<dto2::ListObjectsV2Input>,
        ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
            // NOTE: `prefix` holds the bucket name; it is not a key prefix.
            let prefix = req.input.bucket.clone();
            let lock = self.objects.lock().unwrap();
            let mut contents: Vec<dto2::Object> = lock
                .iter()
                .filter(|((b, _), _)| b == &prefix)
                .map(|((_, k), v)| dto2::Object {
                    key: Some(k.clone()),
                    size: Some(v.body.len() as i64),
                    last_modified: Some(v.last_modified.clone()),
                    e_tag: Some(dto2::ETag::Strong(format!("etag-{}", v.body.len()))),
                    ..Default::default()
                })
                .collect();
            // HashMap iteration order is unspecified; sort for a stable listing.
            contents.sort_by(|a, b| a.key.cmp(&b.key));
            let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
            // Everything is returned in a single, never-truncated page.
            Ok(S3Response::new(dto2::ListObjectsV2Output {
                name: Some(prefix),
                contents: Some(contents),
                key_count: Some(key_count),
                is_truncated: Some(false),
                ..Default::default()
            }))
        }

        async fn get_object(
            &self,
            req: S3Request<dto2::GetObjectInput>,
        ) -> S3Result<S3Response<dto2::GetObjectOutput>> {
            let key = (req.input.bucket.clone(), req.input.key.clone());
            let lock = self.objects.lock().unwrap();
            let stored = lock
                .get(&key)
                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
            Ok(S3Response::new(dto2::GetObjectOutput {
                content_length: Some(stored.body.len() as i64),
                last_modified: Some(stored.last_modified.clone()),
                body: Some(crate::blob::bytes_to_blob(stored.body.clone())),
                ..Default::default()
            }))
        }
    }

    /// Codec registry and dispatcher both fixed to the passthrough codec.
    fn make_codec() -> (Arc<CodecRegistry>, Arc<AlwaysDispatcher>) {
        (
            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough))),
            Arc::new(AlwaysDispatcher(CodecKind::Passthrough)),
        )
    }

    /// Builds an `S4Service` over `backend`, attaching `with_inv` when provided.
    fn make_inv_service(
        backend: InvScannerMemBackend,
        with_inv: Option<Arc<InventoryManager>>,
    ) -> Arc<S4Service<InvScannerMemBackend>> {
        let (registry, dispatcher) = make_codec();
        let svc = S4Service::new(backend, registry, dispatcher);
        let svc = match with_inv {
            Some(m) => svc.with_inventory(m),
            None => svc,
        };
        Arc::new(svc)
    }

    #[tokio::test]
    async fn run_scan_once_no_inventory_manager_returns_empty_report() {
        let s4 = make_inv_service(InvScannerMemBackend::default(), None);
        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report, ScanReport::default());
    }

    #[tokio::test]
    async fn run_scan_once_no_configs_returns_empty_report() {
        let mgr = Arc::new(InventoryManager::new());
        let s4 = make_inv_service(InvScannerMemBackend::default(), Some(Arc::clone(&mgr)));
        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.configs_evaluated, 0);
        assert_eq!(report.csvs_written, 0);
        assert_eq!(report.objects_listed, 0);
    }

    // End to end: three source objects yield one CSV (header + 3 rows) and one manifest.
    #[tokio::test]
    async fn run_scan_once_walks_bucket_and_writes_csv_and_manifest() {
        let mgr = Arc::new(InventoryManager::new());
        mgr.put(InventoryConfig::daily_csv("d1", "src", "dst", "inv"));
        let backend = InvScannerMemBackend::default();
        for (key, body) in [
            ("alpha.txt", &b"AAA"[..]),
            ("nested/beta.bin", &b"BB"[..]),
            ("z.txt", &b"Z"[..]),
        ] {
            backend.put_now("src", key, Bytes::copy_from_slice(body));
        }
        let s4 = make_inv_service(backend, Some(Arc::clone(&mgr)));

        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.configs_evaluated, 1);
        assert_eq!(report.buckets_scanned, 1);
        assert_eq!(report.objects_listed, 3);
        assert_eq!(report.csvs_written, 1);
        assert_eq!(report.errors, 0);

        // List the destination bucket through the service to see what the scan wrote.
        let list_req = synthetic_request(
            ListObjectsV2Input {
                bucket: "dst".into(),
                ..Default::default()
            },
            http::Method::GET,
            "/dst?list-type=2",
        );
        let list_resp = s4
            .as_ref()
            .list_objects_v2(list_req)
            .await
            .expect("post-scan list");
        let dst_keys: Vec<String> = list_resp
            .output
            .contents
            .unwrap_or_default()
            .into_iter()
            .filter_map(|o| o.key)
            .collect();
        let csv_keys: Vec<String> = dst_keys
            .iter()
            .filter(|k| k.ends_with(".csv"))
            .cloned()
            .collect();
        let manifest_keys: Vec<String> = dst_keys
            .iter()
            .filter(|k| k.ends_with("manifest.json"))
            .cloned()
            .collect();
        assert_eq!(csv_keys.len(), 1, "exactly one CSV must land; got {dst_keys:?}");
        assert_eq!(
            manifest_keys.len(),
            1,
            "exactly one manifest.json must land; got {dst_keys:?}"
        );
        assert!(
            csv_keys[0].starts_with("inv/src/d1/data/"),
            "CSV key must be under <prefix>/<bucket>/<id>/data/, got {}",
            csv_keys[0]
        );
        assert!(
            manifest_keys[0].starts_with("inv/src/d1/"),
            "manifest key must be under <prefix>/<bucket>/<id>/, got {}",
            manifest_keys[0]
        );

        // Read the CSV back and check its contents.
        let get_req = synthetic_request(
            GetObjectInput {
                bucket: "dst".into(),
                key: csv_keys[0].clone(),
                ..Default::default()
            },
            http::Method::GET,
            &format!("/dst/{}", csv_keys[0]),
        );
        let get_resp = s4.as_ref().get_object(get_req).await.expect("read CSV");
        let body = get_resp.output.body.expect("body");
        let csv_bytes = crate::blob::collect_blob(body, usize::MAX)
            .await
            .expect("collect");
        let csv_text = std::str::from_utf8(&csv_bytes).expect("utf8");
        let line_count = csv_text.lines().count();
        assert_eq!(line_count, 4, "header + 3 data rows; got:\n{csv_text}");
        assert!(csv_text.starts_with("Bucket,Key,VersionId"));
        assert!(csv_text.contains("\"alpha.txt\""));
        assert!(csv_text.contains("\"nested/beta.bin\""));
        assert!(csv_text.contains("\"z.txt\""));
    }

    // A config marked as just run is counted as evaluated but triggers no listing or writes.
    #[tokio::test]
    async fn run_scan_once_skips_configs_that_are_not_due() {
        let mgr = Arc::new(InventoryManager::new());
        mgr.put(InventoryConfig::daily_csv("d1", "src", "dst", "inv"));
        mgr.mark_run("src", "d1", Utc::now());
        let backend = InvScannerMemBackend::default();
        backend.put_now("src", "alpha.txt", Bytes::from_static(b"A"));
        let s4 = make_inv_service(backend, Some(Arc::clone(&mgr)));

        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.configs_evaluated, 1);
        assert_eq!(
            report.buckets_scanned, 0,
            "no walk; due() returned false"
        );
        assert_eq!(report.csvs_written, 0);
        assert_eq!(report.objects_listed, 0);
        assert_eq!(report.errors, 0);

        let list_req = synthetic_request(
            ListObjectsV2Input {
                bucket: "dst".into(),
                ..Default::default()
            },
            http::Method::GET,
            "/dst?list-type=2",
        );
        let list_resp = s4
            .as_ref()
            .list_objects_v2(list_req)
            .await
            .expect("post-scan list");
        assert!(
            list_resp.output.contents.unwrap_or_default().is_empty(),
            "no destination writes expected when config is not due"
        );
    }
}