bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

pub(crate) fn parse_bucket_versioning_status(
    body: &[u8],
) -> Result<BucketVersioningStatus, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    match text_between(&xml, "<Status>", "</Status>").map(str::trim) {
        Some("Enabled") => Ok(BucketVersioningStatus::Enabled),
        Some("Suspended") | None => Ok(BucketVersioningStatus::Suspended),
        Some(value) => Err(RuntimeError::InvalidBucketVersioningStatus(
            value.to_string(),
        )),
    }
}

pub(crate) fn parse_create_bucket_configuration(
    body: &[u8],
) -> Result<CreateBucketConfiguration, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    let body = xml.trim();
    if body.is_empty() {
        return Ok(CreateBucketConfiguration {
            location_constraint: Some(DEFAULT_BUCKET_REGION.to_string()),
        });
    }
    let location_constraint = text_between(body, "<LocationConstraint>", "</LocationConstraint>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .unwrap_or(DEFAULT_BUCKET_REGION)
        .to_string();
    validate_bucket_region(&location_constraint)?;
    Ok(CreateBucketConfiguration {
        location_constraint: Some(location_constraint),
    })
}

pub(crate) fn parse_create_bucket_region(body: &[u8]) -> Result<String, RuntimeError> {
    Ok(parse_create_bucket_configuration(body)?
        .location_constraint
        .unwrap_or_else(|| DEFAULT_BUCKET_REGION.to_string()))
}

pub(crate) fn parse_bucket_ownership_controls(body: &[u8]) -> Result<String, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    let value = text_between(&xml, "<ObjectOwnership>", "</ObjectOwnership>")
        .map(str::trim)
        .ok_or_else(|| {
            RuntimeError::InvalidOwnershipControls(
                "OwnershipControls must include ObjectOwnership".to_string(),
            )
        })?;
    validate_object_ownership(value)?;
    Ok(value.to_string())
}

pub(crate) fn parse_bucket_cors_rules(body: &[u8]) -> Result<Vec<CorsRule>, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    let mut rules = Vec::new();
    let mut remainder = xml.as_ref();
    while let Some(start) = remainder.find("<CORSRule>") {
        remainder = &remainder[start + "<CORSRule>".len()..];
        let Some(end) = remainder.find("</CORSRule>") else {
            return Err(RuntimeError::InvalidCorsRule(
                "unclosed CORSRule".to_string(),
            ));
        };
        let rule_xml = &remainder[..end];
        remainder = &remainder[end + "</CORSRule>".len()..];
        let rule = CorsRule {
            allowed_origins: text_values(rule_xml, "AllowedOrigin"),
            allowed_methods: text_values(rule_xml, "AllowedMethod")
                .into_iter()
                .map(|method| method.to_ascii_uppercase())
                .collect(),
            allowed_headers: text_values(rule_xml, "AllowedHeader")
                .into_iter()
                .map(|header| header.to_ascii_lowercase())
                .collect(),
            expose_headers: text_values(rule_xml, "ExposeHeader"),
            max_age_seconds: text_between(rule_xml, "<MaxAgeSeconds>", "</MaxAgeSeconds>")
                .map(str::trim)
                .map(|value| {
                    value.parse::<u64>().map_err(|_| {
                        RuntimeError::InvalidCorsRule(format!("invalid MaxAgeSeconds: {value}"))
                    })
                })
                .transpose()?,
        };
        if rule.allowed_origins.is_empty() || rule.allowed_methods.is_empty() {
            return Err(RuntimeError::InvalidCorsRule(
                "AllowedOrigin and AllowedMethod are required".to_string(),
            ));
        }
        rules.push(rule);
    }
    if rules.is_empty() && xml.contains("<CORSConfiguration") {
        return Ok(Vec::new());
    }
    if rules.is_empty() {
        return Err(RuntimeError::InvalidCorsRule(
            "missing CORSRule".to_string(),
        ));
    }
    Ok(rules)
}

pub(crate) fn parse_bucket_website_configuration(
    bucket: &str,
    body: &[u8],
) -> Result<BucketWebsiteConfiguration, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    let index_document_suffix = text_between(&xml, "<Suffix>", "</Suffix>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .ok_or_else(|| {
            RuntimeError::InvalidWebsiteConfiguration(
                "IndexDocument Suffix is required".to_string(),
            )
        })?
        .to_string();
    let error_document_key = parse_error_document(&xml).map(|document| document.key);
    let routing_rules = parse_website_routing_rules(&xml)?;
    let config = BucketWebsiteConfiguration {
        bucket: bucket.to_string(),
        index_document_suffix,
        error_document_key,
        routing_rules,
    };
    validate_website_configuration(&config)?;
    Ok(config)
}

pub(crate) fn parse_error_document(xml: &str) -> Option<ErrorDocument> {
    text_between(xml, "<ErrorDocument>", "</ErrorDocument>")
        .and_then(|section| text_between(section, "<Key>", "</Key>"))
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(|key| ErrorDocument {
            key: key.to_string(),
        })
}

pub(crate) fn parse_bucket_logging_status(xml: &str) -> Result<BucketLoggingStatus, RuntimeError> {
    if !xml.contains("<BucketLoggingStatus") {
        return Err(RuntimeError::InvalidBucketLogging(
            "missing BucketLoggingStatus root".to_string(),
        ));
    }
    if !xml.contains("<LoggingEnabled>") {
        return Ok(BucketLoggingStatus {
            logging_enabled: None,
        });
    }
    let target_bucket = text_between(xml, "<TargetBucket>", "</TargetBucket>")
        .map(str::trim)
        .filter(|value| !value.is_empty());
    let Some(target_bucket) = target_bucket else {
        return Err(RuntimeError::InvalidBucketLogging(
            "LoggingEnabled requires TargetBucket".to_string(),
        ));
    };
    let target_prefix = text_between(xml, "<TargetPrefix>", "</TargetPrefix>")
        .map(str::trim)
        .unwrap_or_default()
        .to_string();
    Ok(BucketLoggingStatus {
        logging_enabled: Some(BucketLoggingEnabled {
            target_bucket: target_bucket.to_string(),
            target_prefix,
        }),
    })
}

pub(crate) fn validate_bucket_logging_xml(xml: &str) -> Result<(), RuntimeError> {
    parse_bucket_logging_status(xml).map(|_| ())
}

pub(crate) fn parse_website_routing_rules(
    xml: &str,
) -> Result<Vec<WebsiteRoutingRule>, RuntimeError> {
    let mut rules = Vec::new();
    let mut remainder = xml;
    while let Some(start) = remainder.find("<RoutingRule>") {
        remainder = &remainder[start + "<RoutingRule>".len()..];
        let Some(end) = remainder.find("</RoutingRule>") else {
            return Err(RuntimeError::InvalidWebsiteConfiguration(
                "unclosed RoutingRule".to_string(),
            ));
        };
        let rule_xml = &remainder[..end];
        remainder = &remainder[end + "</RoutingRule>".len()..];
        rules.push(WebsiteRoutingRule {
            condition_key_prefix_equals: text_between(
                rule_xml,
                "<KeyPrefixEquals>",
                "</KeyPrefixEquals>",
            )
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_string),
            redirect_host_name: text_between(rule_xml, "<HostName>", "</HostName>")
                .map(str::trim)
                .filter(|value| !value.is_empty())
                .map(str::to_string),
            redirect_replace_key_prefix_with: text_between(
                rule_xml,
                "<ReplaceKeyPrefixWith>",
                "</ReplaceKeyPrefixWith>",
            )
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_string),
            redirect_http_redirect_code: text_between(
                rule_xml,
                "<HttpRedirectCode>",
                "</HttpRedirectCode>",
            )
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_string),
        });
    }
    Ok(rules)
}

pub(crate) fn parse_bucket_request_payment_payer(body: &[u8]) -> Result<String, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    let payer = text_between(&xml, "<Payer>", "</Payer>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .ok_or_else(|| {
            RuntimeError::InvalidRequestPaymentConfiguration(
                "RequestPaymentConfiguration must include Payer".to_string(),
            )
        })?;
    match payer {
        "BucketOwner" | "Requester" => Ok(payer.to_string()),
        other => Err(RuntimeError::InvalidRequestPaymentConfiguration(
            other.to_string(),
        )),
    }
}

pub(crate) fn parse_bucket_abac_status(body: &[u8]) -> Result<String, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    if !xml.contains("<AbacStatus") {
        return Err(RuntimeError::InvalidBucketAbac(
            "missing AbacStatus root".to_string(),
        ));
    }
    let status = text_between(&xml, "<Status>", "</Status>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .ok_or_else(|| {
            RuntimeError::InvalidBucketAbac("AbacStatus must include Status".to_string())
        })?;
    match status {
        "Enabled" | "Disabled" => Ok(status.to_string()),
        other => Err(RuntimeError::InvalidBucketAbac(other.to_string())),
    }
}

pub(crate) fn parse_bucket_accelerate_status(body: &[u8]) -> Result<String, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    if !xml.contains("<AccelerateConfiguration") {
        return Err(RuntimeError::InvalidAccelerateConfiguration(
            "missing AccelerateConfiguration root".to_string(),
        ));
    }
    let status = text_between(&xml, "<Status>", "</Status>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .ok_or_else(|| {
            RuntimeError::InvalidAccelerateConfiguration(
                "AccelerateConfiguration must include Status".to_string(),
            )
        })?;
    match status {
        "Enabled" | "Suspended" => Ok(status.to_string()),
        other => Err(RuntimeError::InvalidAccelerateConfiguration(
            other.to_string(),
        )),
    }
}

pub(crate) fn parse_public_access_block_configuration(
    bucket: &str,
    body: &[u8],
) -> Result<PublicAccessBlockConfiguration, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    if !xml.contains("<PublicAccessBlockConfiguration") {
        return Err(RuntimeError::InvalidPublicAccessBlockConfiguration(
            "missing PublicAccessBlockConfiguration root".to_string(),
        ));
    }
    Ok(PublicAccessBlockConfiguration {
        bucket: bucket.to_string(),
        block_public_acls: parse_required_xml_bool(&xml, "BlockPublicAcls")?,
        ignore_public_acls: parse_required_xml_bool(&xml, "IgnorePublicAcls")?,
        block_public_policy: parse_required_xml_bool(&xml, "BlockPublicPolicy")?,
        restrict_public_buckets: parse_required_xml_bool(&xml, "RestrictPublicBuckets")?,
    })
}

pub(crate) fn parse_create_session_request(
    bucket: &str,
    headers: &BTreeMap<String, String>,
) -> Result<CreateSessionRequest, RuntimeError> {
    let session_mode = header(headers, "x-amz-create-session-mode")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(str::to_string);
    if let Some(mode) = session_mode.as_deref() {
        match mode {
            "ReadOnly" | "ReadWrite" => {}
            other => return Err(RuntimeError::InvalidCreateSession(other.to_string())),
        }
    }
    let bucket_key_enabled = header(headers, "x-amz-server-side-encryption-bucket-key-enabled")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(|value| match value {
            "true" | "TRUE" => Ok(true),
            "false" | "FALSE" => Ok(false),
            other => Err(RuntimeError::InvalidCreateSession(format!(
                "invalid x-amz-server-side-encryption-bucket-key-enabled value: {other}"
            ))),
        })
        .transpose()?;
    Ok(CreateSessionRequest {
        bucket: bucket.to_string(),
        session_mode,
        server_side_encryption: header(headers, "x-amz-server-side-encryption")
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_string),
        kms_key_id: header(headers, "x-amz-server-side-encryption-aws-kms-key-id")
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_string),
        encryption_context: header(headers, "x-amz-server-side-encryption-context")
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_string),
        bucket_key_enabled,
    })
}

fn parse_required_xml_bool(xml: &str, tag: &str) -> Result<bool, RuntimeError> {
    let value = text_between(xml, &format!("<{tag}>"), &format!("</{tag}>"))
        .map(str::trim)
        .ok_or_else(|| {
            RuntimeError::InvalidPublicAccessBlockConfiguration(format!("missing required {tag}"))
        })?;
    match value {
        "true" | "TRUE" => Ok(true),
        "false" | "FALSE" => Ok(false),
        other => Err(RuntimeError::InvalidPublicAccessBlockConfiguration(
            format!("invalid {tag} value: {other}"),
        )),
    }
}