bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;
impl BucketWarden {
    /// Returns the replication configuration currently stored on `bucket`.
    ///
    /// The call is authorized as `S3Action::GetBucketReplication` and, on success, recorded
    /// through `audit_allowed` with the number of rules as the audit detail.
    ///
    /// # Errors
    ///
    /// Propagates failures from `authorize` and `require_bucket`, and returns
    /// `RuntimeError::NoSuchBucketReplication` when the bucket has no replication rules.
    pub fn get_bucket_replication(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<BucketReplicationConfiguration, RuntimeError> {
        self.authorize(principal, S3Action::GetBucketReplication, bucket)?;

        // Work on a snapshot so the bucket borrow ends before the audit call below.
        let replication = self.require_bucket(bucket)?.replication.clone();
        let rule_count = replication.rules.len();
        if rule_count == 0 {
            return Err(RuntimeError::NoSuchBucketReplication(bucket.to_string()));
        }

        self.audit_allowed(
            principal,
            S3Action::GetBucketReplication,
            bucket,
            Some(rule_count.to_string()),
        );

        let configuration = BucketReplicationConfiguration {
            bucket: bucket.to_string(),
            role: replication.role,
            rules: replication.rules,
        };
        Ok(configuration)
    }
    /// Replaces the replication configuration of `bucket` with `role` and `rules`.
    ///
    /// The rules are checked with `validate_replication_rules` before the bucket is looked
    /// up. The stored state is stamped with the current `clock_epoch_seconds`. On success
    /// the call is audited with the rule count and the new configuration is returned.
    ///
    /// # Errors
    ///
    /// Propagates failures from `authorize`, `validate_replication_rules` and
    /// `require_bucket_mut`.
    pub fn put_bucket_replication(
        &mut self,
        principal: &str,
        bucket: &str,
        role: Option<String>,
        rules: Vec<ReplicationRule>,
    ) -> Result<BucketReplicationConfiguration, RuntimeError> {
        self.authorize(principal, S3Action::PutBucketReplication, bucket)?;
        validate_replication_rules(&rules)?;

        // Read the clock before taking the mutable bucket borrow.
        let configured_epoch_seconds = self.clock_epoch_seconds;
        let bucket_state = self.require_bucket_mut(bucket)?;
        bucket_state.replication = BucketReplicationState {
            role: role.clone(),
            rules: rules.clone(),
            configured_epoch_seconds,
        };

        let rule_count = rules.len().to_string();
        self.audit_allowed(
            principal,
            S3Action::PutBucketReplication,
            bucket,
            Some(rule_count),
        );

        let configuration = BucketReplicationConfiguration {
            bucket: bucket.to_string(),
            role,
            rules,
        };
        Ok(configuration)
    }
    /// Resets the replication state of `bucket` to `BucketReplicationState::default()`.
    ///
    /// The call is authorized as `S3Action::DeleteBucketReplication` and audited without
    /// any detail string once the state has been reset.
    ///
    /// # Errors
    ///
    /// Propagates failures from `authorize` and `require_bucket_mut`.
    pub fn delete_bucket_replication(
        &mut self,
        principal: &str,
        bucket: &str,
    ) -> Result<(), RuntimeError> {
        self.authorize(principal, S3Action::DeleteBucketReplication, bucket)?;

        let replication = &mut self.require_bucket_mut(bucket)?.replication;
        *replication = BucketReplicationState::default();

        self.audit_allowed(principal, S3Action::DeleteBucketReplication, bucket, None);
        Ok(())
    }
    /// Runs every enabled replication rule of `source_bucket` once and reports the totals.
    ///
    /// Rules whose `status` is not `"Enabled"` are ignored. A rule is counted in
    /// `skipped_missing_destinations` when `replication_destination_bucket` yields nothing
    /// for it or when the named bucket is not present in `self.buckets`. For every other
    /// rule the matching source versions are collected and handed to
    /// `replicate_versions_to_destination`. The accumulated counters are written to the
    /// audit log and returned.
    ///
    /// # Errors
    ///
    /// Propagates failures from `authorize`, `require_bucket`,
    /// `replication_source_versions` and `replicate_versions_to_destination`, and returns
    /// `RuntimeError::NoSuchBucketReplication` when the bucket has no replication rules.
    pub fn run_bucket_replication(
        &mut self,
        principal: &str,
        source_bucket: &str,
    ) -> Result<ReplicationRunResult, RuntimeError> {
        self.authorize(principal, S3Action::RunBucketReplication, source_bucket)?;

        // Snapshot the configuration so the rules can be read while `self` is mutated.
        let replication = self.require_bucket(source_bucket)?.replication.clone();
        if replication.rules.is_empty() {
            return Err(RuntimeError::NoSuchBucketReplication(
                source_bucket.to_string(),
            ));
        }

        let mut summary = ReplicationRunResult {
            source_bucket: source_bucket.to_string(),
            ..ReplicationRunResult::default()
        };

        for rule in replication.rules.iter() {
            if rule.status != "Enabled" {
                continue;
            }

            // A rule without a destination and a rule whose destination bucket does not
            // exist are both reported as a missing destination.
            let destination = match replication_destination_bucket(rule) {
                Some(name) if self.buckets.contains_key(name) => name,
                _ => {
                    summary.skipped_missing_destinations += 1;
                    continue;
                }
            };

            let candidates = self.replication_source_versions(
                source_bucket,
                rule,
                replication.configured_epoch_seconds,
            )?;
            self.replicate_versions_to_destination(
                source_bucket,
                destination,
                rule,
                &candidates,
                &mut summary,
            )?;
        }

        let audit_detail = format!(
            "objects={},markers={},deleted={},existing={},encrypted_skipped={},missing_destinations={}",
            summary.replicated_object_versions,
            summary.replicated_delete_markers,
            summary.replicated_deleted_versions,
            summary.skipped_existing_versions,
            summary.skipped_encrypted_objects,
            summary.skipped_missing_destinations
        );
        self.audit_allowed(
            principal,
            S3Action::RunBucketReplication,
            source_bucket,
            Some(audit_detail),
        );
        Ok(summary)
    }
    /// Collects the versions in `source_bucket` that `rule` selects for replication.
    ///
    /// A version is selected when all of the following hold:
    /// - its key satisfies `replication_rule_matches`;
    /// - its tags satisfy `tag_filter_matches` against `rule.tag_filter`;
    /// - it is not a delete marker, or `rule.delete_marker_replication` is set;
    /// - `rule.existing_object_replication` is set, or the version was last modified at or
    ///   after `configured_epoch_seconds`.
    ///
    /// Selected versions are returned as cloned `ReplicationSourceVersion` values, in the
    /// iteration order of the bucket's objects and of each object's version list.
    ///
    /// # Errors
    ///
    /// Propagates the failure from `require_bucket`.
    pub(crate) fn replication_source_versions(
        &self,
        source_bucket: &str,
        rule: &ReplicationRule,
        configured_epoch_seconds: u64,
    ) -> Result<Vec<ReplicationSourceVersion>, RuntimeError> {
        let bucket_state = self.require_bucket(source_bucket)?;
        let selected = bucket_state
            .objects
            .iter()
            .filter(|&(key, _)| replication_rule_matches(rule, key))
            .flat_map(|(key, object)| {
                object
                    .versions
                    .iter()
                    .filter(move |version| {
                        tag_filter_matches(&rule.tag_filter, &version.tags)
                            && (!version.delete_marker || rule.delete_marker_replication)
                            && (rule.existing_object_replication
                                || version.last_modified_epoch_seconds
                                    >= configured_epoch_seconds)
                    })
                    .map(move |version| ReplicationSourceVersion {
                        key: key.clone(),
                        version: version.clone(),
                    })
            })
            .collect();
        Ok(selected)
    }
    /// Copies `source_versions` into `destination_bucket`, prunes stale replicas there and
    /// adds the outcome to the counters in `result`.
    ///
    /// The work happens in this order:
    /// 1. Build, per key, the list of source version ids that should exist at the
    ///    destination. Versions carrying encryption metadata are left out unless
    ///    `rule.replicate_encrypted_objects` is set.
    /// 2. Walk `source_versions`: encrypted versions the rule does not replicate and ids
    ///    already present under the same destination key are counted and skipped; every
    ///    other version is cloned, given the destination bucket's owner and a
    ///    `"COMPLETED"` replication status, and appended to the destination object.
    /// 3. For every destination key matching `rule`, remove versions whose status is
    ///    `"COMPLETED"` and whose id is not in the desired list, then drop keys that are
    ///    left without versions.
    /// 4. Mark each source version that was copied or already present as `"COMPLETED"`.
    /// 5. Emit the collected notification events; all of them are reported against
    ///    `source_bucket`.
    ///
    /// # Errors
    ///
    /// Propagates failures from `require_bucket_mut` (destination lookup) and
    /// `version_by_id_mut` (source update). The source update runs after the destination
    /// has been modified, and nothing in this function undoes those modifications.
    pub(crate) fn replicate_versions_to_destination(
        &mut self,
        source_bucket: &str,
        destination_bucket: &str,
        rule: &ReplicationRule,
        source_versions: &[ReplicationSourceVersion],
        result: &mut ReplicationRunResult,
    ) -> Result<(), RuntimeError> {
        // Work that needs `self` again is queued here and performed after the mutable
        // borrow of the destination bucket has ended.
        // (key, version id) pairs to mark as completed on the source.
        let mut completed_source_versions = Vec::<(String, String)>::new();
        // (event name, key, version id) for newly written replicas.
        let mut replicated_events = Vec::<(String, String, String)>::new();
        // (key, version id) for replicas removed by the pruning pass.
        let mut deleted_replica_events = Vec::<(String, String)>::new();
        // key -> source version ids that are allowed to remain at the destination.
        let mut desired_versions = BTreeMap::<String, Vec<String>>::new();
        for source in source_versions {
            // Same exclusion as in the copy loop below, so replicas of such versions are
            // not treated as desired.
            if source.version.metadata.encryption.is_some() && !rule.replicate_encrypted_objects {
                continue;
            }
            desired_versions
                .entry(source.key.clone())
                .or_default()
                .push(source.version.version_id.clone());
        }
        {
            // This scope bounds the mutable borrow of the destination bucket.
            let destination_state = self.require_bucket_mut(destination_bucket)?;
            let destination_owner = bucket_owner(destination_state);
            for source in source_versions {
                if source.version.metadata.encryption.is_some() && !rule.replicate_encrypted_objects
                {
                    result.skipped_encrypted_objects += 1;
                    continue;
                }
                // Creates the destination object entry when the key is new.
                let object = destination_state
                    .objects
                    .entry(source.key.clone())
                    .or_default();
                if object
                    .versions
                    .iter()
                    .any(|version| version.version_id == source.version.version_id)
                {
                    // Already present: count it, and still mark the source as completed.
                    result.skipped_existing_versions += 1;
                    completed_source_versions
                        .push((source.key.clone(), source.version.version_id.clone()));
                    continue;
                }
                // The replica keeps the source version's data but belongs to the
                // destination bucket's owner.
                let mut version = source.version.clone();
                version.owner = destination_owner.clone();
                version.replication_status = Some("COMPLETED".to_string());
                if version.delete_marker {
                    result.replicated_delete_markers += 1;
                    replicated_events.push((
                        "s3:Replication:DeleteMarkerReplicated".to_string(),
                        source.key.clone(),
                        source.version.version_id.clone(),
                    ));
                } else {
                    result.replicated_object_versions += 1;
                    replicated_events.push((
                        "s3:Replication:ObjectReplicated".to_string(),
                        source.key.clone(),
                        source.version.version_id.clone(),
                    ));
                }
                object.versions.push(version);
                completed_source_versions
                    .push((source.key.clone(), source.version.version_id.clone()));
            }
            // Pruning pass: keys are collected first because entries cannot be removed
            // from the map while it is being iterated.
            let mut empty_keys = Vec::new();
            for (key, object) in &mut destination_state.objects {
                if !replication_rule_matches(rule, key) {
                    continue;
                }
                let desired = desired_versions.get(key);
                let before = object.versions.len();
                let mut removed_versions = Vec::new();
                object.versions.retain(|version| {
                    // Versions whose status is not "COMPLETED" are always kept; versions
                    // marked "COMPLETED" are kept only while their id is still desired.
                    let keep = version.replication_status.as_deref() != Some("COMPLETED")
                        || desired.is_some_and(|ids| ids.contains(&version.version_id));
                    if !keep {
                        removed_versions.push(version.version_id.clone());
                    }
                    keep
                });
                result.replicated_deleted_versions += before - object.versions.len();
                for version_id in removed_versions {
                    deleted_replica_events.push((key.clone(), version_id));
                }
                if object.versions.is_empty() {
                    empty_keys.push(key.clone());
                }
            }
            for key in empty_keys {
                destination_state.objects.remove(&key);
            }
        }
        // Record the outcome on the source versions themselves.
        for (key, version_id) in completed_source_versions {
            self.version_by_id_mut(source_bucket, &key, &version_id)?
                .replication_status = Some("COMPLETED".to_string());
        }
        // Replication events first, then deletion events.
        for (event_name, key, version_id) in replicated_events {
            self.emit_notification_event(&event_name, source_bucket, &key, &version_id);
        }
        for (key, version_id) in deleted_replica_events {
            self.emit_notification_event(
                "s3:Replication:ObjectDeleted",
                source_bucket,
                &key,
                &version_id,
            );
        }
        Ok(())
    }
    /// Answers a CORS preflight request for `key` in `bucket`.
    ///
    /// The first CORS rule of the bucket is used whose allowed origins match the `origin`
    /// header, whose allowed methods match `access-control-request-method`, and whose
    /// allowed headers are either empty or match every entry of
    /// `access-control-request-headers`. The 200 response carries the allow-origin,
    /// `vary` and allow-methods headers, plus allow-headers, expose-headers and max-age
    /// when the rule defines them. A successful preflight is appended to the audit log.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::CorsNotAllowed` when the `origin` or
    /// `access-control-request-method` header is missing, when no rule matches, or when
    /// `cors_response_origin` yields nothing for the matched rule. Propagates the failure
    /// from `require_bucket`.
    pub(crate) fn cors_preflight(
        &mut self,
        request: &S3HttpRequest,
        bucket: &str,
        key: &str,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let Some(origin) = header(&request.headers, "origin") else {
            return Err(RuntimeError::CorsNotAllowed("missing Origin".to_string()));
        };
        let Some(requested_method) = header(&request.headers, "access-control-request-method")
        else {
            return Err(RuntimeError::CorsNotAllowed(
                "missing requested method".to_string(),
            ));
        };
        let requested_headers = header(&request.headers, "access-control-request-headers")
            .map(parse_header_list)
            .unwrap_or_default();

        let bucket_state = self.require_bucket(bucket)?;
        let rule = bucket_state
            .cors_rules
            .iter()
            .find(|candidate| {
                cors_value_matches(&candidate.allowed_origins, origin)
                    && cors_value_matches(&candidate.allowed_methods, requested_method)
                    // A rule without allowed headers accepts any requested header.
                    && (candidate.allowed_headers.is_empty()
                        || requested_headers.iter().all(|requested| {
                            cors_value_matches(&candidate.allowed_headers, requested)
                        }))
            })
            .ok_or_else(|| RuntimeError::CorsNotAllowed(origin.to_string()))?;

        let Some(response_origin) = cors_response_origin(&rule.allowed_origins, origin) else {
            return Err(RuntimeError::CorsNotAllowed(origin.to_string()));
        };

        let mut response = S3HttpResponse::new(200);
        response = response.with_header("access-control-allow-origin", response_origin);
        response = response.with_header("vary", "Origin");
        response = response.with_header(
            "access-control-allow-methods",
            rule.allowed_methods.join(", "),
        );
        if !rule.allowed_headers.is_empty() {
            response = response.with_header(
                "access-control-allow-headers",
                rule.allowed_headers.join(", "),
            );
        }
        if !rule.expose_headers.is_empty() {
            response = response.with_header(
                "access-control-expose-headers",
                rule.expose_headers.join(", "),
            );
        }
        if let Some(max_age) = rule.max_age_seconds {
            response = response.with_header("access-control-max-age", max_age.to_string());
        }

        // The preflight is recorded under the GetBucketCors action name.
        let action_name = format!("{:?}", S3Action::GetBucketCors);
        let resource = object_resource(bucket, key);
        self.audit.append(
            &request.principal,
            &action_name,
            &resource,
            AuditOutcome::Allowed,
            Some("cors-preflight".to_string()),
        );
        Ok(response)
    }
    /// Adds CORS headers to `response` for a non-preflight request.
    ///
    /// `response` is returned unchanged when `bucket` is unknown, when no CORS rule of the
    /// bucket matches both `origin` and `method`, or when `cors_response_origin` yields
    /// nothing for the matched rule. Otherwise the allow-origin and `vary` headers are
    /// added, followed by expose-headers when the rule lists any.
    pub(crate) fn apply_actual_cors_response(
        &self,
        response: S3HttpResponse,
        bucket: &str,
        origin: &str,
        method: &str,
    ) -> S3HttpResponse {
        // Resolve bucket -> first matching rule -> origin value to echo in one pipeline;
        // a miss at any step leaves the response untouched.
        let matched = self
            .buckets
            .get(bucket)
            .and_then(|bucket_state| {
                bucket_state.cors_rules.iter().find(|candidate| {
                    cors_value_matches(&candidate.allowed_origins, origin)
                        && cors_value_matches(&candidate.allowed_methods, method)
                })
            })
            .and_then(|rule| {
                cors_response_origin(&rule.allowed_origins, origin)
                    .map(|allowed_origin| (rule, allowed_origin))
            });
        let Some((rule, allowed_origin)) = matched else {
            return response;
        };

        let with_origin = response
            .with_header("access-control-allow-origin", allowed_origin)
            .with_header("vary", "Origin");
        if rule.expose_headers.is_empty() {
            return with_origin;
        }
        with_origin.with_header(
            "access-control-expose-headers",
            rule.expose_headers.join(", "),
        )
    }
}

#[cfg(test)]
mod tests;