use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use chrono::{DateTime, Duration, Utc};
use s3s::S3;
use s3s::S3Request;
use s3s::dto::*;
use serde::{Deserialize, Serialize};
use tracing::warn;
/// Whether a lifecycle rule is active.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LifecycleStatus {
    Enabled,
    Disabled,
}

impl LifecycleStatus {
    /// AWS wire representation: `"Enabled"` or `"Disabled"`.
    #[must_use]
    pub fn as_aws_str(self) -> &'static str {
        if self == Self::Enabled { "Enabled" } else { "Disabled" }
    }

    /// Parse the AWS wire value. Anything other than a case-insensitive
    /// `"Enabled"` maps to `Disabled`.
    #[must_use]
    pub fn from_aws_str(s: &str) -> Self {
        match s {
            v if v.eq_ignore_ascii_case("Enabled") => Self::Enabled,
            _ => Self::Disabled,
        }
    }
}
/// Object-selection filter for a lifecycle rule. All configured dimensions
/// are AND-ed together; unset dimensions match everything.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleFilter {
    /// Key prefix the object must start with.
    #[serde(default)]
    pub prefix: Option<String>,
    /// Tags that must all be present on the object with equal values.
    #[serde(default)]
    pub tags: Vec<(String, String)>,
    /// Strict lower size bound: object size must be > this value.
    #[serde(default)]
    pub object_size_greater_than: Option<u64>,
    /// Strict upper size bound: object size must be < this value.
    #[serde(default)]
    pub object_size_less_than: Option<u64>,
}

impl LifecycleFilter {
    /// True when `key`, `size`, and `object_tags` satisfy every configured
    /// constraint. Size bounds are strict (AWS ObjectSizeGreaterThan /
    /// ObjectSizeLessThan semantics); every filter tag must appear on the
    /// object, while extra object tags are ignored.
    #[must_use]
    pub fn matches(&self, key: &str, size: u64, object_tags: &[(String, String)]) -> bool {
        let prefix_ok = self.prefix.as_deref().map_or(true, |p| key.starts_with(p));
        let min_ok = self.object_size_greater_than.map_or(true, |min| size > min);
        let max_ok = self.object_size_less_than.map_or(true, |max| size < max);
        let tags_ok = self
            .tags
            .iter()
            .all(|(tk, tv)| object_tags.iter().any(|(ok, ov)| ok == tk && ov == tv));
        prefix_ok && min_ok && max_ok && tags_ok
    }
}
/// A storage-class transition that becomes eligible once the object is at
/// least `days` old.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransitionRule {
// Minimum object age, in days, before this transition applies.
pub days: u32,
// Target storage class, stored as free text (e.g. "GLACIER" in tests).
pub storage_class: String,
}
/// One lifecycle rule: a filter selecting objects plus the actions
/// (expiration, transitions, noncurrent/multipart cleanup) to apply.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleRule {
// Caller-chosen identifier; not interpreted by the evaluator.
pub id: String,
// Disabled rules are skipped entirely during evaluation.
pub status: LifecycleStatus,
// Object-selection filter; the default filter matches every object.
#[serde(default)]
pub filter: LifecycleFilter,
// Expire current versions at least this many days old.
#[serde(default)]
pub expiration_days: Option<u32>,
// Expire current versions once this calendar instant has passed.
#[serde(default)]
pub expiration_date: Option<DateTime<Utc>>,
// Storage-class transitions; the deepest eligible tier is chosen.
#[serde(default)]
pub transitions: Vec<TransitionRule>,
// Expire noncurrent versions at least this many days old.
#[serde(default)]
pub noncurrent_version_expiration_days: Option<u32>,
// NOTE(review): not consulted by `evaluate_with_flags` in this file —
// presumably enforced elsewhere (multipart cleanup); confirm.
#[serde(default)]
pub abort_incomplete_multipart_upload_days: Option<u32>,
}
impl LifecycleRule {
    /// Convenience constructor: an enabled, unfiltered rule that expires
    /// current versions once they are at least `days` old.
    #[must_use]
    pub fn expire_after_days(id: impl Into<String>, days: u32) -> Self {
        Self {
            id: id.into(),
            status: LifecycleStatus::Enabled,
            filter: LifecycleFilter::default(),
            expiration_date: None,
            expiration_days: Some(days),
            noncurrent_version_expiration_days: None,
            abort_incomplete_multipart_upload_days: None,
            transitions: Vec::new(),
        }
    }
}
/// Full lifecycle configuration for one bucket: an ordered list of rules.
/// Rule order matters — evaluation returns the first rule that fires.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LifecycleConfig {
pub rules: Vec<LifecycleRule>,
}
/// An action the scanner should perform on an object.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LifecycleAction {
    Expire,
    Transition { storage_class: String },
}

impl LifecycleAction {
    /// Stable label used as the metrics/counter dimension for this action.
    #[must_use]
    pub fn metric_label(&self) -> &'static str {
        if matches!(self, Self::Expire) {
            "expire"
        } else {
            "transition"
        }
    }
}
/// Serialization envelope for `to_json`/`from_json`: per-bucket configs
/// keyed by bucket name. Action counters are intentionally not persisted.
#[derive(Debug, Default, Serialize, Deserialize)]
struct LifecycleSnapshot {
by_bucket: HashMap<String, LifecycleConfig>,
}
/// Thread-safe store of per-bucket lifecycle configurations plus in-process
/// action counters keyed by `(bucket, action label)`.
#[derive(Debug, Default)]
pub struct LifecycleManager {
// Bucket name -> its lifecycle configuration.
by_bucket: RwLock<HashMap<String, LifecycleConfig>>,
// (bucket, "expire"/"transition") -> number of actions recorded.
actions_total: RwLock<HashMap<(String, String), u64>>,
}
impl LifecycleManager {
/// Create an empty manager: no configurations, zeroed counters.
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Install or replace `bucket`'s lifecycle configuration.
pub fn put(&self, bucket: &str, config: LifecycleConfig) {
self.by_bucket
.write()
.expect("lifecycle state RwLock poisoned")
.insert(bucket.to_owned(), config);
}
/// Clone of `bucket`'s configuration, or `None` if it has none.
#[must_use]
pub fn get(&self, bucket: &str) -> Option<LifecycleConfig> {
self.by_bucket
.read()
.expect("lifecycle state RwLock poisoned")
.get(bucket)
.cloned()
}
/// Remove `bucket`'s configuration (no-op if absent).
pub fn delete(&self, bucket: &str) {
self.by_bucket
.write()
.expect("lifecycle state RwLock poisoned")
.remove(bucket);
}
/// Serialize all bucket configurations to JSON. Action counters are not
/// included — see `from_json`.
pub fn to_json(&self) -> Result<String, serde_json::Error> {
let by_bucket = self
.by_bucket
.read()
.expect("lifecycle state RwLock poisoned")
.clone();
let snap = LifecycleSnapshot { by_bucket };
serde_json::to_string(&snap)
}
/// Rebuild a manager from `to_json` output; counters restart at zero.
pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
let snap: LifecycleSnapshot = serde_json::from_str(s)?;
Ok(Self {
by_bucket: RwLock::new(snap.by_bucket),
actions_total: RwLock::new(HashMap::new()),
})
}
/// Evaluate with default flags (current version, real clock). See
/// `evaluate_with_flags` for rule-selection semantics.
#[must_use]
pub fn evaluate(
&self,
bucket: &str,
key: &str,
object_age: Duration,
object_size: u64,
object_tags: &[(String, String)],
) -> Option<LifecycleAction> {
self.evaluate_with_flags(
bucket,
key,
object_age,
object_size,
object_tags,
EvaluateFlags::default(),
)
}
/// Evaluate `key` in `bucket` against the bucket's rules and return the
/// first applicable action, if any.
///
/// Rules are checked in declaration order; the first enabled rule whose
/// filter matches and which yields an action decides the outcome.
/// For noncurrent versions (`flags.is_noncurrent`) only
/// `noncurrent_version_expiration_days` is consulted. For current
/// versions: days-based expiration wins over a transition when its
/// threshold is less than or equal to the chosen transition's threshold;
/// otherwise the deepest eligible transition fires; date-based expiration
/// is checked last.
#[must_use]
pub fn evaluate_with_flags(
&self,
bucket: &str,
key: &str,
object_age: Duration,
object_size: u64,
object_tags: &[(String, String)],
flags: EvaluateFlags,
) -> Option<LifecycleAction> {
let cfg = self.get(bucket)?;
// `flags.now` lets tests pin the clock used for date-based expiration.
let now_for_date = flags.now.unwrap_or_else(Utc::now);
// Clamp negative ages to zero days, then saturate into u32.
let age_days = object_age.num_days().max(0);
let age_days_u32 = u32::try_from(age_days).unwrap_or(u32::MAX);
for rule in &cfg.rules {
if rule.status != LifecycleStatus::Enabled {
continue;
}
if !rule.filter.matches(key, object_size, object_tags) {
continue;
}
if flags.is_noncurrent {
// Noncurrent versions are governed solely by
// noncurrent_version_expiration_days; other clauses never apply.
if let Some(days) = rule.noncurrent_version_expiration_days
&& age_days_u32 >= days
{
return Some(LifecycleAction::Expire);
}
continue;
}
// Days-based / date-based expiration thresholds already reached?
let exp_days_match = rule.expiration_days.filter(|d| age_days_u32 >= *d);
let exp_date_match = rule.expiration_date.filter(|d| now_for_date >= *d);
// Of the transitions already due, pick the deepest tier (largest days).
let chosen_transition = rule
.transitions
.iter()
.filter(|t| age_days_u32 >= t.days)
.max_by_key(|t| t.days);
// Expiration wins ties and earlier thresholds against the transition.
if let Some(exp_threshold) = exp_days_match {
let trans_threshold = chosen_transition.map(|t| t.days).unwrap_or(u32::MAX);
if exp_threshold <= trans_threshold {
return Some(LifecycleAction::Expire);
}
}
if let Some(t) = chosen_transition {
return Some(LifecycleAction::Transition {
storage_class: t.storage_class.clone(),
});
}
// NOTE(review): date-based expiration is only consulted when no
// transition fired, so a rule with both keeps re-transitioning and
// never date-expires — confirm this precedence is intended.
if exp_date_match.is_some() {
return Some(LifecycleAction::Expire);
}
}
None
}
/// Bump the in-process `(bucket, label)` counter and forward the action
/// to the global metrics sink.
pub fn record_action(&self, bucket: &str, action: &LifecycleAction) {
let label = action.metric_label();
let key = (bucket.to_owned(), label.to_owned());
let mut guard = self
.actions_total
.write()
.expect("lifecycle actions counter RwLock poisoned");
let entry = guard.entry(key).or_insert(0);
*entry = entry.saturating_add(1);
crate::metrics::record_lifecycle_action(bucket, label);
}
/// Snapshot of all `(bucket, label) -> count` counters.
#[must_use]
pub fn actions_snapshot(&self) -> HashMap<(String, String), u64> {
self.actions_total
.read()
.expect("lifecycle actions counter RwLock poisoned")
.clone()
}
/// Sorted list of bucket names that currently have a configuration.
#[must_use]
pub fn buckets(&self) -> Vec<String> {
let map = self
.by_bucket
.read()
.expect("lifecycle state RwLock poisoned");
let mut out: Vec<String> = map.keys().cloned().collect();
out.sort();
out
}
}
/// Optional knobs for `evaluate_with_flags`.
#[derive(Clone, Copy, Debug, Default)]
pub struct EvaluateFlags {
// Evaluate the object as a noncurrent version (only
// noncurrent_version_expiration_days applies).
pub is_noncurrent: bool,
// Clock override for date-based expiration; `None` means `Utc::now()`.
pub now: Option<DateTime<Utc>>,
}
/// One entry in an `evaluate_batch` call: key, age, size, and object tags.
pub type EvaluateBatchEntry = (String, Duration, u64, Vec<(String, String)>);

/// Evaluate every object in `objects` against `bucket`'s lifecycle rules,
/// returning `(key, action)` pairs for the objects that matched a rule.
/// Objects with no applicable action are omitted.
#[must_use]
pub fn evaluate_batch(
    manager: &LifecycleManager,
    bucket: &str,
    objects: &[EvaluateBatchEntry],
) -> Vec<(String, LifecycleAction)> {
    objects
        .iter()
        .filter_map(|(key, age, size, tags)| {
            manager
                .evaluate(bucket, key, *age, *size, tags)
                .map(|action| (key.clone(), action))
        })
        .collect()
}
/// Aggregate counters for one `run_scan_once` pass.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ScanReport {
// Buckets that had a lifecycle configuration at scan start.
pub buckets_scanned: usize,
// Objects run through rule evaluation (internal index keys excluded).
pub objects_evaluated: usize,
// Expire actions that completed successfully.
pub expired: usize,
// Transition actions that completed successfully.
pub transitioned: usize,
// Actions suppressed because the object is under an active object lock.
pub skipped_locked: usize,
// Failed list calls or failed expire/transition executions.
pub action_errors: usize,
}
/// Convert an s3s `Timestamp` into a chrono UTC datetime by round-tripping
/// through its RFC 3339 text form. Returns `None` on any format, UTF-8, or
/// parse failure.
fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<DateTime<Utc>> {
    let mut rendered = Vec::new();
    ts.format(s3s::dto::TimestampFormat::DateTime, &mut rendered).ok()?;
    let text = String::from_utf8(rendered).ok()?;
    let parsed = chrono::DateTime::parse_from_rfc3339(&text).ok()?;
    Some(parsed.with_timezone(&Utc))
}
/// Build an internally-generated `S3Request` carrying `input`, with empty
/// headers/extensions and no credentials, for driving the backend directly
/// without an HTTP layer.
fn synthetic_request<T>(input: T, method: http::Method, uri_path: &str) -> S3Request<T> {
    // Fall back to "/" if the rendered path is not a valid URI.
    let uri = uri_path.parse().unwrap_or_else(|_| "/".parse().expect("/"));
    S3Request {
        input,
        method,
        uri,
        headers: http::HeaderMap::new(),
        extensions: http::Extensions::new(),
        credentials: None,
        region: None,
        service: None,
        trailing_headers: None,
    }
}
/// Run one full lifecycle scan across every bucket that has a configuration.
///
/// Returns an empty report when the service has no lifecycle manager or no
/// bucket has rules. Per-bucket failures are accumulated into the report's
/// `action_errors` rather than aborting the scan; the `Result` is reserved
/// for future fatal errors and currently always `Ok`.
pub async fn run_scan_once<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
) -> Result<ScanReport, String> {
    let Some(manager) = s4.lifecycle_manager().cloned() else {
        return Ok(ScanReport::default());
    };
    let bucket_names = manager.buckets();
    if bucket_names.is_empty() {
        return Ok(ScanReport::default());
    }
    // Single timestamp for the whole pass so lock/date checks are consistent.
    let scan_time = Utc::now();
    let mut report = ScanReport::default();
    report.buckets_scanned = bucket_names.len();
    for name in bucket_names {
        scan_bucket(s4, &manager, &name, scan_time, &mut report).await;
    }
    Ok(report)
}
/// Scan one bucket: page through `list_objects_v2`, evaluate each object
/// against the bucket's rules, and execute resulting actions, accumulating
/// counts into `report`. A list failure logs, bumps `action_errors`, and
/// abandons the bucket for this scan.
async fn scan_bucket<B: S3 + Send + Sync + 'static>(
s4: &Arc<crate::S4Service<B>>,
mgr: &Arc<LifecycleManager>,
bucket: &str,
now: DateTime<Utc>,
report: &mut ScanReport,
) {
let mut continuation: Option<String> = None;
loop {
let list_input = ListObjectsV2Input {
bucket: bucket.to_owned(),
continuation_token: continuation.clone(),
..Default::default()
};
let list_req = synthetic_request(
list_input,
http::Method::GET,
&format!("/{bucket}?list-type=2"),
);
let resp = match s4.as_ref().list_objects_v2(list_req).await {
Ok(r) => r,
Err(e) => {
warn!(
bucket = %bucket,
error = %e,
"S4 lifecycle: list_objects_v2 failed; skipping bucket for this scan",
);
report.action_errors = report.action_errors.saturating_add(1);
return;
}
};
let output = resp.output;
let contents = output.contents.unwrap_or_default();
for obj in &contents {
let Some(key) = obj.key.as_deref() else {
continue;
};
// Keys ending in ".s4index" are skipped — presumably S4-internal
// index artifacts, not user data; confirm against the writer.
if key.ends_with(".s4index") {
continue;
}
report.objects_evaluated = report.objects_evaluated.saturating_add(1);
// Missing/negative sizes clamp to 0 before the unsigned cast.
let size = obj.size.unwrap_or(0).max(0) as u64;
// Age from last_modified; missing or unparseable timestamps count
// as age zero.
let age = match obj.last_modified.as_ref().and_then(timestamp_to_chrono_utc) {
Some(lm) => now.signed_duration_since(lm),
None => Duration::zero(),
};
// Object tags feed the rule filters; no tag manager means no tags.
let tags: Vec<(String, String)> = s4
.as_ref()
.tag_manager()
.and_then(|m| m.get_object_tags(bucket, key))
.map(|set| set.iter().cloned().collect())
.unwrap_or_default();
let Some(action) = mgr.evaluate(bucket, key, age, size, &tags) else {
continue;
};
// Objects under an active object lock are never expired/transitioned.
if let Some(lock_mgr) = s4.as_ref().object_lock_manager()
&& let Some(state) = lock_mgr.get(bucket, key)
&& state.is_locked(now)
{
report.skipped_locked = report.skipped_locked.saturating_add(1);
continue;
}
// Execute the action; counters are only recorded on success.
match action {
LifecycleAction::Expire => match execute_expire(s4, bucket, key).await {
Ok(()) => {
mgr.record_action(bucket, &LifecycleAction::Expire);
report.expired = report.expired.saturating_add(1);
}
Err(e) => {
warn!(
bucket = %bucket,
key = %key,
error = %e,
"S4 lifecycle: Expire action failed",
);
report.action_errors = report.action_errors.saturating_add(1);
}
},
LifecycleAction::Transition { storage_class } => {
match execute_transition(s4, bucket, key, &storage_class).await {
Ok(()) => {
mgr.record_action(
bucket,
&LifecycleAction::Transition {
storage_class: storage_class.clone(),
},
);
report.transitioned = report.transitioned.saturating_add(1);
}
Err(e) => {
warn!(
bucket = %bucket,
key = %key,
storage_class = %storage_class,
error = %e,
"S4 lifecycle: Transition action failed",
);
report.action_errors = report.action_errors.saturating_add(1);
}
}
}
}
}
// Pagination: continue only while the server reports truncation AND
// supplies a continuation token; bail otherwise to avoid looping forever.
if output.is_truncated.unwrap_or(false) {
continuation = output.next_continuation_token;
if continuation.is_none() {
break;
}
} else {
break;
}
}
}
/// Execute an Expire action by issuing a synthetic `DeleteObject` through
/// the full S4 service stack. Errors are stringified for the caller's log.
async fn execute_expire<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    bucket: &str,
    key: &str,
) -> Result<(), String> {
    let req = synthetic_request(
        DeleteObjectInput {
            bucket: bucket.to_owned(),
            key: key.to_owned(),
            ..Default::default()
        },
        http::Method::DELETE,
        &format!("/{bucket}/{key}"),
    );
    match s4.as_ref().delete_object(req).await {
        Ok(_) => Ok(()),
        Err(e) => Err(e.to_string()),
    }
}
/// Execute a Transition action as a same-key `CopyObject` targeting the new
/// storage class, with `MetadataDirective: COPY` so the object's metadata
/// is preserved. Errors are stringified for the caller's log.
async fn execute_transition<B: S3 + Send + Sync + 'static>(
    s4: &Arc<crate::S4Service<B>>,
    bucket: &str,
    key: &str,
    storage_class: &str,
) -> Result<(), String> {
    let mut builder = CopyObjectInput::builder();
    builder.set_bucket(bucket.to_owned());
    builder.set_key(key.to_owned());
    // Copy source is the object itself (current version).
    builder.set_copy_source(CopySource::Bucket {
        bucket: bucket.to_owned().into_boxed_str(),
        key: key.to_owned().into_boxed_str(),
        version_id: None,
    });
    builder.set_metadata_directive(Some(MetadataDirective::from_static(MetadataDirective::COPY)));
    builder.set_storage_class(Some(StorageClass::from(storage_class.to_owned())));
    let input = builder
        .build()
        .map_err(|err| format!("CopyObjectInput build: {err}"))?;
    let req = synthetic_request(input, http::Method::PUT, &format!("/{bucket}/{key}"));
    match s4.as_ref().copy_object(req).await {
        Ok(_) => Ok(()),
        Err(err) => Err(err.to_string()),
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Force `rule` into the Enabled state, leaving other fields unchanged.
fn enabled(rule: LifecycleRule) -> LifecycleRule {
    LifecycleRule {
        status: LifecycleStatus::Enabled,
        ..rule
    }
}

/// Wrap a rule list in a `LifecycleConfig`.
fn cfg_with(rules: Vec<LifecycleRule>) -> LifecycleConfig {
    LifecycleConfig { rules }
}

/// Build a manager holding `rules` under `bucket`.
fn manager_with(bucket: &str, rules: Vec<LifecycleRule>) -> LifecycleManager {
    let manager = LifecycleManager::new();
    manager.put(bucket, cfg_with(rules));
    manager
}
// Object older than the 30-day threshold expires.
#[test]
fn evaluate_age_past_expiration_returns_expire() {
let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
let action = m.evaluate("b", "k", Duration::days(31), 100, &[]);
assert_eq!(action, Some(LifecycleAction::Expire));
}
// Younger object yields no action.
#[test]
fn evaluate_age_before_expiration_returns_none() {
let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 30)]);
let action = m.evaluate("b", "k", Duration::days(5), 100, &[]);
assert_eq!(action, None);
}
// A prefix filter limits the rule to matching keys only.
#[test]
fn evaluate_prefix_filter_matches() {
let mut rule = LifecycleRule::expire_after_days("r", 1);
rule.filter.prefix = Some("logs/".into());
let m = manager_with("b", vec![rule]);
assert_eq!(
m.evaluate("b", "logs/2026/a.log", Duration::days(2), 1, &[]),
Some(LifecycleAction::Expire)
);
assert_eq!(
m.evaluate("b", "data/keep.bin", Duration::days(2), 1, &[]),
None
);
}
// Every filter tag must be present with an equal value; extra object tags
// are allowed, but a missing or mismatched filter tag blocks the rule.
#[test]
fn evaluate_tag_filter_requires_all_tags_to_match() {
let mut rule = LifecycleRule::expire_after_days("r", 1);
rule.filter.tags = vec![
("env".into(), "dev".into()),
("expirable".into(), "yes".into()),
];
let m = manager_with("b", vec![rule]);
assert_eq!(
m.evaluate(
"b",
"k",
Duration::days(2),
1,
&[
("env".into(), "dev".into()),
("expirable".into(), "yes".into()),
("owner".into(), "alice".into()),
]
),
Some(LifecycleAction::Expire)
);
assert_eq!(
m.evaluate(
"b",
"k",
Duration::days(2),
1,
&[("env".into(), "dev".into())]
),
None
);
assert_eq!(
m.evaluate(
"b",
"k",
Duration::days(2),
1,
&[
("env".into(), "prod".into()),
("expirable".into(), "yes".into()),
]
),
None
);
}
// Size bounds are strict: a size equal to either bound does not match.
#[test]
fn evaluate_size_filters_gate_action() {
let mut rule = LifecycleRule::expire_after_days("r", 1);
rule.filter.object_size_greater_than = Some(1024);
rule.filter.object_size_less_than = Some(10 * 1024);
let m = manager_with("b", vec![rule]);
assert_eq!(
m.evaluate("b", "k", Duration::days(2), 4096, &[]),
Some(LifecycleAction::Expire)
);
assert_eq!(m.evaluate("b", "k", Duration::days(2), 1024, &[]), None);
assert_eq!(
m.evaluate("b", "k", Duration::days(2), 100 * 1024, &[]),
None
);
}
// Transition tier (30d) is chosen while the 365d expiration is still ahead.
#[test]
fn evaluate_transition_fires_before_expiration() {
let rule = enabled(LifecycleRule {
id: "r".into(),
status: LifecycleStatus::Enabled,
filter: LifecycleFilter::default(),
expiration_days: Some(365),
expiration_date: None,
transitions: vec![TransitionRule {
days: 30,
storage_class: "GLACIER_IR".into(),
}],
noncurrent_version_expiration_days: None,
abort_incomplete_multipart_upload_days: None,
});
let m = manager_with("b", vec![rule]);
let action = m.evaluate("b", "k", Duration::days(60), 1, &[]);
assert_eq!(
action,
Some(LifecycleAction::Transition {
storage_class: "GLACIER_IR".into(),
})
);
}
// When the expiration threshold (30d) precedes the transition tier (90d),
// Expire wins even though both are eligible at 100 days.
#[test]
fn evaluate_expiration_wins_when_threshold_is_earlier_than_transition() {
let rule = enabled(LifecycleRule {
id: "r".into(),
status: LifecycleStatus::Enabled,
filter: LifecycleFilter::default(),
expiration_days: Some(30),
expiration_date: None,
transitions: vec![TransitionRule {
days: 90,
storage_class: "GLACIER".into(),
}],
noncurrent_version_expiration_days: None,
abort_incomplete_multipart_upload_days: None,
});
let m = manager_with("b", vec![rule]);
let action = m.evaluate("b", "k", Duration::days(100), 1, &[]);
assert_eq!(action, Some(LifecycleAction::Expire));
}
// Disabled rules never fire regardless of age.
#[test]
fn evaluate_disabled_rule_never_fires() {
let mut rule = LifecycleRule::expire_after_days("r", 1);
rule.status = LifecycleStatus::Disabled;
let m = manager_with("b", vec![rule]);
assert_eq!(m.evaluate("b", "k", Duration::days(365), 1, &[]), None);
}
// A bucket with no configuration produces no action.
#[test]
fn evaluate_unknown_bucket_returns_none() {
let m = LifecycleManager::new();
assert_eq!(m.evaluate("ghost", "k", Duration::days(365), 1, &[]), None);
}
// noncurrent_version_expiration_days only applies with is_noncurrent set;
// the same rule does nothing for current versions.
#[test]
fn evaluate_noncurrent_version_expiration() {
let rule = enabled(LifecycleRule {
id: "r".into(),
status: LifecycleStatus::Enabled,
filter: LifecycleFilter::default(),
expiration_days: None,
expiration_date: None,
transitions: vec![],
noncurrent_version_expiration_days: Some(7),
abort_incomplete_multipart_upload_days: None,
});
let m = manager_with("b", vec![rule]);
assert_eq!(m.evaluate("b", "k", Duration::days(30), 1, &[]), None);
let action = m.evaluate_with_flags(
"b",
"k",
Duration::days(8),
1,
&[],
EvaluateFlags {
is_noncurrent: true,
now: None,
},
);
assert_eq!(action, Some(LifecycleAction::Expire));
let action = m.evaluate_with_flags(
"b",
"k",
Duration::days(3),
1,
&[],
EvaluateFlags {
is_noncurrent: true,
now: None,
},
);
assert_eq!(action, None);
}
// With the transition tier (30d) earlier than expiration (60d), every
// object past 30 days transitions; the 10-day object is untouched.
#[test]
fn evaluate_batch_distributes_actions_across_object_ages() {
let rule = enabled(LifecycleRule {
id: "r".into(),
status: LifecycleStatus::Enabled,
filter: LifecycleFilter::default(),
expiration_days: Some(60),
expiration_date: None,
transitions: vec![TransitionRule {
days: 30,
storage_class: "STANDARD_IA".into(),
}],
noncurrent_version_expiration_days: None,
abort_incomplete_multipart_upload_days: None,
});
let m = manager_with("b", vec![rule]);
let objects = vec![
("young".to_string(), Duration::days(10), 1u64, vec![]),
("middle".to_string(), Duration::days(40), 1u64, vec![]),
("middle2".to_string(), Duration::days(45), 1u64, vec![]),
("old".to_string(), Duration::days(90), 1u64, vec![]),
("ancient".to_string(), Duration::days(365), 1u64, vec![]),
];
let actions = evaluate_batch(&m, "b", &objects);
assert_eq!(actions.len(), 4);
for (_, a) in &actions {
assert!(matches!(a, LifecycleAction::Transition { .. }));
}
}
// A fully-populated rule survives a to_json/from_json round trip intact.
#[test]
fn json_round_trip_preserves_rules() {
let rule = enabled(LifecycleRule {
id: "complex".into(),
status: LifecycleStatus::Enabled,
filter: LifecycleFilter {
prefix: Some("logs/".into()),
tags: vec![("env".into(), "prod".into())],
object_size_greater_than: Some(1024),
object_size_less_than: None,
},
expiration_days: Some(365),
expiration_date: None,
transitions: vec![TransitionRule {
days: 30,
storage_class: "STANDARD_IA".into(),
}],
noncurrent_version_expiration_days: Some(7),
abort_incomplete_multipart_upload_days: Some(3),
});
let m = manager_with("b1", vec![rule.clone()]);
let json = m.to_json().expect("to_json");
let m2 = LifecycleManager::from_json(&json).expect("from_json");
let cfg = m2.get("b1").expect("bucket survives roundtrip");
assert_eq!(cfg.rules.len(), 1);
assert_eq!(cfg.rules[0], rule);
}
// Default config carries no rules.
#[test]
fn lifecycle_config_default_is_empty() {
let cfg = LifecycleConfig::default();
assert!(cfg.rules.is_empty());
}
// evaluate_batch is lock-agnostic: it still reports locked keys, and the
// caller is responsible for filtering them out (as scan_bucket does).
#[test]
fn evaluate_batch_skips_locked_objects_at_caller_layer() {
let m = manager_with("b", vec![LifecycleRule::expire_after_days("r", 1)]);
let objects = vec![
("locked".to_string(), Duration::days(30), 1u64, vec![]),
("free".to_string(), Duration::days(30), 1u64, vec![]),
];
let locked_keys: std::collections::HashSet<&str> = ["locked"].into_iter().collect();
let raw = evaluate_batch(&m, "b", &objects);
let filtered: Vec<_> = raw
.into_iter()
.filter(|(k, _)| !locked_keys.contains(k.as_str()))
.collect();
assert_eq!(filtered.len(), 1);
assert_eq!(filtered[0].0, "free");
}
// Counters accumulate independently per (bucket, action-label) pair.
#[test]
fn record_action_bumps_per_bucket_counter() {
let m = LifecycleManager::new();
m.record_action("b", &LifecycleAction::Expire);
m.record_action("b", &LifecycleAction::Expire);
m.record_action(
"b",
&LifecycleAction::Transition {
storage_class: "GLACIER".into(),
},
);
let snap = m.actions_snapshot();
assert_eq!(snap.get(&("b".into(), "expire".into())).copied(), Some(2));
assert_eq!(
snap.get(&("b".into(), "transition".into())).copied(),
Some(1)
);
}
use std::collections::HashMap;
use std::sync::Mutex as StdMutex;
use bytes::Bytes;
use s3s::dto as dto2;
use s3s::{S3Error, S3ErrorCode, S3Response, S3Result};
use s4_codec::dispatcher::AlwaysDispatcher;
use s4_codec::passthrough::Passthrough;
use s4_codec::{CodecKind, CodecRegistry};
use crate::S4Service;
use crate::object_lock::{LockMode, ObjectLockManager, ObjectLockState};
// Minimal in-memory S3 backend for scanner tests: one map of
// (bucket, key) -> stored body + last-modified timestamp.
#[derive(Default)]
struct ScannerMemBackend {
objects: StdMutex<HashMap<(String, String), ScannerStored>>,
}
// Stored object: body bytes plus the mtime the lister/header will report.
#[derive(Clone)]
struct ScannerStored {
body: Bytes,
last_modified: dto2::Timestamp,
}
impl ScannerMemBackend {
// Insert/overwrite an object stamped with the current system time.
fn put_now(&self, bucket: &str, key: &str, body: Bytes) {
self.objects.lock().unwrap().insert(
(bucket.to_owned(), key.to_owned()),
ScannerStored {
body,
last_modified: dto2::Timestamp::from(std::time::SystemTime::now()),
},
);
}
}
// Just enough of the S3 trait for the lifecycle scanner tests.
#[async_trait::async_trait]
impl S3 for ScannerMemBackend {
// Stores an empty body (the request payload is ignored) with a fresh mtime.
async fn put_object(
&self,
req: S3Request<dto2::PutObjectInput>,
) -> S3Result<S3Response<dto2::PutObjectOutput>> {
self.put_now(&req.input.bucket, &req.input.key, Bytes::new());
Ok(S3Response::new(dto2::PutObjectOutput::default()))
}
// Returns length + last-modified, or NoSuchKey when absent.
async fn head_object(
&self,
req: S3Request<dto2::HeadObjectInput>,
) -> S3Result<S3Response<dto2::HeadObjectOutput>> {
let key = (req.input.bucket.clone(), req.input.key.clone());
let lock = self.objects.lock().unwrap();
let stored = lock
.get(&key)
.ok_or_else(|| S3Error::new(S3ErrorCode::NoSuchKey))?;
Ok(S3Response::new(dto2::HeadObjectOutput {
content_length: Some(stored.body.len() as i64),
last_modified: Some(stored.last_modified.clone()),
..Default::default()
}))
}
// Removes the object if present; always succeeds (S3 delete semantics).
async fn delete_object(
&self,
req: S3Request<dto2::DeleteObjectInput>,
) -> S3Result<S3Response<dto2::DeleteObjectOutput>> {
let key = (req.input.bucket.clone(), req.input.key.clone());
self.objects.lock().unwrap().remove(&key);
Ok(S3Response::new(dto2::DeleteObjectOutput::default()))
}
// Returns every key in the bucket, sorted, in a single never-truncated
// page; continuation tokens and prefixes are ignored.
async fn list_objects_v2(
&self,
req: S3Request<dto2::ListObjectsV2Input>,
) -> S3Result<S3Response<dto2::ListObjectsV2Output>> {
let prefix = req.input.bucket.clone();
let lock = self.objects.lock().unwrap();
let mut contents: Vec<dto2::Object> = lock
.iter()
.filter(|((b, _), _)| b == &prefix)
.map(|((_, k), v)| dto2::Object {
key: Some(k.clone()),
size: Some(v.body.len() as i64),
last_modified: Some(v.last_modified.clone()),
..Default::default()
})
.collect();
contents.sort_by(|a, b| a.key.cmp(&b.key));
let key_count = i32::try_from(contents.len()).unwrap_or(i32::MAX);
Ok(S3Response::new(dto2::ListObjectsV2Output {
name: Some(prefix),
contents: Some(contents),
key_count: Some(key_count),
is_truncated: Some(false),
..Default::default()
}))
}
// No-op success; transitions only need the call to not fail.
async fn copy_object(
&self,
_req: S3Request<dto2::CopyObjectInput>,
) -> S3Result<S3Response<dto2::CopyObjectOutput>> {
Ok(S3Response::new(dto2::CopyObjectOutput::default()))
}
}
/// Build an S4 service over the in-memory scanner backend with a
/// passthrough codec (object bytes are never transformed).
fn make_service() -> Arc<S4Service<ScannerMemBackend>> {
    let codec_registry =
        Arc::new(CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)));
    let codec_dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
    let backend = ScannerMemBackend::default();
    Arc::new(S4Service::new(backend, codec_registry, codec_dispatcher))
}
// A service without a lifecycle manager — or with a manager holding no
// configurations — yields an all-zero report.
#[tokio::test]
async fn run_scan_once_no_lifecycle_manager_returns_empty_report() {
let s4 = make_service();
let report = run_scan_once(&s4).await.expect("scan");
assert_eq!(report, ScanReport::default());
let mgr = Arc::new(LifecycleManager::new());
let backend = ScannerMemBackend::default();
let registry = Arc::new(
CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
);
let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
let s4_empty = Arc::new(
S4Service::new(backend, registry, dispatcher).with_lifecycle(mgr),
);
let report = run_scan_once(&s4_empty).await.expect("scan empty");
assert_eq!(report, ScanReport::default());
}
// End-to-end: a prefix rule with a 0-day threshold expires only the
// matching key; the other objects survive and counters reflect one expire.
#[tokio::test]
async fn run_scan_once_expires_matching_objects_via_backend() {
let backend = ScannerMemBackend::default();
backend.put_now("b", "stale.log", Bytes::from_static(b"x"));
backend.put_now("b", "data/keep1.bin", Bytes::from_static(b"y"));
backend.put_now("b", "data/keep2.bin", Bytes::from_static(b"z"));
let mgr = Arc::new(LifecycleManager::new());
let mut rule = LifecycleRule::expire_after_days("r", 0);
rule.filter.prefix = Some("stale.".into());
mgr.put("b", LifecycleConfig { rules: vec![rule] });
let registry = Arc::new(
CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
);
let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
let s4 = Arc::new(
S4Service::new(backend, registry, dispatcher).with_lifecycle(Arc::clone(&mgr)),
);
let report = run_scan_once(&s4).await.expect("scan");
assert_eq!(report.buckets_scanned, 1);
assert_eq!(report.objects_evaluated, 3);
assert_eq!(report.expired, 1);
assert_eq!(report.transitioned, 0);
assert_eq!(report.skipped_locked, 0);
assert_eq!(report.action_errors, 0);
// Re-list to confirm the backend actually deleted the stale key.
let req = synthetic_request(
ListObjectsV2Input {
bucket: "b".into(),
..Default::default()
},
http::Method::GET,
"/b?list-type=2",
);
let resp = s4
.as_ref()
.list_objects_v2(req)
.await
.expect("post-scan list");
let keys: Vec<String> = resp
.output
.contents
.unwrap_or_default()
.into_iter()
.filter_map(|o| o.key)
.collect();
assert!(!keys.contains(&"stale.log".to_string()));
assert!(keys.contains(&"data/keep1.bin".to_string()));
assert!(keys.contains(&"data/keep2.bin".to_string()));
let snap = mgr.actions_snapshot();
assert_eq!(
snap.get(&("b".into(), "expire".into())).copied(),
Some(1)
);
}
// A Compliance-locked key with a far-future retain-until is skipped while
// the unlocked key expires; the skip is counted, not treated as an error.
#[tokio::test]
async fn run_scan_once_skips_object_lock_protected_keys() {
let backend = ScannerMemBackend::default();
backend.put_now("b", "locked.log", Bytes::from_static(b"x"));
backend.put_now("b", "free.log", Bytes::from_static(b"y"));
let registry = Arc::new(
CodecRegistry::new(CodecKind::Passthrough).with(Arc::new(Passthrough)),
);
let dispatcher = Arc::new(AlwaysDispatcher(CodecKind::Passthrough));
let mgr = Arc::new(LifecycleManager::new());
mgr.put(
"b",
LifecycleConfig {
rules: vec![LifecycleRule::expire_after_days("r", 0)],
},
);
let lock_mgr = Arc::new(ObjectLockManager::new());
let retain_until = chrono::DateTime::parse_from_rfc3339("2099-01-01T00:00:00Z")
.expect("parse")
.with_timezone(&Utc);
lock_mgr.set(
"b",
"locked.log",
ObjectLockState {
mode: Some(LockMode::Compliance),
retain_until: Some(retain_until),
legal_hold_on: false,
},
);
let s4 = Arc::new(
S4Service::new(backend, registry, dispatcher)
.with_lifecycle(Arc::clone(&mgr))
.with_object_lock(Arc::clone(&lock_mgr)),
);
let report = run_scan_once(&s4).await.expect("scan");
assert_eq!(report.buckets_scanned, 1);
assert_eq!(report.objects_evaluated, 2);
assert_eq!(report.expired, 1, "free.log should have been expired");
assert_eq!(report.skipped_locked, 1, "locked.log must be skipped");
assert_eq!(report.action_errors, 0);
}
}