athena_rs 3.26.2

Hyper-performant polyglot database driver
Documentation
use actix_web::HttpResponse;
use actix_web::http::StatusCode;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::RequestId;
use aws_smithy_types::error::metadata::ProvideErrorMetadata;
use serde_json::{Value, json};

use crate::api::response::error_response_with_code;

/// Describes one public storage error code: its identifier, the HTTP status it
/// is documented with, and a human-readable explanation.
///
/// Instances are listed in [`STORAGE_PUBLIC_ERROR_CODES`].
// NOTE(review): `dead_code` is presumably allowed because some fields are only
// read outside this module (or only in tests) — confirm before removing.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoragePublicErrorCodeDescriptor {
    /// Machine-readable error code string.
    pub code: &'static str,
    /// Numeric HTTP status associated with the code.
    pub status: u16,
    /// Human-readable explanation of when the code is produced.
    pub description: &'static str,
}

/// Public error code used when an upstream `InvalidArgument` message matches the
/// Cloudflare R2 access-key length complaint (see `is_r2_access_key_length_error`).
pub const STORAGE_ERROR_CODE_R2_INVALID_ACCESS_KEY_ID_FORMAT: &str =
    "storage_r2_invalid_access_key_id_format";
/// Public error code used for any other upstream `InvalidArgument` rejection.
pub const STORAGE_ERROR_CODE_STORAGE_UPSTREAM_INVALID_ARGUMENT: &str =
    "storage_upstream_invalid_argument";
/// Public error code used when the upstream reports `AccessDenied`.
pub const STORAGE_ERROR_CODE_STORAGE_UPSTREAM_ACCESS_DENIED: &str =
    "storage_upstream_access_denied";
/// Public error code used when the upstream reports `NoSuchBucket`.
pub const STORAGE_ERROR_CODE_STORAGE_BUCKET_NOT_FOUND: &str = "storage_bucket_not_found";
/// Public error code used for dispatch failures, timeouts, and the upstream
/// `SlowDown` / `ServiceUnavailable` codes.
pub const STORAGE_ERROR_CODE_STORAGE_UPSTREAM_UNAVAILABLE: &str = "storage_upstream_unavailable";
/// Public error code for a failed bucket-list response (listed with HTTP 502 in
/// `STORAGE_PUBLIC_ERROR_CODES`).
pub const STORAGE_ERROR_CODE_STORAGE_LIST_BUCKETS_FAILED: &str = "storage_list_buckets_failed";
/// Public error code for a failed object-list response (listed with HTTP 502 in
/// `STORAGE_PUBLIC_ERROR_CODES`).
pub const STORAGE_ERROR_CODE_STORAGE_LIST_OBJECTS_FAILED: &str = "storage_list_objects_failed";

/// Catalogue of the storage error codes declared in this module, each paired
/// with an HTTP status and a human-readable description.
///
/// The unit tests in this file assert that no `code` appears twice.
pub const STORAGE_PUBLIC_ERROR_CODES: &[StoragePublicErrorCodeDescriptor] = &[
    StoragePublicErrorCodeDescriptor {
        code: STORAGE_ERROR_CODE_R2_INVALID_ACCESS_KEY_ID_FORMAT,
        status: 400,
        description: "Cloudflare R2 direct credentials used an invalid S3 access key ID format for the selected endpoint.",
    },
    StoragePublicErrorCodeDescriptor {
        code: STORAGE_ERROR_CODE_STORAGE_UPSTREAM_INVALID_ARGUMENT,
        status: 400,
        description: "The upstream S3-compatible backend rejected the request as caller-correctable.",
    },
    StoragePublicErrorCodeDescriptor {
        code: STORAGE_ERROR_CODE_STORAGE_UPSTREAM_ACCESS_DENIED,
        status: 403,
        description: "The upstream S3-compatible backend rejected the request because the provided credentials lack permission.",
    },
    StoragePublicErrorCodeDescriptor {
        code: STORAGE_ERROR_CODE_STORAGE_BUCKET_NOT_FOUND,
        status: 404,
        description: "The requested bucket does not exist on the selected storage backend.",
    },
    StoragePublicErrorCodeDescriptor {
        code: STORAGE_ERROR_CODE_STORAGE_UPSTREAM_UNAVAILABLE,
        status: 503,
        description: "Athena could not reach or complete the request against the selected storage backend.",
    },
    StoragePublicErrorCodeDescriptor {
        code: STORAGE_ERROR_CODE_STORAGE_LIST_BUCKETS_FAILED,
        status: 502,
        description: "Athena failed while normalizing an upstream bucket-list response.",
    },
    StoragePublicErrorCodeDescriptor {
        code: STORAGE_ERROR_CODE_STORAGE_LIST_OBJECTS_FAILED,
        status: 502,
        description: "Athena failed while normalizing an upstream object-list response.",
    },
];

/// Result of classifying a storage failure: the public error code, the HTTP
/// status to respond with, and the short message placed in the response.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ClassifiedStorageError {
    /// Public error code string (one of the `STORAGE_ERROR_CODE_*` constants or
    /// the caller-supplied default).
    code: &'static str,
    /// HTTP status passed to `error_response_with_code`.
    http_status: StatusCode,
    /// Short message passed to `error_response_with_code`.
    message: &'static str,
}

/// Converts a failed S3 SDK call into the HTTP error response returned to callers.
///
/// * `operation` — name of the S3 operation, echoed in the response data and detail.
/// * `default_message` / `default_code` — used when the failure does not match a
///   more specific classification.
/// * `err` — the SDK error to translate.
///
/// Service errors are classified from their upstream code and message, and their
/// upstream code, message, HTTP status and request id are attached to the response
/// data. Dispatch failures and timeouts map to "unavailable" (503). Every other
/// variant falls back to the supplied defaults with 502.
pub fn map_s3_operation_error<E>(
    operation: &'static str,
    default_message: &'static str,
    default_code: &'static str,
    err: &SdkError<E>,
) -> HttpResponse
where
    E: std::fmt::Debug + std::fmt::Display + ProvideErrorMetadata,
{
    let mut data = json!({
        "operation": operation,
        "backend": "s3",
    });

    let classified = match err {
        SdkError::ServiceError(service_failure) => {
            // Borrow the upstream fields straight from the error; they outlive this arm.
            let upstream = service_failure.err();
            let code = upstream.code();
            let message = upstream.message();
            let status = service_failure.raw().status().as_u16();

            attach_storage_error_metadata(
                &mut data,
                code,
                message,
                Some(status),
                upstream.meta().request_id(),
            );

            classify_service_error(code, message, default_message, default_code)
        }
        SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) => ClassifiedStorageError {
            code: STORAGE_ERROR_CODE_STORAGE_UPSTREAM_UNAVAILABLE,
            http_status: StatusCode::SERVICE_UNAVAILABLE,
            message: "Storage backend unavailable",
        },
        _ => ClassifiedStorageError {
            code: default_code,
            http_status: StatusCode::BAD_GATEWAY,
            message: default_message,
        },
    };

    let ClassifiedStorageError {
        code,
        http_status,
        message,
    } = classified;
    let detail = describe_s3_error(operation, err);

    error_response_with_code(http_status, message, detail, code, Some(data))
}

/// Copies whichever upstream details are present into the JSON object `data`.
///
/// Keys written (each only when its value is `Some`): `upstream_code`,
/// `upstream_message`, `upstream_status`, `request_id`. If `data` is not a JSON
/// object it is left untouched.
fn attach_storage_error_metadata(
    data: &mut Value,
    upstream_code: Option<&str>,
    upstream_message: Option<&str>,
    upstream_status: Option<u16>,
    request_id: Option<&str>,
) {
    let fields = match data.as_object_mut() {
        Some(fields) => fields,
        None => return,
    };

    // Listed in insertion order so the resulting object matches key-by-key.
    let candidates = [
        ("upstream_code", upstream_code.map(Value::from)),
        ("upstream_message", upstream_message.map(Value::from)),
        ("upstream_status", upstream_status.map(Value::from)),
        ("request_id", request_id.map(Value::from)),
    ];

    for (key, candidate) in candidates {
        if let Some(value) = candidate {
            fields.insert(key.to_string(), value);
        }
    }
}

fn classify_service_error(
    upstream_code: Option<&str>,
    upstream_message: Option<&str>,
    default_message: &'static str,
    default_code: &'static str,
) -> ClassifiedStorageError {
    match upstream_code {
        Some("NoSuchBucket") => ClassifiedStorageError {
            code: STORAGE_ERROR_CODE_STORAGE_BUCKET_NOT_FOUND,
            http_status: StatusCode::NOT_FOUND,
            message: "Bucket not found",
        },
        Some("AccessDenied") => ClassifiedStorageError {
            code: STORAGE_ERROR_CODE_STORAGE_UPSTREAM_ACCESS_DENIED,
            http_status: StatusCode::FORBIDDEN,
            message: "Storage access denied",
        },
        Some("InvalidArgument")
            if upstream_message
                .map(is_r2_access_key_length_error)
                .unwrap_or(false) =>
        {
            ClassifiedStorageError {
                code: STORAGE_ERROR_CODE_R2_INVALID_ACCESS_KEY_ID_FORMAT,
                http_status: StatusCode::BAD_REQUEST,
                message: "Invalid storage credentials",
            }
        }
        Some("InvalidArgument") => ClassifiedStorageError {
            code: STORAGE_ERROR_CODE_STORAGE_UPSTREAM_INVALID_ARGUMENT,
            http_status: StatusCode::BAD_REQUEST,
            message: "Invalid storage request",
        },
        Some("SlowDown" | "ServiceUnavailable") => ClassifiedStorageError {
            code: STORAGE_ERROR_CODE_STORAGE_UPSTREAM_UNAVAILABLE,
            http_status: StatusCode::SERVICE_UNAVAILABLE,
            message: "Storage backend unavailable",
        },
        _ => ClassifiedStorageError {
            code: default_code,
            http_status: StatusCode::BAD_GATEWAY,
            message: default_message,
        },
    }
}

/// Returns `true` when `message` contains both fragments of the access-key length
/// complaint, compared ASCII case-insensitively.
pub fn is_r2_access_key_length_error(message: &str) -> bool {
    // Both fragments must be present; either one alone is not enough.
    const REQUIRED_FRAGMENTS: [&str; 2] = ["credential access key has length", "should be 32"];

    let haystack = message.trim().to_ascii_lowercase();
    REQUIRED_FRAGMENTS
        .iter()
        .all(|fragment| haystack.contains(*fragment))
}

/// Renders an S3 SDK error as a single-line `"<operation> failed (...)"` string.
///
/// Service errors list the upstream code, message, HTTP status and request id
/// (omitting whichever of code/message/request id are absent). Response errors
/// include the HTTP status and the debug form of the error. Timeouts, dispatch
/// failures and any other variant embed their debug form.
pub(crate) fn describe_s3_error<E>(operation: &str, err: &SdkError<E>) -> String
where
    E: std::fmt::Debug + std::fmt::Display + ProvideErrorMetadata,
{
    match err {
        SdkError::ServiceError(service_failure) => {
            let upstream = service_failure.err();
            let status = service_failure.raw().status().as_u16();

            // Optional parts simply drop out of the chain when they are `None`.
            let parts: Vec<String> = upstream
                .code()
                .map(|code| format!("code={code}"))
                .into_iter()
                .chain(
                    upstream
                        .message()
                        .map(|message| format!("message={message}")),
                )
                .chain(std::iter::once(format!("http_status={}", status)))
                .chain(
                    upstream
                        .meta()
                        .request_id()
                        .map(|request_id| format!("request_id={request_id}")),
                )
                .collect();

            format!("{operation} failed ({})", parts.join(", "))
        }
        SdkError::ResponseError(re) => format!(
            "{operation} failed (http_status={}, response_error={re:?})",
            re.raw().status().as_u16()
        ),
        SdkError::TimeoutError(source) => format!("{operation} failed (timeout: {source:?})"),
        SdkError::DispatchFailure(df) => format!("{operation} failed (dispatch: {df:?})"),
        other => format!("{operation} failed ({other:?})"),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Runs the classifier with the list-objects defaults used by these tests.
    fn classify_with_list_objects_defaults(code: &str, message: &str) -> ClassifiedStorageError {
        classify_service_error(
            Some(code),
            Some(message),
            "Failed to list objects",
            STORAGE_ERROR_CODE_STORAGE_LIST_OBJECTS_FAILED,
        )
    }

    /// No two descriptors in the public table may share a code.
    #[test]
    fn public_storage_error_codes_are_unique() {
        let mut observed: HashSet<&'static str> = HashSet::new();
        for entry in STORAGE_PUBLIC_ERROR_CODES.iter() {
            let newly_inserted = observed.insert(entry.code);
            assert!(
                newly_inserted,
                "duplicate storage error code: {}",
                entry.code
            );
        }
    }

    /// `InvalidArgument` with the key-length message maps to the R2-specific code.
    #[test]
    fn classifies_r2_access_key_length_error() {
        let expected = ClassifiedStorageError {
            code: STORAGE_ERROR_CODE_R2_INVALID_ACCESS_KEY_ID_FORMAT,
            http_status: StatusCode::BAD_REQUEST,
            message: "Invalid storage credentials",
        };

        let actual = classify_with_list_objects_defaults(
            "InvalidArgument",
            "Credential access key has length 36, should be 32",
        );

        assert_eq!(actual, expected);
    }

    /// `NoSuchBucket` maps to the bucket-not-found code with a 404.
    #[test]
    fn classifies_missing_bucket() {
        let expected = ClassifiedStorageError {
            code: STORAGE_ERROR_CODE_STORAGE_BUCKET_NOT_FOUND,
            http_status: StatusCode::NOT_FOUND,
            message: "Bucket not found",
        };

        let actual = classify_with_list_objects_defaults(
            "NoSuchBucket",
            "The specified bucket does not exist",
        );

        assert_eq!(actual, expected);
    }

    /// The text matcher accepts the key-length message and rejects unrelated text.
    #[test]
    fn detects_r2_access_key_length_error_text() {
        let matching = "Credential access key has length 36, should be 32";
        let unrelated = "The provided token is malformed";

        assert!(is_r2_access_key_length_error(matching));
        assert!(!is_r2_access_key_length_error(unrelated));
    }
}