//! bucketwarden-server 0.1.0
//!
//! BucketWarden storage server runtime.
use super::*;

/// Parses a CSV input body into records of string fields.
///
/// Fields are separated by `,`; records end at `\n`, `\r\n`, or a lone `\r`.
/// A field that starts with `"` is quoted: inside it, `""` is an escaped literal
/// quote and commas/line breaks are taken literally. Characters following a
/// closing quote are appended to the same field (e.g. `"a"b` yields `ab`).
/// A final record without a trailing line break is still returned, including one
/// whose only field is a quoted empty string (`""`). A trailing line break does
/// not produce an extra empty record.
///
/// # Errors
/// Returns `RuntimeError::ServiceSpecificErrorFeature` with:
/// * `feat:bucketwarden.s3err.select.invalidtextencoding` if `body` is not UTF-8;
/// * `feat:bucketwarden.s3err.select.overmaxrecordsize` if a record's UTF-8 byte
///   count (including its line-break character) exceeds `SELECT_MAX_RECORD_BYTES`;
/// * `feat:bucketwarden.s3err.select.overmaxcolumn` if a record has more than
///   `SELECT_MAX_COLUMNS` fields;
/// * `feat:bucketwarden.s3err.select.csvunescapedquote` if a `"` appears in the
///   middle of a field, or a quoted field is never closed.
fn parse_csv_records(body: &[u8]) -> Result<Vec<Vec<String>>, RuntimeError> {
    /// Moves the pending `field` onto `record`, clears the quoted marker, and
    /// enforces the per-record column limit.
    fn finish_field(
        record: &mut Vec<String>,
        field: &mut String,
        field_quoted: &mut bool,
    ) -> Result<(), RuntimeError> {
        record.push(std::mem::take(field));
        *field_quoted = false;
        if record.len() > SELECT_MAX_COLUMNS {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.overmaxcolumn",
            ));
        }
        Ok(())
    }

    let text = std::str::from_utf8(body).map_err(|_| {
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidtextencoding",
        )
    })?;

    let mut records = Vec::new();
    let mut record = Vec::new();
    let mut field = String::new();
    let mut chars = text.chars().peekable();
    let mut in_quotes = false;
    // True once the current field has opened a quoted section. A quoted empty
    // field (`""`) leaves `field` empty but is still a real field, so emptiness
    // alone cannot tell whether a trailing record exists at end of input.
    let mut field_quoted = false;
    let mut record_bytes = 0usize;

    while let Some(ch) = chars.next() {
        record_bytes += ch.len_utf8();
        if record_bytes > SELECT_MAX_RECORD_BYTES {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.overmaxrecordsize",
            ));
        }

        if in_quotes {
            if ch != '"' {
                field.push(ch);
            } else if chars.peek() == Some(&'"') {
                // `""` inside a quoted field is an escaped literal quote.
                field.push('"');
                chars.next();
            } else {
                in_quotes = false;
            }
            continue;
        }

        match ch {
            '"' if field.is_empty() => {
                in_quotes = true;
                field_quoted = true;
            }
            '"' => {
                return Err(RuntimeError::ServiceSpecificErrorFeature(
                    "feat:bucketwarden.s3err.select.csvunescapedquote",
                ));
            }
            ',' => finish_field(&mut record, &mut field, &mut field_quoted)?,
            '\n' | '\r' => {
                // Treat `\r\n` as a single record terminator.
                if ch == '\r' && chars.peek() == Some(&'\n') {
                    chars.next();
                }
                finish_field(&mut record, &mut field, &mut field_quoted)?;
                records.push(std::mem::take(&mut record));
                record_bytes = 0;
            }
            _ => field.push(ch),
        }
    }

    if in_quotes {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.csvunescapedquote",
        ));
    }

    // Flush a final record that was not terminated by a line break.
    if field_quoted || !field.is_empty() || !record.is_empty() {
        finish_field(&mut record, &mut field, &mut field_quoted)?;
        records.push(record);
    }

    Ok(records)
}

/// Serialises rows as CSV text and returns the UTF-8 bytes.
///
/// Each field is passed through `escape_csv_field` and fields are joined with `,`.
/// Every row, including the last, is terminated by `\n`. An empty row becomes a
/// bare `\n`, and no rows produce an empty buffer.
fn render_csv_rows(rows: Vec<Vec<String>>) -> Vec<u8> {
    rows.into_iter()
        .fold(String::new(), |mut out, row| {
            for (position, value) in row.iter().enumerate() {
                if position > 0 {
                    out.push(',');
                }
                out.push_str(&escape_csv_field(value));
            }
            out.push('\n');
            out
        })
        .into_bytes()
}

/// Serialises rows as newline-delimited JSON objects, one object per line.
///
/// Value `i` of a row is stored under `column_names[i]`, or under the positional
/// name `_{i+1}` when the row is longer than `column_names`. Every value is
/// emitted as a JSON string. If two keys collide, the later value replaces the
/// earlier one (`Map::insert` semantics).
/// NOTE(review): key order in each object follows `serde_json::Map` iteration
/// order (sorted unless serde_json's `preserve_order` feature is on) — confirm.
///
/// # Errors
/// JSON serialization failures are mapped to `RuntimeError::SnapshotSerialize`.
fn render_json_rows(
    column_names: &[String],
    rows: Vec<Vec<String>>,
) -> Result<Vec<u8>, RuntimeError> {
    let mut buffer = String::new();
    for values in rows {
        let mut record = serde_json::Map::with_capacity(column_names.len());
        for (position, cell) in values.into_iter().enumerate() {
            let name = match column_names.get(position) {
                Some(known) => known.clone(),
                None => format!("_{}", position + 1),
            };
            record.insert(name, serde_json::Value::String(cell));
        }
        let encoded = serde_json::to_string(&serde_json::Value::Object(record))
            .map_err(RuntimeError::SnapshotSerialize)?;
        buffer.push_str(&encoded);
        buffer.push('\n');
    }
    Ok(buffer.into_bytes())
}

/// Parses a JSON input body into a header list and rows of string values.
///
/// Two input shapes are accepted:
/// * a single top-level JSON array (the trimmed body starts with `[`), whose
///   elements are the records;
/// * otherwise, a sequence of JSON values separated by whitespace. This covers
///   JSON Lines, blank lines between records, and pretty-printed objects that
///   span several lines.
///
/// Every record must be a JSON object. Headers are filled from the keys of the
/// first object that has any keys. Keys that later objects have but the headers
/// lack are ignored.
/// NOTE(review): header order follows `serde_json::Map` iteration order (sorted
/// unless serde_json's `preserve_order` feature is on) — confirm.
/// For each header, a row holds one value: `null` or missing → empty string,
/// a string → its raw text, anything else → its compact JSON text.
///
/// The header list is always `Some`. An empty or whitespace-only body yields
/// `(Some(vec![]), vec![])`.
///
/// # Errors
/// Returns `RuntimeError::ServiceSpecificErrorFeature` with:
/// * `feat:bucketwarden.s3err.select.invalidtextencoding` if `body` is not UTF-8;
/// * `feat:bucketwarden.s3err.select.objectserializationconflict` if the body is
///   not valid JSON in one of the shapes above, or a record is not an object.
fn parse_json_records(body: &[u8]) -> Result<(Option<Vec<String>>, Vec<Vec<String>>), RuntimeError> {
    let text = std::str::from_utf8(body).map_err(|_| {
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidtextencoding",
        )
    })?;
    let trimmed = text.trim();
    if trimmed.is_empty() {
        return Ok((Some(Vec::new()), Vec::new()));
    }

    let conflict = || {
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        )
    };

    let values: Vec<serde_json::Value> = if trimmed.starts_with('[') {
        serde_json::from_str(trimmed).map_err(|_| conflict())?
    } else {
        // Stream whitespace-separated values instead of splitting on `\n`.
        // Per-line parsing rejects pretty-printed objects and blank lines.
        serde_json::Deserializer::from_str(trimmed)
            .into_iter::<serde_json::Value>()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|_| conflict())?
    };

    let mut headers = Vec::new();
    let mut rows = Vec::with_capacity(values.len());
    for value in values {
        let object = value.as_object().ok_or_else(conflict)?;
        if headers.is_empty() {
            headers.extend(object.keys().cloned());
        }
        let row = headers
            .iter()
            .map(|header| match object.get(header) {
                Some(serde_json::Value::Null) | None => String::new(),
                Some(serde_json::Value::String(value)) => value.clone(),
                Some(other) => other.to_string(),
            })
            .collect::<Vec<_>>();
        rows.push(row);
    }
    Ok((Some(headers), rows))
}

/// Escapes one CSV field for output.
///
/// A field containing a comma, `\n`, `\r`, or `"` is wrapped in double quotes,
/// and each embedded `"` is doubled. Any other field is returned unchanged.
fn escape_csv_field(field: &str) -> String {
    let needs_quoting = field
        .chars()
        .any(|c| matches!(c, ',' | '\n' | '\r' | '"'));
    if !needs_quoting {
        return field.to_owned();
    }
    let mut quoted = String::with_capacity(field.len() + 2);
    quoted.push('"');
    for c in field.chars() {
        if c == '"' {
            quoted.push('"');
        }
        quoted.push(c);
    }
    quoted.push('"');
    quoted
}