use super::*;
/// Parses a CSV payload into records, each record being a list of field strings.
///
/// Grammar handled by this parser:
/// * fields are separated by `,`;
/// * records are terminated by `\n`, `\r` or `\r\n`; the last record does not
///   need a terminator;
/// * a field that starts with `"` is quoted: separators and line breaks inside
///   it are literal, and `""` stands for one `"`;
/// * characters following the closing quote are appended to the same field;
/// * an empty line produces a record holding a single empty field.
///
/// A final, unterminated record that consists solely of a quoted empty field
/// (`""`) is kept, so `""` and `""\n` parse to the same result.
///
/// # Errors
///
/// Returns `RuntimeError::ServiceSpecificErrorFeature` with:
/// * `...select.invalidtextencoding` when `body` is not valid UTF-8;
/// * `...select.overmaxrecordsize` when the bytes counted for one record exceed
///   `SELECT_MAX_RECORD_BYTES` (the second quote of an escaped `""` pair and the
///   `\n` of a `\r\n` terminator are not counted);
/// * `...select.overmaxcolumn` when a record has more than `SELECT_MAX_COLUMNS`
///   fields;
/// * `...select.csvunescapedquote` when a `"` appears inside a non-empty
///   unquoted field, or when a quoted field is never closed.
fn parse_csv_records(body: &[u8]) -> Result<Vec<Vec<String>>, RuntimeError> {
    /// Moves the pending field into `record` and enforces the column limit.
    fn finish_field(record: &mut Vec<String>, field: &mut String) -> Result<(), RuntimeError> {
        record.push(std::mem::take(field));
        if record.len() > SELECT_MAX_COLUMNS {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.overmaxcolumn",
            ));
        }
        Ok(())
    }

    let text = std::str::from_utf8(body).map_err(|_| {
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.invalidtextencoding",
        )
    })?;

    let mut records = Vec::new();
    let mut record: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut chars = text.chars().peekable();
    let mut in_quotes = false;
    // Set once the pending field has been opened with a quote. Without it an
    // explicitly quoted empty field at end of input is indistinguishable from
    // "no pending field" and would be dropped.
    let mut field_quoted = false;
    let mut record_bytes = 0usize;

    // `for` cannot be used here: the body needs `peek`/`next` on the iterator.
    while let Some(ch) = chars.next() {
        record_bytes += ch.len_utf8();
        if record_bytes > SELECT_MAX_RECORD_BYTES {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.overmaxrecordsize",
            ));
        }

        if in_quotes {
            match ch {
                '"' => {
                    if chars.peek() == Some(&'"') {
                        // Doubled quote inside a quoted field: one literal quote.
                        field.push('"');
                        chars.next();
                    } else {
                        in_quotes = false;
                    }
                }
                _ => field.push(ch),
            }
            continue;
        }

        match ch {
            // A quote may only open a field, never appear in the middle of one.
            '"' if field.is_empty() => {
                in_quotes = true;
                field_quoted = true;
            }
            '"' => {
                return Err(RuntimeError::ServiceSpecificErrorFeature(
                    "feat:bucketwarden.s3err.select.csvunescapedquote",
                ));
            }
            ',' => {
                finish_field(&mut record, &mut field)?;
                field_quoted = false;
            }
            '\n' | '\r' => {
                // Treat `\r\n` as a single record terminator.
                if ch == '\r' && chars.peek() == Some(&'\n') {
                    chars.next();
                }
                finish_field(&mut record, &mut field)?;
                field_quoted = false;
                records.push(std::mem::take(&mut record));
                record_bytes = 0;
            }
            _ => field.push(ch),
        }
    }

    if in_quotes {
        return Err(RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.csvunescapedquote",
        ));
    }

    // Flush the last record when the input does not end with a line break.
    if field_quoted || !field.is_empty() || !record.is_empty() {
        finish_field(&mut record, &mut field)?;
        records.push(record);
    }
    Ok(records)
}
/// Serializes `rows` as CSV text and returns the UTF-8 bytes.
///
/// Fields within a row are separated by `,` after being passed through
/// `escape_csv_field`; every row, including an empty one, is terminated by a
/// single `\n`.
fn render_csv_rows(rows: Vec<Vec<String>>) -> Vec<u8> {
    let mut rendered = String::new();
    for fields in rows {
        for (position, value) in fields.iter().enumerate() {
            // Separator goes between fields, never before the first one.
            if position > 0 {
                rendered.push(',');
            }
            rendered.push_str(&escape_csv_field(value));
        }
        rendered.push('\n');
    }
    rendered.into_bytes()
}
/// Renders `rows` as newline-delimited JSON objects and returns the UTF-8 bytes.
///
/// Each row becomes one JSON object followed by `\n`. The value at position
/// `i` is stored as a JSON string under `column_names[i]`; when there is no
/// name for that position the key is `_<i + 1>` (for example `_3` for the
/// third value).
///
/// # Errors
///
/// A serialization failure is wrapped in `RuntimeError::SnapshotSerialize`.
fn render_json_rows(
    column_names: &[String],
    rows: Vec<Vec<String>>,
) -> Result<Vec<u8>, RuntimeError> {
    let mut rendered = String::new();
    for fields in rows {
        let mut document = serde_json::Map::with_capacity(column_names.len());
        for (position, text) in fields.into_iter().enumerate() {
            let name = match column_names.get(position) {
                Some(name) => name.clone(),
                // No declared column: fall back to a 1-based positional key.
                None => format!("_{}", position + 1),
            };
            document.insert(name, serde_json::Value::String(text));
        }
        let line = serde_json::to_string(&serde_json::Value::Object(document))
            .map_err(RuntimeError::SnapshotSerialize)?;
        rendered.push_str(&line);
        rendered.push('\n');
    }
    Ok(rendered.into_bytes())
}
/// Parses a JSON payload into a header list and rows of string cells.
///
/// After trimming surrounding whitespace the payload is read either as a
/// single JSON array (when it starts with `[`) or as one JSON value per line.
/// Every value must be a JSON object.
///
/// The headers are the keys of the first object that has any keys, in the
/// order the map yields them. Each object is then projected onto those
/// headers: a missing key or `null` becomes an empty string, a JSON string is
/// copied as-is, and any other value is rendered with `Value::to_string`.
/// Keys that are not among the headers are ignored.
///
/// An empty (or whitespace-only) payload yields empty headers and no rows.
/// The header element of the returned tuple is always `Some`.
///
/// # Errors
///
/// Returns `RuntimeError::ServiceSpecificErrorFeature` with:
/// * `...select.invalidtextencoding` when `body` is not valid UTF-8;
/// * `...select.objectserializationconflict` when the payload (or any line of
///   it) is not valid JSON, or when a parsed value is not an object.
fn parse_json_records(body: &[u8]) -> Result<(Option<Vec<String>>, Vec<Vec<String>>), RuntimeError> {
    let conflict = || {
        RuntimeError::ServiceSpecificErrorFeature(
            "feat:bucketwarden.s3err.select.objectserializationconflict",
        )
    };

    let text = match std::str::from_utf8(body) {
        Ok(text) => text,
        Err(_) => {
            return Err(RuntimeError::ServiceSpecificErrorFeature(
                "feat:bucketwarden.s3err.select.invalidtextencoding",
            ));
        }
    };

    let payload = text.trim();
    if payload.is_empty() {
        return Ok((Some(Vec::new()), Vec::new()));
    }

    let documents: Vec<serde_json::Value> = if payload.starts_with('[') {
        // Whole payload is one JSON array of records.
        match serde_json::from_str::<Vec<serde_json::Value>>(payload) {
            Ok(parsed) => parsed,
            Err(_) => return Err(conflict()),
        }
    } else {
        // One JSON value per line.
        let mut parsed = Vec::new();
        for line in payload.lines() {
            match serde_json::from_str::<serde_json::Value>(line) {
                Ok(document) => parsed.push(document),
                Err(_) => return Err(conflict()),
            }
        }
        parsed
    };

    let mut headers: Vec<String> = Vec::new();
    let mut rows = Vec::with_capacity(documents.len());
    for document in &documents {
        let object = match document.as_object() {
            Some(object) => object,
            None => return Err(conflict()),
        };

        // Headers are taken from the first object that actually has keys.
        if headers.is_empty() {
            for key in object.keys() {
                headers.push(key.clone());
            }
        }

        let mut row = Vec::with_capacity(headers.len());
        for header in &headers {
            let cell = match object.get(header) {
                Some(serde_json::Value::Null) | None => String::new(),
                Some(serde_json::Value::String(value)) => value.clone(),
                Some(other) => other.to_string(),
            };
            row.push(cell);
        }
        rows.push(row);
    }

    Ok((Some(headers), rows))
}
/// Returns `field` in a form that is safe to embed in a CSV line.
///
/// A field containing `,`, `\n`, `\r` or `"` is wrapped in double quotes with
/// every embedded `"` doubled; any other field is returned unchanged.
fn escape_csv_field(field: &str) -> String {
    let needs_quoting = field
        .chars()
        .any(|c| matches!(c, ',' | '\n' | '\r' | '"'));
    if !needs_quoting {
        return field.to_string();
    }

    // Two extra bytes for the surrounding quotes; doubled quotes may grow it further.
    let mut quoted = String::with_capacity(field.len() + 2);
    quoted.push('"');
    for c in field.chars() {
        if c == '"' {
            quoted.push('"');
        }
        quoted.push(c);
    }
    quoted.push('"');
    quoted
}