bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

impl BucketWarden {
    /// Completes a multipart upload by assembling its parts into a new object version.
    ///
    /// The parts listed in `request.parts` are concatenated in the order given.
    /// When that list is empty, every part stored on the upload is used instead,
    /// in the iteration order of the upload's part map. The assembled body is
    /// encrypted through the configured KMS and pushed as a new version of
    /// `request.key` in `request.bucket`; replication, storage-commit, audit and
    /// notification records are emitted afterwards.
    ///
    /// The pending upload is only discarded once every fallible step has
    /// succeeded, so a failed completion leaves the upload (and its parts) in
    /// place and the caller may retry.
    ///
    /// # Errors
    ///
    /// * Whatever `authorize` returns when `principal` is not permitted.
    /// * [`RuntimeError::NoSuchUpload`] when the upload id is unknown or belongs
    ///   to a different bucket/key than the request names.
    /// * [`RuntimeError::NoMultipartParts`] when there are no parts to assemble.
    /// * [`RuntimeError::MissingMultipartPart`] when a requested part number was
    ///   never uploaded.
    /// * [`RuntimeError::MultipartEtagMismatch`] when a requested part's etag
    ///   differs from the stored part's etag.
    /// * [`RuntimeError::Kms`] when encryption fails (the failure is audited).
    /// * Errors propagated from `enforce_object_creation_quota`,
    ///   `normalize_encryption` and `next_object_local_ordinal`.
    ///
    /// # Panics
    ///
    /// Panics if the bucket named by the request no longer exists.
    /// NOTE(review): this relies on buckets not being deletable while uploads
    /// are pending — confirm against the bucket deletion path.
    pub fn complete_multipart_upload(
        &mut self,
        principal: &str,
        request: CompleteMultipartUploadRequest,
    ) -> Result<CompleteMultipartUploadResult, RuntimeError> {
        let resource = object_resource(&request.bucket, &request.key);
        self.authorize(principal, S3Action::CompleteMultipartUpload, &resource)?;
        let upload = self
            .multipart_uploads
            .get(&request.upload_id)
            .ok_or_else(|| RuntimeError::NoSuchUpload(request.upload_id.clone()))?;
        // An upload id that exists but targets another bucket/key is reported
        // exactly like an unknown id.
        if upload.bucket != request.bucket || upload.key != request.key {
            return Err(RuntimeError::NoSuchUpload(request.upload_id));
        }
        // NOTE(review): a caller-supplied part list is trusted as-is; duplicate
        // or out-of-order part numbers are not rejected here — confirm whether
        // an upstream layer validates ordering.
        let part_order = if request.parts.is_empty() {
            upload
                .parts
                .iter()
                .map(|(part_number, part)| CompletedPart {
                    part_number: *part_number,
                    etag: part.etag.clone(),
                })
                .collect::<Vec<_>>()
        } else {
            request.parts
        };
        if part_order.is_empty() {
            return Err(RuntimeError::NoMultipartParts(request.upload_id));
        }
        // The quota check needs to know whether this completion introduces a
        // key that has no current version yet (missing bucket/object counts
        // as "new").
        let creates_new_current_key = self
            .buckets
            .get(&request.bucket)
            .and_then(|bucket| bucket.objects.get(&request.key))
            .is_none_or(|object| !object.has_current_version());
        self.enforce_object_creation_quota(&request.bucket, creates_new_current_key)?;
        // Validate each requested part against what was uploaded while
        // concatenating the bodies and collecting per-part integrity records.
        let mut body = Vec::new();
        let mut part_checksums = Vec::with_capacity(part_order.len());
        for completed in &part_order {
            let part = upload.parts.get(&completed.part_number).ok_or_else(|| {
                RuntimeError::MissingMultipartPart {
                    upload_id: request.upload_id.clone(),
                    part_number: completed.part_number,
                }
            })?;
            if completed.etag != part.etag {
                return Err(RuntimeError::MultipartEtagMismatch {
                    upload_id: request.upload_id.clone(),
                    part_number: completed.part_number,
                });
            }
            part_checksums.push(PartIntegrityRecord {
                part_number: completed.part_number,
                checksum_sha256: part.checksum_sha256.clone(),
                size: part.body.len(),
            });
            body.extend_from_slice(&part.body);
        }
        // Copy what the new version needs out of the upload so the shared
        // borrow of `self.multipart_uploads` ends before `self` is mutated.
        let mut metadata = upload.metadata.clone();
        if let Some(encryption) = metadata.encryption.clone() {
            metadata.encryption = Some(self.normalize_encryption(encryption)?);
        }
        let lock = upload.lock.clone();
        let owner = bucket_owner(
            self.buckets
                .get(&request.bucket)
                .expect("bucket existed when upload was created"),
        );
        let etag = multipart_complete_etag(&part_order);
        let local_ordinal = self.next_object_local_ordinal(&request.bucket, &request.key)?;
        let version_id = self.allocate_object_version_id(local_ordinal);
        let ciphertext = match self.kms.encrypt(&body) {
            Ok(ciphertext) => ciphertext,
            Err(error) => {
                let key_id = self.kms.key_id().to_string();
                self.audit_kms_failure(
                    principal,
                    "kms:Encrypt",
                    &resource,
                    &key_id,
                    &error.to_string(),
                );
                return Err(RuntimeError::Kms(error));
            }
        };
        // Every fallible step is behind us: only now is it safe to discard the
        // pending upload. Removing it earlier would lose the uploaded parts
        // whenever ordinal allocation or encryption failed.
        self.multipart_uploads.remove(&request.upload_id);
        let envelope = self.envelope_metadata(metadata.encryption.as_ref(), &ciphertext, &body);
        let integrity = IntegrityRecord::for_multipart_body(&body, part_checksums);
        self.buckets
            .get_mut(&request.bucket)
            .expect("bucket existed when upload was created")
            .objects
            .entry(request.key.clone())
            .or_default()
            .versions
            .push(StoredVersion {
                version_id: version_id.clone(),
                local_ordinal,
                ciphertext,
                etag: etag.clone(),
                last_modified_epoch_seconds: self.clock_epoch_seconds,
                metadata,
                envelope,
                integrity: integrity.clone(),
                tags: BTreeMap::new(),
                lock,
                delete_marker: false,
                owner,
                replication_status: None,
            });
        // Post-commit bookkeeping: replication queue, storage commit log,
        // access audit, KMS audit and the object-created notification.
        self.replication.record(
            ReplicationAction::PutObject,
            &request.bucket,
            &request.key,
            &version_id,
        );
        self.record_storage_commit(
            "CompleteMultipartUpload",
            &request.bucket,
            &request.key,
            &version_id,
            &integrity.checksum_sha256,
        );
        self.audit_allowed(
            principal,
            S3Action::CompleteMultipartUpload,
            &resource,
            Some(format!("{}:{version_id}", request.upload_id)),
        );
        let kms_key_id = self.kms.key_id().to_string();
        self.audit_kms_encrypt(principal, &resource, &version_id, &kms_key_id);
        self.emit_notification_event(
            "s3:ObjectCreated:CompleteMultipartUpload",
            &request.bucket,
            &request.key,
            &version_id,
        );
        Ok(CompleteMultipartUploadResult {
            bucket: request.bucket,
            key: request.key,
            upload_id: request.upload_id,
            version_id,
            etag,
        })
    }

    /// Aborts a pending multipart upload, discarding it and its stored parts.
    ///
    /// The upload is removed only when its id exists and it was created for the
    /// same bucket and key that the request names; a successful abort is
    /// recorded in the audit log with the detail `"aborted"`.
    ///
    /// # Errors
    ///
    /// * Whatever `authorize` returns when `principal` is not permitted.
    /// * [`RuntimeError::NoSuchUpload`], carrying the upload id, when the id is
    ///   unknown or belongs to a different bucket/key. In that case the stored
    ///   upload is left untouched.
    pub fn abort_multipart_upload(
        &mut self,
        principal: &str,
        request: AbortMultipartUploadRequest,
    ) -> Result<(), RuntimeError> {
        let resource = object_resource(&request.bucket, &request.key);
        self.authorize(principal, S3Action::AbortMultipartUpload, &resource)?;
        // Validate through a shared borrow first so a mismatching request never
        // has to take the upload out of the map and put it back.
        let targets_request = self
            .multipart_uploads
            .get(&request.upload_id)
            .is_some_and(|upload| upload.bucket == request.bucket && upload.key == request.key);
        if !targets_request {
            // Report the upload id (not the object resource) so the error
            // payload matches every other `NoSuchUpload` raised by the runtime.
            return Err(RuntimeError::NoSuchUpload(request.upload_id));
        }
        self.multipart_uploads.remove(&request.upload_id);
        self.audit_allowed(
            principal,
            S3Action::AbortMultipartUpload,
            &resource,
            Some("aborted".to_string()),
        );
        Ok(())
    }
}