use super::*;
/// Parses an S3 Select request body into a [`SelectQueryPlan`].
///
/// The body is decoded lossily as UTF-8, trimmed, and scanned with `text_between` for the
/// `Expression`, `ExpressionType`, `InputSerialization`, and `OutputSerialization`
/// elements. The SQL text goes to `parse_select_expression`; the input format and CSV
/// header mode come from `resolve_input_serialization_behavior`.
///
/// # Errors
///
/// Returns `RuntimeError::ServiceSpecificErrorFeature` with:
/// - `missingrequiredparameter` when `Expression` is missing or blank, or when
///   `InputSerialization` / `OutputSerialization` is missing;
/// - `invalidrequestparameter` when a non-blank `ExpressionType` is not `SQL`
///   (case-insensitive);
/// - `objectserializationconflict` when `OutputSerialization` names neither or both of
///   `CSV` and `JSON`;
/// - any error from parsing the input serialization or the SQL expression.
fn parse_select_request(body: &[u8]) -> Result<SelectQueryPlan, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    let xml = xml.trim();
    let expression = text_between(xml, "<Expression>", "</Expression>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .ok_or(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.missingrequiredparameter",
        ))?;
    // ExpressionType is optional here; when it is supplied, only SQL is accepted.
    if let Some(expression_type) = text_between(xml, "<ExpressionType>", "</ExpressionType>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        if !expression_type.eq_ignore_ascii_case("SQL") {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.invalidrequestparameter",
            ));
        }
    }
    let input_xml = text_between(xml, "<InputSerialization>", "</InputSerialization>").ok_or(
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.missingrequiredparameter",
        ),
    )?;
    let input_serialization = parse_input_serialization(input_xml)?;
    let (input_format, file_header_info) =
        resolve_input_serialization_behavior(&input_serialization)?;
    let output_xml = text_between(xml, "<OutputSerialization>", "</OutputSerialization>").ok_or(
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.missingrequiredparameter",
        ),
    )?;
    // Exactly one output format must be named. This mirrors the input-side check, which
    // rejects a body naming both; previously naming both silently selected CSV here.
    let wants_csv = parse_csv_output(output_xml).is_ok();
    let wants_json = output_xml.contains("<JSON");
    let output_format = match (wants_csv, wants_json) {
        (true, false) => SelectOutputFormat::Csv,
        (false, true) => SelectOutputFormat::Json,
        _ => {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.objectserializationconflict",
            ));
        }
    };
    let mut plan = parse_select_expression(expression, file_header_info, output_format)?;
    // parse_select_expression takes no input-format argument, so apply the format
    // resolved from InputSerialization here.
    plan.input_format = input_format;
    Ok(plan)
}
/// Builds an [`InputSerialization`] from the inner XML of `<InputSerialization>`.
///
/// The CSV and JSON sections are detected by the presence of a `<CSV` or `<JSON` tag
/// prefix, and exactly one of them must be present. `CompressionType`, when given, is
/// recorded trimmed, and a blank value is treated as absent.
///
/// # Errors
///
/// Propagates errors from `parse_csv_input` / `parse_json_input`. Returns
/// `objectserializationconflict` when neither format or both formats are present.
fn parse_input_serialization(input_xml: &str) -> Result<InputSerialization, RuntimeError> {
    let has_csv = input_xml.contains("<CSV");
    let has_json = input_xml.contains("<JSON");
    let csv = has_csv.then(|| parse_csv_input(input_xml)).transpose()?;
    let json = has_json.then(|| parse_json_input(input_xml)).transpose()?;
    // The request must pick exactly one input format.
    if has_csv == has_json {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        ));
    }
    let compression_type = text_between(input_xml, "<CompressionType>", "</CompressionType>")
        .and_then(|raw| {
            let trimmed = raw.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        });
    Ok(InputSerialization {
        compression_type,
        csv,
        json,
    })
}
/// Determines the input format and CSV header mode for a parsed [`InputSerialization`].
///
/// When CSV settings are present they take precedence. `FileHeaderInfo` is matched
/// case-insensitively against `NONE`, `USE`, and `IGNORE`, and defaults to `NONE` when
/// absent. For JSON input the `Type` defaults to `LINES` and must be `LINES` or
/// `DOCUMENT`; JSON input always reports `SelectFileHeaderInfo::Use`.
///
/// # Errors
///
/// - `invalidfileheaderinfo` for an unrecognised CSV `FileHeaderInfo`.
/// - `objectserializationconflict` for an unsupported JSON `Type`, or when neither a
///   CSV nor a JSON section is present.
fn resolve_input_serialization_behavior(
    input_serialization: &InputSerialization,
) -> Result<(SelectInputFormat, SelectFileHeaderInfo), RuntimeError> {
    let conflict = || {
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        )
    };
    match (&input_serialization.csv, &input_serialization.json) {
        // CSV is checked first, so it wins if both sections are somehow set.
        (Some(csv_input), _) => {
            let requested = csv_input
                .file_header_info
                .as_deref()
                .map(str::to_ascii_uppercase);
            let header_mode = match requested.as_deref() {
                None | Some("NONE") => SelectFileHeaderInfo::None,
                Some("USE") => SelectFileHeaderInfo::Use,
                Some("IGNORE") => SelectFileHeaderInfo::Ignore,
                Some(_) => {
                    return Err(RuntimeError::ServiceSpecificErrorFeature(
                        "feat:bucketwarden.s3err.select.invalidfileheaderinfo",
                    ));
                }
            };
            Ok((SelectInputFormat::Csv, header_mode))
        }
        (None, Some(json_input)) => {
            let json_type = json_input.type_.as_deref().unwrap_or("LINES");
            let supported = ["LINES", "DOCUMENT"]
                .iter()
                .any(|kind| json_type.eq_ignore_ascii_case(kind));
            if supported {
                Ok((SelectInputFormat::Json, SelectFileHeaderInfo::Use))
            } else {
                Err(conflict())
            }
        }
        (None, None) => Err(conflict()),
    }
}
/// Reads the CSV input options from the inner XML of `<InputSerialization>`.
///
/// Only `FileHeaderInfo` is captured; it is trimmed, and a blank value is treated as
/// absent. The returned [`CsvInput`] always has `compression_type: None`.
///
/// # Errors
///
/// Returns `objectserializationconflict` when the XML contains no `<CSV` tag.
fn parse_csv_input(input_xml: &str) -> Result<CsvInput, RuntimeError> {
    if input_xml.contains("<CSV") {
        let file_header_info = text_between(input_xml, "<FileHeaderInfo>", "</FileHeaderInfo>")
            .and_then(|raw| {
                let trimmed = raw.trim();
                (!trimmed.is_empty()).then(|| trimmed.to_string())
            });
        Ok(CsvInput {
            file_header_info,
            compression_type: None,
        })
    } else {
        Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        ))
    }
}
/// Reads the JSON input options from the inner XML of `<InputSerialization>`.
///
/// Only `Type` is captured; it is trimmed, and a blank value is treated as absent.
///
/// # Errors
///
/// Returns `objectserializationconflict` when the XML contains no `<JSON` tag.
fn parse_json_input(input_xml: &str) -> Result<JsonInput, RuntimeError> {
    if input_xml.contains("<JSON") {
        let type_ = text_between(input_xml, "<Type>", "</Type>").and_then(|raw| {
            let trimmed = raw.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        });
        Ok(JsonInput { type_ })
    } else {
        Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        ))
    }
}
/// Recognises a CSV section in the inner XML of `<OutputSerialization>`.
///
/// No CSV output options are read; a bare [`CsvOutput`] is returned whenever a `<CSV`
/// tag is present.
///
/// # Errors
///
/// Returns `objectserializationconflict` when the XML contains no `<CSV` tag.
fn parse_csv_output(output_xml: &str) -> Result<CsvOutput, RuntimeError> {
    if !output_xml.contains("<CSV") {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        ));
    }
    Ok(CsvOutput {})
}
/// Parses the restricted S3 Select SQL dialect supported here into a [`SelectQueryPlan`].
///
/// Accepted shape (keywords are case-insensitive):
///
/// ```text
/// SELECT <select-list> FROM S3Object [<alias>] [LIMIT <n>]
/// ```
///
/// The select list is handed to `parse_select_projection`. The returned plan always has
/// `input_format` set to `SelectInputFormat::Csv`; a caller that knows the real input
/// format must overwrite it.
///
/// # Errors
///
/// Returns `RuntimeError::ServiceSpecificErrorFeature` with:
/// - `parseexpectedkeyword` if the text does not start with `SELECT ` (one space);
/// - `parseselectmissingfrom` if no ` FROM ` follows;
/// - `parseemptyselect` if the select list is blank;
/// - `invaliddatasource` if the source is not `S3Object`;
/// - `parseexpectednumber` if the text after `LIMIT` does not parse as a `usize`;
/// - `maxoperatorsexceeded` if the limit exceeds `SELECT_MAX_ROWS`;
/// - `unsupportedsqloperation` for a `WHERE`, `GROUP`, `ORDER`, or `HAVING` clause;
/// - `unsupportedsqlstructure` for any other trailing text;
/// - any error from `parse_select_projection`.
fn parse_select_expression(
    expression: &str,
    file_header_info: SelectFileHeaderInfo,
    output_format: SelectOutputFormat,
) -> Result<SelectQueryPlan, RuntimeError> {
    // Splits `text` at its first whitespace character into the leading word and the
    // trimmed rest; both parts are empty for empty input.
    fn split_word(text: &str) -> (&str, &str) {
        let end = text.find(char::is_whitespace).unwrap_or(text.len());
        (&text[..end], text[end..].trim())
    }
    // Case-insensitive membership test for a keyword list.
    fn is_one_of(word: &str, keywords: &[&str]) -> bool {
        keywords.iter().any(|keyword| word.eq_ignore_ascii_case(keyword))
    }
    // Words that open a clause and therefore can never be a table alias.
    const CLAUSE_KEYWORDS: [&str; 5] = ["LIMIT", "WHERE", "GROUP", "ORDER", "HAVING"];
    // Clauses that are recognised but not implemented.
    const UNSUPPORTED_CLAUSES: [&str; 4] = ["WHERE", "GROUP", "ORDER", "HAVING"];

    let normalized = expression.trim();
    // ASCII upper-casing leaves every byte offset unchanged, so indices found in
    // `uppercase` can be used to slice `normalized`.
    let uppercase = normalized.to_ascii_uppercase();
    if !uppercase.starts_with("SELECT ") {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.parseexpectedkeyword",
        ));
    }
    // Search from just after "SELECT" (before its trailing space). For input like
    // `SELECT FROM S3Object` the " FROM " match shares that space; searching the whole
    // string and slicing from "SELECT ".len() produced the inverted range 7..6 and panicked.
    let select_end = "SELECT".len();
    let from_index = uppercase[select_end..]
        .find(" FROM ")
        .map(|offset| select_end + offset)
        .ok_or(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.parseselectmissingfrom",
        ))?;
    let select_list = normalized[select_end..from_index].trim();
    if select_list.is_empty() {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.parseemptyselect",
        ));
    }
    let remainder = normalized[from_index + " FROM ".len()..].trim();
    let (source, tail) = split_word(remainder);
    if !source.eq_ignore_ascii_case("S3Object") {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invaliddatasource",
        ));
    }
    // An optional alias may follow the source, but a clause keyword in that position
    // starts a clause. Previously `FROM S3Object WHERE ...` consumed `WHERE` as the alias
    // and was misreported as an unsupported structure.
    let (first_word, after_first) = split_word(tail);
    let (alias, tail) = if first_word.is_empty() || is_one_of(first_word, &CLAUSE_KEYWORDS) {
        (None, tail)
    } else {
        (Some(first_word), after_first)
    };
    let limit = if tail.is_empty() {
        None
    } else {
        let (keyword, argument) = split_word(tail);
        if keyword.eq_ignore_ascii_case("LIMIT") {
            let parsed = argument.parse::<usize>().map_err(|_| {
                RuntimeError::ServiceSpecificErrorFeature(
                    "feat:bucketwarden.s3err.select.parseexpectednumber",
                )
            })?;
            if parsed > SELECT_MAX_ROWS {
                return Err(RuntimeError::ServiceSpecificErrorFeature(
                    "feat:bucketwarden.s3err.select.maxoperatorsexceeded",
                ));
            }
            Some(parsed)
        } else if is_one_of(keyword, &UNSUPPORTED_CLAUSES) {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.unsupportedsqloperation",
            ));
        } else {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.unsupportedsqlstructure",
            ));
        }
    };
    let projection = parse_select_projection(select_list, alias, file_header_info)?;
    Ok(SelectQueryPlan {
        input_format: SelectInputFormat::Csv,
        projection,
        file_header_info,
        output_format,
        limit,
    })
}
fn parse_select_projection(
select_list: &str,
alias: Option<&str>,
file_header_info: SelectFileHeaderInfo,
) -> Result<SelectProjection, RuntimeError> {
if select_list == "*" {
return Ok(SelectProjection::Wildcard);
}
if select_list.contains('*') {
return Err(RuntimeError::ServiceSpecificErrorFeature(
"feat:bucketwarden.s3err.select.parseasteriskisnotaloneinselectlist",
));
}
let mut columns = Vec::new();
for raw_column in select_list.split(',') {
let token = raw_column.trim();
if token.is_empty() {
return Err(RuntimeError::ServiceSpecificErrorFeature(
"feat:bucketwarden.s3err.select.parseexpectedexpression",
));
}
if token.contains('(')
|| token.contains(')')
|| token.contains('\'')
|| token.contains('"')
|| token.contains('[')
|| token.contains(']')
{
return Err(RuntimeError::ServiceSpecificErrorFeature(
"feat:bucketwarden.s3err.select.unsupportedsqlstructure",
));
}
let token = if let Some((prefix, suffix)) = token.split_once('.') {
if suffix == "*" {
return Err(RuntimeError::ServiceSpecificErrorFeature(
"feat:bucketwarden.s3err.select.parseinvalidcontextforwildcardinselectlist",
));
}
if alias.is_none_or(|alias_name| !prefix.eq_ignore_ascii_case(alias_name)) {
return Err(RuntimeError::ServiceSpecificErrorFeature(
"feat:bucketwarden.s3err.select.invalidtablealias",
));
}
suffix
} else {
token
};
if let Some(position) = parse_select_position(token)? {
columns.push(SelectColumnRef::Position(position));
continue;
}
if matches!(
file_header_info,
SelectFileHeaderInfo::None | SelectFileHeaderInfo::Ignore
) {
return Err(RuntimeError::ServiceSpecificErrorFeature(
"feat:bucketwarden.s3err.select.invalidfileheaderinfo",
));
}
columns.push(SelectColumnRef::Header(token.to_string()));
}
Ok(SelectProjection::Columns(columns))
}
/// Interprets `token` as a positional column reference of the form `_N`.
///
/// Returns `Ok(Some(N - 1))` (a zero-based index) for `_1` through
/// `_{SELECT_MAX_COLUMNS}`. Returns `Ok(None)` when `token` does not start with `_`,
/// meaning it is not a positional reference.
///
/// # Errors
///
/// Returns `invalidcolumnindex` when `token` starts with `_` but the rest is not a plain
/// decimal number (e.g. `_`, `_x`, `_+1`), or when the number is 0 or greater than
/// `SELECT_MAX_COLUMNS`.
fn parse_select_position(token: &str) -> Result<Option<usize>, RuntimeError> {
    let Some(digits) = token.strip_prefix('_') else {
        return Ok(None);
    };
    let invalid = || {
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidcolumnindex",
        )
    };
    // `usize::from_str` also accepts a leading `+`. Require plain digits so that `_+1`
    // is not silently accepted as `_1`.
    if digits.is_empty() || !digits.bytes().all(|byte| byte.is_ascii_digit()) {
        return Err(invalid());
    }
    match digits.parse::<usize>() {
        Ok(position) if (1..=SELECT_MAX_COLUMNS).contains(&position) => Ok(Some(position - 1)),
        // Overflow or an out-of-range index.
        _ => Err(invalid()),
    }
}