use super::*;
impl BucketWarden {
pub fn get_bucket_replication(
&mut self,
principal: &str,
bucket: &str,
) -> Result<BucketReplicationConfiguration, RuntimeError> {
self.authorize(principal, S3Action::GetBucketReplication, bucket)?;
let state = self.require_bucket(bucket)?.replication.clone();
if state.rules.is_empty() {
return Err(RuntimeError::NoSuchBucketReplication(bucket.to_string()));
}
self.audit_allowed(
principal,
S3Action::GetBucketReplication,
bucket,
Some(state.rules.len().to_string()),
);
Ok(BucketReplicationConfiguration {
bucket: bucket.to_string(),
role: state.role,
rules: state.rules,
})
}
pub fn put_bucket_replication(
&mut self,
principal: &str,
bucket: &str,
role: Option<String>,
rules: Vec<ReplicationRule>,
) -> Result<BucketReplicationConfiguration, RuntimeError> {
self.authorize(principal, S3Action::PutBucketReplication, bucket)?;
validate_replication_rules(&rules)?;
self.require_bucket_mut(bucket)?.replication = BucketReplicationState {
role: role.clone(),
rules: rules.clone(),
configured_epoch_seconds: self.clock_epoch_seconds,
};
self.audit_allowed(
principal,
S3Action::PutBucketReplication,
bucket,
Some(rules.len().to_string()),
);
Ok(BucketReplicationConfiguration {
bucket: bucket.to_string(),
role,
rules,
})
}
pub fn delete_bucket_replication(
&mut self,
principal: &str,
bucket: &str,
) -> Result<(), RuntimeError> {
self.authorize(principal, S3Action::DeleteBucketReplication, bucket)?;
self.require_bucket_mut(bucket)?.replication = BucketReplicationState::default();
self.audit_allowed(principal, S3Action::DeleteBucketReplication, bucket, None);
Ok(())
}
pub fn run_bucket_replication(
&mut self,
principal: &str,
source_bucket: &str,
) -> Result<ReplicationRunResult, RuntimeError> {
self.authorize(principal, S3Action::RunBucketReplication, source_bucket)?;
let config = self.require_bucket(source_bucket)?.replication.clone();
if config.rules.is_empty() {
return Err(RuntimeError::NoSuchBucketReplication(
source_bucket.to_string(),
));
}
let mut result = ReplicationRunResult {
source_bucket: source_bucket.to_string(),
..ReplicationRunResult::default()
};
for rule in config.rules.iter().filter(|rule| rule.status == "Enabled") {
let Some(destination_bucket) = replication_destination_bucket(rule) else {
result.skipped_missing_destinations += 1;
continue;
};
if !self.buckets.contains_key(destination_bucket) {
result.skipped_missing_destinations += 1;
continue;
}
let source_versions = self.replication_source_versions(
source_bucket,
rule,
config.configured_epoch_seconds,
)?;
self.replicate_versions_to_destination(
source_bucket,
destination_bucket,
rule,
&source_versions,
&mut result,
)?;
}
self.audit_allowed(
principal,
S3Action::RunBucketReplication,
source_bucket,
Some(format!(
"objects={},markers={},deleted={},existing={},encrypted_skipped={},missing_destinations={}",
result.replicated_object_versions,
result.replicated_delete_markers,
result.replicated_deleted_versions,
result.skipped_existing_versions,
result.skipped_encrypted_objects,
result.skipped_missing_destinations
)),
);
Ok(result)
}
pub(crate) fn replication_source_versions(
&self,
source_bucket: &str,
rule: &ReplicationRule,
configured_epoch_seconds: u64,
) -> Result<Vec<ReplicationSourceVersion>, RuntimeError> {
let bucket_state = self.require_bucket(source_bucket)?;
let mut versions = Vec::new();
for (key, object) in &bucket_state.objects {
if !replication_rule_matches(rule, key) {
continue;
}
for version in &object.versions {
if !tag_filter_matches(&rule.tag_filter, &version.tags) {
continue;
}
if version.delete_marker && !rule.delete_marker_replication {
continue;
}
if !rule.existing_object_replication
&& version.last_modified_epoch_seconds < configured_epoch_seconds
{
continue;
}
versions.push(ReplicationSourceVersion {
key: key.clone(),
version: version.clone(),
});
}
}
Ok(versions)
}
pub(crate) fn replicate_versions_to_destination(
&mut self,
source_bucket: &str,
destination_bucket: &str,
rule: &ReplicationRule,
source_versions: &[ReplicationSourceVersion],
result: &mut ReplicationRunResult,
) -> Result<(), RuntimeError> {
let mut completed_source_versions = Vec::<(String, String)>::new();
let mut replicated_events = Vec::<(String, String, String)>::new();
let mut deleted_replica_events = Vec::<(String, String)>::new();
let mut desired_versions = BTreeMap::<String, Vec<String>>::new();
for source in source_versions {
if source.version.metadata.encryption.is_some() && !rule.replicate_encrypted_objects {
continue;
}
desired_versions
.entry(source.key.clone())
.or_default()
.push(source.version.version_id.clone());
}
{
let destination_state = self.require_bucket_mut(destination_bucket)?;
let destination_owner = bucket_owner(destination_state);
for source in source_versions {
if source.version.metadata.encryption.is_some() && !rule.replicate_encrypted_objects
{
result.skipped_encrypted_objects += 1;
continue;
}
let object = destination_state
.objects
.entry(source.key.clone())
.or_default();
if object
.versions
.iter()
.any(|version| version.version_id == source.version.version_id)
{
result.skipped_existing_versions += 1;
completed_source_versions
.push((source.key.clone(), source.version.version_id.clone()));
continue;
}
let mut version = source.version.clone();
version.owner = destination_owner.clone();
version.replication_status = Some("COMPLETED".to_string());
if version.delete_marker {
result.replicated_delete_markers += 1;
replicated_events.push((
"s3:Replication:DeleteMarkerReplicated".to_string(),
source.key.clone(),
source.version.version_id.clone(),
));
} else {
result.replicated_object_versions += 1;
replicated_events.push((
"s3:Replication:ObjectReplicated".to_string(),
source.key.clone(),
source.version.version_id.clone(),
));
}
object.versions.push(version);
completed_source_versions
.push((source.key.clone(), source.version.version_id.clone()));
}
let mut empty_keys = Vec::new();
for (key, object) in &mut destination_state.objects {
if !replication_rule_matches(rule, key) {
continue;
}
let desired = desired_versions.get(key);
let before = object.versions.len();
let mut removed_versions = Vec::new();
object.versions.retain(|version| {
let keep = version.replication_status.as_deref() != Some("COMPLETED")
|| desired.is_some_and(|ids| ids.contains(&version.version_id));
if !keep {
removed_versions.push(version.version_id.clone());
}
keep
});
result.replicated_deleted_versions += before - object.versions.len();
for version_id in removed_versions {
deleted_replica_events.push((key.clone(), version_id));
}
if object.versions.is_empty() {
empty_keys.push(key.clone());
}
}
for key in empty_keys {
destination_state.objects.remove(&key);
}
}
for (key, version_id) in completed_source_versions {
self.version_by_id_mut(source_bucket, &key, &version_id)?
.replication_status = Some("COMPLETED".to_string());
}
for (event_name, key, version_id) in replicated_events {
self.emit_notification_event(&event_name, source_bucket, &key, &version_id);
}
for (key, version_id) in deleted_replica_events {
self.emit_notification_event(
"s3:Replication:ObjectDeleted",
source_bucket,
&key,
&version_id,
);
}
Ok(())
}
pub(crate) fn cors_preflight(
&mut self,
request: &S3HttpRequest,
bucket: &str,
key: &str,
) -> Result<S3HttpResponse, RuntimeError> {
let origin = header(&request.headers, "origin")
.ok_or_else(|| RuntimeError::CorsNotAllowed("missing Origin".to_string()))?;
let requested_method = header(&request.headers, "access-control-request-method")
.ok_or_else(|| RuntimeError::CorsNotAllowed("missing requested method".to_string()))?;
let requested_headers = header(&request.headers, "access-control-request-headers")
.map(parse_header_list)
.unwrap_or_default();
let bucket_state = self.require_bucket(bucket)?;
let Some(rule) = bucket_state.cors_rules.iter().find(|rule| {
cors_value_matches(&rule.allowed_origins, origin)
&& cors_value_matches(&rule.allowed_methods, requested_method)
&& requested_headers.iter().all(|requested| {
rule.allowed_headers.is_empty()
|| cors_value_matches(&rule.allowed_headers, requested)
})
}) else {
return Err(RuntimeError::CorsNotAllowed(origin.to_string()));
};
let response_origin = cors_response_origin(&rule.allowed_origins, origin)
.ok_or_else(|| RuntimeError::CorsNotAllowed(origin.to_string()))?;
let mut response = S3HttpResponse::new(200)
.with_header("access-control-allow-origin", response_origin)
.with_header("vary", "Origin")
.with_header(
"access-control-allow-methods",
rule.allowed_methods.join(", "),
);
if !rule.allowed_headers.is_empty() {
response = response.with_header(
"access-control-allow-headers",
rule.allowed_headers.join(", "),
);
}
if !rule.expose_headers.is_empty() {
response = response.with_header(
"access-control-expose-headers",
rule.expose_headers.join(", "),
);
}
if let Some(max_age) = rule.max_age_seconds {
response = response.with_header("access-control-max-age", max_age.to_string());
}
self.audit.append(
&request.principal,
&format!("{:?}", S3Action::GetBucketCors),
&object_resource(bucket, key),
AuditOutcome::Allowed,
Some("cors-preflight".to_string()),
);
Ok(response)
}
pub(crate) fn apply_actual_cors_response(
&self,
response: S3HttpResponse,
bucket: &str,
origin: &str,
method: &str,
) -> S3HttpResponse {
let Some(bucket_state) = self.buckets.get(bucket) else {
return response;
};
let Some(rule) = bucket_state.cors_rules.iter().find(|rule| {
cors_value_matches(&rule.allowed_origins, origin)
&& cors_value_matches(&rule.allowed_methods, method)
}) else {
return response;
};
let Some(response_origin) = cors_response_origin(&rule.allowed_origins, origin) else {
return response;
};
let mut response = response
.with_header("access-control-allow-origin", response_origin)
.with_header("vary", "Origin");
if !rule.expose_headers.is_empty() {
response = response.with_header(
"access-control-expose-headers",
rule.expose_headers.join(", "),
);
}
response
}
}
#[cfg(test)]
mod tests;