use super::*;
impl BucketWarden {
/// Assembles the stored parts of a multipart upload into a new object version.
///
/// The call authorizes `principal` for `S3Action::CompleteMultipartUpload`,
/// resolves the upload named by `request.upload_id`, concatenates the selected
/// part bodies, encrypts the result through `self.kms`, and appends a new
/// `StoredVersion` to the target key. It then records replication, the storage
/// commit, audit entries, and a
/// `s3:ObjectCreated:CompleteMultipartUpload` notification.
///
/// When `request.parts` is empty, every part stored on the upload is used, in
/// the iteration order of the upload's part collection. Otherwise the
/// caller-supplied list is used as given.
///
/// The upload is removed from `multipart_uploads` only after every step that
/// can return an error has succeeded, so a failed completion leaves the upload
/// available for a retry or an explicit abort.
///
/// # Errors
///
/// * Whatever `authorize`, `enforce_object_creation_quota`,
///   `normalize_encryption`, or `next_object_local_ordinal` return.
/// * `RuntimeError::NoSuchUpload` when the upload id is unknown or belongs to a
///   different bucket/key.
/// * `RuntimeError::NoMultipartParts` when there are no parts to assemble.
/// * `RuntimeError::MissingMultipartPart` when a listed part number was never
///   uploaded.
/// * `RuntimeError::MultipartEtagMismatch` when a listed ETag differs from the
///   stored part's ETag.
/// * `RuntimeError::Kms` when encryption fails (the failure is audited first).
///
/// # Panics
///
/// Panics if the target bucket is no longer present in `self.buckets`.
pub fn complete_multipart_upload(
    &mut self,
    principal: &str,
    request: CompleteMultipartUploadRequest,
) -> Result<CompleteMultipartUploadResult, RuntimeError> {
    let resource = object_resource(&request.bucket, &request.key);
    self.authorize(principal, S3Action::CompleteMultipartUpload, &resource)?;
    let upload = self
        .multipart_uploads
        .get(&request.upload_id)
        .ok_or_else(|| RuntimeError::NoSuchUpload(request.upload_id.clone()))?;
    // An upload id that exists but targets another bucket/key is reported as unknown.
    if upload.bucket != request.bucket || upload.key != request.key {
        return Err(RuntimeError::NoSuchUpload(request.upload_id));
    }
    // NOTE(review): a caller-supplied list is not checked for ordering or
    // duplicates; a part number listed twice is concatenated twice below.
    let part_order = if request.parts.is_empty() {
        upload
            .parts
            .iter()
            .map(|(part_number, part)| CompletedPart {
                part_number: *part_number,
                etag: part.etag.clone(),
            })
            .collect::<Vec<_>>()
    } else {
        request.parts
    };
    if part_order.is_empty() {
        return Err(RuntimeError::NoMultipartParts(request.upload_id));
    }
    // The quota check needs to know whether this completion introduces a key
    // that currently has no live version.
    let creates_new_current_key = self
        .buckets
        .get(&request.bucket)
        .and_then(|bucket| bucket.objects.get(&request.key))
        .is_none_or(|object| !object.has_current_version());
    self.enforce_object_creation_quota(&request.bucket, creates_new_current_key)?;
    let mut body = Vec::new();
    let mut part_checksums = Vec::with_capacity(part_order.len());
    for completed in &part_order {
        let part = upload.parts.get(&completed.part_number).ok_or_else(|| {
            RuntimeError::MissingMultipartPart {
                upload_id: request.upload_id.clone(),
                part_number: completed.part_number,
            }
        })?;
        if completed.etag != part.etag {
            return Err(RuntimeError::MultipartEtagMismatch {
                upload_id: request.upload_id.clone(),
                part_number: completed.part_number,
            });
        }
        part_checksums.push(PartIntegrityRecord {
            part_number: completed.part_number,
            checksum_sha256: part.checksum_sha256.clone(),
            size: part.body.len(),
        });
        body.extend_from_slice(&part.body);
    }
    let mut metadata = upload.metadata.clone();
    if let Some(encryption) = metadata.encryption.clone() {
        metadata.encryption = Some(self.normalize_encryption(encryption)?);
    }
    let lock = upload.lock.clone();
    let owner = bucket_owner(
        self.buckets
            .get(&request.bucket)
            .expect("bucket existed when upload was created"),
    );
    let etag = multipart_complete_etag(&part_order);
    let local_ordinal = self.next_object_local_ordinal(&request.bucket, &request.key)?;
    let version_id = self.allocate_object_version_id(local_ordinal);
    let ciphertext = match self.kms.encrypt(&body) {
        Ok(ciphertext) => ciphertext,
        Err(error) => {
            let key_id = self.kms.key_id().to_string();
            self.audit_kms_failure(
                principal,
                "kms:Encrypt",
                &resource,
                &key_id,
                &error.to_string(),
            );
            return Err(RuntimeError::Kms(error));
        }
    };
    // Retire the upload only now: nothing below returns an error, so a failed
    // completion above never discards the caller's uploaded parts.
    self.multipart_uploads.remove(&request.upload_id);
    let envelope = self.envelope_metadata(metadata.encryption.as_ref(), &ciphertext, &body);
    let integrity = IntegrityRecord::for_multipart_body(&body, part_checksums);
    self.buckets
        .get_mut(&request.bucket)
        .expect("bucket existed when upload was created")
        .objects
        .entry(request.key.clone())
        .or_default()
        .versions
        .push(StoredVersion {
            version_id: version_id.clone(),
            local_ordinal,
            ciphertext,
            etag: etag.clone(),
            last_modified_epoch_seconds: self.clock_epoch_seconds,
            metadata,
            envelope,
            integrity: integrity.clone(),
            tags: BTreeMap::new(),
            lock,
            delete_marker: false,
            owner,
            replication_status: None,
        });
    self.replication.record(
        ReplicationAction::PutObject,
        &request.bucket,
        &request.key,
        &version_id,
    );
    self.record_storage_commit(
        "CompleteMultipartUpload",
        &request.bucket,
        &request.key,
        &version_id,
        &integrity.checksum_sha256,
    );
    self.audit_allowed(
        principal,
        S3Action::CompleteMultipartUpload,
        &resource,
        Some(format!("{}:{version_id}", request.upload_id)),
    );
    let kms_key_id = self.kms.key_id().to_string();
    self.audit_kms_encrypt(principal, &resource, &version_id, &kms_key_id);
    self.emit_notification_event(
        "s3:ObjectCreated:CompleteMultipartUpload",
        &request.bucket,
        &request.key,
        &version_id,
    );
    Ok(CompleteMultipartUploadResult {
        bucket: request.bucket,
        key: request.key,
        upload_id: request.upload_id,
        version_id,
        etag,
    })
}
/// Discards an in-progress multipart upload.
///
/// Authorizes `principal` for `S3Action::AbortMultipartUpload` on the object,
/// verifies that `request.upload_id` names an upload for exactly
/// `request.bucket` / `request.key`, removes it from `multipart_uploads`, and
/// records an allowed-audit entry with the detail `"aborted"`.
///
/// # Errors
///
/// * Whatever `authorize` returns.
/// * `RuntimeError::NoSuchUpload`, carrying the upload id, when the id is
///   unknown or the upload belongs to a different bucket/key. In both cases
///   the stored uploads are left untouched.
pub fn abort_multipart_upload(
    &mut self,
    principal: &str,
    request: AbortMultipartUploadRequest,
) -> Result<(), RuntimeError> {
    let resource = object_resource(&request.bucket, &request.key);
    self.authorize(principal, S3Action::AbortMultipartUpload, &resource)?;
    // Validate before removing so a mismatched request never disturbs the
    // stored upload and the upload id is still available for the error.
    let targets_this_object = self
        .multipart_uploads
        .get(&request.upload_id)
        .is_some_and(|upload| upload.bucket == request.bucket && upload.key == request.key);
    if !targets_this_object {
        return Err(RuntimeError::NoSuchUpload(request.upload_id));
    }
    self.multipart_uploads.remove(&request.upload_id);
    self.audit_allowed(
        principal,
        S3Action::AbortMultipartUpload,
        &resource,
        Some("aborted".to_string()),
    );
    Ok(())
}
}