//! S3 Lifecycle execution — per-bucket rule evaluation + manager skeleton
//! (v0.6 #37).
//!
//! AWS S3 Lifecycle attaches a **list of rules** to a bucket; each rule may
//! request that S3
//!
//! 1. **Expire** an object once its age (or the calendar date) crosses a
//!    threshold (`Expiration { Days | Date }`),
//! 2. **Transition** an object to a different storage class (`Transition
//!    { Days, StorageClass }` — `STANDARD_IA`, `GLACIER_IR`, ...),
//! 3. **Expire noncurrent versions** in a versioning-enabled bucket
//!    (`NoncurrentVersionExpiration { NoncurrentDays }`).
//!
//! Until v0.6 #37 the matching `PutBucketLifecycleConfiguration` /
//! `GetBucketLifecycleConfiguration` / `DeleteBucketLifecycle` handlers
//! in `crates/s4-server/src/service.rs` were pure passthroughs (the s3s
//! framework's default backend stored them but nothing read the rules).
//! This module owns the in-memory configuration store + the rule
//! evaluator that decides, for any single object, whether an action
//! should fire **right now**.
//!
//! ## responsibilities (v0.6 #37)
//!
//! - in-memory `bucket -> LifecycleConfig` map with JSON snapshot
//!   round-trip (mirroring `versioning.rs` / `object_lock.rs` /
//!   `inventory.rs`'s shape so `--lifecycle-state-file` is a one-line
//!   addition in `main.rs`).
//! - per-bucket action counters (`actions_total`) — bumped by the
//!   future scanner when an Expiration / Transition /
//!   NoncurrentExpiration action is taken, surfaced via Prometheus
//!   (`s4_lifecycle_actions_total`, see `metrics.rs`).
//! - [`LifecycleManager::evaluate`] — given one (bucket, key, age,
//!   size, tags) tuple, walk the bucket's rules in declaration order
//!   and return the first matching action. Returns `None` when no
//!   rule matches (or when the matching rule is `Disabled`).
//! - [`evaluate_batch`] — batched form for the test path: walks a
//!   slice of `(key, age, size, tags)` tuples and returns the (key,
//!   action) pairs that should fire. The actual backend invocation
//!   (S3.delete_object / metadata rewrite) is the caller's job.
//!
//! ## scope limitations (v0.6 #37)
//!
//! - **Background scanner is a skeleton only.** `main.rs`'s
//!   `--lifecycle-scan-interval-hours` flag spawns a tokio task that
//!   logs the bucket list and stamps a "would-have-run" marker;
//!   walking the source bucket via `list_objects_v2` and actually
//!   invoking `delete_object` / metadata rewrite for each evaluated
//!   action is deferred to v0.7+. Wiring the scheduler to walk a real
//!   bucket end-to-end requires a back-reference from the scheduler
//!   into `S4Service` for the `list_objects_v2` walk and that
//!   reshuffle is out of scope for this issue. The
//!   [`crate::S4Service::run_lifecycle_once_for_test`] entry covers
//!   the in-memory equivalent so the unit + E2E tests exercise the
//!   evaluator end-to-end.
//! - **`AbortIncompleteMultipartUpload`** is parsed and stored on the
//!   `LifecycleRule` (so PutBucketLifecycleConfiguration round-trips
//!   the field) but not enforced — multipart abort sweeping is a
//!   separate scanner that lives next to the multipart upload manager
//!   (v0.7+).
//! - **`expiration_date` (calendar date)** is supported in the
//!   evaluator: a rule with `expiration_date` past `now` fires
//!   Expiration immediately. Same wire form as AWS S3.
//! - **Multi-instance replication.** All state is single-instance
//!   in-memory; `--lifecycle-state-file <PATH>` provides restart
//!   recovery via JSON snapshot, matching the
//!   `--versioning-state-file` shape.
//! - **Object Lock interplay**: the evaluator does NOT consult the
//!   `ObjectLockManager` directly (the evaluator API is
//!   object-tags-and-size only); the scanner caller is expected to
//!   skip locked objects — see the `evaluate_batch_skips_locked` test
//!   for the canonical pattern. Locking always wins over Lifecycle.
//! - **Versioning interplay**: the evaluator treats noncurrent
//!   versions as a separate input — pass `is_noncurrent = true` to
//!   [`LifecycleManager::evaluate_with_flags`] for noncurrent version
//!   expiration matching. The legacy `evaluate` shorthand defaults
//!   `is_noncurrent = false` (current version) so existing call sites
//!   stay one-liners.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use chrono::{DateTime, Duration, Utc};
use s3s::S3;
use s3s::S3Request;
use s3s::dto::*;
use serde::{Deserialize, Serialize};
use tracing::warn;

/// Whether a rule is currently being applied. Mirrors AWS S3
/// `ExpirationStatus` (`"Enabled"` / `"Disabled"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleStatus {
    /// Rule is active — the evaluator consults it.
    Enabled,
    /// Rule is stored (and round-trips through Put/Get) but never fires.
    Disabled,
}

98impl LifecycleStatus {
99    /// Wire form used by AWS S3 (`"Enabled"` / `"Disabled"`).
100    #[must_use]
101    pub fn as_aws_str(self) -> &'static str {
102        match self {
103            Self::Enabled => "Enabled",
104            Self::Disabled => "Disabled",
105        }
106    }
107
108    /// Parse the AWS wire form (case-insensitive). Falls back to `Disabled`
109    /// on unrecognised input — this matches AWS conservative behaviour
110    /// (an unparseable status is treated as "off" so a typo doesn't silently
111    /// expire data).
112    #[must_use]
113    pub fn from_aws_str(s: &str) -> Self {
114        if s.eq_ignore_ascii_case("Enabled") {
115            Self::Enabled
116        } else {
117            Self::Disabled
118        }
119    }
120}
121
/// Per-rule object filter. AWS S3 represents the filter as one of `Prefix`,
/// `Tag`, `ObjectSizeGreaterThan`, `ObjectSizeLessThan`, or `And` (= AND of
/// any subset of those predicates). For internal storage we flatten the
/// "And" form into a struct of optional fields plus a vector of (key, value)
/// tags — every present field must match (logical AND). An empty filter (all
/// fields `None` / empty `tags`) matches every object in the bucket.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleFilter {
    /// Object key prefix (empty / `None` = no prefix gating).
    #[serde(default)]
    pub prefix: Option<String>,
    /// Logical AND across every entry: every (key, value) must match the
    /// object's own tag set.
    #[serde(default)]
    pub tags: Vec<(String, String)>,
    /// Object must be *strictly greater than* this size in bytes.
    #[serde(default)]
    pub object_size_greater_than: Option<u64>,
    /// Object must be *strictly less than* this size in bytes.
    #[serde(default)]
    pub object_size_less_than: Option<u64>,
}

145impl LifecycleFilter {
146    /// `true` when this filter accepts the candidate. Empty filter accepts
147    /// every object. Tag matching is AND of all listed tags (each present in
148    /// `object_tags` with the matching value).
149    #[must_use]
150    pub fn matches(&self, key: &str, size: u64, object_tags: &[(String, String)]) -> bool {
151        if let Some(p) = &self.prefix
152            && !key.starts_with(p)
153        {
154            return false;
155        }
156        if let Some(min) = self.object_size_greater_than
157            && size <= min
158        {
159            return false;
160        }
161        if let Some(max) = self.object_size_less_than
162            && size >= max
163        {
164            return false;
165        }
166        for (tk, tv) in &self.tags {
167            let matched = object_tags.iter().any(|(ok, ov)| ok == tk && ov == tv);
168            if !matched {
169                return false;
170            }
171        }
172        true
173    }
174}
175
/// A single transition step (object age threshold + target storage class).
/// `days` is days since the object was created. AWS S3 also accepts `Date`
/// for transitions but Lifecycle deployments overwhelmingly use `Days`; the
/// `Date` form is omitted here on purpose to keep the evaluator narrow
/// (operators wanting calendar transitions can synthesise a one-shot rule
/// at the cadence of their scanner).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransitionRule {
    /// Age threshold in days since object creation; the step applies once
    /// the object's age reaches this value.
    pub days: u32,
    /// Target storage class (`"STANDARD_IA"` / `"GLACIER_IR"` /
    /// `"GLACIER"` / `"DEEP_ARCHIVE"` / `"INTELLIGENT_TIERING"` /
    /// `"ONEZONE_IA"`). Stored as the AWS wire string so PutBucket /
    /// GetBucket round-trip is a no-op.
    pub storage_class: String,
}

/// One lifecycle rule. AWS S3's `LifecycleRule` flattened into the subset
/// the v0.6 #37 evaluator handles. `id` is the operator-supplied label and
/// makes Get / Put round-trips non-lossy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleRule {
    /// Operator-supplied rule label (AWS `ID`); round-tripped verbatim.
    pub id: String,
    /// `Enabled` / `Disabled` — disabled rules are stored but skipped by
    /// the evaluator.
    pub status: LifecycleStatus,
    #[serde(default)]
    pub filter: LifecycleFilter,
    /// Days since the object was created. Mutually exclusive with
    /// [`Self::expiration_date`] in AWS — both fields are accepted here on
    /// input (the evaluator picks `expiration_days` first, then
    /// `expiration_date`) so a malformed rule with both set still evaluates
    /// deterministically rather than silently dropping the action.
    #[serde(default)]
    pub expiration_days: Option<u32>,
    /// Calendar date past which matching objects are expired (AWS wire form
    /// is ISO 8601; here we keep it as a `DateTime<Utc>` so round-trips
    /// through `serde_json` survive without re-parsing).
    #[serde(default)]
    pub expiration_date: Option<DateTime<Utc>>,
    /// Transition steps in declaration order. The evaluator picks the
    /// deepest transition (largest `days` ≤ object age) and resolves any
    /// conflict with expiration in [`LifecycleManager::evaluate_with_flags`].
    #[serde(default)]
    pub transitions: Vec<TransitionRule>,
    /// Days an object has been noncurrent before the noncurrent-version
    /// expiration fires. Only consulted when the evaluator is asked about
    /// a noncurrent object (`is_noncurrent = true`).
    #[serde(default)]
    pub noncurrent_version_expiration_days: Option<u32>,
    /// Days after a multipart upload is initiated before the abort fires.
    /// Stored so PutBucket round-trips, but **not enforced** in the
    /// v0.6 #37 evaluator — multipart sweeping lives elsewhere.
    #[serde(default)]
    pub abort_incomplete_multipart_upload_days: Option<u32>,
}

impl LifecycleRule {
    /// Convenience constructor for a "expire after N days" rule. Useful in
    /// tests + operator scripts.
    ///
    /// The rule comes back `Enabled` with an empty (match-everything)
    /// filter and no transition / noncurrent / multipart-abort settings.
    #[must_use]
    pub fn expire_after_days(id: impl Into<String>, days: u32) -> Self {
        Self {
            id: id.into(),
            status: LifecycleStatus::Enabled,
            filter: LifecycleFilter::default(),
            expiration_days: Some(days),
            expiration_date: None,
            transitions: Vec::new(),
            noncurrent_version_expiration_days: None,
            abort_incomplete_multipart_upload_days: None,
        }
    }
}

/// Per-bucket lifecycle configuration (ordered list of rules — first match
/// wins, matching AWS S3 semantics).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleConfig {
    /// Rules in declaration order; the evaluator walks them front-to-back
    /// and returns the first *firing* action.
    pub rules: Vec<LifecycleRule>,
}

/// The action a single rule wants to take **right now** for a candidate
/// object.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LifecycleAction {
    /// Delete the object (`Expiration` / `NoncurrentVersionExpiration`).
    Expire,
    /// Move the object to a different storage class (`Transition`). The
    /// inner string is the AWS wire form (e.g. `"GLACIER_IR"`).
    Transition { storage_class: String },
    /// v0.8.3 #69 (audit M-2): abort an in-flight multipart upload that
    /// has been initiated longer ago than the rule's
    /// `abort_incomplete_multipart_upload_days`. The inner string is the
    /// backend-issued `upload_id` (so the scanner can route the
    /// `AbortMultipartUpload` call without re-listing). Same wire
    /// semantic as AWS S3 `AbortIncompleteMultipartUpload`.
    AbortMultipartUpload { upload_id: String },
}

impl LifecycleAction {
    /// Stable label suitable for a metric counter
    /// (`s4_lifecycle_actions_total{action="..."}`).
    #[must_use]
    pub fn metric_label(&self) -> &'static str {
        // `match *self` is fine here: the struct-variant patterns bind
        // nothing, so no payload is moved out of the borrow.
        match *self {
            Self::Transition { .. } => "transition",
            Self::AbortMultipartUpload { .. } => "abort_incomplete_multipart",
            Self::Expire => "expire",
        }
    }
}

/// v0.8.3 #69: one in-flight multipart upload the lifecycle scanner
/// considers for abort. Mirrors the (subset of) `MultipartUpload` fields
/// the rule evaluator needs (key, upload_id, initiated). `tags` is kept
/// in the shape the existing object-path evaluator uses
/// (`Vec<(String, String)>`) so a future enhancement that surfaces
/// upload-time tags from `MultipartStateStore` can flow through the
/// same filter check without API churn — AWS S3 itself does not attach
/// tags to in-flight multipart uploads, so for the scanner-driven path
/// the slice is always empty (the filter's prefix / size predicates
/// still apply via [`LifecycleFilter::matches`], passing size = 0).
#[derive(Clone, Debug)]
pub struct MultipartUploadCandidate {
    /// Backend-issued multipart upload id (routes the abort call).
    pub upload_id: String,
    /// Destination object key the upload was initiated for.
    pub key: String,
    /// When the upload was initiated; age = `now - initiated`.
    pub initiated: DateTime<Utc>,
    /// Upload-time tags — always empty on the scanner-driven path today
    /// (see the type-level note above).
    pub tags: Vec<(String, String)>,
}

/// Serialization shape of the on-disk snapshot; used by `to_json` /
/// `from_json`. Only the per-bucket configs are persisted — action
/// counters are intentionally transient (see [`LifecycleManager::from_json`]).
#[derive(Debug, Default, Serialize, Deserialize)]
struct LifecycleSnapshot {
    by_bucket: HashMap<String, LifecycleConfig>,
}

/// Per-bucket lifecycle configuration manager.
///
/// All read / write operations go through `RwLock` for thread safety;
/// clones are cheap (`Arc<LifecycleManager>` is the expected handle shape).
/// `actions_total` is a parallel `RwLock<HashMap<...>>` of `(bucket,
/// action_label) -> count` so the future background scanner can stamp
/// successful actions and operators can `GET /metrics` to see the running
/// totals (the metric is also surfaced via `metrics::counter!` — see
/// [`crate::metrics::record_lifecycle_action`]).
#[derive(Debug, Default)]
pub struct LifecycleManager {
    by_bucket: RwLock<HashMap<String, LifecycleConfig>>,
    /// `(bucket, action_label) -> count`. Bumped by the scanner via
    /// [`Self::record_action`]. Action labels are the
    /// [`LifecycleAction::metric_label`] values
    /// (`"expire"` / `"transition"` / `"abort_incomplete_multipart"`).
    actions_total: RwLock<HashMap<(String, String), u64>>,
}

impl LifecycleManager {
    /// Empty manager — no bucket has rules.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Replace (or create) the lifecycle configuration for `bucket`. Drops
    /// any previously-attached rules in one shot — matches AWS S3
    /// `PutBucketLifecycleConfiguration` (full replace, no merge).
    pub fn put(&self, bucket: &str, config: LifecycleConfig) {
        self.by_bucket
            .write()
            .expect("lifecycle state RwLock poisoned")
            .insert(bucket.to_owned(), config);
    }

    /// Return a clone of the bucket's configuration, if any.
    #[must_use]
    pub fn get(&self, bucket: &str) -> Option<LifecycleConfig> {
        self.by_bucket
            .read()
            .expect("lifecycle state RwLock poisoned")
            .get(bucket)
            .cloned()
    }

    /// Drop the bucket's lifecycle configuration (idempotent — missing
    /// bucket is OK).
    pub fn delete(&self, bucket: &str) {
        self.by_bucket
            .write()
            .expect("lifecycle state RwLock poisoned")
            .remove(bucket);
    }

    /// JSON snapshot for restart-recoverable state. Pair with
    /// [`Self::from_json`].
    ///
    /// # Errors
    ///
    /// Propagates the `serde_json` error when serialization fails.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        // Clone under the read lock so serialization happens lock-free.
        let by_bucket = self
            .by_bucket
            .read()
            .expect("lifecycle state RwLock poisoned")
            .clone();
        let snap = LifecycleSnapshot { by_bucket };
        serde_json::to_string(&snap)
    }

    /// Restore from a JSON snapshot produced by [`Self::to_json`]. Action
    /// counters are intentionally not snapshotted — they're transient
    /// observability data and should reset across process restarts so
    /// `rate(s4_lifecycle_actions_total[1h])` doesn't double-count.
    ///
    /// # Errors
    ///
    /// Propagates the `serde_json` error when `s` is not a valid snapshot.
    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
        let snap: LifecycleSnapshot = serde_json::from_str(s)?;
        Ok(Self {
            by_bucket: RwLock::new(snap.by_bucket),
            actions_total: RwLock::new(HashMap::new()),
        })
    }

    /// Evaluate which rule (if any) applies to a single **current-version**
    /// object right now. Walks the bucket's rules in declaration order;
    /// returns the first matching action. Returns `None` when no rule
    /// matches (or when the matching rule is `Disabled`, or when the
    /// bucket has no lifecycle configuration).
    ///
    /// Within a single rule the precedence is:
    ///
    /// 1. Pick the deepest transition whose `days` threshold is currently
    ///    met (= largest `days ≤ object age`).
    /// 2. Conflict with expiration: if `expiration_days <=
    ///    transition_days` for the chosen transition, expiration wins
    ///    (the rule wants the object gone before it would have been
    ///    transitioned). Otherwise transition wins (e.g. transition at
    ///    30d, expiration at 365d, age 60d → transition fires now,
    ///    expiration is future).
    /// 3. `expiration_date` matches when `now >= expiration_date` and no
    ///    transition is currently applicable.
    ///
    /// `object_age` is "now - created_at" supplied by the caller — keeping
    /// the evaluator pure of the wall clock makes deterministic testing
    /// trivial.
    #[must_use]
    pub fn evaluate(
        &self,
        bucket: &str,
        key: &str,
        object_age: Duration,
        object_size: u64,
        object_tags: &[(String, String)],
    ) -> Option<LifecycleAction> {
        // Shorthand: current-version object, wall-clock "now" for the
        // expiration_date comparison.
        self.evaluate_with_flags(
            bucket,
            key,
            object_age,
            object_size,
            object_tags,
            EvaluateFlags::default(),
        )
    }

    /// Full-form evaluator with flags for noncurrent-version handling.
    /// Use this when the scanner is walking a versioning-enabled bucket;
    /// pass `is_noncurrent = true` for entries that are not the latest
    /// non-delete-marker version.
    #[must_use]
    pub fn evaluate_with_flags(
        &self,
        bucket: &str,
        key: &str,
        object_age: Duration,
        object_size: u64,
        object_tags: &[(String, String)],
        flags: EvaluateFlags,
    ) -> Option<LifecycleAction> {
        let cfg = self.get(bucket)?;
        let now_for_date = flags.now.unwrap_or_else(Utc::now);
        // Negative ages (clock skew) clamp to 0; ages past u32::MAX days
        // saturate — both keep the day-threshold comparisons well-defined.
        let age_days = object_age.num_days().max(0);
        let age_days_u32 = u32::try_from(age_days).unwrap_or(u32::MAX);
        for rule in &cfg.rules {
            if rule.status != LifecycleStatus::Enabled {
                continue;
            }
            if !rule.filter.matches(key, object_size, object_tags) {
                continue;
            }
            // Noncurrent-version expiration: only consulted when the
            // caller explicitly flags this entry as noncurrent. The
            // current-version expiration / transition rules do not fire
            // for noncurrent versions in AWS S3 semantics.
            if flags.is_noncurrent {
                if let Some(days) = rule.noncurrent_version_expiration_days
                    && age_days_u32 >= days
                {
                    return Some(LifecycleAction::Expire);
                }
                continue;
            }
            // Current-version path.
            let exp_days_match = rule.expiration_days.filter(|d| age_days_u32 >= *d);
            let exp_date_match = rule.expiration_date.filter(|d| now_for_date >= *d);
            // Pick the deepest transition whose threshold is at or
            // below the object's age. Transitions are typically
            // declaration-ordered by ascending `days`, but we don't
            // require it — taking the largest threshold means an
            // object aged 90d gets `GLACIER` over `STANDARD_IA` even
            // if `STANDARD_IA(30d)` was declared first.
            let chosen_transition = rule
                .transitions
                .iter()
                .filter(|t| age_days_u32 >= t.days)
                .max_by_key(|t| t.days);
            // Conflict resolution: when `expiration_days` fires AND a
            // transition fires, expiration wins iff
            // `expiration_days <= transition_days` (rule wants object
            // gone before / at the same time it would have been
            // transitioned). Otherwise the transition wins.
            if let Some(exp_threshold) = exp_days_match {
                let trans_threshold = chosen_transition.map(|t| t.days).unwrap_or(u32::MAX);
                if exp_threshold <= trans_threshold {
                    return Some(LifecycleAction::Expire);
                }
            }
            if let Some(t) = chosen_transition {
                return Some(LifecycleAction::Transition {
                    storage_class: t.storage_class.clone(),
                });
            }
            // Calendar-date expiration (no transition currently
            // applicable, but the rule's expiration_date is past).
            if exp_date_match.is_some() {
                return Some(LifecycleAction::Expire);
            }
            // Fall through to the next rule when no action fires for
            // this rule — first-match-wins applies only to *firing*
            // rules, matching AWS semantics where overlapping rules
            // with disjoint thresholds compose.
        }
        None
    }

    /// v0.8.3 #69 (audit M-2): evaluate one in-flight multipart upload
    /// against the bucket's rules. Returns
    /// [`LifecycleAction::AbortMultipartUpload`] when at least one
    /// `Enabled` rule (a) accepts the upload's key via its filter and
    /// (b) carries an `abort_incomplete_multipart_upload_days`
    /// threshold whose age (`now - initiated`) is currently met.
    /// Returns `None` otherwise (no matching rule, no
    /// abort-multipart-upload-days set, or the upload is too young).
    ///
    /// Filter matching reuses [`LifecycleFilter::matches`] with
    /// `object_size = 0` — in-flight uploads have no assembled size
    /// yet (the parts are stored independently in the backend), so
    /// any rule whose filter sets `object_size_greater_than` /
    /// `object_size_less_than` is treated as if the upload were
    /// 0 bytes. AWS S3 itself does not gate
    /// `AbortIncompleteMultipartUpload` on size; this matches the
    /// AWS semantic (size predicates simply do not apply to the
    /// abort path) for the typical filter shape (no size predicate).
    /// Operators wanting size-gated abort can carry the upload's
    /// declared part length on the `MultipartUploadCandidate` in a
    /// follow-up issue — the API extension is additive.
    #[must_use]
    pub fn evaluate_in_flight_multipart(
        &self,
        bucket: &str,
        upload: &MultipartUploadCandidate,
        now: DateTime<Utc>,
    ) -> Option<LifecycleAction> {
        let cfg = self.get(bucket)?;
        for rule in &cfg.rules {
            if rule.status != LifecycleStatus::Enabled {
                continue;
            }
            // size = 0 on purpose — see the method-level note above.
            if !rule.filter.matches(&upload.key, 0, &upload.tags) {
                continue;
            }
            if let Some(days) = rule.abort_incomplete_multipart_upload_days {
                let age = now.signed_duration_since(upload.initiated);
                if age >= Duration::days(i64::from(days)) {
                    return Some(LifecycleAction::AbortMultipartUpload {
                        upload_id: upload.upload_id.clone(),
                    });
                }
            }
        }
        None
    }

    /// Stamp the per-bucket action counter and bump the matching
    /// Prometheus counter. Called by the future scanner after a successful
    /// delete / metadata rewrite.
    pub fn record_action(&self, bucket: &str, action: &LifecycleAction) {
        let label = action.metric_label();
        let key = (bucket.to_owned(), label.to_owned());
        let mut guard = self
            .actions_total
            .write()
            .expect("lifecycle actions counter RwLock poisoned");
        // saturating_add: a pinned counter beats a panic/wrap in the
        // (theoretical) u64-overflow case.
        let entry = guard.entry(key).or_insert(0);
        *entry = entry.saturating_add(1);
        crate::metrics::record_lifecycle_action(bucket, label);
    }

    /// Read-only snapshot of the per-(bucket, action) counter map.
    /// Useful for tests + introspection (`/admin/lifecycle/stats` style
    /// endpoints in the future).
    #[must_use]
    pub fn actions_snapshot(&self) -> HashMap<(String, String), u64> {
        self.actions_total
            .read()
            .expect("lifecycle actions counter RwLock poisoned")
            .clone()
    }

    /// All buckets with a lifecycle configuration attached. Sorted for
    /// stable scanner ordering.
    #[must_use]
    pub fn buckets(&self) -> Vec<String> {
        let map = self
            .by_bucket
            .read()
            .expect("lifecycle state RwLock poisoned");
        let mut out: Vec<String> = map.keys().cloned().collect();
        out.sort();
        out
    }
}

/// Flags for [`LifecycleManager::evaluate_with_flags`]. Default is
/// "current-version object, evaluator picks `Utc::now()` for the date
/// comparison". Tests override `now` for determinism.
#[derive(Clone, Copy, Debug, Default)]
pub struct EvaluateFlags {
    /// The candidate is a noncurrent version of a versioning-enabled
    /// bucket — only `noncurrent_version_expiration_days` rules apply.
    pub is_noncurrent: bool,
    /// Override for the wall clock used in `expiration_date` comparison;
    /// `None` means `Utc::now()`.
    pub now: Option<DateTime<Utc>>,
}

/// One object the evaluator considers in a batch:
/// `(key, object_age, object_size, object_tags)`. Defined as a type alias
/// so [`evaluate_batch`] / [`crate::S4Service::run_lifecycle_once_for_test`]
/// don't trip clippy's `type-complexity` lint, and so callers building the
/// list have a single canonical shape to reach for.
pub type EvaluateBatchEntry = (String, Duration, u64, Vec<(String, String)>);

614/// Test-driven scan entry: walks a list of [`EvaluateBatchEntry`] tuples
615/// and produces (key, action) pairs for every object that should fire an
616/// action **right now**. The actual backend invocation (S3.delete_object /
617/// metadata rewrite) is the caller's job. Used by both unit tests and the
618/// E2E test in `tests/roundtrip.rs`; the future background scanner will
619/// reuse the same entry once the bucket-walk is wired through the backend.
620#[must_use]
621pub fn evaluate_batch(
622    manager: &LifecycleManager,
623    bucket: &str,
624    objects: &[EvaluateBatchEntry],
625) -> Vec<(String, LifecycleAction)> {
626    let mut out = Vec::with_capacity(objects.len());
627    for (key, age, size, tags) in objects {
628        if let Some(action) = manager.evaluate(bucket, key, *age, *size, tags) {
629            out.push((key.clone(), action));
630        }
631    }
632    out
633}
634
/// Per-invocation scanner counters returned by [`run_scan_once`]. Useful
/// for tests, the `--lifecycle-scan-interval-hours` log line, and any
/// future `/admin/lifecycle/scan` introspection endpoint. Operators see
/// the same numbers via Prometheus
/// (`s4_lifecycle_actions_total{action="expire"|"transition"}`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
    /// Number of buckets the scanner walked (= buckets with a lifecycle
    /// configuration attached at the moment the scanner ran).
    pub buckets_scanned: usize,
    /// Number of distinct keys the scanner evaluated. Multi-page lists
    /// count one key once even if the listing was paginated.
    pub objects_evaluated: usize,
    /// Number of objects deleted as a result of an Expiration action.
    pub expired: usize,
    /// Number of objects whose `x-amz-storage-class` was rewritten as a
    /// result of a Transition action.
    pub transitioned: usize,
    /// Number of objects skipped because an Object Lock (Compliance,
    /// Governance, or legal hold) was in effect. The Lock always wins
    /// over Lifecycle, matching AWS S3 semantics.
    pub skipped_locked: usize,
    /// v0.8.3 #69 (audit M-2): number of in-flight multipart uploads
    /// the scanner aborted as a result of an
    /// `AbortIncompleteMultipartUpload` action. Pair with the
    /// Prometheus counter
    /// `s4_lifecycle_actions_total{action="abort_incomplete_multipart"}`.
    /// Only counts successful aborts — a backend
    /// `abort_multipart_upload` failure bumps `action_errors` instead
    /// (matching the existing Expire / Transition error-path).
    pub aborted_multipart: usize,
    /// Number of objects the evaluator wanted to act on but the action
    /// failed (e.g. backend `delete_object` returned an error). Logged
    /// individually at WARN level; this counter exists so tests / metrics
    /// can assert no silent loss.
    pub action_errors: usize,
}

673/// Convert an s3s `Timestamp` (`time::OffsetDateTime` underneath) into a
674/// `chrono::DateTime<Utc>` via the RFC3339 wire form. Used by the scanner
675/// to compute object age (= `now - last_modified`). Returns `None` when
676/// the stamp is unparseable, in which case the caller falls back to
677/// treating the object as freshly created (age = 0).
678fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
679    let mut buf = Vec::new();
680    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
681    let s = std::str::from_utf8(&buf).ok()?;
682    chrono::DateTime::parse_from_rfc3339(s)
683        .ok()
684        .map(|dt| dt.with_timezone(&Utc))
685}
686
687/// Build a synthetic `S3Request` with the minimum metadata the
688/// scanner-internal calls need. The lifecycle scanner is a
689/// system-internal caller (no end-user credentials, no real HTTP method
690/// / URI), so policy gates downstream see `credentials = None` /
691/// `region = None` and treat the call as anonymous-internal. Backends
692/// that do not gate internal traffic ignore these fields entirely.
693fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
694    S3Request {
695        input,
696        method,
697        uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
698        headers: http::HeaderMap::new(),
699        extensions: http::Extensions::new(),
700        credentials: None,
701        region: None,
702        service: None,
703        trailing_headers: None,
704    }
705}
706
707/// Walk every bucket that has a lifecycle configuration attached, list
708/// its objects via `list_objects_v2` (continuation-token pagination), and
709/// for each object evaluate the rule set + execute the matching
710/// Expiration / Transition action. Object-Lock-protected objects are
711/// **skipped** (the Lock always wins over Lifecycle). Versioning chains
712/// are intentionally out of scope for v0.7 #45 — see the module-level
713/// limitations note.
714///
715/// ## error handling
716///
717/// Per-bucket / per-object failures are logged at WARN level and bumped
718/// in `ScanReport::action_errors`; the scanner does NOT abort early on a
719/// single bad object so one slow / faulty bucket can't starve every
720/// other bucket's lifecycle. The function only returns `Err(_)` when the
721/// scanner cannot make progress at all (no current usage — kept for the
722/// future case where the manager itself becomes unavailable).
723///
724/// ## scope (v0.7 #45)
725///
726/// - Current-version objects only (Versioning-enabled chains rely on
727///   `evaluate_with_flags(is_noncurrent = true)`, but walking the
728///   shadow keys requires the version chain access pattern from
729///   `versioning.rs` and is deferred to a follow-up issue).
730/// - `head_object`'s `last_modified` is used to compute age. When the
731///   backend omits the field (some S3-compatible backends do), the
732///   object is treated as age 0 and skipped — matches AWS conservative
733///   behaviour where a malformed timestamp must not silently expire data.
734/// - Tags are looked up via the attached
735///   [`crate::tagging::TagManager`] (when wired). Buckets without a
736///   tag manager pass an empty tag list to the evaluator.
737/// - Transition rewrites the object's `x-amz-storage-class` via
738///   `copy_object` (same bucket / same key, `MetadataDirective: COPY`,
739///   new `StorageClass`). Backends that ignore the storage class
740///   header silently no-op the transition; the counter still bumps to
741///   reflect "the scanner asked for a transition" (matching AWS where
742///   a no-op transition still costs a request).
743pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
744    s4: &Arc<crate::S4Service<B>>,
745) -> Result<ScanReport, String> {
746    let Some(mgr) = s4.lifecycle_manager().cloned() else {
747        // No lifecycle manager attached (e.g. operator did not set
748        // `--lifecycle-state-file`). Scan is a no-op.
749        return Ok(ScanReport::default());
750    };
751    let buckets = mgr.buckets();
752    if buckets.is_empty() {
753        return Ok(ScanReport::default());
754    }
755    let now = Utc::now();
756    let mut report = ScanReport {
757        buckets_scanned: buckets.len(),
758        ..ScanReport::default()
759    };
760    for bucket in buckets {
761        scan_bucket(s4, &mgr, &bucket, now, &mut report).await;
762        // v0.8.3 #69 (audit M-2): walk in-flight multipart uploads for
763        // the same bucket and abort any whose `Initiated` time is past
764        // the rule's `abort_incomplete_multipart_upload_days` threshold.
765        // Run after the object walk so the (typically smaller) multipart
766        // pass still happens even if the object walk hit a transient
767        // backend error mid-stream (per-bucket failure isolation —
768        // matching the existing one-bad-bucket-doesn't-starve-others
769        // policy).
770        scan_in_flight_multipart_uploads(s4, &mgr, &bucket, now, &mut report).await;
771    }
772    Ok(report)
773}
774
/// Walk one bucket end-to-end. Pagination uses the `continuation_token`
/// loop documented in
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>.
async fn scan_bucket<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    mgr: &Arc<LifecycleManager>,
    bucket: &str,
    now: DateTime<Utc>,
    report: &mut ScanReport,
) {
    let mut continuation: Option<String> = None;
    loop {
        let list_input = ListObjectsV2Input {
            bucket: bucket.to_owned(),
            continuation_token: continuation.clone(),
            ..Default::default()
        };
        let list_req = synthetic_request(
            list_input,
            http::Method::GET,
            &format!("/{bucket}?list-type=2"),
        );
        // A listing failure abandons only THIS bucket for this scan
        // pass (warn + counter bump + return); the caller keeps walking
        // the remaining buckets.
        let resp = match s4.as_ref().list_objects_v2(list_req).await {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    bucket = %bucket,
                    error = %e,
                    "S4 lifecycle: list_objects_v2 failed; skipping bucket for this scan",
                );
                report.action_errors = report.action_errors.saturating_add(1);
                return;
            }
        };
        let output = resp.output;
        let contents = output.contents.unwrap_or_default();
        for obj in &contents {
            // A listing entry without a key carries nothing actionable.
            let Some(key) = obj.key.as_deref() else {
                continue;
            };
            // Filter out S4-internal sidecars / shadow versions early so
            // the lifecycle scanner mirrors the same "client-visible
            // object set" the customer sees through `list_objects_v2`.
            // (The S4Service.list_objects_v2 handler already drops them
            // before returning, but this is a belt-and-braces guard for
            // any future bypass that builds the list elsewhere.)
            if key.ends_with(".s4index") {
                continue;
            }
            report.objects_evaluated = report.objects_evaluated.saturating_add(1);
            // The listing reports size as `Option<i64>`; clamp negatives
            // before widening to `u64` so a malformed entry cannot wrap
            // into a huge size.
            let size = obj.size.unwrap_or(0).max(0) as u64;
            // Absent / unparseable `last_modified` → age 0, so no
            // age-based rule can fire (conservative: never act on an
            // object whose age is unknown).
            let age = match obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc) {
                Some(lm) => now.signed_duration_since(lm),
                None => Duration::zero(),
            };
            // Without a wired TagManager (or with no tags stored) the
            // evaluator receives an empty tag list.
            let tags: Vec<(String, String)> = s4
                .as_ref()
                .tag_manager()
                .and_then(|m| m.get_object_tags(bucket, key))
                .map(|set| set.iter().cloned().collect())
                .unwrap_or_default();
            let Some(action) = mgr.evaluate(bucket, key, age, size, &tags) else {
                continue;
            };
            // Object-Lock-protected objects are skipped before any
            // backend-mutating call. Lock wins over Lifecycle, full
            // stop — matches AWS behaviour where an Expiration on a
            // locked object is dropped, not retried.
            //
            // v0.8.3 #65 (audit C-2): in addition to bumping the
            // in-report counter, emit a Prometheus
            // `s4_lifecycle_actions_total{action="skipped_locked"}`
            // sample so operator dashboards can alert on the
            // "lifecycle wanted to act but Object Lock vetoed" path
            // (previously a silent skip — the scanner's
            // `list_objects_v2` walked the key and `evaluate(...)`
            // returned an action, but no observable signal fired
            // when the backend would have refused the DELETE).
            if let Some(lock_mgr) = s4.as_ref().object_lock_manager()
                && let Some(state) = lock_mgr.get(bucket, key)
                && state.is_locked(now)
            {
                report.skipped_locked = report.skipped_locked.saturating_add(1);
                crate::metrics::record_lifecycle_action(bucket, "skipped_locked");
                continue;
            }
            match action {
                LifecycleAction::Expire => match execute_expire(s4, bucket, key).await {
                    Ok(()) => {
                        mgr.record_action(bucket, &LifecycleAction::Expire);
                        report.expired = report.expired.saturating_add(1);
                    }
                    Err(e) => {
                        warn!(
                            bucket = %bucket,
                            key = %key,
                            error = %e,
                            "S4 lifecycle: Expire action failed",
                        );
                        report.action_errors = report.action_errors.saturating_add(1);
                    }
                },
                LifecycleAction::Transition { storage_class } => {
                    match execute_transition(s4, bucket, key, &storage_class).await {
                        Ok(()) => {
                            mgr.record_action(
                                bucket,
                                &LifecycleAction::Transition {
                                    storage_class: storage_class.clone(),
                                },
                            );
                            report.transitioned = report.transitioned.saturating_add(1);
                        }
                        Err(e) => {
                            warn!(
                                bucket = %bucket,
                                key = %key,
                                storage_class = %storage_class,
                                error = %e,
                                "S4 lifecycle: Transition action failed",
                            );
                            report.action_errors = report.action_errors.saturating_add(1);
                        }
                    }
                }
                // v0.8.3 #69 (audit M-2): the per-key path's
                // `evaluate(...)` only ever returns Expire /
                // Transition (the AbortMultipartUpload variant comes
                // from the in-flight multipart walker further down,
                // which uses `evaluate_in_flight_multipart`). Match
                // exhaustiveness still requires an arm; logging at
                // warn keeps the control-flow honest if the
                // evaluator ever grows a path that surfaces an
                // abort here (e.g. someone wires a future evaluator
                // that returns abort for a regular object key — the
                // arm prevents silent dispatch and the warn surfaces
                // the misuse).
                LifecycleAction::AbortMultipartUpload { upload_id } => {
                    warn!(
                        bucket = %bucket,
                        key = %key,
                        upload_id = %upload_id,
                        "S4 lifecycle: AbortMultipartUpload returned for a key path; \
                         this is unexpected — the per-key evaluator should only \
                         emit Expire / Transition. Dropping action.",
                    );
                    report.action_errors = report.action_errors.saturating_add(1);
                }
            }
        }
        if output.is_truncated.unwrap_or(false) {
            continuation = output.next_continuation_token;
            if continuation.is_none() {
                // Defensive: AWS guarantees a NextContinuationToken when
                // is_truncated=true, but a malformed backend could omit
                // it; break to avoid an infinite loop.
                break;
            }
        } else {
            break;
        }
    }
}
938
/// v0.8.3 #69 (audit M-2): walk every in-flight multipart upload for
/// `bucket` via `list_multipart_uploads` (key-marker / upload-id-marker
/// pagination) and abort any whose `Initiated` time is older than the
/// rule's `abort_incomplete_multipart_upload_days` threshold. Successful
/// aborts bump `report.aborted_multipart` AND (`mgr.record_action`)
/// `s4_lifecycle_actions_total{action="abort_incomplete_multipart"}` so
/// operator dashboards see the same signal whether they look at
/// in-process counters or Prometheus.
///
/// On a successful abort the entry in `MultipartStateStore` (which
/// holds the per-upload SSE-C key bytes / tag set / object-lock recipe
/// captured at `CreateMultipartUpload` time) is also dropped — same
/// shape as the user-facing `abort_multipart_upload` handler in
/// `service.rs`. Without the drop the abandoned upload's `Zeroizing<[u8;
/// 32]>` SSE-C key would linger in `multipart_state` until the
/// `sweep_stale` background tick (v0.8.2 #62) reaped it on TTL.
///
/// Per-page / per-upload backend failures are logged at WARN and bumped
/// in `report.action_errors`; the loop does NOT abort the bucket — one
/// bad upload must not prevent the rest of the bucket's stale uploads
/// from being cleaned up. Mirrors the same isolation policy
/// `scan_bucket` uses for `list_objects_v2` failures.
async fn scan_in_flight_multipart_uploads<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    mgr: &Arc<LifecycleManager>,
    bucket: &str,
    now: DateTime<Utc>,
    report: &mut ScanReport,
) {
    let mut key_marker: Option<String> = None;
    let mut upload_id_marker: Option<String> = None;
    loop {
        let list_input = ListMultipartUploadsInput {
            bucket: bucket.to_owned(),
            key_marker: key_marker.clone(),
            upload_id_marker: upload_id_marker.clone(),
            ..Default::default()
        };
        let list_req = synthetic_request(
            list_input,
            http::Method::GET,
            &format!("/{bucket}?uploads"),
        );
        // A failed listing abandons only this bucket's multipart sweep
        // for this scan pass; the error is surfaced via `action_errors`.
        let resp = match s4.as_ref().list_multipart_uploads(list_req).await {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    bucket = %bucket,
                    error = %e,
                    "S4 lifecycle: list_multipart_uploads failed; \
                     skipping bucket multipart sweep for this scan",
                );
                report.action_errors = report.action_errors.saturating_add(1);
                return;
            }
        };
        let output = resp.output;
        let uploads = output.uploads.unwrap_or_default();
        for upload in &uploads {
            // Entries missing an upload id or key carry nothing we
            // could abort; skip them.
            let Some(upload_id) = upload.upload_id.as_deref() else {
                continue;
            };
            let Some(key) = upload.key.as_deref() else {
                continue;
            };
            // `Initiated` is `Option<Timestamp>`; absent or
            // unparseable → treat as "freshly initiated" (age 0)
            // and skip. Matches the conservative `last_modified`
            // handling in `scan_bucket` — never abort an upload
            // whose age we cannot determine.
            let Some(initiated) = upload
                .initiated
                .as_ref()
                .and_then(timestamp_to_chrono_utc)
            else {
                continue;
            };
            // No per-upload tag lookup happens on this path: the
            // candidate carries an empty tag set, so rules with tag
            // filters will not match these candidates.
            let candidate = MultipartUploadCandidate {
                upload_id: upload_id.to_owned(),
                key: key.to_owned(),
                initiated,
                tags: Vec::new(),
            };
            let Some(action) = mgr.evaluate_in_flight_multipart(bucket, &candidate, now)
            else {
                continue;
            };
            let LifecycleAction::AbortMultipartUpload { upload_id: action_upload_id } =
                action
            else {
                // The evaluator is contractually
                // AbortMultipartUpload-only on this path; this arm
                // exists only to satisfy match exhaustiveness if a
                // future rev returns a different variant. Treat as
                // an error so the divergence is observable.
                warn!(
                    bucket = %bucket,
                    key = %key,
                    upload_id = %upload_id,
                    "S4 lifecycle: evaluate_in_flight_multipart returned \
                     non-Abort action; dropping",
                );
                report.action_errors = report.action_errors.saturating_add(1);
                continue;
            };
            match execute_abort_multipart(s4, bucket, key, &action_upload_id).await {
                Ok(()) => {
                    mgr.record_action(
                        bucket,
                        &LifecycleAction::AbortMultipartUpload {
                            upload_id: action_upload_id.clone(),
                        },
                    );
                    report.aborted_multipart =
                        report.aborted_multipart.saturating_add(1);
                    // Drop the per-upload state so the
                    // (Zeroizing-wrapped) SSE-C key bytes / tag
                    // recipe / object-lock recipe go away
                    // immediately rather than waiting for the
                    // hourly `sweep_stale` tick. Idempotent —
                    // `remove(...)` on a missing key is a no-op
                    // (some uploads may not have been registered
                    // here, e.g. a server restart between Create
                    // and the lifecycle sweep).
                    s4.as_ref()
                        .multipart_state()
                        .remove(&action_upload_id);
                }
                Err(e) => {
                    warn!(
                        bucket = %bucket,
                        key = %key,
                        upload_id = %action_upload_id,
                        error = %e,
                        "S4 lifecycle: AbortMultipartUpload action failed",
                    );
                    report.action_errors = report.action_errors.saturating_add(1);
                }
            }
        }
        if output.is_truncated.unwrap_or(false) {
            // AWS guarantees both NextKeyMarker and (when present)
            // NextUploadIdMarker on a truncated response. Defensive
            // break when neither moved (avoid infinite loop on a
            // misbehaving backend).
            let next_key = output.next_key_marker;
            let next_upload_id = output.next_upload_id_marker;
            if next_key == key_marker && next_upload_id == upload_id_marker {
                break;
            }
            key_marker = next_key;
            upload_id_marker = next_upload_id;
        } else {
            break;
        }
    }
}
1096
1097/// v0.8.3 #69 (audit M-2): issue `abort_multipart_upload` against the
1098/// wrapped `S4Service`. The handler in `service.rs` does the
1099/// `multipart_state.remove(...)` itself before forwarding to the
1100/// backend; we additionally `remove` from the lifecycle scanner side
1101/// (in [`scan_in_flight_multipart_uploads`]) to defensively cover the
1102/// case where the backend abort succeeds but the response routing
1103/// shortens early.
1104async fn execute_abort_multipart<B: S3 + Send + Sync + 'static>(
1105    s4: &Arc<crate::S4Service<B>>,
1106    bucket: &str,
1107    key: &str,
1108    upload_id: &str,
1109) -> Result<(), String> {
1110    let input = AbortMultipartUploadInput {
1111        bucket: bucket.to_owned(),
1112        key: key.to_owned(),
1113        upload_id: upload_id.to_owned(),
1114        ..Default::default()
1115    };
1116    let req = synthetic_request(
1117        input,
1118        http::Method::DELETE,
1119        &format!("/{bucket}/{key}?uploadId={upload_id}"),
1120    );
1121    s4.as_ref()
1122        .abort_multipart_upload(req)
1123        .await
1124        .map(|_| ())
1125        .map_err(|e| format!("{e}"))
1126}
1127
1128/// Issue `delete_object` against the wrapped `S4Service`. The handler in
1129/// `service.rs` runs the WORM check itself, so even if the scanner's
1130/// pre-check missed (race with an MFA-Delete put-bucket-versioning), the
1131/// backend refuses the delete with `AccessDenied` and the error path
1132/// above bumps `action_errors` rather than silently losing data.
1133async fn execute_expire<B: S3 + Send + Sync + 'static>(
1134    s4: &Arc<crate::S4Service<B>>,
1135    bucket: &str,
1136    key: &str,
1137) -> Result<(), String> {
1138    let input = DeleteObjectInput {
1139        bucket: bucket.to_owned(),
1140        key: key.to_owned(),
1141        ..Default::default()
1142    };
1143    let req = synthetic_request(
1144        input,
1145        http::Method::DELETE,
1146        &format!("/{bucket}/{key}"),
1147    );
1148    s4.as_ref()
1149        .delete_object(req)
1150        .await
1151        .map(|_| ())
1152        .map_err(|e| format!("{e}"))
1153}
1154
1155/// Rewrite the object's storage class via a same-key `copy_object` with
1156/// `MetadataDirective: COPY` (preserves user metadata) and the new
1157/// `storage_class`. Backends that ignore storage-class headers
1158/// effectively no-op; the counter still records the attempt so dashboards
1159/// reflect the scanner's intent.
1160async fn execute_transition<B: S3 + Send + Sync + 'static>(
1161    s4: &Arc<crate::S4Service<B>>,
1162    bucket: &str,
1163    key: &str,
1164    storage_class: &str,
1165) -> Result<(), String> {
1166    // CopyObjectInput has dozens of `Option` fields plus three required
1167    // (bucket / key / copy_source); the s3s-generated `builder()` is
1168    // the path that fills the optional ones with `None` for us. The
1169    // `set_*` setters return `&mut Self`, so we drive them in
1170    // statement form rather than as a method chain.
1171    let mut builder = CopyObjectInput::builder();
1172    builder.set_bucket(bucket.to_owned());
1173    builder.set_key(key.to_owned());
1174    builder.set_copy_source(CopySource::Bucket {
1175        bucket: bucket.to_owned().into_boxed_str(),
1176        key: key.to_owned().into_boxed_str(),
1177        version_id: None,
1178    });
1179    builder.set_metadata_directive(Some(MetadataDirective::from_static(MetadataDirective::COPY)));
1180    builder.set_storage_class(Some(StorageClass::from(storage_class.to_owned())));
1181    let input = builder
1182        .build()
1183        .map_err(|e| format!("CopyObjectInput build: {e}"))?;
1184    let req = synthetic_request(
1185        input,
1186        http::Method::PUT,
1187        &format!("/{bucket}/{key}"),
1188    );
1189    s4.as_ref()
1190        .copy_object(req)
1191        .await
1192        .map(|_| ())
1193        .map_err(|e| format!("{e}"))
1194}
1195
1196#[cfg(test)]
1197mod tests {
1198    use super::*;
1199
1200    fn enabled(rule: LifecycleRule) -> LifecycleRule {
1201        LifecycleRule {
1202            status: LifecycleStatus::Enabled,
1203            ..rule
1204        }
1205    }
1206
    /// Wrap a rule list in a `LifecycleConfig` (test helper).
    fn cfg_with(rules: Vec<LifecycleRule>) -> LifecycleConfig {
        LifecycleConfig { rules }
    }
1210
1211    fn manager_with(bucket: &str, rules: Vec<LifecycleRule>) -> LifecycleManager {
1212        let m = LifecycleManager::new();
1213        m.put(bucket, cfg_with(rules));
1214        m
1215    }
1216
1217    #[test]
1218    fn evaluate_age_past_expiration_returns_expire() {
1219        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
1220        let action = m.evaluate("b", "k", Duration::days(31), 100, &[]);
1221        assert_eq!(action, Some(LifecycleAction::Expire));
1222    }
1223
1224    #[test]
1225    fn evaluate_age_before_expiration_returns_none() {
1226        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
1227        let action = m.evaluate("b", "k", Duration::days(5), 100, &[]);
1228        assert_eq!(action, None);
1229    }
1230
1231    #[test]
1232    fn evaluate_prefix_filter_matches() {
1233        let mut rule = LifecycleRule::expire_after_days("r", 1);
1234        rule.filter.prefix = Some("logs/".into());
1235        let m = manager_with("b", vec![rule]);
1236        assert_eq!(
1237            m.evaluate("b", "logs/2026/a.log", Duration::days(2), 1, &[]),
1238            Some(LifecycleAction::Expire)
1239        );
1240        assert_eq!(
1241            m.evaluate("b", "data/keep.bin", Duration::days(2), 1, &[]),
1242            None
1243        );
1244    }
1245
1246    #[test]
1247    fn evaluate_tag_filter_requires_all_tags_to_match() {
1248        let mut rule = LifecycleRule::expire_after_days("r", 1);
1249        rule.filter.tags = vec![
1250            ("env".into(), "dev".into()),
1251            ("expirable".into(), "yes".into()),
1252        ];
1253        let m = manager_with("b", vec![rule]);
1254        // All tags present + matching → fire.
1255        assert_eq!(
1256            m.evaluate(
1257                "b",
1258                "k",
1259                Duration::days(2),
1260                1,
1261                &[
1262                    ("env".into(), "dev".into()),
1263                    ("expirable".into(), "yes".into()),
1264                    ("owner".into(), "alice".into()),
1265                ]
1266            ),
1267            Some(LifecycleAction::Expire)
1268        );
1269        // One tag missing → no fire.
1270        assert_eq!(
1271            m.evaluate(
1272                "b",
1273                "k",
1274                Duration::days(2),
1275                1,
1276                &[("env".into(), "dev".into())]
1277            ),
1278            None
1279        );
1280        // Tag present but with the wrong value → no fire.
1281        assert_eq!(
1282            m.evaluate(
1283                "b",
1284                "k",
1285                Duration::days(2),
1286                1,
1287                &[
1288                    ("env".into(), "prod".into()),
1289                    ("expirable".into(), "yes".into()),
1290                ]
1291            ),
1292            None
1293        );
1294    }
1295
1296    #[test]
1297    fn evaluate_size_filters_gate_action() {
1298        let mut rule = LifecycleRule::expire_after_days("r", 1);
1299        rule.filter.object_size_greater_than = Some(1024);
1300        rule.filter.object_size_less_than = Some(10 * 1024);
1301        let m = manager_with("b", vec![rule]);
1302        // Inside the (1024, 10*1024) range → fire.
1303        assert_eq!(
1304            m.evaluate("b", "k", Duration::days(2), 4096, &[]),
1305            Some(LifecycleAction::Expire)
1306        );
1307        // At the boundary (size == greater_than) → strict `>`, no fire.
1308        assert_eq!(m.evaluate("b", "k", Duration::days(2), 1024, &[]), None);
1309        // Above the upper bound → no fire.
1310        assert_eq!(
1311            m.evaluate("b", "k", Duration::days(2), 100 * 1024, &[]),
1312            None
1313        );
1314    }
1315
1316    #[test]
1317    fn evaluate_transition_fires_before_expiration() {
1318        // Transition at 30d, expiration at 365d, age 60d → transition.
1319        let rule = enabled(LifecycleRule {
1320            id: "r".into(),
1321            status: LifecycleStatus::Enabled,
1322            filter: LifecycleFilter::default(),
1323            expiration_days: Some(365),
1324            expiration_date: None,
1325            transitions: vec![TransitionRule {
1326                days: 30,
1327                storage_class: "GLACIER_IR".into(),
1328            }],
1329            noncurrent_version_expiration_days: None,
1330            abort_incomplete_multipart_upload_days: None,
1331        });
1332        let m = manager_with("b", vec![rule]);
1333        let action = m.evaluate("b", "k", Duration::days(60), 1, &[]);
1334        assert_eq!(
1335            action,
1336            Some(LifecycleAction::Transition {
1337                storage_class: "GLACIER_IR".into(),
1338            })
1339        );
1340    }
1341
1342    #[test]
1343    fn evaluate_expiration_wins_when_threshold_is_earlier_than_transition() {
1344        // Expiration at 30d, transition at 90d, age 100d → expire (the
1345        // rule wants the object gone *before* it would have transitioned).
1346        let rule = enabled(LifecycleRule {
1347            id: "r".into(),
1348            status: LifecycleStatus::Enabled,
1349            filter: LifecycleFilter::default(),
1350            expiration_days: Some(30),
1351            expiration_date: None,
1352            transitions: vec![TransitionRule {
1353                days: 90,
1354                storage_class: "GLACIER".into(),
1355            }],
1356            noncurrent_version_expiration_days: None,
1357            abort_incomplete_multipart_upload_days: None,
1358        });
1359        let m = manager_with("b", vec![rule]);
1360        let action = m.evaluate("b", "k", Duration::days(100), 1, &[]);
1361        assert_eq!(action, Some(LifecycleAction::Expire));
1362    }
1363
1364    #[test]
1365    fn evaluate_disabled_rule_never_fires() {
1366        let mut rule = LifecycleRule::expire_after_days("r", 1);
1367        rule.status = LifecycleStatus::Disabled;
1368        let m = manager_with("b", vec![rule]);
1369        assert_eq!(m.evaluate("b", "k", Duration::days(365), 1, &[]), None);
1370    }
1371
1372    #[test]
1373    fn evaluate_unknown_bucket_returns_none() {
1374        let m = LifecycleManager::new();
1375        assert_eq!(m.evaluate("ghost", "k", Duration::days(365), 1, &[]), None);
1376    }
1377
1378    #[test]
1379    fn evaluate_noncurrent_version_expiration() {
1380        let rule = enabled(LifecycleRule {
1381            id: "r".into(),
1382            status: LifecycleStatus::Enabled,
1383            filter: LifecycleFilter::default(),
1384            expiration_days: None,
1385            expiration_date: None,
1386            transitions: vec![],
1387            noncurrent_version_expiration_days: Some(7),
1388            abort_incomplete_multipart_upload_days: None,
1389        });
1390        let m = manager_with("b", vec![rule]);
1391        // current-version path → no rule matches (no expiration_days set).
1392        assert_eq!(m.evaluate("b", "k", Duration::days(30), 1, &[]), None);
1393        // noncurrent path with age past 7d → expire.
1394        let action = m.evaluate_with_flags(
1395            "b",
1396            "k",
1397            Duration::days(8),
1398            1,
1399            &[],
1400            EvaluateFlags {
1401                is_noncurrent: true,
1402                now: None,
1403            },
1404        );
1405        assert_eq!(action, Some(LifecycleAction::Expire));
1406        // noncurrent path with age before 7d → no fire.
1407        let action = m.evaluate_with_flags(
1408            "b",
1409            "k",
1410            Duration::days(3),
1411            1,
1412            &[],
1413            EvaluateFlags {
1414                is_noncurrent: true,
1415                now: None,
1416            },
1417        );
1418        assert_eq!(action, None);
1419    }
1420
    /// Batch evaluation over a rule with a 30-day transition and a
    /// 60-day expiration.
    ///
    /// Per the conflict resolver's documented rule, expiration wins
    /// over a transition only when the expiration threshold is <= the
    /// transition threshold; here exp=60 > trans=30, so the transition
    /// wins at every age past 30d ("transition first, expire later" —
    /// a subsequent scanner pass would pick up the expiration). Four
    /// of the five objects are past 30d, so the batch must yield
    /// exactly four actions, all of them Transitions.
    #[test]
    fn evaluate_batch_distributes_actions_across_object_ages() {
        let rule = enabled(LifecycleRule {
            id: "r".into(),
            status: LifecycleStatus::Enabled,
            filter: LifecycleFilter::default(),
            expiration_days: Some(60),
            expiration_date: None,
            transitions: vec![TransitionRule {
                days: 30,
                storage_class: "STANDARD_IA".into(),
            }],
            noncurrent_version_expiration_days: None,
            abort_incomplete_multipart_upload_days: None,
        });
        let m = manager_with("b", vec![rule]);
        // Ages straddle both thresholds: one below 30d (no action),
        // two between 30d and 60d, two past 60d.
        let objects = vec![
            ("young".to_string(), Duration::days(10), 1u64, vec![]),
            ("middle".to_string(), Duration::days(40), 1u64, vec![]),
            ("middle2".to_string(), Duration::days(45), 1u64, vec![]),
            ("old".to_string(), Duration::days(90), 1u64, vec![]),
            ("ancient".to_string(), Duration::days(365), 1u64, vec![]),
        ];
        let actions = evaluate_batch(&m, "b", &objects);
        assert_eq!(actions.len(), 4);
        for (_, a) in &actions {
            assert!(matches!(a, LifecycleAction::Transition { .. }));
        }
    }
1461
1462    #[test]
1463    fn json_round_trip_preserves_rules() {
1464        let rule = enabled(LifecycleRule {
1465            id: "complex".into(),
1466            status: LifecycleStatus::Enabled,
1467            filter: LifecycleFilter {
1468                prefix: Some("logs/".into()),
1469                tags: vec![("env".into(), "prod".into())],
1470                object_size_greater_than: Some(1024),
1471                object_size_less_than: None,
1472            },
1473            expiration_days: Some(365),
1474            expiration_date: None,
1475            transitions: vec![TransitionRule {
1476                days: 30,
1477                storage_class: "STANDARD_IA".into(),
1478            }],
1479            noncurrent_version_expiration_days: Some(7),
1480            abort_incomplete_multipart_upload_days: Some(3),
1481        });
1482        let m = manager_with("b1", vec![rule.clone()]);
1483        let json = m.to_json().expect("to_json");
1484        let m2 = LifecycleManager::from_json(&json).expect("from_json");
1485        let cfg = m2.get("b1").expect("bucket survives roundtrip");
1486        assert_eq!(cfg.rules.len(), 1);
1487        assert_eq!(cfg.rules[0], rule);
1488    }
1489
1490    #[test]
1491    fn lifecycle_config_default_is_empty() {
1492        let cfg = LifecycleConfig::default();
1493        assert!(cfg.rules.is_empty());
1494    }
1495
1496    #[test]
1497    fn evaluate_batch_skips_locked_objects_at_caller_layer() {
1498        // The evaluator itself does not consult ObjectLock; the scanner
1499        // (and tests) are expected to filter locked keys out before /
1500        // after calling `evaluate_batch`. This test documents the
1501        // canonical pattern.
1502        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 1)]);
1503        let objects = vec![
1504            ("locked".to_string(), Duration::days(30), 1u64, vec![]),
1505            ("free".to_string(), Duration::days(30), 1u64, vec![]),
1506        ];
1507        let locked_keys: std::collections::HashSet<&str> = ["locked"].into_iter().collect();
1508        let raw = evaluate_batch(&m, "b", &objects);
1509        let filtered: Vec<_> = raw
1510            .into_iter()
1511            .filter(|(k, _)| !locked_keys.contains(k.as_str()))
1512            .collect();
1513        assert_eq!(filtered.len(), 1);
1514        assert_eq!(filtered[0].0, "free");
1515    }
1516
1517    #[test]
1518    fn record_action_bumps_per_bucket_counter() {
1519        let m = LifecycleManager::new();
1520        m.record_action("b", &LifecycleAction::Expire);
1521        m.record_action("b", &LifecycleAction::Expire);
1522        m.record_action(
1523            "b",
1524            &LifecycleAction::Transition {
1525                storage_class: "GLACIER".into(),
1526            },
1527        );
1528        m.record_action(
1529            "b",
1530            &LifecycleAction::AbortMultipartUpload {
1531                upload_id: "u-xyz".into(),
1532            },
1533        );
1534        let snap = m.actions_snapshot();
1535        assert_eq!(snap.get(&("b".into(), "expire".into())).copied(), Some(2));
1536        assert_eq!(
1537            snap.get(&("b".into(), "transition".into())).copied(),
1538            Some(1)
1539        );
1540        assert_eq!(
1541            snap.get(&("b".into(), "abort_incomplete_multipart".into()))
1542                .copied(),
1543            Some(1),
1544            "v0.8.3 #69: AbortMultipartUpload metric_label must bump \
1545             `abort_incomplete_multipart` counter",
1546        );
1547    }
1548
1549    // ---- v0.8.3 #69 (audit M-2): AbortIncompleteMultipartUpload --------
1550    //
1551    // Three unit tests covering the `evaluate_in_flight_multipart`
1552    // path: (a) age past threshold → AbortMultipartUpload, (b) age
1553    // before threshold → None, (c) the rule is `Disabled` → None
1554    // (a Disabled rule must never fire even on a stale upload).
1555    //
1556    // Test fixtures fake `now` and `initiated` so the assertion is
1557    // deterministic regardless of when the test runs.
1558
1559    fn abort_rule(id: &str, days: u32) -> LifecycleRule {
1560        LifecycleRule {
1561            id: id.into(),
1562            status: LifecycleStatus::Enabled,
1563            filter: LifecycleFilter::default(),
1564            expiration_days: None,
1565            expiration_date: None,
1566            transitions: Vec::new(),
1567            noncurrent_version_expiration_days: None,
1568            abort_incomplete_multipart_upload_days: Some(days),
1569        }
1570    }
1571
1572    /// Upload age 8 days, rule threshold 7 days → AbortMultipartUpload
1573    /// fires with the upload's `upload_id`.
1574    #[test]
1575    fn evaluate_in_flight_multipart_aborts_past_threshold() {
1576        let m = manager_with("b", vec![abort_rule("r", 7)]);
1577        let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
1578            .expect("parse now")
1579            .with_timezone(&Utc);
1580        let initiated = now - Duration::days(8);
1581        let candidate = MultipartUploadCandidate {
1582            upload_id: "u-stale".into(),
1583            key: "uploads/big.bin".into(),
1584            initiated,
1585            tags: Vec::new(),
1586        };
1587        let action = m.evaluate_in_flight_multipart("b", &candidate, now);
1588        assert_eq!(
1589            action,
1590            Some(LifecycleAction::AbortMultipartUpload {
1591                upload_id: "u-stale".into(),
1592            }),
1593        );
1594    }
1595
1596    /// Upload age 1 day, rule threshold 7 days → no fire (upload is
1597    /// fresh enough to keep around).
1598    #[test]
1599    fn evaluate_in_flight_multipart_keeps_recent_upload() {
1600        let m = manager_with("b", vec![abort_rule("r", 7)]);
1601        let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
1602            .expect("parse now")
1603            .with_timezone(&Utc);
1604        let initiated = now - Duration::days(1);
1605        let candidate = MultipartUploadCandidate {
1606            upload_id: "u-fresh".into(),
1607            key: "uploads/big.bin".into(),
1608            initiated,
1609            tags: Vec::new(),
1610        };
1611        let action = m.evaluate_in_flight_multipart("b", &candidate, now);
1612        assert_eq!(action, None);
1613    }
1614
1615    /// `Disabled` rule must never fire even when the upload is well
1616    /// past the threshold — Disabled means the operator is staging the
1617    /// rule (preview / dry-run), the action must wait for Enable.
1618    #[test]
1619    fn evaluate_in_flight_multipart_skips_disabled_rule() {
1620        let mut rule = abort_rule("r", 1);
1621        rule.status = LifecycleStatus::Disabled;
1622        let m = manager_with("b", vec![rule]);
1623        let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
1624            .expect("parse now")
1625            .with_timezone(&Utc);
1626        let initiated = now - Duration::days(365);
1627        let candidate = MultipartUploadCandidate {
1628            upload_id: "u-ancient".into(),
1629            key: "uploads/big.bin".into(),
1630            initiated,
1631            tags: Vec::new(),
1632        };
1633        let action = m.evaluate_in_flight_multipart("b", &candidate, now);
1634        assert_eq!(
1635            action, None,
1636            "Disabled rule must not abort even a 365-day-old upload",
1637        );
1638    }
1639
1640    // ---- v0.7 #45: scanner runner tests --------------------------------
1641    //
1642    // These tests stand up an in-memory `S4Service` over a tiny
1643    // `ScannerMemBackend` (separate from the larger `MemoryBackend` in
1644    // `tests/roundtrip.rs` so this module stays self-contained). The
1645    // backend implements only the four `S3` methods the scanner touches:
1646    // `put_object`, `head_object`, `delete_object`, `list_objects_v2`.
1647    // Tags are exercised via the optional `with_tagging(...)` manager.
1648    //
1649    // Object age is faked by setting an `expire_after_days(0)` rule, so
1650    // any object whose backend-recorded `last_modified` is at or before
1651    // "now" matches — sidesteps the `head_object`/`Timestamp` parsing
1652    // entirely (and matches the canonical "operator just put the bucket
1653    // on aggressive expiration" test path).
1654
1655    use std::collections::HashMap;
1656    use std::sync::Mutex as StdMutex;
1657
1658    use bytes::Bytes;
1659    use s3s::dto as dto2;
1660    use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
1661    use s4_codec::dispatcher::AlwaysDispatcher;
1662    use s4_codec::passthrough::Passthrough;
1663    use s4_codec::{CodecKind, CodecRegistry};
1664
1665    use crate::S4Service;
1666    use crate::object_lock::{LockMode, ObjectLockManager, ObjectLockState};
1667
    /// Minimal in-memory backend for the scanner tests: just enough
    /// state for the object walk and the multipart sweep.
    #[derive(Default)]
    struct ScannerMemBackend {
        /// Stored objects keyed by `(bucket, key)`.
        objects: StdMutex<HashMap<(String, String), ScannerStored>>,
        /// v0.8.3 #69: in-flight multipart uploads keyed by
        /// `(bucket, upload_id)`. Tests seed entries via
        /// `put_multipart_upload(...)` so the lifecycle scanner's
        /// `list_multipart_uploads` walk has something to consume.
        multipart_uploads: StdMutex<HashMap<(String, String), ScannerMultipart>>,
    }
1677
    /// One stored object: the payload plus the modification stamp the
    /// scanner reads to compute object age.
    #[derive(Clone)]
    struct ScannerStored {
        /// Object payload; only its length is surfaced (as size /
        /// content_length).
        body: Bytes,
        /// Returned by `head_object` and `list_objects_v2`.
        last_modified: dto2::Timestamp,
    }
1683
    /// v0.8.3 #69: minimal multipart-upload record the test backend
    /// returns from `list_multipart_uploads`. `initiated` is a
    /// `chrono::DateTime<Utc>` so tests can fake an old upload by
    /// passing `Utc::now() - Duration::days(N)` directly (no
    /// SystemTime arithmetic).
    #[derive(Clone)]
    struct ScannerMultipart {
        /// Object key the upload targets.
        key: String,
        /// When the upload was initiated; converted to a
        /// `dto2::Timestamp` at listing time.
        initiated: chrono::DateTime<Utc>,
    }
1694
1695    impl ScannerMemBackend {
1696        fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
1697            self.objects.lock().unwrap().insert(
1698                (bucket.to_owned(), key.to_owned()),
1699                ScannerStored {
1700                    body,
1701                    last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
1702                },
1703            );
1704        }
1705
1706        /// v0.8.3 #69: seed an in-flight multipart upload the
1707        /// lifecycle scanner can then walk + abort.
1708        fn put_multipart_upload(
1709            &self,
1710            bucket: &str,
1711            upload_id: &str,
1712            key: &str,
1713            initiated: chrono::DateTime<Utc>,
1714        ) {
1715            self.multipart_uploads.lock().unwrap().insert(
1716                (bucket.to_owned(), upload_id.to_owned()),
1717                ScannerMultipart {
1718                    key: key.to_owned(),
1719                    initiated,
1720                },
1721            );
1722        }
1723    }
1724
1725    #[async_trait::async_trait]
1726    impl S3 for ScannerMemBackend {
1727        async fn put_object(
1728            &self,
1729            req: S3Request<dto2::PutObjectInput>,
1730        ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
1731            self.put_now(&req.input.bucket, &req.input.key, Bytes::new());
1732            Ok(S3Response::new(dto2::PutObjectOutput::default()))
1733        }
1734
1735        async fn head_object(
1736            &self,
1737            req: S3Request<dto2::HeadObjectInput>,
1738        ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
1739            let key = (req.input.bucket.clone(), req.input.key.clone());
1740            let lock = self.objects.lock().unwrap();
1741            let stored = lock
1742                .get(&key)
1743                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
1744            Ok(S3Response::new(dto2::HeadObjectOutput {
1745                content_length: Some(stored.body.len() as i64),
1746                last_modified: Some(stored.last_modified.clone()),
1747                ..Default::default()
1748            }))
1749        }
1750
1751        async fn delete_object(
1752            &self,
1753            req: S3Request<dto2::DeleteObjectInput>,
1754        ) -> S3Result<S3Response<dto2::DeleteObjectOutput>> {
1755            let key = (req.input.bucket.clone(), req.input.key.clone());
1756            self.objects.lock().unwrap().remove(&key);
1757            Ok(S3Response::new(dto2::DeleteObjectOutput::default()))
1758        }
1759
1760        async fn list_objects_v2(
1761            &self,
1762            req: S3Request<dto2::ListObjectsV2Input>,
1763        ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
1764            let prefix = req.input.bucket.clone();
1765            let lock = self.objects.lock().unwrap();
1766            let mut contents: Vec<dto2::Object> = lock
1767                .iter()
1768                .filter(|((b, _), _)| b == &prefix)
1769                .map(|((_, k), v)| dto2::Object {
1770                    key: Some(k.clone()),
1771                    size: Some(v.body.len() as i64),
1772                    last_modified: Some(v.last_modified.clone()),
1773                    ..Default::default()
1774                })
1775                .collect();
1776            contents.sort_by(|a, b| a.key.cmp(&b.key));
1777            let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
1778            Ok(S3Response::new(dto2::ListObjectsV2Output {
1779                name: Some(prefix),
1780                contents: Some(contents),
1781                key_count: Some(key_count),
1782                is_truncated: Some(false),
1783                ..Default::default()
1784            }))
1785        }
1786
1787        async fn copy_object(
1788            &self,
1789            _req: S3Request<dto2::CopyObjectInput>,
1790        ) -> S3Result<S3Response<dto2::CopyObjectOutput>> {
1791            // Transition path: scanner copies same-key with new
1792            // storage_class. The mem backend doesn't track storage
1793            // class, so it's a no-op success — exactly the AWS-side
1794            // behaviour for a backend that ignores the field.
1795            Ok(S3Response::new(dto2::CopyObjectOutput::default()))
1796        }
1797
1798        // ---- v0.8.3 #69: multipart abort path -----------------------
1799        //
1800        // The lifecycle scanner walks `list_multipart_uploads` per
1801        // bucket and calls `abort_multipart_upload` on every upload
1802        // whose `Initiated` time is past the rule's threshold. The
1803        // test backend returns the seeded entries on listing and
1804        // drops them on abort so post-conditions are observable.
1805
1806        async fn list_multipart_uploads(
1807            &self,
1808            req: S3Request<dto2::ListMultipartUploadsInput>,
1809        ) -> S3Result<S3Response<dto2::ListMultipartUploadsOutput>> {
1810            let bucket = req.input.bucket.clone();
1811            let lock = self.multipart_uploads.lock().unwrap();
1812            let mut uploads: Vec<dto2::MultipartUpload> = lock
1813                .iter()
1814                .filter(|((b, _), _)| b == &bucket)
1815                .map(|((_, upload_id), v)| {
1816                    let st: std::time::SystemTime = v.initiated.into();
1817                    dto2::MultipartUpload {
1818                        upload_id: Some(upload_id.clone()),
1819                        key: Some(v.key.clone()),
1820                        initiated: Some(dto2::Timestamp::from(st)),
1821                        ..Default::default()
1822                    }
1823                })
1824                .collect();
1825            // Stable order so test assertions on count + post-condition
1826            // do not race on the HashMap iteration order.
1827            uploads.sort_by(|a, b| a.upload_id.cmp(&b.upload_id));
1828            Ok(S3Response::new(dto2::ListMultipartUploadsOutput {
1829                bucket: Some(bucket),
1830                uploads: Some(uploads),
1831                is_truncated: Some(false),
1832                ..Default::default()
1833            }))
1834        }
1835
1836        async fn abort_multipart_upload(
1837            &self,
1838            req: S3Request<dto2::AbortMultipartUploadInput>,
1839        ) -> S3Result<S3Response<dto2::AbortMultipartUploadOutput>> {
1840            let bucket = req.input.bucket.clone();
1841            let upload_id = req.input.upload_id.clone();
1842            self.multipart_uploads
1843                .lock()
1844                .unwrap()
1845                .remove(&(bucket, upload_id));
1846            Ok(S3Response::new(dto2::AbortMultipartUploadOutput::default()))
1847        }
1848    }
1849
1850    fn make_service() -> Arc<S4Service<ScannerMemBackend>> {
1851        let registry = Arc::new(
1852            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1853        );
1854        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1855        Arc::new(S4Service::new(
1856            ScannerMemBackend::default(),
1857            registry,
1858            dispatcher,
1859        ))
1860    }
1861
1862    #[tokio::test]
1863    async fn run_scan_once_no_lifecycle_manager_returns_empty_report() {
1864        // Service has no lifecycle manager attached — scanner must
1865        // no-op cleanly (operator might not have set
1866        // `--lifecycle-state-file`). Also covers the empty-buckets
1867        // path in `run_scan_once`.
1868        let s4 = make_service();
1869        let report = run_scan_once(&s4).await.expect("scan");
1870        assert_eq!(report, ScanReport::default());
1871
1872        // And: lifecycle manager attached but no buckets configured.
1873        let mgr = Arc::new(LifecycleManager::new());
1874        let backend = ScannerMemBackend::default();
1875        let registry = Arc::new(
1876            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1877        );
1878        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1879        let s4_empty = Arc::new(
1880            S4Service::new(backend, registry, dispatcher).with_lifecycle(mgr),
1881        );
1882        let report = run_scan_once(&s4_empty).await.expect("scan empty");
1883        assert_eq!(report, ScanReport::default());
1884    }
1885
1886    #[tokio::test]
1887    async fn run_scan_once_expires_matching_objects_via_backend() {
1888        // Three objects: only "stale.log" matches the rule (prefix
1889        // gating). The other two are written but not under the prefix,
1890        // so the evaluator returns None for them.
1891        let backend = ScannerMemBackend::default();
1892        backend.put_now("b", "stale.log", Bytes::from_static(b"x"));
1893        backend.put_now("b", "data/keep1.bin", Bytes::from_static(b"y"));
1894        backend.put_now("b", "data/keep2.bin", Bytes::from_static(b"z"));
1895        // Rule: any object under `stale.` prefix is expired immediately
1896        // (`expire_after_days(0)` matches age >= 0d, which is every
1897        // backend object).
1898        let mgr = Arc::new(LifecycleManager::new());
1899        let mut rule = LifecycleRule::expire_after_days("r", 0);
1900        rule.filter.prefix = Some("stale.".into());
1901        mgr.put("b", LifecycleConfig { rules: vec![rule] });
1902        let registry = Arc::new(
1903            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1904        );
1905        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1906        let s4 = Arc::new(
1907            S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
1908        );
1909
1910        let report = run_scan_once(&s4).await.expect("scan");
1911        assert_eq!(report.buckets_scanned, 1);
1912        assert_eq!(report.objects_evaluated, 3);
1913        assert_eq!(report.expired, 1);
1914        assert_eq!(report.transitioned, 0);
1915        assert_eq!(report.skipped_locked, 0);
1916        assert_eq!(report.action_errors, 0);
1917        // Backend post-condition: the matching key is gone, the others
1918        // remain. Read back through the service's own list_objects_v2
1919        // path (which is also what the customer-visible HTTP layer
1920        // serves) so we exercise the same code the scanner walked.
1921        let req = synthetic_request(
1922            ListObjectsV2Input {
1923                bucket: "b".into(),
1924                ..Default::default()
1925            },
1926            http::Method::GET,
1927            "/b?list-type=2",
1928        );
1929        let resp = s4
1930            .as_ref()
1931            .list_objects_v2(req)
1932            .await
1933            .expect("post-scan list");
1934        let keys: Vec<String> = resp
1935            .output
1936            .contents
1937            .unwrap_or_default()
1938            .into_iter()
1939            .filter_map(|o| o.key)
1940            .collect();
1941        assert!(!keys.contains(&"stale.log".to_string()));
1942        assert!(keys.contains(&"data/keep1.bin".to_string()));
1943        assert!(keys.contains(&"data/keep2.bin".to_string()));
1944        // Lifecycle action counter: one Expire bumped on bucket "b".
1945        let snap = mgr.actions_snapshot();
1946        assert_eq!(
1947            snap.get(&("b".into(), "expire".into())).copied(),
1948            Some(1)
1949        );
1950    }
1951
1952    #[tokio::test]
1953    async fn run_scan_once_skips_object_lock_protected_keys() {
1954        let backend = ScannerMemBackend::default();
1955        backend.put_now("b", "locked.log", Bytes::from_static(b"x"));
1956        backend.put_now("b", "free.log", Bytes::from_static(b"y"));
1957        let registry = Arc::new(
1958            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1959        );
1960        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1961        let mgr = Arc::new(LifecycleManager::new());
1962        // Aggressive: every object expires immediately.
1963        mgr.put(
1964            "b",
1965            LifecycleConfig {
1966                rules: vec![LifecycleRule::expire_after_days("r", 0)],
1967            },
1968        );
1969        let lock_mgr = Arc::new(ObjectLockManager::new());
1970        // Lock retains "locked.log" until the year 2099 — Compliance
1971        // mode means even Governance bypass cannot delete it.
1972        let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
1973            .expect("parse")
1974            .with_timezone(&Utc);
1975        lock_mgr.set(
1976            "b",
1977            "locked.log",
1978            ObjectLockState {
1979                mode: Some(LockMode::Compliance),
1980                retain_until: Some(retain_until),
1981                legal_hold_on: false,
1982            },
1983        );
1984        let s4 = Arc::new(
1985            S4Service::new(backend, registry, dispatcher)
1986                .with_lifecycle(Arc::clone(&mgr))
1987                .with_object_lock(Arc::clone(&lock_mgr)),
1988        );
1989
1990        let report = run_scan_once(&s4).await.expect("scan");
1991        assert_eq!(report.buckets_scanned, 1);
1992        assert_eq!(report.objects_evaluated, 2);
1993        assert_eq!(report.expired, 1, "free.log should have been expired");
1994        assert_eq!(report.skipped_locked, 1, "locked.log must be skipped");
1995        assert_eq!(report.action_errors, 0);
1996    }
1997
1998    /// v0.8.3 #65 (audit C-2): full scanner walk with a mix of free
1999    /// and locked objects must (a) leave outer/free objects expired,
2000    /// (b) skip the middle locked object, (c) bump
2001    /// `ScanReport::skipped_locked`, and (d) emit a Prometheus
2002    /// `s4_lifecycle_actions_total{action="skipped_locked"}` sample.
2003    /// Previously (v0.7 #45) the skip path bumped only the in-report
2004    /// counter — operator dashboards saw no signal when Object Lock
2005    /// vetoed a Lifecycle Expiration, which is the silent-failure
2006    /// observability gap audit C-2 called out.
2007    #[tokio::test]
2008    async fn scan_one_config_skips_locked_objects_and_bumps_metric() {
2009        // The Prometheus recorder is a process-global slot. Multiple
2010        // tests in the same binary race on `install_recorder()`, so
2011        // we route through `crate::metrics::test_metrics_handle()`
2012        // which is OnceLock-guarded and shared with the
2013        // `metrics::tests::install_and_render_basic_counters` test.
2014        // Use a unique bucket label so this test's sample line is
2015        // identifiable even when other tests in the binary also bump
2016        // the lifecycle counter under different bucket labels.
2017        let metrics_handle = crate::metrics::test_metrics_handle();
2018
2019        let bucket = "lc-locked-metric-65";
2020        let backend = ScannerMemBackend::default();
2021        // Three objects; the middle one ("middle.log") will be
2022        // Object-Lock-Compliance-locked until 2099. The two outer
2023        // objects ("outer-a.log", "outer-c.log") have no lock state
2024        // attached, so the aggressive `expire_after_days(0)` rule
2025        // matches and the scanner's `delete_object` actually fires.
2026        backend.put_now(bucket, "outer-a.log", Bytes::from_static(b"a"));
2027        backend.put_now(bucket, "middle.log", Bytes::from_static(b"m"));
2028        backend.put_now(bucket, "outer-c.log", Bytes::from_static(b"c"));
2029
2030        let registry = Arc::new(
2031            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
2032        );
2033        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
2034        let mgr = Arc::new(LifecycleManager::new());
2035        mgr.put(
2036            bucket,
2037            LifecycleConfig {
2038                rules: vec![LifecycleRule::expire_after_days("r", 1)],
2039            },
2040        );
2041        // Object-Lock Compliance retain until far in the future (2099).
2042        // `is_locked(now)` then returns `true` regardless of when the
2043        // test actually runs.
2044        let lock_mgr = Arc::new(ObjectLockManager::new());
2045        let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
2046            .expect("parse retain_until")
2047            .with_timezone(&Utc);
2048        lock_mgr.set(
2049            bucket,
2050            "middle.log",
2051            ObjectLockState {
2052                mode: Some(LockMode::Compliance),
2053                retain_until: Some(retain_until),
2054                legal_hold_on: false,
2055            },
2056        );
2057        let s4 = Arc::new(
2058            S4Service::new(backend, registry, dispatcher)
2059                .with_lifecycle(Arc::clone(&mgr))
2060                .with_object_lock(Arc::clone(&lock_mgr)),
2061        );
2062
2063        // The objects above were `put_now(...)` with `last_modified =
2064        // SystemTime::now()`, so their computed `age` is roughly zero
2065        // and the `expire_after_days(1)` rule alone would NOT match.
2066        // Force the rule threshold down to zero days so all three
2067        // objects qualify for Expiration — the test is about the Lock
2068        // veto, not the age math.
2069        mgr.put(
2070            bucket,
2071            LifecycleConfig {
2072                rules: vec![LifecycleRule::expire_after_days("r", 0)],
2073            },
2074        );
2075
2076        let report = run_scan_once(&s4).await.expect("scan");
2077        assert_eq!(report.buckets_scanned, 1);
2078        assert_eq!(report.objects_evaluated, 3);
2079        assert_eq!(
2080            report.expired, 2,
2081            "outer-a.log + outer-c.log must be DELETEd; got {report:?}"
2082        );
2083        assert_eq!(
2084            report.skipped_locked, 1,
2085            "middle.log is Compliance-locked → scanner must skip; got {report:?}"
2086        );
2087        assert_eq!(report.transitioned, 0);
2088        assert_eq!(report.action_errors, 0);
2089
2090        // Render the Prometheus exporter and assert that a sample line
2091        // for `s4_lifecycle_actions_total{...action="skipped_locked",
2092        // bucket="lc-locked-metric-65"...}` is present with value >= 1.
2093        // The metrics-exporter-prometheus crate sorts labels
2094        // alphabetically (`bucket` appears before `action` in the
2095        // rendered output), so we substring-match both label fragments
2096        // rather than rely on a fixed ordering. We use `>=` (not
2097        // `==`) because the recorder is process-global and a parallel
2098        // run of the same test in a future session could legitimately
2099        // bump it again — but since the bucket label embeds an
2100        // issue-unique suffix, no other test in this binary touches
2101        // this specific (action, bucket) pair.
2102        let rendered = metrics_handle.render();
2103        let bucket_frag = format!("bucket=\"{bucket}\"");
2104        let action_frag = "action=\"skipped_locked\"";
2105        let line = rendered
2106            .lines()
2107            .find(|l| {
2108                l.starts_with("s4_lifecycle_actions_total{")
2109                    && l.contains(&bucket_frag)
2110                    && l.contains(action_frag)
2111            })
2112            .unwrap_or_else(|| {
2113                panic!(
2114                    "Prometheus output missing skipped_locked sample for {bucket}; \
2115                     full render:\n{rendered}"
2116                )
2117            });
2118        // Parse the trailing counter value (whitespace-separated).
2119        let value: u64 = line
2120            .split_whitespace()
2121            .next_back()
2122            .expect("counter value column")
2123            .parse()
2124            .expect("counter value is u64");
2125        assert!(
2126            value >= 1,
2127            "skipped_locked counter must be >= 1 after scan; line: {line}"
2128        );
2129    }
2130
2131    /// v0.8.3 #69 (audit M-2): end-to-end test of the multipart sweep.
2132    /// Two in-flight uploads are seeded — `u-stale` initiated 8 days
2133    /// ago, `u-fresh` initiated 1 hour ago. The lifecycle rule sets
2134    /// `abort_incomplete_multipart_upload_days = 7`. The scanner must
2135    /// abort `u-stale` (bumping `report.aborted_multipart`) but leave
2136    /// `u-fresh` alone. Object walk is a no-op (no objects seeded), so
2137    /// the report's expire / transition counters stay at zero.
2138    #[tokio::test]
2139    async fn run_scan_once_aborts_stale_multipart_upload() {
2140        let backend = ScannerMemBackend::default();
2141        let bucket = "lc-mp-69";
2142        let now = Utc::now();
2143        backend.put_multipart_upload(bucket, "u-stale", "uploads/big.bin", now - Duration::days(8));
2144        backend.put_multipart_upload(bucket, "u-fresh", "uploads/fresh.bin", now - Duration::hours(1));
2145
2146        let registry = Arc::new(
2147            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
2148        );
2149        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
2150        let mgr = Arc::new(LifecycleManager::new());
2151        let mut rule = LifecycleRule {
2152            id: "abort-7d".into(),
2153            status: LifecycleStatus::Enabled,
2154            filter: LifecycleFilter::default(),
2155            expiration_days: None,
2156            expiration_date: None,
2157            transitions: Vec::new(),
2158            noncurrent_version_expiration_days: None,
2159            abort_incomplete_multipart_upload_days: Some(7),
2160        };
2161        rule.filter.prefix = Some("uploads/".into());
2162        mgr.put(bucket, LifecycleConfig { rules: vec![rule] });
2163        let s4 = Arc::new(
2164            S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
2165        );
2166
2167        let report = run_scan_once(&s4).await.expect("scan");
2168        assert_eq!(report.buckets_scanned, 1);
2169        assert_eq!(
2170            report.aborted_multipart, 1,
2171            "u-stale must be aborted; got {report:?}",
2172        );
2173        assert_eq!(report.action_errors, 0);
2174        assert_eq!(report.expired, 0);
2175        assert_eq!(report.transitioned, 0);
2176
2177        // Backend post-condition via the wire-side
2178        // `list_multipart_uploads` path: only the fresh upload
2179        // (`u-fresh`) survives — `u-stale` was aborted by the
2180        // scanner.
2181        let post_req = synthetic_request(
2182            ListMultipartUploadsInput {
2183                bucket: bucket.into(),
2184                ..Default::default()
2185            },
2186            http::Method::GET,
2187            &format!("/{bucket}?uploads"),
2188        );
2189        let post = s4
2190            .as_ref()
2191            .list_multipart_uploads(post_req)
2192            .await
2193            .expect("post-scan list_multipart_uploads");
2194        let remaining_ids: Vec<String> = post
2195            .output
2196            .uploads
2197            .unwrap_or_default()
2198            .into_iter()
2199            .filter_map(|u| u.upload_id)
2200            .collect();
2201        assert_eq!(
2202            remaining_ids,
2203            vec!["u-fresh".to_string()],
2204            "exactly u-fresh must remain after the sweep; got {remaining_ids:?}",
2205        );
2206
2207        // Counter snapshot agrees with the report.
2208        let snap = mgr.actions_snapshot();
2209        assert_eq!(
2210            snap.get(&(bucket.into(), "abort_incomplete_multipart".into()))
2211                .copied(),
2212            Some(1),
2213            "v0.8.3 #69: abort_incomplete_multipart counter must be 1",
2214        );
2215    }
2216}