bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

/// Runs a parsed select query plan against the raw bytes of an object.
///
/// The body is decoded according to `plan.input_format`. For CSV input,
/// `plan.file_header_info` decides whether the first record is a header. The
/// output column names are resolved, the rows are projected, and the result is
/// rendered in `plan.output_format`.
///
/// # Errors
///
/// Propagates any error from parsing, column resolution, projection, or (for
/// JSON output) rendering.
fn execute_select_plan(
    plan: &SelectQueryPlan,
    object_body: &[u8],
) -> Result<Vec<u8>, RuntimeError> {
    let (header_row, data_rows) = match plan.input_format {
        SelectInputFormat::Json => parse_json_records(object_body)?,
        SelectInputFormat::Csv => {
            let csv_records = parse_csv_records(object_body)?;
            split_header_and_rows(plan.file_header_info, csv_records)
        }
    };
    let header = header_row.as_ref();

    // Resolved for CSV output too: this call also rejects header-name
    // references that cannot be satisfied, before any rows are projected.
    let column_names = resolve_output_columns(plan, header, &data_rows)?;
    let rows = project_rows(plan, header, data_rows)?;

    match plan.output_format {
        SelectOutputFormat::Json => render_json_rows(&column_names, rows),
        SelectOutputFormat::Csv => Ok(render_csv_rows(rows)),
    }
}

/// Separates parsed CSV records into an optional header row and the data rows.
///
/// - `Use` with at least one record: the first record becomes the header.
/// - `Ignore` with at least one record: the first record is discarded.
/// - Any other setting, or no records: every record is data and there is no header.
fn split_header_and_rows(
    file_header_info: SelectFileHeaderInfo,
    mut records: Vec<Vec<String>>,
) -> (Option<Vec<String>>, Vec<Vec<String>>) {
    let consumes_first_record = matches!(
        file_header_info,
        SelectFileHeaderInfo::Use | SelectFileHeaderInfo::Ignore
    );
    if !consumes_first_record || records.is_empty() {
        return (None, records);
    }

    let first_record = records.remove(0);
    let header = matches!(file_header_info, SelectFileHeaderInfo::Use).then_some(first_record);
    (header, records)
}

/// Computes the output column names for the plan's projection.
///
/// - `Wildcard`: the header row's names when a header exists. Otherwise
///   `_1`, `_2`, ... up to the width of the first data row (no names when
///   there are no rows).
/// - `Columns`: a positional reference `index` becomes `_{index + 1}`. A
///   header-name reference becomes the matching header entry, spelled as it
///   appears in the header.
///
/// # Errors
///
/// Propagates the error from `resolve_header_name` when a header-name
/// reference cannot be resolved.
fn resolve_output_columns(
    plan: &SelectQueryPlan,
    header_row: Option<&Vec<String>>,
    data_rows: &[Vec<String>],
) -> Result<Vec<String>, RuntimeError> {
    let columns = match &plan.projection {
        SelectProjection::Columns(columns) => columns,
        SelectProjection::Wildcard => {
            if let Some(header) = header_row {
                return Ok(header.clone());
            }
            // Without a header, synthesize positional names from the first row.
            let width = data_rows.first().map_or(0, Vec::len);
            return Ok((1..=width).map(|index| format!("_{index}")).collect());
        }
    };

    let mut labels = Vec::with_capacity(columns.len());
    for column in columns.iter() {
        let label = match column {
            SelectColumnRef::Position(index) => format!("_{}", index + 1),
            SelectColumnRef::Header(name) => resolve_header_name(header_row, name)?.to_owned(),
        };
        labels.push(label);
    }
    Ok(labels)
}

fn project_rows(
    plan: &SelectQueryPlan,
    header_row: Option<&Vec<String>>,
    data_rows: Vec<Vec<String>>,
) -> Result<Vec<Vec<String>>, RuntimeError> {
    let mut projected_rows = Vec::new();
    let limit = plan.limit.unwrap_or(usize::MAX);
    for row in data_rows.into_iter().take(limit) {
        let projected = match &plan.projection {
            SelectProjection::Wildcard => row,
            SelectProjection::Columns(columns) => {
                let mut values = Vec::with_capacity(columns.len());
                for column in columns {
                    values.push(resolve_column_value(&row, header_row, column)?);
                }
                values
            }
        };
        projected_rows.push(projected);
    }
    Ok(projected_rows)
}

/// Returns a copy of the cell in `row` that `column` refers to.
///
/// A positional reference indexes `row` directly. A header-name reference is
/// first mapped to a position with `resolve_header_index`.
///
/// # Errors
///
/// Propagates the error from `resolve_header_index`. Returns
/// `invalidcolumnindex` when the resolved position is past the end of `row`.
fn resolve_column_value(
    row: &[String],
    header_row: Option<&Vec<String>>,
    column: &SelectColumnRef,
) -> Result<String, RuntimeError> {
    let position = match column {
        SelectColumnRef::Position(index) => *index,
        SelectColumnRef::Header(name) => resolve_header_index(header_row, name)?,
    };
    match row.get(position) {
        Some(cell) => Ok(cell.clone()),
        None => Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidcolumnindex",
        )),
    }
}

/// Looks up `name` in the header row and returns the header's own spelling.
///
/// The comparison is ASCII case-insensitive. When several header entries
/// match, the first one wins.
///
/// # Errors
///
/// - `invalidfileheaderinfo` when there is no header row.
/// - `invalidcolumnindex` when no header entry matches `name`.
fn resolve_header_name<'a>(
    header_row: Option<&'a Vec<String>>,
    name: &str,
) -> Result<&'a str, RuntimeError> {
    let header = match header_row {
        Some(header) => header,
        None => {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.invalidfileheaderinfo",
            ))
        }
    };
    for candidate in header {
        if candidate.eq_ignore_ascii_case(name) {
            return Ok(candidate.as_str());
        }
    }
    Err(RuntimeError::ServiceSpecificErrorFeature(
        "feat:bucketwarden.s3err.select.invalidcolumnindex",
    ))
}

/// Returns the zero-based position of `name` in the header row.
///
/// The comparison is ASCII case-insensitive. When several header entries
/// match, the position of the first one is returned.
///
/// # Errors
///
/// - `invalidfileheaderinfo` when there is no header row.
/// - `invalidcolumnindex` when no header entry matches `name`.
fn resolve_header_index(
    header_row: Option<&Vec<String>>,
    name: &str,
) -> Result<usize, RuntimeError> {
    match header_row {
        None => Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidfileheaderinfo",
        )),
        Some(header) => {
            for (position, candidate) in header.iter().enumerate() {
                if candidate.eq_ignore_ascii_case(name) {
                    return Ok(position);
                }
            }
            Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.invalidcolumnindex",
            ))
        }
    }
}