use super::*;
mod encryption_update;
mod write_get_object_response;
impl BucketWarden {
pub(crate) fn object_lock_from_headers(
&self,
bucket: &str,
headers: &BTreeMap<String, String>,
) -> Result<ObjectLock, RuntimeError> {
let mode = header(headers, "x-amz-object-lock-mode")
.map(parse_retention_mode)
.transpose()?;
let retain_until = header(headers, "x-amz-object-lock-retain-until-date")
.map(parse_retention_timestamp)
.transpose()?;
let legal_hold = header(headers, "x-amz-object-lock-legal-hold")
.map(parse_legal_hold_status)
.transpose()?
.unwrap_or(false);
if mode.is_some() || retain_until.is_some() || legal_hold {
self.require_object_lock_enabled(bucket)?;
}
let mut lock = self.default_object_lock(bucket)?;
if let Some(mode) = mode {
let retain_until = retain_until.ok_or_else(|| {
RuntimeError::InvalidObjectLockConfiguration(
"x-amz-object-lock-retain-until-date is required with mode".to_string(),
)
})?;
self.require_retain_until_in_future(retain_until)?;
lock.set_retention(mode, retain_until);
} else if retain_until.is_some() {
return Err(RuntimeError::InvalidObjectLockConfiguration(
"x-amz-object-lock-mode is required with retain-until-date".to_string(),
));
}
if legal_hold {
lock.set_legal_hold(true);
}
Ok(lock)
}
/// Checks that a retain-until timestamp lies strictly after the runtime clock.
///
/// # Errors
///
/// Returns `RuntimeError::InvalidRetention` when the timestamp is equal to or
/// earlier than `self.clock_epoch_seconds`.
pub(crate) fn require_retain_until_in_future(
    &self,
    retain_until_epoch_seconds: u64,
) -> Result<(), RuntimeError> {
    if retain_until_epoch_seconds > self.clock_epoch_seconds {
        return Ok(());
    }
    let message = format!(
        "retain-until date must be after current runtime clock {}",
        self.clock_epoch_seconds
    );
    Err(RuntimeError::InvalidRetention(message))
}
/// Succeeds only when the named bucket has object lock enabled.
///
/// # Errors
///
/// Propagates the bucket lookup failure, or returns
/// `RuntimeError::ObjectLockConfigurationNotFound` carrying the bucket name
/// when the bucket's object lock is not enabled.
pub(crate) fn require_object_lock_enabled(&self, bucket: &str) -> Result<(), RuntimeError> {
    let lock_enabled = self.require_bucket(bucket)?.object_lock.enabled;
    if !lock_enabled {
        return Err(RuntimeError::ObjectLockConfigurationNotFound(
            bucket.to_string(),
        ));
    }
    Ok(())
}
/// Builds the lock a new object receives from the bucket's default retention.
///
/// Returns `ObjectLock::none()` when the bucket has no default retention.
/// Otherwise the retention runs from the current runtime clock for the
/// configured number of days, or years counted as 365 days each, using
/// saturating arithmetic throughout.
///
/// # Errors
///
/// Propagates the bucket lookup and retention-mode parse failures, and returns
/// `RuntimeError::InvalidObjectLockConfiguration` unless exactly one of days
/// and years is configured.
pub(crate) fn default_object_lock(&self, bucket: &str) -> Result<ObjectLock, RuntimeError> {
    let retention = match self
        .require_bucket(bucket)?
        .object_lock
        .default_retention
        .as_ref()
    {
        Some(retention) => retention,
        None => return Ok(ObjectLock::none()),
    };
    // The mode is parsed before the period is validated, so a bad mode is reported first.
    let mode = retention_mode_from_text(&retention.mode)?;
    let retention_days = match (retention.days, retention.years) {
        (Some(days), None) => days,
        (None, Some(years)) => years.saturating_mul(365),
        _ => {
            return Err(RuntimeError::InvalidObjectLockConfiguration(
                "default retention requires exactly one of Days or Years".to_string(),
            ));
        }
    };
    let retention_seconds = retention_days.saturating_mul(86_400);
    let retain_until = self.clock_epoch_seconds.saturating_add(retention_seconds);
    Ok(ObjectLock::retained(mode, retain_until))
}
/// Resolves the server-side encryption settings for a write request.
///
/// An explicit `x-amz-server-side-encryption` header is normalized through
/// [`Self::normalize_encryption`]. Without it, the bucket's configured
/// encryption is used, falling back to `AES256` with no KMS key.
///
/// # Errors
///
/// Returns `RuntimeError::UnsupportedEncryption` when any SSE-C header is
/// present, and `RuntimeError::InvalidEncryption` when a KMS key id is sent
/// without an algorithm header. Normalization and bucket lookup failures are
/// propagated.
pub(crate) fn encryption_from_headers(
    &self,
    bucket: &str,
    headers: &BTreeMap<String, String>,
) -> Result<ServerSideEncryption, RuntimeError> {
    let sse_c_headers = [
        "x-amz-server-side-encryption-customer-algorithm",
        "x-amz-server-side-encryption-customer-key",
        "x-amz-server-side-encryption-customer-key-md5",
    ];
    let uses_sse_c = sse_c_headers
        .iter()
        .any(|&name| header(headers, name).is_some());
    if uses_sse_c {
        return Err(RuntimeError::UnsupportedEncryption(
            "SSE-C is not supported by the current runtime boundary".to_string(),
        ));
    }

    let requested_key_id = header(headers, "x-amz-server-side-encryption-aws-kms-key-id")
        .map(|key_id| key_id.to_string());
    let requested_algorithm = header(headers, "x-amz-server-side-encryption");

    match (requested_algorithm, requested_key_id) {
        (Some(algorithm), kms_key_id) => self.normalize_encryption(ServerSideEncryption {
            algorithm: algorithm.to_string(),
            kms_key_id,
        }),
        (None, Some(_)) => Err(RuntimeError::InvalidEncryption(
            "KMS key id requires x-amz-server-side-encryption: aws:kms".to_string(),
        )),
        (None, None) => {
            // No encryption headers at all: use the bucket's setting, else SSE-S3.
            let configured = self.require_bucket(bucket)?.encryption.clone();
            Ok(match configured {
                Some(encryption) => encryption,
                None => ServerSideEncryption {
                    algorithm: "AES256".to_string(),
                    kms_key_id: None,
                },
            })
        }
    }
}
/// Validates a requested encryption setting and returns its canonical form.
///
/// * `AES256` must not carry a KMS key id.
/// * `aws:kms` defaults a missing key id to the runtime's active key and
///   rejects any other key id.
/// * `aws:kms:dsse` and every other algorithm are rejected as unsupported.
///
/// # Errors
///
/// Returns `RuntimeError::InvalidEncryption` for a key id on `AES256` or a key
/// id that differs from the active key, and
/// `RuntimeError::UnsupportedEncryption` for any algorithm other than `AES256`
/// and `aws:kms`.
pub(crate) fn normalize_encryption(
    &self,
    encryption: ServerSideEncryption,
) -> Result<ServerSideEncryption, RuntimeError> {
    let ServerSideEncryption {
        algorithm,
        kms_key_id: requested_key_id,
    } = encryption;

    match (algorithm.as_str(), requested_key_id) {
        ("AES256", Some(_)) => Err(RuntimeError::InvalidEncryption(
            "SSE-S3 AES256 cannot include a KMS key id".to_string(),
        )),
        ("AES256", None) => Ok(ServerSideEncryption {
            algorithm: "AES256".to_string(),
            kms_key_id: None,
        }),
        ("aws:kms", requested_key_id) => {
            let kms_key_id = match requested_key_id {
                Some(key_id) => key_id,
                None => self.kms.key_id().to_string(),
            };
            if kms_key_id == self.kms.key_id() {
                Ok(ServerSideEncryption {
                    algorithm: "aws:kms".to_string(),
                    kms_key_id: Some(kms_key_id),
                })
            } else {
                Err(RuntimeError::InvalidEncryption(format!(
                    "KMS key id {kms_key_id} does not match active runtime key {}",
                    self.kms.key_id()
                )))
            }
        }
        ("aws:kms:dsse", _) => Err(RuntimeError::UnsupportedEncryption(
            "DSSE-KMS is not supported by the current runtime boundary".to_string(),
        )),
        (value, _) => Err(RuntimeError::UnsupportedEncryption(format!(
            "unsupported server-side encryption algorithm: {value}"
        ))),
    }
}
/// Stores `request.body` as a new version of `request.key` in `request.bucket`.
///
/// Steps, in order: validate the key, authorize `principal` for
/// `S3Action::PutObject`, enforce the object-creation quota, encrypt the body
/// through `self.kms`, append a `StoredVersion` to the key's version list, then
/// record replication, the storage commit, audit entries and the
/// `s3:ObjectCreated:Put` notification.
///
/// `lock` is stored on the new version unchanged.
///
/// # Errors
///
/// * `RuntimeError::InvalidObjectKey` when `validate_object_key` rejects the key.
/// * Whatever `authorize`, `enforce_object_creation_quota`, `require_bucket`
///   or `next_object_local_ordinal` return.
/// * `RuntimeError::Kms` when encryption fails; the failure is audited first.
///
/// # Panics
///
/// The `expect("bucket checked above")` relies on `require_bucket` having
/// succeeded earlier in this call.
pub fn put_object(
&mut self,
principal: &str,
request: PutObjectRequest,
lock: ObjectLock,
) -> Result<PutObjectResult, RuntimeError> {
if !validate_object_key(&request.key) {
return Err(RuntimeError::InvalidObjectKey(request.key));
}
let resource = object_resource(&request.bucket, &request.key);
self.authorize(principal, S3Action::PutObject, &resource)?;
// True when the bucket or key is missing, or the key has no current version.
let creates_new_current_key = self
.buckets
.get(&request.bucket)
.and_then(|bucket| bucket.objects.get(&request.key))
.is_none_or(|object| !object.has_current_version());
self.enforce_object_creation_quota(&request.bucket, creates_new_current_key)?;
// `require_bucket` also serves as the bucket-existence check for the `expect` below.
let owner = bucket_owner(self.require_bucket(&request.bucket)?);
let local_ordinal = self.next_object_local_ordinal(&request.bucket, &request.key)?;
// NOTE(review): the version id is allocated before encryption; if allocation
// advances runtime state, a KMS failure below would consume an id — confirm.
let version_id = self.allocate_object_version_id(local_ordinal);
// The ETag is computed from the plaintext body, not the ciphertext.
let etag = object_etag(&request.body);
let ciphertext = match self.kms.encrypt(&request.body) {
Ok(ciphertext) => ciphertext,
Err(error) => {
// Audit the failed kms:Encrypt call before surfacing the error.
let key_id = self.kms.key_id().to_string();
self.audit_kms_failure(
principal,
"kms:Encrypt",
&resource,
&key_id,
&error.to_string(),
);
return Err(RuntimeError::Kms(error));
}
};
// Built before `request.metadata` is moved into the stored version below.
let envelope = self.envelope_metadata(
request.metadata.encryption.as_ref(),
&ciphertext,
&request.body,
);
let integrity = IntegrityRecord::for_body(&request.body);
// A new version starts with no tags, is not a delete marker, and has no replication status.
let version = StoredVersion {
version_id: version_id.clone(),
local_ordinal,
ciphertext,
etag: etag.clone(),
last_modified_epoch_seconds: self.clock_epoch_seconds,
metadata: request.metadata,
envelope,
integrity: integrity.clone(),
tags: BTreeMap::new(),
lock,
delete_marker: false,
owner,
replication_status: None,
};
let bucket = self
.buckets
.get_mut(&request.bucket)
.expect("bucket checked above");
// Append to the key's version list, creating the object entry on first write.
bucket
.objects
.entry(request.key.clone())
.or_default()
.versions
.push(version);
// Bookkeeping side effects, recorded only after the version is stored.
self.replication.record(
ReplicationAction::PutObject,
&request.bucket,
&request.key,
&version_id,
);
self.record_storage_commit(
"PutObject",
&request.bucket,
&request.key,
&version_id,
&integrity.checksum_sha256,
);
self.audit_allowed(
principal,
S3Action::PutObject,
&resource,
Some(version_id.clone()),
);
let kms_key_id = self.kms.key_id().to_string();
self.audit_kms_encrypt(principal, &resource, &version_id, &kms_key_id);
self.emit_notification_event(
"s3:ObjectCreated:Put",
&request.bucket,
&request.key,
&version_id,
);
Ok(PutObjectResult {
bucket: request.bucket,
key: request.key,
version_id,
etag,
})
}
/// Moves the current version of `request.source_key` to
/// `request.destination_key` inside a directory bucket.
///
/// The version is taken off the end of the source key's version list and
/// appended to the destination key's list. A source entry left with no
/// versions is removed. The rename is audited against the destination
/// resource.
///
/// # Errors
///
/// * `RuntimeError::InvalidBucketName` when the bucket is not a directory bucket.
/// * `RuntimeError::InvalidObjectKey` when either key ends with `/`, or when a
///   key fails `validate_object_key`; the error carries the key that failed.
/// * Whatever `authorize` or `require_bucket_mut` return.
/// * `RuntimeError::NoSuchKey` when the source key is absent, has no versions,
///   or its current version is a delete marker. Bucket state is left untouched
///   in all of these cases.
pub fn rename_object(
    &mut self,
    principal: &str,
    request: RenameObjectRequest,
) -> Result<RenameObjectResult, RuntimeError> {
    if !crate::bucket_basics::is_directory_bucket_name(&request.bucket) {
        return Err(RuntimeError::InvalidBucketName(
            "RenameObject is only supported for directory buckets".to_string(),
        ));
    }
    if request.source_key.ends_with('/') || request.destination_key.ends_with('/') {
        return Err(RuntimeError::InvalidObjectKey(
            "RenameObject does not support keys ending with '/'".to_string(),
        ));
    }
    // Validate the keys separately so the error names the key that is actually
    // invalid, rather than always reporting the destination.
    if !validate_object_key(&request.source_key) {
        return Err(RuntimeError::InvalidObjectKey(request.source_key));
    }
    if !validate_object_key(&request.destination_key) {
        return Err(RuntimeError::InvalidObjectKey(request.destination_key));
    }

    let source_resource = object_resource(&request.bucket, &request.source_key);
    let destination_resource = object_resource(&request.bucket, &request.destination_key);
    self.authorize(principal, S3Action::RenameObject, &source_resource)?;

    let (version_id, etag) = {
        let bucket_state = self.require_bucket_mut(&request.bucket)?;
        let source_object = bucket_state
            .objects
            .get_mut(&request.source_key)
            .ok_or_else(|| RuntimeError::NoSuchKey(source_resource.clone()))?;

        // Inspect the current version BEFORE removing it. Popping first would
        // discard a delete marker on the error path and make the previous
        // version current again.
        match source_object.versions.last() {
            Some(current) if !current.delete_marker => {}
            _ => return Err(RuntimeError::NoSuchKey(source_resource)),
        }
        let Some(current_version) = source_object.versions.pop() else {
            return Err(RuntimeError::NoSuchKey(source_resource));
        };
        if source_object.versions.is_empty() {
            bucket_state.objects.remove(&request.source_key);
        }

        // Keep only the identifiers needed for the audit entry and the result,
        // so the stored version (including its ciphertext) is moved, not cloned.
        let identifiers = (
            current_version.version_id.clone(),
            current_version.etag.clone(),
        );
        bucket_state
            .objects
            .entry(request.destination_key.clone())
            .or_default()
            .versions
            .push(current_version);
        identifiers
    };

    self.audit_allowed(
        principal,
        S3Action::RenameObject,
        &destination_resource,
        Some(version_id.clone()),
    );
    Ok(RenameObjectResult {
        bucket: request.bucket,
        source_key: request.source_key,
        destination_key: request.destination_key,
        version_id,
        etag,
    })
}
/// Validates and audits a `WriteGetObjectResponse` call.
///
/// The status code defaults to 200 when the request does not set one. A
/// response with status 400 or above must carry both an error code and an
/// error message and must have an empty body. A response below 400 must carry
/// neither error field.
///
/// # Errors
///
/// Propagates the authorization failure. Every validation failure is returned
/// as `RuntimeError::InvalidEncryption` with a descriptive message.
pub fn write_get_object_response(
    &mut self,
    principal: &str,
    request: WriteGetObjectResponseRequest,
) -> Result<(), RuntimeError> {
    self.authorize(principal, S3Action::WriteGetObjectResponse, "*")?;

    let route_is_blank = request.request_route.trim().is_empty();
    let token_is_blank = request.request_token.trim().is_empty();
    if route_is_blank || token_is_blank {
        return Err(RuntimeError::InvalidEncryption(
            "WriteGetObjectResponse requires non-empty request route and request token"
                .to_string(),
        ));
    }

    let status_code = request.status_code.unwrap_or(200);
    let has_error_code = request.error_code.is_some();
    let has_error_message = request.error_message.is_some();

    // Work out the first rule the request breaks, if any.
    let violation = if status_code >= 400 {
        if !(has_error_code && has_error_message) {
            Some("WriteGetObjectResponse error responses require both error code and error message")
        } else if !request.body.is_empty() {
            Some("WriteGetObjectResponse cannot include a transformed body for an error response")
        } else {
            None
        }
    } else if has_error_code || has_error_message {
        Some("WriteGetObjectResponse successful responses cannot include error headers")
    } else {
        None
    };
    if let Some(message) = violation {
        return Err(RuntimeError::InvalidEncryption(message.to_string()));
    }

    let audit_detail = format!("{}:{}", request.request_route, status_code);
    self.audit_allowed(
        principal,
        S3Action::WriteGetObjectResponse,
        "*",
        Some(audit_detail),
    );
    Ok(())
}
}