use super::*;
/// Region component embedded in the topic ARNs built by `compat_topic_arn`.
const COMPAT_REGION: &str = "us-east-1";
/// Account id component embedded in the queue URLs built by `compat_queue_url`
/// and in the topic ARNs built by `compat_topic_arn`.
const COMPAT_ACCOUNT_ID: &str = "000000000000";
impl BucketWarden {
pub(crate) fn dispatch_query_compat_http(
&mut self,
request: &S3HttpRequest,
method: &str,
) -> Option<Result<S3HttpResponse, RuntimeError>> {
if method != "POST" {
return None;
}
if let Some(target) = header(&request.headers, "x-amz-target") {
if let Some(action) = target.strip_prefix("AmazonSQS.") {
let mut params = match parse_json_params(&request.body) {
Ok(params) => params,
Err(error) => return Some(Err(error)),
};
params.insert("Action".to_string(), action.to_string());
return Some(match action {
"CreateQueue" => self.compat_create_queue_json(request, ¶ms),
"GetQueueUrl" => self.compat_get_queue_url_json(request, ¶ms),
"DeleteQueue" => self.compat_delete_queue_json(¶ms),
_ => Ok(json_response("{}")),
});
}
}
let mut params = request
.query
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect::<BTreeMap<_, _>>();
if let Ok(body_params) = parse_form_urlencoded(&request.body) {
params.extend(body_params);
}
let action = params.get("Action")?;
Some(match action.as_str() {
"CreateQueue" => self.compat_create_queue(request, ¶ms),
"GetQueueUrl" => self.compat_get_queue_url(request, ¶ms),
"DeleteQueue" => self.compat_delete_queue(¶ms),
"CreateTopic" => self.compat_create_topic(¶ms),
"DeleteTopic" => self.compat_delete_topic(¶ms),
_ => return None,
})
}
fn compat_create_queue(
&mut self,
request: &S3HttpRequest,
params: &BTreeMap<String, String>,
) -> Result<S3HttpResponse, RuntimeError> {
let queue_name = required_form_value(params, "QueueName")?;
self.compat_sqs_queues
.insert(queue_name.to_string(), queue_name.to_string());
let queue_url = compat_queue_url(request, queue_name);
Ok(query_xml_response(
"CreateQueue",
&format!(
"<CreateQueueResult><QueueUrl>{}</QueueUrl></CreateQueueResult>",
xml_escape(&queue_url)
),
))
}
fn compat_create_queue_json(
&mut self,
request: &S3HttpRequest,
params: &BTreeMap<String, String>,
) -> Result<S3HttpResponse, RuntimeError> {
let queue_name = required_form_value(params, "QueueName")?;
self.compat_sqs_queues
.insert(queue_name.to_string(), queue_name.to_string());
let queue_url = compat_queue_url(request, queue_name);
Ok(json_response(&format!(
"{{\"QueueUrl\":\"{}\"}}",
json_escape(&queue_url)
)))
}
fn compat_get_queue_url(
&self,
request: &S3HttpRequest,
params: &BTreeMap<String, String>,
) -> Result<S3HttpResponse, RuntimeError> {
let queue_name = required_form_value(params, "QueueName")?;
if !self.compat_sqs_queues.contains_key(queue_name) {
return Ok(query_error_response(
"AWS.SimpleQueueService.NonExistentQueue",
400,
"The specified queue does not exist.",
));
}
let queue_url = compat_queue_url(request, queue_name);
Ok(query_xml_response(
"GetQueueUrl",
&format!(
"<GetQueueUrlResult><QueueUrl>{}</QueueUrl></GetQueueUrlResult>",
xml_escape(&queue_url)
),
))
}
fn compat_get_queue_url_json(
&self,
request: &S3HttpRequest,
params: &BTreeMap<String, String>,
) -> Result<S3HttpResponse, RuntimeError> {
let queue_name = required_form_value(params, "QueueName")?;
let queue_url = compat_queue_url(request, queue_name);
Ok(json_response(&format!(
"{{\"QueueUrl\":\"{}\"}}",
json_escape(&queue_url)
)))
}
fn compat_delete_queue(
&mut self,
params: &BTreeMap<String, String>,
) -> Result<S3HttpResponse, RuntimeError> {
let queue_url = required_form_value(params, "QueueUrl")?;
if let Some(queue_name) = queue_name_from_url(queue_url) {
self.compat_sqs_queues.remove(&queue_name);
}
Ok(query_xml_response("DeleteQueue", "<DeleteQueueResult />"))
}
fn compat_delete_queue_json(
&mut self,
params: &BTreeMap<String, String>,
) -> Result<S3HttpResponse, RuntimeError> {
let queue_url = required_form_value(params, "QueueUrl")?;
if let Some(queue_name) = queue_name_from_url(queue_url) {
self.compat_sqs_queues.remove(&queue_name);
}
Ok(json_response("{}"))
}
fn compat_create_topic(
&mut self,
params: &BTreeMap<String, String>,
) -> Result<S3HttpResponse, RuntimeError> {
let topic_name = required_form_value(params, "Name")?;
let topic_arn = compat_topic_arn(topic_name);
self.compat_sns_topics
.insert(topic_name.to_string(), topic_arn.clone());
Ok(query_xml_response(
"CreateTopic",
&format!(
"<CreateTopicResult><TopicArn>{}</TopicArn></CreateTopicResult>",
xml_escape(&topic_arn)
),
))
}
fn compat_delete_topic(
&mut self,
params: &BTreeMap<String, String>,
) -> Result<S3HttpResponse, RuntimeError> {
let topic_arn = required_form_value(params, "TopicArn")?;
if let Some(topic_name) = topic_arn.rsplit(':').next() {
self.compat_sns_topics.remove(topic_name);
}
Ok(query_xml_response("DeleteTopic", "<DeleteTopicResult />"))
}
}
/// Parses an `application/x-www-form-urlencoded` body into decoded key/value pairs.
///
/// Empty segments between `&` separators are skipped, a segment without `=`
/// maps its key to the empty string, and a repeated key keeps its last value.
///
/// Errors when `body` is not valid UTF-8.
fn parse_form_urlencoded(body: &[u8]) -> Result<BTreeMap<String, String>, RuntimeError> {
    let text = match std::str::from_utf8(body) {
        Ok(text) => text,
        Err(utf8_error) => {
            return Err(RuntimeError::AuthorizationQueryParametersError(
                utf8_error.to_string(),
            ));
        }
    };

    let decoded = text
        .split('&')
        .filter(|segment| !segment.is_empty())
        .fold(BTreeMap::new(), |mut fields, segment| {
            let (name, value) = match segment.split_once('=') {
                Some(parts) => parts,
                None => (segment, ""),
            };
            // Explicit insert keeps "last occurrence wins" for repeated keys.
            fields.insert(form_component_decode(name), form_component_decode(value));
            fields
        });
    Ok(decoded)
}
/// Flattens a JSON object body into a map of its string-valued fields.
///
/// Fields whose value is not a JSON string (numbers, booleans, nested
/// objects, arrays, null) are left out of the result.
///
/// Errors when `body` is not valid JSON or its top-level value is not an object.
fn parse_json_params(body: &[u8]) -> Result<BTreeMap<String, String>, RuntimeError> {
    let document = serde_json::from_slice::<serde_json::Value>(body)
        .map_err(|error| RuntimeError::AuthorizationQueryParametersError(error.to_string()))?;

    let fields = document.as_object().ok_or_else(|| {
        RuntimeError::AuthorizationQueryParametersError(
            "JSON compatibility request body must be an object".to_string(),
        )
    })?;

    Ok(fields
        .iter()
        .filter_map(|(name, field)| field.as_str().map(|text| (name.clone(), text.to_string())))
        .collect())
}
/// Decodes one form-urlencoded key or value.
///
/// Every `+` is first rewritten to `%20`, then the result is handed to
/// `percent_decode`; doing the rewrite first means an encoded `%2B` is not
/// touched by the `+` replacement.
fn form_component_decode(value: &str) -> String {
    let plus_as_space = value.replace('+', "%20");
    percent_decode(&plus_as_space)
}
/// Looks up `key` in `params` and returns its value untouched.
///
/// Errors with "missing required form field {key}" when the key is absent or
/// its value is empty or whitespace-only.
fn required_form_value<'a>(
    params: &'a BTreeMap<String, String>,
    key: &str,
) -> Result<&'a str, RuntimeError> {
    match params.get(key) {
        Some(value) if !value.trim().is_empty() => Ok(value.as_str()),
        _ => Err(RuntimeError::AuthorizationQueryParametersError(format!(
            "missing required form field {key}"
        ))),
    }
}
/// Builds the queue URL `http://<host>/<account id>/<queue name>`.
///
/// `<host>` is the request's `host` header, or `127.0.0.1:9000` when the
/// header is absent. The queue name is inserted as given, without encoding.
fn compat_queue_url(request: &S3HttpRequest, queue_name: &str) -> String {
    let authority = match header(&request.headers, "host") {
        Some(host) => host,
        None => "127.0.0.1:9000",
    };
    format!(
        "http://{}/{}/{}",
        authority, COMPAT_ACCOUNT_ID, queue_name
    )
}
/// Builds the topic ARN `arn:aws:sns:<region>:<account id>:<topic name>` from
/// the module's fixed region and account id.
fn compat_topic_arn(topic_name: &str) -> String {
    ["arn:aws:sns", COMPAT_REGION, COMPAT_ACCOUNT_ID, topic_name].join(":")
}
/// Extracts the queue name from a queue URL: the text after the last `/`,
/// once trailing slashes have been dropped.
///
/// A value with no `/` is returned whole. Returns `None` when nothing is left
/// after trimming (empty input or only slashes).
fn queue_name_from_url(queue_url: &str) -> Option<String> {
    let trimmed = queue_url.trim_end_matches('/');
    let last_segment = trimmed
        .rsplit_once('/')
        .map_or(trimmed, |(_, tail)| tail);
    if last_segment.is_empty() {
        None
    } else {
        Some(last_segment.to_owned())
    }
}
/// Wraps `body` in an `<{action}Response>` XML envelope with a fixed
/// `bucketwarden-compat` request id and returns it as a 200 `text/xml` response.
///
/// `action` and `body` are inserted verbatim; callers are responsible for
/// escaping any text they place inside `body`.
fn query_xml_response(action: &str, body: &str) -> S3HttpResponse {
    let root_tag = format!("{action}Response");
    let envelope = format!(
        "<{root_tag}>{body}<ResponseMetadata><RequestId>bucketwarden-compat</RequestId></ResponseMetadata></{root_tag}>"
    );
    S3HttpResponse::new(200)
        .with_header("content-type", "text/xml; charset=utf-8")
        .with_body(envelope)
}
/// Builds an XML `<ErrorResponse>` document with the given HTTP `status`.
///
/// The error type is always `Sender`; `code` and `message` are XML-escaped
/// before being embedded, and the request id is the fixed
/// `bucketwarden-compat` marker.
fn query_error_response(code: &str, status: u16, message: &str) -> S3HttpResponse {
    let escaped_code = xml_escape(code);
    let escaped_message = xml_escape(message);
    let document = format!(
        "<ErrorResponse><Error><Type>Sender</Type><Code>{}</Code><Message>{}</Message></Error><RequestId>bucketwarden-compat</RequestId></ErrorResponse>",
        escaped_code, escaped_message
    );
    S3HttpResponse::new(status)
        .with_header("content-type", "text/xml; charset=utf-8")
        .with_body(document)
}
/// Returns a 200 response carrying `body` verbatim with the
/// `application/x-amz-json-1.0` content type.
fn json_response(body: &str) -> S3HttpResponse {
    let response = S3HttpResponse::new(200);
    let response = response.with_header("content-type", "application/x-amz-json-1.0");
    response.with_body(body)
}
/// Escapes `value` so it can be placed between double quotes in a JSON document.
///
/// Besides `"` and `\`, every control character below U+0020 is escaped
/// (`\n`, `\r`, `\t`, `\b`, `\f`, or `\u00XX`), because JSON forbids raw
/// control characters inside strings. All other characters pass through
/// unchanged.
fn json_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining C0 controls have no short form; use the \uXXXX escape.
            control if (control as u32) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", control as u32));
            }
            other => escaped.push(other),
        }
    }
    escaped
}