s4_server/lifecycle.rs
1//! S3 Lifecycle execution — per-bucket rule evaluation + manager skeleton
2//! (v0.6 #37).
3//!
4//! AWS S3 Lifecycle attaches a **list of rules** to a bucket; each rule may
5//! request that S3
6//!
7//! 1. **Expire** an object once its age (or the calendar date) crosses a
8//! threshold (`Expiration { Days | Date }`),
9//! 2. **Transition** an object to a different storage class (`Transition
10//! { Days, StorageClass }` — `STANDARD_IA`, `GLACIER_IR`, ...),
11//! 3. **Expire noncurrent versions** in a versioning-enabled bucket
12//! (`NoncurrentVersionExpiration { NoncurrentDays }`).
13//!
14//! Until v0.6 #37 the matching `PutBucketLifecycleConfiguration` /
15//! `GetBucketLifecycleConfiguration` / `DeleteBucketLifecycle` handlers
16//! in `crates/s4-server/src/service.rs` were pure passthroughs (the s3s
17//! framework's default backend stored them but nothing read the rules).
18//! This module owns the in-memory configuration store + the rule
19//! evaluator that decides, for any single object, whether an action
20//! should fire **right now**.
21//!
22//! ## responsibilities (v0.6 #37)
23//!
24//! - in-memory `bucket -> LifecycleConfig` map with JSON snapshot
25//! round-trip (mirroring `versioning.rs` / `object_lock.rs` /
26//! `inventory.rs`'s shape so `--lifecycle-state-file` is a one-line
27//! addition in `main.rs`).
28//! - per-bucket action counters (`actions_total`) — bumped by the
29//! future scanner when an Expiration / Transition /
30//! NoncurrentExpiration action is taken, surfaced via Prometheus
31//! (`s4_lifecycle_actions_total`, see `metrics.rs`).
32//! - [`LifecycleManager::evaluate`] — given one (bucket, key, age,
33//! size, tags) tuple, walk the bucket's rules in declaration order
34//! and return the first matching action. Returns `None` when no
35//! rule matches (or when the matching rule is `Disabled`).
36//! - [`evaluate_batch`] — batched form for the test path: walks a
37//! slice of `(key, age, size, tags)` tuples and returns the (key,
38//! action) pairs that should fire. The actual backend invocation
39//! (S3.delete_object / metadata rewrite) is the caller's job.
40//!
41//! ## scope limitations (v0.6 #37)
42//!
//! - **Background scanner.** In v0.6 #37 the scanner was a skeleton only:
//!   `main.rs`'s `--lifecycle-scan-interval-hours` flag spawns a tokio task
//!   that logs the bucket list and stamps a "would-have-run" marker. A real
//!   bucket walk needed a back-reference from the scheduler into
//!   `S4Service` for the `list_objects_v2` walk. [`run_scan_once`]
//!   (v0.7 #45) takes `&Arc<S4Service<B>>` for exactly that purpose. It
//!   pages through each configured bucket via `list_objects_v2` and executes
//!   the evaluated Expiration / Transition actions for current-version
//!   objects; see its docs for the remaining scope limits. The
//!   [`crate::S4Service::run_lifecycle_once_for_test`] entry covers the
//!   in-memory equivalent, so the unit + E2E tests exercise the evaluator
//!   end-to-end.
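//! - **`AbortIncompleteMultipartUpload`** is parsed and stored on the
//!   `LifecycleRule`, so PutBucketLifecycleConfiguration round-trips the
//!   field. v0.6 #37 did not enforce it. Since v0.8.3 #69 (audit M-2),
//!   [`LifecycleManager::evaluate_in_flight_multipart`] evaluates it, and
//!   [`run_scan_once`] runs a per-bucket in-flight multipart pass after the
//!   object walk. The per-object evaluator ([`LifecycleManager::evaluate`])
//!   still ignores the field.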
60//! - **`expiration_date` (calendar date)** is supported in the
61//! evaluator: a rule with `expiration_date` past `now` fires
62//! Expiration immediately. Same wire form as AWS S3.
63//! - **Multi-instance replication.** All state is single-instance
64//! in-memory; `--lifecycle-state-file <PATH>` provides restart
65//! recovery via JSON snapshot, matching the
66//! `--versioning-state-file` shape.
67//! - **Object Lock interplay**: the evaluator does NOT consult the
68//! `ObjectLockManager` directly (the evaluator API is
69//! object-tags-and-size only); the scanner caller is expected to
70//! skip locked objects — see the `evaluate_batch_skips_locked` test
71//! for the canonical pattern. Locking always wins over Lifecycle.
72//! - **Versioning interplay**: the evaluator treats noncurrent
73//! versions as a separate input — pass `is_noncurrent = true` to
74//! [`LifecycleManager::evaluate_with_flags`] for noncurrent version
75//! expiration matching. The legacy `evaluate` shorthand defaults
76//! `is_noncurrent = false` (current version) so existing call sites
77//! stay one-liners.
78
79use std::collections::HashMap;
80use std::sync::Arc;
81use std::sync::RwLock;
82
83use chrono::{DateTime, Duration, Utc};
84use s3s::S3;
85use s3s::S3Request;
86use s3s::dto::*;
87use serde::{Deserialize, Serialize};
88use tracing::warn;
89
90/// Whether a rule is currently being applied. Mirrors AWS S3
91/// `ExpirationStatus` (`"Enabled"` / `"Disabled"`).
92#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
93pub enum LifecycleStatus {
94 Enabled,
95 Disabled,
96}
97
98impl LifecycleStatus {
99 /// Wire form used by AWS S3 (`"Enabled"` / `"Disabled"`).
100 #[must_use]
101 pub fn as_aws_str(self) -> &'static str {
102 match self {
103 Self::Enabled => "Enabled",
104 Self::Disabled => "Disabled",
105 }
106 }
107
108 /// Parse the AWS wire form (case-insensitive). Falls back to `Disabled`
109 /// on unrecognised input — this matches AWS conservative behaviour
110 /// (an unparseable status is treated as "off" so a typo doesn't silently
111 /// expire data).
112 #[must_use]
113 pub fn from_aws_str(s: &str) -> Self {
114 if s.eq_ignore_ascii_case("Enabled") {
115 Self::Enabled
116 } else {
117 Self::Disabled
118 }
119 }
120}
121
122/// Per-rule object filter. AWS S3 represents the filter as one of `Prefix`,
123/// `Tag`, `ObjectSizeGreaterThan`, `ObjectSizeLessThan`, or `And` (= AND of
124/// any subset of those predicates). For internal storage we flatten the
125/// "And" form into a struct of optional fields plus a vector of (key, value)
126/// tags — every present field must match (logical AND). An empty filter (all
127/// fields `None` / empty `tags`) matches every object in the bucket.
128#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
129pub struct LifecycleFilter {
130 /// Object key prefix (empty / `None` = no prefix gating).
131 #[serde(default)]
132 pub prefix: Option<String>,
133 /// Logical AND across every entry: every (key, value) must match the
134 /// object's own tag set.
135 #[serde(default)]
136 pub tags: Vec<(String, String)>,
137 /// Object must be *strictly greater than* this size in bytes.
138 #[serde(default)]
139 pub object_size_greater_than: Option<u64>,
140 /// Object must be *strictly less than* this size in bytes.
141 #[serde(default)]
142 pub object_size_less_than: Option<u64>,
143}
144
145impl LifecycleFilter {
146 /// `true` when this filter accepts the candidate. Empty filter accepts
147 /// every object. Tag matching is AND of all listed tags (each present in
148 /// `object_tags` with the matching value).
149 #[must_use]
150 pub fn matches(&self, key: &str, size: u64, object_tags: &[(String, String)]) -> bool {
151 if let Some(p) = &self.prefix
152 && !key.starts_with(p)
153 {
154 return false;
155 }
156 if let Some(min) = self.object_size_greater_than
157 && size <= min
158 {
159 return false;
160 }
161 if let Some(max) = self.object_size_less_than
162 && size >= max
163 {
164 return false;
165 }
166 for (tk, tv) in &self.tags {
167 let matched = object_tags.iter().any(|(ok, ov)| ok == tk && ov == tv);
168 if !matched {
169 return false;
170 }
171 }
172 true
173 }
174}
175
176/// A single transition step (object age threshold + target storage class).
177/// `days` is days since the object was created. AWS S3 also accepts `Date`
178/// for transitions but Lifecycle deployments overwhelmingly use `Days`; the
179/// `Date` form is omitted here on purpose to keep the evaluator narrow
180/// (operators wanting calendar transitions can synthesise a one-shot rule
181/// at the cadence of their scanner).
182#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
183pub struct TransitionRule {
184 pub days: u32,
185 /// Target storage class (`"STANDARD_IA"` / `"GLACIER_IR"` /
186 /// `"GLACIER"` / `"DEEP_ARCHIVE"` / `"INTELLIGENT_TIERING"` /
187 /// `"ONEZONE_IA"`). Stored as the AWS wire string so PutBucket /
188 /// GetBucket round-trip is a no-op.
189 pub storage_class: String,
190}
191
192/// One lifecycle rule. AWS S3's `LifecycleRule` flattened into the subset
193/// the v0.6 #37 evaluator handles. `id` is the operator-supplied label and
194/// makes Get / Put round-trips non-lossy.
195#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
196pub struct LifecycleRule {
197 pub id: String,
198 pub status: LifecycleStatus,
199 #[serde(default)]
200 pub filter: LifecycleFilter,
201 /// Days since the object was created. Mutually exclusive with
202 /// [`Self::expiration_date`] in AWS — both fields are accepted here on
203 /// input (the evaluator picks `expiration_days` first, then
204 /// `expiration_date`) so a malformed rule with both set still evaluates
205 /// deterministically rather than silently dropping the action.
206 #[serde(default)]
207 pub expiration_days: Option<u32>,
208 /// Calendar date past which matching objects are expired (AWS wire form
209 /// is ISO 8601; here we keep it as a `DateTime<Utc>` so round-trips
210 /// through `serde_json` survive without re-parsing).
211 #[serde(default)]
212 pub expiration_date: Option<DateTime<Utc>>,
213 /// Transition steps in declaration order. The evaluator picks the
214 /// deepest transition (largest `days` ≤ object age) and resolves any
215 /// conflict with expiration in [`LifecycleManager::evaluate_with_flags`].
216 #[serde(default)]
217 pub transitions: Vec<TransitionRule>,
218 /// Days an object has been noncurrent before the noncurrent-version
219 /// expiration fires. Only consulted when the evaluator is asked about
220 /// a noncurrent object (`is_noncurrent = true`).
221 #[serde(default)]
222 pub noncurrent_version_expiration_days: Option<u32>,
223 /// Days after a multipart upload is initiated before the abort fires.
224 /// Stored so PutBucket round-trips, but **not enforced** in the
225 /// v0.6 #37 evaluator — multipart sweeping lives elsewhere.
226 #[serde(default)]
227 pub abort_incomplete_multipart_upload_days: Option<u32>,
228}
229
230impl LifecycleRule {
231 /// Convenience constructor for a "expire after N days" rule. Useful in
232 /// tests + operator scripts.
233 #[must_use]
234 pub fn expire_after_days(id: impl Into<String>, days: u32) -> Self {
235 Self {
236 id: id.into(),
237 status: LifecycleStatus::Enabled,
238 filter: LifecycleFilter::default(),
239 expiration_days: Some(days),
240 expiration_date: None,
241 transitions: Vec::new(),
242 noncurrent_version_expiration_days: None,
243 abort_incomplete_multipart_upload_days: None,
244 }
245 }
246}
247
248/// Per-bucket lifecycle configuration (ordered list of rules — first match
249/// wins, matching AWS S3 semantics).
250#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
251pub struct LifecycleConfig {
252 pub rules: Vec<LifecycleRule>,
253}
254
/// What a lifecycle rule asks the scanner to do **right now** for one
/// candidate: an object, or an in-flight multipart upload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LifecycleAction {
    /// Delete the object (`Expiration` / `NoncurrentVersionExpiration`).
    Expire,
    /// Move the object to another storage class (`Transition`).
    /// `storage_class` is the AWS wire string, e.g. `"GLACIER_IR"`.
    Transition { storage_class: String },
    /// v0.8.3 #69 (audit M-2): abort an in-flight multipart upload that was
    /// initiated more than the rule's
    /// `abort_incomplete_multipart_upload_days` ago. `upload_id` is the
    /// backend-issued upload id, so the scanner can route the
    /// `AbortMultipartUpload` call without re-listing. This has the same
    /// wire semantics as AWS S3 `AbortIncompleteMultipartUpload`.
    AbortMultipartUpload { upload_id: String },
}

impl LifecycleAction {
    /// Fixed label for the `action` dimension of
    /// `s4_lifecycle_actions_total{action="..."}`.
    #[must_use]
    pub fn metric_label(&self) -> &'static str {
        match *self {
            LifecycleAction::AbortMultipartUpload { .. } => "abort_incomplete_multipart",
            LifecycleAction::Transition { .. } => "transition",
            LifecycleAction::Expire => "expire",
        }
    }
}
285
286/// v0.8.3 #69: one in-flight multipart upload the lifecycle scanner
287/// considers for abort. Mirrors the (subset of) `MultipartUpload` fields
288/// the rule evaluator needs (key, upload_id, initiated). `tags` is kept
289/// in the shape the existing object-path evaluator uses
290/// (`Vec<(String, String)>`) so a future enhancement that surfaces
291/// upload-time tags from `MultipartStateStore` can flow through the
292/// same filter check without API churn — AWS S3 itself does not attach
293/// tags to in-flight multipart uploads, so for the scanner-driven path
294/// the slice is always empty (the filter's prefix / size predicates
295/// still apply via [`LifecycleFilter::matches`], passing size = 0).
296#[derive(Clone, Debug)]
297pub struct MultipartUploadCandidate {
298 pub upload_id: String,
299 pub key: String,
300 pub initiated: DateTime<Utc>,
301 pub tags: Vec<(String, String)>,
302}
303
304/// snapshot のシリアライズ format。`to_json` / `from_json` 用。
305#[derive(Debug, Default, Serialize, Deserialize)]
306struct LifecycleSnapshot {
307 by_bucket: HashMap<String, LifecycleConfig>,
308}
309
310/// Per-bucket lifecycle configuration manager.
311///
312/// All read / write operations go through `RwLock` for thread safety;
313/// clones are cheap (`Arc<LifecycleManager>` is the expected handle shape).
314/// `actions_total` is a parallel `RwLock<HashMap<...>>` of `(bucket,
315/// action_label) -> count` so the future background scanner can stamp
316/// successful actions and operators can `GET /metrics` to see the running
317/// totals (the metric is also surfaced via `metrics::counter!` — see
318/// [`crate::metrics::record_lifecycle_action`]).
319#[derive(Debug, Default)]
320pub struct LifecycleManager {
321 by_bucket: RwLock<HashMap<String, LifecycleConfig>>,
322 /// `(bucket, action_label) -> count`. Bumped by the scanner via
323 /// [`Self::record_action`]. Action labels are the
324 /// [`LifecycleAction::metric_label`] values
325 /// (`"expire"` / `"transition"`).
326 actions_total: RwLock<HashMap<(String, String), u64>>,
327}
328
329impl LifecycleManager {
330 /// Empty manager — no bucket has rules.
331 #[must_use]
332 pub fn new() -> Self {
333 Self::default()
334 }
335
336 /// Replace (or create) the lifecycle configuration for `bucket`. Drops
337 /// any previously-attached rules in one shot — matches AWS S3
338 /// `PutBucketLifecycleConfiguration` (full replace, no merge).
339 pub fn put(&self, bucket: &str, config: LifecycleConfig) {
340 self.by_bucket
341 .write()
342 .expect("lifecycle state RwLock poisoned")
343 .insert(bucket.to_owned(), config);
344 }
345
346 /// Return a clone of the bucket's configuration, if any.
347 #[must_use]
348 pub fn get(&self, bucket: &str) -> Option<LifecycleConfig> {
349 self.by_bucket
350 .read()
351 .expect("lifecycle state RwLock poisoned")
352 .get(bucket)
353 .cloned()
354 }
355
356 /// Drop the bucket's lifecycle configuration (idempotent — missing
357 /// bucket is OK).
358 pub fn delete(&self, bucket: &str) {
359 self.by_bucket
360 .write()
361 .expect("lifecycle state RwLock poisoned")
362 .remove(bucket);
363 }
364
365 /// JSON snapshot for restart-recoverable state. Pair with
366 /// [`Self::from_json`].
367 pub fn to_json(&self) -> Result<String, serde_json::Error> {
368 let by_bucket = self
369 .by_bucket
370 .read()
371 .expect("lifecycle state RwLock poisoned")
372 .clone();
373 let snap = LifecycleSnapshot { by_bucket };
374 serde_json::to_string(&snap)
375 }
376
377 /// Restore from a JSON snapshot produced by [`Self::to_json`]. Action
378 /// counters are intentionally not snapshotted — they're transient
379 /// observability data and should reset across process restarts so
380 /// `rate(s4_lifecycle_actions_total[1h])` doesn't double-count.
381 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
382 let snap: LifecycleSnapshot = serde_json::from_str(s)?;
383 Ok(Self {
384 by_bucket: RwLock::new(snap.by_bucket),
385 actions_total: RwLock::new(HashMap::new()),
386 })
387 }
388
389 /// Evaluate which rule (if any) applies to a single **current-version**
390 /// object right now. Walks the bucket's rules in declaration order;
391 /// returns the first matching action. Returns `None` when no rule
392 /// matches (or when the matching rule is `Disabled`, or when the
393 /// bucket has no lifecycle configuration).
394 ///
395 /// Within a single rule the precedence is:
396 ///
397 /// 1. Pick the deepest transition whose `days` threshold is currently
398 /// met (= largest `days ≤ object age`).
399 /// 2. Conflict with expiration: if `expiration_days <=
400 /// transition_days` for the chosen transition, expiration wins
401 /// (the rule wants the object gone before it would have been
402 /// transitioned). Otherwise transition wins (e.g. transition at
403 /// 30d, expiration at 365d, age 60d → transition fires now,
404 /// expiration is future).
405 /// 3. `expiration_date` matches when `now >= expiration_date` and no
406 /// transition is currently applicable.
407 ///
408 /// `object_age` is "now - created_at" supplied by the caller — keeping
409 /// the evaluator pure of the wall clock makes deterministic testing
410 /// trivial.
411 #[must_use]
412 pub fn evaluate(
413 &self,
414 bucket: &str,
415 key: &str,
416 object_age: Duration,
417 object_size: u64,
418 object_tags: &[(String, String)],
419 ) -> Option<LifecycleAction> {
420 self.evaluate_with_flags(
421 bucket,
422 key,
423 object_age,
424 object_size,
425 object_tags,
426 EvaluateFlags::default(),
427 )
428 }
429
430 /// Full-form evaluator with flags for noncurrent-version handling.
431 /// Use this when the scanner is walking a versioning-enabled bucket;
432 /// pass `is_noncurrent = true` for entries that are not the latest
433 /// non-delete-marker version.
434 #[must_use]
435 pub fn evaluate_with_flags(
436 &self,
437 bucket: &str,
438 key: &str,
439 object_age: Duration,
440 object_size: u64,
441 object_tags: &[(String, String)],
442 flags: EvaluateFlags,
443 ) -> Option<LifecycleAction> {
444 let cfg = self.get(bucket)?;
445 let now_for_date = flags.now.unwrap_or_else(Utc::now);
446 let age_days = object_age.num_days().max(0);
447 let age_days_u32 = u32::try_from(age_days).unwrap_or(u32::MAX);
448 for rule in &cfg.rules {
449 if rule.status != LifecycleStatus::Enabled {
450 continue;
451 }
452 if !rule.filter.matches(key, object_size, object_tags) {
453 continue;
454 }
455 // Noncurrent-version expiration: only consulted when the
456 // caller explicitly flags this entry as noncurrent. The
457 // current-version expiration / transition rules do not fire
458 // for noncurrent versions in AWS S3 semantics.
459 if flags.is_noncurrent {
460 if let Some(days) = rule.noncurrent_version_expiration_days
461 && age_days_u32 >= days
462 {
463 return Some(LifecycleAction::Expire);
464 }
465 continue;
466 }
467 // Current-version path.
468 let exp_days_match = rule.expiration_days.filter(|d| age_days_u32 >= *d);
469 let exp_date_match = rule.expiration_date.filter(|d| now_for_date >= *d);
470 // Pick the deepest transition whose threshold is at or
471 // below the object's age. Transitions are typically
472 // declaration-ordered by ascending `days`, but we don't
473 // require it — taking the largest threshold means an
474 // object aged 90d gets `GLACIER` over `STANDARD_IA` even
475 // if `STANDARD_IA(30d)` was declared first.
476 let chosen_transition = rule
477 .transitions
478 .iter()
479 .filter(|t| age_days_u32 >= t.days)
480 .max_by_key(|t| t.days);
481 // Conflict resolution: when `expiration_days` fires AND a
482 // transition fires, expiration wins iff
483 // `expiration_days <= transition_days` (rule wants object
484 // gone before / at the same time it would have been
485 // transitioned). Otherwise the transition wins.
486 if let Some(exp_threshold) = exp_days_match {
487 let trans_threshold = chosen_transition.map(|t| t.days).unwrap_or(u32::MAX);
488 if exp_threshold <= trans_threshold {
489 return Some(LifecycleAction::Expire);
490 }
491 }
492 if let Some(t) = chosen_transition {
493 return Some(LifecycleAction::Transition {
494 storage_class: t.storage_class.clone(),
495 });
496 }
497 // Calendar-date expiration (no transition currently
498 // applicable, but the rule's expiration_date is past).
499 if exp_date_match.is_some() {
500 return Some(LifecycleAction::Expire);
501 }
502 // Fall through to the next rule when no action fires for
503 // this rule — first-match-wins applies only to *firing*
504 // rules, matching AWS semantics where overlapping rules
505 // with disjoint thresholds compose.
506 }
507 None
508 }
509
510 /// v0.8.3 #69 (audit M-2): evaluate one in-flight multipart upload
511 /// against the bucket's rules. Returns
512 /// [`LifecycleAction::AbortMultipartUpload`] when at least one
513 /// `Enabled` rule (a) accepts the upload's key via its filter and
514 /// (b) carries an `abort_incomplete_multipart_upload_days`
515 /// threshold whose age (`now - initiated`) is currently met.
516 /// Returns `None` otherwise (no matching rule, no
517 /// abort-multipart-upload-days set, or the upload is too young).
518 ///
519 /// Filter matching reuses [`LifecycleFilter::matches`] with
520 /// `object_size = 0` — in-flight uploads have no assembled size
521 /// yet (the parts are stored independently in the backend), so
522 /// any rule whose filter sets `object_size_greater_than` /
523 /// `object_size_less_than` is treated as if the upload were
524 /// 0 bytes. AWS S3 itself does not gate
525 /// `AbortIncompleteMultipartUpload` on size; this matches the
526 /// AWS semantic (size predicates simply do not apply to the
527 /// abort path) for the typical filter shape (no size predicate).
528 /// Operators wanting size-gated abort can carry the upload's
529 /// declared part length on the `MultipartUploadCandidate` in a
530 /// follow-up issue — the API extension is additive.
531 #[must_use]
532 pub fn evaluate_in_flight_multipart(
533 &self,
534 bucket: &str,
535 upload: &MultipartUploadCandidate,
536 now: DateTime<Utc>,
537 ) -> Option<LifecycleAction> {
538 let cfg = self.get(bucket)?;
539 for rule in &cfg.rules {
540 if rule.status != LifecycleStatus::Enabled {
541 continue;
542 }
543 if !rule.filter.matches(&upload.key, 0, &upload.tags) {
544 continue;
545 }
546 if let Some(days) = rule.abort_incomplete_multipart_upload_days {
547 let age = now.signed_duration_since(upload.initiated);
548 if age >= Duration::days(i64::from(days)) {
549 return Some(LifecycleAction::AbortMultipartUpload {
550 upload_id: upload.upload_id.clone(),
551 });
552 }
553 }
554 }
555 None
556 }
557
558 /// Stamp the per-bucket action counter and bump the matching
559 /// Prometheus counter. Called by the future scanner after a successful
560 /// delete / metadata rewrite.
561 pub fn record_action(&self, bucket: &str, action: &LifecycleAction) {
562 let label = action.metric_label();
563 let key = (bucket.to_owned(), label.to_owned());
564 let mut guard = self
565 .actions_total
566 .write()
567 .expect("lifecycle actions counter RwLock poisoned");
568 let entry = guard.entry(key).or_insert(0);
569 *entry = entry.saturating_add(1);
570 crate::metrics::record_lifecycle_action(bucket, label);
571 }
572
573 /// Read-only snapshot of the per-(bucket, action) counter map.
574 /// Useful for tests + introspection (`/admin/lifecycle/stats` style
575 /// endpoints in the future).
576 #[must_use]
577 pub fn actions_snapshot(&self) -> HashMap<(String, String), u64> {
578 self.actions_total
579 .read()
580 .expect("lifecycle actions counter RwLock poisoned")
581 .clone()
582 }
583
584 /// All buckets with a lifecycle configuration attached. Sorted for
585 /// stable scanner ordering.
586 #[must_use]
587 pub fn buckets(&self) -> Vec<String> {
588 let map = self
589 .by_bucket
590 .read()
591 .expect("lifecycle state RwLock poisoned");
592 let mut out: Vec<String> = map.keys().cloned().collect();
593 out.sort();
594 out
595 }
596}
597
598/// Flags for [`LifecycleManager::evaluate_with_flags`]. Default is
599/// "current-version object, evaluator picks `Utc::now()` for the date
600/// comparison". Tests override `now` for determinism.
601#[derive(Clone, Copy, Debug, Default)]
602pub struct EvaluateFlags {
603 pub is_noncurrent: bool,
604 pub now: Option<DateTime<Utc>>,
605}
606
607/// One object the evaluator considers in a batch:
608/// `(key, object_age, object_size, object_tags)`. Defined as a type alias
609/// so [`evaluate_batch`] / [`crate::S4Service::run_lifecycle_once_for_test`]
610/// don't trip clippy's `type-complexity` lint, and so callers building the
611/// list have a single canonical shape to reach for.
612pub type EvaluateBatchEntry = (String, Duration, u64, Vec<(String, String)>);
613
614/// Test-driven scan entry: walks a list of [`EvaluateBatchEntry`] tuples
615/// and produces (key, action) pairs for every object that should fire an
616/// action **right now**. The actual backend invocation (S3.delete_object /
617/// metadata rewrite) is the caller's job. Used by both unit tests and the
618/// E2E test in `tests/roundtrip.rs`; the future background scanner will
619/// reuse the same entry once the bucket-walk is wired through the backend.
620#[must_use]
621pub fn evaluate_batch(
622 manager: &LifecycleManager,
623 bucket: &str,
624 objects: &[EvaluateBatchEntry],
625) -> Vec<(String, LifecycleAction)> {
626 let mut out = Vec::with_capacity(objects.len());
627 for (key, age, size, tags) in objects {
628 if let Some(action) = manager.evaluate(bucket, key, *age, *size, tags) {
629 out.push((key.clone(), action));
630 }
631 }
632 out
633}
634
/// Counters describing one [`run_scan_once`] invocation.
///
/// These counters feed the `--lifecycle-scan-interval-hours` log line, the
/// tests, and any future `/admin/lifecycle/scan` endpoint. Operators see the
/// same activity in Prometheus as
/// `s4_lifecycle_actions_total{action="expire"|"transition"|"abort_incomplete_multipart"}`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
    /// Buckets visited, i.e. buckets that had a lifecycle configuration
    /// attached when the scan started.
    pub buckets_scanned: usize,
    /// Distinct keys evaluated. A key counts once even when the listing
    /// spanned several pages.
    pub objects_evaluated: usize,
    /// Objects deleted by an Expiration action.
    pub expired: usize,
    /// Objects whose `x-amz-storage-class` was rewritten by a Transition
    /// action.
    pub transitioned: usize,
    /// Objects left alone because an Object Lock (Compliance, Governance,
    /// or legal hold) was in effect. Lock beats Lifecycle, as in AWS S3.
    pub skipped_locked: usize,
    /// v0.8.3 #69 (audit M-2): in-flight multipart uploads aborted by an
    /// `AbortIncompleteMultipartUpload` action. Pairs with
    /// `s4_lifecycle_actions_total{action="abort_incomplete_multipart"}`.
    /// Only successful aborts are counted here. A failed backend
    /// `abort_multipart_upload` bumps `action_errors` instead, like the
    /// Expire / Transition error paths.
    pub aborted_multipart: usize,
    /// Actions the evaluator requested that then failed, e.g. when backend
    /// `delete_object` returned an error. Each failure is logged at WARN;
    /// the counter lets tests and metrics assert that nothing was lost
    /// silently.
    pub action_errors: usize,
}
672
673/// Convert an s3s `Timestamp` (`time::OffsetDateTime` underneath) into a
674/// `chrono::DateTime<Utc>` via the RFC3339 wire form. Used by the scanner
675/// to compute object age (= `now - last_modified`). Returns `None` when
676/// the stamp is unparseable, in which case the caller falls back to
677/// treating the object as freshly created (age = 0).
678fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
679 let mut buf = Vec::new();
680 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
681 let s = std::str::from_utf8(&buf).ok()?;
682 chrono::DateTime::parse_from_rfc3339(s)
683 .ok()
684 .map(|dt| dt.with_timezone(&Utc))
685}
686
687/// Build a synthetic `S3Request` with the minimum metadata the
688/// scanner-internal calls need. The lifecycle scanner is a
689/// system-internal caller (no end-user credentials, no real HTTP method
690/// / URI), so policy gates downstream see `credentials = None` /
691/// `region = None` and treat the call as anonymous-internal. Backends
692/// that do not gate internal traffic ignore these fields entirely.
693fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
694 S3Request {
695 input,
696 method,
697 uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
698 headers: http::HeaderMap::new(),
699 extensions: http::Extensions::new(),
700 credentials: None,
701 region: None,
702 service: None,
703 trailing_headers: None,
704 }
705}
706
707/// Walk every bucket that has a lifecycle configuration attached, list
708/// its objects via `list_objects_v2` (continuation-token pagination), and
709/// for each object evaluate the rule set + execute the matching
710/// Expiration / Transition action. Object-Lock-protected objects are
711/// **skipped** (the Lock always wins over Lifecycle). Versioning chains
712/// are intentionally out of scope for v0.7 #45 — see the module-level
713/// limitations note.
714///
715/// ## error handling
716///
717/// Per-bucket / per-object failures are logged at WARN level and bumped
718/// in `ScanReport::action_errors`; the scanner does NOT abort early on a
719/// single bad object so one slow / faulty bucket can't starve every
720/// other bucket's lifecycle. The function only returns `Err(_)` when the
721/// scanner cannot make progress at all (no current usage — kept for the
722/// future case where the manager itself becomes unavailable).
723///
724/// ## scope (v0.7 #45)
725///
726/// - Current-version objects only (Versioning-enabled chains rely on
727/// `evaluate_with_flags(is_noncurrent = true)`, but walking the
728/// shadow keys requires the version chain access pattern from
729/// `versioning.rs` and is deferred to a follow-up issue).
730/// - `head_object`'s `last_modified` is used to compute age. When the
731/// backend omits the field (some S3-compatible backends do), the
732/// object is treated as age 0 and skipped — matches AWS conservative
733/// behaviour where a malformed timestamp must not silently expire data.
734/// - Tags are looked up via the attached
735/// [`crate::tagging::TagManager`] (when wired). Buckets without a
736/// tag manager pass an empty tag list to the evaluator.
737/// - Transition rewrites the object's `x-amz-storage-class` via
738/// `copy_object` (same bucket / same key, `MetadataDirective: COPY`,
739/// new `StorageClass`). Backends that ignore the storage class
740/// header silently no-op the transition; the counter still bumps to
741/// reflect "the scanner asked for a transition" (matching AWS where
742/// a no-op transition still costs a request).
743pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
744 s4: &Arc<crate::S4Service<B>>,
745) -> Result<ScanReport, String> {
746 let Some(mgr) = s4.lifecycle_manager().cloned() else {
747 // No lifecycle manager attached (e.g. operator did not set
748 // `--lifecycle-state-file`). Scan is a no-op.
749 return Ok(ScanReport::default());
750 };
751 let buckets = mgr.buckets();
752 if buckets.is_empty() {
753 return Ok(ScanReport::default());
754 }
755 let now = Utc::now();
756 let mut report = ScanReport {
757 buckets_scanned: buckets.len(),
758 ..ScanReport::default()
759 };
760 for bucket in buckets {
761 scan_bucket(s4, &mgr, &bucket, now, &mut report).await;
762 // v0.8.3 #69 (audit M-2): walk in-flight multipart uploads for
763 // the same bucket and abort any whose `Initiated` time is past
764 // the rule's `abort_incomplete_multipart_upload_days` threshold.
765 // Run after the object walk so the (typically smaller) multipart
766 // pass still happens even if the object walk hit a transient
767 // backend error mid-stream (per-bucket failure isolation —
768 // matching the existing one-bad-bucket-doesn't-starve-others
769 // policy).
770 scan_in_flight_multipart_uploads(s4, &mgr, &bucket, now, &mut report).await;
771 }
772 Ok(report)
773}
774
775/// Walk one bucket end-to-end. Pagination uses the `continuation_token`
776/// loop documented in
777/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>.
778async fn scan_bucket<B: S3 + Send + Sync + 'static>(
779 s4: &Arc<crate::S4Service<B>>,
780 mgr: &Arc<LifecycleManager>,
781 bucket: &str,
782 now: DateTime<Utc>,
783 report: &mut ScanReport,
784) {
785 let mut continuation: Option<String> = None;
786 loop {
787 let list_input = ListObjectsV2Input {
788 bucket: bucket.to_owned(),
789 continuation_token: continuation.clone(),
790 ..Default::default()
791 };
792 let list_req = synthetic_request(
793 list_input,
794 http::Method::GET,
795 &format!("/{bucket}?list-type=2"),
796 );
797 let resp = match s4.as_ref().list_objects_v2(list_req).await {
798 Ok(r) => r,
799 Err(e) => {
800 warn!(
801 bucket = %bucket,
802 error = %e,
803 "S4 lifecycle: list_objects_v2 failed; skipping bucket for this scan",
804 );
805 report.action_errors = report.action_errors.saturating_add(1);
806 return;
807 }
808 };
809 let output = resp.output;
810 let contents = output.contents.unwrap_or_default();
811 for obj in &contents {
812 let Some(key) = obj.key.as_deref() else {
813 continue;
814 };
815 // Filter out S4-internal sidecars / shadow versions early so
816 // the lifecycle scanner mirrors the same "client-visible
817 // object set" the customer sees through `list_objects_v2`.
818 // (The S4Service.list_objects_v2 handler already drops them
819 // before returning, but this is a belt-and-braces guard for
820 // any future bypass that builds the list elsewhere.)
821 if key.ends_with(".s4index") {
822 continue;
823 }
824 report.objects_evaluated = report.objects_evaluated.saturating_add(1);
825 let size = obj.size.unwrap_or(0).max(0) as u64;
826 let age = match obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc) {
827 Some(lm) => now.signed_duration_since(lm),
828 None => Duration::zero(),
829 };
830 let tags: Vec<(String, String)> = s4
831 .as_ref()
832 .tag_manager()
833 .and_then(|m| m.get_object_tags(bucket, key))
834 .map(|set| set.iter().cloned().collect())
835 .unwrap_or_default();
836 let Some(action) = mgr.evaluate(bucket, key, age, size, &tags) else {
837 continue;
838 };
839 // Object-Lock-protected objects are skipped before any
840 // backend-mutating call. Lock wins over Lifecycle, full
841 // stop — matches AWS behaviour where an Expiration on a
842 // locked object is dropped, not retried.
843 //
844 // v0.8.3 #65 (audit C-2): in addition to bumping the
845 // in-report counter, emit a Prometheus
846 // `s4_lifecycle_actions_total{action="skipped_locked"}`
847 // sample so operator dashboards can alert on the
848 // "lifecycle wanted to act but Object Lock vetoed" path
849 // (previously a silent skip — the scanner's
850 // `list_objects_v2` walked the key and `evaluate(...)`
851 // returned an action, but no observable signal fired
852 // when the backend would have refused the DELETE).
853 if let Some(lock_mgr) = s4.as_ref().object_lock_manager()
854 && let Some(state) = lock_mgr.get(bucket, key)
855 && state.is_locked(now)
856 {
857 report.skipped_locked = report.skipped_locked.saturating_add(1);
858 crate::metrics::record_lifecycle_action(bucket, "skipped_locked");
859 continue;
860 }
861 match action {
862 LifecycleAction::Expire => match execute_expire(s4, bucket, key).await {
863 Ok(()) => {
864 mgr.record_action(bucket, &LifecycleAction::Expire);
865 report.expired = report.expired.saturating_add(1);
866 }
867 Err(e) => {
868 warn!(
869 bucket = %bucket,
870 key = %key,
871 error = %e,
872 "S4 lifecycle: Expire action failed",
873 );
874 report.action_errors = report.action_errors.saturating_add(1);
875 }
876 },
877 LifecycleAction::Transition { storage_class } => {
878 match execute_transition(s4, bucket, key, &storage_class).await {
879 Ok(()) => {
880 mgr.record_action(
881 bucket,
882 &LifecycleAction::Transition {
883 storage_class: storage_class.clone(),
884 },
885 );
886 report.transitioned = report.transitioned.saturating_add(1);
887 }
888 Err(e) => {
889 warn!(
890 bucket = %bucket,
891 key = %key,
892 storage_class = %storage_class,
893 error = %e,
894 "S4 lifecycle: Transition action failed",
895 );
896 report.action_errors = report.action_errors.saturating_add(1);
897 }
898 }
899 }
900 // v0.8.3 #69 (audit M-2): the per-key path's
901 // `evaluate(...)` only ever returns Expire /
902 // Transition (the AbortMultipartUpload variant comes
903 // from the in-flight multipart walker further down,
904 // which uses `evaluate_in_flight_multipart`). Match
905 // exhaustiveness still requires an arm; logging at
906 // warn keeps the control-flow honest if the
907 // evaluator ever grows a path that surfaces an
908 // abort here (e.g. someone wires a future evaluator
909 // that returns abort for a regular object key — the
910 // arm prevents silent dispatch and the warn surfaces
911 // the misuse).
912 LifecycleAction::AbortMultipartUpload { upload_id } => {
913 warn!(
914 bucket = %bucket,
915 key = %key,
916 upload_id = %upload_id,
917 "S4 lifecycle: AbortMultipartUpload returned for a key path; \
918 this is unexpected — the per-key evaluator should only \
919 emit Expire / Transition. Dropping action.",
920 );
921 report.action_errors = report.action_errors.saturating_add(1);
922 }
923 }
924 }
925 if output.is_truncated.unwrap_or(false) {
926 continuation = output.next_continuation_token;
927 if continuation.is_none() {
928 // Defensive: AWS guarantees a NextContinuationToken when
929 // is_truncated=true, but a malformed backend could omit
930 // it; break to avoid an infinite loop.
931 break;
932 }
933 } else {
934 break;
935 }
936 }
937}
938
939/// v0.8.3 #69 (audit M-2): walk every in-flight multipart upload for
940/// `bucket` via `list_multipart_uploads` (key-marker / upload-id-marker
941/// pagination) and abort any whose `Initiated` time is older than the
942/// rule's `abort_incomplete_multipart_upload_days` threshold. Successful
943/// aborts bump `report.aborted_multipart` AND (`mgr.record_action`)
944/// `s4_lifecycle_actions_total{action="abort_incomplete_multipart"}` so
945/// operator dashboards see the same signal whether they look at
946/// in-process counters or Prometheus.
947///
948/// On a successful abort the entry in `MultipartStateStore` (which
949/// holds the per-upload SSE-C key bytes / tag set / object-lock recipe
950/// captured at `CreateMultipartUpload` time) is also dropped — same
951/// shape as the user-facing `abort_multipart_upload` handler in
952/// `service.rs`. Without the drop the abandoned upload's `Zeroizing<[u8;
953/// 32]>` SSE-C key would linger in `multipart_state` until the
954/// `sweep_stale` background tick (v0.8.2 #62) reaped it on TTL.
955///
956/// Per-page / per-upload backend failures are logged at WARN and bumped
957/// in `report.action_errors`; the loop does NOT abort the bucket — one
958/// bad upload must not prevent the rest of the bucket's stale uploads
959/// from being cleaned up. Mirrors the same isolation policy
960/// `scan_bucket` uses for `list_objects_v2` failures.
961async fn scan_in_flight_multipart_uploads<B: S3 + Send + Sync + 'static>(
962 s4: &Arc<crate::S4Service<B>>,
963 mgr: &Arc<LifecycleManager>,
964 bucket: &str,
965 now: DateTime<Utc>,
966 report: &mut ScanReport,
967) {
968 let mut key_marker: Option<String> = None;
969 let mut upload_id_marker: Option<String> = None;
970 loop {
971 let list_input = ListMultipartUploadsInput {
972 bucket: bucket.to_owned(),
973 key_marker: key_marker.clone(),
974 upload_id_marker: upload_id_marker.clone(),
975 ..Default::default()
976 };
977 let list_req = synthetic_request(
978 list_input,
979 http::Method::GET,
980 &format!("/{bucket}?uploads"),
981 );
982 let resp = match s4.as_ref().list_multipart_uploads(list_req).await {
983 Ok(r) => r,
984 Err(e) => {
985 warn!(
986 bucket = %bucket,
987 error = %e,
988 "S4 lifecycle: list_multipart_uploads failed; \
989 skipping bucket multipart sweep for this scan",
990 );
991 report.action_errors = report.action_errors.saturating_add(1);
992 return;
993 }
994 };
995 let output = resp.output;
996 let uploads = output.uploads.unwrap_or_default();
997 for upload in &uploads {
998 let Some(upload_id) = upload.upload_id.as_deref() else {
999 continue;
1000 };
1001 let Some(key) = upload.key.as_deref() else {
1002 continue;
1003 };
1004 // `Initiated` is `Option<Timestamp>`; absent or
1005 // unparseable → treat as "freshly initiated" (age 0)
1006 // and skip. Matches the conservative `last_modified`
1007 // handling in `scan_bucket` — never abort an upload
1008 // whose age we cannot determine.
1009 let Some(initiated) = upload
1010 .initiated
1011 .as_ref()
1012 .and_then(timestamp_to_chrono_utc)
1013 else {
1014 continue;
1015 };
1016 let candidate = MultipartUploadCandidate {
1017 upload_id: upload_id.to_owned(),
1018 key: key.to_owned(),
1019 initiated,
1020 tags: Vec::new(),
1021 };
1022 let Some(action) = mgr.evaluate_in_flight_multipart(bucket, &candidate, now)
1023 else {
1024 continue;
1025 };
1026 let LifecycleAction::AbortMultipartUpload { upload_id: action_upload_id } =
1027 action
1028 else {
1029 // The evaluator is contractually
1030 // AbortMultipartUpload-only on this path; this arm
1031 // exists only to satisfy match exhaustiveness if a
1032 // future rev returns a different variant. Treat as
1033 // an error so the divergence is observable.
1034 warn!(
1035 bucket = %bucket,
1036 key = %key,
1037 upload_id = %upload_id,
1038 "S4 lifecycle: evaluate_in_flight_multipart returned \
1039 non-Abort action; dropping",
1040 );
1041 report.action_errors = report.action_errors.saturating_add(1);
1042 continue;
1043 };
1044 match execute_abort_multipart(s4, bucket, key, &action_upload_id).await {
1045 Ok(()) => {
1046 mgr.record_action(
1047 bucket,
1048 &LifecycleAction::AbortMultipartUpload {
1049 upload_id: action_upload_id.clone(),
1050 },
1051 );
1052 report.aborted_multipart =
1053 report.aborted_multipart.saturating_add(1);
1054 // Drop the per-upload state so the
1055 // (Zeroizing-wrapped) SSE-C key bytes / tag
1056 // recipe / object-lock recipe go away
1057 // immediately rather than waiting for the
1058 // hourly `sweep_stale` tick. Idempotent —
1059 // `remove(...)` on a missing key is a no-op
1060 // (some uploads may not have been registered
1061 // here, e.g. a server restart between Create
1062 // and the lifecycle sweep).
1063 s4.as_ref()
1064 .multipart_state()
1065 .remove(&action_upload_id);
1066 }
1067 Err(e) => {
1068 warn!(
1069 bucket = %bucket,
1070 key = %key,
1071 upload_id = %action_upload_id,
1072 error = %e,
1073 "S4 lifecycle: AbortMultipartUpload action failed",
1074 );
1075 report.action_errors = report.action_errors.saturating_add(1);
1076 }
1077 }
1078 }
1079 if output.is_truncated.unwrap_or(false) {
1080 // AWS guarantees both NextKeyMarker and (when present)
1081 // NextUploadIdMarker on a truncated response. Defensive
1082 // break when neither moved (avoid infinite loop on a
1083 // misbehaving backend).
1084 let next_key = output.next_key_marker;
1085 let next_upload_id = output.next_upload_id_marker;
1086 if next_key == key_marker && next_upload_id == upload_id_marker {
1087 break;
1088 }
1089 key_marker = next_key;
1090 upload_id_marker = next_upload_id;
1091 } else {
1092 break;
1093 }
1094 }
1095}
1096
1097/// v0.8.3 #69 (audit M-2): issue `abort_multipart_upload` against the
1098/// wrapped `S4Service`. The handler in `service.rs` does the
1099/// `multipart_state.remove(...)` itself before forwarding to the
1100/// backend; we additionally `remove` from the lifecycle scanner side
1101/// (in [`scan_in_flight_multipart_uploads`]) to defensively cover the
1102/// case where the backend abort succeeds but the response routing
1103/// shortens early.
1104async fn execute_abort_multipart<B: S3 + Send + Sync + 'static>(
1105 s4: &Arc<crate::S4Service<B>>,
1106 bucket: &str,
1107 key: &str,
1108 upload_id: &str,
1109) -> Result<(), String> {
1110 let input = AbortMultipartUploadInput {
1111 bucket: bucket.to_owned(),
1112 key: key.to_owned(),
1113 upload_id: upload_id.to_owned(),
1114 ..Default::default()
1115 };
1116 let req = synthetic_request(
1117 input,
1118 http::Method::DELETE,
1119 &format!("/{bucket}/{key}?uploadId={upload_id}"),
1120 );
1121 s4.as_ref()
1122 .abort_multipart_upload(req)
1123 .await
1124 .map(|_| ())
1125 .map_err(|e| format!("{e}"))
1126}
1127
1128/// Issue `delete_object` against the wrapped `S4Service`. The handler in
1129/// `service.rs` runs the WORM check itself, so even if the scanner's
1130/// pre-check missed (race with an MFA-Delete put-bucket-versioning), the
1131/// backend refuses the delete with `AccessDenied` and the error path
1132/// above bumps `action_errors` rather than silently losing data.
1133async fn execute_expire<B: S3 + Send + Sync + 'static>(
1134 s4: &Arc<crate::S4Service<B>>,
1135 bucket: &str,
1136 key: &str,
1137) -> Result<(), String> {
1138 let input = DeleteObjectInput {
1139 bucket: bucket.to_owned(),
1140 key: key.to_owned(),
1141 ..Default::default()
1142 };
1143 let req = synthetic_request(
1144 input,
1145 http::Method::DELETE,
1146 &format!("/{bucket}/{key}"),
1147 );
1148 s4.as_ref()
1149 .delete_object(req)
1150 .await
1151 .map(|_| ())
1152 .map_err(|e| format!("{e}"))
1153}
1154
1155/// Rewrite the object's storage class via a same-key `copy_object` with
1156/// `MetadataDirective: COPY` (preserves user metadata) and the new
1157/// `storage_class`. Backends that ignore storage-class headers
1158/// effectively no-op; the counter still records the attempt so dashboards
1159/// reflect the scanner's intent.
1160async fn execute_transition<B: S3 + Send + Sync + 'static>(
1161 s4: &Arc<crate::S4Service<B>>,
1162 bucket: &str,
1163 key: &str,
1164 storage_class: &str,
1165) -> Result<(), String> {
1166 // CopyObjectInput has dozens of `Option` fields plus three required
1167 // (bucket / key / copy_source); the s3s-generated `builder()` is
1168 // the path that fills the optional ones with `None` for us. The
1169 // `set_*` setters return `&mut Self`, so we drive them in
1170 // statement form rather than as a method chain.
1171 let mut builder = CopyObjectInput::builder();
1172 builder.set_bucket(bucket.to_owned());
1173 builder.set_key(key.to_owned());
1174 builder.set_copy_source(CopySource::Bucket {
1175 bucket: bucket.to_owned().into_boxed_str(),
1176 key: key.to_owned().into_boxed_str(),
1177 version_id: None,
1178 });
1179 builder.set_metadata_directive(Some(MetadataDirective::from_static(MetadataDirective::COPY)));
1180 builder.set_storage_class(Some(StorageClass::from(storage_class.to_owned())));
1181 let input = builder
1182 .build()
1183 .map_err(|e| format!("CopyObjectInput build: {e}"))?;
1184 let req = synthetic_request(
1185 input,
1186 http::Method::PUT,
1187 &format!("/{bucket}/{key}"),
1188 );
1189 s4.as_ref()
1190 .copy_object(req)
1191 .await
1192 .map(|_| ())
1193 .map_err(|e| format!("{e}"))
1194}
1195
1196#[cfg(test)]
1197mod tests {
1198 use super::*;
1199
1200 fn enabled(rule: LifecycleRule) -> LifecycleRule {
1201 LifecycleRule {
1202 status: LifecycleStatus::Enabled,
1203 ..rule
1204 }
1205 }
1206
1207 fn cfg_with(rules: Vec<LifecycleRule>) -> LifecycleConfig {
1208 LifecycleConfig { rules }
1209 }
1210
1211 fn manager_with(bucket: &str, rules: Vec<LifecycleRule>) -> LifecycleManager {
1212 let m = LifecycleManager::new();
1213 m.put(bucket, cfg_with(rules));
1214 m
1215 }
1216
#[test]
fn evaluate_age_past_expiration_returns_expire() {
    // 31-day-old object against a 30-day expiration rule.
    let manager = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
    assert_eq!(
        manager.evaluate("b", "k", Duration::days(31), 100, &[]),
        Some(LifecycleAction::Expire)
    );
}
1223
#[test]
fn evaluate_age_before_expiration_returns_none() {
    // 5-day-old object is still inside the 30-day window.
    let manager = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
    let verdict = manager.evaluate("b", "k", Duration::days(5), 100, &[]);
    assert!(verdict.is_none());
}
1230
#[test]
fn evaluate_prefix_filter_matches() {
    let mut logs_only = LifecycleRule::expire_after_days("r", 1);
    logs_only.filter.prefix = Some("logs/".into());
    let manager = manager_with("b", vec![logs_only]);
    let two_days = Duration::days(2);
    // Only keys under `logs/` are in scope for the rule.
    let cases = [
        ("logs/2026/a.log", Some(LifecycleAction::Expire)),
        ("data/keep.bin", None),
    ];
    for (key, expected) in cases {
        assert_eq!(manager.evaluate("b", key, two_days, 1, &[]), expected, "key = {key}");
    }
}
1245
#[test]
fn evaluate_tag_filter_requires_all_tags_to_match() {
    let mut rule = LifecycleRule::expire_after_days("r", 1);
    rule.filter.tags = vec![
        ("env".into(), "dev".into()),
        ("expirable".into(), "yes".into()),
    ];
    let manager = manager_with("b", vec![rule]);
    let tag = |k: &str, v: &str| (k.to_string(), v.to_string());
    let evaluate_with =
        |tags: &[(String, String)]| manager.evaluate("b", "k", Duration::days(2), 1, tags);

    // Every required tag present (extra tags are fine) → fires.
    assert_eq!(
        evaluate_with(&[tag("env", "dev"), tag("expirable", "yes"), tag("owner", "alice")]),
        Some(LifecycleAction::Expire)
    );
    // A required tag is missing → no fire.
    assert_eq!(evaluate_with(&[tag("env", "dev")]), None);
    // A required tag carries the wrong value → no fire.
    assert_eq!(
        evaluate_with(&[tag("env", "prod"), tag("expirable", "yes")]),
        None
    );
}
1295
#[test]
fn evaluate_size_filters_gate_action() {
    let mut rule = LifecycleRule::expire_after_days("r", 1);
    rule.filter.object_size_greater_than = Some(1024);
    rule.filter.object_size_less_than = Some(10 * 1024);
    let manager = manager_with("b", vec![rule]);
    let at_size = |size| manager.evaluate("b", "k", Duration::days(2), size, &[]);

    // Strictly inside (1024, 10 KiB) → fires.
    assert_eq!(at_size(4096), Some(LifecycleAction::Expire));
    // Exactly the lower bound: the comparison is strict `>` → no fire.
    assert_eq!(at_size(1024), None);
    // Past the upper bound → no fire.
    assert_eq!(at_size(100 * 1024), None);
}
1315
#[test]
fn evaluate_transition_fires_before_expiration() {
    // Transition at 30d, expiration at 365d; a 60-day-old object has only
    // crossed the transition threshold.
    let glacier_after_30 = TransitionRule {
        days: 30,
        storage_class: "GLACIER_IR".into(),
    };
    let rule = LifecycleRule {
        id: "r".into(),
        status: LifecycleStatus::Enabled,
        filter: LifecycleFilter::default(),
        expiration_days: Some(365),
        expiration_date: None,
        transitions: vec![glacier_after_30],
        noncurrent_version_expiration_days: None,
        abort_incomplete_multipart_upload_days: None,
    };
    let manager = manager_with("b", vec![rule]);
    let expected = LifecycleAction::Transition {
        storage_class: "GLACIER_IR".into(),
    };
    assert_eq!(
        manager.evaluate("b", "k", Duration::days(60), 1, &[]),
        Some(expected)
    );
}
1341
#[test]
fn evaluate_expiration_wins_when_threshold_is_earlier_than_transition() {
    // Expiration at 30d, transition at 90d, age 100d: the rule wants the
    // object gone before it would ever have transitioned, so expire.
    let rule = LifecycleRule {
        id: "r".into(),
        status: LifecycleStatus::Enabled,
        filter: LifecycleFilter::default(),
        expiration_days: Some(30),
        expiration_date: None,
        transitions: vec![TransitionRule {
            days: 90,
            storage_class: "GLACIER".into(),
        }],
        noncurrent_version_expiration_days: None,
        abort_incomplete_multipart_upload_days: None,
    };
    let manager = manager_with("b", vec![rule]);
    let verdict = manager.evaluate("b", "k", Duration::days(100), 1, &[]);
    assert_eq!(verdict, Some(LifecycleAction::Expire));
}
1363
#[test]
fn evaluate_disabled_rule_never_fires() {
    let rule = LifecycleRule {
        status: LifecycleStatus::Disabled,
        ..LifecycleRule::expire_after_days("r", 1)
    };
    let manager = manager_with("b", vec![rule]);
    // Even a year past the 1-day threshold, a Disabled rule stays inert.
    assert!(manager
        .evaluate("b", "k", Duration::days(365), 1, &[])
        .is_none());
}
1371
#[test]
fn evaluate_unknown_bucket_returns_none() {
    let empty = LifecycleManager::new();
    let verdict = empty.evaluate("ghost", "k", Duration::days(365), 1, &[]);
    assert!(verdict.is_none());
}
1377
#[test]
fn evaluate_noncurrent_version_expiration() {
    let rule = LifecycleRule {
        id: "r".into(),
        status: LifecycleStatus::Enabled,
        filter: LifecycleFilter::default(),
        expiration_days: None,
        expiration_date: None,
        transitions: vec![],
        noncurrent_version_expiration_days: Some(7),
        abort_incomplete_multipart_upload_days: None,
    };
    let manager = manager_with("b", vec![rule]);
    let noncurrent_at = |age| {
        manager.evaluate_with_flags(
            "b",
            "k",
            age,
            1,
            &[],
            EvaluateFlags {
                is_noncurrent: true,
                now: None,
            },
        )
    };

    // Current-version path: no `expiration_days`, so nothing matches.
    assert_eq!(manager.evaluate("b", "k", Duration::days(30), 1, &[]), None);
    // Noncurrent path past the 7-day threshold → expire.
    assert_eq!(noncurrent_at(Duration::days(8)), Some(LifecycleAction::Expire));
    // Noncurrent path still inside the threshold → no fire.
    assert_eq!(noncurrent_at(Duration::days(3)), None);
}
1420
#[test]
fn evaluate_batch_distributes_actions_across_object_ages() {
    // Transition at 30d, expiration at 60d. When both thresholds are met,
    // the evaluator picks expiration only if the expiration threshold is
    // at or below the chosen transition threshold (exp_days <= trans_days).
    // Here 60 <= 30 is false, so every object older than 30 days gets the
    // transition, even at 90d or 365d. Per the rule's "transition first,
    // expire later" shape, a later scanner pass is expected to handle
    // expiration. Only "young" (10d) matches nothing, so 4 of the 5
    // objects transition.
    let rule = LifecycleRule {
        id: "r".into(),
        status: LifecycleStatus::Enabled,
        filter: LifecycleFilter::default(),
        expiration_days: Some(60),
        expiration_date: None,
        transitions: vec![TransitionRule {
            days: 30,
            storage_class: "STANDARD_IA".into(),
        }],
        noncurrent_version_expiration_days: None,
        abort_incomplete_multipart_upload_days: None,
    };
    let manager = manager_with("b", vec![rule]);
    let objects = vec![
        ("young".to_string(), Duration::days(10), 1u64, vec![]),
        ("middle".to_string(), Duration::days(40), 1u64, vec![]),
        ("middle2".to_string(), Duration::days(45), 1u64, vec![]),
        ("old".to_string(), Duration::days(90), 1u64, vec![]),
        ("ancient".to_string(), Duration::days(365), 1u64, vec![]),
    ];
    let actions = evaluate_batch(&manager, "b", &objects);
    assert_eq!(actions.len(), 4);
    assert!(actions
        .iter()
        .all(|(_, action)| matches!(action, LifecycleAction::Transition { .. })));
}
1461
#[test]
fn json_round_trip_preserves_rules() {
    // A rule that exercises every optional field at once.
    let filter = LifecycleFilter {
        prefix: Some("logs/".into()),
        tags: vec![("env".into(), "prod".into())],
        object_size_greater_than: Some(1024),
        object_size_less_than: None,
    };
    let original = LifecycleRule {
        id: "complex".into(),
        status: LifecycleStatus::Enabled,
        filter,
        expiration_days: Some(365),
        expiration_date: None,
        transitions: vec![TransitionRule {
            days: 30,
            storage_class: "STANDARD_IA".into(),
        }],
        noncurrent_version_expiration_days: Some(7),
        abort_incomplete_multipart_upload_days: Some(3),
    };

    let snapshot = manager_with("b1", vec![original.clone()])
        .to_json()
        .expect("to_json");
    let restored = LifecycleManager::from_json(&snapshot).expect("from_json");
    let cfg = restored.get("b1").expect("bucket survives roundtrip");
    assert_eq!(cfg.rules, vec![original]);
}
1489
#[test]
fn lifecycle_config_default_is_empty() {
    assert_eq!(LifecycleConfig::default().rules.len(), 0);
}
1495
#[test]
fn evaluate_batch_skips_locked_objects_at_caller_layer() {
    // The evaluator does not consult Object Lock. Callers (the scanner,
    // tests) filter locked keys around `evaluate_batch`; this test pins
    // that pattern.
    let manager = manager_with("b", vec![LifecycleRule::expire_after_days("r", 1)]);
    let objects = vec![
        ("locked".to_string(), Duration::days(30), 1u64, vec![]),
        ("free".to_string(), Duration::days(30), 1u64, vec![]),
    ];
    let locked_keys: std::collections::HashSet<&str> = ["locked"].into_iter().collect();
    let survivors: Vec<String> = evaluate_batch(&manager, "b", &objects)
        .into_iter()
        .map(|(key, _)| key)
        .filter(|key| !locked_keys.contains(key.as_str()))
        .collect();
    assert_eq!(survivors, vec!["free".to_string()]);
}
1516
#[test]
fn record_action_bumps_per_bucket_counter() {
    let manager = LifecycleManager::new();
    let recorded = [
        LifecycleAction::Expire,
        LifecycleAction::Expire,
        LifecycleAction::Transition {
            storage_class: "GLACIER".into(),
        },
        LifecycleAction::AbortMultipartUpload {
            upload_id: "u-xyz".into(),
        },
    ];
    for action in &recorded {
        manager.record_action("b", action);
    }

    let snap = manager.actions_snapshot();
    assert_eq!(snap.get(&("b".into(), "expire".into())).copied(), Some(2));
    assert_eq!(
        snap.get(&("b".into(), "transition".into())).copied(),
        Some(1)
    );
    assert_eq!(
        snap.get(&("b".into(), "abort_incomplete_multipart".into()))
            .copied(),
        Some(1),
        "v0.8.3 #69: AbortMultipartUpload metric_label must bump \
         `abort_incomplete_multipart` counter",
    );
}
1548
1549 // ---- v0.8.3 #69 (audit M-2): AbortIncompleteMultipartUpload --------
1550 //
1551 // Three unit tests covering the `evaluate_in_flight_multipart`
1552 // path: (a) age past threshold → AbortMultipartUpload, (b) age
1553 // before threshold → None, (c) the rule is `Disabled` → None
1554 // (a Disabled rule must never fire even on a stale upload).
1555 //
1556 // Test fixtures fake `now` and `initiated` so the assertion is
1557 // deterministic regardless of when the test runs.
1558
1559 fn abort_rule(id: &str, days: u32) -> LifecycleRule {
1560 LifecycleRule {
1561 id: id.into(),
1562 status: LifecycleStatus::Enabled,
1563 filter: LifecycleFilter::default(),
1564 expiration_days: None,
1565 expiration_date: None,
1566 transitions: Vec::new(),
1567 noncurrent_version_expiration_days: None,
1568 abort_incomplete_multipart_upload_days: Some(days),
1569 }
1570 }
1571
/// An 8-day-old upload under a 7-day rule is aborted, and the action
/// carries the upload's own `upload_id`.
#[test]
fn evaluate_in_flight_multipart_aborts_past_threshold() {
    let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
        .expect("parse now")
        .with_timezone(&Utc);
    let manager = manager_with("b", vec![abort_rule("r", 7)]);
    let stale = MultipartUploadCandidate {
        upload_id: "u-stale".into(),
        key: "uploads/big.bin".into(),
        initiated: now - Duration::days(8),
        tags: Vec::new(),
    };
    let expected = LifecycleAction::AbortMultipartUpload {
        upload_id: "u-stale".into(),
    };
    assert_eq!(
        manager.evaluate_in_flight_multipart("b", &stale, now),
        Some(expected),
    );
}
1595
/// A 1-day-old upload under a 7-day rule is left alone.
#[test]
fn evaluate_in_flight_multipart_keeps_recent_upload() {
    let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
        .expect("parse now")
        .with_timezone(&Utc);
    let manager = manager_with("b", vec![abort_rule("r", 7)]);
    let fresh = MultipartUploadCandidate {
        upload_id: "u-fresh".into(),
        key: "uploads/big.bin".into(),
        initiated: now - Duration::days(1),
        tags: Vec::new(),
    };
    assert!(manager
        .evaluate_in_flight_multipart("b", &fresh, now)
        .is_none());
}
1614
/// A `Disabled` rule never fires, however stale the upload. Disabled means
/// the operator is staging the rule, so the action waits for Enable.
#[test]
fn evaluate_in_flight_multipart_skips_disabled_rule() {
    let rule = LifecycleRule {
        status: LifecycleStatus::Disabled,
        ..abort_rule("r", 1)
    };
    let manager = manager_with("b", vec![rule]);
    let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
        .expect("parse now")
        .with_timezone(&Utc);
    let ancient = MultipartUploadCandidate {
        upload_id: "u-ancient".into(),
        key: "uploads/big.bin".into(),
        initiated: now - Duration::days(365),
        tags: Vec::new(),
    };
    assert!(
        manager
            .evaluate_in_flight_multipart("b", &ancient, now)
            .is_none(),
        "Disabled rule must not abort even a 365-day-old upload",
    );
}
1639
1640 // ---- v0.7 #45: scanner runner tests --------------------------------
1641 //
1642 // These tests stand up an in-memory `S4Service` over a tiny
1643 // `ScannerMemBackend` (separate from the larger `MemoryBackend` in
1644 // `tests/roundtrip.rs` so this module stays self-contained). The
1645 // backend implements only the four `S3` methods the scanner touches:
1646 // `put_object`, `head_object`, `delete_object`, `list_objects_v2`.
1647 // Tags are exercised via the optional `with_tagging(...)` manager.
1648 //
1649 // Object age is faked by setting an `expire_after_days(0)` rule, so
1650 // any object whose backend-recorded `last_modified` is at or before
1651 // "now" matches — sidesteps the `head_object`/`Timestamp` parsing
1652 // entirely (and matches the canonical "operator just put the bucket
1653 // on aggressive expiration" test path).
1654
1655 use std::collections::HashMap;
1656 use std::sync::Mutex as StdMutex;
1657
1658 use bytes::Bytes;
1659 use s3s::dto as dto2;
1660 use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
1661 use s4_codec::dispatcher::AlwaysDispatcher;
1662 use s4_codec::passthrough::Passthrough;
1663 use s4_codec::{CodecKind, CodecRegistry};
1664
1665 use crate::S4Service;
1666 use crate::object_lock::{LockMode, ObjectLockManager, ObjectLockState};
1667
1668 #[derive(Default)]
1669 struct ScannerMemBackend {
1670 objects: StdMutex<HashMap<(String, String), ScannerStored>>,
1671 /// v0.8.3 #69: in-flight multipart uploads keyed by
1672 /// `(bucket, upload_id)`. Tests seed entries via
1673 /// `put_multipart_upload(...)` so the lifecycle scanner's
1674 /// `list_multipart_uploads` walk has something to consume.
1675 multipart_uploads: StdMutex<HashMap<(String, String), ScannerMultipart>>,
1676 }
1677
1678 #[derive(Clone)]
1679 struct ScannerStored {
1680 body: Bytes,
1681 last_modified: dto2::Timestamp,
1682 }
1683
1684 /// v0.8.3 #69: minimal multipart-upload record the test backend
1685 /// returns from `list_multipart_uploads`. `initiated` is a
1686 /// `chrono::DateTime<Utc>` so tests can fake an old upload by
1687 /// passing `Utc::now() - Duration::days(N)` directly (no
1688 /// SystemTime arithmetic).
1689 #[derive(Clone)]
1690 struct ScannerMultipart {
1691 key: String,
1692 initiated: chrono::DateTime<Utc>,
1693 }
1694
1695 impl ScannerMemBackend {
1696 fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
1697 self.objects.lock().unwrap().insert(
1698 (bucket.to_owned(), key.to_owned()),
1699 ScannerStored {
1700 body,
1701 last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
1702 },
1703 );
1704 }
1705
1706 /// v0.8.3 #69: seed an in-flight multipart upload the
1707 /// lifecycle scanner can then walk + abort.
1708 fn put_multipart_upload(
1709 &self,
1710 bucket: &str,
1711 upload_id: &str,
1712 key: &str,
1713 initiated: chrono::DateTime<Utc>,
1714 ) {
1715 self.multipart_uploads.lock().unwrap().insert(
1716 (bucket.to_owned(), upload_id.to_owned()),
1717 ScannerMultipart {
1718 key: key.to_owned(),
1719 initiated,
1720 },
1721 );
1722 }
1723 }
1724
1725 #[async_trait::async_trait]
1726 impl S3 for ScannerMemBackend {
1727 async fn put_object(
1728 &self,
1729 req: S3Request<dto2::PutObjectInput>,
1730 ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
1731 self.put_now(&req.input.bucket, &req.input.key, Bytes::new());
1732 Ok(S3Response::new(dto2::PutObjectOutput::default()))
1733 }
1734
1735 async fn head_object(
1736 &self,
1737 req: S3Request<dto2::HeadObjectInput>,
1738 ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
1739 let key = (req.input.bucket.clone(), req.input.key.clone());
1740 let lock = self.objects.lock().unwrap();
1741 let stored = lock
1742 .get(&key)
1743 .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
1744 Ok(S3Response::new(dto2::HeadObjectOutput {
1745 content_length: Some(stored.body.len() as i64),
1746 last_modified: Some(stored.last_modified.clone()),
1747 ..Default::default()
1748 }))
1749 }
1750
1751 async fn delete_object(
1752 &self,
1753 req: S3Request<dto2::DeleteObjectInput>,
1754 ) -> S3Result<S3Response<dto2::DeleteObjectOutput>> {
1755 let key = (req.input.bucket.clone(), req.input.key.clone());
1756 self.objects.lock().unwrap().remove(&key);
1757 Ok(S3Response::new(dto2::DeleteObjectOutput::default()))
1758 }
1759
1760 async fn list_objects_v2(
1761 &self,
1762 req: S3Request<dto2::ListObjectsV2Input>,
1763 ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
1764 let prefix = req.input.bucket.clone();
1765 let lock = self.objects.lock().unwrap();
1766 let mut contents: Vec<dto2::Object> = lock
1767 .iter()
1768 .filter(|((b, _), _)| b == &prefix)
1769 .map(|((_, k), v)| dto2::Object {
1770 key: Some(k.clone()),
1771 size: Some(v.body.len() as i64),
1772 last_modified: Some(v.last_modified.clone()),
1773 ..Default::default()
1774 })
1775 .collect();
1776 contents.sort_by(|a, b| a.key.cmp(&b.key));
1777 let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
1778 Ok(S3Response::new(dto2::ListObjectsV2Output {
1779 name: Some(prefix),
1780 contents: Some(contents),
1781 key_count: Some(key_count),
1782 is_truncated: Some(false),
1783 ..Default::default()
1784 }))
1785 }
1786
1787 async fn copy_object(
1788 &self,
1789 _req: S3Request<dto2::CopyObjectInput>,
1790 ) -> S3Result<S3Response<dto2::CopyObjectOutput>> {
1791 // Transition path: scanner copies same-key with new
1792 // storage_class. The mem backend doesn't track storage
1793 // class, so it's a no-op success — exactly the AWS-side
1794 // behaviour for a backend that ignores the field.
1795 Ok(S3Response::new(dto2::CopyObjectOutput::default()))
1796 }
1797
1798 // ---- v0.8.3 #69: multipart abort path -----------------------
1799 //
1800 // The lifecycle scanner walks `list_multipart_uploads` per
1801 // bucket and calls `abort_multipart_upload` on every upload
1802 // whose `Initiated` time is past the rule's threshold. The
1803 // test backend returns the seeded entries on listing and
1804 // drops them on abort so post-conditions are observable.
1805
1806 async fn list_multipart_uploads(
1807 &self,
1808 req: S3Request<dto2::ListMultipartUploadsInput>,
1809 ) -> S3Result<S3Response<dto2::ListMultipartUploadsOutput>> {
1810 let bucket = req.input.bucket.clone();
1811 let lock = self.multipart_uploads.lock().unwrap();
1812 let mut uploads: Vec<dto2::MultipartUpload> = lock
1813 .iter()
1814 .filter(|((b, _), _)| b == &bucket)
1815 .map(|((_, upload_id), v)| {
1816 let st: std::time::SystemTime = v.initiated.into();
1817 dto2::MultipartUpload {
1818 upload_id: Some(upload_id.clone()),
1819 key: Some(v.key.clone()),
1820 initiated: Some(dto2::Timestamp::from(st)),
1821 ..Default::default()
1822 }
1823 })
1824 .collect();
1825 // Stable order so test assertions on count + post-condition
1826 // do not race on the HashMap iteration order.
1827 uploads.sort_by(|a, b| a.upload_id.cmp(&b.upload_id));
1828 Ok(S3Response::new(dto2::ListMultipartUploadsOutput {
1829 bucket: Some(bucket),
1830 uploads: Some(uploads),
1831 is_truncated: Some(false),
1832 ..Default::default()
1833 }))
1834 }
1835
1836 async fn abort_multipart_upload(
1837 &self,
1838 req: S3Request<dto2::AbortMultipartUploadInput>,
1839 ) -> S3Result<S3Response<dto2::AbortMultipartUploadOutput>> {
1840 let bucket = req.input.bucket.clone();
1841 let upload_id = req.input.upload_id.clone();
1842 self.multipart_uploads
1843 .lock()
1844 .unwrap()
1845 .remove(&(bucket, upload_id));
1846 Ok(S3Response::new(dto2::AbortMultipartUploadOutput::default()))
1847 }
1848 }
1849
1850 fn make_service() -> Arc<S4Service<ScannerMemBackend>> {
1851 let registry = Arc::new(
1852 CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1853 );
1854 let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1855 Arc::new(S4Service::new(
1856 ScannerMemBackend::default(),
1857 registry,
1858 dispatcher,
1859 ))
1860 }
1861
1862 #[tokio::test]
1863 async fn run_scan_once_no_lifecycle_manager_returns_empty_report() {
1864 // Service has no lifecycle manager attached — scanner must
1865 // no-op cleanly (operator might not have set
1866 // `--lifecycle-state-file`). Also covers the empty-buckets
1867 // path in `run_scan_once`.
1868 let s4 = make_service();
1869 let report = run_scan_once(&s4).await.expect("scan");
1870 assert_eq!(report, ScanReport::default());
1871
1872 // And: lifecycle manager attached but no buckets configured.
1873 let mgr = Arc::new(LifecycleManager::new());
1874 let backend = ScannerMemBackend::default();
1875 let registry = Arc::new(
1876 CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1877 );
1878 let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1879 let s4_empty = Arc::new(
1880 S4Service::new(backend, registry, dispatcher).with_lifecycle(mgr),
1881 );
1882 let report = run_scan_once(&s4_empty).await.expect("scan empty");
1883 assert_eq!(report, ScanReport::default());
1884 }
1885
1886 #[tokio::test]
1887 async fn run_scan_once_expires_matching_objects_via_backend() {
1888 // Three objects: only "stale.log" matches the rule (prefix
1889 // gating). The other two are written but not under the prefix,
1890 // so the evaluator returns None for them.
1891 let backend = ScannerMemBackend::default();
1892 backend.put_now("b", "stale.log", Bytes::from_static(b"x"));
1893 backend.put_now("b", "data/keep1.bin", Bytes::from_static(b"y"));
1894 backend.put_now("b", "data/keep2.bin", Bytes::from_static(b"z"));
1895 // Rule: any object under `stale.` prefix is expired immediately
1896 // (`expire_after_days(0)` matches age >= 0d, which is every
1897 // backend object).
1898 let mgr = Arc::new(LifecycleManager::new());
1899 let mut rule = LifecycleRule::expire_after_days("r", 0);
1900 rule.filter.prefix = Some("stale.".into());
1901 mgr.put("b", LifecycleConfig { rules: vec![rule] });
1902 let registry = Arc::new(
1903 CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1904 );
1905 let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1906 let s4 = Arc::new(
1907 S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
1908 );
1909
1910 let report = run_scan_once(&s4).await.expect("scan");
1911 assert_eq!(report.buckets_scanned, 1);
1912 assert_eq!(report.objects_evaluated, 3);
1913 assert_eq!(report.expired, 1);
1914 assert_eq!(report.transitioned, 0);
1915 assert_eq!(report.skipped_locked, 0);
1916 assert_eq!(report.action_errors, 0);
1917 // Backend post-condition: the matching key is gone, the others
1918 // remain. Read back through the service's own list_objects_v2
1919 // path (which is also what the customer-visible HTTP layer
1920 // serves) so we exercise the same code the scanner walked.
1921 let req = synthetic_request(
1922 ListObjectsV2Input {
1923 bucket: "b".into(),
1924 ..Default::default()
1925 },
1926 http::Method::GET,
1927 "/b?list-type=2",
1928 );
1929 let resp = s4
1930 .as_ref()
1931 .list_objects_v2(req)
1932 .await
1933 .expect("post-scan list");
1934 let keys: Vec<String> = resp
1935 .output
1936 .contents
1937 .unwrap_or_default()
1938 .into_iter()
1939 .filter_map(|o| o.key)
1940 .collect();
1941 assert!(!keys.contains(&"stale.log".to_string()));
1942 assert!(keys.contains(&"data/keep1.bin".to_string()));
1943 assert!(keys.contains(&"data/keep2.bin".to_string()));
1944 // Lifecycle action counter: one Expire bumped on bucket "b".
1945 let snap = mgr.actions_snapshot();
1946 assert_eq!(
1947 snap.get(&("b".into(), "expire".into())).copied(),
1948 Some(1)
1949 );
1950 }
1951
1952 #[tokio::test]
1953 async fn run_scan_once_skips_object_lock_protected_keys() {
1954 let backend = ScannerMemBackend::default();
1955 backend.put_now("b", "locked.log", Bytes::from_static(b"x"));
1956 backend.put_now("b", "free.log", Bytes::from_static(b"y"));
1957 let registry = Arc::new(
1958 CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1959 );
1960 let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1961 let mgr = Arc::new(LifecycleManager::new());
1962 // Aggressive: every object expires immediately.
1963 mgr.put(
1964 "b",
1965 LifecycleConfig {
1966 rules: vec![LifecycleRule::expire_after_days("r", 0)],
1967 },
1968 );
1969 let lock_mgr = Arc::new(ObjectLockManager::new());
1970 // Lock retains "locked.log" until the year 2099 — Compliance
1971 // mode means even Governance bypass cannot delete it.
1972 let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
1973 .expect("parse")
1974 .with_timezone(&Utc);
1975 lock_mgr.set(
1976 "b",
1977 "locked.log",
1978 ObjectLockState {
1979 mode: Some(LockMode::Compliance),
1980 retain_until: Some(retain_until),
1981 legal_hold_on: false,
1982 },
1983 );
1984 let s4 = Arc::new(
1985 S4Service::new(backend, registry, dispatcher)
1986 .with_lifecycle(Arc::clone(&mgr))
1987 .with_object_lock(Arc::clone(&lock_mgr)),
1988 );
1989
1990 let report = run_scan_once(&s4).await.expect("scan");
1991 assert_eq!(report.buckets_scanned, 1);
1992 assert_eq!(report.objects_evaluated, 2);
1993 assert_eq!(report.expired, 1, "free.log should have been expired");
1994 assert_eq!(report.skipped_locked, 1, "locked.log must be skipped");
1995 assert_eq!(report.action_errors, 0);
1996 }
1997
1998 /// v0.8.3 #65 (audit C-2): full scanner walk with a mix of free
1999 /// and locked objects must (a) leave outer/free objects expired,
2000 /// (b) skip the middle locked object, (c) bump
2001 /// `ScanReport::skipped_locked`, and (d) emit a Prometheus
2002 /// `s4_lifecycle_actions_total{action="skipped_locked"}` sample.
2003 /// Previously (v0.7 #45) the skip path bumped only the in-report
2004 /// counter — operator dashboards saw no signal when Object Lock
2005 /// vetoed a Lifecycle Expiration, which is the silent-failure
2006 /// observability gap audit C-2 called out.
2007 #[tokio::test]
2008 async fn scan_one_config_skips_locked_objects_and_bumps_metric() {
2009 // The Prometheus recorder is a process-global slot. Multiple
2010 // tests in the same binary race on `install_recorder()`, so
2011 // we route through `crate::metrics::test_metrics_handle()`
2012 // which is OnceLock-guarded and shared with the
2013 // `metrics::tests::install_and_render_basic_counters` test.
2014 // Use a unique bucket label so this test's sample line is
2015 // identifiable even when other tests in the binary also bump
2016 // the lifecycle counter under different bucket labels.
2017 let metrics_handle = crate::metrics::test_metrics_handle();
2018
2019 let bucket = "lc-locked-metric-65";
2020 let backend = ScannerMemBackend::default();
2021 // Three objects; the middle one ("middle.log") will be
2022 // Object-Lock-Compliance-locked until 2099. The two outer
2023 // objects ("outer-a.log", "outer-c.log") have no lock state
2024 // attached, so the aggressive `expire_after_days(0)` rule
2025 // matches and the scanner's `delete_object` actually fires.
2026 backend.put_now(bucket, "outer-a.log", Bytes::from_static(b"a"));
2027 backend.put_now(bucket, "middle.log", Bytes::from_static(b"m"));
2028 backend.put_now(bucket, "outer-c.log", Bytes::from_static(b"c"));
2029
2030 let registry = Arc::new(
2031 CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
2032 );
2033 let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
2034 let mgr = Arc::new(LifecycleManager::new());
2035 mgr.put(
2036 bucket,
2037 LifecycleConfig {
2038 rules: vec![LifecycleRule::expire_after_days("r", 1)],
2039 },
2040 );
2041 // Object-Lock Compliance retain until far in the future (2099).
2042 // `is_locked(now)` then returns `true` regardless of when the
2043 // test actually runs.
2044 let lock_mgr = Arc::new(ObjectLockManager::new());
2045 let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
2046 .expect("parse retain_until")
2047 .with_timezone(&Utc);
2048 lock_mgr.set(
2049 bucket,
2050 "middle.log",
2051 ObjectLockState {
2052 mode: Some(LockMode::Compliance),
2053 retain_until: Some(retain_until),
2054 legal_hold_on: false,
2055 },
2056 );
2057 let s4 = Arc::new(
2058 S4Service::new(backend, registry, dispatcher)
2059 .with_lifecycle(Arc::clone(&mgr))
2060 .with_object_lock(Arc::clone(&lock_mgr)),
2061 );
2062
2063 // The objects above were `put_now(...)` with `last_modified =
2064 // SystemTime::now()`, so their computed `age` is roughly zero
2065 // and the `expire_after_days(1)` rule alone would NOT match.
2066 // Force the rule threshold down to zero days so all three
2067 // objects qualify for Expiration — the test is about the Lock
2068 // veto, not the age math.
2069 mgr.put(
2070 bucket,
2071 LifecycleConfig {
2072 rules: vec![LifecycleRule::expire_after_days("r", 0)],
2073 },
2074 );
2075
2076 let report = run_scan_once(&s4).await.expect("scan");
2077 assert_eq!(report.buckets_scanned, 1);
2078 assert_eq!(report.objects_evaluated, 3);
2079 assert_eq!(
2080 report.expired, 2,
2081 "outer-a.log + outer-c.log must be DELETEd; got {report:?}"
2082 );
2083 assert_eq!(
2084 report.skipped_locked, 1,
2085 "middle.log is Compliance-locked → scanner must skip; got {report:?}"
2086 );
2087 assert_eq!(report.transitioned, 0);
2088 assert_eq!(report.action_errors, 0);
2089
2090 // Render the Prometheus exporter and assert that a sample line
2091 // for `s4_lifecycle_actions_total{...action="skipped_locked",
2092 // bucket="lc-locked-metric-65"...}` is present with value >= 1.
2093 // The metrics-exporter-prometheus crate sorts labels
2094 // alphabetically (`bucket` appears before `action` in the
2095 // rendered output), so we substring-match both label fragments
2096 // rather than rely on a fixed ordering. We use `>=` (not
2097 // `==`) because the recorder is process-global and a parallel
2098 // run of the same test in a future session could legitimately
2099 // bump it again — but since the bucket label embeds an
2100 // issue-unique suffix, no other test in this binary touches
2101 // this specific (action, bucket) pair.
2102 let rendered = metrics_handle.render();
2103 let bucket_frag = format!("bucket=\"{bucket}\"");
2104 let action_frag = "action=\"skipped_locked\"";
2105 let line = rendered
2106 .lines()
2107 .find(|l| {
2108 l.starts_with("s4_lifecycle_actions_total{")
2109 && l.contains(&bucket_frag)
2110 && l.contains(action_frag)
2111 })
2112 .unwrap_or_else(|| {
2113 panic!(
2114 "Prometheus output missing skipped_locked sample for {bucket}; \
2115 full render:\n{rendered}"
2116 )
2117 });
2118 // Parse the trailing counter value (whitespace-separated).
2119 let value: u64 = line
2120 .split_whitespace()
2121 .next_back()
2122 .expect("counter value column")
2123 .parse()
2124 .expect("counter value is u64");
2125 assert!(
2126 value >= 1,
2127 "skipped_locked counter must be >= 1 after scan; line: {line}"
2128 );
2129 }
2130
2131 /// v0.8.3 #69 (audit M-2): end-to-end test of the multipart sweep.
2132 /// Two in-flight uploads are seeded — `u-stale` initiated 8 days
2133 /// ago, `u-fresh` initiated 1 hour ago. The lifecycle rule sets
2134 /// `abort_incomplete_multipart_upload_days = 7`. The scanner must
2135 /// abort `u-stale` (bumping `report.aborted_multipart`) but leave
2136 /// `u-fresh` alone. Object walk is a no-op (no objects seeded), so
2137 /// the report's expire / transition counters stay at zero.
2138 #[tokio::test]
2139 async fn run_scan_once_aborts_stale_multipart_upload() {
2140 let backend = ScannerMemBackend::default();
2141 let bucket = "lc-mp-69";
2142 let now = Utc::now();
2143 backend.put_multipart_upload(bucket, "u-stale", "uploads/big.bin", now - Duration::days(8));
2144 backend.put_multipart_upload(bucket, "u-fresh", "uploads/fresh.bin", now - Duration::hours(1));
2145
2146 let registry = Arc::new(
2147 CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
2148 );
2149 let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
2150 let mgr = Arc::new(LifecycleManager::new());
2151 let mut rule = LifecycleRule {
2152 id: "abort-7d".into(),
2153 status: LifecycleStatus::Enabled,
2154 filter: LifecycleFilter::default(),
2155 expiration_days: None,
2156 expiration_date: None,
2157 transitions: Vec::new(),
2158 noncurrent_version_expiration_days: None,
2159 abort_incomplete_multipart_upload_days: Some(7),
2160 };
2161 rule.filter.prefix = Some("uploads/".into());
2162 mgr.put(bucket, LifecycleConfig { rules: vec![rule] });
2163 let s4 = Arc::new(
2164 S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
2165 );
2166
2167 let report = run_scan_once(&s4).await.expect("scan");
2168 assert_eq!(report.buckets_scanned, 1);
2169 assert_eq!(
2170 report.aborted_multipart, 1,
2171 "u-stale must be aborted; got {report:?}",
2172 );
2173 assert_eq!(report.action_errors, 0);
2174 assert_eq!(report.expired, 0);
2175 assert_eq!(report.transitioned, 0);
2176
2177 // Backend post-condition via the wire-side
2178 // `list_multipart_uploads` path: only the fresh upload
2179 // (`u-fresh`) survives — `u-stale` was aborted by the
2180 // scanner.
2181 let post_req = synthetic_request(
2182 ListMultipartUploadsInput {
2183 bucket: bucket.into(),
2184 ..Default::default()
2185 },
2186 http::Method::GET,
2187 &format!("/{bucket}?uploads"),
2188 );
2189 let post = s4
2190 .as_ref()
2191 .list_multipart_uploads(post_req)
2192 .await
2193 .expect("post-scan list_multipart_uploads");
2194 let remaining_ids: Vec<String> = post
2195 .output
2196 .uploads
2197 .unwrap_or_default()
2198 .into_iter()
2199 .filter_map(|u| u.upload_id)
2200 .collect();
2201 assert_eq!(
2202 remaining_ids,
2203 vec!["u-fresh".to_string()],
2204 "exactly u-fresh must remain after the sweep; got {remaining_ids:?}",
2205 );
2206
2207 // Counter snapshot agrees with the report.
2208 let snap = mgr.actions_snapshot();
2209 assert_eq!(
2210 snap.get(&(bucket.into(), "abort_incomplete_multipart".into()))
2211 .copied(),
2212 Some(1),
2213 "v0.8.3 #69: abort_incomplete_multipart counter must be 1",
2214 );
2215 }
2216}