bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

impl BucketWarden {
    pub fn bulk_copy_objects(
        &mut self,
        principal: &str,
        request: BulkObjectCopyRequest,
    ) -> BulkObjectCopyResult {
        let mut copied = Vec::new();
        let mut errors = Vec::new();
        for entry in request.entries {
            let source_bucket = entry.source_bucket.clone();
            let source_key = entry.source_key.clone();
            let destination_bucket = entry.destination_bucket.clone();
            let destination_key = entry.destination_key.clone();
            match self.copy_object(principal, entry) {
                Ok(result) => copied.push(result),
                Err(error) => errors.push(BulkObjectCopyError {
                    source_bucket,
                    source_key,
                    destination_bucket,
                    destination_key,
                    code: multi_object_delete_error_code(&error).to_string(),
                    message: error.to_string(),
                }),
            }
        }
        BulkObjectCopyResult { copied, errors }
    }

    pub fn copy_object(
        &mut self,
        principal: &str,
        request: CopyObjectRequest,
    ) -> Result<CopyObjectResult, RuntimeError> {
        if !validate_object_key(&request.destination_key) {
            return Err(RuntimeError::InvalidObjectKey(request.destination_key));
        }
        let source_resource = object_resource(&request.source_bucket, &request.source_key);
        let destination_resource =
            object_resource(&request.destination_bucket, &request.destination_key);
        self.authorize(principal, S3Action::GetObject, &source_resource)?;
        self.authorize(principal, S3Action::CopyObject, &destination_resource)?;
        let creates_new_current_key = self
            .buckets
            .get(&request.destination_bucket)
            .and_then(|bucket| bucket.objects.get(&request.destination_key))
            .is_none_or(|object| !object.has_current_version());
        self.enforce_object_creation_quota(&request.destination_bucket, creates_new_current_key)?;
        let destination_owner = bucket_owner(self.require_bucket(&request.destination_bucket)?);
        let (body, copied_lock, source_metadata) = {
            let source = if let Some(version_id) = request.source_version_id.as_deref() {
                self.version_by_id(&request.source_bucket, &request.source_key, version_id)?
            } else {
                self.current_version(&request.source_bucket, &request.source_key)?
            };
            if source.delete_marker {
                return Err(RuntimeError::NoSuchKey(object_resource(
                    &request.source_bucket,
                    &request.source_key,
                )));
            }
            let source_ciphertext = source.ciphertext.clone();
            let source_integrity = source.integrity.clone();
            let source_lock = source.lock.clone();
            let source_metadata = source.metadata.clone();
            let body = match self.kms.decrypt(&source_ciphertext) {
                Ok(body) => body,
                Err(error) => {
                    self.audit_kms_failure(
                        principal,
                        "kms:Decrypt",
                        &source_resource,
                        &source_ciphertext.key_id,
                        &error.to_string(),
                    );
                    return Err(RuntimeError::Kms(error));
                }
            };
            self.verify_integrity_record(&source_integrity, &body)?;
            (body, source_lock, source_metadata)
        };
        let mut metadata = match request.metadata_directive {
            MetadataDirective::Copy => source_metadata,
            MetadataDirective::Replace(metadata) => metadata,
        };
        if let Some(encryption) = request.destination_encryption {
            metadata.encryption = Some(encryption);
        }
        let local_ordinal =
            self.next_object_local_ordinal(&request.destination_bucket, &request.destination_key)?;
        let version_id = self.allocate_object_version_id(local_ordinal);
        let etag = object_etag(&body);
        let ciphertext = match self.kms.encrypt(&body) {
            Ok(ciphertext) => ciphertext,
            Err(error) => {
                let key_id = self.kms.key_id().to_string();
                self.audit_kms_failure(
                    principal,
                    "kms:Encrypt",
                    &destination_resource,
                    &key_id,
                    &error.to_string(),
                );
                return Err(RuntimeError::Kms(error));
            }
        };
        let envelope = self.envelope_metadata(metadata.encryption.as_ref(), &ciphertext, &body);
        let integrity = IntegrityRecord::for_body(&body);
        let destination_bucket = self
            .buckets
            .get_mut(&request.destination_bucket)
            .expect("destination bucket checked above");
        destination_bucket
            .objects
            .entry(request.destination_key.clone())
            .or_default()
            .versions
            .push(StoredVersion {
                version_id: version_id.clone(),
                local_ordinal,
                ciphertext,
                etag: etag.clone(),
                last_modified_epoch_seconds: self.clock_epoch_seconds,
                metadata,
                envelope,
                integrity: integrity.clone(),
                tags: BTreeMap::new(),
                lock: copied_lock,
                delete_marker: false,
                owner: destination_owner,
                replication_status: None,
            });
        self.replication.record(
            ReplicationAction::PutObject,
            &request.destination_bucket,
            &request.destination_key,
            &version_id,
        );
        self.record_storage_commit(
            "CopyObject",
            &request.destination_bucket,
            &request.destination_key,
            &version_id,
            &integrity.checksum_sha256,
        );
        self.audit_allowed(
            principal,
            S3Action::CopyObject,
            &destination_resource,
            Some(format!("{}:{version_id}", source_resource)),
        );
        let kms_key_id = self.kms.key_id().to_string();
        self.audit_kms_encrypt(principal, &destination_resource, &version_id, &kms_key_id);
        self.emit_notification_event(
            "s3:ObjectCreated:Copy",
            &request.destination_bucket,
            &request.destination_key,
            &version_id,
        );
        Ok(CopyObjectResult {
            source_bucket: request.source_bucket,
            source_key: request.source_key,
            destination_bucket: request.destination_bucket,
            destination_key: request.destination_key,
            version_id,
            etag,
        })
    }

    pub fn restore_object(
        &mut self,
        principal: &str,
        bucket: &str,
        key: &str,
    ) -> Result<String, RuntimeError> {
        self.restore_object_entry(principal, bucket, key, None)
            .map(|entry| entry.restore_header)
    }

    pub fn bulk_restore_objects(
        &mut self,
        principal: &str,
        request: BulkObjectRestoreRequest,
    ) -> BulkObjectRestoreResult {
        let mut restored = Vec::new();
        let mut errors = Vec::new();
        for entry in request.entries {
            let bucket = entry.bucket.clone();
            let key = entry.key.clone();
            let version_id = entry.version_id.clone();
            match self.restore_object_entry(principal, &bucket, &key, version_id.as_deref()) {
                Ok(result) => restored.push(result),
                Err(error) => {
                    errors.push(BulkObjectActionError {
                        bucket,
                        key,
                        version_id,
                        code: multi_object_delete_error_code(&error).to_string(),
                        message: error.to_string(),
                    });
                }
            }
        }
        BulkObjectRestoreResult { restored, errors }
    }

    fn restore_object_entry(
        &mut self,
        principal: &str,
        bucket: &str,
        key: &str,
        version_id: Option<&str>,
    ) -> Result<BulkObjectRestoreEntry, RuntimeError> {
        let resource = object_resource(bucket, key);
        self.authorize(principal, S3Action::RestoreObject, &resource)?;
        let restored = if let Some(version_id) = version_id {
            self.get_object_version(principal, bucket, key, version_id)?
        } else {
            self.get_object(principal, bucket, key)?
        };
        let restore_header = "ongoing-request=\"false\"".to_string();
        self.audit_allowed(
            principal,
            S3Action::RestoreObject,
            &resource,
            Some(restored.version_id.clone()),
        );
        self.emit_notification_event(
            "s3:ObjectRestore:Completed",
            bucket,
            key,
            &restored.version_id,
        );
        Ok(BulkObjectRestoreEntry {
            bucket: bucket.to_string(),
            key: key.to_string(),
            version_id: restored.version_id,
            restore_header,
        })
    }
}