use super::*;
/// Runs a select query plan against the raw bytes of an object.
///
/// The body is decoded according to `plan.input_format`:
/// * CSV input goes through `parse_csv_records`, and the resulting records are
///   split into an optional header and data rows by [`split_header_and_rows`]
///   using `plan.file_header_info`.
/// * JSON input is handed to `parse_json_records`, which yields the
///   header/rows pair directly.
///
/// Output column names are resolved before the rows are projected, so a
/// projection that cannot be resolved is reported even when there are no data
/// rows to project. The projected rows are then rendered in
/// `plan.output_format`.
///
/// # Errors
///
/// Propagates any `RuntimeError` raised while parsing the input, resolving
/// the output columns, projecting rows, or rendering JSON output.
fn execute_select_plan(
    plan: &SelectQueryPlan,
    object_body: &[u8],
) -> Result<Vec<u8>, RuntimeError> {
    let (header, rows) = match plan.input_format {
        SelectInputFormat::Json => parse_json_records(object_body)?,
        SelectInputFormat::Csv => {
            let parsed = parse_csv_records(object_body)?;
            split_header_and_rows(plan.file_header_info, parsed)
        }
    };
    // Borrow the header once; the shadowed owner stays alive until the end of
    // this function, so the borrow can be shared by both steps below.
    let header = header.as_ref();

    // Column resolution must run before projection because projection
    // consumes `rows`.
    let output_columns = resolve_output_columns(plan, header, &rows)?;
    let selected = project_rows(plan, header, rows)?;

    match plan.output_format {
        SelectOutputFormat::Json => render_json_rows(&output_columns, selected),
        SelectOutputFormat::Csv => Ok(render_csv_rows(selected)),
    }
}
/// Separates the first record from the remaining records according to
/// `file_header_info`.
///
/// * `Use`: the first record is removed and returned as the header.
/// * `Ignore`: the first record is removed and discarded; no header is
///   returned.
/// * any other mode: every record is treated as data and no header is
///   returned.
///
/// When `records` is empty there is nothing to remove, so `(None, records)`
/// is returned regardless of the mode.
fn split_header_and_rows(
    file_header_info: SelectFileHeaderInfo,
    mut records: Vec<Vec<String>>,
) -> (Option<Vec<String>>, Vec<Vec<String>>) {
    let consumes_first_record = matches!(
        file_header_info,
        SelectFileHeaderInfo::Use | SelectFileHeaderInfo::Ignore
    );
    if records.is_empty() || !consumes_first_record {
        return (None, records);
    }

    // Both `Use` and `Ignore` drop the first record from the data rows; only
    // `Use` keeps it around as the header.
    let first_record = records.remove(0);
    match file_header_info {
        SelectFileHeaderInfo::Use => (Some(first_record), records),
        _ => (None, records),
    }
}
/// Computes the names of the output columns for `plan.projection`.
///
/// * `Wildcard`: returns a copy of the header when one is available.
///   Otherwise it synthesises positional names `_1`, `_2`, ... up to the
///   length of the first data row (no names at all when there are no rows).
/// * `Columns`: yields one name per requested column. A positional reference
///   becomes `_{index + 1}`; a header reference is looked up with
///   [`resolve_header_name`], so the name is spelled as it appears in the
///   header rather than as written in the query.
///
/// # Errors
///
/// Propagates the error from [`resolve_header_name`] when a header reference
/// cannot be resolved.
fn resolve_output_columns(
    plan: &SelectQueryPlan,
    header_row: Option<&Vec<String>>,
    data_rows: &[Vec<String>],
) -> Result<Vec<String>, RuntimeError> {
    match &plan.projection {
        SelectProjection::Wildcard => Ok(match header_row {
            Some(header) => header.clone(),
            None => {
                // No header: name columns by 1-based position, sized from the
                // first data row.
                let width = data_rows.first().map_or(0, Vec::len);
                (1..=width).map(|ordinal| format!("_{ordinal}")).collect()
            }
        }),
        SelectProjection::Columns(columns) => {
            let mut names = Vec::with_capacity(columns.len());
            for column in columns.iter() {
                let label = match column {
                    // Stored positions are zero-based; output labels are one-based.
                    SelectColumnRef::Position(index) => format!("_{}", index + 1),
                    SelectColumnRef::Header(name) => {
                        resolve_header_name(header_row, name)?.to_string()
                    }
                };
                names.push(label);
            }
            Ok(names)
        }
    }
}
/// Applies `plan.projection` to the data rows, keeping at most `plan.limit`
/// rows (all rows when no limit is set).
///
/// With a wildcard projection each row is passed through unchanged. With an
/// explicit column list each output row holds the values picked out by
/// [`resolve_column_value`], in the order the columns were requested.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by
/// [`resolve_column_value`]. Rows beyond the limit are never inspected.
fn project_rows(
    plan: &SelectQueryPlan,
    header_row: Option<&Vec<String>>,
    data_rows: Vec<Vec<String>>,
) -> Result<Vec<Vec<String>>, RuntimeError> {
    let row_cap = plan.limit.unwrap_or(usize::MAX);

    data_rows
        .into_iter()
        .take(row_cap)
        .map(|row| -> Result<Vec<String>, RuntimeError> {
            match &plan.projection {
                SelectProjection::Wildcard => Ok(row),
                // Collecting into `Result` short-circuits on the first column
                // that cannot be resolved for this row.
                SelectProjection::Columns(columns) => columns
                    .iter()
                    .map(|column| resolve_column_value(&row, header_row, column))
                    .collect(),
            }
        })
        .collect()
}
/// Returns a copy of the value that `column` refers to within `row`.
///
/// A positional reference is used directly as a zero-based index into `row`.
/// A header reference is first translated into an index with
/// [`resolve_header_index`].
///
/// # Errors
///
/// * Propagates the error from [`resolve_header_index`] when a header
///   reference cannot be resolved.
/// * Returns the `invalidcolumnindex` feature error when the resolved index
///   is past the end of `row`.
fn resolve_column_value(
    row: &[String],
    header_row: Option<&Vec<String>>,
    column: &SelectColumnRef,
) -> Result<String, RuntimeError> {
    // Reduce both kinds of reference to a plain index so the bounds check and
    // its error are written only once.
    let position = match column {
        SelectColumnRef::Position(index) => *index,
        SelectColumnRef::Header(name) => resolve_header_index(header_row, name)?,
    };

    match row.get(position) {
        Some(value) => Ok(value.clone()),
        None => Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidcolumnindex",
        )),
    }
}
/// Finds the header entry matching `name` and returns it as spelled in the
/// header.
///
/// Matching ignores ASCII case. When several header entries match, the first
/// one wins.
///
/// # Errors
///
/// * `invalidfileheaderinfo` feature error when no header row is available.
/// * `invalidcolumnindex` feature error when no header entry matches `name`.
fn resolve_header_name<'a>(
    header_row: Option<&'a Vec<String>>,
    name: &str,
) -> Result<&'a str, RuntimeError> {
    let header = match header_row {
        Some(header) => header,
        None => {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.invalidfileheaderinfo",
            ));
        }
    };

    for candidate in header {
        if candidate.eq_ignore_ascii_case(name) {
            return Ok(candidate.as_str());
        }
    }

    Err(RuntimeError::ServiceSpecificErrorFeature(
        "feat:bucketwarden.s3err.select.invalidcolumnindex",
    ))
}
/// Finds the zero-based position of the header entry matching `name`.
///
/// Matching ignores ASCII case. When several header entries match, the
/// position of the first one is returned.
///
/// # Errors
///
/// * `invalidfileheaderinfo` feature error when no header row is available.
/// * `invalidcolumnindex` feature error when no header entry matches `name`.
fn resolve_header_index(
    header_row: Option<&Vec<String>>,
    name: &str,
) -> Result<usize, RuntimeError> {
    match header_row {
        None => Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidfileheaderinfo",
        )),
        Some(header) => {
            let hit = header
                .iter()
                .enumerate()
                .find(|(_, candidate)| candidate.eq_ignore_ascii_case(name));
            match hit {
                Some((position, _)) => Ok(position),
                None => Err(RuntimeError::ServiceSpecificErrorFeature(
                    "feat:bucketwarden.s3err.select.invalidcolumnindex",
                )),
            }
        }
    }
}