#![allow(unused_imports)]
mod common;
use bucketwarden_lock::{LockError, ObjectLock, RetentionMode};
use bucketwarden_s3::{
sigv4::{authorization_header, presigned_url_query, sha256_hex, AwsCredentials, SigV4Request},
*,
};
use bucketwarden_server::*;
use common::*;
use std::collections::BTreeMap;
/// End-to-end check of bucket replication driven mostly through the S3 HTTP handler.
///
/// The scenario configures replication from `archive-001` to `archive-002`, writes two
/// versions of one object plus a delete marker, triggers a replication run, and then
/// asserts that the first version's body, metadata, encryption, tags, legal hold and
/// retention are readable from the destination under the same version id. It ends by
/// checking a repeat run, replication of a version delete, and removal of the
/// replication configuration.
#[test]
fn http_bucket_replication_round_trips_and_replicates_preserved_state() {
let mut runtime = runtime();
// Create the two buckets: archive-001 is the replication source and archive-002 the
// destination named in the rule further down.
runtime
.handle_s3_http(S3HttpRequest::new("alice", "PUT", "/archive-001"))
.expect("source");
runtime
.handle_s3_http(S3HttpRequest::new("alice", "PUT", "/archive-002"))
.expect("destination");
// Enable object lock on both buckets with the same configuration document.
runtime
.handle_s3_http(
S3HttpRequest::new("alice", "PUT", "/archive-001")
.with_query("object-lock", "")
.with_body(
b"<ObjectLockConfiguration><ObjectLockEnabled>Enabled</ObjectLockEnabled></ObjectLockConfiguration>"
.to_vec(),
),
)
.expect("object lock");
runtime
.handle_s3_http(
S3HttpRequest::new("alice", "PUT", "/archive-002")
.with_query("object-lock", "")
.with_body(
b"<ObjectLockConfiguration><ObjectLockEnabled>Enabled</ObjectLockEnabled></ObjectLockConfiguration>"
.to_vec(),
),
)
.expect("destination object lock");
// Set a default aws:kms encryption rule on the source only; the replicated copy is
// later asserted to report the "aws:kms" algorithm.
runtime
.handle_s3_http(
S3HttpRequest::new("alice", "PUT", "/archive-001")
.with_query("encryption", "")
.with_body(
b"<ServerSideEncryptionConfiguration><Rule><ApplyServerSideEncryptionByDefault><SSEAlgorithm>aws:kms</SSEAlgorithm><KMSMasterKeyID>local-dev</KMSMasterKeyID></ApplyServerSideEncryptionByDefault></Rule></ServerSideEncryptionConfiguration>"
.to_vec(),
),
)
.expect("encryption");
// One enabled rule: keys under `records/` go to archive-002, with delete-marker
// replication and SSE-KMS source selection both enabled.
let replication_xml = br#"<ReplicationConfiguration>
<Role>arn:aws:iam::123456789012:role/bucketwarden-replication</Role>
<Rule>
<ID>records-replica</ID>
<Status>Enabled</Status>
<Filter><Prefix>records/</Prefix></Filter>
<Destination><Bucket>arn:aws:s3:::archive-002</Bucket></Destination>
<DeleteMarkerReplication><Status>Enabled</Status></DeleteMarkerReplication>
<SourceSelectionCriteria><SseKmsEncryptedObjects><Status>Enabled</Status></SseKmsEncryptedObjects></SourceSelectionCriteria>
</Rule>
</ReplicationConfiguration>"#;
let put_config = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "PUT", "/archive-001")
.with_query("replication", "")
.with_body(replication_xml.to_vec()),
)
.expect("put replication");
assert_eq!(put_config.status, 200);
// Round trip: the stored configuration must come back with the role, destination and
// delete-marker setting intact.
let get_config = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "GET", "/archive-001").with_query("replication", ""),
)
.expect("get replication");
let get_config_xml = String::from_utf8(get_config.body).expect("xml");
assert!(get_config_xml
.contains("<Role>arn:aws:iam::123456789012:role/bucketwarden-replication</Role>"));
assert!(get_config_xml.contains("<Bucket>arn:aws:s3:::archive-002</Bucket>"));
assert!(get_config_xml.contains("<DeleteMarkerReplication><Status>Enabled</Status>"));
// First version: carries content type, user metadata, a legal hold and GOVERNANCE
// retention. The retain-until date is one day after the Unix epoch, which the
// destination assertion below expects as 86_400 seconds.
let v1 = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "PUT", "/archive-001/records/a.json")
.with_header("content-type", "application/json")
.with_header("x-amz-meta-owner", "team-a")
.with_header("x-amz-object-lock-legal-hold", "ON")
.with_header("x-amz-object-lock-mode", "GOVERNANCE")
.with_header(
"x-amz-object-lock-retain-until-date",
"1970-01-02T00:00:00Z",
)
.with_body(br#"{"version":1}"#.to_vec()),
)
.expect("put v1");
let v1_id = v1.headers.get("x-amz-version-id").expect("v1").clone();
// Tag the first version explicitly by its version id.
runtime
.handle_s3_http(
S3HttpRequest::new("alice", "PUT", "/archive-001/records/a.json")
.with_query("tagging", "")
.with_query("versionId", v1_id.clone())
.with_body(b"<Tagging><TagSet><Tag><Key>class</Key><Value>regulated</Value></Tag></TagSet></Tagging>".to_vec()),
)
.expect("tag v1");
// Second version of the same key, written without any extra headers.
let v2 = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "PUT", "/archive-001/records/a.json")
.with_body(br#"{"version":2}"#.to_vec()),
)
.expect("put v2");
let v2_id = v2.headers.get("x-amz-version-id").expect("v2").clone();
// Object outside the rule's `records/` prefix; asserted absent from the destination
// after the run.
runtime
.handle_s3_http(
S3HttpRequest::new("alice", "PUT", "/archive-001/tmp/skip.json")
.with_body(b"skip".to_vec()),
)
.expect("skip prefix");
// DELETE without a versionId; the run below is expected to report one replicated
// delete marker.
let delete_marker = runtime
.handle_s3_http(S3HttpRequest::new(
"alice",
"DELETE",
"/archive-001/records/a.json",
))
.expect("delete marker");
assert_eq!(delete_marker.status, 204);
// POST ?replication on the source bucket triggers a replication run and returns an
// XML report of what was copied.
let run_response = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "POST", "/archive-001").with_query("replication", ""),
)
.expect("run replication");
assert_eq!(run_response.status, 200);
let run_xml = String::from_utf8(run_response.body).expect("run xml");
assert!(run_xml.contains("<SourceBucket>archive-001</SourceBucket>"));
assert!(run_xml.contains("<ReplicatedObjectVersions>2</ReplicatedObjectVersions>"));
assert!(run_xml.contains("<ReplicatedDeleteMarkers>1</ReplicatedDeleteMarkers>"));
assert!(run_xml.contains("<SkippedMissingDestinations>0</SkippedMissingDestinations>"));
// The destination is queried with the source's version id, so the test expects
// replication to keep version ids unchanged.
let replicated_v1 = runtime
.get_object_version("alice", "archive-002", "records/a.json", &v1_id)
.expect("replicated v1");
assert_eq!(replicated_v1.body, br#"{"version":1}"#);
assert_eq!(replicated_v1.metadata.content_type, "application/json");
assert_eq!(
replicated_v1.metadata.user_metadata.get("owner"),
Some(&"team-a".to_string())
);
assert_eq!(
replicated_v1
.metadata
.encryption
.as_ref()
.map(|encryption| encryption.algorithm.as_str()),
Some("aws:kms")
);
assert_eq!(
replicated_v1.replication_status.as_deref(),
Some("COMPLETED")
);
// The same status must also be exposed as a response header on HEAD.
let replicated_head = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "HEAD", "/archive-002/records/a.json")
.with_query("versionId", v1_id.clone()),
)
.expect("head replicated");
assert_eq!(
replicated_head.headers.get("x-amz-replication-status"),
Some(&"COMPLETED".to_string())
);
// Tags, legal hold and retention set on the source version must be present on the copy.
let replicated_tags = runtime
.get_object_tagging("alice", "archive-002", "records/a.json", Some(&v1_id))
.expect("replicated tags");
assert_eq!(
replicated_tags.tags.get("class"),
Some(&"regulated".to_string())
);
let replicated_hold = runtime
.get_object_legal_hold("alice", "archive-002", "records/a.json", Some(&v1_id))
.expect("replicated legal hold");
assert!(replicated_hold.enabled);
let replicated_retention = runtime
.get_object_retention("alice", "archive-002", "records/a.json", Some(&v1_id))
.expect("replicated retention");
assert_eq!(replicated_retention.mode.as_deref(), Some("GOVERNANCE"));
assert_eq!(
replicated_retention.retain_until_epoch_seconds,
Some(86_400)
);
// The key outside the `records/` prefix must not exist on the destination.
assert!(matches!(
runtime.get_object("alice", "archive-002", "tmp/skip.json"),
Err(RuntimeError::NoSuchKey(_))
));
// A second run must copy nothing new. The count of 3 skipped entries is presumably
// the two object versions plus the delete marker from the first run -- confirm
// against the replication implementation.
let idempotent = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "POST", "/archive-001").with_query("replication", ""),
)
.expect("idempotent");
let idempotent_xml = String::from_utf8(idempotent.body).expect("idempotent xml");
assert!(idempotent_xml.contains("<ReplicatedObjectVersions>0</ReplicatedObjectVersions>"));
assert!(idempotent_xml.contains("<ReplicatedDeleteMarkers>0</ReplicatedDeleteMarkers>"));
assert!(idempotent_xml.contains("<SkippedExistingVersions>3</SkippedExistingVersions>"));
// Delete v2 on the source by version id, then run replication through the direct
// runtime API rather than HTTP.
// NOTE(review): the meaning of the trailing `true` argument is not visible in this
// file -- presumably a governance-bypass flag; confirm against the runtime API.
runtime
.delete_object_version("alice", "archive-001", "records/a.json", &v2_id, true)
.expect("delete source version");
let version_delete_run = runtime
.run_bucket_replication("alice", "archive-001")
.expect("replicate version delete");
assert_eq!(version_delete_run.replicated_deleted_versions, 1);
assert!(matches!(
runtime.get_object_version("alice", "archive-002", "records/a.json", &v2_id),
Err(RuntimeError::NoSuchVersion { .. })
));
// Removing the configuration returns 204; a later GET must answer 404 with the
// ReplicationConfigurationNotFoundError code in the body.
let delete_config = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "DELETE", "/archive-001").with_query("replication", ""),
)
.expect("delete replication");
assert_eq!(delete_config.status, 204);
let missing_config = runtime
.handle_s3_http(
S3HttpRequest::new("alice", "GET", "/archive-001").with_query("replication", ""),
)
.expect("missing config");
assert_eq!(missing_config.status, 404);
assert!(String::from_utf8(missing_config.body)
.expect("xml")
.contains("<Code>ReplicationConfigurationNotFoundError</Code>"));
}
/// Covers the unhappy paths of bucket replication and the reporting around them.
///
/// Steps, in order: a configuration whose rule carries only a status is rejected with
/// `MalformedXML`; a rule pointing at a bucket that was never created produces a run
/// that counts one missing destination; a `Disabled` rule produces a run that copies
/// nothing. The replication summary, JSON-lines output and audit events are checked last.
#[test]
fn http_bucket_replication_reports_admin_failure_modes_and_observability() {
    const SOURCE_PATH: &str = "/archive-001";
    const DESTINATION_PATH: &str = "/archive-002";

    let mut warden = runtime();

    // Both buckets exist up front; only the "missing-destination" rule targets a
    // bucket that is never created.
    for (bucket_path, label) in [(SOURCE_PATH, "source"), (DESTINATION_PATH, "destination")] {
        warden
            .handle_s3_http(S3HttpRequest::new("alice", "PUT", bucket_path))
            .expect(label);
    }

    // 1. A rule without role or destination must be refused as malformed XML.
    let malformed_body =
        b"<ReplicationConfiguration><Rule><Status>Enabled</Status></Rule></ReplicationConfiguration>"
            .to_vec();
    let rejected = warden
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", SOURCE_PATH)
                .with_query("replication", "")
                .with_body(malformed_body),
        )
        .expect("invalid replication");
    assert_eq!(rejected.status, 400);
    let rejected_xml = String::from_utf8(rejected.body).expect("xml");
    assert!(rejected_xml.contains("<Code>MalformedXML</Code>"));

    // Seed one object under the prefix that both later rules select.
    let seed_request = S3HttpRequest::new("alice", "PUT", "/archive-001/records/a.txt")
        .with_body(b"replicate me".to_vec());
    warden.handle_s3_http(seed_request).expect("source object");

    // 2. A well-formed rule whose destination bucket does not exist is accepted, and
    //    the run reports the destination as missing instead of copying anything.
    let dangling_rule_xml = br#"<ReplicationConfiguration>
<Role>arn:aws:iam::123456789012:role/bucketwarden-replication</Role>
<Rule>
<ID>missing-destination</ID>
<Status>Enabled</Status>
<Filter><Prefix>records/</Prefix></Filter>
<Destination><Bucket>arn:aws:s3:::missing-destination</Bucket></Destination>
</Rule>
</ReplicationConfiguration>"#;
    let dangling_put = warden
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", SOURCE_PATH)
                .with_query("replication", "")
                .with_body(dangling_rule_xml.to_vec()),
        )
        .expect("put missing destination config");
    assert_eq!(dangling_put.status, 200);
    let dangling_run = warden
        .run_bucket_replication("alice", "archive-001")
        .expect("missing destination run");
    assert_eq!(dangling_run.replicated_object_versions, 0);
    assert_eq!(dangling_run.skipped_missing_destinations, 1);

    // 3. A disabled rule pointing at the existing destination replaces the previous
    //    configuration; its run must neither copy nor report a missing destination.
    let disabled_rule_xml = br#"<ReplicationConfiguration>
<Role>arn:aws:iam::123456789012:role/bucketwarden-replication</Role>
<Rule>
<ID>disabled-rule</ID>
<Status>Disabled</Status>
<Filter><Prefix>records/</Prefix></Filter>
<Destination><Bucket>arn:aws:s3:::archive-002</Bucket></Destination>
</Rule>
</ReplicationConfiguration>"#;
    let disabled_put = warden
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", SOURCE_PATH)
                .with_query("replication", "")
                .with_body(disabled_rule_xml.to_vec()),
        )
        .expect("put disabled config");
    assert_eq!(disabled_put.status, 200);
    let idle_run = warden
        .run_bucket_replication("alice", "archive-001")
        .expect("disabled run");
    assert_eq!(idle_run.replicated_object_versions, 0);
    assert_eq!(idle_run.skipped_missing_destinations, 0);
    assert!(matches!(
        warden.get_object("alice", "archive-002", "records/a.txt"),
        Err(RuntimeError::NoSuchKey(_))
    ));

    // Observability: summary counters, JSON-lines output and the audit trail.
    let totals = warden.replication_summary();
    assert_eq!(totals.put_objects, 1);
    assert_eq!(totals.delete_objects, 0);

    let log_lines = warden.replication_json_lines();
    assert!(log_lines.contains("\"bucket\":\"archive-001\""));

    // At least one replication audit event must describe either the missing
    // destination or an all-zero run.
    let replication_audited = warden.audit_events().iter().any(|event| {
        if event.action != "s3:ReplicateObject" {
            return false;
        }
        match event.detail.as_deref() {
            Some(detail) => {
                detail.contains("missing_destinations=1")
                    || detail.contains("objects=0,markers=0,deleted=0,existing=0")
            }
            None => false,
        }
    });
    assert!(replication_audited);
}
/// Checks that a JSON snapshot carries an unfinished multipart upload across a restore.
///
/// A part is uploaded before the snapshot is taken; the upload is then completed on the
/// restored runtime using the pre-snapshot upload id and part etag. A further upload
/// started after the restore must receive the id `u2`, following the original `u1`.
#[test]
fn snapshot_restores_in_flight_multipart_uploads_and_upload_counter() {
    const BUCKET: &str = "archive-001";
    const FIRST_KEY: &str = "large.bin";

    // Builds the initiation request for `key` in the test bucket with default metadata.
    let initiate = |key: &str| CreateMultipartUploadRequest {
        bucket: BUCKET.to_string(),
        key: key.to_string(),
        metadata: ObjectMetadata::default(),
    };

    // Start an upload and store a single part on the original runtime.
    let mut original = runtime();
    original.create_bucket("alice", BUCKET).expect("bucket");
    let first_upload = original
        .create_multipart_upload("alice", initiate(FIRST_KEY), ObjectLock::none())
        .expect("init");
    let part_request = UploadPartRequest {
        bucket: BUCKET.to_string(),
        key: FIRST_KEY.to_string(),
        upload_id: first_upload.upload_id.clone(),
        part_number: 1,
        body: b"persisted part".to_vec(),
    };
    let stored_part = original.upload_part("alice", part_request).expect("part");

    // Snapshot while the upload is still open, then rebuild a runtime from it.
    let snapshot_json = original.snapshot_json().expect("snapshot");
    let mut restored =
        BucketWarden::restore_json(RuntimeConfig::development(), &snapshot_json).expect("restore");

    // Finish the upload on the restored runtime using only pre-snapshot identifiers.
    let completion_request = CompleteMultipartUploadRequest {
        bucket: BUCKET.to_string(),
        key: FIRST_KEY.to_string(),
        upload_id: first_upload.upload_id.clone(),
        parts: vec![CompletedPart {
            part_number: 1,
            etag: stored_part.etag,
        }],
    };
    let completed = restored
        .complete_multipart_upload("alice", completion_request)
        .expect("complete restored");

    // A fresh upload after the restore shows whether the id counter was restored too.
    let second_upload = restored
        .create_multipart_upload("alice", initiate("next.bin"), ObjectLock::none())
        .expect("next upload");

    assert_eq!(completed.version_id, "v1");
    let assembled = restored
        .get_object("alice", BUCKET, FIRST_KEY)
        .expect("get");
    assert_eq!(assembled.body, b"persisted part");
    assert_eq!(first_upload.upload_id, "u1");
    assert_eq!(second_upload.upload_id, "u2");
}