use super::*;
impl BucketWarden {
pub fn bulk_copy_objects(
&mut self,
principal: &str,
request: BulkObjectCopyRequest,
) -> BulkObjectCopyResult {
let mut copied = Vec::new();
let mut errors = Vec::new();
for entry in request.entries {
let source_bucket = entry.source_bucket.clone();
let source_key = entry.source_key.clone();
let destination_bucket = entry.destination_bucket.clone();
let destination_key = entry.destination_key.clone();
match self.copy_object(principal, entry) {
Ok(result) => copied.push(result),
Err(error) => errors.push(BulkObjectCopyError {
source_bucket,
source_key,
destination_bucket,
destination_key,
code: multi_object_delete_error_code(&error).to_string(),
message: error.to_string(),
}),
}
}
BulkObjectCopyResult { copied, errors }
}
pub fn copy_object(
&mut self,
principal: &str,
request: CopyObjectRequest,
) -> Result<CopyObjectResult, RuntimeError> {
if !validate_object_key(&request.destination_key) {
return Err(RuntimeError::InvalidObjectKey(request.destination_key));
}
let source_resource = object_resource(&request.source_bucket, &request.source_key);
let destination_resource =
object_resource(&request.destination_bucket, &request.destination_key);
self.authorize(principal, S3Action::GetObject, &source_resource)?;
self.authorize(principal, S3Action::CopyObject, &destination_resource)?;
let creates_new_current_key = self
.buckets
.get(&request.destination_bucket)
.and_then(|bucket| bucket.objects.get(&request.destination_key))
.is_none_or(|object| !object.has_current_version());
self.enforce_object_creation_quota(&request.destination_bucket, creates_new_current_key)?;
let destination_owner = bucket_owner(self.require_bucket(&request.destination_bucket)?);
let (body, copied_lock, source_metadata) = {
let source = if let Some(version_id) = request.source_version_id.as_deref() {
self.version_by_id(&request.source_bucket, &request.source_key, version_id)?
} else {
self.current_version(&request.source_bucket, &request.source_key)?
};
if source.delete_marker {
return Err(RuntimeError::NoSuchKey(object_resource(
&request.source_bucket,
&request.source_key,
)));
}
let source_ciphertext = source.ciphertext.clone();
let source_integrity = source.integrity.clone();
let source_lock = source.lock.clone();
let source_metadata = source.metadata.clone();
let body = match self.kms.decrypt(&source_ciphertext) {
Ok(body) => body,
Err(error) => {
self.audit_kms_failure(
principal,
"kms:Decrypt",
&source_resource,
&source_ciphertext.key_id,
&error.to_string(),
);
return Err(RuntimeError::Kms(error));
}
};
self.verify_integrity_record(&source_integrity, &body)?;
(body, source_lock, source_metadata)
};
let mut metadata = match request.metadata_directive {
MetadataDirective::Copy => source_metadata,
MetadataDirective::Replace(metadata) => metadata,
};
if let Some(encryption) = request.destination_encryption {
metadata.encryption = Some(encryption);
}
let local_ordinal =
self.next_object_local_ordinal(&request.destination_bucket, &request.destination_key)?;
let version_id = self.allocate_object_version_id(local_ordinal);
let etag = object_etag(&body);
let ciphertext = match self.kms.encrypt(&body) {
Ok(ciphertext) => ciphertext,
Err(error) => {
let key_id = self.kms.key_id().to_string();
self.audit_kms_failure(
principal,
"kms:Encrypt",
&destination_resource,
&key_id,
&error.to_string(),
);
return Err(RuntimeError::Kms(error));
}
};
let envelope = self.envelope_metadata(metadata.encryption.as_ref(), &ciphertext, &body);
let integrity = IntegrityRecord::for_body(&body);
let destination_bucket = self
.buckets
.get_mut(&request.destination_bucket)
.expect("destination bucket checked above");
destination_bucket
.objects
.entry(request.destination_key.clone())
.or_default()
.versions
.push(StoredVersion {
version_id: version_id.clone(),
local_ordinal,
ciphertext,
etag: etag.clone(),
last_modified_epoch_seconds: self.clock_epoch_seconds,
metadata,
envelope,
integrity: integrity.clone(),
tags: BTreeMap::new(),
lock: copied_lock,
delete_marker: false,
owner: destination_owner,
replication_status: None,
});
self.replication.record(
ReplicationAction::PutObject,
&request.destination_bucket,
&request.destination_key,
&version_id,
);
self.record_storage_commit(
"CopyObject",
&request.destination_bucket,
&request.destination_key,
&version_id,
&integrity.checksum_sha256,
);
self.audit_allowed(
principal,
S3Action::CopyObject,
&destination_resource,
Some(format!("{}:{version_id}", source_resource)),
);
let kms_key_id = self.kms.key_id().to_string();
self.audit_kms_encrypt(principal, &destination_resource, &version_id, &kms_key_id);
self.emit_notification_event(
"s3:ObjectCreated:Copy",
&request.destination_bucket,
&request.destination_key,
&version_id,
);
Ok(CopyObjectResult {
source_bucket: request.source_bucket,
source_key: request.source_key,
destination_bucket: request.destination_bucket,
destination_key: request.destination_key,
version_id,
etag,
})
}
pub fn restore_object(
&mut self,
principal: &str,
bucket: &str,
key: &str,
) -> Result<String, RuntimeError> {
self.restore_object_entry(principal, bucket, key, None)
.map(|entry| entry.restore_header)
}
pub fn bulk_restore_objects(
&mut self,
principal: &str,
request: BulkObjectRestoreRequest,
) -> BulkObjectRestoreResult {
let mut restored = Vec::new();
let mut errors = Vec::new();
for entry in request.entries {
let bucket = entry.bucket.clone();
let key = entry.key.clone();
let version_id = entry.version_id.clone();
match self.restore_object_entry(principal, &bucket, &key, version_id.as_deref()) {
Ok(result) => restored.push(result),
Err(error) => {
errors.push(BulkObjectActionError {
bucket,
key,
version_id,
code: multi_object_delete_error_code(&error).to_string(),
message: error.to_string(),
});
}
}
}
BulkObjectRestoreResult { restored, errors }
}
fn restore_object_entry(
&mut self,
principal: &str,
bucket: &str,
key: &str,
version_id: Option<&str>,
) -> Result<BulkObjectRestoreEntry, RuntimeError> {
let resource = object_resource(bucket, key);
self.authorize(principal, S3Action::RestoreObject, &resource)?;
let restored = if let Some(version_id) = version_id {
self.get_object_version(principal, bucket, key, version_id)?
} else {
self.get_object(principal, bucket, key)?
};
let restore_header = "ongoing-request=\"false\"".to_string();
self.audit_allowed(
principal,
S3Action::RestoreObject,
&resource,
Some(restored.version_id.clone()),
);
self.emit_notification_event(
"s3:ObjectRestore:Completed",
bucket,
key,
&restored.version_id,
);
Ok(BulkObjectRestoreEntry {
bucket: bucket.to_string(),
key: key.to_string(),
version_id: restored.version_id,
restore_header,
})
}
}