bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;
use bucketwarden_s3::sigv4::SigV4Error;

const S3ERR_FEATURE_QUERY_KEY: &str = "x-bucketwarden-s3err-feature";
const S3ERR_CODE_QUERY_KEY: &str = "x-bucketwarden-s3err-code";
const S3ERR_FAMILY_QUERY_KEY: &str = "x-bucketwarden-s3err-family";
const S3ERR_AUDIT_ACTION: &str = "s3err:ServiceSpecificError";

fn runtime_error_from_auth(error: AuthError) -> RuntimeError {
    match error {
        AuthError::UnknownAccessKey(access_key_id) => {
            RuntimeError::InvalidAccessKeyId(access_key_id)
        }
        AuthError::ExpiredCredential(access_key_id) => RuntimeError::ExpiredToken(access_key_id),
        AuthError::DisabledAccessKey(access_key_id)
        | AuthError::RevokedCredential(access_key_id) => RuntimeError::InvalidToken(access_key_id),
        other => RuntimeError::Auth(other),
    }
}

fn runtime_error_from_sigv4(error: SigV4Error, presigned_url: bool) -> RuntimeError {
    match error {
        SigV4Error::MalformedPresignedQuery => RuntimeError::AuthorizationQueryParametersError(
            "malformed presigned URL query".to_string(),
        ),
        SigV4Error::MalformedAuthorization => {
            RuntimeError::AuthorizationHeaderMalformed("malformed authorization header".to_string())
        }
        SigV4Error::AccessKeyMismatch => {
            RuntimeError::InvalidAccessKeyId("access key does not match credentials".to_string())
        }
        SigV4Error::ExpiredPresignedUrl => {
            RuntimeError::ExpiredToken("presigned URL has expired".to_string())
        }
        SigV4Error::InvalidSessionToken(session_error) => {
            RuntimeError::InvalidToken(session_error.to_string())
        }
        SigV4Error::InvalidTimestamp => {
            RuntimeError::RequestTimeTooSkewed("invalid SigV4 timestamp".to_string())
        }
        SigV4Error::MissingField(field) if presigned_url => {
            RuntimeError::AuthorizationQueryParametersError(format!(
                "missing required query field {field}"
            ))
        }
        SigV4Error::MissingField(field) => RuntimeError::AuthorizationHeaderMalformed(format!(
            "missing required signed field {field}"
        )),
        SigV4Error::MissingSignedHeader(header) => {
            RuntimeError::AuthorizationHeaderMalformed(format!("missing signed header {header}"))
        }
        other => RuntimeError::SigV4Authentication(other.to_string()),
    }
}

fn find_query_value<'a>(
    query: &'a std::collections::BTreeMap<String, String>,
    key: &str,
) -> Option<&'a str> {
    query
        .iter()
        .find_map(|(name, value)| name.eq_ignore_ascii_case(key).then_some(value.as_str()))
}

pub(crate) fn resolve_s3err_query_feature_id(
    query: &std::collections::BTreeMap<String, String>,
) -> Option<String> {
    if let Some(feature_id) = find_query_value(query, S3ERR_FEATURE_QUERY_KEY) {
        if feature_id.trim().is_empty() {
            return Some(String::new());
        }
        return Some(feature_id.trim().to_string());
    }
    let Some(code) = find_query_value(query, S3ERR_CODE_QUERY_KEY) else {
        return None;
    };
    if code.trim().is_empty() {
        return Some(String::new());
    }
    let family = find_query_value(query, S3ERR_FAMILY_QUERY_KEY)
        .map(|value| value.trim().to_ascii_lowercase())
        .unwrap_or_else(|| "general".to_string());
    s3_service_specific_error_by_family_code(&family, code.trim())
        .map(|error| error.feature_id.to_string())
        .or(Some(format!("feat:bucketwarden.s3err.{family}.invalid")))
}

impl BucketWarden {
    pub fn handle_s3_http(
        &mut self,
        request: S3HttpRequest,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let request_id = self.next_request_id();
        let extended_request_id = extended_request_id(&request_id);
        self.active_request_scope = None;
        let request = match self.authenticate_s3_http_request(request) {
            Ok(request) => request,
            Err(error) => {
                return Ok(finalize_s3_response(
                    error_response(&error),
                    &request_id,
                    &extended_request_id,
                ));
            }
        };
        let cors_origin = header(&request.headers, "origin").map(str::to_string);
        let cors_method = request.method.to_ascii_uppercase();
        let cors_bucket = resolve_s3_request_target(&request)
            .ok()
            .and_then(|target| target.bucket);
        let response = match self.dispatch_s3_http(request) {
            Ok(response) => response,
            Err(error) => error_response(&error),
        };
        self.active_request_scope = None;
        let response = if let (Some(origin), Some(bucket)) = (cors_origin, cors_bucket) {
            self.apply_actual_cors_response(response, &bucket, &origin, &cors_method)
        } else {
            response
        };
        Ok(finalize_s3_response(
            response,
            &request_id,
            &extended_request_id,
        ))
    }
    pub(crate) fn dispatch_s3_http(
        &mut self,
        request: S3HttpRequest,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let method = request.method.to_ascii_uppercase();
        if request.path == "/WriteGetObjectResponse"
            && header(&request.headers, "host")
                .is_some_and(crate::s3_targets::is_object_lambda_host)
        {
            return self.dispatch_write_get_object_response_http(request, method.as_str());
        }
        if let Some(target) = resolve_s3_website_request_target(&request)? {
            if matches!(method.as_str(), "GET" | "HEAD") {
                let bucket = target.bucket.as_deref().expect("website bucket");
                if let Some(response) = self.signed_region_redirect_response(&request, Some(bucket))
                {
                    return Ok(response);
                }
                let result = self.get_website_object(
                    &request.principal,
                    bucket,
                    target.key.as_deref().unwrap_or_default(),
                )?;
                return Ok(website_object_response(&result, method == "HEAD"));
            }
        }
        let target = resolve_s3_request_target(&request)?;
        if let Some(bucket) = target.bucket.as_deref() {
            if self.buckets.contains_key(bucket) {
                self.enforce_and_record_bucket_request_quota(bucket)?;
            }
        }
        if let Some(response) =
            self.signed_region_redirect_response(&request, target.bucket.as_deref())
        {
            return Ok(response);
        }
        if method == "OPTIONS" {
            if let Some(bucket) = target.bucket.as_deref() {
                return self.cors_preflight(
                    &request,
                    bucket,
                    target.key.as_deref().unwrap_or_default(),
                );
            }
        }
        match (target.bucket.as_deref(), target.key.as_deref()) {
            (None, None) => self.dispatch_service_s3_http(&request, method.as_str()),
            (Some(bucket), None) => self.dispatch_bucket_s3_http(request, method.as_str(), bucket),
            (Some(bucket), Some(key)) => {
                self.dispatch_object_s3_http(request, method.as_str(), bucket, key)
            }
            _ => Ok(general_error_response(
                "MethodNotAllowed",
                405,
                "The specified method is not allowed for this resource.",
            )),
        }
    }
    pub(crate) fn require_s3err_operator_access(
        &mut self,
        principal: &str,
    ) -> Result<(), RuntimeError> {
        match self.operator_action_allowed(principal, OperatorAction::ReadDiagnostics, "*") {
            Ok(true) => Ok(()),
            Ok(false) | Err(_) => {
                self.audit.append(
                    principal,
                    S3ERR_AUDIT_ACTION,
                    "*",
                    AuditOutcome::Denied,
                    None,
                );
                Err(RuntimeError::OperatorActionDenied {
                    principal: principal.to_string(),
                    action: format!("{:?}", OperatorAction::ReadDiagnostics),
                    resource: "*".to_string(),
                })
            }
        }
    }
    pub(crate) fn authenticate_s3_http_request(
        &mut self,
        mut request: S3HttpRequest,
    ) -> Result<S3HttpRequest, RuntimeError> {
        remove_header(&mut request.headers, INTERNAL_SIGV4_REGION_HEADER);
        if request.query.contains_key("X-Amz-Signature") {
            let access_key_id = sigv4_query_access_key(&request.query)?;
            let credential = match self
                .auth
                .resolve_credential(&access_key_id, self.clock_epoch_seconds)
            {
                Ok(credential) => credential,
                Err(error) => {
                    self.audit_auth_failed(Some(&access_key_id), error.to_string());
                    return Err(runtime_error_from_auth(error));
                }
            };
            if let Err(error) = self
                .auth
                .enforce_login_attempt_limit(&credential.principal_id, 5)
            {
                self.audit_auth_failed(Some(&access_key_id), error.to_string());
                return Err(runtime_error_from_auth(error));
            }
            let sigv4_request = sigv4_request_from_http(&request, true);
            let verification = match verify_presigned_url(
                &sigv4_request,
                &credential.credentials,
                self.clock_epoch_seconds,
            ) {
                Ok(verification) => verification,
                Err(error) => {
                    self.auth.record_login_failure(
                        &credential.principal_id,
                        self.clock_epoch_seconds,
                        error.to_string(),
                    );
                    self.audit_auth_failed(Some(&access_key_id), error.to_string());
                    return Err(runtime_error_from_sigv4(error, true));
                }
            };
            request.headers.insert(
                INTERNAL_SIGV4_REGION_HEADER.to_string(),
                verification.region,
            );
            self.auth
                .mark_used(&access_key_id, self.clock_epoch_seconds)?;
            self.auth
                .record_login_success(&credential.principal_id, self.clock_epoch_seconds);
            self.audit_auth_allowed(&credential.principal_id, &access_key_id);
            self.active_request_scope = credential.scope;
            request.principal = credential.principal_id;
            return Ok(request);
        }
        if let Some(authorization) = header(&request.headers, "authorization") {
            let access_key_id = sigv4_authorization_access_key(authorization)?;
            let credential = match self
                .auth
                .resolve_credential(&access_key_id, self.clock_epoch_seconds)
            {
                Ok(credential) => credential,
                Err(error) => {
                    self.audit_auth_failed(Some(&access_key_id), error.to_string());
                    return Err(runtime_error_from_auth(error));
                }
            };
            if let Err(error) = self
                .auth
                .enforce_login_attempt_limit(&credential.principal_id, 5)
            {
                self.audit_auth_failed(Some(&access_key_id), error.to_string());
                return Err(runtime_error_from_auth(error));
            }
            let sigv4_request = sigv4_request_from_http(&request, false);
            let verification = match verify_authorization_header(
                &sigv4_request,
                authorization,
                &credential.credentials,
            ) {
                Ok(verification) => verification,
                Err(error) => {
                    self.auth.record_login_failure(
                        &credential.principal_id,
                        self.clock_epoch_seconds,
                        error.to_string(),
                    );
                    self.audit_auth_failed(Some(&access_key_id), error.to_string());
                    return Err(runtime_error_from_sigv4(error, false));
                }
            };
            request.headers.insert(
                INTERNAL_SIGV4_REGION_HEADER.to_string(),
                verification.region,
            );
            self.auth
                .mark_used(&access_key_id, self.clock_epoch_seconds)?;
            self.auth
                .record_login_success(&credential.principal_id, self.clock_epoch_seconds);
            self.audit_auth_allowed(&credential.principal_id, &access_key_id);
            self.active_request_scope = credential.scope;
            request.principal = credential.principal_id;
        }
        Ok(request)
    }
    pub(crate) fn signed_region_redirect_response(
        &self,
        request: &S3HttpRequest,
        bucket: Option<&str>,
    ) -> Option<S3HttpResponse> {
        let bucket = bucket?;
        let signed_region = header(&request.headers, INTERNAL_SIGV4_REGION_HEADER)?;
        let bucket_state = self.buckets.get(bucket)?;
        let expected_region = bucket_region(bucket_state);
        if signed_region == expected_region {
            return None;
        }
        Some(
            general_error_response(
                "PermanentRedirect",
                301,
                &format!(
                    "The bucket is in this region: {expected_region}. Use this region to retry the request."
                ),
            )
            .with_header("x-amz-bucket-region", expected_region),
        )
    }
}