bucketwarden-server 0.1.0

BucketWarden storage server runtime.
Documentation
use super::*;

/// Parses the XML body of a select request into a `SelectQueryPlan`.
///
/// The body is decoded as UTF-8 (lossily) and the following elements are
/// extracted with `text_between`:
///
/// * `<Expression>` - required and non-empty after trimming; handed to
///   `parse_select_expression`.
/// * `<ExpressionType>` - optional; when present and non-empty it must equal
///   `SQL` (ASCII case-insensitive).
/// * `<InputSerialization>` - required; interpreted by
///   `parse_input_serialization` and `resolve_input_serialization_behavior`.
/// * `<OutputSerialization>` - required; must mention exactly one of `<CSV`
///   or `<JSON`.
///
/// # Errors
///
/// * `missingrequiredparameter` when the expression, input serialization or
///   output serialization element is missing (or the expression is blank).
/// * `invalidrequestparameter` when the expression type is not `SQL`.
/// * `objectserializationconflict` when the output serialization names
///   neither format or both formats.
/// * Any error produced by the input-serialization or expression parsers.
fn parse_select_request(body: &[u8]) -> Result<SelectQueryPlan, RuntimeError> {
    let xml = String::from_utf8_lossy(body);
    let xml = xml.trim();

    let expression = text_between(xml, "<Expression>", "</Expression>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .ok_or(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.missingrequiredparameter",
        ))?;

    // A blank or absent <ExpressionType> is tolerated; anything else must be SQL.
    let expression_type = text_between(xml, "<ExpressionType>", "</ExpressionType>")
        .map(str::trim)
        .filter(|value| !value.is_empty());
    if expression_type.is_some_and(|value| !value.eq_ignore_ascii_case("SQL")) {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidrequestparameter",
        ));
    }

    let input_xml = text_between(xml, "<InputSerialization>", "</InputSerialization>").ok_or(
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.missingrequiredparameter",
        ),
    )?;
    let input_serialization = parse_input_serialization(input_xml)?;
    let (input_format, file_header_info) =
        resolve_input_serialization_behavior(&input_serialization)?;

    let output_xml = text_between(xml, "<OutputSerialization>", "</OutputSerialization>").ok_or(
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.missingrequiredparameter",
        ),
    )?;

    // Mirror the input-side rule: exactly one output format may be requested.
    // Previously a body naming both formats silently produced CSV.
    let csv_requested = parse_csv_output(output_xml).is_ok();
    let json_requested = output_xml.contains("<JSON");
    let output_format = match (csv_requested, json_requested) {
        (true, false) => SelectOutputFormat::Csv,
        (false, true) => SelectOutputFormat::Json,
        (true, true) | (false, false) => {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.objectserializationconflict",
            ));
        }
    };

    // The expression parser fills in a CSV placeholder for the input format;
    // overwrite it with the format resolved from the request.
    let mut plan = parse_select_expression(expression, file_header_info, output_format)?;
    plan.input_format = input_format;
    Ok(plan)
}

/// Builds an `InputSerialization` from the contents of the
/// `<InputSerialization>` element.
///
/// The CSV and JSON sections are parsed only when their opening tag prefix
/// (`<CSV` / `<JSON`) occurs in `input_xml`. Exactly one of the two must be
/// present.
///
/// # Errors
///
/// Returns `objectserializationconflict` when both or neither format is
/// present, and forwards any error from `parse_csv_input` /
/// `parse_json_input`.
fn parse_input_serialization(input_xml: &str) -> Result<InputSerialization, RuntimeError> {
    let csv = input_xml
        .contains("<CSV")
        .then(|| parse_csv_input(input_xml))
        .transpose()?;
    let json = input_xml
        .contains("<JSON")
        .then(|| parse_json_input(input_xml))
        .transpose()?;

    // Both present and both absent are equally invalid.
    if matches!((&csv, &json), (Some(_), Some(_)) | (None, None)) {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        ));
    }

    // Blank compression types are treated the same as a missing element.
    let compression_type = text_between(input_xml, "<CompressionType>", "</CompressionType>")
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(str::to_owned);

    Ok(InputSerialization {
        compression_type,
        csv,
        json,
    })
}

/// Derives the input format and header handling from a parsed
/// `InputSerialization`.
///
/// * CSV (checked first): `file_header_info` is matched ASCII
///   case-insensitively against `NONE`, `USE` and `IGNORE`; a missing value
///   means `SelectFileHeaderInfo::None`.
/// * JSON: the type defaults to `LINES` and must be `LINES` or `DOCUMENT`
///   (ASCII case-insensitive); header handling is always
///   `SelectFileHeaderInfo::Use`.
///
/// # Errors
///
/// * `invalidfileheaderinfo` for an unrecognised CSV header mode.
/// * `objectserializationconflict` for an unrecognised JSON type or when
///   neither CSV nor JSON is set.
///
/// NOTE(review): `compression_type` is not read by this function; whether a
/// non-empty value is validated elsewhere is not visible here - confirm.
fn resolve_input_serialization_behavior(
    input_serialization: &InputSerialization,
) -> Result<(SelectInputFormat, SelectFileHeaderInfo), RuntimeError> {
    match (&input_serialization.csv, &input_serialization.json) {
        // CSV wins whenever it is present, regardless of the JSON slot.
        (Some(csv_input), _) => {
            let header_mode = match csv_input.file_header_info.as_deref() {
                None => SelectFileHeaderInfo::None,
                Some(raw) if raw.eq_ignore_ascii_case("USE") => SelectFileHeaderInfo::Use,
                Some(raw) if raw.eq_ignore_ascii_case("IGNORE") => SelectFileHeaderInfo::Ignore,
                Some(raw) if raw.eq_ignore_ascii_case("NONE") => SelectFileHeaderInfo::None,
                Some(_) => {
                    return Err(RuntimeError::ServiceSpecificErrorFeature(
                        "feat:bucketwarden.s3err.select.invalidfileheaderinfo",
                    ));
                }
            };
            Ok((SelectInputFormat::Csv, header_mode))
        }
        (None, Some(json_input)) => {
            let declared_type = json_input.type_.as_deref().unwrap_or("LINES");
            let recognised = declared_type.eq_ignore_ascii_case("LINES")
                || declared_type.eq_ignore_ascii_case("DOCUMENT");
            if recognised {
                Ok((SelectInputFormat::Json, SelectFileHeaderInfo::Use))
            } else {
                Err(RuntimeError::ServiceSpecificErrorFeature(
                    "feat:bucketwarden.s3err.select.objectserializationconflict",
                ))
            }
        }
        (None, None) => Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        )),
    }
}

/// Extracts the CSV input settings from the `<InputSerialization>` contents.
///
/// Only `<FileHeaderInfo>` is read; a blank value is treated as absent. The
/// `compression_type` field of the result is always `None`.
///
/// # Errors
///
/// Returns `objectserializationconflict` when `input_xml` does not contain
/// `<CSV`.
fn parse_csv_input(input_xml: &str) -> Result<CsvInput, RuntimeError> {
    if input_xml.contains("<CSV") {
        let file_header_info = text_between(input_xml, "<FileHeaderInfo>", "</FileHeaderInfo>")
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_owned);
        Ok(CsvInput {
            file_header_info,
            compression_type: None,
        })
    } else {
        Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        ))
    }
}

/// Extracts the JSON input settings from the `<InputSerialization>` contents.
///
/// Only `<Type>` is read; a blank value is treated as absent.
///
/// # Errors
///
/// Returns `objectserializationconflict` when `input_xml` does not contain
/// `<JSON`.
fn parse_json_input(input_xml: &str) -> Result<JsonInput, RuntimeError> {
    if input_xml.contains("<JSON") {
        let type_ = text_between(input_xml, "<Type>", "</Type>")
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_owned);
        Ok(JsonInput { type_ })
    } else {
        Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        ))
    }
}

/// Recognises a CSV output request in the `<OutputSerialization>` contents.
///
/// No CSV output options are read; the presence of `<CSV` is the only
/// thing checked.
///
/// # Errors
///
/// Returns `objectserializationconflict` when `output_xml` does not contain
/// `<CSV`.
fn parse_csv_output(output_xml: &str) -> Result<CsvOutput, RuntimeError> {
    if !output_xml.contains("<CSV") {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        ));
    }
    Ok(CsvOutput {})
}

/// Parses the supported SQL subset into a `SelectQueryPlan`.
///
/// Accepted shape (keywords are ASCII case-insensitive):
///
/// ```text
/// SELECT <select-list> FROM S3Object [<alias>] [LIMIT <n>]
/// ```
///
/// The returned plan always carries `SelectInputFormat::Csv` as its input
/// format; callers that know the real format overwrite it afterwards.
///
/// # Errors
///
/// * `parseexpectedkeyword` - the expression does not start with `SELECT `.
/// * `parseselectmissingfrom` - no ` FROM ` separator was found.
/// * `parseemptyselect` - nothing sits between `SELECT` and `FROM`
///   (including `SELECT FROM ...`, which used to panic on an inverted slice).
/// * `invaliddatasource` - the source is not `S3Object`.
/// * `parseexpectednumber` - `LIMIT` is not followed by a valid `usize`.
/// * `maxoperatorsexceeded` - the limit exceeds `SELECT_MAX_ROWS`.
/// * `unsupportedsqloperation` - a `WHERE`, `GROUP`, `ORDER` or `HAVING`
///   clause follows the source.
/// * `unsupportedsqlstructure` - any other trailing text.
/// * Any error produced by `parse_select_projection`.
fn parse_select_expression(
    expression: &str,
    file_header_info: SelectFileHeaderInfo,
    output_format: SelectOutputFormat,
) -> Result<SelectQueryPlan, RuntimeError> {
    const SELECT_PREFIX: &str = "SELECT ";
    const FROM_SEPARATOR: &str = " FROM ";

    let normalized = expression.trim();
    // ASCII upper-casing keeps every byte offset intact, so indices found in
    // `uppercase` can be used to slice `normalized` as well.
    let uppercase = normalized.to_ascii_uppercase();
    if !uppercase.starts_with(SELECT_PREFIX) {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.parseexpectedkeyword",
        ));
    }

    let from_index = uppercase
        .find(FROM_SEPARATOR)
        .ok_or(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.parseselectmissingfrom",
        ))?;
    // In "SELECT FROM ..." the separator starts on the space that terminates
    // SELECT, so `from_index` is smaller than the prefix length. `get`
    // returns `None` for that inverted range instead of panicking.
    let select_list = normalized
        .get(SELECT_PREFIX.len()..from_index)
        .map(str::trim)
        .unwrap_or("");
    if select_list.is_empty() {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.parseemptyselect",
        ));
    }

    let after_from = from_index + FROM_SEPARATOR.len();
    let remainder = normalized[after_from..].trim();
    let remainder_upper = uppercase[after_from..].trim();
    let source_end = remainder
        .find(char::is_whitespace)
        .unwrap_or(remainder.len());
    if !remainder[..source_end].eq_ignore_ascii_case("S3Object") {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invaliddatasource",
        ));
    }

    let tail = remainder[source_end..].trim();
    let mut tail_upper = remainder_upper[source_end..].trim();

    // The word after the source is an alias unless it is a clause keyword.
    // Treating keywords as aliases made a trailing bare `LIMIT` succeed and
    // hid the unsupported-clause diagnostics below.
    let first_word_end = tail.find(char::is_whitespace).unwrap_or(tail.len());
    let first_word_is_clause = matches!(
        &tail_upper[..first_word_end],
        "LIMIT" | "WHERE" | "GROUP" | "ORDER" | "HAVING"
    );
    let alias = if tail.is_empty() || first_word_is_clause {
        None
    } else {
        tail_upper = tail_upper[first_word_end..].trim();
        Some(&tail[..first_word_end])
    };

    // Whatever is left must be empty or a single clause; split off its keyword.
    let clause_end = tail_upper
        .find(char::is_whitespace)
        .unwrap_or(tail_upper.len());
    let (clause, clause_args) = tail_upper.split_at(clause_end);
    let limit = match clause {
        "" => None,
        "LIMIT" => {
            let parsed = clause_args.trim().parse::<usize>().map_err(|_| {
                RuntimeError::ServiceSpecificErrorFeature(
                    "feat:bucketwarden.s3err.select.parseexpectednumber",
                )
            })?;
            if parsed > SELECT_MAX_ROWS {
                return Err(RuntimeError::ServiceSpecificErrorFeature(
                    "feat:bucketwarden.s3err.select.maxoperatorsexceeded",
                ));
            }
            Some(parsed)
        }
        "WHERE" | "GROUP" | "ORDER" | "HAVING" => {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.unsupportedsqloperation",
            ));
        }
        _ => {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.unsupportedsqlstructure",
            ));
        }
    };

    let projection = parse_select_projection(select_list, alias, file_header_info)?;
    Ok(SelectQueryPlan {
        input_format: SelectInputFormat::Csv,
        projection,
        file_header_info,
        output_format,
        limit,
    })
}

/// Converts the text between `SELECT` and `FROM` into a `SelectProjection`.
///
/// * A lone `*` yields `SelectProjection::Wildcard`.
/// * Otherwise the list is split on commas and each entry becomes either a
///   positional reference (`_N`, see `parse_select_position`) or a header
///   name. An entry may be qualified as `<alias>.<column>`, in which case the
///   qualifier must match `alias` (ASCII case-insensitive).
///
/// # Errors
///
/// * `parseinvalidcontextforwildcardinselectlist` - an entry is a qualified
///   wildcard such as `s.*`. This case was previously shadowed by the
///   generic asterisk check and could never be reported.
/// * `parseasteriskisnotaloneinselectlist` - any other use of `*` besides a
///   lone wildcard.
/// * `parseexpectedexpression` - an empty entry (e.g. a trailing comma).
/// * `unsupportedsqlstructure` - an entry contains parentheses, quotes or
///   square brackets.
/// * `invalidtablealias` - a qualifier is used without an alias or does not
///   match it.
/// * `invalidfileheaderinfo` - a header name is referenced while
///   `file_header_info` is `None` or `Ignore`.
/// * `invalidcolumnindex` - forwarded from `parse_select_position`.
fn parse_select_projection(
    select_list: &str,
    alias: Option<&str>,
    file_header_info: SelectFileHeaderInfo,
) -> Result<SelectProjection, RuntimeError> {
    if select_list == "*" {
        return Ok(SelectProjection::Wildcard);
    }
    if select_list.contains('*') {
        // Distinguish `alias.*` from a bare `*` mixed with other entries so
        // each gets its dedicated error.
        let has_qualified_wildcard = select_list
            .split(',')
            .any(|entry| matches!(entry.trim().split_once('.'), Some((_, "*"))));
        let feature = if has_qualified_wildcard {
            "feat:bucketwarden.s3err.select.parseinvalidcontextforwildcardinselectlist"
        } else {
            "feat:bucketwarden.s3err.select.parseasteriskisnotaloneinselectlist"
        };
        return Err(RuntimeError::ServiceSpecificErrorFeature(feature));
    }

    let mut columns = Vec::new();
    for raw_column in select_list.split(',') {
        let token = raw_column.trim();
        if token.is_empty() {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.parseexpectedexpression",
            ));
        }
        // Function calls, literals, quoted identifiers and path indexing are
        // outside the supported subset.
        if token
            .chars()
            .any(|ch| matches!(ch, '(' | ')' | '\'' | '"' | '[' | ']'))
        {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.unsupportedsqlstructure",
            ));
        }

        // Strip an `<alias>.` qualifier after checking it against the alias
        // declared in the FROM clause.
        let token = match token.split_once('.') {
            Some((prefix, suffix)) => {
                if alias.is_none_or(|alias_name| !prefix.eq_ignore_ascii_case(alias_name)) {
                    return Err(RuntimeError::ServiceSpecificErrorFeature(
                        "feat:bucketwarden.s3err.select.invalidtablealias",
                    ));
                }
                suffix
            }
            None => token,
        };

        if let Some(position) = parse_select_position(token)? {
            columns.push(SelectColumnRef::Position(position));
            continue;
        }

        // Header names can only be resolved when the header row is in use.
        if matches!(
            file_header_info,
            SelectFileHeaderInfo::None | SelectFileHeaderInfo::Ignore
        ) {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.invalidfileheaderinfo",
            ));
        }
        columns.push(SelectColumnRef::Header(token.to_string()));
    }

    Ok(SelectProjection::Columns(columns))
}

/// Interprets `token` as a positional column reference of the form `_N`.
///
/// Returns `Ok(None)` when `token` does not start with an underscore, and
/// `Ok(Some(N - 1))` (a zero-based index) when it does and `N` lies in
/// `1..=SELECT_MAX_COLUMNS`.
///
/// # Errors
///
/// Returns `invalidcolumnindex` when the text after the underscore is not a
/// valid `usize`, is zero, or exceeds `SELECT_MAX_COLUMNS`.
fn parse_select_position(token: &str) -> Result<Option<usize>, RuntimeError> {
    let Some(digits) = token.strip_prefix('_') else {
        return Ok(None);
    };

    match digits.parse::<usize>() {
        // Positions are one-based in the query and zero-based in the plan.
        Ok(position) if (1..=SELECT_MAX_COLUMNS).contains(&position) => Ok(Some(position - 1)),
        _ => Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidcolumnindex",
        )),
    }
}