//! bucketwarden-server 0.1.0
//!
//! BucketWarden storage server runtime.
//! Documentation
#![allow(unused_imports)]
mod common;
use bucketwarden_lock::{LockError, ObjectLock, RetentionMode};
use bucketwarden_s3::{
    sigv4::{authorization_header, presigned_url_query, sha256_hex, AwsCredentials, SigV4Request},
    *,
};
use bucketwarden_server::*;
use common::*;
use std::collections::BTreeMap;
// End-to-end replication scenario driven mostly through `handle_s3_http`:
// the replication configuration is stored and read back, object versions and
// a delete marker are replicated from `archive-001` to `archive-002`, the
// replicated copy is checked for preserved metadata/tags/lock state, a second
// run is checked to replicate nothing new, a version delete is propagated,
// and finally the configuration is removed.
#[test]
fn http_bucket_replication_round_trips_and_replicates_preserved_state() {
    let mut runtime = runtime();
    // Create the source (`archive-001`) and destination (`archive-002`) buckets.
    runtime
        .handle_s3_http(S3HttpRequest::new("alice", "PUT", "/archive-001"))
        .expect("source");
    runtime
        .handle_s3_http(S3HttpRequest::new("alice", "PUT", "/archive-002"))
        .expect("destination");
    // Enable object lock on both buckets via the `?object-lock` subresource.
    runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001")
                .with_query("object-lock", "")
                .with_body(
                    b"<ObjectLockConfiguration><ObjectLockEnabled>Enabled</ObjectLockEnabled></ObjectLockConfiguration>"
                        .to_vec(),
                ),
        )
        .expect("object lock");
    runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-002")
                .with_query("object-lock", "")
                .with_body(
                    b"<ObjectLockConfiguration><ObjectLockEnabled>Enabled</ObjectLockEnabled></ObjectLockConfiguration>"
                        .to_vec(),
                ),
        )
        .expect("destination object lock");
    // Give the source bucket a default `aws:kms` encryption rule; the replica's
    // encryption algorithm is asserted further down.
    runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001")
                .with_query("encryption", "")
                .with_body(
                    b"<ServerSideEncryptionConfiguration><Rule><ApplyServerSideEncryptionByDefault><SSEAlgorithm>aws:kms</SSEAlgorithm><KMSMasterKeyID>local-dev</KMSMasterKeyID></ApplyServerSideEncryptionByDefault></Rule></ServerSideEncryptionConfiguration>"
                        .to_vec(),
                ),
        )
        .expect("encryption");
    // One enabled rule: keys under `records/` go to `archive-002`, with
    // delete-marker replication and SSE-KMS source selection both enabled.
    let replication_xml = br#"<ReplicationConfiguration>
        <Role>arn:aws:iam::123456789012:role/bucketwarden-replication</Role>
        <Rule>
            <ID>records-replica</ID>
            <Status>Enabled</Status>
            <Filter><Prefix>records/</Prefix></Filter>
            <Destination><Bucket>arn:aws:s3:::archive-002</Bucket></Destination>
            <DeleteMarkerReplication><Status>Enabled</Status></DeleteMarkerReplication>
            <SourceSelectionCriteria><SseKmsEncryptedObjects><Status>Enabled</Status></SseKmsEncryptedObjects></SourceSelectionCriteria>
        </Rule>
    </ReplicationConfiguration>"#;
    let put_config = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001")
                .with_query("replication", "")
                .with_body(replication_xml.to_vec()),
        )
        .expect("put replication");
    assert_eq!(put_config.status, 200);
    // Round trip: the stored configuration must come back with the role,
    // destination bucket and delete-marker setting intact.
    let get_config = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "GET", "/archive-001").with_query("replication", ""),
        )
        .expect("get replication");
    let get_config_xml = String::from_utf8(get_config.body).expect("xml");
    assert!(get_config_xml
        .contains("<Role>arn:aws:iam::123456789012:role/bucketwarden-replication</Role>"));
    assert!(get_config_xml.contains("<Bucket>arn:aws:s3:::archive-002</Bucket>"));
    assert!(get_config_xml.contains("<DeleteMarkerReplication><Status>Enabled</Status>"));
    // First version of the object, carrying a content type, user metadata, a
    // legal hold and GOVERNANCE retention. All of these are checked on the
    // replica later.
    let v1 = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001/records/a.json")
                .with_header("content-type", "application/json")
                .with_header("x-amz-meta-owner", "team-a")
                .with_header("x-amz-object-lock-legal-hold", "ON")
                .with_header("x-amz-object-lock-mode", "GOVERNANCE")
                .with_header(
                    "x-amz-object-lock-retain-until-date",
                    "1970-01-02T00:00:00Z",
                )
                .with_body(br#"{"version":1}"#.to_vec()),
        )
        .expect("put v1");
    // The version id is read from the response header and reused to address
    // both the source version and its replica.
    let v1_id = v1.headers.get("x-amz-version-id").expect("v1").clone();
    // Tag that specific version.
    runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001/records/a.json")
                .with_query("tagging", "")
                .with_query("versionId", v1_id.clone())
                .with_body(b"<Tagging><TagSet><Tag><Key>class</Key><Value>regulated</Value></Tag></TagSet></Tagging>".to_vec()),
        )
        .expect("tag v1");
    // Second version of the same key, without any extra headers.
    let v2 = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001/records/a.json")
                .with_body(br#"{"version":2}"#.to_vec()),
        )
        .expect("put v2");
    let v2_id = v2.headers.get("x-amz-version-id").expect("v2").clone();
    // This key is outside the rule's `records/` prefix; the test later asserts
    // it is absent from the destination.
    runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001/tmp/skip.json")
                .with_body(b"skip".to_vec()),
        )
        .expect("skip prefix");
    // DELETE without a versionId; the run below reports one replicated delete
    // marker, which is what this request is expected to have produced.
    let delete_marker = runtime
        .handle_s3_http(S3HttpRequest::new(
            "alice",
            "DELETE",
            "/archive-001/records/a.json",
        ))
        .expect("delete marker");
    assert_eq!(delete_marker.status, 204);
    // POST on `?replication` is the request this test uses to run replication;
    // the response body reports per-run counters.
    let run_response = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "POST", "/archive-001").with_query("replication", ""),
        )
        .expect("run replication");
    assert_eq!(run_response.status, 200);
    let run_xml = String::from_utf8(run_response.body).expect("run xml");
    assert!(run_xml.contains("<SourceBucket>archive-001</SourceBucket>"));
    assert!(run_xml.contains("<ReplicatedObjectVersions>2</ReplicatedObjectVersions>"));
    assert!(run_xml.contains("<ReplicatedDeleteMarkers>1</ReplicatedDeleteMarkers>"));
    assert!(run_xml.contains("<SkippedMissingDestinations>0</SkippedMissingDestinations>"));
    // The replica is addressed in the destination bucket by the source's
    // version id and must carry the same body and metadata.
    let replicated_v1 = runtime
        .get_object_version("alice", "archive-002", "records/a.json", &v1_id)
        .expect("replicated v1");
    assert_eq!(replicated_v1.body, br#"{"version":1}"#);
    assert_eq!(replicated_v1.metadata.content_type, "application/json");
    assert_eq!(
        replicated_v1.metadata.user_metadata.get("owner"),
        Some(&"team-a".to_string())
    );
    assert_eq!(
        replicated_v1
            .metadata
            .encryption
            .as_ref()
            .map(|encryption| encryption.algorithm.as_str()),
        Some("aws:kms")
    );
    assert_eq!(
        replicated_v1.replication_status.as_deref(),
        Some("COMPLETED")
    );
    // The same status must also be visible over HTTP as a response header.
    let replicated_head = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "HEAD", "/archive-002/records/a.json")
                .with_query("versionId", v1_id.clone()),
        )
        .expect("head replicated");
    assert_eq!(
        replicated_head.headers.get("x-amz-replication-status"),
        Some(&"COMPLETED".to_string())
    );
    // Tags, legal hold and retention set on source v1 must be present on the
    // replica.
    let replicated_tags = runtime
        .get_object_tagging("alice", "archive-002", "records/a.json", Some(&v1_id))
        .expect("replicated tags");
    assert_eq!(
        replicated_tags.tags.get("class"),
        Some(&"regulated".to_string())
    );
    let replicated_hold = runtime
        .get_object_legal_hold("alice", "archive-002", "records/a.json", Some(&v1_id))
        .expect("replicated legal hold");
    assert!(replicated_hold.enabled);
    let replicated_retention = runtime
        .get_object_retention("alice", "archive-002", "records/a.json", Some(&v1_id))
        .expect("replicated retention");
    assert_eq!(replicated_retention.mode.as_deref(), Some("GOVERNANCE"));
    // 1970-01-02T00:00:00Z is exactly one day (86 400 s) after the Unix epoch.
    assert_eq!(
        replicated_retention.retain_until_epoch_seconds,
        Some(86_400)
    );
    // The key outside the `records/` prefix must not exist in the destination.
    assert!(matches!(
        runtime.get_object("alice", "archive-002", "tmp/skip.json"),
        Err(RuntimeError::NoSuchKey(_))
    ));
    // Running again with no source changes must replicate nothing and report
    // three skipped entries (presumably the two versions plus the delete
    // marker copied by the first run).
    let idempotent = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "POST", "/archive-001").with_query("replication", ""),
        )
        .expect("idempotent");
    let idempotent_xml = String::from_utf8(idempotent.body).expect("idempotent xml");
    assert!(idempotent_xml.contains("<ReplicatedObjectVersions>0</ReplicatedObjectVersions>"));
    assert!(idempotent_xml.contains("<ReplicatedDeleteMarkers>0</ReplicatedDeleteMarkers>"));
    assert!(idempotent_xml.contains("<SkippedExistingVersions>3</SkippedExistingVersions>"));
    // Delete source v2 through the runtime API, then run replication through
    // the runtime API as well; the delete must reach the destination.
    // NOTE(review): the meaning of the trailing `true` argument is not visible
    // in this file; confirm against `delete_object_version`.
    runtime
        .delete_object_version("alice", "archive-001", "records/a.json", &v2_id, true)
        .expect("delete source version");
    let version_delete_run = runtime
        .run_bucket_replication("alice", "archive-001")
        .expect("replicate version delete");
    assert_eq!(version_delete_run.replicated_deleted_versions, 1);
    assert!(matches!(
        runtime.get_object_version("alice", "archive-002", "records/a.json", &v2_id),
        Err(RuntimeError::NoSuchVersion { .. })
    ));
    // Removing the configuration returns 204, after which reading it back
    // yields a 404 with the dedicated error code.
    let delete_config = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "DELETE", "/archive-001").with_query("replication", ""),
        )
        .expect("delete replication");
    assert_eq!(delete_config.status, 204);
    let missing_config = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "GET", "/archive-001").with_query("replication", ""),
        )
        .expect("missing config");
    assert_eq!(missing_config.status, 404);
    assert!(String::from_utf8(missing_config.body)
        .expect("xml")
        .contains("<Code>ReplicationConfigurationNotFoundError</Code>"));
}
// Failure-mode and observability scenario for bucket replication: a malformed
// configuration is rejected, a rule pointing at a bucket that was never
// created is counted as a skipped destination, a disabled rule replicates
// nothing, and the summary, JSON-lines and audit-event views are checked
// afterwards.
#[test]
fn http_bucket_replication_reports_admin_failure_modes_and_observability() {
    let mut runtime = runtime();
    // Create the source (`archive-001`) and destination (`archive-002`) buckets.
    runtime
        .handle_s3_http(S3HttpRequest::new("alice", "PUT", "/archive-001"))
        .expect("source");
    runtime
        .handle_s3_http(S3HttpRequest::new("alice", "PUT", "/archive-002"))
        .expect("destination");
    // A configuration whose only rule carries just a Status (no Role, ID or
    // Destination) must be answered with 400 / MalformedXML. The request
    // itself still returns `Ok`, so the failure is in the HTTP response.
    let invalid_config = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001")
                .with_query("replication", "")
                .with_body(
                    b"<ReplicationConfiguration><Rule><Status>Enabled</Status></Rule></ReplicationConfiguration>"
                        .to_vec(),
                ),
        )
        .expect("invalid replication");
    assert_eq!(invalid_config.status, 400);
    assert!(String::from_utf8(invalid_config.body)
        .expect("xml")
        .contains("<Code>MalformedXML</Code>"));
    // The single source object used by the rest of the test; its key matches
    // the `records/` prefix of both rules configured below.
    runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001/records/a.txt")
                .with_body(b"replicate me".to_vec()),
        )
        .expect("source object");
    // Enabled rule whose destination bucket (`missing-destination`) is never
    // created in this test.
    let missing_destination_config = br#"<ReplicationConfiguration>
        <Role>arn:aws:iam::123456789012:role/bucketwarden-replication</Role>
        <Rule>
            <ID>missing-destination</ID>
            <Status>Enabled</Status>
            <Filter><Prefix>records/</Prefix></Filter>
            <Destination><Bucket>arn:aws:s3:::missing-destination</Bucket></Destination>
        </Rule>
    </ReplicationConfiguration>"#;
    // Storing the configuration succeeds; the missing bucket only shows up
    // when replication is run.
    let put_missing_destination = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001")
                .with_query("replication", "")
                .with_body(missing_destination_config.to_vec()),
        )
        .expect("put missing destination config");
    assert_eq!(put_missing_destination.status, 200);
    // The run itself returns `Ok`; nothing is replicated and one missing
    // destination is counted.
    let missing_destination_run = runtime
        .run_bucket_replication("alice", "archive-001")
        .expect("missing destination run");
    assert_eq!(missing_destination_run.replicated_object_versions, 0);
    assert_eq!(missing_destination_run.skipped_missing_destinations, 1);
    // Replace the configuration with a rule that targets the existing
    // destination bucket but has Status `Disabled`.
    let disabled_config = br#"<ReplicationConfiguration>
        <Role>arn:aws:iam::123456789012:role/bucketwarden-replication</Role>
        <Rule>
            <ID>disabled-rule</ID>
            <Status>Disabled</Status>
            <Filter><Prefix>records/</Prefix></Filter>
            <Destination><Bucket>arn:aws:s3:::archive-002</Bucket></Destination>
        </Rule>
    </ReplicationConfiguration>"#;
    let put_disabled = runtime
        .handle_s3_http(
            S3HttpRequest::new("alice", "PUT", "/archive-001")
                .with_query("replication", "")
                .with_body(disabled_config.to_vec()),
        )
        .expect("put disabled config");
    assert_eq!(put_disabled.status, 200);
    // A disabled rule replicates nothing and is not reported as a missing
    // destination; the object must be absent from `archive-002`.
    let disabled_run = runtime
        .run_bucket_replication("alice", "archive-001")
        .expect("disabled run");
    assert_eq!(disabled_run.replicated_object_versions, 0);
    assert_eq!(disabled_run.skipped_missing_destinations, 0);
    assert!(matches!(
        runtime.get_object("alice", "archive-002", "records/a.txt"),
        Err(RuntimeError::NoSuchKey(_))
    ));
    // Observability views. NOTE(review): what `put_objects` and
    // `delete_objects` count is not visible in this file; the expected values
    // coincide with the single object PUT and zero deletes issued above.
    let summary = runtime.replication_summary();
    assert_eq!(summary.put_objects, 1);
    assert_eq!(summary.delete_objects, 0);
    let json_lines = runtime.replication_json_lines();
    assert!(json_lines.contains("\"bucket\":\"archive-001\""));
    // At least one `s3:ReplicateObject` audit event must have a detail string
    // that mentions either the missing destination or an all-zero counter set;
    // either alternative alone satisfies the assertion.
    assert!(runtime
        .audit_events()
        .iter()
        .any(|event| event.action == "s3:ReplicateObject"
            && event.detail.as_deref().is_some_and(|detail| {
                detail.contains("missing_destinations=1")
                    || detail.contains("objects=0,markers=0,deleted=0,existing=0")
            })));
}
// Snapshot/restore round trip for multipart state: an upload is started and
// one part is stored, the runtime is serialized with `snapshot_json` and
// rebuilt with `BucketWarden::restore_json`, and the restored instance must be
// able to complete that upload and hand out the next upload id in sequence.
#[test]
fn snapshot_restores_in_flight_multipart_uploads_and_upload_counter() {
    let owner = "alice";
    let bucket = "archive-001";
    let payload = b"persisted part";

    // Open an upload and store a single part before the snapshot is taken.
    let mut source = runtime();
    source.create_bucket(owner, bucket).expect("bucket");
    let init_request = CreateMultipartUploadRequest {
        bucket: bucket.to_string(),
        key: "large.bin".to_string(),
        metadata: ObjectMetadata::default(),
    };
    let first_upload = source
        .create_multipart_upload(owner, init_request, ObjectLock::none())
        .expect("init");
    let part_request = UploadPartRequest {
        bucket: bucket.to_string(),
        key: "large.bin".to_string(),
        upload_id: first_upload.upload_id.clone(),
        part_number: 1,
        body: payload.to_vec(),
    };
    let stored_part = source.upload_part(owner, part_request).expect("part");

    // Serialize the runtime and rebuild a fresh instance from the JSON.
    let snapshot = source.snapshot_json().expect("snapshot");
    let mut revived =
        BucketWarden::restore_json(RuntimeConfig::development(), &snapshot).expect("restore");

    // The upload started before the snapshot is completed on the restored
    // instance, using the etag returned by the original one.
    let complete_request = CompleteMultipartUploadRequest {
        bucket: bucket.to_string(),
        key: "large.bin".to_string(),
        upload_id: first_upload.upload_id.clone(),
        parts: vec![CompletedPart {
            part_number: 1,
            etag: stored_part.etag,
        }],
    };
    let finished = revived
        .complete_multipart_upload(owner, complete_request)
        .expect("complete restored");

    // A second upload on the restored instance exercises the upload counter.
    let follow_up_request = CreateMultipartUploadRequest {
        bucket: bucket.to_string(),
        key: "next.bin".to_string(),
        metadata: ObjectMetadata::default(),
    };
    let second_upload = revived
        .create_multipart_upload(owner, follow_up_request, ObjectLock::none())
        .expect("next upload");

    assert_eq!(finished.version_id, "v1");
    let fetched = revived
        .get_object(owner, bucket, "large.bin")
        .expect("get");
    assert_eq!(fetched.body, payload);
    // Ids continue from the snapshot: the original upload was "u1", so the
    // next one issued after restore must be "u2".
    assert_eq!(first_upload.upload_id, "u1");
    assert_eq!(second_upload.upload_id, "u2");
}