Skip to main content

s4_server/
lifecycle.rs

1//! S3 Lifecycle execution — per-bucket rule evaluation + manager skeleton
2//! (v0.6 #37).
3//!
4//! AWS S3 Lifecycle attaches a **list of rules** to a bucket; each rule may
5//! request that S3
6//!
7//! 1. **Expire** an object once its age (or the calendar date) crosses a
8//!    threshold (`Expiration { Days | Date }`),
9//! 2. **Transition** an object to a different storage class (`Transition
10//!    { Days, StorageClass }` — `STANDARD_IA`, `GLACIER_IR`, ...),
11//! 3. **Expire noncurrent versions** in a versioning-enabled bucket
12//!    (`NoncurrentVersionExpiration { NoncurrentDays }`).
13//!
14//! Until v0.6 #37 the matching `PutBucketLifecycleConfiguration` /
15//! `GetBucketLifecycleConfiguration` / `DeleteBucketLifecycle` handlers
16//! in `crates/s4-server/src/service.rs` were pure passthroughs (the s3s
17//! framework's default backend stored them but nothing read the rules).
18//! This module owns the in-memory configuration store + the rule
19//! evaluator that decides, for any single object, whether an action
20//! should fire **right now**.
21//!
22//! ## responsibilities (v0.6 #37)
23//!
24//! - in-memory `bucket -> LifecycleConfig` map with JSON snapshot
25//!   round-trip (mirroring `versioning.rs` / `object_lock.rs` /
26//!   `inventory.rs`'s shape so `--lifecycle-state-file` is a one-line
27//!   addition in `main.rs`).
28//! - per-bucket action counters (`actions_total`) — bumped by the
29//!   future scanner when an Expiration / Transition /
30//!   NoncurrentExpiration action is taken, surfaced via Prometheus
31//!   (`s4_lifecycle_actions_total`, see `metrics.rs`).
32//! - [`LifecycleManager::evaluate`] — given one (bucket, key, age,
33//!   size, tags) tuple, walk the bucket's rules in declaration order
34//!   and return the first matching action. Returns `None` when no
35//!   rule matches (or when the matching rule is `Disabled`).
36//! - [`evaluate_batch`] — batched form for the test path: walks a
37//!   slice of `(key, age, size, tags)` tuples and returns the (key,
38//!   action) pairs that should fire. The actual backend invocation
39//!   (S3.delete_object / metadata rewrite) is the caller's job.
40//!
41//! ## scope limitations (v0.6 #37)
42//!
43//! - **Background scanner is a skeleton only.** `main.rs`'s
44//!   `--lifecycle-scan-interval-hours` flag spawns a tokio task that
45//!   logs the bucket list and stamps a "would-have-run" marker;
46//!   walking the source bucket via `list_objects_v2` and actually
47//!   invoking `delete_object` / metadata rewrite for each evaluated
48//!   action is deferred to v0.7+. Wiring the scheduler to walk a real
49//!   bucket end-to-end requires a back-reference from the scheduler
50//!   into `S4Service` for the `list_objects_v2` walk and that
51//!   reshuffle is out of scope for this issue. The
52//!   [`crate::S4Service::run_lifecycle_once_for_test`] entry covers
53//!   the in-memory equivalent so the unit + E2E tests exercise the
54//!   evaluator end-to-end.
55//! - **`AbortIncompleteMultipartUpload`** is parsed and stored on the
56//!   `LifecycleRule` (so PutBucketLifecycleConfiguration round-trips
57//!   the field) but not enforced — multipart abort sweeping is a
58//!   separate scanner that lives next to the multipart upload manager
59//!   (v0.7+).
60//! - **`expiration_date` (calendar date)** is supported in the
61//!   evaluator: a rule with `expiration_date` past `now` fires
62//!   Expiration immediately. Same wire form as AWS S3.
63//! - **Multi-instance replication.** All state is single-instance
64//!   in-memory; `--lifecycle-state-file <PATH>` provides restart
65//!   recovery via JSON snapshot, matching the
66//!   `--versioning-state-file` shape.
67//! - **Object Lock interplay**: the evaluator does NOT consult the
68//!   `ObjectLockManager` directly (the evaluator API is
69//!   object-tags-and-size only); the scanner caller is expected to
70//!   skip locked objects — see the `evaluate_batch_skips_locked` test
71//!   for the canonical pattern. Locking always wins over Lifecycle.
72//! - **Versioning interplay**: the evaluator treats noncurrent
73//!   versions as a separate input — pass `is_noncurrent = true` to
74//!   [`LifecycleManager::evaluate_with_flags`] for noncurrent version
75//!   expiration matching. The legacy `evaluate` shorthand defaults
76//!   `is_noncurrent = false` (current version) so existing call sites
77//!   stay one-liners.
78
79use std::collections::HashMap;
80use std::sync::Arc;
81use std::sync::RwLock;
82
83use chrono::{DateTime, Duration, Utc};
84use s3s::S3;
85use s3s::S3Request;
86use s3s::dto::*;
87use serde::{Deserialize, Serialize};
88use tracing::warn;
89
/// Whether a rule is currently being applied. Mirrors AWS S3
/// `ExpirationStatus` (`"Enabled"` / `"Disabled"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleStatus {
    /// The rule participates in evaluation and may fire actions.
    Enabled,
    /// The rule is stored (Get/Put round-trips keep it) but the evaluator
    /// skips it entirely.
    Disabled,
}
97
98impl LifecycleStatus {
99    /// Wire form used by AWS S3 (`"Enabled"` / `"Disabled"`).
100    #[must_use]
101    pub fn as_aws_str(self) -> &'static str {
102        match self {
103            Self::Enabled => "Enabled",
104            Self::Disabled => "Disabled",
105        }
106    }
107
108    /// Parse the AWS wire form (case-insensitive). Falls back to `Disabled`
109    /// on unrecognised input — this matches AWS conservative behaviour
110    /// (an unparseable status is treated as "off" so a typo doesn't silently
111    /// expire data).
112    #[must_use]
113    pub fn from_aws_str(s: &str) -> Self {
114        if s.eq_ignore_ascii_case("Enabled") {
115            Self::Enabled
116        } else {
117            Self::Disabled
118        }
119    }
120}
121
/// Per-rule object filter. AWS S3 represents the filter as one of `Prefix`,
/// `Tag`, `ObjectSizeGreaterThan`, `ObjectSizeLessThan`, or `And` (= AND of
/// any subset of those predicates). For internal storage we flatten the
/// "And" form into a struct of optional fields plus a vector of (key, value)
/// tags — every present field must match (logical AND). An empty filter (all
/// fields `None` / empty `tags`) matches every object in the bucket.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleFilter {
    /// Object key prefix (empty / `None` = no prefix gating). Note that
    /// `Some(String::new())` also matches every key, since
    /// `starts_with("")` is always true.
    #[serde(default)]
    pub prefix: Option<String>,
    /// Logical AND across every entry: every (key, value) must match the
    /// object's own tag set.
    #[serde(default)]
    pub tags: Vec<(String, String)>,
    /// Object must be *strictly greater than* this size in bytes.
    #[serde(default)]
    pub object_size_greater_than: Option<u64>,
    /// Object must be *strictly less than* this size in bytes.
    #[serde(default)]
    pub object_size_less_than: Option<u64>,
}
144
145impl LifecycleFilter {
146    /// `true` when this filter accepts the candidate. Empty filter accepts
147    /// every object. Tag matching is AND of all listed tags (each present in
148    /// `object_tags` with the matching value).
149    #[must_use]
150    pub fn matches(&self, key: &str, size: u64, object_tags: &[(String, String)]) -> bool {
151        if let Some(p) = &self.prefix
152            && !key.starts_with(p)
153        {
154            return false;
155        }
156        if let Some(min) = self.object_size_greater_than
157            && size <= min
158        {
159            return false;
160        }
161        if let Some(max) = self.object_size_less_than
162            && size >= max
163        {
164            return false;
165        }
166        for (tk, tv) in &self.tags {
167            let matched = object_tags.iter().any(|(ok, ov)| ok == tk && ov == tv);
168            if !matched {
169                return false;
170            }
171        }
172        true
173    }
174}
175
/// A single transition step (object age threshold + target storage class).
/// `days` is days since the object was created. AWS S3 also accepts `Date`
/// for transitions but Lifecycle deployments overwhelmingly use `Days`; the
/// `Date` form is omitted here on purpose to keep the evaluator narrow
/// (operators wanting calendar transitions can synthesise a one-shot rule
/// at the cadence of their scanner).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransitionRule {
    /// Age threshold in whole days since object creation; the transition
    /// is eligible once `object_age_days >= days`.
    pub days: u32,
    /// Target storage class (`"STANDARD_IA"` / `"GLACIER_IR"` /
    /// `"GLACIER"` / `"DEEP_ARCHIVE"` / `"INTELLIGENT_TIERING"` /
    /// `"ONEZONE_IA"`). Stored as the AWS wire string so PutBucket /
    /// GetBucket round-trip is a no-op.
    pub storage_class: String,
}
191
/// One lifecycle rule. AWS S3's `LifecycleRule` flattened into the subset
/// the v0.6 #37 evaluator handles. `id` is the operator-supplied label and
/// makes Get / Put round-trips non-lossy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleRule {
    /// Operator-supplied label; stored verbatim so Get/Put round-trips
    /// are non-lossy.
    pub id: String,
    /// `Enabled` rules are evaluated; `Disabled` rules are stored but
    /// never fire.
    pub status: LifecycleStatus,
    /// Which objects this rule applies to; the default filter matches
    /// every object in the bucket.
    #[serde(default)]
    pub filter: LifecycleFilter,
    /// Days since the object was created. Mutually exclusive with
    /// [`Self::expiration_date`] in AWS — both fields are accepted here on
    /// input (the evaluator picks `expiration_days` first, then
    /// `expiration_date`) so a malformed rule with both set still evaluates
    /// deterministically rather than silently dropping the action.
    #[serde(default)]
    pub expiration_days: Option<u32>,
    /// Calendar date past which matching objects are expired (AWS wire form
    /// is ISO 8601; here we keep it as a `DateTime<Utc>` so round-trips
    /// through `serde_json` survive without re-parsing).
    #[serde(default)]
    pub expiration_date: Option<DateTime<Utc>>,
    /// Transition steps in declaration order. The evaluator picks the
    /// deepest transition (largest `days` ≤ object age) and resolves any
    /// conflict with expiration in [`LifecycleManager::evaluate_with_flags`].
    #[serde(default)]
    pub transitions: Vec<TransitionRule>,
    /// Days an object has been noncurrent before the noncurrent-version
    /// expiration fires. Only consulted when the evaluator is asked about
    /// a noncurrent object (`is_noncurrent = true`).
    #[serde(default)]
    pub noncurrent_version_expiration_days: Option<u32>,
    /// Days after a multipart upload is initiated before the abort fires.
    /// Stored so PutBucket round-trips, but **not enforced** in the
    /// v0.6 #37 evaluator — multipart sweeping lives elsewhere.
    #[serde(default)]
    pub abort_incomplete_multipart_upload_days: Option<u32>,
}
229
230impl LifecycleRule {
231    /// Convenience constructor for a "expire after N days" rule. Useful in
232    /// tests + operator scripts.
233    #[must_use]
234    pub fn expire_after_days(id: impl Into<String>, days: u32) -> Self {
235        Self {
236            id: id.into(),
237            status: LifecycleStatus::Enabled,
238            filter: LifecycleFilter::default(),
239            expiration_days: Some(days),
240            expiration_date: None,
241            transitions: Vec::new(),
242            noncurrent_version_expiration_days: None,
243            abort_incomplete_multipart_upload_days: None,
244        }
245    }
246}
247
/// Per-bucket lifecycle configuration (ordered list of rules — first match
/// wins, matching AWS S3 semantics).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleConfig {
    /// Rules in operator-declared order; the evaluator walks them top to
    /// bottom.
    pub rules: Vec<LifecycleRule>,
}
254
/// The action a single rule wants to take **right now** for a candidate
/// object.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LifecycleAction {
    /// Delete the object. Produced by both `Expiration` and
    /// `NoncurrentVersionExpiration` matches.
    Expire,
    /// Rewrite the object's storage class (`Transition`). The payload is
    /// the AWS wire string for the target class (e.g. `"GLACIER_IR"`).
    Transition { storage_class: String },
    /// v0.8.3 #69 (audit M-2): abort an in-flight multipart upload whose
    /// age exceeds the rule's `abort_incomplete_multipart_upload_days`.
    /// The payload is the backend-issued `upload_id`, letting the scanner
    /// route the `AbortMultipartUpload` call without re-listing. Same wire
    /// semantic as AWS S3 `AbortIncompleteMultipartUpload`.
    AbortMultipartUpload { upload_id: String },
}

impl LifecycleAction {
    /// Stable label suitable for a metric counter
    /// (`s4_lifecycle_actions_total{action="..."}`).
    #[must_use]
    pub fn metric_label(&self) -> &'static str {
        match self {
            Self::Transition { .. } => "transition",
            Self::AbortMultipartUpload { .. } => "abort_incomplete_multipart",
            Self::Expire => "expire",
        }
    }
}
285
/// v0.8.3 #69: one in-flight multipart upload the lifecycle scanner
/// considers for abort. Mirrors the (subset of) `MultipartUpload` fields
/// the rule evaluator needs (key, upload_id, initiated). `tags` is kept
/// in the shape the existing object-path evaluator uses
/// (`Vec<(String, String)>`) so a future enhancement that surfaces
/// upload-time tags from `MultipartStateStore` can flow through the
/// same filter check without API churn — AWS S3 itself does not attach
/// tags to in-flight multipart uploads, so for the scanner-driven path
/// the slice is always empty (the filter's prefix / size predicates
/// still apply via [`LifecycleFilter::matches`], passing size = 0).
#[derive(Clone, Debug)]
pub struct MultipartUploadCandidate {
    /// Backend-issued multipart upload id; carried into the
    /// [`LifecycleAction::AbortMultipartUpload`] payload.
    pub upload_id: String,
    /// Object key the upload targets; checked against each rule's filter.
    pub key: String,
    /// When the upload was initiated; upload age is `now - initiated`.
    pub initiated: DateTime<Utc>,
    /// Upload-time tags — always empty on the scanner-driven path today
    /// (see struct docs); kept for filter-shape compatibility.
    pub tags: Vec<(String, String)>,
}
303
/// Serialization format of the JSON snapshot. Used by `to_json` /
/// `from_json`.
#[derive(Debug, Default, Serialize, Deserialize)]
struct LifecycleSnapshot {
    /// `bucket -> config`; the only state that is persisted (action
    /// counters are deliberately transient — see `from_json`).
    by_bucket: HashMap<String, LifecycleConfig>,
}
309
/// Per-bucket lifecycle configuration manager.
///
/// All read / write operations go through `RwLock` for thread safety;
/// clones are cheap (`Arc<LifecycleManager>` is the expected handle shape).
/// `actions_total` is a parallel `RwLock<HashMap<...>>` of `(bucket,
/// action_label) -> count` so the future background scanner can stamp
/// successful actions and operators can `GET /metrics` to see the running
/// totals (the metric is also surfaced via `metrics::counter!` — see
/// [`crate::metrics::record_lifecycle_action`]).
#[derive(Debug, Default)]
pub struct LifecycleManager {
    /// `bucket -> config`; the authoritative rule store.
    by_bucket: RwLock<HashMap<String, LifecycleConfig>>,
    /// `(bucket, action_label) -> count`. Bumped by the scanner via
    /// [`Self::record_action`]. Action labels are the
    /// [`LifecycleAction::metric_label`] values (`"expire"` /
    /// `"transition"` / `"abort_incomplete_multipart"`).
    actions_total: RwLock<HashMap<(String, String), u64>>,
}
328
329impl LifecycleManager {
330    /// Empty manager — no bucket has rules.
331    #[must_use]
332    pub fn new() -> Self {
333        Self::default()
334    }
335
336    /// Replace (or create) the lifecycle configuration for `bucket`. Drops
337    /// any previously-attached rules in one shot — matches AWS S3
338    /// `PutBucketLifecycleConfiguration` (full replace, no merge).
339    pub fn put(&self, bucket: &str, config: LifecycleConfig) {
340        crate::lock_recovery::recover_write(&self.by_bucket, "lifecycle.by_bucket")
341            .insert(bucket.to_owned(), config);
342    }
343
344    /// Return a clone of the bucket's configuration, if any.
345    #[must_use]
346    pub fn get(&self, bucket: &str) -> Option<LifecycleConfig> {
347        crate::lock_recovery::recover_read(&self.by_bucket, "lifecycle.by_bucket")
348            .get(bucket)
349            .cloned()
350    }
351
352    /// Drop the bucket's lifecycle configuration (idempotent — missing
353    /// bucket is OK).
354    pub fn delete(&self, bucket: &str) {
355        crate::lock_recovery::recover_write(&self.by_bucket, "lifecycle.by_bucket").remove(bucket);
356    }
357
358    /// JSON snapshot for restart-recoverable state. Pair with
359    /// [`Self::from_json`].
360    pub fn to_json(&self) -> Result<String, serde_json::Error> {
361        let by_bucket =
362            crate::lock_recovery::recover_read(&self.by_bucket, "lifecycle.by_bucket").clone();
363        let snap = LifecycleSnapshot { by_bucket };
364        serde_json::to_string(&snap)
365    }
366
367    /// Restore from a JSON snapshot produced by [`Self::to_json`]. Action
368    /// counters are intentionally not snapshotted — they're transient
369    /// observability data and should reset across process restarts so
370    /// `rate(s4_lifecycle_actions_total[1h])` doesn't double-count.
371    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
372        let snap: LifecycleSnapshot = serde_json::from_str(s)?;
373        Ok(Self {
374            by_bucket: RwLock::new(snap.by_bucket),
375            actions_total: RwLock::new(HashMap::new()),
376        })
377    }
378
379    /// Evaluate which rule (if any) applies to a single **current-version**
380    /// object right now. Walks the bucket's rules in declaration order;
381    /// returns the first matching action. Returns `None` when no rule
382    /// matches (or when the matching rule is `Disabled`, or when the
383    /// bucket has no lifecycle configuration).
384    ///
385    /// Within a single rule the precedence is:
386    ///
387    /// 1. Pick the deepest transition whose `days` threshold is currently
388    ///    met (= largest `days ≤ object age`).
389    /// 2. Conflict with expiration: if `expiration_days <=
390    ///    transition_days` for the chosen transition, expiration wins
391    ///    (the rule wants the object gone before it would have been
392    ///    transitioned). Otherwise transition wins (e.g. transition at
393    ///    30d, expiration at 365d, age 60d → transition fires now,
394    ///    expiration is future).
395    /// 3. `expiration_date` matches when `now >= expiration_date` and no
396    ///    transition is currently applicable.
397    ///
398    /// `object_age` is "now - created_at" supplied by the caller — keeping
399    /// the evaluator pure of the wall clock makes deterministic testing
400    /// trivial.
401    #[must_use]
402    pub fn evaluate(
403        &self,
404        bucket: &str,
405        key: &str,
406        object_age: Duration,
407        object_size: u64,
408        object_tags: &[(String, String)],
409    ) -> Option<LifecycleAction> {
410        self.evaluate_with_flags(
411            bucket,
412            key,
413            object_age,
414            object_size,
415            object_tags,
416            EvaluateFlags::default(),
417        )
418    }
419
420    /// Full-form evaluator with flags for noncurrent-version handling.
421    /// Use this when the scanner is walking a versioning-enabled bucket;
422    /// pass `is_noncurrent = true` for entries that are not the latest
423    /// non-delete-marker version.
424    #[must_use]
425    pub fn evaluate_with_flags(
426        &self,
427        bucket: &str,
428        key: &str,
429        object_age: Duration,
430        object_size: u64,
431        object_tags: &[(String, String)],
432        flags: EvaluateFlags,
433    ) -> Option<LifecycleAction> {
434        let cfg = self.get(bucket)?;
435        let now_for_date = flags.now.unwrap_or_else(Utc::now);
436        let age_days = object_age.num_days().max(0);
437        let age_days_u32 = u32::try_from(age_days).unwrap_or(u32::MAX);
438        for rule in &cfg.rules {
439            if rule.status != LifecycleStatus::Enabled {
440                continue;
441            }
442            if !rule.filter.matches(key, object_size, object_tags) {
443                continue;
444            }
445            // Noncurrent-version expiration: only consulted when the
446            // caller explicitly flags this entry as noncurrent. The
447            // current-version expiration / transition rules do not fire
448            // for noncurrent versions in AWS S3 semantics.
449            if flags.is_noncurrent {
450                if let Some(days) = rule.noncurrent_version_expiration_days
451                    && age_days_u32 >= days
452                {
453                    return Some(LifecycleAction::Expire);
454                }
455                continue;
456            }
457            // Current-version path.
458            let exp_days_match = rule.expiration_days.filter(|d| age_days_u32 >= *d);
459            let exp_date_match = rule.expiration_date.filter(|d| now_for_date >= *d);
460            // Pick the deepest transition whose threshold is at or
461            // below the object's age. Transitions are typically
462            // declaration-ordered by ascending `days`, but we don't
463            // require it — taking the largest threshold means an
464            // object aged 90d gets `GLACIER` over `STANDARD_IA` even
465            // if `STANDARD_IA(30d)` was declared first.
466            let chosen_transition = rule
467                .transitions
468                .iter()
469                .filter(|t| age_days_u32 >= t.days)
470                .max_by_key(|t| t.days);
471            // Conflict resolution: when `expiration_days` fires AND a
472            // transition fires, expiration wins iff
473            // `expiration_days <= transition_days` (rule wants object
474            // gone before / at the same time it would have been
475            // transitioned). Otherwise the transition wins.
476            if let Some(exp_threshold) = exp_days_match {
477                let trans_threshold = chosen_transition.map(|t| t.days).unwrap_or(u32::MAX);
478                if exp_threshold <= trans_threshold {
479                    return Some(LifecycleAction::Expire);
480                }
481            }
482            if let Some(t) = chosen_transition {
483                return Some(LifecycleAction::Transition {
484                    storage_class: t.storage_class.clone(),
485                });
486            }
487            // Calendar-date expiration (no transition currently
488            // applicable, but the rule's expiration_date is past).
489            if exp_date_match.is_some() {
490                return Some(LifecycleAction::Expire);
491            }
492            // Fall through to the next rule when no action fires for
493            // this rule — first-match-wins applies only to *firing*
494            // rules, matching AWS semantics where overlapping rules
495            // with disjoint thresholds compose.
496        }
497        None
498    }
499
500    /// v0.8.3 #69 (audit M-2): evaluate one in-flight multipart upload
501    /// against the bucket's rules. Returns
502    /// [`LifecycleAction::AbortMultipartUpload`] when at least one
503    /// `Enabled` rule (a) accepts the upload's key via its filter and
504    /// (b) carries an `abort_incomplete_multipart_upload_days`
505    /// threshold whose age (`now - initiated`) is currently met.
506    /// Returns `None` otherwise (no matching rule, no
507    /// abort-multipart-upload-days set, or the upload is too young).
508    ///
509    /// Filter matching reuses [`LifecycleFilter::matches`] with
510    /// `object_size = 0` — in-flight uploads have no assembled size
511    /// yet (the parts are stored independently in the backend), so
512    /// any rule whose filter sets `object_size_greater_than` /
513    /// `object_size_less_than` is treated as if the upload were
514    /// 0 bytes. AWS S3 itself does not gate
515    /// `AbortIncompleteMultipartUpload` on size; this matches the
516    /// AWS semantic (size predicates simply do not apply to the
517    /// abort path) for the typical filter shape (no size predicate).
518    /// Operators wanting size-gated abort can carry the upload's
519    /// declared part length on the `MultipartUploadCandidate` in a
520    /// follow-up issue — the API extension is additive.
521    #[must_use]
522    pub fn evaluate_in_flight_multipart(
523        &self,
524        bucket: &str,
525        upload: &MultipartUploadCandidate,
526        now: DateTime<Utc>,
527    ) -> Option<LifecycleAction> {
528        let cfg = self.get(bucket)?;
529        for rule in &cfg.rules {
530            if rule.status != LifecycleStatus::Enabled {
531                continue;
532            }
533            if !rule.filter.matches(&upload.key, 0, &upload.tags) {
534                continue;
535            }
536            if let Some(days) = rule.abort_incomplete_multipart_upload_days {
537                let age = now.signed_duration_since(upload.initiated);
538                if age >= Duration::days(i64::from(days)) {
539                    return Some(LifecycleAction::AbortMultipartUpload {
540                        upload_id: upload.upload_id.clone(),
541                    });
542                }
543            }
544        }
545        None
546    }
547
548    /// Stamp the per-bucket action counter and bump the matching
549    /// Prometheus counter. Called by the future scanner after a successful
550    /// delete / metadata rewrite.
551    pub fn record_action(&self, bucket: &str, action: &LifecycleAction) {
552        let label = action.metric_label();
553        let key = (bucket.to_owned(), label.to_owned());
554        let mut guard =
555            crate::lock_recovery::recover_write(&self.actions_total, "lifecycle.actions_total");
556        let entry = guard.entry(key).or_insert(0);
557        *entry = entry.saturating_add(1);
558        crate::metrics::record_lifecycle_action(bucket, label);
559    }
560
561    /// Read-only snapshot of the per-(bucket, action) counter map.
562    /// Useful for tests + introspection (`/admin/lifecycle/stats` style
563    /// endpoints in the future).
564    #[must_use]
565    pub fn actions_snapshot(&self) -> HashMap<(String, String), u64> {
566        crate::lock_recovery::recover_read(&self.actions_total, "lifecycle.actions_total").clone()
567    }
568
569    /// All buckets with a lifecycle configuration attached. Sorted for
570    /// stable scanner ordering.
571    #[must_use]
572    pub fn buckets(&self) -> Vec<String> {
573        let map = crate::lock_recovery::recover_read(&self.by_bucket, "lifecycle.by_bucket");
574        let mut out: Vec<String> = map.keys().cloned().collect();
575        out.sort();
576        out
577    }
578}
579
/// Flags for [`LifecycleManager::evaluate_with_flags`]. Default is
/// "current-version object, evaluator picks `Utc::now()` for the date
/// comparison". Tests override `now` for determinism.
#[derive(Clone, Copy, Debug, Default)]
pub struct EvaluateFlags {
    /// `true` when the candidate is a noncurrent version — routes the
    /// evaluator to `noncurrent_version_expiration_days` only.
    pub is_noncurrent: bool,
    /// Wall-clock override for `expiration_date` comparisons; `None`
    /// means the evaluator uses `Utc::now()`.
    pub now: Option<DateTime<Utc>>,
}
588
/// One object the evaluator considers in a batch:
/// `(key, object_age, object_size, object_tags)`. Defined as a type alias
/// so [`evaluate_batch`] / [`crate::S4Service::run_lifecycle_once_for_test`]
/// don't trip clippy's `type-complexity` lint, and so callers building the
/// list have a single canonical shape to reach for. `object_age` is
/// "now - created_at" as supplied by the caller (see
/// [`LifecycleManager::evaluate`]).
pub type EvaluateBatchEntry = (String, Duration, u64, Vec<(String, String)>);
595
596/// Test-driven scan entry: walks a list of [`EvaluateBatchEntry`] tuples
597/// and produces (key, action) pairs for every object that should fire an
598/// action **right now**. The actual backend invocation (S3.delete_object /
599/// metadata rewrite) is the caller's job. Used by both unit tests and the
600/// E2E test in `tests/roundtrip.rs`; the future background scanner will
601/// reuse the same entry once the bucket-walk is wired through the backend.
602#[must_use]
603pub fn evaluate_batch(
604    manager: &LifecycleManager,
605    bucket: &str,
606    objects: &[EvaluateBatchEntry],
607) -> Vec<(String, LifecycleAction)> {
608    let mut out = Vec::with_capacity(objects.len());
609    for (key, age, size, tags) in objects {
610        if let Some(action) = manager.evaluate(bucket, key, *age, *size, tags) {
611            out.push((key.clone(), action));
612        }
613    }
614    out
615}
616
/// Per-invocation scanner counters returned by [`run_scan_once`]. Useful
/// for tests, the `--lifecycle-scan-interval-hours` log line, and any
/// future `/admin/lifecycle/scan` introspection endpoint. Operators see
/// the same numbers via Prometheus
/// (`s4_lifecycle_actions_total{action="expire"|"transition"}`).
/// `Default` yields the all-zero report a fresh scan starts from.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
    /// Number of buckets the scanner walked (= buckets with a lifecycle
    /// configuration attached at the moment the scanner ran).
    pub buckets_scanned: usize,
    /// Number of distinct keys the scanner evaluated. Multi-page lists
    /// count one key once even if the listing was paginated.
    pub objects_evaluated: usize,
    /// Number of objects deleted as a result of an Expiration action.
    pub expired: usize,
    /// Number of objects whose `x-amz-storage-class` was rewritten as a
    /// result of a Transition action.
    pub transitioned: usize,
    /// Number of objects skipped because an Object Lock (Compliance,
    /// Governance, or legal hold) was in effect. The Lock always wins
    /// over Lifecycle, matching AWS S3 semantics.
    pub skipped_locked: usize,
    /// v0.8.3 #69 (audit M-2): number of in-flight multipart uploads
    /// the scanner aborted as a result of an
    /// `AbortIncompleteMultipartUpload` action. Pair with the
    /// Prometheus counter
    /// `s4_lifecycle_actions_total{action="abort_incomplete_multipart"}`.
    /// Only counts successful aborts — a backend
    /// `abort_multipart_upload` failure bumps `action_errors` instead
    /// (matching the existing Expire / Transition error-path).
    pub aborted_multipart: usize,
    /// Number of objects the evaluator wanted to act on but the action
    /// failed (e.g. backend `delete_object` returned an error). Logged
    /// individually at WARN level; this counter exists so tests / metrics
    /// can assert no silent loss.
    pub action_errors: usize,
}
654
655/// Convert an s3s `Timestamp` (`time::OffsetDateTime` underneath) into a
656/// `chrono::DateTime<Utc>` via the RFC3339 wire form. Used by the scanner
657/// to compute object age (= `now - last_modified`). Returns `None` when
658/// the stamp is unparseable, in which case the caller falls back to
659/// treating the object as freshly created (age = 0).
660fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
661    let mut buf = Vec::new();
662    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf)
663        .ok()?;
664    let s = std::str::from_utf8(&buf).ok()?;
665    chrono::DateTime::parse_from_rfc3339(s)
666        .ok()
667        .map(|dt| dt.with_timezone(&Utc))
668}
669
670/// Build a synthetic `S3Request` with the minimum metadata the
671/// scanner-internal calls need. The lifecycle scanner is a
672/// system-internal caller (no end-user credentials, no real HTTP method
673/// / URI), so policy gates downstream see `credentials = None` /
674/// `region = None` and treat the call as anonymous-internal. Backends
675/// that do not gate internal traffic ignore these fields entirely.
676fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
677    S3Request {
678        input,
679        method,
680        uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
681        headers: http::HeaderMap::new(),
682        extensions: http::Extensions::new(),
683        credentials: None,
684        region: None,
685        service: None,
686        trailing_headers: None,
687    }
688}
689
690/// Walk every bucket that has a lifecycle configuration attached, list
691/// its objects via `list_objects_v2` (continuation-token pagination), and
692/// for each object evaluate the rule set + execute the matching
693/// Expiration / Transition action. Object-Lock-protected objects are
694/// **skipped** (the Lock always wins over Lifecycle). Versioning chains
695/// are intentionally out of scope for v0.7 #45 — see the module-level
696/// limitations note.
697///
698/// ## error handling
699///
700/// Per-bucket / per-object failures are logged at WARN level and bumped
701/// in `ScanReport::action_errors`; the scanner does NOT abort early on a
702/// single bad object so one slow / faulty bucket can't starve every
703/// other bucket's lifecycle. The function only returns `Err(_)` when the
704/// scanner cannot make progress at all (no current usage — kept for the
705/// future case where the manager itself becomes unavailable).
706///
707/// ## scope (v0.7 #45)
708///
709/// - Current-version objects only (Versioning-enabled chains rely on
710///   `evaluate_with_flags(is_noncurrent = true)`, but walking the
711///   shadow keys requires the version chain access pattern from
712///   `versioning.rs` and is deferred to a follow-up issue).
713/// - `head_object`'s `last_modified` is used to compute age. When the
714///   backend omits the field (some S3-compatible backends do), the
715///   object is treated as age 0 and skipped — matches AWS conservative
716///   behaviour where a malformed timestamp must not silently expire data.
717/// - Tags are looked up via the attached
718///   [`crate::tagging::TagManager`] (when wired). Buckets without a
719///   tag manager pass an empty tag list to the evaluator.
720/// - Transition rewrites the object's `x-amz-storage-class` via
721///   `copy_object` (same bucket / same key, `MetadataDirective: COPY`,
722///   new `StorageClass`). Backends that ignore the storage class
723///   header silently no-op the transition; the counter still bumps to
724///   reflect "the scanner asked for a transition" (matching AWS where
725///   a no-op transition still costs a request).
726pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
727    s4: &Arc<crate::S4Service<B>>,
728) -> Result<ScanReport, String> {
729    let Some(mgr) = s4.lifecycle_manager().cloned() else {
730        // No lifecycle manager attached (e.g. operator did not set
731        // `--lifecycle-state-file`). Scan is a no-op.
732        return Ok(ScanReport::default());
733    };
734    let buckets = mgr.buckets();
735    if buckets.is_empty() {
736        return Ok(ScanReport::default());
737    }
738    let now = Utc::now();
739    let mut report = ScanReport {
740        buckets_scanned: buckets.len(),
741        ..ScanReport::default()
742    };
743    for bucket in buckets {
744        scan_bucket(s4, &mgr, &bucket, now, &mut report).await;
745        // v0.8.3 #69 (audit M-2): walk in-flight multipart uploads for
746        // the same bucket and abort any whose `Initiated` time is past
747        // the rule's `abort_incomplete_multipart_upload_days` threshold.
748        // Run after the object walk so the (typically smaller) multipart
749        // pass still happens even if the object walk hit a transient
750        // backend error mid-stream (per-bucket failure isolation —
751        // matching the existing one-bad-bucket-doesn't-starve-others
752        // policy).
753        scan_in_flight_multipart_uploads(s4, &mgr, &bucket, now, &mut report).await;
754    }
755    Ok(report)
756}
757
/// Walk one bucket end-to-end. Pagination uses the `continuation_token`
/// loop documented in
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>.
///
/// A failed `list_objects_v2` page abandons the bucket for this tick
/// (logged + counted in `action_errors`); per-object action failures
/// are counted the same way but the walk continues.
async fn scan_bucket<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    mgr: &Arc<LifecycleManager>,
    bucket: &str,
    now: DateTime<Utc>,
    report: &mut ScanReport,
) {
    let mut continuation: Option<String> = None;
    loop {
        let list_input = ListObjectsV2Input {
            bucket: bucket.to_owned(),
            continuation_token: continuation.clone(),
            ..Default::default()
        };
        let list_req = synthetic_request(
            list_input,
            http::Method::GET,
            &format!("/{bucket}?list-type=2"),
        );
        let resp = match s4.as_ref().list_objects_v2(list_req).await {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    bucket = %bucket,
                    error = %e,
                    "S4 lifecycle: list_objects_v2 failed; skipping bucket for this scan",
                );
                report.action_errors = report.action_errors.saturating_add(1);
                return;
            }
        };
        let output = resp.output;
        let contents = output.contents.unwrap_or_default();
        for obj in &contents {
            // Keys are Option in the s3s DTO; a key-less entry is unusable.
            let Some(key) = obj.key.as_deref() else {
                continue;
            };
            // Filter out S4-internal sidecars / shadow versions early so
            // the lifecycle scanner mirrors the same "client-visible
            // object set" the customer sees through `list_objects_v2`.
            // (The S4Service.list_objects_v2 handler already drops them
            // before returning, but this is a belt-and-braces guard for
            // any future bypass that builds the list elsewhere.)
            if key.ends_with(".s4index") {
                continue;
            }
            report.objects_evaluated = report.objects_evaluated.saturating_add(1);
            // Clamp a (malformed) negative size to 0 before the cast.
            let size = obj.size.unwrap_or(0).max(0) as u64;
            // Missing / unparseable last_modified → age 0, so no age-based
            // rule can fire on this tick (conservative: never expire data
            // whose age we cannot determine).
            let age = match obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc) {
                Some(lm) => now.signed_duration_since(lm),
                None => Duration::zero(),
            };
            // Buckets without a wired TagManager evaluate with no tags.
            let tags: Vec<(String, String)> = s4
                .as_ref()
                .tag_manager()
                .and_then(|m| m.get_object_tags(bucket, key))
                .map(|set| set.iter().cloned().collect())
                .unwrap_or_default();
            let Some(action) = mgr.evaluate(bucket, key, age, size, &tags) else {
                continue;
            };
            // Object-Lock-protected objects are skipped before any
            // backend-mutating call. Lock wins over Lifecycle, full
            // stop — matches AWS behaviour where an Expiration on a
            // locked object is dropped, not retried.
            //
            // v0.8.3 #65 (audit C-2): in addition to bumping the
            // in-report counter, emit a Prometheus
            // `s4_lifecycle_actions_total{action="skipped_locked"}`
            // sample so operator dashboards can alert on the
            // "lifecycle wanted to act but Object Lock vetoed" path
            // (previously a silent skip — the scanner's
            // `list_objects_v2` walked the key and `evaluate(...)`
            // returned an action, but no observable signal fired
            // when the backend would have refused the DELETE).
            if let Some(lock_mgr) = s4.as_ref().object_lock_manager()
                && let Some(state) = lock_mgr.get(bucket, key)
                && state.is_locked(now)
            {
                report.skipped_locked = report.skipped_locked.saturating_add(1);
                crate::metrics::record_lifecycle_action(bucket, "skipped_locked");
                continue;
            }
            match action {
                LifecycleAction::Expire => match execute_expire(s4, bucket, key).await {
                    Ok(()) => {
                        mgr.record_action(bucket, &LifecycleAction::Expire);
                        report.expired = report.expired.saturating_add(1);
                    }
                    Err(e) => {
                        warn!(
                            bucket = %bucket,
                            key = %key,
                            error = %e,
                            "S4 lifecycle: Expire action failed",
                        );
                        report.action_errors = report.action_errors.saturating_add(1);
                    }
                },
                LifecycleAction::Transition { storage_class } => {
                    match execute_transition(s4, bucket, key, &storage_class).await {
                        Ok(()) => {
                            mgr.record_action(
                                bucket,
                                &LifecycleAction::Transition {
                                    storage_class: storage_class.clone(),
                                },
                            );
                            report.transitioned = report.transitioned.saturating_add(1);
                        }
                        Err(e) => {
                            warn!(
                                bucket = %bucket,
                                key = %key,
                                storage_class = %storage_class,
                                error = %e,
                                "S4 lifecycle: Transition action failed",
                            );
                            report.action_errors = report.action_errors.saturating_add(1);
                        }
                    }
                }
                // v0.8.3 #69 (audit M-2): the per-key path's
                // `evaluate(...)` only ever returns Expire /
                // Transition (the AbortMultipartUpload variant comes
                // from the in-flight multipart walker further down,
                // which uses `evaluate_in_flight_multipart`). Match
                // exhaustiveness still requires an arm; logging at
                // warn keeps the control-flow honest if the
                // evaluator ever grows a path that surfaces an
                // abort here (e.g. someone wires a future evaluator
                // that returns abort for a regular object key — the
                // arm prevents silent dispatch and the warn surfaces
                // the misuse).
                LifecycleAction::AbortMultipartUpload { upload_id } => {
                    warn!(
                        bucket = %bucket,
                        key = %key,
                        upload_id = %upload_id,
                        "S4 lifecycle: AbortMultipartUpload returned for a key path; \
                         this is unexpected — the per-key evaluator should only \
                         emit Expire / Transition. Dropping action.",
                    );
                    report.action_errors = report.action_errors.saturating_add(1);
                }
            }
        }
        if output.is_truncated.unwrap_or(false) {
            // v0.8.4 #78 (audit M3): pagination guard hardening. AWS
            // guarantees `NextContinuationToken` when `is_truncated=true`
            // and that the token always advances, but a malformed
            // backend (or a third-party S3 emulator with a buggy
            // listing implementation) can break either invariant. Two
            // failure modes are now caught explicitly with a
            // `tracing::warn` so operators see the divergence rather
            // than spinning forever:
            //   1. `is_truncated=true` with `next_continuation_token=None`
            //   2. `next_continuation_token` repeats the previous value
            //      (caller would re-issue the identical request → same
            //      page → infinite loop)
            // Either condition exits the pagination loop for this scan
            // tick; the next scheduled tick re-enters from the start
            // marker, so transient backend bugs self-recover.
            let next = output.next_continuation_token.clone();
            if next.is_none() {
                warn!(
                    bucket = %bucket,
                    "S4 lifecycle: list_objects_v2 pagination stuck — \
                     is_truncated=true but next_continuation_token \
                     missing; breaking loop to avoid spin",
                );
                break;
            }
            if next == continuation {
                warn!(
                    bucket = %bucket,
                    token = ?continuation,
                    "S4 lifecycle: list_objects_v2 pagination stuck — \
                     same continuation_token repeated; breaking loop \
                     to avoid spin",
                );
                break;
            }
            continuation = next;
        } else {
            break;
        }
    }
}
950
/// v0.8.3 #69 (audit M-2): walk every in-flight multipart upload for
/// `bucket` via `list_multipart_uploads` (key-marker / upload-id-marker
/// pagination) and abort any whose `Initiated` time is older than the
/// rule's `abort_incomplete_multipart_upload_days` threshold. Successful
/// aborts bump `report.aborted_multipart` AND (`mgr.record_action`)
/// `s4_lifecycle_actions_total{action="abort_incomplete_multipart"}` so
/// operator dashboards see the same signal whether they look at
/// in-process counters or Prometheus.
///
/// On a successful abort the entry in `MultipartStateStore` (which
/// holds the per-upload SSE-C key bytes / tag set / object-lock recipe
/// captured at `CreateMultipartUpload` time) is also dropped — same
/// shape as the user-facing `abort_multipart_upload` handler in
/// `service.rs`. Without the drop the abandoned upload's `Zeroizing<[u8;
/// 32]>` SSE-C key would linger in `multipart_state` until the
/// `sweep_stale` background tick (v0.8.2 #62) reaped it on TTL.
///
/// Per-page / per-upload backend failures are logged at WARN and bumped
/// in `report.action_errors`; the loop does NOT abort the bucket — one
/// bad upload must not prevent the rest of the bucket's stale uploads
/// from being cleaned up. Mirrors the same isolation policy
/// `scan_bucket` uses for `list_objects_v2` failures.
async fn scan_in_flight_multipart_uploads<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    mgr: &Arc<LifecycleManager>,
    bucket: &str,
    now: DateTime<Utc>,
    report: &mut ScanReport,
) {
    // AWS paginates multipart listings on a (key, upload-id) pair, not a
    // single token — both markers advance together below.
    let mut key_marker: Option<String> = None;
    let mut upload_id_marker: Option<String> = None;
    loop {
        let list_input = ListMultipartUploadsInput {
            bucket: bucket.to_owned(),
            key_marker: key_marker.clone(),
            upload_id_marker: upload_id_marker.clone(),
            ..Default::default()
        };
        let list_req =
            synthetic_request(list_input, http::Method::GET, &format!("/{bucket}?uploads"));
        let resp = match s4.as_ref().list_multipart_uploads(list_req).await {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    bucket = %bucket,
                    error = %e,
                    "S4 lifecycle: list_multipart_uploads failed; \
                     skipping bucket multipart sweep for this scan",
                );
                report.action_errors = report.action_errors.saturating_add(1);
                return;
            }
        };
        let output = resp.output;
        let uploads = output.uploads.unwrap_or_default();
        for upload in &uploads {
            // Both identifiers are Option in the DTO; an entry missing
            // either cannot be aborted, so it is skipped.
            let Some(upload_id) = upload.upload_id.as_deref() else {
                continue;
            };
            let Some(key) = upload.key.as_deref() else {
                continue;
            };
            // `Initiated` is `Option<Timestamp>`; absent or
            // unparseable → treat as "freshly initiated" (age 0)
            // and skip. Matches the conservative `last_modified`
            // handling in `scan_bucket` — never abort an upload
            // whose age we cannot determine.
            let Some(initiated) = upload.initiated.as_ref().and_then(timestamp_to_chrono_utc)
            else {
                continue;
            };
            // The candidate carries an empty tag list on this path.
            let candidate = MultipartUploadCandidate {
                upload_id: upload_id.to_owned(),
                key: key.to_owned(),
                initiated,
                tags: Vec::new(),
            };
            let Some(action) = mgr.evaluate_in_flight_multipart(bucket, &candidate, now) else {
                continue;
            };
            let LifecycleAction::AbortMultipartUpload {
                upload_id: action_upload_id,
            } = action
            else {
                // The evaluator is contractually
                // AbortMultipartUpload-only on this path; this arm
                // exists only to satisfy match exhaustiveness if a
                // future rev returns a different variant. Treat as
                // an error so the divergence is observable.
                warn!(
                    bucket = %bucket,
                    key = %key,
                    upload_id = %upload_id,
                    "S4 lifecycle: evaluate_in_flight_multipart returned \
                     non-Abort action; dropping",
                );
                report.action_errors = report.action_errors.saturating_add(1);
                continue;
            };
            match execute_abort_multipart(s4, bucket, key, &action_upload_id).await {
                Ok(()) => {
                    mgr.record_action(
                        bucket,
                        &LifecycleAction::AbortMultipartUpload {
                            upload_id: action_upload_id.clone(),
                        },
                    );
                    report.aborted_multipart = report.aborted_multipart.saturating_add(1);
                    // Drop the per-upload state so the
                    // (Zeroizing-wrapped) SSE-C key bytes / tag
                    // recipe / object-lock recipe go away
                    // immediately rather than waiting for the
                    // hourly `sweep_stale` tick. Idempotent —
                    // `remove(...)` on a missing key is a no-op
                    // (some uploads may not have been registered
                    // here, e.g. a server restart between Create
                    // and the lifecycle sweep).
                    s4.as_ref().multipart_state().remove(&action_upload_id);
                }
                Err(e) => {
                    warn!(
                        bucket = %bucket,
                        key = %key,
                        upload_id = %action_upload_id,
                        error = %e,
                        "S4 lifecycle: AbortMultipartUpload action failed",
                    );
                    report.action_errors = report.action_errors.saturating_add(1);
                }
            }
        }
        if output.is_truncated.unwrap_or(false) {
            // v0.8.4 #78 (audit M3): pagination guard hardening — same
            // shape as the `scan_bucket` continuation-token guard above
            // but for the (key_marker, upload_id_marker) pair. AWS
            // guarantees `NextKeyMarker` (and `NextUploadIdMarker` when
            // an in-flight upload boundary lands inside a key) on
            // truncated responses, but a malformed backend can:
            //   1. set `is_truncated=true` and omit BOTH next markers
            //      (the next iteration re-issues the same request →
            //      same page → infinite loop), or
            //   2. echo the same marker pair back (same outcome).
            // Both modes warn-log and break — the next scheduled scan
            // re-enters from the original markers, so a transient
            // backend bug self-recovers.
            let next_key = output.next_key_marker.clone();
            let next_upload_id = output.next_upload_id_marker.clone();
            if next_key.is_none() && next_upload_id.is_none() {
                warn!(
                    bucket = %bucket,
                    "S4 lifecycle: list_multipart_uploads pagination \
                     stuck — is_truncated=true but both \
                     next_key_marker and next_upload_id_marker \
                     missing; breaking loop to avoid spin",
                );
                break;
            }
            if next_key == key_marker && next_upload_id == upload_id_marker {
                warn!(
                    bucket = %bucket,
                    key_marker = ?key_marker,
                    upload_id_marker = ?upload_id_marker,
                    "S4 lifecycle: list_multipart_uploads pagination \
                     stuck — same (key_marker, upload_id_marker) pair \
                     repeated; breaking loop to avoid spin",
                );
                break;
            }
            key_marker = next_key;
            upload_id_marker = next_upload_id;
        } else {
            break;
        }
    }
}
1126
1127/// v0.8.3 #69 (audit M-2): issue `abort_multipart_upload` against the
1128/// wrapped `S4Service`. The handler in `service.rs` does the
1129/// `multipart_state.remove(...)` itself before forwarding to the
1130/// backend; we additionally `remove` from the lifecycle scanner side
1131/// (in [`scan_in_flight_multipart_uploads`]) to defensively cover the
1132/// case where the backend abort succeeds but the response routing
1133/// shortens early.
1134async fn execute_abort_multipart<B: S3 + Send + Sync + 'static>(
1135    s4: &Arc<crate::S4Service<B>>,
1136    bucket: &str,
1137    key: &str,
1138    upload_id: &str,
1139) -> Result<(), String> {
1140    let input = AbortMultipartUploadInput {
1141        bucket: bucket.to_owned(),
1142        key: key.to_owned(),
1143        upload_id: upload_id.to_owned(),
1144        ..Default::default()
1145    };
1146    let req = synthetic_request(
1147        input,
1148        http::Method::DELETE,
1149        &format!("/{bucket}/{key}?uploadId={upload_id}"),
1150    );
1151    s4.as_ref()
1152        .abort_multipart_upload(req)
1153        .await
1154        .map(|_| ())
1155        .map_err(|e| format!("{e}"))
1156}
1157
1158/// Issue `delete_object` against the wrapped `S4Service`. The handler in
1159/// `service.rs` runs the WORM check itself, so even if the scanner's
1160/// pre-check missed (race with an MFA-Delete put-bucket-versioning), the
1161/// backend refuses the delete with `AccessDenied` and the error path
1162/// above bumps `action_errors` rather than silently losing data.
1163async fn execute_expire<B: S3 + Send + Sync + 'static>(
1164    s4: &Arc<crate::S4Service<B>>,
1165    bucket: &str,
1166    key: &str,
1167) -> Result<(), String> {
1168    let input = DeleteObjectInput {
1169        bucket: bucket.to_owned(),
1170        key: key.to_owned(),
1171        ..Default::default()
1172    };
1173    let req = synthetic_request(input, http::Method::DELETE, &format!("/{bucket}/{key}"));
1174    s4.as_ref()
1175        .delete_object(req)
1176        .await
1177        .map(|_| ())
1178        .map_err(|e| format!("{e}"))
1179}
1180
1181/// Rewrite the object's storage class via a same-key `copy_object` with
1182/// `MetadataDirective: COPY` (preserves user metadata) and the new
1183/// `storage_class`. Backends that ignore storage-class headers
1184/// effectively no-op; the counter still records the attempt so dashboards
1185/// reflect the scanner's intent.
1186async fn execute_transition<B: S3 + Send + Sync + 'static>(
1187    s4: &Arc<crate::S4Service<B>>,
1188    bucket: &str,
1189    key: &str,
1190    storage_class: &str,
1191) -> Result<(), String> {
1192    // CopyObjectInput has dozens of `Option` fields plus three required
1193    // (bucket / key / copy_source); the s3s-generated `builder()` is
1194    // the path that fills the optional ones with `None` for us. The
1195    // `set_*` setters return `&mut Self`, so we drive them in
1196    // statement form rather than as a method chain.
1197    let mut builder = CopyObjectInput::builder();
1198    builder.set_bucket(bucket.to_owned());
1199    builder.set_key(key.to_owned());
1200    builder.set_copy_source(CopySource::Bucket {
1201        bucket: bucket.to_owned().into_boxed_str(),
1202        key: key.to_owned().into_boxed_str(),
1203        version_id: None,
1204    });
1205    builder.set_metadata_directive(Some(MetadataDirective::from_static(
1206        MetadataDirective::COPY,
1207    )));
1208    builder.set_storage_class(Some(StorageClass::from(storage_class.to_owned())));
1209    let input = builder
1210        .build()
1211        .map_err(|e| format!("CopyObjectInput build: {e}"))?;
1212    let req = synthetic_request(input, http::Method::PUT, &format!("/{bucket}/{key}"));
1213    s4.as_ref()
1214        .copy_object(req)
1215        .await
1216        .map(|_| ())
1217        .map_err(|e| format!("{e}"))
1218}
1219
1220#[cfg(test)]
1221mod tests {
1222    use super::*;
1223
1224    fn enabled(rule: LifecycleRule) -> LifecycleRule {
1225        LifecycleRule {
1226            status: LifecycleStatus::Enabled,
1227            ..rule
1228        }
1229    }
1230
1231    fn cfg_with(rules: Vec<LifecycleRule>) -> LifecycleConfig {
1232        LifecycleConfig { rules }
1233    }
1234
1235    fn manager_with(bucket: &str, rules: Vec<LifecycleRule>) -> LifecycleManager {
1236        let m = LifecycleManager::new();
1237        m.put(bucket, cfg_with(rules));
1238        m
1239    }
1240
1241    #[test]
1242    fn evaluate_age_past_expiration_returns_expire() {
1243        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
1244        let action = m.evaluate("b", "k", Duration::days(31), 100, &[]);
1245        assert_eq!(action, Some(LifecycleAction::Expire));
1246    }
1247
1248    #[test]
1249    fn evaluate_age_before_expiration_returns_none() {
1250        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
1251        let action = m.evaluate("b", "k", Duration::days(5), 100, &[]);
1252        assert_eq!(action, None);
1253    }
1254
1255    #[test]
1256    fn evaluate_prefix_filter_matches() {
1257        let mut rule = LifecycleRule::expire_after_days("r", 1);
1258        rule.filter.prefix = Some("logs/".into());
1259        let m = manager_with("b", vec![rule]);
1260        assert_eq!(
1261            m.evaluate("b", "logs/2026/a.log", Duration::days(2), 1, &[]),
1262            Some(LifecycleAction::Expire)
1263        );
1264        assert_eq!(
1265            m.evaluate("b", "data/keep.bin", Duration::days(2), 1, &[]),
1266            None
1267        );
1268    }
1269
1270    #[test]
1271    fn evaluate_tag_filter_requires_all_tags_to_match() {
1272        let mut rule = LifecycleRule::expire_after_days("r", 1);
1273        rule.filter.tags = vec![
1274            ("env".into(), "dev".into()),
1275            ("expirable".into(), "yes".into()),
1276        ];
1277        let m = manager_with("b", vec![rule]);
1278        // All tags present + matching → fire.
1279        assert_eq!(
1280            m.evaluate(
1281                "b",
1282                "k",
1283                Duration::days(2),
1284                1,
1285                &[
1286                    ("env".into(), "dev".into()),
1287                    ("expirable".into(), "yes".into()),
1288                    ("owner".into(), "alice".into()),
1289                ]
1290            ),
1291            Some(LifecycleAction::Expire)
1292        );
1293        // One tag missing → no fire.
1294        assert_eq!(
1295            m.evaluate(
1296                "b",
1297                "k",
1298                Duration::days(2),
1299                1,
1300                &[("env".into(), "dev".into())]
1301            ),
1302            None
1303        );
1304        // Tag present but with the wrong value → no fire.
1305        assert_eq!(
1306            m.evaluate(
1307                "b",
1308                "k",
1309                Duration::days(2),
1310                1,
1311                &[
1312                    ("env".into(), "prod".into()),
1313                    ("expirable".into(), "yes".into()),
1314                ]
1315            ),
1316            None
1317        );
1318    }
1319
1320    #[test]
1321    fn evaluate_size_filters_gate_action() {
1322        let mut rule = LifecycleRule::expire_after_days("r", 1);
1323        rule.filter.object_size_greater_than = Some(1024);
1324        rule.filter.object_size_less_than = Some(10 * 1024);
1325        let m = manager_with("b", vec![rule]);
1326        // Inside the (1024, 10*1024) range → fire.
1327        assert_eq!(
1328            m.evaluate("b", "k", Duration::days(2), 4096, &[]),
1329            Some(LifecycleAction::Expire)
1330        );
1331        // At the boundary (size == greater_than) → strict `>`, no fire.
1332        assert_eq!(m.evaluate("b", "k", Duration::days(2), 1024, &[]), None);
1333        // Above the upper bound → no fire.
1334        assert_eq!(
1335            m.evaluate("b", "k", Duration::days(2), 100 * 1024, &[]),
1336            None
1337        );
1338    }
1339
1340    #[test]
1341    fn evaluate_transition_fires_before_expiration() {
1342        // Transition at 30d, expiration at 365d, age 60d → transition.
1343        let rule = enabled(LifecycleRule {
1344            id: "r".into(),
1345            status: LifecycleStatus::Enabled,
1346            filter: LifecycleFilter::default(),
1347            expiration_days: Some(365),
1348            expiration_date: None,
1349            transitions: vec![TransitionRule {
1350                days: 30,
1351                storage_class: "GLACIER_IR".into(),
1352            }],
1353            noncurrent_version_expiration_days: None,
1354            abort_incomplete_multipart_upload_days: None,
1355        });
1356        let m = manager_with("b", vec![rule]);
1357        let action = m.evaluate("b", "k", Duration::days(60), 1, &[]);
1358        assert_eq!(
1359            action,
1360            Some(LifecycleAction::Transition {
1361                storage_class: "GLACIER_IR".into(),
1362            })
1363        );
1364    }
1365
1366    #[test]
1367    fn evaluate_expiration_wins_when_threshold_is_earlier_than_transition() {
1368        // Expiration at 30d, transition at 90d, age 100d → expire (the
1369        // rule wants the object gone *before* it would have transitioned).
1370        let rule = enabled(LifecycleRule {
1371            id: "r".into(),
1372            status: LifecycleStatus::Enabled,
1373            filter: LifecycleFilter::default(),
1374            expiration_days: Some(30),
1375            expiration_date: None,
1376            transitions: vec![TransitionRule {
1377                days: 90,
1378                storage_class: "GLACIER".into(),
1379            }],
1380            noncurrent_version_expiration_days: None,
1381            abort_incomplete_multipart_upload_days: None,
1382        });
1383        let m = manager_with("b", vec![rule]);
1384        let action = m.evaluate("b", "k", Duration::days(100), 1, &[]);
1385        assert_eq!(action, Some(LifecycleAction::Expire));
1386    }
1387
1388    #[test]
1389    fn evaluate_disabled_rule_never_fires() {
1390        let mut rule = LifecycleRule::expire_after_days("r", 1);
1391        rule.status = LifecycleStatus::Disabled;
1392        let m = manager_with("b", vec![rule]);
1393        assert_eq!(m.evaluate("b", "k", Duration::days(365), 1, &[]), None);
1394    }
1395
1396    #[test]
1397    fn evaluate_unknown_bucket_returns_none() {
1398        let m = LifecycleManager::new();
1399        assert_eq!(m.evaluate("ghost", "k", Duration::days(365), 1, &[]), None);
1400    }
1401
1402    #[test]
1403    fn evaluate_noncurrent_version_expiration() {
1404        let rule = enabled(LifecycleRule {
1405            id: "r".into(),
1406            status: LifecycleStatus::Enabled,
1407            filter: LifecycleFilter::default(),
1408            expiration_days: None,
1409            expiration_date: None,
1410            transitions: vec![],
1411            noncurrent_version_expiration_days: Some(7),
1412            abort_incomplete_multipart_upload_days: None,
1413        });
1414        let m = manager_with("b", vec![rule]);
1415        // current-version path → no rule matches (no expiration_days set).
1416        assert_eq!(m.evaluate("b", "k", Duration::days(30), 1, &[]), None);
1417        // noncurrent path with age past 7d → expire.
1418        let action = m.evaluate_with_flags(
1419            "b",
1420            "k",
1421            Duration::days(8),
1422            1,
1423            &[],
1424            EvaluateFlags {
1425                is_noncurrent: true,
1426                now: None,
1427            },
1428        );
1429        assert_eq!(action, Some(LifecycleAction::Expire));
1430        // noncurrent path with age before 7d → no fire.
1431        let action = m.evaluate_with_flags(
1432            "b",
1433            "k",
1434            Duration::days(3),
1435            1,
1436            &[],
1437            EvaluateFlags {
1438                is_noncurrent: true,
1439                now: None,
1440            },
1441        );
1442        assert_eq!(action, None);
1443    }
1444
    #[test]
    fn evaluate_batch_distributes_actions_across_object_ages() {
        // Transition at 30d, expiration at 60d. The conflict resolver
        // prefers Expire only when the expiration threshold is at or
        // below the transition threshold (exp_days <= trans_days).
        // Here exp=60 > trans=30, so Transition wins at every age at
        // which the rule matches at all — the pattern is "transition
        // first, expire on a later scanner pass". Ages: 10d is below
        // the 30d threshold (no action); 40d, 45d, 90d and 365d each
        // yield a Transition → expect exactly 4 actions, all of them
        // transitions.
        let rule = enabled(LifecycleRule {
            id: "r".into(),
            status: LifecycleStatus::Enabled,
            filter: LifecycleFilter::default(),
            expiration_days: Some(60),
            expiration_date: None,
            transitions: vec![TransitionRule {
                days: 30,
                storage_class: "STANDARD_IA".into(),
            }],
            noncurrent_version_expiration_days: None,
            abort_incomplete_multipart_upload_days: None,
        });
        let m = manager_with("b", vec![rule]);
        let objects = vec![
            ("young".to_string(), Duration::days(10), 1u64, vec![]),
            ("middle".to_string(), Duration::days(40), 1u64, vec![]),
            ("middle2".to_string(), Duration::days(45), 1u64, vec![]),
            ("old".to_string(), Duration::days(90), 1u64, vec![]),
            ("ancient".to_string(), Duration::days(365), 1u64, vec![]),
        ];
        let actions = evaluate_batch(&m, "b", &objects);
        assert_eq!(actions.len(), 4);
        for (_, a) in &actions {
            assert!(matches!(a, LifecycleAction::Transition { .. }));
        }
    }
1485
1486    #[test]
1487    fn json_round_trip_preserves_rules() {
1488        let rule = enabled(LifecycleRule {
1489            id: "complex".into(),
1490            status: LifecycleStatus::Enabled,
1491            filter: LifecycleFilter {
1492                prefix: Some("logs/".into()),
1493                tags: vec![("env".into(), "prod".into())],
1494                object_size_greater_than: Some(1024),
1495                object_size_less_than: None,
1496            },
1497            expiration_days: Some(365),
1498            expiration_date: None,
1499            transitions: vec![TransitionRule {
1500                days: 30,
1501                storage_class: "STANDARD_IA".into(),
1502            }],
1503            noncurrent_version_expiration_days: Some(7),
1504            abort_incomplete_multipart_upload_days: Some(3),
1505        });
1506        let m = manager_with("b1", vec![rule.clone()]);
1507        let json = m.to_json().expect("to_json");
1508        let m2 = LifecycleManager::from_json(&json).expect("from_json");
1509        let cfg = m2.get("b1").expect("bucket survives roundtrip");
1510        assert_eq!(cfg.rules.len(), 1);
1511        assert_eq!(cfg.rules[0], rule);
1512    }
1513
1514    #[test]
1515    fn lifecycle_config_default_is_empty() {
1516        let cfg = LifecycleConfig::default();
1517        assert!(cfg.rules.is_empty());
1518    }
1519
1520    #[test]
1521    fn evaluate_batch_skips_locked_objects_at_caller_layer() {
1522        // The evaluator itself does not consult ObjectLock; the scanner
1523        // (and tests) are expected to filter locked keys out before /
1524        // after calling `evaluate_batch`. This test documents the
1525        // canonical pattern.
1526        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 1)]);
1527        let objects = vec![
1528            ("locked".to_string(), Duration::days(30), 1u64, vec![]),
1529            ("free".to_string(), Duration::days(30), 1u64, vec![]),
1530        ];
1531        let locked_keys: std::collections::HashSet<&str> = ["locked"].into_iter().collect();
1532        let raw = evaluate_batch(&m, "b", &objects);
1533        let filtered: Vec<_> = raw
1534            .into_iter()
1535            .filter(|(k, _)| !locked_keys.contains(k.as_str()))
1536            .collect();
1537        assert_eq!(filtered.len(), 1);
1538        assert_eq!(filtered[0].0, "free");
1539    }
1540
1541    #[test]
1542    fn record_action_bumps_per_bucket_counter() {
1543        let m = LifecycleManager::new();
1544        m.record_action("b", &LifecycleAction::Expire);
1545        m.record_action("b", &LifecycleAction::Expire);
1546        m.record_action(
1547            "b",
1548            &LifecycleAction::Transition {
1549                storage_class: "GLACIER".into(),
1550            },
1551        );
1552        m.record_action(
1553            "b",
1554            &LifecycleAction::AbortMultipartUpload {
1555                upload_id: "u-xyz".into(),
1556            },
1557        );
1558        let snap = m.actions_snapshot();
1559        assert_eq!(snap.get(&("b".into(), "expire".into())).copied(), Some(2));
1560        assert_eq!(
1561            snap.get(&("b".into(), "transition".into())).copied(),
1562            Some(1)
1563        );
1564        assert_eq!(
1565            snap.get(&("b".into(), "abort_incomplete_multipart".into()))
1566                .copied(),
1567            Some(1),
1568            "v0.8.3 #69: AbortMultipartUpload metric_label must bump \
1569             `abort_incomplete_multipart` counter",
1570        );
1571    }
1572
1573    // ---- v0.8.3 #69 (audit M-2): AbortIncompleteMultipartUpload --------
1574    //
1575    // Three unit tests covering the `evaluate_in_flight_multipart`
1576    // path: (a) age past threshold → AbortMultipartUpload, (b) age
1577    // before threshold → None, (c) the rule is `Disabled` → None
1578    // (a Disabled rule must never fire even on a stale upload).
1579    //
1580    // Test fixtures fake `now` and `initiated` so the assertion is
1581    // deterministic regardless of when the test runs.
1582
    /// Builds an `Enabled` rule whose only action is
    /// AbortIncompleteMultipartUpload after `days` days; every other
    /// lifecycle action is left unset so tests exercise the abort
    /// path in isolation.
    fn abort_rule(id: &str, days: u32) -> LifecycleRule {
        LifecycleRule {
            id: id.into(),
            status: LifecycleStatus::Enabled,
            filter: LifecycleFilter::default(),
            // No expiration / transition / noncurrent actions — abort only.
            expiration_days: None,
            expiration_date: None,
            transitions: Vec::new(),
            noncurrent_version_expiration_days: None,
            abort_incomplete_multipart_upload_days: Some(days),
        }
    }
1595
1596    /// Upload age 8 days, rule threshold 7 days → AbortMultipartUpload
1597    /// fires with the upload's `upload_id`.
1598    #[test]
1599    fn evaluate_in_flight_multipart_aborts_past_threshold() {
1600        let m = manager_with("b", vec![abort_rule("r", 7)]);
1601        let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
1602            .expect("parse now")
1603            .with_timezone(&Utc);
1604        let initiated = now - Duration::days(8);
1605        let candidate = MultipartUploadCandidate {
1606            upload_id: "u-stale".into(),
1607            key: "uploads/big.bin".into(),
1608            initiated,
1609            tags: Vec::new(),
1610        };
1611        let action = m.evaluate_in_flight_multipart("b", &candidate, now);
1612        assert_eq!(
1613            action,
1614            Some(LifecycleAction::AbortMultipartUpload {
1615                upload_id: "u-stale".into(),
1616            }),
1617        );
1618    }
1619
1620    /// Upload age 1 day, rule threshold 7 days → no fire (upload is
1621    /// fresh enough to keep around).
1622    #[test]
1623    fn evaluate_in_flight_multipart_keeps_recent_upload() {
1624        let m = manager_with("b", vec![abort_rule("r", 7)]);
1625        let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
1626            .expect("parse now")
1627            .with_timezone(&Utc);
1628        let initiated = now - Duration::days(1);
1629        let candidate = MultipartUploadCandidate {
1630            upload_id: "u-fresh".into(),
1631            key: "uploads/big.bin".into(),
1632            initiated,
1633            tags: Vec::new(),
1634        };
1635        let action = m.evaluate_in_flight_multipart("b", &candidate, now);
1636        assert_eq!(action, None);
1637    }
1638
1639    /// `Disabled` rule must never fire even when the upload is well
1640    /// past the threshold — Disabled means the operator is staging the
1641    /// rule (preview / dry-run), the action must wait for Enable.
1642    #[test]
1643    fn evaluate_in_flight_multipart_skips_disabled_rule() {
1644        let mut rule = abort_rule("r", 1);
1645        rule.status = LifecycleStatus::Disabled;
1646        let m = manager_with("b", vec![rule]);
1647        let now = chrono::DateTime::parse_from_rfc3339("2026-05-14T00:00:00Z")
1648            .expect("parse now")
1649            .with_timezone(&Utc);
1650        let initiated = now - Duration::days(365);
1651        let candidate = MultipartUploadCandidate {
1652            upload_id: "u-ancient".into(),
1653            key: "uploads/big.bin".into(),
1654            initiated,
1655            tags: Vec::new(),
1656        };
1657        let action = m.evaluate_in_flight_multipart("b", &candidate, now);
1658        assert_eq!(
1659            action, None,
1660            "Disabled rule must not abort even a 365-day-old upload",
1661        );
1662    }
1663
1664    // ---- v0.7 #45: scanner runner tests --------------------------------
1665    //
1666    // These tests stand up an in-memory `S4Service` over a tiny
1667    // `ScannerMemBackend` (separate from the larger `MemoryBackend` in
1668    // `tests/roundtrip.rs` so this module stays self-contained). The
1669    // backend implements only the four `S3` methods the scanner touches:
1670    // `put_object`, `head_object`, `delete_object`, `list_objects_v2`.
1671    // Tags are exercised via the optional `with_tagging(...)` manager.
1672    //
1673    // Object age is faked by setting an `expire_after_days(0)` rule, so
1674    // any object whose backend-recorded `last_modified` is at or before
1675    // "now" matches — sidesteps the `head_object`/`Timestamp` parsing
1676    // entirely (and matches the canonical "operator just put the bucket
1677    // on aggressive expiration" test path).
1678
1679    use std::collections::HashMap;
1680    use std::sync::Mutex as StdMutex;
1681
1682    use bytes::Bytes;
1683    use s3s::dto as dto2;
1684    use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
1685    use s4_codec::dispatcher::AlwaysDispatcher;
1686    use s4_codec::passthrough::Passthrough;
1687    use s4_codec::{CodecKind, CodecRegistry};
1688
1689    use crate::S4Service;
1690    use crate::object_lock::{LockMode, ObjectLockManager, ObjectLockState};
1691
    /// Minimal in-memory S3 backend for the scanner tests; it backs
    /// only the handful of methods the lifecycle scanner touches
    /// (put/head/delete/list, copy, multipart list/abort).
    #[derive(Default)]
    struct ScannerMemBackend {
        /// Stored objects keyed by `(bucket, key)`.
        objects: StdMutex<HashMap<(String, String), ScannerStored>>,
        /// v0.8.3 #69: in-flight multipart uploads keyed by
        /// `(bucket, upload_id)`. Tests seed entries via
        /// `put_multipart_upload(...)` so the lifecycle scanner's
        /// `list_multipart_uploads` walk has something to consume.
        multipart_uploads: StdMutex<HashMap<(String, String), ScannerMultipart>>,
    }
1701
    /// One stored object in the test backend: the raw body (used only
    /// for its length) plus the `last_modified` timestamp the scanner
    /// reads to compute object age.
    #[derive(Clone)]
    struct ScannerStored {
        body: Bytes,
        last_modified: dto2::Timestamp,
    }
1707
    /// v0.8.3 #69: minimal multipart-upload record the test backend
    /// returns from `list_multipart_uploads`. `initiated` is a
    /// `chrono::DateTime<Utc>` so tests can fake an old upload by
    /// passing `Utc::now() - Duration::days(N)` directly (no
    /// SystemTime arithmetic).
    #[derive(Clone)]
    struct ScannerMultipart {
        // Object key the upload targets.
        key: String,
        // Converted to a `dto2::Timestamp` on listing.
        initiated: chrono::DateTime<Utc>,
    }
1718
1719    impl ScannerMemBackend {
1720        fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
1721            self.objects.lock().unwrap().insert(
1722                (bucket.to_owned(), key.to_owned()),
1723                ScannerStored {
1724                    body,
1725                    last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
1726                },
1727            );
1728        }
1729
1730        /// v0.8.3 #69: seed an in-flight multipart upload the
1731        /// lifecycle scanner can then walk + abort.
1732        fn put_multipart_upload(
1733            &self,
1734            bucket: &str,
1735            upload_id: &str,
1736            key: &str,
1737            initiated: chrono::DateTime<Utc>,
1738        ) {
1739            self.multipart_uploads.lock().unwrap().insert(
1740                (bucket.to_owned(), upload_id.to_owned()),
1741                ScannerMultipart {
1742                    key: key.to_owned(),
1743                    initiated,
1744                },
1745            );
1746        }
1747    }
1748
1749    #[async_trait::async_trait]
1750    impl S3 for ScannerMemBackend {
1751        async fn put_object(
1752            &self,
1753            req: S3Request<dto2::PutObjectInput>,
1754        ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
1755            self.put_now(&req.input.bucket, &req.input.key, Bytes::new());
1756            Ok(S3Response::new(dto2::PutObjectOutput::default()))
1757        }
1758
1759        async fn head_object(
1760            &self,
1761            req: S3Request<dto2::HeadObjectInput>,
1762        ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
1763            let key = (req.input.bucket.clone(), req.input.key.clone());
1764            let lock = self.objects.lock().unwrap();
1765            let stored = lock
1766                .get(&key)
1767                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
1768            Ok(S3Response::new(dto2::HeadObjectOutput {
1769                content_length: Some(stored.body.len() as i64),
1770                last_modified: Some(stored.last_modified.clone()),
1771                ..Default::default()
1772            }))
1773        }
1774
1775        async fn delete_object(
1776            &self,
1777            req: S3Request<dto2::DeleteObjectInput>,
1778        ) -> S3Result<S3Response<dto2::DeleteObjectOutput>> {
1779            let key = (req.input.bucket.clone(), req.input.key.clone());
1780            self.objects.lock().unwrap().remove(&key);
1781            Ok(S3Response::new(dto2::DeleteObjectOutput::default()))
1782        }
1783
1784        async fn list_objects_v2(
1785            &self,
1786            req: S3Request<dto2::ListObjectsV2Input>,
1787        ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
1788            let prefix = req.input.bucket.clone();
1789            let lock = self.objects.lock().unwrap();
1790            let mut contents: Vec<dto2::Object> = lock
1791                .iter()
1792                .filter(|((b, _), _)| b == &prefix)
1793                .map(|((_, k), v)| dto2::Object {
1794                    key: Some(k.clone()),
1795                    size: Some(v.body.len() as i64),
1796                    last_modified: Some(v.last_modified.clone()),
1797                    ..Default::default()
1798                })
1799                .collect();
1800            contents.sort_by(|a, b| a.key.cmp(&b.key));
1801            let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
1802            Ok(S3Response::new(dto2::ListObjectsV2Output {
1803                name: Some(prefix),
1804                contents: Some(contents),
1805                key_count: Some(key_count),
1806                is_truncated: Some(false),
1807                ..Default::default()
1808            }))
1809        }
1810
1811        async fn copy_object(
1812            &self,
1813            _req: S3Request<dto2::CopyObjectInput>,
1814        ) -> S3Result<S3Response<dto2::CopyObjectOutput>> {
1815            // Transition path: scanner copies same-key with new
1816            // storage_class. The mem backend doesn't track storage
1817            // class, so it's a no-op success — exactly the AWS-side
1818            // behaviour for a backend that ignores the field.
1819            Ok(S3Response::new(dto2::CopyObjectOutput::default()))
1820        }
1821
1822        // ---- v0.8.3 #69: multipart abort path -----------------------
1823        //
1824        // The lifecycle scanner walks `list_multipart_uploads` per
1825        // bucket and calls `abort_multipart_upload` on every upload
1826        // whose `Initiated` time is past the rule's threshold. The
1827        // test backend returns the seeded entries on listing and
1828        // drops them on abort so post-conditions are observable.
1829
1830        async fn list_multipart_uploads(
1831            &self,
1832            req: S3Request<dto2::ListMultipartUploadsInput>,
1833        ) -> S3Result<S3Response<dto2::ListMultipartUploadsOutput>> {
1834            let bucket = req.input.bucket.clone();
1835            let lock = self.multipart_uploads.lock().unwrap();
1836            let mut uploads: Vec<dto2::MultipartUpload> = lock
1837                .iter()
1838                .filter(|((b, _), _)| b == &bucket)
1839                .map(|((_, upload_id), v)| {
1840                    let st: std::time::SystemTime = v.initiated.into();
1841                    dto2::MultipartUpload {
1842                        upload_id: Some(upload_id.clone()),
1843                        key: Some(v.key.clone()),
1844                        initiated: Some(dto2::Timestamp::from(st)),
1845                        ..Default::default()
1846                    }
1847                })
1848                .collect();
1849            // Stable order so test assertions on count + post-condition
1850            // do not race on the HashMap iteration order.
1851            uploads.sort_by(|a, b| a.upload_id.cmp(&b.upload_id));
1852            Ok(S3Response::new(dto2::ListMultipartUploadsOutput {
1853                bucket: Some(bucket),
1854                uploads: Some(uploads),
1855                is_truncated: Some(false),
1856                ..Default::default()
1857            }))
1858        }
1859
1860        async fn abort_multipart_upload(
1861            &self,
1862            req: S3Request<dto2::AbortMultipartUploadInput>,
1863        ) -> S3Result<S3Response<dto2::AbortMultipartUploadOutput>> {
1864            let bucket = req.input.bucket.clone();
1865            let upload_id = req.input.upload_id.clone();
1866            self.multipart_uploads
1867                .lock()
1868                .unwrap()
1869                .remove(&(bucket, upload_id));
1870            Ok(S3Response::new(dto2::AbortMultipartUploadOutput::default()))
1871        }
1872    }
1873
1874    fn make_service() -> Arc<S4Service<ScannerMemBackend>> {
1875        let registry =
1876            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
1877        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1878        Arc::new(S4Service::new(
1879            ScannerMemBackend::default(),
1880            registry,
1881            dispatcher,
1882        ))
1883    }
1884
1885    #[tokio::test]
1886    async fn run_scan_once_no_lifecycle_manager_returns_empty_report() {
1887        // Service has no lifecycle manager attached — scanner must
1888        // no-op cleanly (operator might not have set
1889        // `--lifecycle-state-file`). Also covers the empty-buckets
1890        // path in `run_scan_once`.
1891        let s4 = make_service();
1892        let report = run_scan_once(&s4).await.expect("scan");
1893        assert_eq!(report, ScanReport::default());
1894
1895        // And: lifecycle manager attached but no buckets configured.
1896        let mgr = Arc::new(LifecycleManager::new());
1897        let backend = ScannerMemBackend::default();
1898        let registry =
1899            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
1900        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1901        let s4_empty = Arc::new(S4Service::new(backend, registry, dispatcher).with_lifecycle(mgr));
1902        let report = run_scan_once(&s4_empty).await.expect("scan empty");
1903        assert_eq!(report, ScanReport::default());
1904    }
1905
    /// End-to-end expiration through the scanner: only the object
    /// matching the rule's prefix is deleted from the backend, the
    /// report counts exactly one expiration, and the per-bucket
    /// `expire` action counter is bumped once.
    #[tokio::test]
    async fn run_scan_once_expires_matching_objects_via_backend() {
        // Three objects: only "stale.log" matches the rule (prefix
        // gating). The other two are written but not under the prefix,
        // so the evaluator returns None for them.
        let backend = ScannerMemBackend::default();
        backend.put_now("b", "stale.log", Bytes::from_static(b"x"));
        backend.put_now("b", "data/keep1.bin", Bytes::from_static(b"y"));
        backend.put_now("b", "data/keep2.bin", Bytes::from_static(b"z"));
        // Rule: any object under `stale.` prefix is expired immediately
        // (`expire_after_days(0)` matches age >= 0d, which is every
        // backend object).
        let mgr = Arc::new(LifecycleManager::new());
        let mut rule = LifecycleRule::expire_after_days("r", 0);
        rule.filter.prefix = Some("stale.".into());
        mgr.put("b", LifecycleConfig { rules: vec![rule] });
        let registry =
            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        // `mgr` is cloned into the service; the original handle stays
        // around so the action counters can be read afterwards.
        let s4 = Arc::new(
            S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
        );

        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.buckets_scanned, 1);
        assert_eq!(report.objects_evaluated, 3);
        assert_eq!(report.expired, 1);
        assert_eq!(report.transitioned, 0);
        assert_eq!(report.skipped_locked, 0);
        assert_eq!(report.action_errors, 0);
        // Backend post-condition: the matching key is gone, the others
        // remain. Read back through the service's own list_objects_v2
        // path (which is also what the customer-visible HTTP layer
        // serves) so we exercise the same code the scanner walked.
        let req = synthetic_request(
            ListObjectsV2Input {
                bucket: "b".into(),
                ..Default::default()
            },
            http::Method::GET,
            "/b?list-type=2",
        );
        let resp = s4
            .as_ref()
            .list_objects_v2(req)
            .await
            .expect("post-scan list");
        let keys: Vec<String> = resp
            .output
            .contents
            .unwrap_or_default()
            .into_iter()
            .filter_map(|o| o.key)
            .collect();
        assert!(!keys.contains(&"stale.log".to_string()));
        assert!(keys.contains(&"data/keep1.bin".to_string()));
        assert!(keys.contains(&"data/keep2.bin".to_string()));
        // Lifecycle action counter: one Expire bumped on bucket "b".
        let snap = mgr.actions_snapshot();
        assert_eq!(snap.get(&("b".into(), "expire".into())).copied(), Some(1));
    }
1967
    /// Object Lock vetoes Lifecycle: with an aggressive expire-all
    /// rule, the Compliance-locked key must be skipped (and counted
    /// in `skipped_locked`) while the unlocked key is expired.
    #[tokio::test]
    async fn run_scan_once_skips_object_lock_protected_keys() {
        let backend = ScannerMemBackend::default();
        backend.put_now("b", "locked.log", Bytes::from_static(b"x"));
        backend.put_now("b", "free.log", Bytes::from_static(b"y"));
        let registry =
            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        let mgr = Arc::new(LifecycleManager::new());
        // Aggressive: every object expires immediately.
        mgr.put(
            "b",
            LifecycleConfig {
                rules: vec![LifecycleRule::expire_after_days("r", 0)],
            },
        );
        let lock_mgr = Arc::new(ObjectLockManager::new());
        // Lock retains "locked.log" until the year 2099 — Compliance
        // mode means even Governance bypass cannot delete it.
        let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
            .expect("parse")
            .with_timezone(&Utc);
        lock_mgr.set(
            "b",
            "locked.log",
            ObjectLockState {
                mode: Some(LockMode::Compliance),
                retain_until: Some(retain_until),
                legal_hold_on: false,
            },
        );
        // Both managers are wired into the service; the scanner must
        // consult the lock manager before acting on a match.
        let s4 = Arc::new(
            S4Service::new(backend, registry, dispatcher)
                .with_lifecycle(Arc::clone(&mgr))
                .with_object_lock(Arc::clone(&lock_mgr)),
        );

        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.buckets_scanned, 1);
        assert_eq!(report.objects_evaluated, 2);
        assert_eq!(report.expired, 1, "free.log should have been expired");
        assert_eq!(report.skipped_locked, 1, "locked.log must be skipped");
        assert_eq!(report.action_errors, 0);
    }
2012
2013    /// v0.8.3 #65 (audit C-2): full scanner walk with a mix of free
2014    /// and locked objects must (a) leave outer/free objects expired,
2015    /// (b) skip the middle locked object, (c) bump
2016    /// `ScanReport::skipped_locked`, and (d) emit a Prometheus
2017    /// `s4_lifecycle_actions_total{action="skipped_locked"}` sample.
2018    /// Previously (v0.7 #45) the skip path bumped only the in-report
2019    /// counter — operator dashboards saw no signal when Object Lock
2020    /// vetoed a Lifecycle Expiration, which is the silent-failure
2021    /// observability gap audit C-2 called out.
2022    #[tokio::test]
2023    async fn scan_one_config_skips_locked_objects_and_bumps_metric() {
2024        // The Prometheus recorder is a process-global slot. Multiple
2025        // tests in the same binary race on `install_recorder()`, so
2026        // we route through `crate::metrics::test_metrics_handle()`
2027        // which is OnceLock-guarded and shared with the
2028        // `metrics::tests::install_and_render_basic_counters` test.
2029        // Use a unique bucket label so this test's sample line is
2030        // identifiable even when other tests in the binary also bump
2031        // the lifecycle counter under different bucket labels.
2032        let metrics_handle = crate::metrics::test_metrics_handle();
2033
2034        let bucket = "lc-locked-metric-65";
2035        let backend = ScannerMemBackend::default();
2036        // Three objects; the middle one ("middle.log") will be
2037        // Object-Lock-Compliance-locked until 2099. The two outer
2038        // objects ("outer-a.log", "outer-c.log") have no lock state
2039        // attached, so the aggressive `expire_after_days(0)` rule
2040        // matches and the scanner's `delete_object` actually fires.
2041        backend.put_now(bucket, "outer-a.log", Bytes::from_static(b"a"));
2042        backend.put_now(bucket, "middle.log", Bytes::from_static(b"m"));
2043        backend.put_now(bucket, "outer-c.log", Bytes::from_static(b"c"));
2044
2045        let registry =
2046            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
2047        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
2048        let mgr = Arc::new(LifecycleManager::new());
2049        mgr.put(
2050            bucket,
2051            LifecycleConfig {
2052                rules: vec![LifecycleRule::expire_after_days("r", 1)],
2053            },
2054        );
2055        // Object-Lock Compliance retain until far in the future (2099).
2056        // `is_locked(now)` then returns `true` regardless of when the
2057        // test actually runs.
2058        let lock_mgr = Arc::new(ObjectLockManager::new());
2059        let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
2060            .expect("parse retain_until")
2061            .with_timezone(&Utc);
2062        lock_mgr.set(
2063            bucket,
2064            "middle.log",
2065            ObjectLockState {
2066                mode: Some(LockMode::Compliance),
2067                retain_until: Some(retain_until),
2068                legal_hold_on: false,
2069            },
2070        );
2071        let s4 = Arc::new(
2072            S4Service::new(backend, registry, dispatcher)
2073                .with_lifecycle(Arc::clone(&mgr))
2074                .with_object_lock(Arc::clone(&lock_mgr)),
2075        );
2076
2077        // The objects above were `put_now(...)` with `last_modified =
2078        // SystemTime::now()`, so their computed `age` is roughly zero
2079        // and the `expire_after_days(1)` rule alone would NOT match.
2080        // Force the rule threshold down to zero days so all three
2081        // objects qualify for Expiration — the test is about the Lock
2082        // veto, not the age math.
2083        mgr.put(
2084            bucket,
2085            LifecycleConfig {
2086                rules: vec![LifecycleRule::expire_after_days("r", 0)],
2087            },
2088        );
2089
2090        let report = run_scan_once(&s4).await.expect("scan");
2091        assert_eq!(report.buckets_scanned, 1);
2092        assert_eq!(report.objects_evaluated, 3);
2093        assert_eq!(
2094            report.expired, 2,
2095            "outer-a.log + outer-c.log must be DELETEd; got {report:?}"
2096        );
2097        assert_eq!(
2098            report.skipped_locked, 1,
2099            "middle.log is Compliance-locked → scanner must skip; got {report:?}"
2100        );
2101        assert_eq!(report.transitioned, 0);
2102        assert_eq!(report.action_errors, 0);
2103
2104        // Render the Prometheus exporter and assert that a sample line
2105        // for `s4_lifecycle_actions_total{...action="skipped_locked",
2106        // bucket="lc-locked-metric-65"...}` is present with value >= 1.
2107        // The metrics-exporter-prometheus crate sorts labels
2108        // alphabetically (`bucket` appears before `action` in the
2109        // rendered output), so we substring-match both label fragments
2110        // rather than rely on a fixed ordering. We use `>=` (not
2111        // `==`) because the recorder is process-global and a parallel
2112        // run of the same test in a future session could legitimately
2113        // bump it again — but since the bucket label embeds an
2114        // issue-unique suffix, no other test in this binary touches
2115        // this specific (action, bucket) pair.
2116        let rendered = metrics_handle.render();
2117        let bucket_frag = format!("bucket=\"{bucket}\"");
2118        let action_frag = "action=\"skipped_locked\"";
2119        let line = rendered
2120            .lines()
2121            .find(|l| {
2122                l.starts_with("s4_lifecycle_actions_total{")
2123                    && l.contains(&bucket_frag)
2124                    && l.contains(action_frag)
2125            })
2126            .unwrap_or_else(|| {
2127                panic!(
2128                    "Prometheus output missing skipped_locked sample for {bucket}; \
2129                     full render:\n{rendered}"
2130                )
2131            });
2132        // Parse the trailing counter value (whitespace-separated).
2133        let value: u64 = line
2134            .split_whitespace()
2135            .next_back()
2136            .expect("counter value column")
2137            .parse()
2138            .expect("counter value is u64");
2139        assert!(
2140            value >= 1,
2141            "skipped_locked counter must be >= 1 after scan; line: {line}"
2142        );
2143    }
2144
2145    /// v0.8.3 #69 (audit M-2): end-to-end test of the multipart sweep.
2146    /// Two in-flight uploads are seeded — `u-stale` initiated 8 days
2147    /// ago, `u-fresh` initiated 1 hour ago. The lifecycle rule sets
2148    /// `abort_incomplete_multipart_upload_days = 7`. The scanner must
2149    /// abort `u-stale` (bumping `report.aborted_multipart`) but leave
2150    /// `u-fresh` alone. Object walk is a no-op (no objects seeded), so
2151    /// the report's expire / transition counters stay at zero.
2152    #[tokio::test]
2153    async fn run_scan_once_aborts_stale_multipart_upload() {
2154        let backend = ScannerMemBackend::default();
2155        let bucket = "lc-mp-69";
2156        let now = Utc::now();
2157        backend.put_multipart_upload(
2158            bucket,
2159            "u-stale",
2160            "uploads/big.bin",
2161            now - Duration::days(8),
2162        );
2163        backend.put_multipart_upload(
2164            bucket,
2165            "u-fresh",
2166            "uploads/fresh.bin",
2167            now - Duration::hours(1),
2168        );
2169
2170        let registry =
2171            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
2172        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
2173        let mgr = Arc::new(LifecycleManager::new());
2174        let mut rule = LifecycleRule {
2175            id: "abort-7d".into(),
2176            status: LifecycleStatus::Enabled,
2177            filter: LifecycleFilter::default(),
2178            expiration_days: None,
2179            expiration_date: None,
2180            transitions: Vec::new(),
2181            noncurrent_version_expiration_days: None,
2182            abort_incomplete_multipart_upload_days: Some(7),
2183        };
2184        rule.filter.prefix = Some("uploads/".into());
2185        mgr.put(bucket, LifecycleConfig { rules: vec![rule] });
2186        let s4 = Arc::new(
2187            S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
2188        );
2189
2190        let report = run_scan_once(&s4).await.expect("scan");
2191        assert_eq!(report.buckets_scanned, 1);
2192        assert_eq!(
2193            report.aborted_multipart, 1,
2194            "u-stale must be aborted; got {report:?}",
2195        );
2196        assert_eq!(report.action_errors, 0);
2197        assert_eq!(report.expired, 0);
2198        assert_eq!(report.transitioned, 0);
2199
2200        // Backend post-condition via the wire-side
2201        // `list_multipart_uploads` path: only the fresh upload
2202        // (`u-fresh`) survives — `u-stale` was aborted by the
2203        // scanner.
2204        let post_req = synthetic_request(
2205            ListMultipartUploadsInput {
2206                bucket: bucket.into(),
2207                ..Default::default()
2208            },
2209            http::Method::GET,
2210            &format!("/{bucket}?uploads"),
2211        );
2212        let post = s4
2213            .as_ref()
2214            .list_multipart_uploads(post_req)
2215            .await
2216            .expect("post-scan list_multipart_uploads");
2217        let remaining_ids: Vec<String> = post
2218            .output
2219            .uploads
2220            .unwrap_or_default()
2221            .into_iter()
2222            .filter_map(|u| u.upload_id)
2223            .collect();
2224        assert_eq!(
2225            remaining_ids,
2226            vec!["u-fresh".to_string()],
2227            "exactly u-fresh must remain after the sweep; got {remaining_ids:?}",
2228        );
2229
2230        // Counter snapshot agrees with the report.
2231        let snap = mgr.actions_snapshot();
2232        assert_eq!(
2233            snap.get(&(bucket.into(), "abort_incomplete_multipart".into()))
2234                .copied(),
2235            Some(1),
2236            "v0.8.3 #69: abort_incomplete_multipart counter must be 1",
2237        );
2238    }
2239
2240    // ---- v0.8.4 #78 (audit M3): pagination guard hardening ------------
2241    //
2242    // The two backends below intentionally return malformed truncated
2243    // responses on the FIRST call to the offending list endpoint:
2244    //   * `MalformedListObjectsBackend.list_objects_v2` returns
2245    //     `is_truncated=true, next_continuation_token=None`
2246    //   * `MalformedListMultipartBackend.list_multipart_uploads`
2247    //     returns `is_truncated=true, next_key_marker=None,
2248    //     next_upload_id_marker=None`
    // Each backend tracks call count via an `AtomicU32`. If the
2250    // pagination guard fails to break, the second iteration re-issues
2251    // the same request and the backend `panic!()`s — turning the
2252    // infinite loop into a deterministic test failure (and avoiding
2253    // the alternative "test hangs forever" outcome which would block
2254    // CI).
2255    //
2256    // Both backends pair the malformed list with a benign no-op for
2257    // the OTHER list endpoint so `run_scan_once` (which always invokes
2258    // both `scan_bucket` and `scan_in_flight_multipart_uploads`) does
2259    // not collide with the path under test.
2260
2261    use std::sync::atomic::{AtomicU32, Ordering};
2262
    /// Backend whose `list_objects_v2` is malformed (`is_truncated=true`
    /// with `next_continuation_token=None`); `list_multipart_uploads`
    /// is a benign empty non-truncated response. A second
    /// `list_objects_v2` call panics — proving the v0.8.4 #78 guard
    /// short-circuits the pagination loop.
    #[derive(Default)]
    struct MalformedListObjectsBackend {
        /// Number of `list_objects_v2` calls served so far; the handler
        /// asserts this is still 0 on entry (i.e. it is called at most
        /// once when the pagination guard works).
        list_calls: AtomicU32,
    }
2272
2273    #[async_trait::async_trait]
2274    impl S3 for MalformedListObjectsBackend {
2275        async fn list_objects_v2(
2276            &self,
2277            req: S3Request<dto2::ListObjectsV2Input>,
2278        ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
2279            let n = self.list_calls.fetch_add(1, Ordering::SeqCst);
2280            assert!(
2281                n == 0,
2282                "v0.8.4 #78: list_objects_v2 must be called exactly \
2283                 once when the guard fires; got call #{} which means \
2284                 the pagination loop did not break on a missing \
2285                 next_continuation_token",
2286                n + 1
2287            );
2288            Ok(S3Response::new(dto2::ListObjectsV2Output {
2289                name: Some(req.input.bucket.clone()),
2290                contents: Some(Vec::new()),
2291                key_count: Some(0),
2292                is_truncated: Some(true),
2293                next_continuation_token: None,
2294                ..Default::default()
2295            }))
2296        }
2297
2298        async fn list_multipart_uploads(
2299            &self,
2300            req: S3Request<dto2::ListMultipartUploadsInput>,
2301        ) -> S3Result<S3Response<dto2::ListMultipartUploadsOutput>> {
2302            // Benign: empty + not-truncated. The multipart path is not
2303            // under test here; we just need it to no-op cleanly so
2304            // `run_scan_once` walks both and the assertion isolates
2305            // the object-list guard.
2306            Ok(S3Response::new(dto2::ListMultipartUploadsOutput {
2307                bucket: Some(req.input.bucket),
2308                uploads: Some(Vec::new()),
2309                is_truncated: Some(false),
2310                ..Default::default()
2311            }))
2312        }
2313    }
2314
    /// Backend whose `list_multipart_uploads` is malformed
    /// (`is_truncated=true` with both `next_key_marker=None` and
    /// `next_upload_id_marker=None`); `list_objects_v2` is benign.
    /// Second `list_multipart_uploads` call panics.
    #[derive(Default)]
    struct MalformedListMultipartBackend {
        /// Number of `list_multipart_uploads` calls served so far; the
        /// handler asserts this is still 0 on entry (i.e. it is called
        /// at most once when the pagination guard works).
        mp_calls: AtomicU32,
    }
2323
2324    #[async_trait::async_trait]
2325    impl S3 for MalformedListMultipartBackend {
2326        async fn list_objects_v2(
2327            &self,
2328            req: S3Request<dto2::ListObjectsV2Input>,
2329        ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
2330            // Benign: empty + not-truncated.
2331            Ok(S3Response::new(dto2::ListObjectsV2Output {
2332                name: Some(req.input.bucket),
2333                contents: Some(Vec::new()),
2334                key_count: Some(0),
2335                is_truncated: Some(false),
2336                ..Default::default()
2337            }))
2338        }
2339
2340        async fn list_multipart_uploads(
2341            &self,
2342            req: S3Request<dto2::ListMultipartUploadsInput>,
2343        ) -> S3Result<S3Response<dto2::ListMultipartUploadsOutput>> {
2344            let n = self.mp_calls.fetch_add(1, Ordering::SeqCst);
2345            assert!(
2346                n == 0,
2347                "v0.8.4 #78: list_multipart_uploads must be called \
2348                 exactly once when the guard fires; got call #{} \
2349                 which means the pagination loop did not break on \
2350                 missing (next_key_marker, next_upload_id_marker)",
2351                n + 1
2352            );
2353            Ok(S3Response::new(dto2::ListMultipartUploadsOutput {
2354                bucket: Some(req.input.bucket),
2355                uploads: Some(Vec::new()),
2356                is_truncated: Some(true),
2357                next_key_marker: None,
2358                next_upload_id_marker: None,
2359                ..Default::default()
2360            }))
2361        }
2362    }
2363
2364    /// v0.8.4 #78 (audit M3): a backend that lies about
2365    /// `list_objects_v2` truncation (sets `is_truncated=true` but omits
2366    /// `next_continuation_token`) must NOT cause the lifecycle scanner
2367    /// to spin. The guard in `scan_bucket` warn-logs and breaks the
2368    /// loop after the first malformed page; the backend's call counter
2369    /// + assertion turns any regression into an immediate panic
2370    /// instead of a test hang. The scan completes cleanly with
2371    /// `buckets_scanned = 1` and no actions taken (the malformed page
2372    /// has zero contents).
2373    #[tokio::test]
2374    async fn scan_handles_truncated_with_missing_marker_without_infinite_loop() {
2375        let backend = MalformedListObjectsBackend::default();
2376        let bucket = "lc-malformed-list-78";
2377        let registry =
2378            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
2379        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
2380        let mgr = Arc::new(LifecycleManager::new());
2381        // Any rule will do — the malformed listing returns zero
2382        // contents so the evaluator never sees a key. We just need
2383        // the bucket to be in `mgr.buckets()` so `scan_bucket` runs.
2384        mgr.put(
2385            bucket,
2386            LifecycleConfig {
2387                rules: vec![LifecycleRule::expire_after_days("r", 0)],
2388            },
2389        );
2390        let s4 = Arc::new(
2391            S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
2392        );
2393
2394        // The decisive assertion is "this future completes" — if the
2395        // guard regressed, the second `list_objects_v2` call panics
2396        // (per the backend's `assert!`) and the test fails. We also
2397        // sanity-check the report shape: scanner saw the bucket but
2398        // took no actions (zero contents in the malformed page).
2399        let report = run_scan_once(&s4).await.expect("scan");
2400        assert_eq!(report.buckets_scanned, 1);
2401        assert_eq!(report.objects_evaluated, 0);
2402        assert_eq!(report.expired, 0);
2403        assert_eq!(report.transitioned, 0);
2404        assert_eq!(report.action_errors, 0);
2405    }
2406
2407    /// v0.8.4 #78 (audit M3): same guarantee for the multipart sweep
2408    /// — a backend that returns `is_truncated=true` with both
2409    /// `next_key_marker=None` and `next_upload_id_marker=None` must
2410    /// NOT cause `scan_in_flight_multipart_uploads` to spin. Second
2411    /// `list_multipart_uploads` call panics if the guard regresses.
2412    #[tokio::test]
2413    async fn scan_multipart_handles_truncated_with_missing_marker_without_infinite_loop() {
2414        let backend = MalformedListMultipartBackend::default();
2415        let bucket = "lc-malformed-mp-78";
2416        let registry =
2417            Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
2418        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
2419        let mgr = Arc::new(LifecycleManager::new());
2420        // Rule with `abort_incomplete_multipart_upload_days = Some(7)`
2421        // so the multipart-evaluator path is reachable (the rule body
2422        // is otherwise irrelevant — the malformed listing has zero
2423        // uploads).
2424        let mut rule = LifecycleRule {
2425            id: "abort-7d".into(),
2426            status: LifecycleStatus::Enabled,
2427            filter: LifecycleFilter::default(),
2428            expiration_days: None,
2429            expiration_date: None,
2430            transitions: Vec::new(),
2431            noncurrent_version_expiration_days: None,
2432            abort_incomplete_multipart_upload_days: Some(7),
2433        };
2434        rule.filter.prefix = Some("uploads/".into());
2435        mgr.put(bucket, LifecycleConfig { rules: vec![rule] });
2436        let s4 = Arc::new(
2437            S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
2438        );
2439
2440        let report = run_scan_once(&s4).await.expect("scan");
2441        assert_eq!(report.buckets_scanned, 1);
2442        assert_eq!(report.aborted_multipart, 0);
2443        assert_eq!(report.action_errors, 0);
2444    }
2445
2446    /// v0.8.4 #77 (audit H-8): a panic inside the `by_bucket` write
2447    /// guard poisons the lock. `to_json` must recover via
2448    /// [`crate::lock_recovery::recover_read`] and surface the data
2449    /// instead of re-panicking on the SIGUSR1 dump-back path.
2450    #[test]
2451    fn lifecycle_to_json_after_panic_recovers_via_poison() {
2452        let mgr = std::sync::Arc::new(LifecycleManager::new());
2453        mgr.put(
2454            "b",
2455            LifecycleConfig {
2456                rules: vec![LifecycleRule {
2457                    id: "r1".into(),
2458                    status: LifecycleStatus::Enabled,
2459                    filter: LifecycleFilter::default(),
2460                    expiration_days: Some(30),
2461                    expiration_date: None,
2462                    transitions: Vec::new(),
2463                    noncurrent_version_expiration_days: None,
2464                    abort_incomplete_multipart_upload_days: None,
2465                }],
2466            },
2467        );
2468        let mgr_cl = std::sync::Arc::clone(&mgr);
2469        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
2470            let mut g = mgr_cl.by_bucket.write().expect("clean lock");
2471            g.entry("b2".into()).or_default();
2472            panic!("force-poison");
2473        }));
2474        assert!(
2475            mgr.by_bucket.is_poisoned(),
2476            "write panic must poison by_bucket lock"
2477        );
2478        let json = mgr.to_json().expect("to_json after poison must succeed");
2479        let mgr2 = LifecycleManager::from_json(&json).expect("from_json");
2480        assert!(
2481            mgr2.get("b").is_some(),
2482            "recovered snapshot keeps original config"
2483        );
2484    }
2485}