s4_server/lifecycle.rs
1//! S3 Lifecycle execution — per-bucket rule evaluation + manager skeleton
2//! (v0.6 #37).
3//!
4//! AWS S3 Lifecycle attaches a **list of rules** to a bucket; each rule may
5//! request that S3
6//!
7//! 1. **Expire** an object once its age (or the calendar date) crosses a
8//! threshold (`Expiration { Days | Date }`),
9//! 2. **Transition** an object to a different storage class (`Transition
10//! { Days, StorageClass }` — `STANDARD_IA`, `GLACIER_IR`, ...),
11//! 3. **Expire noncurrent versions** in a versioning-enabled bucket
12//! (`NoncurrentVersionExpiration { NoncurrentDays }`).
13//!
14//! Until v0.6 #37 the matching `PutBucketLifecycleConfiguration` /
15//! `GetBucketLifecycleConfiguration` / `DeleteBucketLifecycle` handlers
16//! in `crates/s4-server/src/service.rs` were pure passthroughs (the s3s
17//! framework's default backend stored them but nothing read the rules).
18//! This module owns the in-memory configuration store + the rule
19//! evaluator that decides, for any single object, whether an action
20//! should fire **right now**.
21//!
22//! ## responsibilities (v0.6 #37)
23//!
24//! - in-memory `bucket -> LifecycleConfig` map with JSON snapshot
25//! round-trip (mirroring `versioning.rs` / `object_lock.rs` /
26//! `inventory.rs`'s shape so `--lifecycle-state-file` is a one-line
27//! addition in `main.rs`).
28//! - per-bucket action counters (`actions_total`) — bumped by the
29//! future scanner when an Expiration / Transition /
30//! NoncurrentExpiration action is taken, surfaced via Prometheus
31//! (`s4_lifecycle_actions_total`, see `metrics.rs`).
32//! - [`LifecycleManager::evaluate`] — given one (bucket, key, age,
33//! size, tags) tuple, walk the bucket's rules in declaration order
34//! and return the first matching action. Returns `None` when no
35//! rule matches (or when the matching rule is `Disabled`).
36//! - [`evaluate_batch`] — batched form for the test path: walks a
37//! slice of `(key, age, size, tags)` tuples and returns the (key,
38//! action) pairs that should fire. The actual backend invocation
39//! (S3.delete_object / metadata rewrite) is the caller's job.
40//!
41//! ## scope limitations (v0.6 #37)
42//!
//! - **Background scanner (updated in v0.7 #45).** The original v0.6 #37
//!   skeleton (a tokio task spawned by `main.rs`'s
//!   `--lifecycle-scan-interval-hours` flag that only logged the bucket
//!   list and stamped a "would-have-run" marker) has since been fleshed
//!   out: [`run_scan_once`] below walks each configured bucket via
//!   `list_objects_v2` and executes the evaluated Expiration /
//!   Transition actions — see the "scope (v0.7 #45)" notes on that
//!   function for what is still deferred (noncurrent-version chains,
//!   multipart abort sweeping). The
//!   [`crate::S4Service::run_lifecycle_once_for_test`] entry covers
//!   the in-memory equivalent so the unit + E2E tests exercise the
//!   evaluator end-to-end.
55//! - **`AbortIncompleteMultipartUpload`** is parsed and stored on the
56//! `LifecycleRule` (so PutBucketLifecycleConfiguration round-trips
57//! the field) but not enforced — multipart abort sweeping is a
58//! separate scanner that lives next to the multipart upload manager
59//! (v0.7+).
60//! - **`expiration_date` (calendar date)** is supported in the
61//! evaluator: a rule with `expiration_date` past `now` fires
62//! Expiration immediately. Same wire form as AWS S3.
63//! - **Multi-instance replication.** All state is single-instance
64//! in-memory; `--lifecycle-state-file <PATH>` provides restart
65//! recovery via JSON snapshot, matching the
66//! `--versioning-state-file` shape.
67//! - **Object Lock interplay**: the evaluator does NOT consult the
68//! `ObjectLockManager` directly (the evaluator API is
69//! object-tags-and-size only); the scanner caller is expected to
70//! skip locked objects — see the `evaluate_batch_skips_locked` test
71//! for the canonical pattern. Locking always wins over Lifecycle.
72//! - **Versioning interplay**: the evaluator treats noncurrent
73//! versions as a separate input — pass `is_noncurrent = true` to
74//! [`LifecycleManager::evaluate_with_flags`] for noncurrent version
75//! expiration matching. The legacy `evaluate` shorthand defaults
76//! `is_noncurrent = false` (current version) so existing call sites
77//! stay one-liners.
78
79use std::collections::HashMap;
80use std::sync::Arc;
81use std::sync::RwLock;
82
83use chrono::{DateTime, Duration, Utc};
84use s3s::S3;
85use s3s::S3Request;
86use s3s::dto::*;
87use serde::{Deserialize, Serialize};
88use tracing::warn;
89
90/// Whether a rule is currently being applied. Mirrors AWS S3
91/// `ExpirationStatus` (`"Enabled"` / `"Disabled"`).
92#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
93pub enum LifecycleStatus {
94 Enabled,
95 Disabled,
96}
97
98impl LifecycleStatus {
99 /// Wire form used by AWS S3 (`"Enabled"` / `"Disabled"`).
100 #[must_use]
101 pub fn as_aws_str(self) -> &'static str {
102 match self {
103 Self::Enabled => "Enabled",
104 Self::Disabled => "Disabled",
105 }
106 }
107
108 /// Parse the AWS wire form (case-insensitive). Falls back to `Disabled`
109 /// on unrecognised input — this matches AWS conservative behaviour
110 /// (an unparseable status is treated as "off" so a typo doesn't silently
111 /// expire data).
112 #[must_use]
113 pub fn from_aws_str(s: &str) -> Self {
114 if s.eq_ignore_ascii_case("Enabled") {
115 Self::Enabled
116 } else {
117 Self::Disabled
118 }
119 }
120}
121
/// Per-rule object filter. AWS S3 represents the filter as one of `Prefix`,
/// `Tag`, `ObjectSizeGreaterThan`, `ObjectSizeLessThan`, or `And` (= AND of
/// any subset of those predicates). For internal storage we flatten the
/// "And" form into a struct of optional fields plus a vector of (key, value)
/// tags — every present field must match (logical AND). An empty filter (all
/// fields `None` / empty `tags`) matches every object in the bucket.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleFilter {
    /// Object key prefix (`None` = no prefix gating). Note `Some("")` also
    /// matches every key — every key starts with the empty string.
    #[serde(default)]
    pub prefix: Option<String>,
    /// Logical AND across every entry: every (key, value) must match the
    /// object's own tag set.
    #[serde(default)]
    pub tags: Vec<(String, String)>,
    /// Object must be *strictly greater than* this size in bytes
    /// (AWS `ObjectSizeGreaterThan` semantics — equality does NOT match).
    #[serde(default)]
    pub object_size_greater_than: Option<u64>,
    /// Object must be *strictly less than* this size in bytes
    /// (AWS `ObjectSizeLessThan` semantics — equality does NOT match).
    #[serde(default)]
    pub object_size_less_than: Option<u64>,
}
144
145impl LifecycleFilter {
146 /// `true` when this filter accepts the candidate. Empty filter accepts
147 /// every object. Tag matching is AND of all listed tags (each present in
148 /// `object_tags` with the matching value).
149 #[must_use]
150 pub fn matches(&self, key: &str, size: u64, object_tags: &[(String, String)]) -> bool {
151 if let Some(p) = &self.prefix
152 && !key.starts_with(p)
153 {
154 return false;
155 }
156 if let Some(min) = self.object_size_greater_than
157 && size <= min
158 {
159 return false;
160 }
161 if let Some(max) = self.object_size_less_than
162 && size >= max
163 {
164 return false;
165 }
166 for (tk, tv) in &self.tags {
167 let matched = object_tags.iter().any(|(ok, ov)| ok == tk && ov == tv);
168 if !matched {
169 return false;
170 }
171 }
172 true
173 }
174}
175
/// A single transition step (object age threshold + target storage class).
/// `days` is days since the object was created. AWS S3 also accepts `Date`
/// for transitions but Lifecycle deployments overwhelmingly use `Days`; the
/// `Date` form is omitted here on purpose to keep the evaluator narrow
/// (operators wanting calendar transitions can synthesise a one-shot rule
/// at the cadence of their scanner).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransitionRule {
    /// Age threshold in days; the transition is eligible once
    /// `object_age_days >= days`.
    pub days: u32,
    /// Target storage class (`"STANDARD_IA"` / `"GLACIER_IR"` /
    /// `"GLACIER"` / `"DEEP_ARCHIVE"` / `"INTELLIGENT_TIERING"` /
    /// `"ONEZONE_IA"`). Stored as the AWS wire string so PutBucket /
    /// GetBucket round-trip is a no-op.
    pub storage_class: String,
}
191
/// One lifecycle rule. AWS S3's `LifecycleRule` flattened into the subset
/// the v0.6 #37 evaluator handles. `id` is the operator-supplied label and
/// makes Get / Put round-trips non-lossy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleRule {
    /// Operator-supplied rule label (round-tripped, not interpreted).
    pub id: String,
    /// `Disabled` rules are stored but never fire in the evaluator.
    pub status: LifecycleStatus,
    /// Object filter; the default (empty) filter matches every object.
    #[serde(default)]
    pub filter: LifecycleFilter,
    /// Days since the object was created. Mutually exclusive with
    /// [`Self::expiration_date`] in AWS — both fields are accepted here on
    /// input (the evaluator picks `expiration_days` first, then
    /// `expiration_date`) so a malformed rule with both set still evaluates
    /// deterministically rather than silently dropping the action.
    #[serde(default)]
    pub expiration_days: Option<u32>,
    /// Calendar date past which matching objects are expired (AWS wire form
    /// is ISO 8601; here we keep it as a `DateTime<Utc>` so round-trips
    /// through `serde_json` survive without re-parsing).
    #[serde(default)]
    pub expiration_date: Option<DateTime<Utc>>,
    /// Transition steps in declaration order. The evaluator picks the
    /// deepest transition (largest `days` ≤ object age) and resolves any
    /// conflict with expiration in [`LifecycleManager::evaluate_with_flags`].
    #[serde(default)]
    pub transitions: Vec<TransitionRule>,
    /// Days an object has been noncurrent before the noncurrent-version
    /// expiration fires. Only consulted when the evaluator is asked about
    /// a noncurrent object (`is_noncurrent = true`).
    #[serde(default)]
    pub noncurrent_version_expiration_days: Option<u32>,
    /// Days after a multipart upload is initiated before the abort fires.
    /// Stored so PutBucket round-trips, but **not enforced** in the
    /// v0.6 #37 evaluator — multipart sweeping lives elsewhere.
    #[serde(default)]
    pub abort_incomplete_multipart_upload_days: Option<u32>,
}
229
230impl LifecycleRule {
231 /// Convenience constructor for a "expire after N days" rule. Useful in
232 /// tests + operator scripts.
233 #[must_use]
234 pub fn expire_after_days(id: impl Into<String>, days: u32) -> Self {
235 Self {
236 id: id.into(),
237 status: LifecycleStatus::Enabled,
238 filter: LifecycleFilter::default(),
239 expiration_days: Some(days),
240 expiration_date: None,
241 transitions: Vec::new(),
242 noncurrent_version_expiration_days: None,
243 abort_incomplete_multipart_upload_days: None,
244 }
245 }
246}
247
/// Per-bucket lifecycle configuration: an ordered list of rules. The
/// evaluator walks rules in declaration order and the first rule that
/// actually *fires* an action wins; rules that match the object but have
/// no currently-applicable action fall through to the next rule (see
/// [`LifecycleManager::evaluate_with_flags`]), matching AWS S3 semantics.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleConfig {
    pub rules: Vec<LifecycleRule>,
}
254
/// The action a single rule wants to take **right now** for a candidate
/// object.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LifecycleAction {
    /// Delete the object (`Expiration` / `NoncurrentVersionExpiration`).
    Expire,
    /// Move the object to a different storage class (`Transition`). The
    /// inner string is the AWS wire form (e.g. `"GLACIER_IR"`).
    Transition { storage_class: String },
}

impl LifecycleAction {
    /// Stable label suitable for a metric counter
    /// (`s4_lifecycle_actions_total{action="..."}`).
    #[must_use]
    pub fn metric_label(&self) -> &'static str {
        if matches!(self, Self::Transition { .. }) {
            "transition"
        } else {
            "expire"
        }
    }
}
277
/// Serialization shape for restart-recovery snapshots; produced by
/// [`LifecycleManager::to_json`] and consumed by
/// [`LifecycleManager::from_json`]. Only the per-bucket rule map is
/// persisted — action counters are transient (see `from_json`).
#[derive(Debug, Default, Serialize, Deserialize)]
struct LifecycleSnapshot {
    by_bucket: HashMap<String, LifecycleConfig>,
}
283
/// Per-bucket lifecycle configuration manager.
///
/// All read / write operations go through `RwLock` for thread safety;
/// the expected handle shape is `Arc<LifecycleManager>` so clones of the
/// handle are cheap. `actions_total` is a parallel `RwLock<HashMap<...>>`
/// of `(bucket, action_label) -> count` so the background scanner can
/// stamp successful actions and operators can `GET /metrics` to see the
/// running totals (the metric is also surfaced via `metrics::counter!` —
/// see [`crate::metrics::record_lifecycle_action`]).
#[derive(Debug, Default)]
pub struct LifecycleManager {
    /// `bucket -> LifecycleConfig`; the whole map is snapshotted by
    /// `to_json` / restored by `from_json`.
    by_bucket: RwLock<HashMap<String, LifecycleConfig>>,
    /// `(bucket, action_label) -> count`. Bumped by the scanner via
    /// [`Self::record_action`]. Action labels are the
    /// [`LifecycleAction::metric_label`] values
    /// (`"expire"` / `"transition"`). Not persisted across restarts.
    actions_total: RwLock<HashMap<(String, String), u64>>,
}
302
303impl LifecycleManager {
304 /// Empty manager — no bucket has rules.
305 #[must_use]
306 pub fn new() -> Self {
307 Self::default()
308 }
309
310 /// Replace (or create) the lifecycle configuration for `bucket`. Drops
311 /// any previously-attached rules in one shot — matches AWS S3
312 /// `PutBucketLifecycleConfiguration` (full replace, no merge).
313 pub fn put(&self, bucket: &str, config: LifecycleConfig) {
314 self.by_bucket
315 .write()
316 .expect("lifecycle state RwLock poisoned")
317 .insert(bucket.to_owned(), config);
318 }
319
320 /// Return a clone of the bucket's configuration, if any.
321 #[must_use]
322 pub fn get(&self, bucket: &str) -> Option<LifecycleConfig> {
323 self.by_bucket
324 .read()
325 .expect("lifecycle state RwLock poisoned")
326 .get(bucket)
327 .cloned()
328 }
329
330 /// Drop the bucket's lifecycle configuration (idempotent — missing
331 /// bucket is OK).
332 pub fn delete(&self, bucket: &str) {
333 self.by_bucket
334 .write()
335 .expect("lifecycle state RwLock poisoned")
336 .remove(bucket);
337 }
338
339 /// JSON snapshot for restart-recoverable state. Pair with
340 /// [`Self::from_json`].
341 pub fn to_json(&self) -> Result<String, serde_json::Error> {
342 let by_bucket = self
343 .by_bucket
344 .read()
345 .expect("lifecycle state RwLock poisoned")
346 .clone();
347 let snap = LifecycleSnapshot { by_bucket };
348 serde_json::to_string(&snap)
349 }
350
351 /// Restore from a JSON snapshot produced by [`Self::to_json`]. Action
352 /// counters are intentionally not snapshotted — they're transient
353 /// observability data and should reset across process restarts so
354 /// `rate(s4_lifecycle_actions_total[1h])` doesn't double-count.
355 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
356 let snap: LifecycleSnapshot = serde_json::from_str(s)?;
357 Ok(Self {
358 by_bucket: RwLock::new(snap.by_bucket),
359 actions_total: RwLock::new(HashMap::new()),
360 })
361 }
362
363 /// Evaluate which rule (if any) applies to a single **current-version**
364 /// object right now. Walks the bucket's rules in declaration order;
365 /// returns the first matching action. Returns `None` when no rule
366 /// matches (or when the matching rule is `Disabled`, or when the
367 /// bucket has no lifecycle configuration).
368 ///
369 /// Within a single rule the precedence is:
370 ///
371 /// 1. Pick the deepest transition whose `days` threshold is currently
372 /// met (= largest `days ≤ object age`).
373 /// 2. Conflict with expiration: if `expiration_days <=
374 /// transition_days` for the chosen transition, expiration wins
375 /// (the rule wants the object gone before it would have been
376 /// transitioned). Otherwise transition wins (e.g. transition at
377 /// 30d, expiration at 365d, age 60d → transition fires now,
378 /// expiration is future).
379 /// 3. `expiration_date` matches when `now >= expiration_date` and no
380 /// transition is currently applicable.
381 ///
382 /// `object_age` is "now - created_at" supplied by the caller — keeping
383 /// the evaluator pure of the wall clock makes deterministic testing
384 /// trivial.
385 #[must_use]
386 pub fn evaluate(
387 &self,
388 bucket: &str,
389 key: &str,
390 object_age: Duration,
391 object_size: u64,
392 object_tags: &[(String, String)],
393 ) -> Option<LifecycleAction> {
394 self.evaluate_with_flags(
395 bucket,
396 key,
397 object_age,
398 object_size,
399 object_tags,
400 EvaluateFlags::default(),
401 )
402 }
403
404 /// Full-form evaluator with flags for noncurrent-version handling.
405 /// Use this when the scanner is walking a versioning-enabled bucket;
406 /// pass `is_noncurrent = true` for entries that are not the latest
407 /// non-delete-marker version.
408 #[must_use]
409 pub fn evaluate_with_flags(
410 &self,
411 bucket: &str,
412 key: &str,
413 object_age: Duration,
414 object_size: u64,
415 object_tags: &[(String, String)],
416 flags: EvaluateFlags,
417 ) -> Option<LifecycleAction> {
418 let cfg = self.get(bucket)?;
419 let now_for_date = flags.now.unwrap_or_else(Utc::now);
420 let age_days = object_age.num_days().max(0);
421 let age_days_u32 = u32::try_from(age_days).unwrap_or(u32::MAX);
422 for rule in &cfg.rules {
423 if rule.status != LifecycleStatus::Enabled {
424 continue;
425 }
426 if !rule.filter.matches(key, object_size, object_tags) {
427 continue;
428 }
429 // Noncurrent-version expiration: only consulted when the
430 // caller explicitly flags this entry as noncurrent. The
431 // current-version expiration / transition rules do not fire
432 // for noncurrent versions in AWS S3 semantics.
433 if flags.is_noncurrent {
434 if let Some(days) = rule.noncurrent_version_expiration_days
435 && age_days_u32 >= days
436 {
437 return Some(LifecycleAction::Expire);
438 }
439 continue;
440 }
441 // Current-version path.
442 let exp_days_match = rule.expiration_days.filter(|d| age_days_u32 >= *d);
443 let exp_date_match = rule.expiration_date.filter(|d| now_for_date >= *d);
444 // Pick the deepest transition whose threshold is at or
445 // below the object's age. Transitions are typically
446 // declaration-ordered by ascending `days`, but we don't
447 // require it — taking the largest threshold means an
448 // object aged 90d gets `GLACIER` over `STANDARD_IA` even
449 // if `STANDARD_IA(30d)` was declared first.
450 let chosen_transition = rule
451 .transitions
452 .iter()
453 .filter(|t| age_days_u32 >= t.days)
454 .max_by_key(|t| t.days);
455 // Conflict resolution: when `expiration_days` fires AND a
456 // transition fires, expiration wins iff
457 // `expiration_days <= transition_days` (rule wants object
458 // gone before / at the same time it would have been
459 // transitioned). Otherwise the transition wins.
460 if let Some(exp_threshold) = exp_days_match {
461 let trans_threshold = chosen_transition.map(|t| t.days).unwrap_or(u32::MAX);
462 if exp_threshold <= trans_threshold {
463 return Some(LifecycleAction::Expire);
464 }
465 }
466 if let Some(t) = chosen_transition {
467 return Some(LifecycleAction::Transition {
468 storage_class: t.storage_class.clone(),
469 });
470 }
471 // Calendar-date expiration (no transition currently
472 // applicable, but the rule's expiration_date is past).
473 if exp_date_match.is_some() {
474 return Some(LifecycleAction::Expire);
475 }
476 // Fall through to the next rule when no action fires for
477 // this rule — first-match-wins applies only to *firing*
478 // rules, matching AWS semantics where overlapping rules
479 // with disjoint thresholds compose.
480 }
481 None
482 }
483
484 /// Stamp the per-bucket action counter and bump the matching
485 /// Prometheus counter. Called by the future scanner after a successful
486 /// delete / metadata rewrite.
487 pub fn record_action(&self, bucket: &str, action: &LifecycleAction) {
488 let label = action.metric_label();
489 let key = (bucket.to_owned(), label.to_owned());
490 let mut guard = self
491 .actions_total
492 .write()
493 .expect("lifecycle actions counter RwLock poisoned");
494 let entry = guard.entry(key).or_insert(0);
495 *entry = entry.saturating_add(1);
496 crate::metrics::record_lifecycle_action(bucket, label);
497 }
498
499 /// Read-only snapshot of the per-(bucket, action) counter map.
500 /// Useful for tests + introspection (`/admin/lifecycle/stats` style
501 /// endpoints in the future).
502 #[must_use]
503 pub fn actions_snapshot(&self) -> HashMap<(String, String), u64> {
504 self.actions_total
505 .read()
506 .expect("lifecycle actions counter RwLock poisoned")
507 .clone()
508 }
509
510 /// All buckets with a lifecycle configuration attached. Sorted for
511 /// stable scanner ordering.
512 #[must_use]
513 pub fn buckets(&self) -> Vec<String> {
514 let map = self
515 .by_bucket
516 .read()
517 .expect("lifecycle state RwLock poisoned");
518 let mut out: Vec<String> = map.keys().cloned().collect();
519 out.sort();
520 out
521 }
522}
523
/// Flags for [`LifecycleManager::evaluate_with_flags`]. Default is
/// "current-version object, evaluator picks `Utc::now()` for the date
/// comparison". Tests override `now` for determinism.
#[derive(Clone, Copy, Debug, Default)]
pub struct EvaluateFlags {
    /// `true` when the candidate is a noncurrent version: only
    /// `noncurrent_version_expiration_days` rules are consulted then.
    pub is_noncurrent: bool,
    /// Wall-clock override for `expiration_date` comparisons;
    /// `None` means the evaluator uses `Utc::now()`.
    pub now: Option<DateTime<Utc>>,
}
532
/// One object the evaluator considers in a batch:
/// `(key, object_age, object_size, object_tags)` — `object_age` is
/// "now - created_at", computed by the caller so the evaluator stays
/// clock-free. Defined as a type alias so [`evaluate_batch`] /
/// [`crate::S4Service::run_lifecycle_once_for_test`] don't trip clippy's
/// `type-complexity` lint, and so callers building the list have a single
/// canonical shape to reach for.
pub type EvaluateBatchEntry = (String, Duration, u64, Vec<(String, String)>);
539
540/// Test-driven scan entry: walks a list of [`EvaluateBatchEntry`] tuples
541/// and produces (key, action) pairs for every object that should fire an
542/// action **right now**. The actual backend invocation (S3.delete_object /
543/// metadata rewrite) is the caller's job. Used by both unit tests and the
544/// E2E test in `tests/roundtrip.rs`; the future background scanner will
545/// reuse the same entry once the bucket-walk is wired through the backend.
546#[must_use]
547pub fn evaluate_batch(
548 manager: &LifecycleManager,
549 bucket: &str,
550 objects: &[EvaluateBatchEntry],
551) -> Vec<(String, LifecycleAction)> {
552 let mut out = Vec::with_capacity(objects.len());
553 for (key, age, size, tags) in objects {
554 if let Some(action) = manager.evaluate(bucket, key, *age, *size, tags) {
555 out.push((key.clone(), action));
556 }
557 }
558 out
559}
560
/// Per-invocation scanner counters returned by [`run_scan_once`]. Useful
/// for tests, the `--lifecycle-scan-interval-hours` log line, and any
/// future `/admin/lifecycle/scan` introspection endpoint. Operators see
/// the same numbers via Prometheus
/// (`s4_lifecycle_actions_total{action="expire"|"transition"}`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
    /// Number of buckets the scanner walked (= buckets with a lifecycle
    /// configuration attached at the moment the scanner ran).
    pub buckets_scanned: usize,
    /// Number of distinct keys the scanner evaluated. Multi-page lists
    /// count one key once even if the listing was paginated.
    pub objects_evaluated: usize,
    /// Number of objects deleted as a result of an Expiration action.
    pub expired: usize,
    /// Number of objects whose `x-amz-storage-class` was rewritten as a
    /// result of a Transition action.
    pub transitioned: usize,
    /// Number of objects skipped because an Object Lock (Compliance,
    /// Governance, or legal hold) was in effect. The Lock always wins
    /// over Lifecycle, matching AWS S3 semantics.
    pub skipped_locked: usize,
    /// Number of objects the evaluator wanted to act on but the action
    /// failed (e.g. backend `delete_object` returned an error). Logged
    /// individually at WARN level; this counter exists so tests / metrics
    /// can assert no silent loss. NOTE: a failed `list_objects_v2` page
    /// also bumps this counter once (see `scan_bucket`).
    pub action_errors: usize,
}
589
590/// Convert an s3s `Timestamp` (`time::OffsetDateTime` underneath) into a
591/// `chrono::DateTime<Utc>` via the RFC3339 wire form. Used by the scanner
592/// to compute object age (= `now - last_modified`). Returns `None` when
593/// the stamp is unparseable, in which case the caller falls back to
594/// treating the object as freshly created (age = 0).
595fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
596 let mut buf = Vec::new();
597 ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
598 let s = std::str::from_utf8(&buf).ok()?;
599 chrono::DateTime::parse_from_rfc3339(s)
600 .ok()
601 .map(|dt| dt.with_timezone(&Utc))
602}
603
604/// Build a synthetic `S3Request` with the minimum metadata the
605/// scanner-internal calls need. The lifecycle scanner is a
606/// system-internal caller (no end-user credentials, no real HTTP method
607/// / URI), so policy gates downstream see `credentials = None` /
608/// `region = None` and treat the call as anonymous-internal. Backends
609/// that do not gate internal traffic ignore these fields entirely.
610fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
611 S3Request {
612 input,
613 method,
614 uri: uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/")),
615 headers: http::HeaderMap::new(),
616 extensions: http::Extensions::new(),
617 credentials: None,
618 region: None,
619 service: None,
620 trailing_headers: None,
621 }
622}
623
624/// Walk every bucket that has a lifecycle configuration attached, list
625/// its objects via `list_objects_v2` (continuation-token pagination), and
626/// for each object evaluate the rule set + execute the matching
627/// Expiration / Transition action. Object-Lock-protected objects are
628/// **skipped** (the Lock always wins over Lifecycle). Versioning chains
629/// are intentionally out of scope for v0.7 #45 — see the module-level
630/// limitations note.
631///
632/// ## error handling
633///
634/// Per-bucket / per-object failures are logged at WARN level and bumped
635/// in `ScanReport::action_errors`; the scanner does NOT abort early on a
636/// single bad object so one slow / faulty bucket can't starve every
637/// other bucket's lifecycle. The function only returns `Err(_)` when the
638/// scanner cannot make progress at all (no current usage — kept for the
639/// future case where the manager itself becomes unavailable).
640///
641/// ## scope (v0.7 #45)
642///
643/// - Current-version objects only (Versioning-enabled chains rely on
644/// `evaluate_with_flags(is_noncurrent = true)`, but walking the
645/// shadow keys requires the version chain access pattern from
646/// `versioning.rs` and is deferred to a follow-up issue).
647/// - `head_object`'s `last_modified` is used to compute age. When the
648/// backend omits the field (some S3-compatible backends do), the
649/// object is treated as age 0 and skipped — matches AWS conservative
650/// behaviour where a malformed timestamp must not silently expire data.
651/// - Tags are looked up via the attached
652/// [`crate::tagging::TagManager`] (when wired). Buckets without a
653/// tag manager pass an empty tag list to the evaluator.
654/// - Transition rewrites the object's `x-amz-storage-class` via
655/// `copy_object` (same bucket / same key, `MetadataDirective: COPY`,
656/// new `StorageClass`). Backends that ignore the storage class
657/// header silently no-op the transition; the counter still bumps to
658/// reflect "the scanner asked for a transition" (matching AWS where
659/// a no-op transition still costs a request).
660pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
661 s4: &Arc<crate::S4Service<B>>,
662) -> Result<ScanReport, String> {
663 let Some(mgr) = s4.lifecycle_manager().cloned() else {
664 // No lifecycle manager attached (e.g. operator did not set
665 // `--lifecycle-state-file`). Scan is a no-op.
666 return Ok(ScanReport::default());
667 };
668 let buckets = mgr.buckets();
669 if buckets.is_empty() {
670 return Ok(ScanReport::default());
671 }
672 let now = Utc::now();
673 let mut report = ScanReport {
674 buckets_scanned: buckets.len(),
675 ..ScanReport::default()
676 };
677 for bucket in buckets {
678 scan_bucket(s4, &mgr, &bucket, now, &mut report).await;
679 }
680 Ok(report)
681}
682
/// Walk one bucket end-to-end: list pages via `list_objects_v2`,
/// evaluate each key against the bucket's rules, and execute the firing
/// action. Pagination uses the `continuation_token` loop documented in
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>.
/// Failures are recorded in `report` (never propagated) so one bad
/// bucket / object cannot abort the whole scan.
async fn scan_bucket<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    mgr: &Arc<LifecycleManager>,
    bucket: &str,
    now: DateTime<Utc>,
    report: &mut ScanReport,
) {
    // `None` on the first page; carries NextContinuationToken afterwards.
    let mut continuation: Option<String> = None;
    loop {
        let list_input = ListObjectsV2Input {
            bucket: bucket.to_owned(),
            continuation_token: continuation.clone(),
            ..Default::default()
        };
        let list_req = synthetic_request(
            list_input,
            http::Method::GET,
            &format!("/{bucket}?list-type=2"),
        );
        let resp = match s4.as_ref().list_objects_v2(list_req).await {
            Ok(r) => r,
            Err(e) => {
                // A failed listing aborts only THIS bucket's scan; the
                // single counter bump makes the failure visible to
                // tests / dashboards.
                warn!(
                    bucket = %bucket,
                    error = %e,
                    "S4 lifecycle: list_objects_v2 failed; skipping bucket for this scan",
                );
                report.action_errors = report.action_errors.saturating_add(1);
                return;
            }
        };
        let output = resp.output;
        let contents = output.contents.unwrap_or_default();
        for obj in &contents {
            // Entries without a key cannot be acted on; skip defensively.
            let Some(key) = obj.key.as_deref() else {
                continue;
            };
            // Filter out S4-internal sidecars / shadow versions early so
            // the lifecycle scanner mirrors the same "client-visible
            // object set" the customer sees through `list_objects_v2`.
            // (The S4Service.list_objects_v2 handler already drops them
            // before returning, but this is a belt-and-braces guard for
            // any future bypass that builds the list elsewhere.)
            if key.ends_with(".s4index") {
                continue;
            }
            report.objects_evaluated = report.objects_evaluated.saturating_add(1);
            // Negative / missing sizes clamp to 0 before the unsigned cast.
            let size = obj.size.unwrap_or(0).max(0) as u64;
            // Missing / unparseable last_modified => age 0, so no
            // age-gated rule can fire (conservative: never expire on a
            // malformed timestamp).
            let age = match obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc) {
                Some(lm) => now.signed_duration_since(lm),
                None => Duration::zero(),
            };
            // No tag manager wired => empty tag set (tag-filtered rules
            // simply won't match).
            let tags: Vec<(String, String)> = s4
                .as_ref()
                .tag_manager()
                .and_then(|m| m.get_object_tags(bucket, key))
                .map(|set| set.iter().cloned().collect())
                .unwrap_or_default();
            let Some(action) = mgr.evaluate(bucket, key, age, size, &tags) else {
                continue;
            };
            // Object-Lock-protected objects are skipped before any
            // backend-mutating call. Lock wins over Lifecycle, full
            // stop — matches AWS behaviour where an Expiration on a
            // locked object is dropped, not retried.
            if let Some(lock_mgr) = s4.as_ref().object_lock_manager()
                && let Some(state) = lock_mgr.get(bucket, key)
                && state.is_locked(now)
            {
                report.skipped_locked = report.skipped_locked.saturating_add(1);
                continue;
            }
            // Execute the action; counters are only bumped on success,
            // failures go to `action_errors` + a WARN log.
            match action {
                LifecycleAction::Expire => match execute_expire(s4, bucket, key).await {
                    Ok(()) => {
                        mgr.record_action(bucket, &LifecycleAction::Expire);
                        report.expired = report.expired.saturating_add(1);
                    }
                    Err(e) => {
                        warn!(
                            bucket = %bucket,
                            key = %key,
                            error = %e,
                            "S4 lifecycle: Expire action failed",
                        );
                        report.action_errors = report.action_errors.saturating_add(1);
                    }
                },
                LifecycleAction::Transition { storage_class } => {
                    match execute_transition(s4, bucket, key, &storage_class).await {
                        Ok(()) => {
                            mgr.record_action(
                                bucket,
                                &LifecycleAction::Transition {
                                    storage_class: storage_class.clone(),
                                },
                            );
                            report.transitioned = report.transitioned.saturating_add(1);
                        }
                        Err(e) => {
                            warn!(
                                bucket = %bucket,
                                key = %key,
                                storage_class = %storage_class,
                                error = %e,
                                "S4 lifecycle: Transition action failed",
                            );
                            report.action_errors = report.action_errors.saturating_add(1);
                        }
                    }
                }
            }
        }
        if output.is_truncated.unwrap_or(false) {
            continuation = output.next_continuation_token;
            if continuation.is_none() {
                // Defensive: AWS guarantees a NextContinuationToken when
                // is_truncated=true, but a malformed backend could omit
                // it; break to avoid an infinite loop.
                break;
            }
        } else {
            break;
        }
    }
}
812
813/// Issue `delete_object` against the wrapped `S4Service`. The handler in
814/// `service.rs` runs the WORM check itself, so even if the scanner's
815/// pre-check missed (race with an MFA-Delete put-bucket-versioning), the
816/// backend refuses the delete with `AccessDenied` and the error path
817/// above bumps `action_errors` rather than silently losing data.
818async fn execute_expire<B: S3 + Send + Sync + 'static>(
819 s4: &Arc<crate::S4Service<B>>,
820 bucket: &str,
821 key: &str,
822) -> Result<(), String> {
823 let input = DeleteObjectInput {
824 bucket: bucket.to_owned(),
825 key: key.to_owned(),
826 ..Default::default()
827 };
828 let req = synthetic_request(
829 input,
830 http::Method::DELETE,
831 &format!("/{bucket}/{key}"),
832 );
833 s4.as_ref()
834 .delete_object(req)
835 .await
836 .map(|_| ())
837 .map_err(|e| format!("{e}"))
838}
839
840/// Rewrite the object's storage class via a same-key `copy_object` with
841/// `MetadataDirective: COPY` (preserves user metadata) and the new
842/// `storage_class`. Backends that ignore storage-class headers
843/// effectively no-op; the counter still records the attempt so dashboards
844/// reflect the scanner's intent.
845async fn execute_transition<B: S3 + Send + Sync + 'static>(
846 s4: &Arc<crate::S4Service<B>>,
847 bucket: &str,
848 key: &str,
849 storage_class: &str,
850) -> Result<(), String> {
851 // CopyObjectInput has dozens of `Option` fields plus three required
852 // (bucket / key / copy_source); the s3s-generated `builder()` is
853 // the path that fills the optional ones with `None` for us. The
854 // `set_*` setters return `&mut Self`, so we drive them in
855 // statement form rather than as a method chain.
856 let mut builder = CopyObjectInput::builder();
857 builder.set_bucket(bucket.to_owned());
858 builder.set_key(key.to_owned());
859 builder.set_copy_source(CopySource::Bucket {
860 bucket: bucket.to_owned().into_boxed_str(),
861 key: key.to_owned().into_boxed_str(),
862 version_id: None,
863 });
864 builder.set_metadata_directive(Some(MetadataDirective::from_static(MetadataDirective::COPY)));
865 builder.set_storage_class(Some(StorageClass::from(storage_class.to_owned())));
866 let input = builder
867 .build()
868 .map_err(|e| format!("CopyObjectInput build: {e}"))?;
869 let req = synthetic_request(
870 input,
871 http::Method::PUT,
872 &format!("/{bucket}/{key}"),
873 );
874 s4.as_ref()
875 .copy_object(req)
876 .await
877 .map(|_| ())
878 .map_err(|e| format!("{e}"))
879}
880
881#[cfg(test)]
882mod tests {
883 use super::*;
884
885 fn enabled(rule: LifecycleRule) -> LifecycleRule {
886 LifecycleRule {
887 status: LifecycleStatus::Enabled,
888 ..rule
889 }
890 }
891
892 fn cfg_with(rules: Vec<LifecycleRule>) -> LifecycleConfig {
893 LifecycleConfig { rules }
894 }
895
896 fn manager_with(bucket: &str, rules: Vec<LifecycleRule>) -> LifecycleManager {
897 let m = LifecycleManager::new();
898 m.put(bucket, cfg_with(rules));
899 m
900 }
901
902 #[test]
903 fn evaluate_age_past_expiration_returns_expire() {
904 let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
905 let action = m.evaluate("b", "k", Duration::days(31), 100, &[]);
906 assert_eq!(action, Some(LifecycleAction::Expire));
907 }
908
909 #[test]
910 fn evaluate_age_before_expiration_returns_none() {
911 let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
912 let action = m.evaluate("b", "k", Duration::days(5), 100, &[]);
913 assert_eq!(action, None);
914 }
915
916 #[test]
917 fn evaluate_prefix_filter_matches() {
918 let mut rule = LifecycleRule::expire_after_days("r", 1);
919 rule.filter.prefix = Some("logs/".into());
920 let m = manager_with("b", vec![rule]);
921 assert_eq!(
922 m.evaluate("b", "logs/2026/a.log", Duration::days(2), 1, &[]),
923 Some(LifecycleAction::Expire)
924 );
925 assert_eq!(
926 m.evaluate("b", "data/keep.bin", Duration::days(2), 1, &[]),
927 None
928 );
929 }
930
931 #[test]
932 fn evaluate_tag_filter_requires_all_tags_to_match() {
933 let mut rule = LifecycleRule::expire_after_days("r", 1);
934 rule.filter.tags = vec![
935 ("env".into(), "dev".into()),
936 ("expirable".into(), "yes".into()),
937 ];
938 let m = manager_with("b", vec![rule]);
939 // All tags present + matching → fire.
940 assert_eq!(
941 m.evaluate(
942 "b",
943 "k",
944 Duration::days(2),
945 1,
946 &[
947 ("env".into(), "dev".into()),
948 ("expirable".into(), "yes".into()),
949 ("owner".into(), "alice".into()),
950 ]
951 ),
952 Some(LifecycleAction::Expire)
953 );
954 // One tag missing → no fire.
955 assert_eq!(
956 m.evaluate(
957 "b",
958 "k",
959 Duration::days(2),
960 1,
961 &[("env".into(), "dev".into())]
962 ),
963 None
964 );
965 // Tag present but with the wrong value → no fire.
966 assert_eq!(
967 m.evaluate(
968 "b",
969 "k",
970 Duration::days(2),
971 1,
972 &[
973 ("env".into(), "prod".into()),
974 ("expirable".into(), "yes".into()),
975 ]
976 ),
977 None
978 );
979 }
980
981 #[test]
982 fn evaluate_size_filters_gate_action() {
983 let mut rule = LifecycleRule::expire_after_days("r", 1);
984 rule.filter.object_size_greater_than = Some(1024);
985 rule.filter.object_size_less_than = Some(10 * 1024);
986 let m = manager_with("b", vec![rule]);
987 // Inside the (1024, 10*1024) range → fire.
988 assert_eq!(
989 m.evaluate("b", "k", Duration::days(2), 4096, &[]),
990 Some(LifecycleAction::Expire)
991 );
992 // At the boundary (size == greater_than) → strict `>`, no fire.
993 assert_eq!(m.evaluate("b", "k", Duration::days(2), 1024, &[]), None);
994 // Above the upper bound → no fire.
995 assert_eq!(
996 m.evaluate("b", "k", Duration::days(2), 100 * 1024, &[]),
997 None
998 );
999 }
1000
1001 #[test]
1002 fn evaluate_transition_fires_before_expiration() {
1003 // Transition at 30d, expiration at 365d, age 60d → transition.
1004 let rule = enabled(LifecycleRule {
1005 id: "r".into(),
1006 status: LifecycleStatus::Enabled,
1007 filter: LifecycleFilter::default(),
1008 expiration_days: Some(365),
1009 expiration_date: None,
1010 transitions: vec![TransitionRule {
1011 days: 30,
1012 storage_class: "GLACIER_IR".into(),
1013 }],
1014 noncurrent_version_expiration_days: None,
1015 abort_incomplete_multipart_upload_days: None,
1016 });
1017 let m = manager_with("b", vec![rule]);
1018 let action = m.evaluate("b", "k", Duration::days(60), 1, &[]);
1019 assert_eq!(
1020 action,
1021 Some(LifecycleAction::Transition {
1022 storage_class: "GLACIER_IR".into(),
1023 })
1024 );
1025 }
1026
1027 #[test]
1028 fn evaluate_expiration_wins_when_threshold_is_earlier_than_transition() {
1029 // Expiration at 30d, transition at 90d, age 100d → expire (the
1030 // rule wants the object gone *before* it would have transitioned).
1031 let rule = enabled(LifecycleRule {
1032 id: "r".into(),
1033 status: LifecycleStatus::Enabled,
1034 filter: LifecycleFilter::default(),
1035 expiration_days: Some(30),
1036 expiration_date: None,
1037 transitions: vec![TransitionRule {
1038 days: 90,
1039 storage_class: "GLACIER".into(),
1040 }],
1041 noncurrent_version_expiration_days: None,
1042 abort_incomplete_multipart_upload_days: None,
1043 });
1044 let m = manager_with("b", vec![rule]);
1045 let action = m.evaluate("b", "k", Duration::days(100), 1, &[]);
1046 assert_eq!(action, Some(LifecycleAction::Expire));
1047 }
1048
1049 #[test]
1050 fn evaluate_disabled_rule_never_fires() {
1051 let mut rule = LifecycleRule::expire_after_days("r", 1);
1052 rule.status = LifecycleStatus::Disabled;
1053 let m = manager_with("b", vec![rule]);
1054 assert_eq!(m.evaluate("b", "k", Duration::days(365), 1, &[]), None);
1055 }
1056
1057 #[test]
1058 fn evaluate_unknown_bucket_returns_none() {
1059 let m = LifecycleManager::new();
1060 assert_eq!(m.evaluate("ghost", "k", Duration::days(365), 1, &[]), None);
1061 }
1062
1063 #[test]
1064 fn evaluate_noncurrent_version_expiration() {
1065 let rule = enabled(LifecycleRule {
1066 id: "r".into(),
1067 status: LifecycleStatus::Enabled,
1068 filter: LifecycleFilter::default(),
1069 expiration_days: None,
1070 expiration_date: None,
1071 transitions: vec![],
1072 noncurrent_version_expiration_days: Some(7),
1073 abort_incomplete_multipart_upload_days: None,
1074 });
1075 let m = manager_with("b", vec![rule]);
1076 // current-version path → no rule matches (no expiration_days set).
1077 assert_eq!(m.evaluate("b", "k", Duration::days(30), 1, &[]), None);
1078 // noncurrent path with age past 7d → expire.
1079 let action = m.evaluate_with_flags(
1080 "b",
1081 "k",
1082 Duration::days(8),
1083 1,
1084 &[],
1085 EvaluateFlags {
1086 is_noncurrent: true,
1087 now: None,
1088 },
1089 );
1090 assert_eq!(action, Some(LifecycleAction::Expire));
1091 // noncurrent path with age before 7d → no fire.
1092 let action = m.evaluate_with_flags(
1093 "b",
1094 "k",
1095 Duration::days(3),
1096 1,
1097 &[],
1098 EvaluateFlags {
1099 is_noncurrent: true,
1100 now: None,
1101 },
1102 );
1103 assert_eq!(action, None);
1104 }
1105
1106 #[test]
1107 fn evaluate_batch_distributes_actions_across_object_ages() {
1108 // Transition at 30d, expiration at 60d. Conflict resolver picks
1109 // expire iff `exp_days <= trans_days` for the chosen transition.
1110 // With exp=60, trans=30: at age 40-59 the transition fires; at
1111 // age >= 60 expiration wins (because exp_days=60 <= trans_days=30
1112 // is false, so... wait — re-read: the resolver compares
1113 // exp_threshold (60) vs trans_threshold (30) and triggers expire
1114 // ONLY when 60 <= 30, which is false → transition keeps winning
1115 // until both thresholds met but exp <= trans). For exp=60 trans=30
1116 // pair, transition always wins regardless of age (rule pattern is
1117 // "transition first, expire later" — the next scanner pass
1118 // picks up the expiration). So expect 4 transitions.
1119 let rule = enabled(LifecycleRule {
1120 id: "r".into(),
1121 status: LifecycleStatus::Enabled,
1122 filter: LifecycleFilter::default(),
1123 expiration_days: Some(60),
1124 expiration_date: None,
1125 transitions: vec![TransitionRule {
1126 days: 30,
1127 storage_class: "STANDARD_IA".into(),
1128 }],
1129 noncurrent_version_expiration_days: None,
1130 abort_incomplete_multipart_upload_days: None,
1131 });
1132 let m = manager_with("b", vec![rule]);
1133 let objects = vec![
1134 ("young".to_string(), Duration::days(10), 1u64, vec![]),
1135 ("middle".to_string(), Duration::days(40), 1u64, vec![]),
1136 ("middle2".to_string(), Duration::days(45), 1u64, vec![]),
1137 ("old".to_string(), Duration::days(90), 1u64, vec![]),
1138 ("ancient".to_string(), Duration::days(365), 1u64, vec![]),
1139 ];
1140 let actions = evaluate_batch(&m, "b", &objects);
1141 assert_eq!(actions.len(), 4);
1142 for (_, a) in &actions {
1143 assert!(matches!(a, LifecycleAction::Transition { .. }));
1144 }
1145 }
1146
1147 #[test]
1148 fn json_round_trip_preserves_rules() {
1149 let rule = enabled(LifecycleRule {
1150 id: "complex".into(),
1151 status: LifecycleStatus::Enabled,
1152 filter: LifecycleFilter {
1153 prefix: Some("logs/".into()),
1154 tags: vec![("env".into(), "prod".into())],
1155 object_size_greater_than: Some(1024),
1156 object_size_less_than: None,
1157 },
1158 expiration_days: Some(365),
1159 expiration_date: None,
1160 transitions: vec![TransitionRule {
1161 days: 30,
1162 storage_class: "STANDARD_IA".into(),
1163 }],
1164 noncurrent_version_expiration_days: Some(7),
1165 abort_incomplete_multipart_upload_days: Some(3),
1166 });
1167 let m = manager_with("b1", vec![rule.clone()]);
1168 let json = m.to_json().expect("to_json");
1169 let m2 = LifecycleManager::from_json(&json).expect("from_json");
1170 let cfg = m2.get("b1").expect("bucket survives roundtrip");
1171 assert_eq!(cfg.rules.len(), 1);
1172 assert_eq!(cfg.rules[0], rule);
1173 }
1174
1175 #[test]
1176 fn lifecycle_config_default_is_empty() {
1177 let cfg = LifecycleConfig::default();
1178 assert!(cfg.rules.is_empty());
1179 }
1180
1181 #[test]
1182 fn evaluate_batch_skips_locked_objects_at_caller_layer() {
1183 // The evaluator itself does not consult ObjectLock; the scanner
1184 // (and tests) are expected to filter locked keys out before /
1185 // after calling `evaluate_batch`. This test documents the
1186 // canonical pattern.
1187 let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 1)]);
1188 let objects = vec![
1189 ("locked".to_string(), Duration::days(30), 1u64, vec![]),
1190 ("free".to_string(), Duration::days(30), 1u64, vec![]),
1191 ];
1192 let locked_keys: std::collections::HashSet<&str> = ["locked"].into_iter().collect();
1193 let raw = evaluate_batch(&m, "b", &objects);
1194 let filtered: Vec<_> = raw
1195 .into_iter()
1196 .filter(|(k, _)| !locked_keys.contains(k.as_str()))
1197 .collect();
1198 assert_eq!(filtered.len(), 1);
1199 assert_eq!(filtered[0].0, "free");
1200 }
1201
1202 #[test]
1203 fn record_action_bumps_per_bucket_counter() {
1204 let m = LifecycleManager::new();
1205 m.record_action("b", &LifecycleAction::Expire);
1206 m.record_action("b", &LifecycleAction::Expire);
1207 m.record_action(
1208 "b",
1209 &LifecycleAction::Transition {
1210 storage_class: "GLACIER".into(),
1211 },
1212 );
1213 let snap = m.actions_snapshot();
1214 assert_eq!(snap.get(&("b".into(), "expire".into())).copied(), Some(2));
1215 assert_eq!(
1216 snap.get(&("b".into(), "transition".into())).copied(),
1217 Some(1)
1218 );
1219 }
1220
1221 // ---- v0.7 #45: scanner runner tests --------------------------------
1222 //
1223 // These tests stand up an in-memory `S4Service` over a tiny
1224 // `ScannerMemBackend` (separate from the larger `MemoryBackend` in
1225 // `tests/roundtrip.rs` so this module stays self-contained). The
    // backend implements only the five `S3` methods these tests touch:
    // `put_object`, `head_object`, `delete_object`, `list_objects_v2`,
    // and `copy_object` (the Transition path).
1228 // Tags are exercised via the optional `with_tagging(...)` manager.
1229 //
1230 // Object age is faked by setting an `expire_after_days(0)` rule, so
1231 // any object whose backend-recorded `last_modified` is at or before
1232 // "now" matches — sidesteps the `head_object`/`Timestamp` parsing
1233 // entirely (and matches the canonical "operator just put the bucket
1234 // on aggressive expiration" test path).
1235
1236 use std::collections::HashMap;
1237 use std::sync::Mutex as StdMutex;
1238
1239 use bytes::Bytes;
1240 use s3s::dto as dto2;
1241 use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
1242 use s4_codec::dispatcher::AlwaysDispatcher;
1243 use s4_codec::passthrough::Passthrough;
1244 use s4_codec::{CodecKind, CodecRegistry};
1245
1246 use crate::S4Service;
1247 use crate::object_lock::{LockMode, ObjectLockManager, ObjectLockState};
1248
    /// Minimal in-memory S3 backend for the scanner tests: a
    /// mutex-guarded `(bucket, key) -> stored object` map and nothing
    /// else (no versioning, no storage classes, no tagging).
    #[derive(Default)]
    struct ScannerMemBackend {
        objects: StdMutex<HashMap<(String, String), ScannerStored>>,
    }
1253
    /// One stored object: its bytes plus the backend-recorded mtime that
    /// `head_object` / `list_objects_v2` report back to the scanner.
    #[derive(Clone)]
    struct ScannerStored {
        body: Bytes,
        last_modified: dto2::Timestamp,
    }
1259
1260 impl ScannerMemBackend {
1261 fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
1262 self.objects.lock().unwrap().insert(
1263 (bucket.to_owned(), key.to_owned()),
1264 ScannerStored {
1265 body,
1266 last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
1267 },
1268 );
1269 }
1270 }
1271
1272 #[async_trait::async_trait]
1273 impl S3 for ScannerMemBackend {
1274 async fn put_object(
1275 &self,
1276 req: S3Request<dto2::PutObjectInput>,
1277 ) -> S3Result<S3Response<dto2::PutObjectOutput>> {
1278 self.put_now(&req.input.bucket, &req.input.key, Bytes::new());
1279 Ok(S3Response::new(dto2::PutObjectOutput::default()))
1280 }
1281
1282 async fn head_object(
1283 &self,
1284 req: S3Request<dto2::HeadObjectInput>,
1285 ) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
1286 let key = (req.input.bucket.clone(), req.input.key.clone());
1287 let lock = self.objects.lock().unwrap();
1288 let stored = lock
1289 .get(&key)
1290 .ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
1291 Ok(S3Response::new(dto2::HeadObjectOutput {
1292 content_length: Some(stored.body.len() as i64),
1293 last_modified: Some(stored.last_modified.clone()),
1294 ..Default::default()
1295 }))
1296 }
1297
1298 async fn delete_object(
1299 &self,
1300 req: S3Request<dto2::DeleteObjectInput>,
1301 ) -> S3Result<S3Response<dto2::DeleteObjectOutput>> {
1302 let key = (req.input.bucket.clone(), req.input.key.clone());
1303 self.objects.lock().unwrap().remove(&key);
1304 Ok(S3Response::new(dto2::DeleteObjectOutput::default()))
1305 }
1306
1307 async fn list_objects_v2(
1308 &self,
1309 req: S3Request<dto2::ListObjectsV2Input>,
1310 ) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
1311 let prefix = req.input.bucket.clone();
1312 let lock = self.objects.lock().unwrap();
1313 let mut contents: Vec<dto2::Object> = lock
1314 .iter()
1315 .filter(|((b, _), _)| b == &prefix)
1316 .map(|((_, k), v)| dto2::Object {
1317 key: Some(k.clone()),
1318 size: Some(v.body.len() as i64),
1319 last_modified: Some(v.last_modified.clone()),
1320 ..Default::default()
1321 })
1322 .collect();
1323 contents.sort_by(|a, b| a.key.cmp(&b.key));
1324 let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
1325 Ok(S3Response::new(dto2::ListObjectsV2Output {
1326 name: Some(prefix),
1327 contents: Some(contents),
1328 key_count: Some(key_count),
1329 is_truncated: Some(false),
1330 ..Default::default()
1331 }))
1332 }
1333
1334 async fn copy_object(
1335 &self,
1336 _req: S3Request<dto2::CopyObjectInput>,
1337 ) -> S3Result<S3Response<dto2::CopyObjectOutput>> {
1338 // Transition path: scanner copies same-key with new
1339 // storage_class. The mem backend doesn't track storage
1340 // class, so it's a no-op success — exactly the AWS-side
1341 // behaviour for a backend that ignores the field.
1342 Ok(S3Response::new(dto2::CopyObjectOutput::default()))
1343 }
1344 }
1345
1346 fn make_service() -> Arc<S4Service<ScannerMemBackend>> {
1347 let registry = Arc::new(
1348 CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1349 );
1350 let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1351 Arc::new(S4Service::new(
1352 ScannerMemBackend::default(),
1353 registry,
1354 dispatcher,
1355 ))
1356 }
1357
1358 #[tokio::test]
1359 async fn run_scan_once_no_lifecycle_manager_returns_empty_report() {
1360 // Service has no lifecycle manager attached — scanner must
1361 // no-op cleanly (operator might not have set
1362 // `--lifecycle-state-file`). Also covers the empty-buckets
1363 // path in `run_scan_once`.
1364 let s4 = make_service();
1365 let report = run_scan_once(&s4).await.expect("scan");
1366 assert_eq!(report, ScanReport::default());
1367
1368 // And: lifecycle manager attached but no buckets configured.
1369 let mgr = Arc::new(LifecycleManager::new());
1370 let backend = ScannerMemBackend::default();
1371 let registry = Arc::new(
1372 CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
1373 );
1374 let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
1375 let s4_empty = Arc::new(
1376 S4Service::new(backend, registry, dispatcher).with_lifecycle(mgr),
1377 );
1378 let report = run_scan_once(&s4_empty).await.expect("scan empty");
1379 assert_eq!(report, ScanReport::default());
1380 }
1381
    #[tokio::test]
    async fn run_scan_once_expires_matching_objects_via_backend() {
        // End-to-end: the scanner lists bucket "b", evaluates each key,
        // and issues the Expire through the service's delete path.
        // Three objects: only "stale.log" matches the rule (prefix
        // gating). The other two are written but not under the prefix,
        // so the evaluator returns None for them.
        let backend = ScannerMemBackend::default();
        backend.put_now("b", "stale.log", Bytes::from_static(b"x"));
        backend.put_now("b", "data/keep1.bin", Bytes::from_static(b"y"));
        backend.put_now("b", "data/keep2.bin", Bytes::from_static(b"z"));
        // Rule: any object under `stale.` prefix is expired immediately
        // (`expire_after_days(0)` matches age >= 0d, which is every
        // backend object).
        let mgr = Arc::new(LifecycleManager::new());
        let mut rule = LifecycleRule::expire_after_days("r", 0);
        rule.filter.prefix = Some("stale.".into());
        mgr.put("b", LifecycleConfig { rules: vec![rule] });
        let registry = Arc::new(
            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
        );
        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        let s4 = Arc::new(
            S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
        );

        let report = run_scan_once(&s4).await.expect("scan");
        // All three objects evaluated; exactly one action fired.
        assert_eq!(report.buckets_scanned, 1);
        assert_eq!(report.objects_evaluated, 3);
        assert_eq!(report.expired, 1);
        assert_eq!(report.transitioned, 0);
        assert_eq!(report.skipped_locked, 0);
        assert_eq!(report.action_errors, 0);
        // Backend post-condition: the matching key is gone, the others
        // remain. Read back through the service's own list_objects_v2
        // path (which is also what the customer-visible HTTP layer
        // serves) so we exercise the same code the scanner walked.
        let req = synthetic_request(
            ListObjectsV2Input {
                bucket: "b".into(),
                ..Default::default()
            },
            http::Method::GET,
            "/b?list-type=2",
        );
        let resp = s4
            .as_ref()
            .list_objects_v2(req)
            .await
            .expect("post-scan list");
        let keys: Vec<String> = resp
            .output
            .contents
            .unwrap_or_default()
            .into_iter()
            .filter_map(|o| o.key)
            .collect();
        assert!(!keys.contains(&"stale.log".to_string()));
        assert!(keys.contains(&"data/keep1.bin".to_string()));
        assert!(keys.contains(&"data/keep2.bin".to_string()));
        // Lifecycle action counter: one Expire bumped on bucket "b".
        let snap = mgr.actions_snapshot();
        assert_eq!(
            snap.get(&("b".into(), "expire".into())).copied(),
            Some(1)
        );
    }
1447
    #[tokio::test]
    async fn run_scan_once_skips_object_lock_protected_keys() {
        // Two objects under an expire-immediately rule; one carries a
        // Compliance-mode retention, so only the other may be deleted.
        let backend = ScannerMemBackend::default();
        backend.put_now("b", "locked.log", Bytes::from_static(b"x"));
        backend.put_now("b", "free.log", Bytes::from_static(b"y"));
        let registry = Arc::new(
            CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
        );
        let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
        let mgr = Arc::new(LifecycleManager::new());
        // Aggressive: every object expires immediately.
        mgr.put(
            "b",
            LifecycleConfig {
                rules: vec![LifecycleRule::expire_after_days("r", 0)],
            },
        );
        let lock_mgr = Arc::new(ObjectLockManager::new());
        // Lock retains "locked.log" until the year 2099 — Compliance
        // mode means even Governance bypass cannot delete it.
        let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
            .expect("parse")
            .with_timezone(&Utc);
        lock_mgr.set(
            "b",
            "locked.log",
            ObjectLockState {
                mode: Some(LockMode::Compliance),
                retain_until: Some(retain_until),
                legal_hold_on: false,
            },
        );
        let s4 = Arc::new(
            S4Service::new(backend, registry, dispatcher)
                .with_lifecycle(Arc::clone(&mgr))
                .with_object_lock(Arc::clone(&lock_mgr)),
        );

        let report = run_scan_once(&s4).await.expect("scan");
        // Both objects evaluated; the locked one is counted as skipped,
        // not as an action error.
        assert_eq!(report.buckets_scanned, 1);
        assert_eq!(report.objects_evaluated, 2);
        assert_eq!(report.expired, 1, "free.log should have been expired");
        assert_eq!(report.skipped_locked, 1, "locked.log must be skipped");
        assert_eq!(report.action_errors, 0);
    }
1493
1494}