bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

const COMPAT_REGION: &str = "us-east-1";
const COMPAT_ACCOUNT_ID: &str = "000000000000";

impl BucketWarden {
    pub(crate) fn dispatch_query_compat_http(
        &mut self,
        request: &S3HttpRequest,
        method: &str,
    ) -> Option<Result<S3HttpResponse, RuntimeError>> {
        if method != "POST" {
            return None;
        }
        if let Some(target) = header(&request.headers, "x-amz-target") {
            if let Some(action) = target.strip_prefix("AmazonSQS.") {
                let mut params = match parse_json_params(&request.body) {
                    Ok(params) => params,
                    Err(error) => return Some(Err(error)),
                };
                params.insert("Action".to_string(), action.to_string());
                return Some(match action {
                    "CreateQueue" => self.compat_create_queue_json(request, &params),
                    "GetQueueUrl" => self.compat_get_queue_url_json(request, &params),
                    "DeleteQueue" => self.compat_delete_queue_json(&params),
                    _ => Ok(json_response("{}")),
                });
            }
        }
        let mut params = request
            .query
            .iter()
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect::<BTreeMap<_, _>>();
        if let Ok(body_params) = parse_form_urlencoded(&request.body) {
            params.extend(body_params);
        }
        let action = params.get("Action")?;
        Some(match action.as_str() {
            "CreateQueue" => self.compat_create_queue(request, &params),
            "GetQueueUrl" => self.compat_get_queue_url(request, &params),
            "DeleteQueue" => self.compat_delete_queue(&params),
            "CreateTopic" => self.compat_create_topic(&params),
            "DeleteTopic" => self.compat_delete_topic(&params),
            _ => return None,
        })
    }

    fn compat_create_queue(
        &mut self,
        request: &S3HttpRequest,
        params: &BTreeMap<String, String>,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let queue_name = required_form_value(params, "QueueName")?;
        self.compat_sqs_queues
            .insert(queue_name.to_string(), queue_name.to_string());
        let queue_url = compat_queue_url(request, queue_name);
        Ok(query_xml_response(
            "CreateQueue",
            &format!(
                "<CreateQueueResult><QueueUrl>{}</QueueUrl></CreateQueueResult>",
                xml_escape(&queue_url)
            ),
        ))
    }

    fn compat_create_queue_json(
        &mut self,
        request: &S3HttpRequest,
        params: &BTreeMap<String, String>,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let queue_name = required_form_value(params, "QueueName")?;
        self.compat_sqs_queues
            .insert(queue_name.to_string(), queue_name.to_string());
        let queue_url = compat_queue_url(request, queue_name);
        Ok(json_response(&format!(
            "{{\"QueueUrl\":\"{}\"}}",
            json_escape(&queue_url)
        )))
    }

    fn compat_get_queue_url(
        &self,
        request: &S3HttpRequest,
        params: &BTreeMap<String, String>,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let queue_name = required_form_value(params, "QueueName")?;
        if !self.compat_sqs_queues.contains_key(queue_name) {
            return Ok(query_error_response(
                "AWS.SimpleQueueService.NonExistentQueue",
                400,
                "The specified queue does not exist.",
            ));
        }
        let queue_url = compat_queue_url(request, queue_name);
        Ok(query_xml_response(
            "GetQueueUrl",
            &format!(
                "<GetQueueUrlResult><QueueUrl>{}</QueueUrl></GetQueueUrlResult>",
                xml_escape(&queue_url)
            ),
        ))
    }

    fn compat_get_queue_url_json(
        &self,
        request: &S3HttpRequest,
        params: &BTreeMap<String, String>,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let queue_name = required_form_value(params, "QueueName")?;
        let queue_url = compat_queue_url(request, queue_name);
        Ok(json_response(&format!(
            "{{\"QueueUrl\":\"{}\"}}",
            json_escape(&queue_url)
        )))
    }

    fn compat_delete_queue(
        &mut self,
        params: &BTreeMap<String, String>,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let queue_url = required_form_value(params, "QueueUrl")?;
        if let Some(queue_name) = queue_name_from_url(queue_url) {
            self.compat_sqs_queues.remove(&queue_name);
        }
        Ok(query_xml_response("DeleteQueue", "<DeleteQueueResult />"))
    }

    fn compat_delete_queue_json(
        &mut self,
        params: &BTreeMap<String, String>,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let queue_url = required_form_value(params, "QueueUrl")?;
        if let Some(queue_name) = queue_name_from_url(queue_url) {
            self.compat_sqs_queues.remove(&queue_name);
        }
        Ok(json_response("{}"))
    }

    fn compat_create_topic(
        &mut self,
        params: &BTreeMap<String, String>,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let topic_name = required_form_value(params, "Name")?;
        let topic_arn = compat_topic_arn(topic_name);
        self.compat_sns_topics
            .insert(topic_name.to_string(), topic_arn.clone());
        Ok(query_xml_response(
            "CreateTopic",
            &format!(
                "<CreateTopicResult><TopicArn>{}</TopicArn></CreateTopicResult>",
                xml_escape(&topic_arn)
            ),
        ))
    }

    fn compat_delete_topic(
        &mut self,
        params: &BTreeMap<String, String>,
    ) -> Result<S3HttpResponse, RuntimeError> {
        let topic_arn = required_form_value(params, "TopicArn")?;
        if let Some(topic_name) = topic_arn.rsplit(':').next() {
            self.compat_sns_topics.remove(topic_name);
        }
        Ok(query_xml_response("DeleteTopic", "<DeleteTopicResult />"))
    }
}

fn parse_form_urlencoded(body: &[u8]) -> Result<BTreeMap<String, String>, RuntimeError> {
    let raw = std::str::from_utf8(body)
        .map_err(|error| RuntimeError::AuthorizationQueryParametersError(error.to_string()))?;
    let mut params = BTreeMap::new();
    for pair in raw.split('&').filter(|part| !part.is_empty()) {
        let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
        params.insert(form_component_decode(key), form_component_decode(value));
    }
    Ok(params)
}

fn parse_json_params(body: &[u8]) -> Result<BTreeMap<String, String>, RuntimeError> {
    let value: serde_json::Value = serde_json::from_slice(body)
        .map_err(|error| RuntimeError::AuthorizationQueryParametersError(error.to_string()))?;
    let Some(object) = value.as_object() else {
        return Err(RuntimeError::AuthorizationQueryParametersError(
            "JSON compatibility request body must be an object".to_string(),
        ));
    };
    let mut params = BTreeMap::new();
    for (key, value) in object {
        if let Some(text) = value.as_str() {
            params.insert(key.clone(), text.to_string());
        }
    }
    Ok(params)
}

fn form_component_decode(value: &str) -> String {
    percent_decode(&value.replace('+', "%20"))
}

fn required_form_value<'a>(
    params: &'a BTreeMap<String, String>,
    key: &str,
) -> Result<&'a str, RuntimeError> {
    params
        .get(key)
        .map(String::as_str)
        .filter(|value| !value.trim().is_empty())
        .ok_or_else(|| {
            RuntimeError::AuthorizationQueryParametersError(format!(
                "missing required form field {key}"
            ))
        })
}

fn compat_queue_url(request: &S3HttpRequest, queue_name: &str) -> String {
    let host = header(&request.headers, "host").unwrap_or("127.0.0.1:9000");
    format!("http://{host}/{COMPAT_ACCOUNT_ID}/{queue_name}")
}

fn compat_topic_arn(topic_name: &str) -> String {
    format!("arn:aws:sns:{COMPAT_REGION}:{COMPAT_ACCOUNT_ID}:{topic_name}")
}

fn queue_name_from_url(queue_url: &str) -> Option<String> {
    queue_url
        .trim_end_matches('/')
        .rsplit('/')
        .next()
        .filter(|value| !value.is_empty())
        .map(str::to_string)
}

fn query_xml_response(action: &str, body: &str) -> S3HttpResponse {
    S3HttpResponse::new(200)
        .with_header("content-type", "text/xml; charset=utf-8")
        .with_body(format!(
            "<{action}Response>{body}<ResponseMetadata><RequestId>bucketwarden-compat</RequestId></ResponseMetadata></{action}Response>"
        ))
}

fn query_error_response(code: &str, status: u16, message: &str) -> S3HttpResponse {
    S3HttpResponse::new(status)
        .with_header("content-type", "text/xml; charset=utf-8")
        .with_body(format!(
            "<ErrorResponse><Error><Type>Sender</Type><Code>{}</Code><Message>{}</Message></Error><RequestId>bucketwarden-compat</RequestId></ErrorResponse>",
            xml_escape(code),
            xml_escape(message)
        ))
}

fn json_response(body: &str) -> S3HttpResponse {
    S3HttpResponse::new(200)
        .with_header("content-type", "application/x-amz-json-1.0")
        .with_body(body)
}

fn json_escape(value: &str) -> String {
    value.replace('\\', "\\\\").replace('"', "\\\"")
}