1use std::collections::HashMap;
48use std::sync::Arc;
49use std::sync::RwLock;
50
51use chrono::{DateTime, Utc};
52use s3s::S3;
53use s3s::S3Request;
54use s3s::dto::*;
55use serde::{Deserialize, Serialize};
56use tracing::warn;
57
58#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub enum InventoryFormat {
62 Csv,
63}
64
65impl InventoryFormat {
66 #[must_use]
68 pub fn as_aws_str(self) -> &'static str {
69 match self {
70 Self::Csv => "CSV",
71 }
72 }
73
74 #[must_use]
76 pub fn file_extension(self) -> &'static str {
77 match self {
78 Self::Csv => "csv",
79 }
80 }
81}
82
83#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
87pub enum IncludedVersions {
88 Current,
89 All,
90}
91
92impl IncludedVersions {
93 #[must_use]
95 pub fn as_aws_str(self) -> &'static str {
96 match self {
97 Self::Current => "Current",
98 Self::All => "All",
99 }
100 }
101
102 #[must_use]
106 pub fn from_aws_str(s: &str) -> Self {
107 if s.eq_ignore_ascii_case("All") {
108 Self::All
109 } else {
110 Self::Current
111 }
112 }
113}
114
115#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
122pub struct InventoryConfig {
123 pub id: String,
124 pub bucket: String,
125 pub destination_bucket: String,
126 pub destination_prefix: String,
127 pub frequency_hours: u32,
128 pub format: InventoryFormat,
129 pub included_object_versions: IncludedVersions,
130}
131
132impl InventoryConfig {
133 #[must_use]
136 pub fn daily_csv(
137 id: impl Into<String>,
138 bucket: impl Into<String>,
139 destination_bucket: impl Into<String>,
140 destination_prefix: impl Into<String>,
141 ) -> Self {
142 Self {
143 id: id.into(),
144 bucket: bucket.into(),
145 destination_bucket: destination_bucket.into(),
146 destination_prefix: destination_prefix.into(),
147 frequency_hours: 24,
148 format: InventoryFormat::Csv,
149 included_object_versions: IncludedVersions::Current,
150 }
151 }
152}
153
154#[derive(Debug, Clone, PartialEq, Eq)]
156pub struct InventoryRow {
157 pub bucket: String,
158 pub key: String,
159 pub version_id: Option<String>,
160 pub is_latest: bool,
161 pub is_delete_marker: bool,
162 pub size: u64,
163 pub last_modified: DateTime<Utc>,
164 pub etag: String,
165 pub storage_class: String,
166 pub encryption_status: String,
169}
170
171#[derive(Debug, Default, Serialize, Deserialize)]
176struct InventorySnapshot {
177 configs: HashMap<String, InventoryConfig>,
180 last_run: HashMap<String, DateTime<Utc>>,
181}
182
/// Separator used to flatten a `(bucket, id)` pair into one string key.
///
/// U+001F (ASCII "unit separator") is a control character. It is presumably
/// never part of a bucket name; `split_key` splits at its first occurrence.
const KEY_SEP: char = '\u{1F}';

/// Joins `bucket` and `id` into a single string key: `bucket` + `KEY_SEP` + `id`.
fn join_key(bucket: &str, id: &str) -> String {
    format!("{bucket}{KEY_SEP}{id}")
}

/// Inverse of `join_key`: splits at the first `KEY_SEP`.
///
/// Returns `None` when the separator is absent.
fn split_key(s: &str) -> Option<(String, String)> {
    let (bucket, id) = s.split_once(KEY_SEP)?;
    Some((bucket.to_string(), id.to_string()))
}
199
200#[derive(Debug, Default)]
202pub struct InventoryManager {
203 configs: RwLock<HashMap<(String, String), InventoryConfig>>,
204 last_run: RwLock<HashMap<(String, String), DateTime<Utc>>>,
205}
206
207impl InventoryManager {
208 #[must_use]
209 pub fn new() -> Self {
210 Self::default()
211 }
212
213 pub fn put(&self, config: InventoryConfig) {
218 let key = (config.bucket.clone(), config.id.clone());
219 crate::lock_recovery::recover_write(&self.last_run, "inventory.last_run").remove(&key);
220 crate::lock_recovery::recover_write(&self.configs, "inventory.configs").insert(key, config);
221 }
222
223 #[must_use]
225 pub fn get(&self, bucket: &str, id: &str) -> Option<InventoryConfig> {
226 crate::lock_recovery::recover_read(&self.configs, "inventory.configs")
227 .get(&(bucket.to_owned(), id.to_owned()))
228 .cloned()
229 }
230
231 #[must_use]
234 pub fn list_for_bucket(&self, bucket: &str) -> Vec<InventoryConfig> {
235 let map = crate::lock_recovery::recover_read(&self.configs, "inventory.configs");
236 let mut out: Vec<InventoryConfig> = map
237 .iter()
238 .filter(|((b, _id), _)| b == bucket)
239 .map(|(_, cfg)| cfg.clone())
240 .collect();
241 out.sort_by(|a, b| a.id.cmp(&b.id));
242 out
243 }
244
245 #[must_use]
252 pub fn list_all(&self) -> Vec<InventoryConfig> {
253 let map = crate::lock_recovery::recover_read(&self.configs, "inventory.configs");
254 let mut out: Vec<InventoryConfig> = map.values().cloned().collect();
255 out.sort_by(|a, b| a.bucket.cmp(&b.bucket).then_with(|| a.id.cmp(&b.id)));
256 out
257 }
258
259 pub fn delete(&self, bucket: &str, id: &str) {
261 let key = (bucket.to_owned(), id.to_owned());
262 crate::lock_recovery::recover_write(&self.configs, "inventory.configs").remove(&key);
263 crate::lock_recovery::recover_write(&self.last_run, "inventory.last_run").remove(&key);
264 }
265
266 #[must_use]
270 pub fn due(&self, bucket: &str, id: &str, now: DateTime<Utc>) -> bool {
271 let key = (bucket.to_owned(), id.to_owned());
272 let cfgs = crate::lock_recovery::recover_read(&self.configs, "inventory.configs");
273 let Some(cfg) = cfgs.get(&key) else {
274 return false;
275 };
276 let runs = crate::lock_recovery::recover_read(&self.last_run, "inventory.last_run");
277 match runs.get(&key) {
278 None => true,
279 Some(prev) => {
280 let elapsed = now.signed_duration_since(*prev);
281 elapsed >= chrono::Duration::hours(i64::from(cfg.frequency_hours))
282 }
283 }
284 }
285
286 pub fn mark_run(&self, bucket: &str, id: &str, when: DateTime<Utc>) {
289 crate::lock_recovery::recover_write(&self.last_run, "inventory.last_run")
290 .insert((bucket.to_owned(), id.to_owned()), when);
291 }
292
293 pub fn to_json(&self) -> Result<String, serde_json::Error> {
295 let cfgs = crate::lock_recovery::recover_read(&self.configs, "inventory.configs");
296 let runs = crate::lock_recovery::recover_read(&self.last_run, "inventory.last_run");
297 let snap = InventorySnapshot {
298 configs: cfgs
299 .iter()
300 .map(|((b, i), v)| (join_key(b, i), v.clone()))
301 .collect(),
302 last_run: runs
303 .iter()
304 .map(|((b, i), v)| (join_key(b, i), *v))
305 .collect(),
306 };
307 serde_json::to_string(&snap)
308 }
309
310 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
313 let snap: InventorySnapshot = serde_json::from_str(s)?;
314 let mut configs: HashMap<(String, String), InventoryConfig> = HashMap::new();
315 for (k, v) in snap.configs {
316 if let Some(pair) = split_key(&k) {
317 configs.insert(pair, v);
318 }
319 }
320 let mut last_run: HashMap<(String, String), DateTime<Utc>> = HashMap::new();
321 for (k, v) in snap.last_run {
322 if let Some(pair) = split_key(&k) {
323 last_run.insert(pair, v);
324 }
325 }
326 Ok(Self {
327 configs: RwLock::new(configs),
328 last_run: RwLock::new(last_run),
329 })
330 }
331
332 pub fn run_once_for_test<I, F>(
344 &self,
345 bucket: &str,
346 id: &str,
347 rows: I,
348 now: DateTime<Utc>,
349 mut write_object: F,
350 ) -> Result<Vec<String>, RunError>
351 where
352 I: IntoIterator<Item = InventoryRow>,
353 F: FnMut(&str, &str, Vec<u8>) -> Result<(), RunError>,
354 {
355 let cfg = self
356 .get(bucket, id)
357 .ok_or_else(|| RunError::UnknownConfig(bucket.to_owned(), id.to_owned()))?;
358 let csv_bytes = render_csv(rows.into_iter());
359 let csv_md5 = md5_hex(&csv_bytes);
360 let csv_key = csv_destination_key(&cfg, now);
361 let manifest_key = manifest_destination_key(&cfg, now);
362 let manifest_body = render_manifest_json(
363 &cfg,
364 std::slice::from_ref(&csv_key),
365 std::slice::from_ref(&csv_md5),
366 now,
367 )
368 .into_bytes();
369 write_object(&cfg.destination_bucket, &csv_key, csv_bytes)?;
370 write_object(&cfg.destination_bucket, &manifest_key, manifest_body)?;
371 self.mark_run(bucket, id, now);
372 Ok(vec![csv_key, manifest_key])
373 }
374}
375
376pub fn render_csv(rows: impl Iterator<Item = InventoryRow>) -> Vec<u8> {
384 let mut out = Vec::new();
385 out.extend_from_slice(
386 b"Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus\n",
387 );
388 for row in rows {
389 let cells: [String; 10] = [
390 row.bucket,
391 row.key,
392 row.version_id.unwrap_or_default(),
393 row.is_latest.to_string(),
394 row.is_delete_marker.to_string(),
395 row.size.to_string(),
396 row.last_modified
397 .to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
398 row.etag,
399 row.storage_class,
400 row.encryption_status,
401 ];
402 for (i, cell) in cells.iter().enumerate() {
403 if i > 0 {
404 out.push(b',');
405 }
406 out.push(b'"');
407 for b in cell.as_bytes() {
408 if *b == b'"' {
409 out.extend_from_slice(b"\"\"");
410 } else {
411 out.push(*b);
412 }
413 }
414 out.push(b'"');
415 }
416 out.push(b'\n');
417 }
418 out
419}
420
421pub fn render_manifest_json(
427 config: &InventoryConfig,
428 csv_keys: &[String],
429 md5s: &[String],
430 written_at: DateTime<Utc>,
431) -> String {
432 let n = csv_keys.len().min(md5s.len());
436 let files_json: Vec<serde_json::Value> = (0..n)
437 .map(|i| {
438 serde_json::json!({
439 "key": csv_keys[i],
440 "size": 0,
446 "MD5checksum": md5s[i],
447 })
448 })
449 .collect();
450 let value = serde_json::json!({
451 "sourceBucket": config.bucket,
452 "destinationBucket": config.destination_bucket,
453 "version": "2016-11-30",
454 "creationTimestamp": written_at.timestamp_millis().to_string(),
455 "fileFormat": config.format.as_aws_str(),
456 "fileSchema": csv_header_schema(config),
457 "files": files_json,
458 });
459 serde_json::to_string_pretty(&value).expect("static JSON is always serialisable")
460}
461
462#[must_use]
466pub fn csv_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
467 let stamp = now.format("%Y-%m-%dT%H%M%SZ");
468 let prefix = trim_trailing_slash(&config.destination_prefix);
469 format!(
470 "{prefix}/{src}/{id}/data/{stamp}.{ext}",
471 src = config.bucket,
472 id = config.id,
473 ext = config.format.file_extension()
474 )
475}
476
477#[must_use]
481pub fn manifest_destination_key(config: &InventoryConfig, now: DateTime<Utc>) -> String {
482 let stamp = now.format("%Y-%m-%dT%H%M%SZ");
483 let prefix = trim_trailing_slash(&config.destination_prefix);
484 format!(
485 "{prefix}/{src}/{id}/{stamp}/manifest.json",
486 src = config.bucket,
487 id = config.id
488 )
489}
490
/// Returns `s` without any trailing `/` characters.
///
/// Destination keys are built as `"{prefix}/..."`. Removing every trailing
/// slash, not just one, keeps a prefix such as `"inv//"` from producing an
/// empty path segment (`"inv//src/..."`). A prefix made only of slashes
/// collapses to `""`.
fn trim_trailing_slash(s: &str) -> &str {
    s.trim_end_matches('/')
}
494
495fn csv_header_schema(_cfg: &InventoryConfig) -> &'static str {
499 "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, StorageClass, EncryptionStatus"
500}
501
502fn md5_hex(bytes: &[u8]) -> String {
503 use md5::{Digest, Md5};
504 let mut h = Md5::new();
505 h.update(bytes);
506 let out = h.finalize();
507 let mut s = String::with_capacity(32);
508 for b in out {
509 s.push(hex_char(b >> 4));
510 s.push(hex_char(b & 0x0f));
511 }
512 s
513}
514
/// Maps a nibble (`0..=15`) to its lowercase hexadecimal digit.
///
/// Values of 16 and above yield `'0'`.
fn hex_char(n: u8) -> char {
    char::from_digit(u32::from(n), 16).unwrap_or('0')
}
522
523#[derive(Debug, thiserror::Error)]
527pub enum RunError {
528 #[error("no inventory configuration for bucket={0} id={1}")]
529 UnknownConfig(String, String),
530 #[error("destination write failed: {0}")]
531 Write(String),
532}
533
/// Counters describing one `run_scan_once` pass.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub struct ScanReport {
    /// Distinct source buckets that had at least one due configuration.
    pub buckets_scanned: usize,
    /// Configurations registered at the start of the pass, due or not.
    pub configs_evaluated: usize,
    /// Configurations whose CSV and manifest were both written.
    pub csvs_written: usize,
    /// Listed objects that were turned into CSV rows (keys ending in `.s4index`
    /// are skipped), summed over every configuration scanned in the pass,
    /// including configurations whose scan failed later.
    pub objects_listed: usize,
    /// Configurations whose scan returned an error (listing or destination write).
    pub errors: usize,
}
565
566fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
574 S3Request {
575 input,
576 method,
577 uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
578 headers: http::HeaderMap::new(),
579 extensions: http::Extensions::new(),
580 credentials: None,
581 region: None,
582 service: None,
583 trailing_headers: None,
584 }
585}
586
587fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
593 let mut buf = Vec::new();
594 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf)
595 .ok()?;
596 let s = std::str::from_utf8(&buf).ok()?;
597 chrono::DateTime::parse_from_rfc3339(s)
598 .ok()
599 .map(|dt| dt.with_timezone(&Utc))
600}
601
602fn encryption_status_from_head(head: &HeadObjectOutput) -> String {
624 if let Some(sse) = head.server_side_encryption.as_ref() {
629 let s = sse.as_str();
630 if s.eq_ignore_ascii_case("aws:kms") || s.eq_ignore_ascii_case("aws:kms:dsse") {
631 return "SSE-KMS".to_owned();
632 }
633 }
640 if head.sse_customer_algorithm.is_some() {
642 return "SSE-C".to_owned();
643 }
644 if head
647 .metadata
648 .as_ref()
649 .and_then(|m| m.get("s4-encrypted"))
650 .is_some()
651 {
652 return "SSE-S4".to_owned();
653 }
654 if head.ssekms_key_id.is_some() {
658 return "SSE-KMS".to_owned();
659 }
660 if let Some(sse) = head.server_side_encryption.as_ref()
665 && !sse.as_str().is_empty()
666 {
667 return "SSE-S4".to_owned();
668 }
669 "NOT-SSE".to_owned()
670}
671
672pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
710 s4: &Arc<crate::S4Service<B>>,
711) -> Result<ScanReport, String> {
712 let Some(mgr) = s4.inventory_manager().cloned() else {
713 return Ok(ScanReport::default());
716 };
717 let configs = mgr.list_all();
718 if configs.is_empty() {
719 return Ok(ScanReport::default());
720 }
721 let now = Utc::now();
722 let mut report = ScanReport {
723 configs_evaluated: configs.len(),
724 ..ScanReport::default()
725 };
726 let mut walked_buckets: std::collections::HashSet<String> = std::collections::HashSet::new();
730 for cfg in configs {
731 if !mgr.due(&cfg.bucket, &cfg.id, now) {
732 continue;
733 }
734 walked_buckets.insert(cfg.bucket.clone());
735 match scan_one_config(s4, &cfg, now, &mut report).await {
736 Ok(()) => {
737 mgr.mark_run(&cfg.bucket, &cfg.id, now);
738 report.csvs_written = report.csvs_written.saturating_add(1);
739 }
740 Err(e) => {
741 warn!(
742 bucket = %cfg.bucket,
743 id = %cfg.id,
744 error = %e,
745 "S4 inventory: scan failed for config",
746 );
747 report.errors = report.errors.saturating_add(1);
748 }
749 }
750 }
751 report.buckets_scanned = walked_buckets.len();
752 Ok(report)
753}
754
755async fn scan_one_config<B: S3 + Send + Sync + 'static>(
759 s4: &Arc<crate::S4Service<B>>,
760 cfg: &InventoryConfig,
761 now: DateTime<Utc>,
762 report: &mut ScanReport,
763) -> Result<(), String> {
764 let mut rows: Vec<InventoryRow> = Vec::new();
765 let mut continuation: Option<String> = None;
766 loop {
767 let list_input = ListObjectsV2Input {
768 bucket: cfg.bucket.clone(),
769 continuation_token: continuation.clone(),
770 ..Default::default()
771 };
772 let list_req = synthetic_request(
773 list_input,
774 http::Method::GET,
775 &format!("/{src}?list-type=2", src = cfg.bucket),
776 );
777 let resp = s4
778 .as_ref()
779 .list_objects_v2(list_req)
780 .await
781 .map_err(|e| format!("list_objects_v2: {e}"))?;
782 let output = resp.output;
783 let contents = output.contents.unwrap_or_default();
784 for obj in &contents {
785 let Some(key) = obj.key.as_deref() else {
786 continue;
787 };
788 if key.ends_with(".s4index") {
793 continue;
794 }
795 report.objects_listed = report.objects_listed.saturating_add(1);
796 let head_input = HeadObjectInput {
801 bucket: cfg.bucket.clone(),
802 key: key.to_owned(),
803 ..Default::default()
804 };
805 let head_req = synthetic_request(
806 head_input,
807 http::Method::HEAD,
808 &format!("/{src}/{key}", src = cfg.bucket),
809 );
810 let head = match s4.as_ref().head_object(head_req).await {
811 Ok(r) => r.output,
812 Err(e) => {
813 warn!(
814 bucket = %cfg.bucket,
815 key = %key,
816 error = %e,
817 "S4 inventory: head_object failed; emitting row with listing-only metadata",
818 );
819 HeadObjectOutput::default()
820 }
821 };
822 let size = head
823 .content_length
824 .unwrap_or_else(|| obj.size.unwrap_or(0))
825 .max(0) as u64;
826 let last_modified = head
827 .last_modified
828 .as_ref()
829 .and_then(timestamp_to_chrono_utc)
830 .or_else(|| obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc))
831 .unwrap_or(now);
832 let etag: String = head
833 .e_tag
834 .as_ref()
835 .or(obj.e_tag.as_ref())
836 .map(|e| e.value().to_owned())
837 .unwrap_or_default();
838 let storage_class = head
839 .storage_class
840 .as_ref()
841 .map(|s| s.as_str().to_owned())
842 .or_else(|| obj.storage_class.as_ref().map(|s| s.as_str().to_owned()))
843 .unwrap_or_else(|| "STANDARD".to_owned());
844 let encryption_status = encryption_status_from_head(&head);
845 rows.push(InventoryRow {
846 bucket: cfg.bucket.clone(),
847 key: key.to_owned(),
848 version_id: None,
849 is_latest: true,
850 is_delete_marker: false,
851 size,
852 last_modified,
853 etag,
854 storage_class,
855 encryption_status,
856 });
857 }
858 if output.is_truncated.unwrap_or(false) {
859 continuation = output.next_continuation_token;
860 if continuation.is_none() {
861 break;
865 }
866 } else {
867 break;
868 }
869 }
870
871 let csv_bytes = render_csv(rows.into_iter());
878 let csv_md5 = md5_hex(&csv_bytes);
879 let csv_key = csv_destination_key(cfg, now);
880 let manifest_key = manifest_destination_key(cfg, now);
881 let manifest_body = render_manifest_json(
882 cfg,
883 std::slice::from_ref(&csv_key),
884 std::slice::from_ref(&csv_md5),
885 now,
886 )
887 .into_bytes();
888 put_destination_object(s4, &cfg.destination_bucket, &csv_key, csv_bytes).await?;
889 put_destination_object(s4, &cfg.destination_bucket, &manifest_key, manifest_body).await?;
890 Ok(())
891}
892
893async fn put_destination_object<B: S3 + Send + Sync + 'static>(
899 s4: &Arc<crate::S4Service<B>>,
900 dst_bucket: &str,
901 dst_key: &str,
902 body: Vec<u8>,
903) -> Result<(), String> {
904 let body_bytes = bytes::Bytes::from(body);
905 let input = PutObjectInput {
906 bucket: dst_bucket.to_owned(),
907 key: dst_key.to_owned(),
908 body: Some(crate::blob::bytes_to_blob(body_bytes)),
909 ..Default::default()
910 };
911 let req = synthetic_request(
912 input,
913 http::Method::PUT,
914 &format!("/{dst_bucket}/{dst_key}"),
915 );
916 s4.as_ref()
917 .put_object(req)
918 .await
919 .map(|_| ())
920 .map_err(|e| format!("destination put_object {dst_bucket}/{dst_key}: {e}"))
921}
922
923#[cfg(test)]
924mod tests {
925 use super::*;
926
927 fn sample_config() -> InventoryConfig {
928 InventoryConfig {
929 id: "daily-csv".into(),
930 bucket: "src".into(),
931 destination_bucket: "dst".into(),
932 destination_prefix: "inv".into(),
933 frequency_hours: 24,
934 format: InventoryFormat::Csv,
935 included_object_versions: IncludedVersions::Current,
936 }
937 }
938
939 fn sample_row(key: &str, size: u64) -> InventoryRow {
940 InventoryRow {
941 bucket: "src".into(),
942 key: key.into(),
943 version_id: None,
944 is_latest: true,
945 is_delete_marker: false,
946 size,
947 last_modified: DateTime::parse_from_rfc3339("2026-05-13T12:34:56.789Z")
948 .unwrap()
949 .with_timezone(&Utc),
950 etag: "abc123".into(),
951 storage_class: "STANDARD".into(),
952 encryption_status: "NOT-SSE".into(),
953 }
954 }
955
956 #[test]
957 fn config_json_round_trip() {
958 let m = InventoryManager::new();
959 m.put(sample_config());
960 let json = m.to_json().expect("to_json");
961 let m2 = InventoryManager::from_json(&json).expect("from_json");
962 assert_eq!(m2.get("src", "daily-csv"), Some(sample_config()));
963 }
964
965 #[test]
966 fn due_returns_true_when_never_run() {
967 let m = InventoryManager::new();
968 m.put(sample_config());
969 assert!(m.due("src", "daily-csv", Utc::now()));
970 }
971
972 #[test]
973 fn due_returns_true_when_interval_elapsed() {
974 let m = InventoryManager::new();
975 m.put(sample_config());
976 let then = Utc::now() - chrono::Duration::hours(25);
977 m.mark_run("src", "daily-csv", then);
978 assert!(m.due("src", "daily-csv", Utc::now()));
979 }
980
981 #[test]
982 fn due_returns_false_when_interval_not_yet_elapsed() {
983 let m = InventoryManager::new();
984 m.put(sample_config());
985 let just_now = Utc::now() - chrono::Duration::minutes(5);
986 m.mark_run("src", "daily-csv", just_now);
987 assert!(!m.due("src", "daily-csv", Utc::now()));
988 }
989
990 #[test]
991 fn due_returns_false_when_config_missing() {
992 let m = InventoryManager::new();
993 assert!(!m.due("ghost", "nothing", Utc::now()));
994 }
995
996 #[test]
997 fn list_for_bucket_filters_and_sorts() {
998 let m = InventoryManager::new();
999 let mut a = sample_config();
1000 a.id = "z-last".into();
1001 let mut b = sample_config();
1002 b.id = "a-first".into();
1003 let mut c = sample_config();
1004 c.bucket = "other".into();
1005 c.id = "should-not-appear".into();
1006 m.put(a);
1007 m.put(b);
1008 m.put(c);
1009 let list = m.list_for_bucket("src");
1010 assert_eq!(list.len(), 2);
1011 assert_eq!(list[0].id, "a-first");
1012 assert_eq!(list[1].id, "z-last");
1013 }
1014
1015 #[test]
1016 fn render_csv_matches_aws_header_and_quotes_cells() {
1017 let rows = vec![
1018 sample_row("a/b.txt", 100),
1019 sample_row("comma,here.txt", 200),
1020 sample_row("quote\"inside.txt", 300),
1021 ];
1022 let csv = render_csv(rows.into_iter());
1023 let s = String::from_utf8(csv).expect("utf8");
1024 let mut lines = s.lines();
1025 assert_eq!(
1026 lines.next().unwrap(),
1027 "Bucket,Key,VersionId,IsLatest,IsDeleteMarker,Size,LastModifiedDate,ETag,StorageClass,EncryptionStatus"
1028 );
1029 let row1 = lines.next().unwrap();
1031 assert!(row1.starts_with("\"src\",\"a/b.txt\","));
1032 assert!(row1.contains(",\"100\","));
1033 assert!(row1.contains("\"2026-05-13T12:34:56.789Z\""));
1034 let row2 = lines.next().unwrap();
1036 assert!(row2.contains("\"comma,here.txt\""));
1037 let row3 = lines.next().unwrap();
1039 assert!(row3.contains("\"quote\"\"inside.txt\""));
1040 assert_eq!(lines.next(), None);
1041 }
1042
1043 #[test]
1044 fn render_manifest_json_carries_required_fields() {
1045 let cfg = sample_config();
1046 let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
1047 .unwrap()
1048 .with_timezone(&Utc);
1049 let manifest = render_manifest_json(
1050 &cfg,
1051 &["inv/src/daily-csv/data/2026-05-13T000000Z.csv".into()],
1052 &["d41d8cd98f00b204e9800998ecf8427e".into()],
1053 now,
1054 );
1055 let v: serde_json::Value = serde_json::from_str(&manifest).expect("manifest must be JSON");
1056 assert_eq!(v["sourceBucket"], "src");
1057 assert_eq!(v["destinationBucket"], "dst");
1058 assert_eq!(v["fileFormat"], "CSV");
1059 assert_eq!(v["version"], "2016-11-30");
1060 let files = v["files"].as_array().expect("files array");
1061 assert_eq!(files.len(), 1);
1062 assert_eq!(
1063 files[0]["key"],
1064 "inv/src/daily-csv/data/2026-05-13T000000Z.csv"
1065 );
1066 assert_eq!(files[0]["MD5checksum"], "d41d8cd98f00b204e9800998ecf8427e");
1067 assert_eq!(v["creationTimestamp"], now.timestamp_millis().to_string());
1068 let schema = v["fileSchema"].as_str().expect("fileSchema string");
1069 assert!(schema.starts_with("Bucket, Key, VersionId"));
1070 assert!(schema.ends_with("StorageClass, EncryptionStatus"));
1071 }
1072
1073 #[test]
1074 fn destination_keys_are_under_prefix_and_namespaced_by_source_bucket() {
1075 let cfg = sample_config();
1076 let now = DateTime::parse_from_rfc3339("2026-05-13T01:02:03.000Z")
1077 .unwrap()
1078 .with_timezone(&Utc);
1079 let csv_key = csv_destination_key(&cfg, now);
1080 let manifest_key = manifest_destination_key(&cfg, now);
1081 assert_eq!(csv_key, "inv/src/daily-csv/data/2026-05-13T010203Z.csv");
1082 assert_eq!(
1083 manifest_key,
1084 "inv/src/daily-csv/2026-05-13T010203Z/manifest.json"
1085 );
1086 let mut cfg2 = cfg.clone();
1088 cfg2.destination_prefix = "inv/".into();
1089 assert_eq!(
1090 csv_destination_key(&cfg2, now),
1091 "inv/src/daily-csv/data/2026-05-13T010203Z.csv"
1092 );
1093 }
1094
1095 #[test]
1096 fn run_once_writes_csv_and_manifest_and_marks_run() {
1097 let m = InventoryManager::new();
1098 m.put(sample_config());
1099 let now = DateTime::parse_from_rfc3339("2026-05-13T00:00:00.000Z")
1100 .unwrap()
1101 .with_timezone(&Utc);
1102 let written = std::sync::Mutex::new(Vec::<(String, String, Vec<u8>)>::new());
1103 let keys = m
1104 .run_once_for_test(
1105 "src",
1106 "daily-csv",
1107 vec![sample_row("a", 1), sample_row("b", 2)],
1108 now,
1109 |dst_bucket, dst_key, body| {
1110 written
1111 .lock()
1112 .unwrap()
1113 .push((dst_bucket.to_owned(), dst_key.to_owned(), body));
1114 Ok(())
1115 },
1116 )
1117 .expect("run_once_for_test");
1118 assert_eq!(keys.len(), 2);
1119 assert!(keys[0].ends_with(".csv"));
1120 assert!(keys[1].ends_with("manifest.json"));
1121 let written = written.into_inner().unwrap();
1122 assert_eq!(written.len(), 2);
1123 for (bucket, _, _) in &written {
1124 assert_eq!(bucket, "dst");
1125 }
1126 assert!(!m.due("src", "daily-csv", now + chrono::Duration::hours(1)));
1129 assert!(m.due("src", "daily-csv", now + chrono::Duration::hours(25)));
1130 }
1131
1132 #[test]
1133 fn run_once_unknown_config_is_an_error() {
1134 let m = InventoryManager::new();
1135 let now = Utc::now();
1136 let err = m.run_once_for_test(
1137 "ghost",
1138 "nothing",
1139 std::iter::empty(),
1140 now,
1141 |_, _, _| Ok(()),
1142 );
1143 assert!(matches!(err, Err(RunError::UnknownConfig(_, _))));
1144 }
1145
1146 #[test]
1157 fn encryption_status_sse_kms_via_aws_kms_string() {
1158 let head = HeadObjectOutput {
1159 server_side_encryption: Some(ServerSideEncryption::from_static(
1160 ServerSideEncryption::AWS_KMS,
1161 )),
1162 ..Default::default()
1163 };
1164 assert_eq!(encryption_status_from_head(&head), "SSE-KMS");
1165 }
1166
1167 #[test]
1168 fn encryption_status_sse_c_via_customer_algorithm() {
1169 let head = HeadObjectOutput {
1170 sse_customer_algorithm: Some("AES256".to_owned()),
1171 ..Default::default()
1172 };
1173 assert_eq!(encryption_status_from_head(&head), "SSE-C");
1174 }
1175
1176 #[test]
1177 fn encryption_status_sse_s4_via_metadata_flag() {
1178 let mut metadata = HashMap::new();
1179 metadata.insert("s4-encrypted".to_owned(), "aes-256-gcm".to_owned());
1180 let head = HeadObjectOutput {
1181 metadata: Some(metadata),
1182 ..Default::default()
1183 };
1184 assert_eq!(encryption_status_from_head(&head), "SSE-S4");
1185 }
1186
1187 #[test]
1188 fn encryption_status_not_sse_when_all_absent() {
1189 let head = HeadObjectOutput::default();
1190 assert_eq!(encryption_status_from_head(&head), "NOT-SSE");
1191 }
1192
1193 use std::collections::HashMap as StdHashMap;
1204 use std::sync::Mutex as StdMutex;
1205
1206 use bytes::Bytes;
1207 use s3s::dto as dto2;
1208 use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
1209 use s4_codec::dispatcher::AlwaysDispatcher;
1210 use s4_codec::passthrough::Passthrough;
1211 use s4_codec::{CodecKind, CodecRegistry};
1212
1213 use crate::S4Service;
1214
1215 #[derive(Default)]
1216 struct InvScannerMemBackend {
1217 objects: StdMutex<StdHashMap<(String, String), InvScannerStored>>,
1218 }
1219
1220 #[derive(Clone)]
1221 struct InvScannerStored {
1222 body: Bytes,
1223 last_modified: dto2::Timestamp,
1224 }
1225
1226 impl InvScannerMemBackend {
1227 fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
1228 self.objects.lock().unwrap().insert(
1229 (bucket.to_owned(), key.to_owned()),
1230 InvScannerStored {
1231 body,
1232 last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
1233 },
1234 );
1235 }
1236 }
1237
1238 #[async_trait::async_trait]
1239 impl S3 for InvScannerMemBackend {
1240 async fn put_object(
1241 &self,
1242 req: S3Request<dto2::PutObjectInput>,
1243 ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
1244 let body = match req.input.body {
1247 Some(blob) => crate::blob::collect_blob(blob, usize::MAX)
1248 .await
1249 .map_err(|e| {
1250 S3Error::with_message(S3ErrorCode::InternalError, format!("{e}"))
1251 })?,
1252 None => Bytes::new(),
1253 };
1254 self.put_now(&req.input.bucket, &req.input.key, body);
1255 Ok(S3Response::new(dto2::PutObjectOutput::default()))
1256 }
1257
1258 async fn head_object(
1259 &self,
1260 req: S3Request<dto2::HeadObjectInput>,
1261 ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
1262 let key = (req.input.bucket.clone(), req.input.key.clone());
1263 let lock = self.objects.lock().unwrap();
1264 let stored = lock
1265 .get(&key)
1266 .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
1267 Ok(S3Response::new(dto2::HeadObjectOutput {
1268 content_length: Some(stored.body.len() as i64),
1269 last_modified: Some(stored.last_modified.clone()),
1270 e_tag: Some(dto2::ETag::Strong(format!("etag-{}", stored.body.len()))),
1271 ..Default::default()
1272 }))
1273 }
1274
1275 async fn list_objects_v2(
1276 &self,
1277 req: S3Request<dto2::ListObjectsV2Input>,
1278 ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
1279 let prefix = req.input.bucket.clone();
1280 let lock = self.objects.lock().unwrap();
1281 let mut contents: Vec<dto2::Object> = lock
1282 .iter()
1283 .filter(|((b, _), _)| b == &prefix)
1284 .map(|((_, k), v)| dto2::Object {
1285 key: Some(k.clone()),
1286 size: Some(v.body.len() as i64),
1287 last_modified: Some(v.last_modified.clone()),
1288 e_tag: Some(dto2::ETag::Strong(format!("etag-{}", v.body.len()))),
1289 ..Default::default()
1290 })
1291 .collect();
1292 contents.sort_by(|a, b| a.key.cmp(&b.key));
1293 let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
1294 Ok(S3Response::new(dto2::ListObjectsV2Output {
1295 name: Some(prefix),
1296 contents: Some(contents),
1297 key_count: Some(key_count),
1298 is_truncated: Some(false),
1299 ..Default::default()
1300 }))
1301 }
1302
1303 async fn get_object(
1304 &self,
1305 req: S3Request<dto2::GetObjectInput>,
1306 ) -> S3Result<S3Response<dto2::GetObjectOutput>> {
1307 let key = (req.input.bucket.clone(), req.input.key.clone());
1308 let lock = self.objects.lock().unwrap();
1309 let stored = lock
1310 .get(&key)
1311 .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
1312 Ok(S3Response::new(dto2::GetObjectOutput {
1313 content_length: Some(stored.body.len() as i64),
1314 last_modified: Some(stored.last_modified.clone()),
1315 body: Some(crate::blob::bytes_to_blob(stored.body.clone())),
1316 ..Default::default()
1317 }))
1318 }
1319 }
1320
1321 fn make_codec() -> (Arc<CodecRegistry>, Arc<AlwaysDispatcher>) {
1322 (
1323 Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough))),
1324 Arc::new(AlwaysDispatcher(CodecKind::Passthrough)),
1325 )
1326 }
1327
1328 fn make_inv_service(
1334 backend: InvScannerMemBackend,
1335 with_inv: Option<Arc<InventoryManager>>,
1336 ) -> Arc<S4Service<InvScannerMemBackend>> {
1337 let (registry, dispatcher) = make_codec();
1338 let svc = S4Service::new(backend, registry, dispatcher);
1339 let svc = match with_inv {
1340 Some(m) => svc.with_inventory(m),
1341 None => svc,
1342 };
1343 Arc::new(svc)
1344 }
1345
1346 #[tokio::test]
1347 async fn run_scan_once_no_inventory_manager_returns_empty_report() {
1348 let s4 = make_inv_service(InvScannerMemBackend::default(), None);
1350 let report = run_scan_once(&s4).await.expect("scan");
1351 assert_eq!(report, ScanReport::default());
1352 }
1353
1354 #[tokio::test]
1355 async fn run_scan_once_no_configs_returns_empty_report() {
1356 let mgr = Arc::new(InventoryManager::new());
1358 let s4 = make_inv_service(InvScannerMemBackend::default(), Some(Arc::clone(&mgr)));
1359 let report = run_scan_once(&s4).await.expect("scan");
1360 assert_eq!(report.configs_evaluated, 0);
1361 assert_eq!(report.csvs_written, 0);
1362 assert_eq!(report.objects_listed, 0);
1363 }
1364
1365 #[tokio::test]
1366 async fn run_scan_once_walks_bucket_and_writes_csv_and_manifest() {
1367 let mgr = Arc::new(InventoryManager::new());
1372 mgr.put(InventoryConfig::daily_csv("d1", "src", "dst", "inv"));
1373 let backend = InvScannerMemBackend::default();
1374 for (key, body) in [
1375 ("alpha.txt", &b"AAA"[..]),
1376 ("nested/beta.bin", &b"BB"[..]),
1377 ("z.txt", &b"Z"[..]),
1378 ] {
1379 backend.put_now("src", key, Bytes::copy_from_slice(body));
1380 }
1381 let s4 = make_inv_service(backend, Some(Arc::clone(&mgr)));
1382
1383 let report = run_scan_once(&s4).await.expect("scan");
1384 assert_eq!(report.configs_evaluated, 1);
1385 assert_eq!(report.buckets_scanned, 1);
1386 assert_eq!(report.objects_listed, 3);
1387 assert_eq!(report.csvs_written, 1);
1388 assert_eq!(report.errors, 0);
1389
1390 let list_req = synthetic_request(
1395 ListObjectsV2Input {
1396 bucket: "dst".into(),
1397 ..Default::default()
1398 },
1399 http::Method::GET,
1400 "/dst?list-type=2",
1401 );
1402 let list_resp = s4
1403 .as_ref()
1404 .list_objects_v2(list_req)
1405 .await
1406 .expect("post-scan list");
1407 let dst_keys: Vec<String> = list_resp
1408 .output
1409 .contents
1410 .unwrap_or_default()
1411 .into_iter()
1412 .filter_map(|o| o.key)
1413 .collect();
1414 let csv_keys: Vec<String> = dst_keys
1415 .iter()
1416 .filter(|k| k.ends_with(".csv"))
1417 .cloned()
1418 .collect();
1419 let manifest_keys: Vec<String> = dst_keys
1420 .iter()
1421 .filter(|k| k.ends_with("manifest.json"))
1422 .cloned()
1423 .collect();
1424 assert_eq!(
1425 csv_keys.len(),
1426 1,
1427 "exactly one CSV must land; got {dst_keys:?}"
1428 );
1429 assert_eq!(
1430 manifest_keys.len(),
1431 1,
1432 "exactly one manifest.json must land; got {dst_keys:?}"
1433 );
1434 assert!(
1435 csv_keys[0].starts_with("inv/src/d1/data/"),
1436 "CSV key must be under <prefix>/<bucket>/<id>/data/, got {}",
1437 csv_keys[0]
1438 );
1439 assert!(
1440 manifest_keys[0].starts_with("inv/src/d1/"),
1441 "manifest key must be under <prefix>/<bucket>/<id>/, got {}",
1442 manifest_keys[0]
1443 );
1444
1445 let get_req = synthetic_request(
1447 GetObjectInput {
1448 bucket: "dst".into(),
1449 key: csv_keys[0].clone(),
1450 ..Default::default()
1451 },
1452 http::Method::GET,
1453 &format!("/dst/{}", csv_keys[0]),
1454 );
1455 let get_resp = s4.as_ref().get_object(get_req).await.expect("read CSV");
1456 let body = get_resp.output.body.expect("body");
1457 let csv_bytes = crate::blob::collect_blob(body, usize::MAX)
1458 .await
1459 .expect("collect");
1460 let csv_text = std::str::from_utf8(&csv_bytes).expect("utf8");
1461 let line_count = csv_text.lines().count();
1462 assert_eq!(line_count, 4, "header + 3 data rows; got:\n{csv_text}");
1463 assert!(csv_text.starts_with("Bucket,Key,VersionId"));
1464 assert!(csv_text.contains("\"alpha.txt\""));
1466 assert!(csv_text.contains("\"nested/beta.bin\""));
1467 assert!(csv_text.contains("\"z.txt\""));
1468 }
1469
1470 #[tokio::test]
1471 async fn run_scan_once_skips_configs_that_are_not_due() {
1472 let mgr = Arc::new(InventoryManager::new());
1476 mgr.put(InventoryConfig::daily_csv("d1", "src", "dst", "inv"));
1477 mgr.mark_run("src", "d1", Utc::now());
1478 let backend = InvScannerMemBackend::default();
1479 backend.put_now("src", "alpha.txt", Bytes::from_static(b"A"));
1480 let s4 = make_inv_service(backend, Some(Arc::clone(&mgr)));
1481
1482 let report = run_scan_once(&s4).await.expect("scan");
1483 assert_eq!(report.configs_evaluated, 1);
1484 assert_eq!(report.buckets_scanned, 0, "no walk; due() returned false");
1485 assert_eq!(report.csvs_written, 0);
1486 assert_eq!(report.objects_listed, 0);
1487 assert_eq!(report.errors, 0);
1488
1489 let list_req = synthetic_request(
1491 ListObjectsV2Input {
1492 bucket: "dst".into(),
1493 ..Default::default()
1494 },
1495 http::Method::GET,
1496 "/dst?list-type=2",
1497 );
1498 let list_resp = s4
1499 .as_ref()
1500 .list_objects_v2(list_req)
1501 .await
1502 .expect("post-scan list");
1503 assert!(
1504 list_resp.output.contents.unwrap_or_default().is_empty(),
1505 "no destination writes expected when config is not due"
1506 );
1507 }
1508
/// Panicking while holding the `configs` write lock poisons it. `to_json`
/// must still succeed afterwards, and the resulting snapshot must round-trip
/// through `from_json` with the entry registered before the panic intact.
#[test]
fn inventory_to_json_after_panic_recovers_via_poison() {
    let mgr = Arc::new(InventoryManager::new());
    mgr.put(InventoryConfig::daily_csv("inv1", "src", "dst", "reports/"));

    // Panic deliberately while the write guard is live, so unwinding drops
    // the guard and leaves the RwLock poisoned.
    let poisoner = Arc::clone(&mgr);
    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let mut guard = poisoner.configs.write().expect("clean lock");
        guard.insert(
            ("src2".into(), "inv2".into()),
            InventoryConfig::daily_csv("inv2", "src2", "dst2", "r/"),
        );
        panic!("force-poison");
    }));
    assert!(
        mgr.configs.is_poisoned(),
        "write panic must poison configs lock"
    );

    let snapshot = mgr.to_json().expect("to_json after poison must succeed");
    let restored = InventoryManager::from_json(&snapshot).expect("from_json");
    assert!(
        restored.get("src", "inv1").is_some(),
        "recovered snapshot keeps original entry"
    );
}
1537}