bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;
mod encryption_update;
mod write_get_object_response;
impl BucketWarden {
    pub(crate) fn object_lock_from_headers(
        &self,
        bucket: &str,
        headers: &BTreeMap<String, String>,
    ) -> Result<ObjectLock, RuntimeError> {
        let mode = header(headers, "x-amz-object-lock-mode")
            .map(parse_retention_mode)
            .transpose()?;
        let retain_until = header(headers, "x-amz-object-lock-retain-until-date")
            .map(parse_retention_timestamp)
            .transpose()?;
        let legal_hold = header(headers, "x-amz-object-lock-legal-hold")
            .map(parse_legal_hold_status)
            .transpose()?
            .unwrap_or(false);
        if mode.is_some() || retain_until.is_some() || legal_hold {
            self.require_object_lock_enabled(bucket)?;
        }
        let mut lock = self.default_object_lock(bucket)?;
        if let Some(mode) = mode {
            let retain_until = retain_until.ok_or_else(|| {
                RuntimeError::InvalidObjectLockConfiguration(
                    "x-amz-object-lock-retain-until-date is required with mode".to_string(),
                )
            })?;
            self.require_retain_until_in_future(retain_until)?;
            lock.set_retention(mode, retain_until);
        } else if retain_until.is_some() {
            return Err(RuntimeError::InvalidObjectLockConfiguration(
                "x-amz-object-lock-mode is required with retain-until-date".to_string(),
            ));
        }
        if legal_hold {
            lock.set_legal_hold(true);
        }
        Ok(lock)
    }
    pub(crate) fn require_retain_until_in_future(
        &self,
        retain_until_epoch_seconds: u64,
    ) -> Result<(), RuntimeError> {
        if retain_until_epoch_seconds <= self.clock_epoch_seconds {
            return Err(RuntimeError::InvalidRetention(format!(
                "retain-until date must be after current runtime clock {}",
                self.clock_epoch_seconds
            )));
        }
        Ok(())
    }
    pub(crate) fn require_object_lock_enabled(&self, bucket: &str) -> Result<(), RuntimeError> {
        let bucket_state = self.require_bucket(bucket)?;
        if bucket_state.object_lock.enabled {
            Ok(())
        } else {
            Err(RuntimeError::ObjectLockConfigurationNotFound(
                bucket.to_string(),
            ))
        }
    }
    pub(crate) fn default_object_lock(&self, bucket: &str) -> Result<ObjectLock, RuntimeError> {
        let bucket_state = self.require_bucket(bucket)?;
        let Some(default_retention) = &bucket_state.object_lock.default_retention else {
            return Ok(ObjectLock::none());
        };
        let mode = retention_mode_from_text(&default_retention.mode)?;
        let seconds = match (default_retention.days, default_retention.years) {
            (Some(days), None) => days.saturating_mul(86_400),
            (None, Some(years)) => years.saturating_mul(365).saturating_mul(86_400),
            _ => {
                return Err(RuntimeError::InvalidObjectLockConfiguration(
                    "default retention requires exactly one of Days or Years".to_string(),
                ))
            }
        };
        Ok(ObjectLock::retained(
            mode,
            self.clock_epoch_seconds.saturating_add(seconds),
        ))
    }
    pub(crate) fn encryption_from_headers(
        &self,
        bucket: &str,
        headers: &BTreeMap<String, String>,
    ) -> Result<ServerSideEncryption, RuntimeError> {
        if header(headers, "x-amz-server-side-encryption-customer-algorithm").is_some()
            || header(headers, "x-amz-server-side-encryption-customer-key").is_some()
            || header(headers, "x-amz-server-side-encryption-customer-key-md5").is_some()
        {
            return Err(RuntimeError::UnsupportedEncryption(
                "SSE-C is not supported by the current runtime boundary".to_string(),
            ));
        }
        let kms_key_id =
            header(headers, "x-amz-server-side-encryption-aws-kms-key-id").map(str::to_string);
        if let Some(algorithm) = header(headers, "x-amz-server-side-encryption") {
            return self.normalize_encryption(ServerSideEncryption {
                algorithm: algorithm.to_string(),
                kms_key_id,
            });
        }
        if kms_key_id.is_some() {
            return Err(RuntimeError::InvalidEncryption(
                "KMS key id requires x-amz-server-side-encryption: aws:kms".to_string(),
            ));
        }
        let bucket_state = self.require_bucket(bucket)?;
        Ok(bucket_state
            .encryption
            .clone()
            .unwrap_or_else(|| ServerSideEncryption {
                algorithm: "AES256".to_string(),
                kms_key_id: None,
            }))
    }
    pub(crate) fn normalize_encryption(
        &self,
        encryption: ServerSideEncryption,
    ) -> Result<ServerSideEncryption, RuntimeError> {
        match encryption.algorithm.as_str() {
            "AES256" => {
                if encryption.kms_key_id.is_some() {
                    return Err(RuntimeError::InvalidEncryption(
                        "SSE-S3 AES256 cannot include a KMS key id".to_string(),
                    ));
                }
                Ok(ServerSideEncryption {
                    algorithm: "AES256".to_string(),
                    kms_key_id: None,
                })
            }
            "aws:kms" => {
                let kms_key_id = encryption
                    .kms_key_id
                    .unwrap_or_else(|| self.kms.key_id().to_string());
                if kms_key_id != self.kms.key_id() {
                    return Err(RuntimeError::InvalidEncryption(format!(
                        "KMS key id {kms_key_id} does not match active runtime key {}",
                        self.kms.key_id()
                    )));
                }
                Ok(ServerSideEncryption {
                    algorithm: "aws:kms".to_string(),
                    kms_key_id: Some(kms_key_id),
                })
            }
            "aws:kms:dsse" => Err(RuntimeError::UnsupportedEncryption(
                "DSSE-KMS is not supported by the current runtime boundary".to_string(),
            )),
            value => Err(RuntimeError::UnsupportedEncryption(format!(
                "unsupported server-side encryption algorithm: {value}"
            ))),
        }
    }
    pub fn put_object(
        &mut self,
        principal: &str,
        request: PutObjectRequest,
        lock: ObjectLock,
    ) -> Result<PutObjectResult, RuntimeError> {
        if !validate_object_key(&request.key) {
            return Err(RuntimeError::InvalidObjectKey(request.key));
        }
        let resource = object_resource(&request.bucket, &request.key);
        self.authorize(principal, S3Action::PutObject, &resource)?;
        let creates_new_current_key = self
            .buckets
            .get(&request.bucket)
            .and_then(|bucket| bucket.objects.get(&request.key))
            .is_none_or(|object| !object.has_current_version());
        self.enforce_object_creation_quota(&request.bucket, creates_new_current_key)?;
        let owner = bucket_owner(self.require_bucket(&request.bucket)?);
        let local_ordinal = self.next_object_local_ordinal(&request.bucket, &request.key)?;
        let version_id = self.allocate_object_version_id(local_ordinal);
        let etag = object_etag(&request.body);
        let ciphertext = match self.kms.encrypt(&request.body) {
            Ok(ciphertext) => ciphertext,
            Err(error) => {
                let key_id = self.kms.key_id().to_string();
                self.audit_kms_failure(
                    principal,
                    "kms:Encrypt",
                    &resource,
                    &key_id,
                    &error.to_string(),
                );
                return Err(RuntimeError::Kms(error));
            }
        };
        let envelope = self.envelope_metadata(
            request.metadata.encryption.as_ref(),
            &ciphertext,
            &request.body,
        );
        let integrity = IntegrityRecord::for_body(&request.body);
        let version = StoredVersion {
            version_id: version_id.clone(),
            local_ordinal,
            ciphertext,
            etag: etag.clone(),
            last_modified_epoch_seconds: self.clock_epoch_seconds,
            metadata: request.metadata,
            envelope,
            integrity: integrity.clone(),
            tags: BTreeMap::new(),
            lock,
            delete_marker: false,
            owner,
            replication_status: None,
        };
        let bucket = self
            .buckets
            .get_mut(&request.bucket)
            .expect("bucket checked above");
        bucket
            .objects
            .entry(request.key.clone())
            .or_default()
            .versions
            .push(version);
        self.replication.record(
            ReplicationAction::PutObject,
            &request.bucket,
            &request.key,
            &version_id,
        );
        self.record_storage_commit(
            "PutObject",
            &request.bucket,
            &request.key,
            &version_id,
            &integrity.checksum_sha256,
        );
        self.audit_allowed(
            principal,
            S3Action::PutObject,
            &resource,
            Some(version_id.clone()),
        );
        let kms_key_id = self.kms.key_id().to_string();
        self.audit_kms_encrypt(principal, &resource, &version_id, &kms_key_id);
        self.emit_notification_event(
            "s3:ObjectCreated:Put",
            &request.bucket,
            &request.key,
            &version_id,
        );
        Ok(PutObjectResult {
            bucket: request.bucket,
            key: request.key,
            version_id,
            etag,
        })
    }

    pub fn rename_object(
        &mut self,
        principal: &str,
        request: RenameObjectRequest,
    ) -> Result<RenameObjectResult, RuntimeError> {
        if !crate::bucket_basics::is_directory_bucket_name(&request.bucket) {
            return Err(RuntimeError::InvalidBucketName(
                "RenameObject is only supported for directory buckets".to_string(),
            ));
        }
        if request.source_key.ends_with('/') || request.destination_key.ends_with('/') {
            return Err(RuntimeError::InvalidObjectKey(
                "RenameObject does not support keys ending with '/'".to_string(),
            ));
        }
        if !validate_object_key(&request.source_key)
            || !validate_object_key(&request.destination_key)
        {
            return Err(RuntimeError::InvalidObjectKey(request.destination_key));
        }
        let source_resource = object_resource(&request.bucket, &request.source_key);
        let destination_resource = object_resource(&request.bucket, &request.destination_key);
        self.authorize(principal, S3Action::RenameObject, &source_resource)?;
        let (current_version, remove_source_entry) = {
            let bucket_state = self.require_bucket_mut(&request.bucket)?;
            let source_object = bucket_state
                .objects
                .get_mut(&request.source_key)
                .ok_or_else(|| RuntimeError::NoSuchKey(source_resource.clone()))?;
            let version = source_object
                .versions
                .pop()
                .ok_or_else(|| RuntimeError::NoSuchKey(source_resource.clone()))?;
            if version.delete_marker {
                return Err(RuntimeError::NoSuchKey(source_resource));
            }
            (version, source_object.versions.is_empty())
        };
        if remove_source_entry {
            self.require_bucket_mut(&request.bucket)?
                .objects
                .remove(&request.source_key);
        }
        self.require_bucket_mut(&request.bucket)?
            .objects
            .entry(request.destination_key.clone())
            .or_default()
            .versions
            .push(current_version.clone());
        self.audit_allowed(
            principal,
            S3Action::RenameObject,
            &destination_resource,
            Some(current_version.version_id.clone()),
        );
        Ok(RenameObjectResult {
            bucket: request.bucket,
            source_key: request.source_key,
            destination_key: request.destination_key,
            version_id: current_version.version_id,
            etag: current_version.etag,
        })
    }

    pub fn write_get_object_response(
        &mut self,
        principal: &str,
        request: WriteGetObjectResponseRequest,
    ) -> Result<(), RuntimeError> {
        self.authorize(principal, S3Action::WriteGetObjectResponse, "*")?;
        if request.request_route.trim().is_empty() || request.request_token.trim().is_empty() {
            return Err(RuntimeError::InvalidEncryption(
                "WriteGetObjectResponse requires non-empty request route and request token"
                    .to_string(),
            ));
        }
        let status_code = request.status_code.unwrap_or(200);
        if status_code >= 400 {
            if request.error_code.as_deref().is_none() || request.error_message.as_deref().is_none()
            {
                return Err(RuntimeError::InvalidEncryption(
                    "WriteGetObjectResponse error responses require both error code and error message"
                        .to_string(),
                ));
            }
            if !request.body.is_empty() {
                return Err(RuntimeError::InvalidEncryption(
                    "WriteGetObjectResponse cannot include a transformed body for an error response"
                        .to_string(),
                ));
            }
        } else if request.error_code.is_some() || request.error_message.is_some() {
            return Err(RuntimeError::InvalidEncryption(
                "WriteGetObjectResponse successful responses cannot include error headers"
                    .to_string(),
            ));
        }
        self.audit_allowed(
            principal,
            S3Action::WriteGetObjectResponse,
            "*",
            Some(format!("{}:{}", request.request_route, status_code)),
        );
        Ok(())
    }
}