use super::*;
/// Work list produced by `plan_bucket_lifecycle` and consumed by the run and
/// preview entry points.
#[derive(Debug, Default, Clone)]
struct LifecycleExecutionPlan {
    /// Keys whose latest version was selected by an expiration rule.
    current_keys: Vec<String>,
    /// `(key, version_id)` pairs of non-latest versions selected for removal.
    noncurrent_versions: Vec<(String, String)>,
    /// Identifiers of multipart uploads selected for abort.
    upload_ids: Vec<String>,
    /// Count of versions that matched a rule but failed the lock check.
    skipped_locked_versions: usize,
}
impl BucketWarden {
/// Returns a copy of the lifecycle rules stored on `bucket`.
///
/// # Errors
///
/// Propagates any failure from `authorize` or `require_bucket`, and returns
/// `RuntimeError::NoSuchLifecycleConfiguration` when the bucket has no rules.
/// The allowed-audit entry (carrying the rule count) is only written on
/// success.
pub fn get_bucket_lifecycle(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<BucketLifecycleConfiguration, RuntimeError> {
    self.authorize(principal, S3Action::GetBucketLifecycle, bucket)?;

    let configured = self.require_bucket(bucket)?.lifecycle_rules.clone();
    let rule_count = configured.len();
    if rule_count == 0 {
        return Err(RuntimeError::NoSuchLifecycleConfiguration(
            bucket.to_owned(),
        ));
    }

    let detail = Some(rule_count.to_string());
    self.audit_allowed(principal, S3Action::GetBucketLifecycle, bucket, detail);

    Ok(BucketLifecycleConfiguration {
        bucket: bucket.to_owned(),
        rules: configured,
    })
}
/// Replaces the lifecycle rules stored on `bucket` with `rules` and echoes
/// the new configuration back.
///
/// # Errors
///
/// Propagates any failure from `authorize`, `validate_lifecycle_rules`, or
/// `require_bucket_mut`; the stored rules are left untouched in those cases.
pub fn put_bucket_lifecycle(
    &mut self,
    principal: &str,
    bucket: &str,
    rules: Vec<LifecycleRule>,
) -> Result<BucketLifecycleConfiguration, RuntimeError> {
    self.authorize(principal, S3Action::PutBucketLifecycle, bucket)?;
    validate_lifecycle_rules(&rules)?;

    // Store a copy; the caller's vector is returned in the response below.
    let target = self.require_bucket_mut(bucket)?;
    target.lifecycle_rules = rules.clone();

    let detail = Some(rules.len().to_string());
    self.audit_allowed(principal, S3Action::PutBucketLifecycle, bucket, detail);

    Ok(BucketLifecycleConfiguration {
        bucket: bucket.to_owned(),
        rules,
    })
}
/// Removes every lifecycle rule stored on `bucket`.
///
/// # Errors
///
/// Propagates any failure from `authorize` or `require_bucket_mut`.
pub fn delete_bucket_lifecycle(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<(), RuntimeError> {
    self.authorize(principal, S3Action::DeleteBucketLifecycle, bucket)?;

    let target = self.require_bucket_mut(bucket)?;
    target.lifecycle_rules.clear();

    self.audit_allowed(principal, S3Action::DeleteBucketLifecycle, bucket, None);
    Ok(())
}
pub fn run_bucket_lifecycle(
&mut self,
principal: &str,
bucket: &str,
) -> Result<LifecycleRunResult, RuntimeError> {
self.authorize(principal, S3Action::RunBucketLifecycle, bucket)?;
let rules = self.require_bucket(bucket)?.lifecycle_rules.clone();
let mut result = LifecycleRunResult {
bucket: bucket.to_string(),
..LifecycleRunResult::default()
};
if rules.is_empty() {
self.audit_allowed(
principal,
S3Action::RunBucketLifecycle,
bucket,
Some("0".to_string()),
);
return Ok(result);
}
let plan = self.plan_bucket_lifecycle(bucket, &rules)?;
result.skipped_locked_versions = plan.skipped_locked_versions;
for key in &plan.current_keys {
match self.delete_object(principal, bucket, &key, false) {
Ok(delete_result) => {
result.expired_current_versions += 1;
self.emit_notification_event(
"s3:LifecycleExpiration:DeleteMarkerCreated",
bucket,
&key,
&delete_result.delete_marker_version_id,
);
}
Err(RuntimeError::ObjectLocked(_)) => result.skipped_locked_versions += 1,
Err(error) => return Err(error),
}
}
for (key, version_id) in &plan.noncurrent_versions {
match self.delete_object_version(principal, bucket, &key, &version_id, false) {
Ok(delete_result) => {
result.expired_noncurrent_versions += 1;
self.emit_notification_event(
"s3:LifecycleExpiration:Delete",
bucket,
&key,
&delete_result.version_id,
);
}
Err(RuntimeError::ObjectLocked(_)) => result.skipped_locked_versions += 1,
Err(error) => return Err(error),
}
}
for upload_id in plan.upload_ids {
if let Some(upload) = self.multipart_uploads.remove(&upload_id) {
self.audit_allowed(
principal,
S3Action::AbortMultipartUpload,
&object_resource(&upload.bucket, &upload.key),
Some(format!("lifecycle:{upload_id}")),
);
self.emit_notification_event(
"s3:LifecycleExpiration:AbortMultipartUpload",
&upload.bucket,
&upload.key,
&upload_id,
);
result.aborted_multipart_uploads += 1;
}
}
self.audit_allowed(
principal,
S3Action::RunBucketLifecycle,
bucket,
Some(format!(
"current={},noncurrent={},uploads={},skipped={}",
result.expired_current_versions,
result.expired_noncurrent_versions,
result.aborted_multipart_uploads,
result.skipped_locked_versions
)),
);
Ok(result)
}
pub fn preview_bucket_lifecycle(
&mut self,
principal: &str,
bucket: &str,
) -> Result<LifecycleRunResult, RuntimeError> {
self.authorize(principal, S3Action::RunBucketLifecycle, bucket)?;
let rules = self.require_bucket(bucket)?.lifecycle_rules.clone();
let mut result = LifecycleRunResult {
bucket: bucket.to_string(),
dry_run: true,
..LifecycleRunResult::default()
};
if rules.is_empty() {
self.audit_allowed(
principal,
S3Action::RunBucketLifecycle,
bucket,
Some("preview:current=0,noncurrent=0,uploads=0,skipped=0".to_string()),
);
return Ok(result);
}
let plan = self.plan_bucket_lifecycle(bucket, &rules)?;
result.expired_current_versions = plan.current_keys.len();
result.expired_noncurrent_versions = plan.noncurrent_versions.len();
result.aborted_multipart_uploads = plan.upload_ids.len();
result.skipped_locked_versions = plan.skipped_locked_versions;
result.preview_current_keys = plan.current_keys;
result.preview_noncurrent_versions = plan
.noncurrent_versions
.into_iter()
.map(|(key, version_id)| LifecycleVersionTarget { key, version_id })
.collect();
result.preview_aborted_upload_ids = plan.upload_ids;
self.audit_allowed(
principal,
S3Action::RunBucketLifecycle,
bucket,
Some(format!(
"preview:current={},noncurrent={},uploads={},skipped={}",
result.expired_current_versions,
result.expired_noncurrent_versions,
result.aborted_multipart_uploads,
result.skipped_locked_versions
)),
);
Ok(result)
}
/// Builds the list of lifecycle actions that the enabled `rules` select in
/// `bucket`, without mutating any state.
///
/// For every enabled rule and every object whose key matches the rule, each
/// non-delete-marker version that passes the tag and size filters is checked
/// against `expiration_days` (for the last entry in `versions`) or
/// `noncurrent_expiration_days` (for every other entry). Versions that match
/// but fail the lock check are counted as skipped instead of being planned.
/// Multipart uploads are selected by rules with an empty tag filter whose
/// `abort_incomplete_multipart_upload_days` age check passes.
///
/// All outputs are deduplicated, because several enabled rules may select the
/// same target. That includes `skipped_locked_versions`: a locked version is
/// counted once no matter how many rules match it, so the count stays
/// consistent with the deduplicated target lists.
///
/// # Errors
///
/// Propagates any failure from `require_bucket`.
fn plan_bucket_lifecycle(
    &self,
    bucket: &str,
    rules: &[LifecycleRule],
) -> Result<LifecycleExecutionPlan, RuntimeError> {
    let mut plan = LifecycleExecutionPlan::default();
    let bucket_state = self.require_bucket(bucket)?;
    let now = self.clock_epoch_seconds;
    let enabled_rules: Vec<&LifecycleRule> = rules
        .iter()
        .filter(|rule| rule.status == "Enabled")
        .collect();

    // Distinct (key, version id) pairs that matched a rule but are locked.
    // A set keeps overlapping rules from inflating the skipped count.
    let mut locked_versions = std::collections::BTreeSet::new();

    for &rule in &enabled_rules {
        for (key, object) in &bucket_state.objects {
            if !lifecycle_rule_matches(rule, key) {
                continue;
            }
            // The last entry is treated as the current version.
            let latest_index = object.versions.len().saturating_sub(1);
            for (index, version) in object.versions.iter().enumerate() {
                if version.delete_marker
                    || !tag_filter_matches(&rule.tag_filter, &version.tags)
                    || !lifecycle_size_matches(rule, version.content_length())
                {
                    continue;
                }

                let is_current = index == latest_index;
                let age_matches = if is_current {
                    lifecycle_age_matches(
                        now,
                        version.last_modified_epoch_seconds,
                        rule.expiration_days,
                    )
                } else {
                    lifecycle_age_matches(
                        now,
                        version.last_modified_epoch_seconds,
                        rule.noncurrent_expiration_days,
                    )
                };
                if !age_matches {
                    continue;
                }

                if version.lock.assert_deletable(now, false).is_err() {
                    locked_versions.insert((key.as_str(), version.version_id.as_str()));
                    continue;
                }

                if is_current {
                    plan.current_keys.push(key.clone());
                } else {
                    plan.noncurrent_versions
                        .push((key.clone(), version.version_id.clone()));
                }
            }
        }
    }
    plan.skipped_locked_versions = locked_versions.len();

    plan.current_keys.sort_unstable();
    plan.current_keys.dedup();
    plan.noncurrent_versions.sort_unstable();
    plan.noncurrent_versions.dedup();

    for &rule in &enabled_rules {
        // Rules carrying a tag filter never select multipart uploads.
        if !rule.tag_filter.is_empty() {
            continue;
        }
        for (upload_id, upload) in &self.multipart_uploads {
            if upload.bucket == bucket
                && lifecycle_rule_matches(rule, &upload.key)
                && lifecycle_age_matches(
                    now,
                    upload.initiated_epoch_seconds,
                    rule.abort_incomplete_multipart_upload_days,
                )
            {
                plan.upload_ids.push(upload_id.clone());
            }
        }
    }
    plan.upload_ids.sort_unstable();
    plan.upload_ids.dedup();

    Ok(plan)
}
/// Returns a copy of the notification rules stored on `bucket`.
///
/// Unlike `get_bucket_lifecycle`, an empty rule list is returned as a normal
/// (empty) configuration rather than an error.
///
/// # Errors
///
/// Propagates any failure from `authorize` or `require_bucket`.
pub fn get_bucket_notification(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<BucketNotificationConfiguration, RuntimeError> {
    self.authorize(principal, S3Action::GetBucketNotification, bucket)?;

    let configured = self.require_bucket(bucket)?.notification_rules.clone();
    let detail = Some(configured.len().to_string());
    self.audit_allowed(principal, S3Action::GetBucketNotification, bucket, detail);

    Ok(BucketNotificationConfiguration {
        bucket: bucket.to_owned(),
        rules: configured,
    })
}
/// Replaces the notification rules stored on `bucket` with `rules` and echoes
/// the new configuration back.
///
/// # Errors
///
/// Propagates any failure from `authorize`, `validate_notification_rules`, or
/// `require_bucket_mut`; the stored rules are left untouched in those cases.
pub fn put_bucket_notification(
    &mut self,
    principal: &str,
    bucket: &str,
    rules: Vec<NotificationRule>,
) -> Result<BucketNotificationConfiguration, RuntimeError> {
    self.authorize(principal, S3Action::PutBucketNotification, bucket)?;
    validate_notification_rules(&rules)?;

    // Store a copy; the caller's vector is returned in the response below.
    let target = self.require_bucket_mut(bucket)?;
    target.notification_rules = rules.clone();

    let detail = Some(rules.len().to_string());
    self.audit_allowed(principal, S3Action::PutBucketNotification, bucket, detail);

    Ok(BucketNotificationConfiguration {
        bucket: bucket.to_owned(),
        rules,
    })
}
/// Borrows the recorded notification events as a slice, in stored order.
pub fn notification_events(&self) -> &[NotificationEvent] {
    &self.notification_events[..]
}
/// Aggregates the recorded notification events into a summary: total count,
/// per-event-name and per-target-kind counts, and the ids of the first and
/// last stored events (both `None` when nothing has been recorded).
pub fn notification_summary(&self) -> NotificationEventSummary {
    let events = &self.notification_events;
    let oldest_id = events.first().map(|event| event.event_id.clone());
    let newest_id = events.last().map(|event| event.event_id.clone());

    let mut tally = NotificationEventSummary {
        first_event_id: oldest_id,
        last_event_id: newest_id,
        ..NotificationEventSummary::default()
    };

    events.iter().for_each(|event| {
        tally.total += 1;
        *tally
            .by_event_name
            .entry(event.event_name.clone())
            .or_default() += 1;
        *tally
            .by_target_kind
            .entry(event.target_kind.clone())
            .or_default() += 1;
    });

    tally
}
}
/// Reports whether `size` satisfies the rule's optional size bounds.
///
/// Both bounds are exclusive: the size must be strictly greater than
/// `object_size_greater_than` and strictly less than `object_size_less_than`.
/// A bound that is `None` imposes no constraint.
fn lifecycle_size_matches(rule: &LifecycleRule, size: usize) -> bool {
    let size = size as u64;

    if matches!(rule.object_size_greater_than, Some(minimum) if size <= minimum) {
        return false;
    }
    if matches!(rule.object_size_less_than, Some(maximum) if size >= maximum) {
        return false;
    }
    true
}