use aws_sdk_s3::error::ProvideErrorMetadata;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_smithy_types::byte_stream::error::Error as ByteStreamError;
use axum::{
extract::rejection::JsonRejection,
http::header,
http::StatusCode,
response::{IntoResponse, Response},
};
use ndarray::ShapeError;
use reqwest::Error as ReqwestError;
use serde::{Deserialize, Serialize};
use std::error::Error;
use thiserror::Error;
use tokio::sync::AcquireError;
use tracing::{event, Level};
use zune_inflate::errors::InflateDecodeErrors;
use crate::types::DValue;
/// Errors that can arise while handling a request.
///
/// The `Display` text of each variant is given by its `#[error(...)]` attribute. Variants with a
/// `#[from]` field wrap an underlying error, which is exposed as the error's source and can be
/// converted into this type with `?` or `.into()`.
#[derive(Debug, Error)]
pub enum ActiveStorageError {
/// Decompression failed; `error` describes the failure and is included in the message.
/// Going by the variant name this is for Blosc2-compressed data.
#[error("failed to decompress data: {error}")]
DecompressionBlosc2 { error: &'static str },
/// Decompression failed with an I/O error (flate2, going by the variant name).
///
/// Because of `#[from]`, any `std::io::Error` propagated with `?` becomes this variant.
#[error("failed to decompress data")]
DecompressionFlate2(#[from] std::io::Error),
/// Decompression failed inside `zune_inflate`.
#[error("failed to decompress data")]
DecompressionZune(#[from] InflateDecodeErrors),
/// `operation` was attempted on an empty array or selection.
#[error("cannot perform {operation} on empty array or selection")]
EmptyArray { operation: &'static str },
/// Bytes could not be converted to the type named by `type_name`.
#[error("failed to convert from bytes to {type_name}")]
FromBytes { type_name: &'static str },
/// The contained value is not acceptable as a "missing" value (see the message text).
#[error("Incompatible value {0} for missing")]
IncompatibleMissing(DValue),
/// More memory was requested than the total allowed (`requested > total`).
/// The units are not stated in this file.
#[error("Insufficient memory to process request ({requested} > {total})")]
InsufficientMemory { requested: usize, total: usize },
/// The request body was rejected by axum's JSON extractor.
#[error("request data is not valid")]
RequestDataJsonRejection(#[from] JsonRejection),
/// A single `validator` validation error on the request data.
#[error("request data is not valid")]
RequestDataValidationSingle(#[from] validator::ValidationError),
/// A collection of `validator` validation errors on the request data.
#[error("request data is not valid")]
RequestDataValidation(#[from] validator::ValidationErrors),
/// An S3 request failed.
///
/// NOTE(review): `error` is carried but is not part of the `Display` text, so it only shows
/// up in `Debug` output.
#[error("S3 request error")]
S3RequestError { error: String },
/// Reading the object's byte stream from S3 failed.
#[error("error receiving object from S3 storage")]
S3ByteStream(#[from] ByteStreamError),
/// The S3 response did not include a Content-Length header.
#[error("S3 response missing Content-Length header")]
S3ContentLengthMissing,
/// The S3 GetObject call failed.
#[error("error retrieving object from S3 storage")]
S3GetObject(#[from] SdkError<GetObjectError>),
/// The S3 HeadObject call failed.
#[error("error retrieving object metadata from S3 storage")]
S3HeadObject(#[from] SdkError<HeadObjectError>),
/// NOTE(review): presumably raised when the object store denies access to the object
/// (HTTP 403) — confirm where this is constructed. The message does not mention access
/// being denied and is nearly identical to the one on `S3HeadObject`.
#[error("error receiving object metadata from S3 storage")]
Forbidden,
/// Acquiring a tokio semaphore permit failed.
#[error("error acquiring resources")]
SemaphoreAcquireError(#[from] AcquireError),
/// An `ndarray` array could not be created from the requested shape.
#[error("failed to create array from shape")]
ShapeInvalid(#[from] ShapeError),
/// An integer conversion was out of range. `transparent` forwards `Display` and `source`
/// to the wrapped error, so this variant adds no message of its own.
#[error(transparent)]
TryFromInt(#[from] std::num::TryFromIntError),
/// The requested `operation` is not supported.
#[error("unsupported operation {operation}")]
UnsupportedOperation { operation: String },
/// The chunk cache reported an error; `error` is included in the message.
#[error("chunk cache error {error}")]
ChunkCacheError { error: String },
/// A `reqwest` error occurred while performing an HTTP request.
#[error("HTTP request error")]
ReqwestProcessingError(#[from] ReqwestError),
/// An HTTP request failed.
///
/// NOTE(review): as with `S3RequestError`, `error` is not part of the `Display` text.
#[error("HTTP request error")]
HTTPRequestError { error: String },
/// The HTTP response did not include a Content-Length header.
#[error("HTTP response missing Content-Length header")]
HTTPContentLengthMissing,
/// The requested `interface_type` is not supported.
#[error("unsupported interface type {interface_type}")]
UnsupportedInterfaceType { interface_type: String },
}
impl IntoResponse for ActiveStorageError {
fn into_response(self) -> Response {
ErrorResponse::from(self).into_response()
}
}
/// Body of an error response: a top-level message plus an optional list of causes.
#[derive(Deserialize, Serialize)]
struct ErrorBody {
/// Display text of the error being reported.
message: String,
/// Display text of the errors that caused it; left out of the serialised output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
caused_by: Option<Vec<String>>,
}
impl ErrorBody {
    /// Build an error body from `error` and its chain of sources.
    ///
    /// `message` is the `Display` text of `error` itself. `caused_by` holds the `Display` text of
    /// every error reachable through `source()`, outermost first, with consecutive identical
    /// entries collapsed into one. It is `None` when `error` has no source.
    fn new<E>(error: &E) -> Self
    where
        E: std::error::Error + Send + Sync,
    {
        let message = error.to_string();

        // Walk the source chain lazily: start at the first source and keep following `source()`
        // until it runs out.
        let mut causes: Vec<String> =
            std::iter::successors(error.source(), |&cause| cause.source())
                .map(|cause| cause.to_string())
                .collect();
        // Wrapper errors often repeat the text of the error they wrap; drop adjacent repeats.
        causes.dedup();

        let caused_by = if causes.is_empty() {
            None
        } else {
            Some(causes)
        };
        ErrorBody { message, caused_by }
    }
}
/// An error response: the HTTP status to send plus the serialisable error body.
#[derive(Deserialize, Serialize)]
struct ErrorResponse {
/// HTTP status of the response. Skipped by serde, so it is not part of the serialised body.
#[serde(skip)]
status: StatusCode,
/// Error details, serialised under the `error` key.
error: ErrorBody,
}
impl ErrorResponse {
fn new<E>(status: StatusCode, error: &E) -> Self
where
E: std::error::Error + Send + Sync,
{
ErrorResponse {
status,
error: ErrorBody::new(error),
}
}
fn bad_request<E>(error: &E) -> Self
where
E: std::error::Error + Send + Sync,
{
Self::new(StatusCode::BAD_REQUEST, error)
}
fn unauthorised<E>(error: &E) -> Self
where
E: std::error::Error + Send + Sync,
{
Self::new(StatusCode::UNAUTHORIZED, error)
}
fn not_found<E>(error: &E) -> Self
where
E: std::error::Error + Send + Sync,
{
Self::new(StatusCode::NOT_FOUND, error)
}
fn internal_server_error<E>(error: &E) -> Self
where
E: std::error::Error + Send + Sync,
{
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error)
}
}
impl From<ActiveStorageError> for ErrorResponse {
fn from(error: ActiveStorageError) -> Self {
let response = match &error {
ActiveStorageError::DecompressionBlosc2 { error: _ }
| ActiveStorageError::DecompressionFlate2(_)
| ActiveStorageError::DecompressionZune(_)
| ActiveStorageError::EmptyArray { operation: _ }
| ActiveStorageError::IncompatibleMissing(_)
| ActiveStorageError::InsufficientMemory {
requested: _,
total: _,
}
| ActiveStorageError::RequestDataJsonRejection(_)
| ActiveStorageError::RequestDataValidationSingle(_)
| ActiveStorageError::RequestDataValidation(_)
| ActiveStorageError::S3ContentLengthMissing
| ActiveStorageError::ShapeInvalid(_)
| ActiveStorageError::HTTPContentLengthMissing => Self::bad_request(&error),
ActiveStorageError::UnsupportedOperation { operation: _ }
| ActiveStorageError::Forbidden => Self::not_found(&error),
ActiveStorageError::FromBytes { type_name: _ }
| ActiveStorageError::TryFromInt(_)
| ActiveStorageError::S3ByteStream(_)
| ActiveStorageError::SemaphoreAcquireError(_)
| ActiveStorageError::ChunkCacheError { error: _ } => {
Self::internal_server_error(&error)
}
ActiveStorageError::S3GetObject(sdk_error) => {
match &sdk_error {
SdkError::ConstructionFailure(_)
| SdkError::DispatchFailure(_)
| SdkError::ResponseError(_)
| SdkError::TimeoutError(_) => Self::internal_server_error(&error),
SdkError::ServiceError(get_obj_error) => {
let get_obj_error = get_obj_error.err();
match get_obj_error {
GetObjectError::InvalidObjectState(_)
| GetObjectError::NoSuchKey(_) => Self::bad_request(&error),
_ => {
match get_obj_error.code() {
Some("NoSuchBucket") => Self::bad_request(&error),
Some("InvalidAccessKeyId")
| Some("SignatureDoesNotMatch")
| Some("AccessDenied") => Self::unauthorised(&error),
_ => Self::internal_server_error(&error),
}
}
}
}
_ => Self::internal_server_error(&error),
}
}
ActiveStorageError::S3HeadObject(sdk_error) => {
match &sdk_error {
SdkError::ConstructionFailure(_)
| SdkError::DispatchFailure(_)
| SdkError::ResponseError(_)
| SdkError::TimeoutError(_) => Self::internal_server_error(&error),
SdkError::ServiceError(head_obj_error) => {
let head_obj_error = head_obj_error.err();
match head_obj_error {
HeadObjectError::NotFound(_) => Self::bad_request(&error),
_ => Self::internal_server_error(&error),
}
}
_ => Self::internal_server_error(&error),
}
}
ActiveStorageError::ReqwestProcessingError(reqwest_error) => {
if reqwest_error.is_request() {
Self::bad_request(&error)
} else {
Self::internal_server_error(&error)
}
}
ActiveStorageError::HTTPRequestError { error: _ }|
ActiveStorageError::S3RequestError { error: _ }
=> {
Self::bad_request(&error)
}
ActiveStorageError::UnsupportedInterfaceType { interface_type: _ } => {
Self::bad_request(&error)
}
};
if response.status.is_server_error() {
event!(Level::ERROR, "{}", error.to_string());
let mut current = error.source();
while let Some(source) = current {
event!(Level::ERROR, "Caused by: {}", source.to_string());
current = source.source();
}
}
response
}
}
impl IntoResponse for ErrorResponse {
    /// Serialise the response as pretty-printed JSON and send it with the stored status and an
    /// `application/json` content type.
    ///
    /// If serialisation itself fails, a plain-text 500 Internal Server Error describing the
    /// failure is returned instead.
    fn into_response(self) -> Response {
        match serde_json::to_string_pretty(&self) {
            Ok(body) => {
                let content_type = (&header::CONTENT_TYPE, mime::APPLICATION_JSON.to_string());
                (self.status, [content_type], body).into_response()
            }
            Err(err) => {
                let reason = format!("Failed to serialise error response: {err}");
                (StatusCode::INTERNAL_SERVER_ERROR, reason).into_response()
            }
        }
    }
}
// Unit tests: each test builds an `ActiveStorageError`, converts it into a response and checks
// the status code, the headers and the JSON body.
#[cfg(test)]
mod tests {
use super::*;
use aws_sdk_s3::types::error::NoSuchKey;
use aws_smithy_runtime_api::http::Response as SmithyResponse;
use aws_smithy_runtime_api::http::StatusCode as SmithyStatusCode;
use aws_smithy_types::error::ErrorMetadata as SmithyError;
use hyper::HeaderMap;
/// Collect the whole response body and return it as a `String`.
///
/// Panics (via `unwrap`) if the body cannot be read or is not valid UTF-8.
async fn body_string(response: Response) -> String {
String::from_utf8(
hyper::body::to_bytes(response.into_body())
.await
.unwrap()
.to_vec(),
)
.unwrap()
}
/// Convert `error` into a response and assert on what a client would see: the status code,
/// that `Content-Type: application/json` is the only header, and the `message` and
/// `caused_by` fields of the JSON body.
async fn test_active_storage_error(
error: ActiveStorageError,
status: StatusCode,
message: &str,
caused_by: Option<Vec<&'static str>>,
) {
let response = error.into_response();
assert_eq!(status, response.status());
let mut headers = HeaderMap::new();
headers.insert(&header::CONTENT_TYPE, "application/json".parse().unwrap());
assert_eq!(headers, *response.headers());
// `status` is `#[serde(skip)]`, so only the `error` body is read back from the JSON.
let error_response: ErrorResponse =
serde_json::from_str(&body_string(response).await).unwrap();
assert_eq!(message.to_string(), error_response.error.message);
let caused_by = caused_by.map(|cb| cb.iter().map(|s| s.to_string()).collect());
assert_eq!(caused_by, error_response.error.caused_by);
}
#[tokio::test]
async fn decompression_blosc2_error() {
let str_error = "decompression error";
let error = ActiveStorageError::DecompressionBlosc2 { error: str_error };
let message = "failed to decompress data: decompression error";
let caused_by = None;
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn decompression_flate2_error() {
let io_error = std::io::Error::new(std::io::ErrorKind::InvalidInput, "decompression error");
let error = ActiveStorageError::DecompressionFlate2(io_error);
let message = "failed to decompress data";
let caused_by = Some(vec!["decompression error"]);
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn decompression_zune_error() {
let zune_error = InflateDecodeErrors::new_with_error(
zune_inflate::errors::DecodeErrorStatus::InsufficientData,
);
let error = ActiveStorageError::DecompressionZune(zune_error);
let message = "failed to decompress data";
// The expected cause includes the trailing newlines, exactly as written here.
let caused_by = Some(vec!["Insufficient data\n\n\n"]);
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn empty_array_op_error() {
let error = ActiveStorageError::EmptyArray { operation: "foo" };
let message = "cannot perform foo on empty array or selection";
let caused_by = None;
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn from_bytes_error() {
let error = ActiveStorageError::FromBytes { type_name: "foo" };
let message = "failed to convert from bytes to foo";
let caused_by = None;
test_active_storage_error(error, StatusCode::INTERNAL_SERVER_ERROR, message, caused_by)
.await;
}
#[tokio::test]
async fn incompatible_missing() {
// Relies on an integer-to-`DValue` conversion defined outside this file.
let value = 32.into();
let error = ActiveStorageError::IncompatibleMissing(value);
let message = "Incompatible value 32 for missing";
let caused_by = None;
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn insufficient_memory() {
let error = ActiveStorageError::InsufficientMemory {
requested: 2,
total: 1,
};
let message = "Insufficient memory to process request (2 > 1)";
let caused_by = None;
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn request_data_validation_single() {
let validation_error = validator::ValidationError::new("foo");
let error = ActiveStorageError::RequestDataValidationSingle(validation_error);
let message = "request data is not valid";
let caused_by = Some(vec!["Validation error: foo [{}]"]);
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn request_data_validation() {
let mut validation_errors = validator::ValidationErrors::new();
let validation_error = validator::ValidationError::new("foo");
validation_errors.add("bar", validation_error);
let error = ActiveStorageError::RequestDataValidation(validation_errors);
let message = "request data is not valid";
let caused_by = Some(vec!["bar: Validation error: foo [{}]"]);
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn s3_content_length_missing() {
let error = ActiveStorageError::S3ContentLengthMissing;
let message = "S3 response missing Content-Length header";
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, None).await;
}
/// Wrap `sdk_error` in `ActiveStorageError::S3GetObject` and check the resulting response.
/// The message is fixed for this variant, so callers only supply the status and causes.
async fn test_s3_get_object_error(
sdk_error: SdkError<GetObjectError>,
status: StatusCode,
caused_by: Option<Vec<&'static str>>,
) {
let error = ActiveStorageError::S3GetObject(sdk_error);
let message = "error retrieving object from S3 storage";
test_active_storage_error(error, status, message, caused_by).await;
}
/// Build a Smithy HTTP response with status 400 and the body `"body"`, used as the raw
/// response when constructing service errors.
fn get_smithy_response() -> SmithyResponse {
let sdk_body = "body";
let status: SmithyStatusCode = 400.try_into().unwrap();
SmithyResponse::new(status, sdk_body.into())
}
#[tokio::test]
async fn s3_get_object_error() {
let no_such_key = NoSuchKey::builder().build();
let get_object_error = GetObjectError::NoSuchKey(no_such_key);
let sdk_error = SdkError::service_error(get_object_error, get_smithy_response());
let caused_by = Some(vec!["service error", "NoSuchKey"]);
test_s3_get_object_error(sdk_error, StatusCode::BAD_REQUEST, caused_by).await;
}
// The following tests use generic (unmodelled) service errors, which the conversion
// classifies by their error code.
#[tokio::test]
async fn s3_get_object_invalid_access_key_error() {
let smithy_error = SmithyError::builder()
.message("fake smithy error")
.code("InvalidAccessKeyId")
.build();
let get_object_error = GetObjectError::generic(smithy_error);
let sdk_error = SdkError::service_error(get_object_error, get_smithy_response());
let caused_by = Some(vec![
"service error",
"unhandled error (InvalidAccessKeyId)",
"Error { code: \"InvalidAccessKeyId\", message: \"fake smithy error\" }",
]);
test_s3_get_object_error(sdk_error, StatusCode::UNAUTHORIZED, caused_by).await;
}
#[tokio::test]
async fn s3_get_object_no_such_bucket() {
let smithy_error = SmithyError::builder()
.message("fake smithy error")
.code("NoSuchBucket")
.build();
let get_object_error = GetObjectError::generic(smithy_error);
let sdk_error = SdkError::service_error(get_object_error, get_smithy_response());
let caused_by = Some(vec![
"service error",
"unhandled error (NoSuchBucket)",
"Error { code: \"NoSuchBucket\", message: \"fake smithy error\" }",
]);
test_s3_get_object_error(sdk_error, StatusCode::BAD_REQUEST, caused_by).await;
}
#[tokio::test]
async fn s3_get_object_sig_does_not_match_error() {
let smithy_error = SmithyError::builder()
.message("fake smithy error")
.code("SignatureDoesNotMatch")
.build();
let get_object_error = GetObjectError::generic(smithy_error);
let sdk_error = SdkError::service_error(get_object_error, get_smithy_response());
let caused_by = Some(vec![
"service error",
"unhandled error (SignatureDoesNotMatch)",
"Error { code: \"SignatureDoesNotMatch\", message: \"fake smithy error\" }",
]);
test_s3_get_object_error(sdk_error, StatusCode::UNAUTHORIZED, caused_by).await;
}
#[tokio::test]
async fn s3_get_object_access_denied_error() {
let smithy_error = SmithyError::builder()
.message("fake smithy error")
.code("AccessDenied")
.build();
let get_object_error = GetObjectError::generic(smithy_error);
let sdk_error = SdkError::service_error(get_object_error, get_smithy_response());
let caused_by = Some(vec![
"service error",
"unhandled error (AccessDenied)",
"Error { code: \"AccessDenied\", message: \"fake smithy error\" }",
]);
test_s3_get_object_error(sdk_error, StatusCode::UNAUTHORIZED, caused_by).await;
}
#[tokio::test]
async fn s3_byte_stream_error() {
let error = ActiveStorageError::S3ByteStream(
std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into(),
);
let message = "error receiving object from S3 storage";
let caused_by = Some(vec!["IO error", "unexpected end of file"]);
test_active_storage_error(error, StatusCode::INTERNAL_SERVER_ERROR, message, caused_by)
.await;
}
#[tokio::test]
async fn semaphore_acquire_error() {
// Acquiring from a closed semaphore is a simple way to obtain an `AcquireError`.
let sem = tokio::sync::Semaphore::new(1);
sem.close();
let error = ActiveStorageError::SemaphoreAcquireError(sem.acquire().await.unwrap_err());
let message = "error acquiring resources";
let caused_by = Some(vec!["semaphore closed"]);
test_active_storage_error(error, StatusCode::INTERNAL_SERVER_ERROR, message, caused_by)
.await;
}
#[tokio::test]
async fn shape_error() {
let error = ActiveStorageError::ShapeInvalid(ShapeError::from_kind(
ndarray::ErrorKind::OutOfBounds,
));
let message = "failed to create array from shape";
let caused_by = Some(vec!["ShapeError/OutOfBounds: out of bounds indexing"]);
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}
#[tokio::test]
async fn try_from_int_error() {
let error = ActiveStorageError::TryFromInt(u8::try_from(-1_i8).unwrap_err());
// The variant is `#[error(transparent)]`: the message is the inner error's own text and
// there is no `caused_by` entry.
let message = "out of range integral type conversion attempted";
let caused_by = None;
test_active_storage_error(error, StatusCode::INTERNAL_SERVER_ERROR, message, caused_by)
.await;
}
#[tokio::test]
async fn unsupported_operation() {
let error = ActiveStorageError::UnsupportedOperation {
operation: "foo".to_string(),
};
let message = "unsupported operation foo";
let caused_by = None;
test_active_storage_error(error, StatusCode::NOT_FOUND, message, caused_by).await;
}
}