bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

#[derive(Clone, Debug, Default)]
struct LifecycleExecutionPlan {
    current_keys: Vec<String>,
    noncurrent_versions: Vec<(String, String)>,
    upload_ids: Vec<String>,
    skipped_locked_versions: usize,
}

impl BucketWarden {
    pub fn get_bucket_lifecycle(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<BucketLifecycleConfiguration, RuntimeError> {
        self.authorize(principal, S3Action::GetBucketLifecycle, bucket)?;
        let rules = self.require_bucket(bucket)?.lifecycle_rules.clone();
        if rules.is_empty() {
            return Err(RuntimeError::NoSuchLifecycleConfiguration(
                bucket.to_string(),
            ));
        }
        self.audit_allowed(
            principal,
            S3Action::GetBucketLifecycle,
            bucket,
            Some(rules.len().to_string()),
        );
        Ok(BucketLifecycleConfiguration {
            bucket: bucket.to_string(),
            rules,
        })
    }
    pub fn put_bucket_lifecycle(
        &mut self,
        principal: &str,
        bucket: &str,
        rules: Vec<LifecycleRule>,
    ) -> Result<BucketLifecycleConfiguration, RuntimeError> {
        self.authorize(principal, S3Action::PutBucketLifecycle, bucket)?;
        validate_lifecycle_rules(&rules)?;
        self.require_bucket_mut(bucket)?.lifecycle_rules = rules.clone();
        self.audit_allowed(
            principal,
            S3Action::PutBucketLifecycle,
            bucket,
            Some(rules.len().to_string()),
        );
        Ok(BucketLifecycleConfiguration {
            bucket: bucket.to_string(),
            rules,
        })
    }
    pub fn delete_bucket_lifecycle(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<(), RuntimeError> {
        self.authorize(principal, S3Action::DeleteBucketLifecycle, bucket)?;
        self.require_bucket_mut(bucket)?.lifecycle_rules.clear();
        self.audit_allowed(principal, S3Action::DeleteBucketLifecycle, bucket, None);
        Ok(())
    }
    pub fn run_bucket_lifecycle(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<LifecycleRunResult, RuntimeError> {
        self.authorize(principal, S3Action::RunBucketLifecycle, bucket)?;
        let rules = self.require_bucket(bucket)?.lifecycle_rules.clone();
        let mut result = LifecycleRunResult {
            bucket: bucket.to_string(),
            ..LifecycleRunResult::default()
        };
        if rules.is_empty() {
            self.audit_allowed(
                principal,
                S3Action::RunBucketLifecycle,
                bucket,
                Some("0".to_string()),
            );
            return Ok(result);
        }
        let plan = self.plan_bucket_lifecycle(bucket, &rules)?;
        result.skipped_locked_versions = plan.skipped_locked_versions;
        for key in &plan.current_keys {
            match self.delete_object(principal, bucket, &key, false) {
                Ok(delete_result) => {
                    result.expired_current_versions += 1;
                    self.emit_notification_event(
                        "s3:LifecycleExpiration:DeleteMarkerCreated",
                        bucket,
                        &key,
                        &delete_result.delete_marker_version_id,
                    );
                }
                Err(RuntimeError::ObjectLocked(_)) => result.skipped_locked_versions += 1,
                Err(error) => return Err(error),
            }
        }
        for (key, version_id) in &plan.noncurrent_versions {
            match self.delete_object_version(principal, bucket, &key, &version_id, false) {
                Ok(delete_result) => {
                    result.expired_noncurrent_versions += 1;
                    self.emit_notification_event(
                        "s3:LifecycleExpiration:Delete",
                        bucket,
                        &key,
                        &delete_result.version_id,
                    );
                }
                Err(RuntimeError::ObjectLocked(_)) => result.skipped_locked_versions += 1,
                Err(error) => return Err(error),
            }
        }
        for upload_id in plan.upload_ids {
            if let Some(upload) = self.multipart_uploads.remove(&upload_id) {
                self.audit_allowed(
                    principal,
                    S3Action::AbortMultipartUpload,
                    &object_resource(&upload.bucket, &upload.key),
                    Some(format!("lifecycle:{upload_id}")),
                );
                self.emit_notification_event(
                    "s3:LifecycleExpiration:AbortMultipartUpload",
                    &upload.bucket,
                    &upload.key,
                    &upload_id,
                );
                result.aborted_multipart_uploads += 1;
            }
        }
        self.audit_allowed(
            principal,
            S3Action::RunBucketLifecycle,
            bucket,
            Some(format!(
                "current={},noncurrent={},uploads={},skipped={}",
                result.expired_current_versions,
                result.expired_noncurrent_versions,
                result.aborted_multipart_uploads,
                result.skipped_locked_versions
            )),
        );
        Ok(result)
    }

    pub fn preview_bucket_lifecycle(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<LifecycleRunResult, RuntimeError> {
        self.authorize(principal, S3Action::RunBucketLifecycle, bucket)?;
        let rules = self.require_bucket(bucket)?.lifecycle_rules.clone();
        let mut result = LifecycleRunResult {
            bucket: bucket.to_string(),
            dry_run: true,
            ..LifecycleRunResult::default()
        };
        if rules.is_empty() {
            self.audit_allowed(
                principal,
                S3Action::RunBucketLifecycle,
                bucket,
                Some("preview:current=0,noncurrent=0,uploads=0,skipped=0".to_string()),
            );
            return Ok(result);
        }
        let plan = self.plan_bucket_lifecycle(bucket, &rules)?;
        result.expired_current_versions = plan.current_keys.len();
        result.expired_noncurrent_versions = plan.noncurrent_versions.len();
        result.aborted_multipart_uploads = plan.upload_ids.len();
        result.skipped_locked_versions = plan.skipped_locked_versions;
        result.preview_current_keys = plan.current_keys;
        result.preview_noncurrent_versions = plan
            .noncurrent_versions
            .into_iter()
            .map(|(key, version_id)| LifecycleVersionTarget { key, version_id })
            .collect();
        result.preview_aborted_upload_ids = plan.upload_ids;
        self.audit_allowed(
            principal,
            S3Action::RunBucketLifecycle,
            bucket,
            Some(format!(
                "preview:current={},noncurrent={},uploads={},skipped={}",
                result.expired_current_versions,
                result.expired_noncurrent_versions,
                result.aborted_multipart_uploads,
                result.skipped_locked_versions
            )),
        );
        Ok(result)
    }

    fn plan_bucket_lifecycle(
        &self,
        bucket: &str,
        rules: &[LifecycleRule],
    ) -> Result<LifecycleExecutionPlan, RuntimeError> {
        let mut plan = LifecycleExecutionPlan::default();
        {
            let bucket_state = self.require_bucket(bucket)?;
            for rule in rules.iter().filter(|rule| rule.status == "Enabled") {
                for (key, object) in &bucket_state.objects {
                    if !lifecycle_rule_matches(rule, key) {
                        continue;
                    }
                    let latest_index = object.versions.len().saturating_sub(1);
                    for (index, version) in object.versions.iter().enumerate() {
                        if version.delete_marker {
                            continue;
                        }
                        if !tag_filter_matches(&rule.tag_filter, &version.tags) {
                            continue;
                        }
                        if !lifecycle_size_matches(rule, version.content_length()) {
                            continue;
                        }
                        if index == latest_index {
                            if lifecycle_age_matches(
                                self.clock_epoch_seconds,
                                version.last_modified_epoch_seconds,
                                rule.expiration_days,
                            ) {
                                if version
                                    .lock
                                    .assert_deletable(self.clock_epoch_seconds, false)
                                    .is_ok()
                                {
                                    plan.current_keys.push(key.clone());
                                } else {
                                    plan.skipped_locked_versions += 1;
                                }
                            }
                        } else if lifecycle_age_matches(
                            self.clock_epoch_seconds,
                            version.last_modified_epoch_seconds,
                            rule.noncurrent_expiration_days,
                        ) {
                            if version
                                .lock
                                .assert_deletable(self.clock_epoch_seconds, false)
                                .is_ok()
                            {
                                plan.noncurrent_versions
                                    .push((key.clone(), version.version_id.clone()));
                            } else {
                                plan.skipped_locked_versions += 1;
                            }
                        }
                    }
                }
            }
        }
        plan.current_keys.sort();
        plan.current_keys.dedup();
        plan.noncurrent_versions.sort();
        plan.noncurrent_versions.dedup();
        for rule in rules.iter().filter(|rule| rule.status == "Enabled") {
            for (upload_id, upload) in &self.multipart_uploads {
                if upload.bucket == bucket
                    && lifecycle_rule_matches(rule, &upload.key)
                    && rule.tag_filter.is_empty()
                    && lifecycle_age_matches(
                        self.clock_epoch_seconds,
                        upload.initiated_epoch_seconds,
                        rule.abort_incomplete_multipart_upload_days,
                    )
                {
                    plan.upload_ids.push(upload_id.clone());
                }
            }
        }
        plan.upload_ids.sort();
        plan.upload_ids.dedup();
        Ok(plan)
    }

    pub fn get_bucket_notification(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<BucketNotificationConfiguration, RuntimeError> {
        self.authorize(principal, S3Action::GetBucketNotification, bucket)?;
        let rules = self.require_bucket(bucket)?.notification_rules.clone();
        self.audit_allowed(
            principal,
            S3Action::GetBucketNotification,
            bucket,
            Some(rules.len().to_string()),
        );
        Ok(BucketNotificationConfiguration {
            bucket: bucket.to_string(),
            rules,
        })
    }
    pub fn put_bucket_notification(
        &mut self,
        principal: &str,
        bucket: &str,
        rules: Vec<NotificationRule>,
    ) -> Result<BucketNotificationConfiguration, RuntimeError> {
        self.authorize(principal, S3Action::PutBucketNotification, bucket)?;
        validate_notification_rules(&rules)?;
        self.require_bucket_mut(bucket)?.notification_rules = rules.clone();
        self.audit_allowed(
            principal,
            S3Action::PutBucketNotification,
            bucket,
            Some(rules.len().to_string()),
        );
        Ok(BucketNotificationConfiguration {
            bucket: bucket.to_string(),
            rules,
        })
    }
    pub fn notification_events(&self) -> &[NotificationEvent] {
        &self.notification_events
    }
    pub fn notification_summary(&self) -> NotificationEventSummary {
        let mut summary = NotificationEventSummary {
            first_event_id: self
                .notification_events
                .first()
                .map(|event| event.event_id.clone()),
            last_event_id: self
                .notification_events
                .last()
                .map(|event| event.event_id.clone()),
            ..NotificationEventSummary::default()
        };
        for event in &self.notification_events {
            summary.total += 1;
            *summary
                .by_event_name
                .entry(event.event_name.clone())
                .or_default() += 1;
            *summary
                .by_target_kind
                .entry(event.target_kind.clone())
                .or_default() += 1;
        }
        summary
    }
}

fn lifecycle_size_matches(rule: &LifecycleRule, size: usize) -> bool {
    let size = size as u64;
    rule.object_size_greater_than
        .is_none_or(|minimum| size > minimum)
        && rule
            .object_size_less_than
            .is_none_or(|maximum| size < maximum)
}