1use std::collections::HashMap;
48use std::sync::Arc;
49use std::sync::RwLock;
50
51use chrono::{DateTime, Utc};
52use s3s::S3;
53use s3s::S3Request;
54use s3s::dto::*;
55use serde::{Deserialize, Serialize};
56use tracing::warn;
57
58#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub enum InventoryFormat {
62 Csv,
63}
64
65impl InventoryFormat {
66 #[must_use]
68 pub fn as_aws_str(self) -> &'static str {
69 match self {
70 Self::Csv => "CSV",
71 }
72 }
73
74 #[must_use]
76 pub fn file_extension(self) -> &'static str {
77 match self {
78 Self::Csv => "csv",
79 }
80 }
81}
82
83#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
87pub enum IncludedVersions {
88 Current,
89 All,
90}
91
92impl IncludedVersions {
93 #[must_use]
95 pub fn as_aws_str(self) -> &'static str {
96 match self {
97 Self::Current => "Current",
98 Self::All => "All",
99 }
100 }
101
102 #[must_use]
106 pub fn from_aws_str(s: &str) -> Self {
107 if s.eq_ignore_ascii_case("All") {
108 Self::All
109 } else {
110 Self::Current
111 }
112 }
113}
114
115#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
122pub struct InventoryConfig {
123 pub id: String,
124 pub bucket: String,
125 pub destination_bucket: String,
126 pub destination_prefix: String,
127 pub frequency_hours: u32,
128 pub format: InventoryFormat,
129 pub included_object_versions: IncludedVersions,
130}
131
132impl InventoryConfig {
133 #[must_use]
136 pub fn daily_csv(
137 id: impl Into<String>,
138 bucket: impl Into<String>,
139 destination_bucket: impl Into<String>,
140 destination_prefix: impl Into<String>,
141 ) -> Self {
142 Self {
143 id: id.into(),
144 bucket: bucket.into(),
145 destination_bucket: destination_bucket.into(),
146 destination_prefix: destination_prefix.into(),
147 frequency_hours: 24,
148 format: InventoryFormat::Csv,
149 included_object_versions: IncludedVersions::Current,
150 }
151 }
152}
153
/// One data row of an inventory CSV.
///
/// Fields appear in the same order as the columns of the header written by
/// `render_csv`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InventoryRow {
    /// Source bucket name (`Bucket` column).
    pub bucket: String,
    /// Object key (`Key` column).
    pub key: String,
    /// Version id (`VersionId` column); rendered as an empty cell when `None`.
    pub version_id: Option<String>,
    /// `IsLatest` column.
    pub is_latest: bool,
    /// `IsDeleteMarker` column.
    pub is_delete_marker: bool,
    /// Object size (`Size` column); the scanner takes it from HEAD
    /// `content_length` or the listing entry.
    pub size: u64,
    /// `LastModifiedDate` column, rendered as RFC 3339 with milliseconds and `Z`.
    pub last_modified: DateTime<Utc>,
    /// ETag value (`ETag` column).
    pub etag: String,
    /// Storage class name (`StorageClass` column).
    pub storage_class: String,
    /// Encryption label (`EncryptionStatus` column), e.g. `NOT-SSE`, `SSE-KMS`.
    pub encryption_status: String,
}
170
171#[derive(Debug, Default, Serialize, Deserialize)]
176struct InventorySnapshot {
177 configs: HashMap<String, InventoryConfig>,
180 last_run: HashMap<String, DateTime<Utc>>,
181}
182
/// Separator between bucket and id in snapshot map keys (ASCII unit separator).
///
/// S3 bucket naming rules do not allow this character. Splitting at its first
/// occurrence therefore recovers the bucket even if the id contains it.
const KEY_SEP: char = '\u{1F}';

/// Joins `bucket` and `id` into one snapshot map key.
fn join_key(bucket: &str, id: &str) -> String {
    format!("{bucket}{KEY_SEP}{id}")
}

/// Inverse of [`join_key`]: splits at the first separator.
///
/// Returns `None` when `s` contains no separator.
fn split_key(s: &str) -> Option<(String, String)> {
    let (bucket, id) = s.split_once(KEY_SEP)?;
    Some((bucket.to_string(), id.to_string()))
}
199
200#[derive(Debug, Default)]
202pub struct InventoryManager {
203 configs: RwLock<HashMap<(String, String), InventoryConfig>>,
204 last_run: RwLock<HashMap<(String, String), DateTime<Utc>>>,
205}
206
207impl InventoryManager {
208 #[must_use]
209 pub fn new() -> Self {
210 Self::default()
211 }
212
213 pub fn put(&self, config: InventoryConfig) {
218 let key = (config.bucket.clone(), config.id.clone());
219 self.last_run
220 .write()
221 .expect("inventory last_run RwLock poisoned")
222 .remove(&key);
223 self.configs
224 .write()
225 .expect("inventory configs RwLock poisoned")
226 .insert(key, config);
227 }
228
229 #[must_use]
231 pub fn get(&self, bucket: &str, id: &str) -> Option<InventoryConfig> {
232 self.configs
233 .read()
234 .expect("inventory configs RwLock poisoned")
235 .get(&(bucket.to_owned(), id.to_owned()))
236 .cloned()
237 }
238
239 #[must_use]
242 pub fn list_for_bucket(&self, bucket: &str) -> Vec<InventoryConfig> {
243 let map = self.configs.read().expect("inventory configs RwLock poisoned");
244 let mut out: Vec<InventoryConfig> = map
245 .iter()
246 .filter(|((b, _id), _)| b == bucket)
247 .map(|(_, cfg)| cfg.clone())
248 .collect();
249 out.sort_by(|a, b| a.id.cmp(&b.id));
250 out
251 }
252
253 #[must_use]
260 pub fn list_all(&self) -> Vec<InventoryConfig> {
261 let map = self.configs.read().expect("inventory configs RwLock poisoned");
262 let mut out: Vec<InventoryConfig> = map.values().cloned().collect();
263 out.sort_by(|a, b| a.bucket.cmp(&b.bucket).then_with(|| a.id.cmp(&b.id)));
264 out
265 }
266
267 pub fn delete(&self, bucket: &str, id: &str) {
269 let key = (bucket.to_owned(), id.to_owned());
270 self.configs
271 .write()
272 .expect("inventory configs RwLock poisoned")
273 .remove(&key);
274 self.last_run
275 .write()
276 .expect("inventory last_run RwLock poisoned")
277 .remove(&key);
278 }
279
280 #[must_use]
284 pub fn due(&self, bucket: &str, id: &str, now: DateTime<Utc>) -> bool {
285 let key = (bucket.to_owned(), id.to_owned());
286 let cfgs = self.configs.read().expect("inventory configs RwLock poisoned");
287 let Some(cfg) = cfgs.get(&key) else {
288 return false;
289 };
290 let runs = self.last_run.read().expect("inventory last_run RwLock poisoned");
291 match runs.get(&key) {
292 None => true,
293 Some(prev) => {
294 let elapsed = now.signed_duration_since(*prev);
295 elapsed >= chrono::Duration::hours(i64::from(cfg.frequency_hours))
296 }
297 }
298 }
299
300 pub fn mark_run(&self, bucket: &str, id: &str, when: DateTime<Utc>) {
303 self.last_run
304 .write()
305 .expect("inventory last_run RwLock poisoned")
306 .insert((bucket.to_owned(), id.to_owned()), when);
307 }
308
309 pub fn to_json(&self) -> Result<String, serde_json::Error> {
311 let cfgs = self.configs.read().expect("inventory configs RwLock poisoned");
312 let runs = self.last_run.read().expect("inventory last_run RwLock poisoned");
313 let snap = InventorySnapshot {
314 configs: cfgs
315 .iter()
316 .map(|((b, i), v)| (join_key(b, i), v.clone()))
317 .collect(),
318 last_run: runs
319 .iter()
320 .map(|((b, i), v)| (join_key(b, i), *v))
321 .collect(),
322 };
323 serde_json::to_string(&snap)
324 }
325
326 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
329 let snap: InventorySnapshot = serde_json::from_str(s)?;
330 let mut configs: HashMap<(String, String), InventoryConfig> = HashMap::new();
331 for (k, v) in snap.configs {
332 if let Some(pair) = split_key(&k) {
333 configs.insert(pair, v);
334 }
335 }
336 let mut last_run: HashMap<(String, String), DateTime<Utc>> = HashMap::new();
337 for (k, v) in snap.last_run {
338 if let Some(pair) = split_key(&k) {
339 last_run.insert(pair, v);
340 }
341 }
342 Ok(Self {
343 configs: RwLock::new(configs),
344 last_run: RwLock::new(last_run),
345 })
346 }
347
348 pub fn run_once_for_test<I, F>(
360 &self,
361 bucket: &str,
362 id: &str,
363 rows: I,
364 now: DateTime<Utc>,
365 mut write_object: F,
366 ) -> Result<Vec<String>, RunError>
367 where
368 I: IntoIterator<Item = InventoryRow>,
369 F: FnMut(&str, &str, Vec<u8>) -> Result<(), RunError>,
370 {
371 let cfg = self
372 .get(bucket, id)
373 .ok_or_else(|| RunError::UnknownConfig(bucket.to_owned(), id.to_owned()))?;
374 let csv_bytes = render_csv(rows.into_iter());
375 let csv_md5 = md5_hex(&csv_bytes);
376 let csv_key = csv_destination_key(&cfg, now);
377 let manifest_key = manifest_destination_key(&cfg, now);
378 let manifest_body = render_manifest_json(
379 &cfg,
380 std::slice::from_ref(&csv_key),
381 std::slice::from_ref(&csv_md5),
382 now,
383 )
384 .into_bytes();
385 write_object(&cfg.destination_bucket, &csv_key, csv_bytes)?;
386 write_object(&cfg.destination_bucket, &manifest_key, manifest_body)?;
387 self.mark_run(bucket, id, now);
388 Ok(vec![csv_key, manifest_key])
389 }
390}
391
392pub fn render_csv(rows: impl Iterator<Item = InventoryRow>) -> Vec<u8> {
400 let mut out = Vec::new();
401 out.extend_from_slice(
402 b"Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus\n",
403 );
404 for row in rows {
405 let cells: [String; 10] = [
406 row.bucket,
407 row.key,
408 row.version_id.unwrap_or_default(),
409 row.is_latest.to_string(),
410 row.is_delete_marker.to_string(),
411 row.size.to_string(),
412 row.last_modified
413 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
414 row.etag,
415 row.storage_class,
416 row.encryption_status,
417 ];
418 for (i, cell) in cells.iter().enumerate() {
419 if i > 0 {
420 out.push(b',');
421 }
422 out.push(b'"');
423 for b in cell.as_bytes() {
424 if *b == b'"' {
425 out.extend_from_slice(b"\"\"");
426 } else {
427 out.push(*b);
428 }
429 }
430 out.push(b'"');
431 }
432 out.push(b'\n');
433 }
434 out
435}
436
437pub fn render_manifest_json(
443 config: &InventoryConfig,
444 csv_keys: &[String],
445 md5s: &[String],
446 written_at: DateTime<Utc>,
447) -> String {
448 let n = csv_keys.len().min(md5s.len());
452 let files_json: Vec<serde_json::Value> = (0..n)
453 .map(|i| {
454 serde_json::json!({
455 "key": csv_keys[i],
456 "size": 0,
462 "MD5checksum": md5s[i],
463 })
464 })
465 .collect();
466 let value = serde_json::json!({
467 "sourceBucket": config.bucket,
468 "destinationBucket": config.destination_bucket,
469 "version": "2016-11-30",
470 "creationTimestamp": written_at.timestamp_millis().to_string(),
471 "fileFormat": config.format.as_aws_str(),
472 "fileSchema": csv_header_schema(config),
473 "files": files_json,
474 });
475 serde_json::to_string_pretty(&value).expect("static JSON is always serialisable")
476}
477
478#[must_use]
482pub fn csv_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
483 let stamp = now.format("%Y-%m-%dT%H%M%SZ");
484 let prefix = trim_trailing_slash(&config.destination_prefix);
485 format!(
486 "{prefix}/{src}/{id}/data/{stamp}.{ext}",
487 src = config.bucket,
488 id = config.id,
489 ext = config.format.file_extension()
490 )
491}
492
493#[must_use]
497pub fn manifest_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
498 let stamp = now.format("%Y-%m-%dT%H%M%SZ");
499 let prefix = trim_trailing_slash(&config.destination_prefix);
500 format!(
501 "{prefix}/{src}/{id}/{stamp}/manifest.json",
502 src = config.bucket,
503 id = config.id
504 )
505}
506
507fn trim_trailing_slash(s: &str) -> &str {
508 s.strip_suffix('/').unwrap_or(s)
509}
510
511fn csv_header_schema(_cfg: &InventoryConfig) -> &'static str {
515 "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, StorageClass, EncryptionStatus"
516}
517
518fn md5_hex(bytes: &[u8]) -> String {
519 use md5::{Digest, Md5};
520 let mut h = Md5::new();
521 h.update(bytes);
522 let out = h.finalize();
523 let mut s = String::with_capacity(32);
524 for b in out {
525 s.push(hex_char(b >> 4));
526 s.push(hex_char(b & 0x0f));
527 }
528 s
529}
530
/// Maps a nibble (`0..=15`) to its lowercase hexadecimal digit.
///
/// Any larger value maps to `'0'`.
fn hex_char(n: u8) -> char {
    char::from_digit(u32::from(n), 16).unwrap_or('0')
}
538
/// Error returned by [`InventoryManager::run_once_for_test`].
#[derive(Debug, thiserror::Error)]
pub enum RunError {
    /// No configuration is registered for `(bucket, id)`; fields are bucket, id.
    #[error("no inventory configuration for bucket={0} id={1}")]
    UnknownConfig(String, String),
    /// A destination write failed. This module's visible code never constructs
    /// it; presumably the caller's `write_object` callback returns it.
    #[error("destination write failed: {0}")]
    Write(String),
}
549
/// Counters produced by [`run_scan_once`] for one scan pass.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
    /// Distinct source buckets walked during the pass; only due configurations
    /// contribute.
    pub buckets_scanned: usize,
    /// Number of registered configurations at the start of the pass, due or not.
    pub configs_evaluated: usize,
    /// Configurations whose CSV and manifest were both written successfully.
    pub csvs_written: usize,
    /// Listed objects that became CSV rows (keys ending in `.s4index` are not
    /// counted), summed over all walked configurations. This includes
    /// configurations whose destination write later failed.
    pub objects_listed: usize,
    /// Configurations whose listing or destination write failed.
    pub errors: usize,
}
581
582fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
590 S3Request {
591 input,
592 method,
593 uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
594 headers: http::HeaderMap::new(),
595 extensions: http::Extensions::new(),
596 credentials: None,
597 region: None,
598 service: None,
599 trailing_headers: None,
600 }
601}
602
603fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
609 let mut buf = Vec::new();
610 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
611 let s = std::str::from_utf8(&buf).ok()?;
612 chrono::DateTime::parse_from_rfc3339(s)
613 .ok()
614 .map(|dt| dt.with_timezone(&Utc))
615}
616
617fn encryption_status_from_head(head: &HeadObjectOutput) -> String {
639 if let Some(sse) = head.server_side_encryption.as_ref() {
644 let s = sse.as_str();
645 if s.eq_ignore_ascii_case("aws:kms") || s.eq_ignore_ascii_case("aws:kms:dsse") {
646 return "SSE-KMS".to_owned();
647 }
648 }
655 if head.sse_customer_algorithm.is_some() {
657 return "SSE-C".to_owned();
658 }
659 if head
662 .metadata
663 .as_ref()
664 .and_then(|m| m.get("s4-encrypted"))
665 .is_some()
666 {
667 return "SSE-S4".to_owned();
668 }
669 if head.ssekms_key_id.is_some() {
673 return "SSE-KMS".to_owned();
674 }
675 if let Some(sse) = head.server_side_encryption.as_ref()
680 && !sse.as_str().is_empty()
681 {
682 return "SSE-S4".to_owned();
683 }
684 "NOT-SSE".to_owned()
685}
686
687pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
725 s4: &Arc<crate::S4Service<B>>,
726) -> Result<ScanReport, String> {
727 let Some(mgr) = s4.inventory_manager().cloned() else {
728 return Ok(ScanReport::default());
731 };
732 let configs = mgr.list_all();
733 if configs.is_empty() {
734 return Ok(ScanReport::default());
735 }
736 let now = Utc::now();
737 let mut report = ScanReport {
738 configs_evaluated: configs.len(),
739 ..ScanReport::default()
740 };
741 let mut walked_buckets: std::collections::HashSet<String> = std::collections::HashSet::new();
745 for cfg in configs {
746 if !mgr.due(&cfg.bucket, &cfg.id, now) {
747 continue;
748 }
749 walked_buckets.insert(cfg.bucket.clone());
750 match scan_one_config(s4, &cfg, now, &mut report).await {
751 Ok(()) => {
752 mgr.mark_run(&cfg.bucket, &cfg.id, now);
753 report.csvs_written = report.csvs_written.saturating_add(1);
754 }
755 Err(e) => {
756 warn!(
757 bucket = %cfg.bucket,
758 id = %cfg.id,
759 error = %e,
760 "S4 inventory: scan failed for config",
761 );
762 report.errors = report.errors.saturating_add(1);
763 }
764 }
765 }
766 report.buckets_scanned = walked_buckets.len();
767 Ok(report)
768}
769
770async fn scan_one_config<B: S3 + Send + Sync + 'static>(
774 s4: &Arc<crate::S4Service<B>>,
775 cfg: &InventoryConfig,
776 now: DateTime<Utc>,
777 report: &mut ScanReport,
778) -> Result<(), String> {
779 let mut rows: Vec<InventoryRow> = Vec::new();
780 let mut continuation: Option<String> = None;
781 loop {
782 let list_input = ListObjectsV2Input {
783 bucket: cfg.bucket.clone(),
784 continuation_token: continuation.clone(),
785 ..Default::default()
786 };
787 let list_req = synthetic_request(
788 list_input,
789 http::Method::GET,
790 &format!("/{src}?list-type=2", src = cfg.bucket),
791 );
792 let resp = s4
793 .as_ref()
794 .list_objects_v2(list_req)
795 .await
796 .map_err(|e| format!("list_objects_v2: {e}"))?;
797 let output = resp.output;
798 let contents = output.contents.unwrap_or_default();
799 for obj in &contents {
800 let Some(key) = obj.key.as_deref() else {
801 continue;
802 };
803 if key.ends_with(".s4index") {
808 continue;
809 }
810 report.objects_listed = report.objects_listed.saturating_add(1);
811 let head_input = HeadObjectInput {
816 bucket: cfg.bucket.clone(),
817 key: key.to_owned(),
818 ..Default::default()
819 };
820 let head_req = synthetic_request(
821 head_input,
822 http::Method::HEAD,
823 &format!("/{src}/{key}", src = cfg.bucket),
824 );
825 let head = match s4.as_ref().head_object(head_req).await {
826 Ok(r) => r.output,
827 Err(e) => {
828 warn!(
829 bucket = %cfg.bucket,
830 key = %key,
831 error = %e,
832 "S4 inventory: head_object failed; emitting row with listing-only metadata",
833 );
834 HeadObjectOutput::default()
835 }
836 };
837 let size = head
838 .content_length
839 .unwrap_or_else(|| obj.size.unwrap_or(0))
840 .max(0) as u64;
841 let last_modified = head
842 .last_modified
843 .as_ref()
844 .and_then(timestamp_to_chrono_utc)
845 .or_else(|| obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc))
846 .unwrap_or(now);
847 let etag: String = head
848 .e_tag
849 .as_ref()
850 .or(obj.e_tag.as_ref())
851 .map(|e| e.value().to_owned())
852 .unwrap_or_default();
853 let storage_class = head
854 .storage_class
855 .as_ref()
856 .map(|s| s.as_str().to_owned())
857 .or_else(|| obj.storage_class.as_ref().map(|s| s.as_str().to_owned()))
858 .unwrap_or_else(|| "STANDARD".to_owned());
859 let encryption_status = encryption_status_from_head(&head);
860 rows.push(InventoryRow {
861 bucket: cfg.bucket.clone(),
862 key: key.to_owned(),
863 version_id: None,
864 is_latest: true,
865 is_delete_marker: false,
866 size,
867 last_modified,
868 etag,
869 storage_class,
870 encryption_status,
871 });
872 }
873 if output.is_truncated.unwrap_or(false) {
874 continuation = output.next_continuation_token;
875 if continuation.is_none() {
876 break;
880 }
881 } else {
882 break;
883 }
884 }
885
886 let csv_bytes = render_csv(rows.into_iter());
893 let csv_md5 = md5_hex(&csv_bytes);
894 let csv_key = csv_destination_key(cfg, now);
895 let manifest_key = manifest_destination_key(cfg, now);
896 let manifest_body = render_manifest_json(
897 cfg,
898 std::slice::from_ref(&csv_key),
899 std::slice::from_ref(&csv_md5),
900 now,
901 )
902 .into_bytes();
903 put_destination_object(s4, &cfg.destination_bucket, &csv_key, csv_bytes).await?;
904 put_destination_object(s4, &cfg.destination_bucket, &manifest_key, manifest_body).await?;
905 Ok(())
906}
907
908async fn put_destination_object<B: S3 + Send + Sync + 'static>(
914 s4: &Arc<crate::S4Service<B>>,
915 dst_bucket: &str,
916 dst_key: &str,
917 body: Vec<u8>,
918) -> Result<(), String> {
919 let body_bytes = bytes::Bytes::from(body);
920 let input = PutObjectInput {
921 bucket: dst_bucket.to_owned(),
922 key: dst_key.to_owned(),
923 body: Some(crate::blob::bytes_to_blob(body_bytes)),
924 ..Default::default()
925 };
926 let req = synthetic_request(
927 input,
928 http::Method::PUT,
929 &format!("/{dst_bucket}/{dst_key}"),
930 );
931 s4.as_ref()
932 .put_object(req)
933 .await
934 .map(|_| ())
935 .map_err(|e| format!("destination put_object {dst_bucket}/{dst_key}: {e}"))
936}
937
#[cfg(test)]
mod tests {
    use super::*;

    /// Daily CSV configuration `daily-csv` for source bucket `src`, writing to
    /// bucket `dst` under prefix `inv`.
    fn sample_config() -> InventoryConfig {
        InventoryConfig {
            id: "daily-csv".into(),
            bucket: "src".into(),
            destination_bucket: "dst".into(),
            destination_prefix: "inv".into(),
            frequency_hours: 24,
            format: InventoryFormat::Csv,
            included_object_versions: IncludedVersions::Current,
        }
    }

    /// Unencrypted, current-version row in bucket `src` with a fixed timestamp.
    fn sample_row(key: &str, size: u64) -> InventoryRow {
        InventoryRow {
            bucket: "src".into(),
            key: key.into(),
            version_id: None,
            is_latest: true,
            is_delete_marker: false,
            size,
            last_modified: DateTime::parse_from_rfc3339("2026-05-13T12:34:56.789Z")
                .unwrap()
                .with_timezone(&Utc),
            etag: "abc123".into(),
            storage_class: "STANDARD".into(),
            encryption_status: "NOT-SSE".into(),
        }
    }

    // A config survives a to_json / from_json round trip unchanged.
    #[test]
    fn config_json_round_trip() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let json = m.to_json().expect("to_json");
        let m2 = InventoryManager::from_json(&json).expect("from_json");
        assert_eq!(m2.get("src", "daily-csv"), Some(sample_config()));
    }

    #[test]
    fn due_returns_true_when_never_run() {
        let m = InventoryManager::new();
        m.put(sample_config());
        assert!(m.due("src", "daily-csv", Utc::now()));
    }

    // 25 h since the last run exceeds the 24 h frequency.
    #[test]
    fn due_returns_true_when_interval_elapsed() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let then = Utc::now() - chrono::Duration::hours(25);
        m.mark_run("src", "daily-csv", then);
        assert!(m.due("src", "daily-csv", Utc::now()));
    }

    #[test]
    fn due_returns_false_when_interval_not_yet_elapsed() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let just_now = Utc::now() - chrono::Duration::minutes(5);
        m.mark_run("src", "daily-csv", just_now);
        assert!(!m.due("src", "daily-csv", Utc::now()));
    }

    #[test]
    fn due_returns_false_when_config_missing() {
        let m = InventoryManager::new();
        assert!(!m.due("ghost", "nothing", Utc::now()));
    }

    // Only configs of the requested bucket are returned, ordered by id.
    #[test]
    fn list_for_bucket_filters_and_sorts() {
        let m = InventoryManager::new();
        let mut a = sample_config();
        a.id = "z-last".into();
        let mut b = sample_config();
        b.id = "a-first".into();
        let mut c = sample_config();
        c.bucket = "other".into();
        c.id = "should-not-appear".into();
        m.put(a);
        m.put(b);
        m.put(c);
        let list = m.list_for_bucket("src");
        assert_eq!(list.len(), 2);
        assert_eq!(list[0].id, "a-first");
        assert_eq!(list[1].id, "z-last");
    }

    // Header is unquoted; data cells are quoted, commas stay inside quotes and
    // embedded quotes are doubled.
    #[test]
    fn render_csv_matches_aws_header_and_quotes_cells() {
        let rows = vec![
            sample_row("a/b.txt", 100),
            sample_row("comma,here.txt", 200),
            sample_row("quote\"inside.txt", 300),
        ];
        let csv = render_csv(rows.into_iter());
        let s = String::from_utf8(csv).expect("utf8");
        let mut lines = s.lines();
        assert_eq!(
            lines.next().unwrap(),
            "Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus"
        );
        let row1 = lines.next().unwrap();
        assert!(row1.starts_with("\"src\",\"a/b.txt\","));
        assert!(row1.contains(",\"100\","));
        assert!(row1.contains("\"2026-05-13T12:34:56.789Z\""));
        let row2 = lines.next().unwrap();
        assert!(row2.contains("\"comma,here.txt\""));
        let row3 = lines.next().unwrap();
        assert!(row3.contains("\"quote\"\"inside.txt\""));
        assert_eq!(lines.next(), None);
    }

    #[test]
    fn render_manifest_json_carries_required_fields() {
        let cfg = sample_config();
        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let manifest = render_manifest_json(
            &cfg,
            &["inv/src/daily-csv/data/2026-05-13T000000Z.csv".into()],
            &["d41d8cd98f00b204e9800998ecf8427e".into()],
            now,
        );
        let v: serde_json::Value = serde_json::from_str(&manifest).expect("manifest must be JSON");
        assert_eq!(v["sourceBucket"], "src");
        assert_eq!(v["destinationBucket"], "dst");
        assert_eq!(v["fileFormat"], "CSV");
        assert_eq!(v["version"], "2016-11-30");
        let files = v["files"].as_array().expect("files array");
        assert_eq!(files.len(), 1);
        assert_eq!(
            files[0]["key"],
            "inv/src/daily-csv/data/2026-05-13T000000Z.csv"
        );
        assert_eq!(files[0]["MD5checksum"], "d41d8cd98f00b204e9800998ecf8427e");
        assert_eq!(
            v["creationTimestamp"],
            now.timestamp_millis().to_string()
        );
        let schema = v["fileSchema"].as_str().expect("fileSchema string");
        assert!(schema.starts_with("Bucket, Key, VersionId"));
        assert!(schema.ends_with("StorageClass, EncryptionStatus"));
    }

    // Keys live under `<prefix>/<source-bucket>/<id>/`; a trailing slash on the
    // prefix must not produce `//`.
    #[test]
    fn destination_keys_are_under_prefix_and_namespaced_by_source_bucket() {
        let cfg = sample_config();
        let now = DateTime::parse_from_rfc3339("2026-05-13T01:02:03.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let csv_key = csv_destination_key(&cfg, now);
        let manifest_key = manifest_destination_key(&cfg, now);
        assert_eq!(csv_key, "inv/src/daily-csv/data/2026-05-13T010203Z.csv");
        assert_eq!(
            manifest_key,
            "inv/src/daily-csv/2026-05-13T010203Z/manifest.json"
        );
        let mut cfg2 = cfg.clone();
        cfg2.destination_prefix = "inv/".into();
        assert_eq!(
            csv_destination_key(&cfg2, now),
            "inv/src/daily-csv/data/2026-05-13T010203Z.csv"
        );
    }

    // The synchronous runner writes CSV then manifest to the destination bucket
    // and marks the run, so the config is not due again for 24 h.
    #[test]
    fn run_once_writes_csv_and_manifest_and_marks_run() {
        let m = InventoryManager::new();
        m.put(sample_config());
        let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
            .unwrap()
            .with_timezone(&Utc);
        let written = std::sync::Mutex::new(Vec::<(String, String, Vec<u8>)>::new());
        let keys = m
            .run_once_for_test(
                "src",
                "daily-csv",
                vec![sample_row("a", 1), sample_row("b", 2)],
                now,
                |dst_bucket, dst_key, body| {
                    written
                        .lock()
                        .unwrap()
                        .push((dst_bucket.to_owned(), dst_key.to_owned(), body));
                    Ok(())
                },
            )
            .expect("run_once_for_test");
        assert_eq!(keys.len(), 2);
        assert!(keys[0].ends_with(".csv"));
        assert!(keys[1].ends_with("manifest.json"));
        let written = written.into_inner().unwrap();
        assert_eq!(written.len(), 2);
        for (bucket, _, _) in &written {
            assert_eq!(bucket, "dst");
        }
        assert!(!m.due("src", "daily-csv", now + chrono::Duration::hours(1)));
        assert!(m.due("src", "daily-csv", now + chrono::Duration::hours(25)));
    }

    #[test]
    fn run_once_unknown_config_is_an_error() {
        let m = InventoryManager::new();
        let now = Utc::now();
        let err = m.run_once_for_test(
            "ghost",
            "nothing",
            std::iter::empty(),
            now,
            |_, _, _| Ok(()),
        );
        assert!(matches!(err, Err(RunError::UnknownConfig(_, _))));
    }

    #[test]
    fn encryption_status_sse_kms_via_aws_kms_string() {
        let head = HeadObjectOutput {
            server_side_encryption: Some(ServerSideEncryption::from_static(
                ServerSideEncryption::AWS_KMS,
            )),
            ..Default::default()
        };
        assert_eq!(encryption_status_from_head(&head), "SSE-KMS");
    }

    #[test]
    fn encryption_status_sse_c_via_customer_algorithm() {
        let head = HeadObjectOutput {
            sse_customer_algorithm: Some("AES256".to_owned()),
            ..Default::default()
        };
        assert_eq!(encryption_status_from_head(&head), "SSE-C");
    }

    #[test]
    fn encryption_status_sse_s4_via_metadata_flag() {
        let mut metadata = HashMap::new();
        metadata.insert("s4-encrypted".to_owned(), "aes-256-gcm".to_owned());
        let head = HeadObjectOutput {
            metadata: Some(metadata),
            ..Default::default()
        };
        assert_eq!(encryption_status_from_head(&head), "SSE-S4");
    }

    #[test]
    fn encryption_status_not_sse_when_all_absent() {
        let head = HeadObjectOutput::default();
        assert_eq!(encryption_status_from_head(&head), "NOT-SSE");
    }

    // --- Scanner tests: drive `run_scan_once` through an `S4Service` wrapping
    // an in-memory backend. ---

    use std::collections::HashMap as StdHashMap;
    use std::sync::Mutex as StdMutex;

    use bytes::Bytes;
    use s3s::dto as dto2;
    use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
    use s4_codec::dispatcher::AlwaysDispatcher;
    use s4_codec::passthrough::Passthrough;
    use s4_codec::{CodecKind, CodecRegistry};

    use crate::S4Service;

    /// In-memory backend keyed by `(bucket, key)`. It implements only
    /// `put_object`, `head_object`, `list_objects_v2` and `get_object`; every
    /// other `S3` method is left to the trait's default implementation.
    #[derive(Default)]
    struct InvScannerMemBackend {
        objects: StdMutex<StdHashMap<(String, String), InvScannerStored>>,
    }

    /// Stored object body plus the time it was written.
    #[derive(Clone)]
    struct InvScannerStored {
        body: Bytes,
        last_modified: dto2::Timestamp,
    }

    impl InvScannerMemBackend {
        /// Stores (or overwrites) an object stamped with the current time.
        fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
            self.objects.lock().unwrap().insert(
                (bucket.to_owned(), key.to_owned()),
                InvScannerStored {
                    body,
                    last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
                },
            );
        }
    }

    #[async_trait::async_trait]
    impl S3 for InvScannerMemBackend {
        // Collects the whole body (no size cap) and stores it.
        async fn put_object(
            &self,
            req: S3Request<dto2::PutObjectInput>,
        ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
            let body = match req.input.body {
                Some(blob) => crate::blob::collect_blob(blob, usize::MAX)
                    .await
                    .map_err(|e| {
                        S3Error::with_message(S3ErrorCode::InternalError, format!("{e}"))
                    })?,
                None => Bytes::new(),
            };
            self.put_now(&req.input.bucket, &req.input.key, body);
            Ok(S3Response::new(dto2::PutObjectOutput::default()))
        }

        // NoSuchKey for unknown objects; otherwise length, time and a
        // synthetic `etag-<len>` ETag.
        async fn head_object(
            &self,
            req: S3Request<dto2::HeadObjectInput>,
        ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
            let key = (req.input.bucket.clone(), req.input.key.clone());
            let lock = self.objects.lock().unwrap();
            let stored = lock
                .get(&key)
                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
            Ok(S3Response::new(dto2::HeadObjectOutput {
                content_length: Some(stored.body.len() as i64),
                last_modified: Some(stored.last_modified.clone()),
                e_tag: Some(dto2::ETag::Strong(format!("etag-{}", stored.body.len()))),
                ..Default::default()
            }))
        }

        // Ignores prefix and pagination: returns every key of the bucket,
        // sorted, in one untruncated page.
        async fn list_objects_v2(
            &self,
            req: S3Request<dto2::ListObjectsV2Input>,
        ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
            let prefix = req.input.bucket.clone();
            let lock = self.objects.lock().unwrap();
            let mut contents: Vec<dto2::Object> = lock
                .iter()
                .filter(|((b, _), _)| b == &prefix)
                .map(|((_, k), v)| dto2::Object {
                    key: Some(k.clone()),
                    size: Some(v.body.len() as i64),
                    last_modified: Some(v.last_modified.clone()),
                    e_tag: Some(dto2::ETag::Strong(format!("etag-{}", v.body.len()))),
                    ..Default::default()
                })
                .collect();
            contents.sort_by(|a, b| a.key.cmp(&b.key));
            let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
            Ok(S3Response::new(dto2::ListObjectsV2Output {
                name: Some(prefix),
                contents: Some(contents),
                key_count: Some(key_count),
                is_truncated: Some(false),
                ..Default::default()
            }))
        }

        // Returns the stored body; used to read back the written CSV.
        async fn get_object(
            &self,
            req: S3Request<dto2::GetObjectInput>,
        ) -> S3Result<S3Response<dto2::GetObjectOutput>> {
            let key = (req.input.bucket.clone(), req.input.key.clone());
            let lock = self.objects.lock().unwrap();
            let stored = lock
                .get(&key)
                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
            Ok(S3Response::new(dto2::GetObjectOutput {
                content_length: Some(stored.body.len() as i64),
                last_modified: Some(stored.last_modified.clone()),
                body: Some(crate::blob::bytes_to_blob(stored.body.clone())),
                ..Default::default()
            }))
        }
    }

    /// Passthrough codec registry and dispatcher.
    fn make_codec() -> (Arc<CodecRegistry>, Arc<AlwaysDispatcher>) {
        (
            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough))),
            Arc::new(AlwaysDispatcher(CodecKind::Passthrough)),
        )
    }

    /// Wraps `backend` in an `S4Service`, attaching the inventory manager when
    /// one is given.
    fn make_inv_service(
        backend: InvScannerMemBackend,
        with_inv: Option<Arc<InventoryManager>>,
    ) -> Arc<S4Service<InvScannerMemBackend>> {
        let (registry, dispatcher) = make_codec();
        let svc = S4Service::new(backend, registry, dispatcher);
        let svc = match with_inv {
            Some(m) => svc.with_inventory(m),
            None => svc,
        };
        Arc::new(svc)
    }

    #[tokio::test]
    async fn run_scan_once_no_inventory_manager_returns_empty_report() {
        let s4 = make_inv_service(InvScannerMemBackend::default(), None);
        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report, ScanReport::default());
    }

    #[tokio::test]
    async fn run_scan_once_no_configs_returns_empty_report() {
        let mgr = Arc::new(InventoryManager::new());
        let s4 = make_inv_service(InvScannerMemBackend::default(), Some(Arc::clone(&mgr)));
        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.configs_evaluated, 0);
        assert_eq!(report.csvs_written, 0);
        assert_eq!(report.objects_listed, 0);
    }

    // End to end: three source objects produce one CSV (header + 3 rows) and
    // one manifest under the configured prefix in `dst`.
    #[tokio::test]
    async fn run_scan_once_walks_bucket_and_writes_csv_and_manifest() {
        let mgr = Arc::new(InventoryManager::new());
        mgr.put(InventoryConfig::daily_csv("d1", "src", "dst", "inv"));
        let backend = InvScannerMemBackend::default();
        for (key, body) in [
            ("alpha.txt", &b"AAA"[..]),
            ("nested/beta.bin", &b"BB"[..]),
            ("z.txt", &b"Z"[..]),
        ] {
            backend.put_now("src", key, Bytes::copy_from_slice(body));
        }
        let s4 = make_inv_service(backend, Some(Arc::clone(&mgr)));

        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.configs_evaluated, 1);
        assert_eq!(report.buckets_scanned, 1);
        assert_eq!(report.objects_listed, 3);
        assert_eq!(report.csvs_written, 1);
        assert_eq!(report.errors, 0);

        // List what landed in the destination bucket.
        let list_req = synthetic_request(
            ListObjectsV2Input {
                bucket: "dst".into(),
                ..Default::default()
            },
            http::Method::GET,
            "/dst?list-type=2",
        );
        let list_resp = s4
            .as_ref()
            .list_objects_v2(list_req)
            .await
            .expect("post-scan list");
        let dst_keys: Vec<String> = list_resp
            .output
            .contents
            .unwrap_or_default()
            .into_iter()
            .filter_map(|o| o.key)
            .collect();
        let csv_keys: Vec<String> = dst_keys
            .iter()
            .filter(|k| k.ends_with(".csv"))
            .cloned()
            .collect();
        let manifest_keys: Vec<String> = dst_keys
            .iter()
            .filter(|k| k.ends_with("manifest.json"))
            .cloned()
            .collect();
        assert_eq!(csv_keys.len(), 1, "exactly one CSV must land; got {dst_keys:?}");
        assert_eq!(
            manifest_keys.len(),
            1,
            "exactly one manifest.json must land; got {dst_keys:?}"
        );
        assert!(
            csv_keys[0].starts_with("inv/src/d1/data/"),
            "CSV key must be under <prefix>/<bucket>/<id>/data/, got {}",
            csv_keys[0]
        );
        assert!(
            manifest_keys[0].starts_with("inv/src/d1/"),
            "manifest key must be under <prefix>/<bucket>/<id>/, got {}",
            manifest_keys[0]
        );

        // Read the CSV back and check its rows.
        let get_req = synthetic_request(
            GetObjectInput {
                bucket: "dst".into(),
                key: csv_keys[0].clone(),
                ..Default::default()
            },
            http::Method::GET,
            &format!("/dst/{}", csv_keys[0]),
        );
        let get_resp = s4.as_ref().get_object(get_req).await.expect("read CSV");
        let body = get_resp.output.body.expect("body");
        let csv_bytes = crate::blob::collect_blob(body, usize::MAX)
            .await
            .expect("collect");
        let csv_text = std::str::from_utf8(&csv_bytes).expect("utf8");
        let line_count = csv_text.lines().count();
        assert_eq!(line_count, 4, "header + 3 data rows; got:\n{csv_text}");
        assert!(csv_text.starts_with("Bucket,Key,VersionId"));
        assert!(csv_text.contains("\"alpha.txt\""));
        assert!(csv_text.contains("\"nested/beta.bin\""));
        assert!(csv_text.contains("\"z.txt\""));
    }

    // A config marked as just run is skipped: nothing walked, nothing written.
    #[tokio::test]
    async fn run_scan_once_skips_configs_that_are_not_due() {
        let mgr = Arc::new(InventoryManager::new());
        mgr.put(InventoryConfig::daily_csv("d1", "src", "dst", "inv"));
        mgr.mark_run("src", "d1", Utc::now());
        let backend = InvScannerMemBackend::default();
        backend.put_now("src", "alpha.txt", Bytes::from_static(b"A"));
        let s4 = make_inv_service(backend, Some(Arc::clone(&mgr)));

        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.configs_evaluated, 1);
        assert_eq!(
            report.buckets_scanned, 0,
            "no walk; due() returned false"
        );
        assert_eq!(report.csvs_written, 0);
        assert_eq!(report.objects_listed, 0);
        assert_eq!(report.errors, 0);

        // The destination bucket must still be empty.
        let list_req = synthetic_request(
            ListObjectsV2Input {
                bucket: "dst".into(),
                ..Default::default()
            },
            http::Method::GET,
            "/dst?list-type=2",
        );
        let list_resp = s4
            .as_ref()
            .list_objects_v2(list_req)
            .await
            .expect("post-scan list");
        assert!(
            list_resp.output.contents.unwrap_or_default().is_empty(),
            "no destination writes expected when config is not due"
        );
    }
}