// crates/s4-server/src/lifecycle.rs
//! S3 Lifecycle execution — per-bucket rule evaluation + manager skeleton
//! (v0.6 #37).
//!
//! AWS S3 Lifecycle attaches a **list of rules** to a bucket; each rule may
//! request that S3
//!
//! 1. **Expire** an object once its age (or the calendar date) crosses a
//!    threshold (`Expiration { Days | Date }`),
//! 2. **Transition** an object to a different storage class (`Transition
//!    { Days, StorageClass }` — `STANDARD_IA`, `GLACIER_IR`, ...),
//! 3. **Expire noncurrent versions** in a versioning-enabled bucket
//!    (`NoncurrentVersionExpiration { NoncurrentDays }`).
//!
//! Until v0.6 #37 the matching `PutBucketLifecycleConfiguration` /
//! `GetBucketLifecycleConfiguration` / `DeleteBucketLifecycle` handlers
//! in `crates/s4-server/src/service.rs` were pure passthroughs (the s3s
//! framework's default backend stored them but nothing read the rules).
//! This module owns the in-memory configuration store + the rule
//! evaluator that decides, for any single object, whether an action
//! should fire **right now**.
//!
//! ## responsibilities (v0.6 #37)
//!
//! - in-memory `bucket -> LifecycleConfig` map with JSON snapshot
//!   round-trip (mirroring `versioning.rs` / `object_lock.rs` /
//!   `inventory.rs`'s shape so `--lifecycle-state-file` is a one-line
//!   addition in `main.rs`).
//! - per-bucket action counters (`actions_total`) — bumped by the
//!   future scanner when an Expiration / Transition /
//!   NoncurrentExpiration action is taken, surfaced via Prometheus
//!   (`s4_lifecycle_actions_total`, see `metrics.rs`).
//! - [`LifecycleManager::evaluate`] — given one (bucket, key, age,
//!   size, tags) tuple, walk the bucket's rules in declaration order
//!   and return the first matching action. Returns `None` when no
//!   rule matches (or when the matching rule is `Disabled`).
//! - [`evaluate_batch`] — batched form for the test path: walks a
//!   slice of `(key, age, size, tags)` tuples and returns the (key,
//!   action) pairs that should fire. The actual backend invocation
//!   (S3.delete_object / metadata rewrite) is the caller's job.
//!
//! ## scope limitations (v0.6 #37)
//!
//! - **Background scanner is a skeleton only.** `main.rs`'s
//!   `--lifecycle-scan-interval-hours` flag spawns a tokio task that
//!   logs the bucket list and stamps a "would-have-run" marker;
//!   walking the source bucket via `list_objects_v2` and actually
//!   invoking `delete_object` / metadata rewrite for each evaluated
//!   action is deferred to v0.7+. Wiring the scheduler to walk a real
//!   bucket end-to-end requires a back-reference from the scheduler
//!   into `S4Service` for the `list_objects_v2` walk and that
//!   reshuffle is out of scope for this issue. The
//!   [`crate::S4Service::run_lifecycle_once_for_test`] entry covers
//!   the in-memory equivalent so the unit + E2E tests exercise the
//!   evaluator end-to-end.
//! - **`AbortIncompleteMultipartUpload`** is parsed and stored on the
//!   `LifecycleRule` (so PutBucketLifecycleConfiguration round-trips
//!   the field) but not enforced — multipart abort sweeping is a
//!   separate scanner that lives next to the multipart upload manager
//!   (v0.7+).
//! - **`expiration_date` (calendar date)** is supported in the
//!   evaluator: a rule with `expiration_date` past `now` fires
//!   Expiration immediately. Same wire form as AWS S3.
//! - **Multi-instance replication.** All state is single-instance
//!   in-memory; `--lifecycle-state-file <PATH>` provides restart
//!   recovery via JSON snapshot, matching the
//!   `--versioning-state-file` shape.
//! - **Object Lock interplay**: the evaluator does NOT consult the
//!   `ObjectLockManager` directly (the evaluator API is
//!   object-tags-and-size only); the scanner caller is expected to
//!   skip locked objects — see the `evaluate_batch_skips_locked` test
//!   for the canonical pattern. Locking always wins over Lifecycle.
//! - **Versioning interplay**: the evaluator treats noncurrent
//!   versions as a separate input — pass `is_noncurrent = true` to
//!   [`LifecycleManager::evaluate_with_flags`] for noncurrent version
//!   expiration matching. The legacy `evaluate` shorthand defaults
//!   `is_noncurrent = false` (current version) so existing call sites
//!   stay one-liners.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use chrono::{DateTime, Duration, Utc};
use s3s::dto::*;
use s3s::{S3, S3Request};
use serde::{Deserialize, Serialize};
use tracing::warn;

90/// Whether a rule is currently being applied. Mirrors AWS S3
91/// `ExpirationStatus` (`"Enabled"` / `"Disabled"`).
92#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
93pub enum LifecycleStatus {
94    Enabled,
95    Disabled,
96}
97
98impl LifecycleStatus {
99    /// Wire form used by AWS S3 (`"Enabled"` / `"Disabled"`).
100    #[must_use]
101    pub fn as_aws_str(self) -> &'static str {
102        match self {
103            Self::Enabled => "Enabled",
104            Self::Disabled => "Disabled",
105        }
106    }
107
108    /// Parse the AWS wire form (case-insensitive). Falls back to `Disabled`
109    /// on unrecognised input — this matches AWS conservative behaviour
110    /// (an unparseable status is treated as "off" so a typo doesn't silently
111    /// expire data).
112    #[must_use]
113    pub fn from_aws_str(s: &str) -> Self {
114        if s.eq_ignore_ascii_case("Enabled") {
115            Self::Enabled
116        } else {
117            Self::Disabled
118        }
119    }
120}
121
122/// Per-rule object filter. AWS S3 represents the filter as one of `Prefix`,
123/// `Tag`, `ObjectSizeGreaterThan`, `ObjectSizeLessThan`, or `And` (= AND of
124/// any subset of those predicates). For internal storage we flatten the
125/// "And" form into a struct of optional fields plus a vector of (key, value)
126/// tags — every present field must match (logical AND). An empty filter (all
127/// fields `None` / empty `tags`) matches every object in the bucket.
128#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
129pub struct LifecycleFilter {
130    /// Object key prefix (empty / `None` = no prefix gating).
131    #[serde(default)]
132    pub prefix: Option<String>,
133    /// Logical AND across every entry: every (key, value) must match the
134    /// object's own tag set.
135    #[serde(default)]
136    pub tags: Vec<(String, String)>,
137    /// Object must be *strictly greater than* this size in bytes.
138    #[serde(default)]
139    pub object_size_greater_than: Option<u64>,
140    /// Object must be *strictly less than* this size in bytes.
141    #[serde(default)]
142    pub object_size_less_than: Option<u64>,
143}
144
145impl LifecycleFilter {
146    /// `true` when this filter accepts the candidate. Empty filter accepts
147    /// every object. Tag matching is AND of all listed tags (each present in
148    /// `object_tags` with the matching value).
149    #[must_use]
150    pub fn matches(&self, key: &str, size: u64, object_tags: &[(String, String)]) -> bool {
151        if let Some(p) = &self.prefix
152            && !key.starts_with(p)
153        {
154            return false;
155        }
156        if let Some(min) = self.object_size_greater_than
157            && size <= min
158        {
159            return false;
160        }
161        if let Some(max) = self.object_size_less_than
162            && size >= max
163        {
164            return false;
165        }
166        for (tk, tv) in &self.tags {
167            let matched = object_tags.iter().any(|(ok, ov)| ok == tk && ov == tv);
168            if !matched {
169                return false;
170            }
171        }
172        true
173    }
174}
175
176/// A single transition step (object age threshold + target storage class).
177/// `days` is days since the object was created. AWS S3 also accepts `Date`
178/// for transitions but Lifecycle deployments overwhelmingly use `Days`; the
179/// `Date` form is omitted here on purpose to keep the evaluator narrow
180/// (operators wanting calendar transitions can synthesise a one-shot rule
181/// at the cadence of their scanner).
182#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
183pub struct TransitionRule {
184    pub days: u32,
185    /// Target storage class (`"STANDARD_IA"` / `"GLACIER_IR"` /
186    /// `"GLACIER"` / `"DEEP_ARCHIVE"` / `"INTELLIGENT_TIERING"` /
187    /// `"ONEZONE_IA"`). Stored as the AWS wire string so PutBucket /
188    /// GetBucket round-trip is a no-op.
189    pub storage_class: String,
190}
191
192/// One lifecycle rule. AWS S3's `LifecycleRule` flattened into the subset
193/// the v0.6 #37 evaluator handles. `id` is the operator-supplied label and
194/// makes Get / Put round-trips non-lossy.
195#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
196pub struct LifecycleRule {
197    pub id: String,
198    pub status: LifecycleStatus,
199    #[serde(default)]
200    pub filter: LifecycleFilter,
201    /// Days since the object was created. Mutually exclusive with
202    /// [`Self::expiration_date`] in AWS — both fields are accepted here on
203    /// input (the evaluator picks `expiration_days` first, then
204    /// `expiration_date`) so a malformed rule with both set still evaluates
205    /// deterministically rather than silently dropping the action.
206    #[serde(default)]
207    pub expiration_days: Option<u32>,
208    /// Calendar date past which matching objects are expired (AWS wire form
209    /// is ISO 8601; here we keep it as a `DateTime<Utc>` so round-trips
210    /// through `serde_json` survive without re-parsing).
211    #[serde(default)]
212    pub expiration_date: Option<DateTime<Utc>>,
213    /// Transition steps in declaration order. The evaluator picks the
214    /// deepest transition (largest `days` ≤ object age) and resolves any
215    /// conflict with expiration in [`LifecycleManager::evaluate_with_flags`].
216    #[serde(default)]
217    pub transitions: Vec<TransitionRule>,
218    /// Days an object has been noncurrent before the noncurrent-version
219    /// expiration fires. Only consulted when the evaluator is asked about
220    /// a noncurrent object (`is_noncurrent = true`).
221    #[serde(default)]
222    pub noncurrent_version_expiration_days: Option<u32>,
223    /// Days after a multipart upload is initiated before the abort fires.
224    /// Stored so PutBucket round-trips, but **not enforced** in the
225    /// v0.6 #37 evaluator — multipart sweeping lives elsewhere.
226    #[serde(default)]
227    pub abort_incomplete_multipart_upload_days: Option<u32>,
228}
229
230impl LifecycleRule {
231    /// Convenience constructor for a "expire after N days" rule. Useful in
232    /// tests + operator scripts.
233    #[must_use]
234    pub fn expire_after_days(id: impl Into<String>, days: u32) -> Self {
235        Self {
236            id: id.into(),
237            status: LifecycleStatus::Enabled,
238            filter: LifecycleFilter::default(),
239            expiration_days: Some(days),
240            expiration_date: None,
241            transitions: Vec::new(),
242            noncurrent_version_expiration_days: None,
243            abort_incomplete_multipart_upload_days: None,
244        }
245    }
246}
247
248/// Per-bucket lifecycle configuration (ordered list of rules — first match
249/// wins, matching AWS S3 semantics).
250#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
251pub struct LifecycleConfig {
252    pub rules: Vec<LifecycleRule>,
253}
254
/// The action a single rule wants to take **right now** for a candidate
/// object.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LifecycleAction {
    /// Delete the object (`Expiration` / `NoncurrentVersionExpiration`).
    Expire,
    /// Move the object to a different storage class (`Transition`). The
    /// inner string is the AWS wire form (e.g. `"GLACIER_IR"`).
    Transition { storage_class: String },
}
265
266impl LifecycleAction {
267    /// Stable label suitable for a metric counter
268    /// (`s4_lifecycle_actions_total{action="..."}`).
269    #[must_use]
270    pub fn metric_label(&self) -> &'static str {
271        match self {
272            Self::Expire => "expire",
273            Self::Transition { .. } => "transition",
274        }
275    }
276}
277
278/// snapshot のシリアライズ format。`to_json` / `from_json` 用。
279#[derive(Debug, Default, Serialize, Deserialize)]
280struct LifecycleSnapshot {
281    by_bucket: HashMap<String, LifecycleConfig>,
282}
283
284/// Per-bucket lifecycle configuration manager.
285///
286/// All read / write operations go through `RwLock` for thread safety;
287/// clones are cheap (`Arc<LifecycleManager>` is the expected handle shape).
288/// `actions_total` is a parallel `RwLock<HashMap<...>>` of `(bucket,
289/// action_label) -> count` so the future background scanner can stamp
290/// successful actions and operators can `GET /metrics` to see the running
291/// totals (the metric is also surfaced via `metrics::counter!` — see
292/// [`crate::metrics::record_lifecycle_action`]).
293#[derive(Debug, Default)]
294pub struct LifecycleManager {
295    by_bucket: RwLock<HashMap<String, LifecycleConfig>>,
296    /// `(bucket, action_label) -> count`. Bumped by the scanner via
297    /// [`Self::record_action`]. Action labels are the
298    /// [`LifecycleAction::metric_label`] values
299    /// (`"expire"` / `"transition"`).
300    actions_total: RwLock<HashMap<(String, String), u64>>,
301}
302
303impl LifecycleManager {
304    /// Empty manager — no bucket has rules.
305    #[must_use]
306    pub fn new() -> Self {
307        Self::default()
308    }
309
310    /// Replace (or create) the lifecycle configuration for `bucket`. Drops
311    /// any previously-attached rules in one shot — matches AWS S3
312    /// `PutBucketLifecycleConfiguration` (full replace, no merge).
313    pub fn put(&self, bucket: &str, config: LifecycleConfig) {
314        self.by_bucket
315            .write()
316            .expect("lifecycle state RwLock poisoned")
317            .insert(bucket.to_owned(), config);
318    }
319
320    /// Return a clone of the bucket's configuration, if any.
321    #[must_use]
322    pub fn get(&self, bucket: &str) -> Option<LifecycleConfig> {
323        self.by_bucket
324            .read()
325            .expect("lifecycle state RwLock poisoned")
326            .get(bucket)
327            .cloned()
328    }
329
330    /// Drop the bucket's lifecycle configuration (idempotent — missing
331    /// bucket is OK).
332    pub fn delete(&self, bucket: &str) {
333        self.by_bucket
334            .write()
335            .expect("lifecycle state RwLock poisoned")
336            .remove(bucket);
337    }
338
339    /// JSON snapshot for restart-recoverable state. Pair with
340    /// [`Self::from_json`].
341    pub fn to_json(&self) -> Result<String, serde_json::Error> {
342        let by_bucket = self
343            .by_bucket
344            .read()
345            .expect("lifecycle state RwLock poisoned")
346            .clone();
347        let snap = LifecycleSnapshot { by_bucket };
348        serde_json::to_string(&snap)
349    }
350
351    /// Restore from a JSON snapshot produced by [`Self::to_json`]. Action
352    /// counters are intentionally not snapshotted — they're transient
353    /// observability data and should reset across process restarts so
354    /// `rate(s4_lifecycle_actions_total[1h])` doesn't double-count.
355    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
356        let snap: LifecycleSnapshot = serde_json::from_str(s)?;
357        Ok(Self {
358            by_bucket: RwLock::new(snap.by_bucket),
359            actions_total: RwLock::new(HashMap::new()),
360        })
361    }
362
363    /// Evaluate which rule (if any) applies to a single **current-version**
364    /// object right now. Walks the bucket's rules in declaration order;
365    /// returns the first matching action. Returns `None` when no rule
366    /// matches (or when the matching rule is `Disabled`, or when the
367    /// bucket has no lifecycle configuration).
368    ///
369    /// Within a single rule the precedence is:
370    ///
371    /// 1. Pick the deepest transition whose `days` threshold is currently
372    ///    met (= largest `days ≤ object age`).
373    /// 2. Conflict with expiration: if `expiration_days <=
374    ///    transition_days` for the chosen transition, expiration wins
375    ///    (the rule wants the object gone before it would have been
376    ///    transitioned). Otherwise transition wins (e.g. transition at
377    ///    30d, expiration at 365d, age 60d → transition fires now,
378    ///    expiration is future).
379    /// 3. `expiration_date` matches when `now >= expiration_date` and no
380    ///    transition is currently applicable.
381    ///
382    /// `object_age` is "now - created_at" supplied by the caller — keeping
383    /// the evaluator pure of the wall clock makes deterministic testing
384    /// trivial.
385    #[must_use]
386    pub fn evaluate(
387        &self,
388        bucket: &str,
389        key: &str,
390        object_age: Duration,
391        object_size: u64,
392        object_tags: &[(String, String)],
393    ) -> Option<LifecycleAction> {
394        self.evaluate_with_flags(
395            bucket,
396            key,
397            object_age,
398            object_size,
399            object_tags,
400            EvaluateFlags::default(),
401        )
402    }
403
404    /// Full-form evaluator with flags for noncurrent-version handling.
405    /// Use this when the scanner is walking a versioning-enabled bucket;
406    /// pass `is_noncurrent = true` for entries that are not the latest
407    /// non-delete-marker version.
408    #[must_use]
409    pub fn evaluate_with_flags(
410        &self,
411        bucket: &str,
412        key: &str,
413        object_age: Duration,
414        object_size: u64,
415        object_tags: &[(String, String)],
416        flags: EvaluateFlags,
417    ) -> Option<LifecycleAction> {
418        let cfg = self.get(bucket)?;
419        let now_for_date = flags.now.unwrap_or_else(Utc::now);
420        let age_days = object_age.num_days().max(0);
421        let age_days_u32 = u32::try_from(age_days).unwrap_or(u32::MAX);
422        for rule in &cfg.rules {
423            if rule.status != LifecycleStatus::Enabled {
424                continue;
425            }
426            if !rule.filter.matches(key, object_size, object_tags) {
427                continue;
428            }
429            // Noncurrent-version expiration: only consulted when the
430            // caller explicitly flags this entry as noncurrent. The
431            // current-version expiration / transition rules do not fire
432            // for noncurrent versions in AWS S3 semantics.
433            if flags.is_noncurrent {
434                if let Some(days) = rule.noncurrent_version_expiration_days
435                    && age_days_u32 >= days
436                {
437                    return Some(LifecycleAction::Expire);
438                }
439                continue;
440            }
441            // Current-version path.
442            let exp_days_match = rule.expiration_days.filter(|d| age_days_u32 >= *d);
443            let exp_date_match = rule.expiration_date.filter(|d| now_for_date >= *d);
444            // Pick the deepest transition whose threshold is at or
445            // below the object's age. Transitions are typically
446            // declaration-ordered by ascending `days`, but we don't
447            // require it — taking the largest threshold means an
448            // object aged 90d gets `GLACIER` over `STANDARD_IA` even
449            // if `STANDARD_IA(30d)` was declared first.
450            let chosen_transition = rule
451                .transitions
452                .iter()
453                .filter(|t| age_days_u32 >= t.days)
454                .max_by_key(|t| t.days);
455            // Conflict resolution: when `expiration_days` fires AND a
456            // transition fires, expiration wins iff
457            // `expiration_days <= transition_days` (rule wants object
458            // gone before / at the same time it would have been
459            // transitioned). Otherwise the transition wins.
460            if let Some(exp_threshold) = exp_days_match {
461                let trans_threshold = chosen_transition.map(|t| t.days).unwrap_or(u32::MAX);
462                if exp_threshold <= trans_threshold {
463                    return Some(LifecycleAction::Expire);
464                }
465            }
466            if let Some(t) = chosen_transition {
467                return Some(LifecycleAction::Transition {
468                    storage_class: t.storage_class.clone(),
469                });
470            }
471            // Calendar-date expiration (no transition currently
472            // applicable, but the rule's expiration_date is past).
473            if exp_date_match.is_some() {
474                return Some(LifecycleAction::Expire);
475            }
476            // Fall through to the next rule when no action fires for
477            // this rule — first-match-wins applies only to *firing*
478            // rules, matching AWS semantics where overlapping rules
479            // with disjoint thresholds compose.
480        }
481        None
482    }
483
484    /// Stamp the per-bucket action counter and bump the matching
485    /// Prometheus counter. Called by the future scanner after a successful
486    /// delete / metadata rewrite.
487    pub fn record_action(&self, bucket: &str, action: &LifecycleAction) {
488        let label = action.metric_label();
489        let key = (bucket.to_owned(), label.to_owned());
490        let mut guard = self
491            .actions_total
492            .write()
493            .expect("lifecycle actions counter RwLock poisoned");
494        let entry = guard.entry(key).or_insert(0);
495        *entry = entry.saturating_add(1);
496        crate::metrics::record_lifecycle_action(bucket, label);
497    }
498
499    /// Read-only snapshot of the per-(bucket, action) counter map.
500    /// Useful for tests + introspection (`/admin/lifecycle/stats` style
501    /// endpoints in the future).
502    #[must_use]
503    pub fn actions_snapshot(&self) -> HashMap<(String, String), u64> {
504        self.actions_total
505            .read()
506            .expect("lifecycle actions counter RwLock poisoned")
507            .clone()
508    }
509
510    /// All buckets with a lifecycle configuration attached. Sorted for
511    /// stable scanner ordering.
512    #[must_use]
513    pub fn buckets(&self) -> Vec<String> {
514        let map = self
515            .by_bucket
516            .read()
517            .expect("lifecycle state RwLock poisoned");
518        let mut out: Vec<String> = map.keys().cloned().collect();
519        out.sort();
520        out
521    }
522}
523
524/// Flags for [`LifecycleManager::evaluate_with_flags`]. Default is
525/// "current-version object, evaluator picks `Utc::now()` for the date
526/// comparison". Tests override `now` for determinism.
527#[derive(Clone, Copy, Debug, Default)]
528pub struct EvaluateFlags {
529    pub is_noncurrent: bool,
530    pub now: Option<DateTime<Utc>>,
531}
532
533/// One object the evaluator considers in a batch:
534/// `(key, object_age, object_size, object_tags)`. Defined as a type alias
535/// so [`evaluate_batch`] / [`crate::S4Service::run_lifecycle_once_for_test`]
536/// don't trip clippy's `type-complexity` lint, and so callers building the
537/// list have a single canonical shape to reach for.
538pub type EvaluateBatchEntry = (String, Duration, u64, Vec<(String, String)>);
539
540/// Test-driven scan entry: walks a list of [`EvaluateBatchEntry`] tuples
541/// and produces (key, action) pairs for every object that should fire an
542/// action **right now**. The actual backend invocation (S3.delete_object /
543/// metadata rewrite) is the caller's job. Used by both unit tests and the
544/// E2E test in `tests/roundtrip.rs`; the future background scanner will
545/// reuse the same entry once the bucket-walk is wired through the backend.
546#[must_use]
547pub fn evaluate_batch(
548    manager: &LifecycleManager,
549    bucket: &str,
550    objects: &[EvaluateBatchEntry],
551) -> Vec<(String, LifecycleAction)> {
552    let mut out = Vec::with_capacity(objects.len());
553    for (key, age, size, tags) in objects {
554        if let Some(action) = manager.evaluate(bucket, key, *age, *size, tags) {
555            out.push((key.clone(), action));
556        }
557    }
558    out
559}
560
/// Per-invocation scanner counters returned by [`run_scan_once`]. Useful
/// for tests, the `--lifecycle-scan-interval-hours` log line, and any
/// future `/admin/lifecycle/scan` introspection endpoint. Operators see
/// the same numbers via Prometheus
/// (`s4_lifecycle_actions_total{action="expire"|"transition"}`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
    /// Number of buckets the scanner walked (= buckets with a lifecycle
    /// configuration attached at the moment the scanner ran).
    pub buckets_scanned: usize,
    /// Number of distinct keys the scanner evaluated. Multi-page lists
    /// count one key once even if the listing was paginated.
    pub objects_evaluated: usize,
    /// Number of objects deleted as a result of an Expiration action.
    pub expired: usize,
    /// Number of objects whose `x-amz-storage-class` was rewritten as a
    /// result of a Transition action.
    pub transitioned: usize,
    /// Number of objects skipped because an Object Lock (Compliance,
    /// Governance, or legal hold) was in effect. The Lock always wins
    /// over Lifecycle, matching AWS S3 semantics.
    pub skipped_locked: usize,
    /// Number of objects the evaluator wanted to act on but the action
    /// failed (e.g. backend `delete_object` returned an error). Logged
    /// individually at WARN level; this counter exists so tests / metrics
    /// can assert no silent loss.
    pub action_errors: usize,
}
589
590/// Convert an s3s `Timestamp` (`time::OffsetDateTime` underneath) into a
591/// `chrono::DateTime<Utc>` via the RFC3339 wire form. Used by the scanner
592/// to compute object age (= `now - last_modified`). Returns `None` when
593/// the stamp is unparseable, in which case the caller falls back to
594/// treating the object as freshly created (age = 0).
595fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
596    let mut buf = Vec::new();
597    ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
598    let s = std::str::from_utf8(&buf).ok()?;
599    chrono::DateTime::parse_from_rfc3339(s)
600        .ok()
601        .map(|dt| dt.with_timezone(&Utc))
602}
603
604/// Build a synthetic `S3Request` with the minimum metadata the
605/// scanner-internal calls need. The lifecycle scanner is a
606/// system-internal caller (no end-user credentials, no real HTTP method
607/// / URI), so policy gates downstream see `credentials = None` /
608/// `region = None` and treat the call as anonymous-internal. Backends
609/// that do not gate internal traffic ignore these fields entirely.
610fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
611    S3Request {
612        input,
613        method,
614        uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
615        headers: http::HeaderMap::new(),
616        extensions: http::Extensions::new(),
617        credentials: None,
618        region: None,
619        service: None,
620        trailing_headers: None,
621    }
622}
623
624/// Walk every bucket that has a lifecycle configuration attached, list
625/// its objects via `list_objects_v2` (continuation-token pagination), and
626/// for each object evaluate the rule set + execute the matching
627/// Expiration / Transition action. Object-Lock-protected objects are
628/// **skipped** (the Lock always wins over Lifecycle). Versioning chains
629/// are intentionally out of scope for v0.7 #45 — see the module-level
630/// limitations note.
631///
632/// ## error handling
633///
634/// Per-bucket / per-object failures are logged at WARN level and bumped
635/// in `ScanReport::action_errors`; the scanner does NOT abort early on a
636/// single bad object so one slow / faulty bucket can't starve every
637/// other bucket's lifecycle. The function only returns `Err(_)` when the
638/// scanner cannot make progress at all (no current usage — kept for the
639/// future case where the manager itself becomes unavailable).
640///
641/// ## scope (v0.7 #45)
642///
643/// - Current-version objects only (Versioning-enabled chains rely on
644///   `evaluate_with_flags(is_noncurrent = true)`, but walking the
645///   shadow keys requires the version chain access pattern from
646///   `versioning.rs` and is deferred to a follow-up issue).
647/// - `head_object`'s `last_modified` is used to compute age. When the
648///   backend omits the field (some S3-compatible backends do), the
649///   object is treated as age 0 and skipped — matches AWS conservative
650///   behaviour where a malformed timestamp must not silently expire data.
651/// - Tags are looked up via the attached
652///   [`crate::tagging::TagManager`] (when wired). Buckets without a
653///   tag manager pass an empty tag list to the evaluator.
654/// - Transition rewrites the object's `x-amz-storage-class` via
655///   `copy_object` (same bucket / same key, `MetadataDirective: COPY`,
656///   new `StorageClass`). Backends that ignore the storage class
657///   header silently no-op the transition; the counter still bumps to
658///   reflect "the scanner asked for a transition" (matching AWS where
659///   a no-op transition still costs a request).
660pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
661    s4: &Arc<crate::S4Service<B>>,
662) -> Result<ScanReport, String> {
663    let Some(mgr) = s4.lifecycle_manager().cloned() else {
664        // No lifecycle manager attached (e.g. operator did not set
665        // `--lifecycle-state-file`). Scan is a no-op.
666        return Ok(ScanReport::default());
667    };
668    let buckets = mgr.buckets();
669    if buckets.is_empty() {
670        return Ok(ScanReport::default());
671    }
672    let now = Utc::now();
673    let mut report = ScanReport {
674        buckets_scanned: buckets.len(),
675        ..ScanReport::default()
676    };
677    for bucket in buckets {
678        scan_bucket(s4, &mgr, &bucket, now, &mut report).await;
679    }
680    Ok(report)
681}
682
/// Walk one bucket end-to-end. Pagination uses the `continuation_token`
/// loop documented in
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>.
///
/// For every listed object this: (1) skips S4-internal sidecars,
/// (2) asks `mgr.evaluate` for an action, (3) skips
/// Object-Lock-protected keys, and (4) executes the action via
/// `execute_expire` / `execute_transition`, bumping the per-bucket
/// action counter on success and `report.action_errors` on failure.
/// A `list_objects_v2` failure abandons the bucket for this pass only.
async fn scan_bucket<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    mgr: &Arc<LifecycleManager>,
    bucket: &str,
    now: DateTime<Utc>,
    report: &mut ScanReport,
) {
    let mut continuation: Option<String> = None;
    loop {
        let list_input = ListObjectsV2Input {
            bucket: bucket.to_owned(),
            continuation_token: continuation.clone(),
            ..Default::default()
        };
        let list_req = synthetic_request(
            list_input,
            http::Method::GET,
            &format!("/{bucket}?list-type=2"),
        );
        let resp = match s4.as_ref().list_objects_v2(list_req).await {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    bucket = %bucket,
                    error = %e,
                    "S4 lifecycle: list_objects_v2 failed; skipping bucket for this scan",
                );
                report.action_errors = report.action_errors.saturating_add(1);
                return;
            }
        };
        let output = resp.output;
        let contents = output.contents.unwrap_or_default();
        for obj in &contents {
            // Keyless entries should not occur in practice; tolerate them.
            let Some(key) = obj.key.as_deref() else {
                continue;
            };
            // Filter out S4-internal sidecars / shadow versions early so
            // the lifecycle scanner mirrors the same "client-visible
            // object set" the customer sees through `list_objects_v2`.
            // (The S4Service.list_objects_v2 handler already drops them
            // before returning, but this is a belt-and-braces guard for
            // any future bypass that builds the list elsewhere.)
            if key.ends_with(".s4index") {
                continue;
            }
            report.objects_evaluated = report.objects_evaluated.saturating_add(1);
            // Negative sizes are clamped to 0; `size` only feeds the
            // rule's object-size filters.
            let size = obj.size.unwrap_or(0).max(0) as u64;
            // Missing/unparseable last_modified → age zero ("just
            // written"), so only day-0 rules can match such an object.
            let age = match obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc) {
                Some(lm) => now.signed_duration_since(lm),
                None => Duration::zero(),
            };
            // Object tags feed the rule's tag filter; no tag manager
            // (or no tags stored) evaluates as an empty tag set.
            let tags: Vec<(String, String)> = s4
                .as_ref()
                .tag_manager()
                .and_then(|m| m.get_object_tags(bucket, key))
                .map(|set| set.iter().cloned().collect())
                .unwrap_or_default();
            let Some(action) = mgr.evaluate(bucket, key, age, size, &tags) else {
                continue;
            };
            // Object-Lock-protected objects are skipped before any
            // backend-mutating call. Lock wins over Lifecycle, full
            // stop — matches AWS behaviour where an Expiration on a
            // locked object is dropped, not retried.
            if let Some(lock_mgr) = s4.as_ref().object_lock_manager()
                && let Some(state) = lock_mgr.get(bucket, key)
                && state.is_locked(now)
            {
                report.skipped_locked = report.skipped_locked.saturating_add(1);
                continue;
            }
            match action {
                LifecycleAction::Expire => match execute_expire(s4, bucket, key).await {
                    Ok(()) => {
                        // Counter first bumped only on confirmed success.
                        mgr.record_action(bucket, &LifecycleAction::Expire);
                        report.expired = report.expired.saturating_add(1);
                    }
                    Err(e) => {
                        warn!(
                            bucket = %bucket,
                            key = %key,
                            error = %e,
                            "S4 lifecycle: Expire action failed",
                        );
                        report.action_errors = report.action_errors.saturating_add(1);
                    }
                },
                LifecycleAction::Transition { storage_class } => {
                    match execute_transition(s4, bucket, key, &storage_class).await {
                        Ok(()) => {
                            mgr.record_action(
                                bucket,
                                &LifecycleAction::Transition {
                                    storage_class: storage_class.clone(),
                                },
                            );
                            report.transitioned = report.transitioned.saturating_add(1);
                        }
                        Err(e) => {
                            warn!(
                                bucket = %bucket,
                                key = %key,
                                storage_class = %storage_class,
                                error = %e,
                                "S4 lifecycle: Transition action failed",
                            );
                            report.action_errors = report.action_errors.saturating_add(1);
                        }
                    }
                }
            }
        }
        if output.is_truncated.unwrap_or(false) {
            continuation = output.next_continuation_token;
            if continuation.is_none() {
                // Defensive: AWS guarantees a NextContinuationToken when
                // is_truncated=true, but a malformed backend could omit
                // it; break to avoid an infinite loop.
                break;
            }
        } else {
            break;
        }
    }
}
812
813/// Issue `delete_object` against the wrapped `S4Service`. The handler in
814/// `service.rs` runs the WORM check itself, so even if the scanner's
815/// pre-check missed (race with an MFA-Delete put-bucket-versioning), the
816/// backend refuses the delete with `AccessDenied` and the error path
817/// above bumps `action_errors` rather than silently losing data.
818async fn execute_expire<B: S3 + Send + Sync + 'static>(
819    s4: &Arc<crate::S4Service<B>>,
820    bucket: &str,
821    key: &str,
822) -> Result<(), String> {
823    let input = DeleteObjectInput {
824        bucket: bucket.to_owned(),
825        key: key.to_owned(),
826        ..Default::default()
827    };
828    let req = synthetic_request(
829        input,
830        http::Method::DELETE,
831        &format!("/{bucket}/{key}"),
832    );
833    s4.as_ref()
834        .delete_object(req)
835        .await
836        .map(|_| ())
837        .map_err(|e| format!("{e}"))
838}
839
840/// Rewrite the object's storage class via a same-key `copy_object` with
841/// `MetadataDirective: COPY` (preserves user metadata) and the new
842/// `storage_class`. Backends that ignore storage-class headers
843/// effectively no-op; the counter still records the attempt so dashboards
844/// reflect the scanner's intent.
845async fn execute_transition<B: S3 + Send + Sync + 'static>(
846    s4: &Arc<crate::S4Service<B>>,
847    bucket: &str,
848    key: &str,
849    storage_class: &str,
850) -> Result<(), String> {
851    // CopyObjectInput has dozens of `Option` fields plus three required
852    // (bucket / key / copy_source); the s3s-generated `builder()` is
853    // the path that fills the optional ones with `None` for us. The
854    // `set_*` setters return `&mut Self`, so we drive them in
855    // statement form rather than as a method chain.
856    let mut builder = CopyObjectInput::builder();
857    builder.set_bucket(bucket.to_owned());
858    builder.set_key(key.to_owned());
859    builder.set_copy_source(CopySource::Bucket {
860        bucket: bucket.to_owned().into_boxed_str(),
861        key: key.to_owned().into_boxed_str(),
862        version_id: None,
863    });
864    builder.set_metadata_directive(Some(MetadataDirective::from_static(MetadataDirective::COPY)));
865    builder.set_storage_class(Some(StorageClass::from(storage_class.to_owned())));
866    let input = builder
867        .build()
868        .map_err(|e| format!("CopyObjectInput build: {e}"))?;
869    let req = synthetic_request(
870        input,
871        http::Method::PUT,
872        &format!("/{bucket}/{key}"),
873    );
874    s4.as_ref()
875        .copy_object(req)
876        .await
877        .map(|_| ())
878        .map_err(|e| format!("{e}"))
879}
880
881#[cfg(test)]
882mod tests {
883    use super::*;
884
885    fn enabled(rule: LifecycleRule) -> LifecycleRule {
886        LifecycleRule {
887            status: LifecycleStatus::Enabled,
888            ..rule
889        }
890    }
891
    /// Wrap a rule list in a `LifecycleConfig` (no other fields exist).
    fn cfg_with(rules: Vec<LifecycleRule>) -> LifecycleConfig {
        LifecycleConfig { rules }
    }
895
896    fn manager_with(bucket: &str, rules: Vec<LifecycleRule>) -> LifecycleManager {
897        let m = LifecycleManager::new();
898        m.put(bucket, cfg_with(rules));
899        m
900    }
901
902    #[test]
903    fn evaluate_age_past_expiration_returns_expire() {
904        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
905        let action = m.evaluate("b", "k", Duration::days(31), 100, &[]);
906        assert_eq!(action, Some(LifecycleAction::Expire));
907    }
908
909    #[test]
910    fn evaluate_age_before_expiration_returns_none() {
911        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
912        let action = m.evaluate("b", "k", Duration::days(5), 100, &[]);
913        assert_eq!(action, None);
914    }
915
916    #[test]
917    fn evaluate_prefix_filter_matches() {
918        let mut rule = LifecycleRule::expire_after_days("r", 1);
919        rule.filter.prefix = Some("logs/".into());
920        let m = manager_with("b", vec![rule]);
921        assert_eq!(
922            m.evaluate("b", "logs/2026/a.log", Duration::days(2), 1, &[]),
923            Some(LifecycleAction::Expire)
924        );
925        assert_eq!(
926            m.evaluate("b", "data/keep.bin", Duration::days(2), 1, &[]),
927            None
928        );
929    }
930
    #[test]
    fn evaluate_tag_filter_requires_all_tags_to_match() {
        // AND-semantics: a rule with N filter tags fires only when the
        // object carries ALL N tags with exactly matching values.
        // Extra object tags beyond the filter set are ignored.
        let mut rule = LifecycleRule::expire_after_days("r", 1);
        rule.filter.tags = vec![
            ("env".into(), "dev".into()),
            ("expirable".into(), "yes".into()),
        ];
        let m = manager_with("b", vec![rule]);
        // All tags present + matching → fire.
        assert_eq!(
            m.evaluate(
                "b",
                "k",
                Duration::days(2),
                1,
                &[
                    ("env".into(), "dev".into()),
                    ("expirable".into(), "yes".into()),
                    ("owner".into(), "alice".into()),
                ]
            ),
            Some(LifecycleAction::Expire)
        );
        // One tag missing → no fire.
        assert_eq!(
            m.evaluate(
                "b",
                "k",
                Duration::days(2),
                1,
                &[("env".into(), "dev".into())]
            ),
            None
        );
        // Tag present but with the wrong value → no fire.
        assert_eq!(
            m.evaluate(
                "b",
                "k",
                Duration::days(2),
                1,
                &[
                    ("env".into(), "prod".into()),
                    ("expirable".into(), "yes".into()),
                ]
            ),
            None
        );
    }
980
981    #[test]
982    fn evaluate_size_filters_gate_action() {
983        let mut rule = LifecycleRule::expire_after_days("r", 1);
984        rule.filter.object_size_greater_than = Some(1024);
985        rule.filter.object_size_less_than = Some(10 * 1024);
986        let m = manager_with("b", vec![rule]);
987        // Inside the (1024, 10*1024) range → fire.
988        assert_eq!(
989            m.evaluate("b", "k", Duration::days(2), 4096, &[]),
990            Some(LifecycleAction::Expire)
991        );
992        // At the boundary (size == greater_than) → strict `>`, no fire.
993        assert_eq!(m.evaluate("b", "k", Duration::days(2), 1024, &[]), None);
994        // Above the upper bound → no fire.
995        assert_eq!(
996            m.evaluate("b", "k", Duration::days(2), 100 * 1024, &[]),
997            None
998        );
999    }
1000
1001    #[test]
1002    fn evaluate_transition_fires_before_expiration() {
1003        // Transition at 30d, expiration at 365d, age 60d → transition.
1004        let rule = enabled(LifecycleRule {
1005            id: "r".into(),
1006            status: LifecycleStatus::Enabled,
1007            filter: LifecycleFilter::default(),
1008            expiration_days: Some(365),
1009            expiration_date: None,
1010            transitions: vec![TransitionRule {
1011                days: 30,
1012                storage_class: "GLACIER_IR".into(),
1013            }],
1014            noncurrent_version_expiration_days: None,
1015            abort_incomplete_multipart_upload_days: None,
1016        });
1017        let m = manager_with("b", vec![rule]);
1018        let action = m.evaluate("b", "k", Duration::days(60), 1, &[]);
1019        assert_eq!(
1020            action,
1021            Some(LifecycleAction::Transition {
1022                storage_class: "GLACIER_IR".into(),
1023            })
1024        );
1025    }
1026
1027    #[test]
1028    fn evaluate_expiration_wins_when_threshold_is_earlier_than_transition() {
1029        // Expiration at 30d, transition at 90d, age 100d → expire (the
1030        // rule wants the object gone *before* it would have transitioned).
1031        let rule = enabled(LifecycleRule {
1032            id: "r".into(),
1033            status: LifecycleStatus::Enabled,
1034            filter: LifecycleFilter::default(),
1035            expiration_days: Some(30),
1036            expiration_date: None,
1037            transitions: vec![TransitionRule {
1038                days: 90,
1039                storage_class: "GLACIER".into(),
1040            }],
1041            noncurrent_version_expiration_days: None,
1042            abort_incomplete_multipart_upload_days: None,
1043        });
1044        let m = manager_with("b", vec![rule]);
1045        let action = m.evaluate("b", "k", Duration::days(100), 1, &[]);
1046        assert_eq!(action, Some(LifecycleAction::Expire));
1047    }
1048
1049    #[test]
1050    fn evaluate_disabled_rule_never_fires() {
1051        let mut rule = LifecycleRule::expire_after_days("r", 1);
1052        rule.status = LifecycleStatus::Disabled;
1053        let m = manager_with("b", vec![rule]);
1054        assert_eq!(m.evaluate("b", "k", Duration::days(365), 1, &[]), None);
1055    }
1056
1057    #[test]
1058    fn evaluate_unknown_bucket_returns_none() {
1059        let m = LifecycleManager::new();
1060        assert_eq!(m.evaluate("ghost", "k", Duration::days(365), 1, &[]), None);
1061    }
1062
    #[test]
    fn evaluate_noncurrent_version_expiration() {
        // `noncurrent_version_expiration_days` must apply only to
        // noncurrent versions: the plain `evaluate` path (current
        // version) ignores it entirely.
        let rule = enabled(LifecycleRule {
            id: "r".into(),
            status: LifecycleStatus::Enabled,
            filter: LifecycleFilter::default(),
            expiration_days: None,
            expiration_date: None,
            transitions: vec![],
            noncurrent_version_expiration_days: Some(7),
            abort_incomplete_multipart_upload_days: None,
        });
        let m = manager_with("b", vec![rule]);
        // current-version path → no rule matches (no expiration_days set).
        assert_eq!(m.evaluate("b", "k", Duration::days(30), 1, &[]), None);
        // noncurrent path with age past 7d → expire.
        let action = m.evaluate_with_flags(
            "b",
            "k",
            Duration::days(8),
            1,
            &[],
            EvaluateFlags {
                is_noncurrent: true,
                now: None,
            },
        );
        assert_eq!(action, Some(LifecycleAction::Expire));
        // noncurrent path with age before 7d → no fire.
        let action = m.evaluate_with_flags(
            "b",
            "k",
            Duration::days(3),
            1,
            &[],
            EvaluateFlags {
                is_noncurrent: true,
                now: None,
            },
        );
        assert_eq!(action, None);
    }
1105
    #[test]
    fn evaluate_batch_distributes_actions_across_object_ages() {
        // Transition at 30d, expiration at 60d. The conflict resolver
        // prefers Expire only when the expiration threshold is at or
        // before the transition threshold (exp_days <= trans_days).
        // Here exp=60 > trans=30, so the rule reads "transition first,
        // expire later": a single evaluation pass returns Transition
        // for every object past 30d, and the expiration is picked up
        // by a later scanner pass. Four of the five objects below are
        // past 30d → expect exactly 4 actions, all Transitions.
        let rule = enabled(LifecycleRule {
            id: "r".into(),
            status: LifecycleStatus::Enabled,
            filter: LifecycleFilter::default(),
            expiration_days: Some(60),
            expiration_date: None,
            transitions: vec![TransitionRule {
                days: 30,
                storage_class: "STANDARD_IA".into(),
            }],
            noncurrent_version_expiration_days: None,
            abort_incomplete_multipart_upload_days: None,
        });
        let m = manager_with("b", vec![rule]);
        let objects = vec![
            ("young".to_string(), Duration::days(10), 1u64, vec![]),
            ("middle".to_string(), Duration::days(40), 1u64, vec![]),
            ("middle2".to_string(), Duration::days(45), 1u64, vec![]),
            ("old".to_string(), Duration::days(90), 1u64, vec![]),
            ("ancient".to_string(), Duration::days(365), 1u64, vec![]),
        ];
        let actions = evaluate_batch(&m, "b", &objects);
        assert_eq!(actions.len(), 4);
        for (_, a) in &actions {
            assert!(matches!(a, LifecycleAction::Transition { .. }));
        }
    }
1146
    #[test]
    fn json_round_trip_preserves_rules() {
        // Maximal rule (prefix + tag + size filter, expiration,
        // transition, noncurrent expiration, MPU abort) must survive
        // `to_json` → `from_json` unchanged — this is the
        // `--lifecycle-state-file` snapshot path.
        let rule = enabled(LifecycleRule {
            id: "complex".into(),
            status: LifecycleStatus::Enabled,
            filter: LifecycleFilter {
                prefix: Some("logs/".into()),
                tags: vec![("env".into(), "prod".into())],
                object_size_greater_than: Some(1024),
                object_size_less_than: None,
            },
            expiration_days: Some(365),
            expiration_date: None,
            transitions: vec![TransitionRule {
                days: 30,
                storage_class: "STANDARD_IA".into(),
            }],
            noncurrent_version_expiration_days: Some(7),
            abort_incomplete_multipart_upload_days: Some(3),
        });
        let m = manager_with("b1", vec![rule.clone()]);
        let json = m.to_json().expect("to_json");
        let m2 = LifecycleManager::from_json(&json).expect("from_json");
        let cfg = m2.get("b1").expect("bucket survives roundtrip");
        assert_eq!(cfg.rules.len(), 1);
        assert_eq!(cfg.rules[0], rule);
    }
1174
1175    #[test]
1176    fn lifecycle_config_default_is_empty() {
1177        let cfg = LifecycleConfig::default();
1178        assert!(cfg.rules.is_empty());
1179    }
1180
1181    #[test]
1182    fn evaluate_batch_skips_locked_objects_at_caller_layer() {
1183        // The evaluator itself does not consult ObjectLock; the scanner
1184        // (and tests) are expected to filter locked keys out before /
1185        // after calling `evaluate_batch`. This test documents the
1186        // canonical pattern.
1187        let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 1)]);
1188        let objects = vec![
1189            ("locked".to_string(), Duration::days(30), 1u64, vec![]),
1190            ("free".to_string(), Duration::days(30), 1u64, vec![]),
1191        ];
1192        let locked_keys: std::collections::HashSet<&str> = ["locked"].into_iter().collect();
1193        let raw = evaluate_batch(&m, "b", &objects);
1194        let filtered: Vec<_> = raw
1195            .into_iter()
1196            .filter(|(k, _)| !locked_keys.contains(k.as_str()))
1197            .collect();
1198        assert_eq!(filtered.len(), 1);
1199        assert_eq!(filtered[0].0, "free");
1200    }
1201
1202    #[test]
1203    fn record_action_bumps_per_bucket_counter() {
1204        let m = LifecycleManager::new();
1205        m.record_action("b", &LifecycleAction::Expire);
1206        m.record_action("b", &LifecycleAction::Expire);
1207        m.record_action(
1208            "b",
1209            &LifecycleAction::Transition {
1210                storage_class: "GLACIER".into(),
1211            },
1212        );
1213        let snap = m.actions_snapshot();
1214        assert_eq!(snap.get(&("b".into(), "expire".into())).copied(), Some(2));
1215        assert_eq!(
1216            snap.get(&("b".into(), "transition".into())).copied(),
1217            Some(1)
1218        );
1219    }
1220
1221    // ---- v0.7 #45: scanner runner tests --------------------------------
1222    //
1223    // These tests stand up an in-memory `S4Service` over a tiny
1224    // `ScannerMemBackend` (separate from the larger `MemoryBackend` in
1225    // `tests/roundtrip.rs` so this module stays self-contained). The
1226    // backend implements only the four `S3` methods the scanner touches:
1227    // `put_object`, `head_object`, `delete_object`, `list_objects_v2`.
1228    // Tags are exercised via the optional `with_tagging(...)` manager.
1229    //
1230    // Object age is faked by setting an `expire_after_days(0)` rule, so
1231    // any object whose backend-recorded `last_modified` is at or before
1232    // "now" matches — sidesteps the `head_object`/`Timestamp` parsing
1233    // entirely (and matches the canonical "operator just put the bucket
1234    // on aggressive expiration" test path).
1235
1236    use std::collections::HashMap;
1237    use std::sync::Mutex as StdMutex;
1238
1239    use bytes::Bytes;
1240    use s3s::dto as dto2;
1241    use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
1242    use s4_codec::dispatcher::AlwaysDispatcher;
1243    use s4_codec::passthrough::Passthrough;
1244    use s4_codec::{CodecKind, CodecRegistry};
1245
1246    use crate::S4Service;
1247    use crate::object_lock::{LockMode, ObjectLockManager, ObjectLockState};
1248
    /// Minimal in-memory S3 backend for the scanner tests: a single
    /// (bucket, key) → object map behind a std `Mutex`.
    #[derive(Default)]
    struct ScannerMemBackend {
        // `StdMutex` (not tokio's): the async trait methods lock and
        // release within each call, never holding a guard across `.await`.
        objects: StdMutex<HashMap<(String, String), ScannerStored>>,
    }
1253
    /// One stored object: raw body plus the backend-recorded mtime.
    #[derive(Clone)]
    struct ScannerStored {
        // Only the body's length is surfaced (as size / content_length).
        body: Bytes,
        // Set at insert time; drives the scanner's age computation.
        last_modified: dto2::Timestamp,
    }
1259
1260    impl ScannerMemBackend {
1261        fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
1262            self.objects.lock().unwrap().insert(
1263                (bucket.to_owned(), key.to_owned()),
1264                ScannerStored {
1265                    body,
1266                    last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
1267                },
1268            );
1269        }
1270    }
1271
    #[async_trait::async_trait]
    impl S3 for ScannerMemBackend {
        /// Stores an empty body (the scanner never reads bodies back)
        /// stamped with "now".
        async fn put_object(
            &self,
            req: S3Request<dto2::PutObjectInput>,
        ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
            self.put_now(&req.input.bucket, &req.input.key, Bytes::new());
            Ok(S3Response::new(dto2::PutObjectOutput::default()))
        }

        /// Returns length + mtime for a stored object, `NoSuchKey`
        /// otherwise.
        async fn head_object(
            &self,
            req: S3Request<dto2::HeadObjectInput>,
        ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
            let key = (req.input.bucket.clone(), req.input.key.clone());
            let lock = self.objects.lock().unwrap();
            let stored = lock
                .get(&key)
                .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
            Ok(S3Response::new(dto2::HeadObjectOutput {
                content_length: Some(stored.body.len() as i64),
                last_modified: Some(stored.last_modified.clone()),
                ..Default::default()
            }))
        }

        /// Idempotent delete, matching S3: removing a missing key still
        /// reports success.
        async fn delete_object(
            &self,
            req: S3Request<dto2::DeleteObjectInput>,
        ) -> S3Result<S3Response<dto2::DeleteObjectOutput>> {
            let key = (req.input.bucket.clone(), req.input.key.clone());
            self.objects.lock().unwrap().remove(&key);
            Ok(S3Response::new(dto2::DeleteObjectOutput::default()))
        }

        /// Lists every object in the bucket, sorted by key, as one
        /// unpaginated page (`is_truncated: false` — the scanner's
        /// continuation loop exits after the first iteration).
        async fn list_objects_v2(
            &self,
            req: S3Request<dto2::ListObjectsV2Input>,
        ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
            // NOTE(review): despite the name, `prefix` holds the *bucket*
            // used to filter the map; the request's key prefix is ignored.
            let prefix = req.input.bucket.clone();
            let lock = self.objects.lock().unwrap();
            let mut contents: Vec<dto2::Object> = lock
                .iter()
                .filter(|((b, _), _)| b == &prefix)
                .map(|((_, k), v)| dto2::Object {
                    key: Some(k.clone()),
                    size: Some(v.body.len() as i64),
                    last_modified: Some(v.last_modified.clone()),
                    ..Default::default()
                })
                .collect();
            contents.sort_by(|a, b| a.key.cmp(&b.key));
            let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
            Ok(S3Response::new(dto2::ListObjectsV2Output {
                name: Some(prefix),
                contents: Some(contents),
                key_count: Some(key_count),
                is_truncated: Some(false),
                ..Default::default()
            }))
        }

        async fn copy_object(
            &self,
            _req: S3Request<dto2::CopyObjectInput>,
        ) -> S3Result<S3Response<dto2::CopyObjectOutput>> {
            // Transition path: scanner copies same-key with new
            // storage_class. The mem backend doesn't track storage
            // class, so it's a no-op success — exactly the AWS-side
            // behaviour for a backend that ignores the field.
            Ok(S3Response::new(dto2::CopyObjectOutput::default()))
        }
    }
1345
1346    fn make_service() -> Arc<S4Service<ScannerMemBackend>> {
1347        let registry = Arc::new(
1348            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1349        );
1350        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1351        Arc::new(S4Service::new(
1352            ScannerMemBackend::default(),
1353            registry,
1354            dispatcher,
1355        ))
1356    }
1357
    #[tokio::test]
    async fn run_scan_once_no_lifecycle_manager_returns_empty_report() {
        // Service has no lifecycle manager attached — scanner must
        // no-op cleanly (operator might not have set
        // `--lifecycle-state-file`). Also covers the empty-buckets
        // path in `run_scan_once`.
        let s4 = make_service();
        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report, ScanReport::default());

        // And: lifecycle manager attached but no buckets configured —
        // `run_scan_once` must bail before listing anything.
        let mgr = Arc::new(LifecycleManager::new());
        let backend = ScannerMemBackend::default();
        let registry = Arc::new(
            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
        );
        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        let s4_empty = Arc::new(
            S4Service::new(backend, registry, dispatcher).with_lifecycle(mgr),
        );
        let report = run_scan_once(&s4_empty).await.expect("scan empty");
        assert_eq!(report, ScanReport::default());
    }
1381
    #[tokio::test]
    async fn run_scan_once_expires_matching_objects_via_backend() {
        // Three objects: only "stale.log" matches the rule (prefix
        // gating). The other two are written but not under the prefix,
        // so the evaluator returns None for them.
        let backend = ScannerMemBackend::default();
        backend.put_now("b", "stale.log", Bytes::from_static(b"x"));
        backend.put_now("b", "data/keep1.bin", Bytes::from_static(b"y"));
        backend.put_now("b", "data/keep2.bin", Bytes::from_static(b"z"));
        // Rule: any object under `stale.` prefix is expired immediately
        // (`expire_after_days(0)` matches age >= 0d, which is every
        // backend object).
        let mgr = Arc::new(LifecycleManager::new());
        let mut rule = LifecycleRule::expire_after_days("r", 0);
        rule.filter.prefix = Some("stale.".into());
        mgr.put("b", LifecycleConfig { rules: vec![rule] });
        let registry = Arc::new(
            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
        );
        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        let s4 = Arc::new(
            S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
        );

        let report = run_scan_once(&s4).await.expect("scan");
        // All 3 objects evaluated; exactly 1 matched and was expired.
        assert_eq!(report.buckets_scanned, 1);
        assert_eq!(report.objects_evaluated, 3);
        assert_eq!(report.expired, 1);
        assert_eq!(report.transitioned, 0);
        assert_eq!(report.skipped_locked, 0);
        assert_eq!(report.action_errors, 0);
        // Backend post-condition: the matching key is gone, the others
        // remain. Read back through the service's own list_objects_v2
        // path (which is also what the customer-visible HTTP layer
        // serves) so we exercise the same code the scanner walked.
        let req = synthetic_request(
            ListObjectsV2Input {
                bucket: "b".into(),
                ..Default::default()
            },
            http::Method::GET,
            "/b?list-type=2",
        );
        let resp = s4
            .as_ref()
            .list_objects_v2(req)
            .await
            .expect("post-scan list");
        let keys: Vec<String> = resp
            .output
            .contents
            .unwrap_or_default()
            .into_iter()
            .filter_map(|o| o.key)
            .collect();
        assert!(!keys.contains(&"stale.log".to_string()));
        assert!(keys.contains(&"data/keep1.bin".to_string()));
        assert!(keys.contains(&"data/keep2.bin".to_string()));
        // Lifecycle action counter: one Expire bumped on bucket "b".
        let snap = mgr.actions_snapshot();
        assert_eq!(
            snap.get(&("b".into(), "expire".into())).copied(),
            Some(1)
        );
    }
1447
    #[tokio::test]
    async fn run_scan_once_skips_object_lock_protected_keys() {
        // Lock wins over Lifecycle: the locked object must be counted
        // under `skipped_locked` (not `action_errors`) and survive the
        // scan, while the unlocked one expires.
        let backend = ScannerMemBackend::default();
        backend.put_now("b", "locked.log", Bytes::from_static(b"x"));
        backend.put_now("b", "free.log", Bytes::from_static(b"y"));
        let registry = Arc::new(
            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
        );
        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        let mgr = Arc::new(LifecycleManager::new());
        // Aggressive: every object expires immediately.
        mgr.put(
            "b",
            LifecycleConfig {
                rules: vec![LifecycleRule::expire_after_days("r", 0)],
            },
        );
        let lock_mgr = Arc::new(ObjectLockManager::new());
        // Lock retains "locked.log" until the year 2099 — Compliance
        // mode means even Governance bypass cannot delete it.
        let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
            .expect("parse")
            .with_timezone(&Utc);
        lock_mgr.set(
            "b",
            "locked.log",
            ObjectLockState {
                mode: Some(LockMode::Compliance),
                retain_until: Some(retain_until),
                legal_hold_on: false,
            },
        );
        let s4 = Arc::new(
            S4Service::new(backend, registry, dispatcher)
                .with_lifecycle(Arc::clone(&mgr))
                .with_object_lock(Arc::clone(&lock_mgr)),
        );

        let report = run_scan_once(&s4).await.expect("scan");
        assert_eq!(report.buckets_scanned, 1);
        assert_eq!(report.objects_evaluated, 2);
        assert_eq!(report.expired, 1, "free.log should have been expired");
        assert_eq!(report.skipped_locked, 1, "locked.log must be skipped");
        assert_eq!(report.action_errors, 0);
    }
1493
1494}