use std::collections::HashMap;
use super::transform::TransformResult;
/// Byte written between columns in the column-major data stream (and between rows of the
/// inline section in the selective encoding).
const COL_SEP: u8 = 0x00;
/// Byte written between values within one column (and between the values of one row in the
/// inline section of the selective encoding).
const VAL_SEP: u8 = 0x01;
/// Leading metadata byte: every line shares one template (see `build_uniform_columnar`).
const METADATA_VERSION_UNIFORM: u8 = 1;
/// Leading metadata byte: lines split into schema groups plus verbatim residual lines.
const METADATA_VERSION_GROUPED: u8 = 2;
/// Leading metadata byte: one shared template, only some columns stored column-major.
const METADATA_VERSION_SELECTIVE: u8 = 3;
// NOTE(review): not referenced in this part of the file; presumably reserved for a future
// typed encoding — confirm before removing.
#[allow(dead_code)]
const METADATA_VERSION_TYPED: u8 = 4;
/// Minimum rows for a schema group to be encoded columnar (smaller groups become residual);
/// also the minimum line count for trying the grouped encoding at all.
const MIN_GROUP_ROWS: usize = 5;
/// A column is kept inline (not extracted) only if its average value length is at least this...
const SELECTIVE_MIN_AVG_LEN: usize = 128;
/// ...and its distinct-value ratio (distinct / rows) is above this.
const SELECTIVE_MAX_CARDINALITY: f64 = 0.7;
/// One schema group: the template parts shared by its rows, and `(line index, values)` per row.
type SchemaGroup<'a> = (Vec<&'a [u8]>, Vec<(usize, Vec<&'a [u8]>)>);
/// Scans one raw JSON value in `line` starting at `pos`, skipping leading ASCII whitespace.
///
/// Returns the value's bytes together with the index just past it. The value kinds are:
/// - strings, which honour backslash escapes;
/// - objects and arrays, matched by nesting depth of their own bracket kind while skipping
///   quoted strings;
/// - bare scalars (numbers, `true`, `null`, ...), which end at `,`, `}`, `]` or ASCII
///   whitespace.
///
/// Returns `None` when only whitespace remains, a string/object/array is unterminated, or a
/// bare scalar would be empty.
fn extract_value(line: &[u8], pos: usize) -> Option<(&[u8], usize)> {
    // Index of the closing quote of a string whose body starts at `i`, or `line.len()` when
    // the string never closes.
    fn closing_quote(line: &[u8], mut i: usize) -> usize {
        let mut escaped = false;
        while i < line.len() {
            match line[i] {
                _ if escaped => escaped = false,
                b'\\' => escaped = true,
                b'"' => return i,
                _ => {}
            }
            i += 1;
        }
        i
    }

    // Exclusive end of a bracketed value whose opening bracket sits at `open_at`.
    fn bracketed_end(line: &[u8], open_at: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        let mut i = open_at + 1;
        while i < line.len() && depth > 0 {
            let byte = line[i];
            if byte == b'"' {
                // Land on the closing quote (or the end); the increment below steps past it.
                i = closing_quote(line, i + 1);
            } else if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
            }
            i += 1;
        }
        (depth == 0 && i <= line.len()).then_some(i)
    }

    let leading_ws = line
        .get(pos..)?
        .iter()
        .position(|b| !b.is_ascii_whitespace())?;
    let start = pos + leading_ws;
    let end = match line[start] {
        b'"' => {
            let quote = closing_quote(line, start + 1);
            if quote >= line.len() {
                return None;
            }
            quote + 1
        }
        b'{' => bracketed_end(line, start, b'{', b'}')?,
        b'[' => bracketed_end(line, start, b'[', b']')?,
        _ => {
            let stop = line[start..]
                .iter()
                .position(|&b| matches!(b, b',' | b'}' | b']') || b.is_ascii_whitespace())
                .map_or(line.len(), |offset| start + offset);
            if stop == start {
                return None;
            }
            stop
        }
    };
    Some((&line[start..end], end))
}
/// A parsed line: (template parts, values), with `parts.len() == values.len() + 1`.
type ParsedLine<'a> = (Vec<&'a [u8]>, Vec<&'a [u8]>);
/// Splits a one-line flat JSON object into literal template parts and raw value slices.
///
/// Part 0 runs from the start of `line` through the first key's `:` and the whitespace after
/// it. Each following part runs from the end of one value through the next key's `:` and
/// whitespace. The last part runs from the end of the last value to the end of the line.
/// Interleaving parts and values therefore reproduces `line` exactly.
///
/// Returns `None` if the line does not begin (after whitespace) with `{`, is malformed or
/// truncated, or has no key/value pairs.
fn parse_line(line: &[u8]) -> Option<ParsedLine<'_>> {
    let len = line.len();
    let skip_ws = |mut i: usize| -> usize {
        while i < len && line[i].is_ascii_whitespace() {
            i += 1;
        }
        i
    };

    let mut cursor = skip_ws(0);
    if line.get(cursor).copied() != Some(b'{') {
        return None;
    }
    cursor += 1;

    let mut segments: Vec<&[u8]> = Vec::new();
    let mut vals: Vec<&[u8]> = Vec::new();
    let mut segment_start = 0;
    loop {
        cursor = skip_ws(cursor);
        match line.get(cursor).copied() {
            None => return None,
            Some(b'}') => {
                segments.push(&line[segment_start..]);
                break;
            }
            Some(b'"') => {}
            Some(_) => return None,
        }

        // Walk past the key, leaving `cursor` just after its closing quote (or at the end).
        cursor += 1;
        let mut in_escape = false;
        while cursor < len {
            let byte = line[cursor];
            cursor += 1;
            if in_escape {
                in_escape = false;
            } else if byte == b'\\' {
                in_escape = true;
            } else if byte == b'"' {
                break;
            }
        }

        cursor = skip_ws(cursor);
        if line.get(cursor).copied() != Some(b':') {
            return None;
        }
        cursor = skip_ws(cursor + 1);
        segments.push(&line[segment_start..cursor]);

        let (value, after_value) = extract_value(line, cursor)?;
        vals.push(value);
        segment_start = after_value;

        cursor = skip_ws(after_value);
        match line.get(cursor).copied() {
            Some(b',') => cursor += 1,
            Some(b'}') => {
                segments.push(&line[segment_start..]);
                break;
            }
            _ => return None,
        }
    }

    if vals.is_empty() {
        None
    } else {
        Some((segments, vals))
    }
}
/// Splits `data` on `\n` (found with `memchr`), excluding the newline bytes.
///
/// A final newline does not yield a trailing empty line, but empty lines between newlines
/// are kept: `"a\n\nb\n"` gives `["a", "", "b"]` and `""` gives `[]`.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut cursor = 0;
    let mut out: Vec<&[u8]> = memchr::memchr_iter(b'\n', data)
        .map(|newline_at| {
            let line = &data[cursor..newline_at];
            cursor = newline_at + 1;
            line
        })
        .collect();
    if cursor < data.len() {
        out.push(&data[cursor..]);
    }
    out
}
/// Encodes rows that all share one template as column-major data plus a uniform header.
///
/// Data: each column's values joined by `VAL_SEP`, columns joined by `COL_SEP`.
///
/// Metadata (little-endian), in order:
/// - `METADATA_VERSION_UNIFORM`;
/// - `num_rows` as `u32`;
/// - the column count as `u16`;
/// - the trailing-newline flag as a `u8`;
/// - the part count as `u16`;
/// - each template part as a `u16` length followed by its bytes.
///
/// Counts and lengths are narrowed with `as`; callers must keep them within those widths.
fn build_uniform_columnar(
    template_parts: &[&[u8]],
    columns: &[Vec<&[u8]>],
    num_rows: usize,
    has_trailing_newline: bool,
) -> (Vec<u8>, Vec<u8>) {
    let mut col_data = Vec::new();
    for (ci, col) in columns.iter().enumerate() {
        if ci > 0 {
            col_data.push(COL_SEP);
        }
        for (ri, val) in col.iter().enumerate() {
            col_data.extend_from_slice(val);
            if ri < num_rows - 1 {
                col_data.push(VAL_SEP);
            }
        }
    }

    let mut metadata = vec![METADATA_VERSION_UNIFORM];
    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
    metadata.extend_from_slice(&(columns.len() as u16).to_le_bytes());
    metadata.push(u8::from(has_trailing_newline));
    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
    template_parts.iter().for_each(|part| {
        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
        metadata.extend_from_slice(part);
    });
    (col_data, metadata)
}
/// Decides per column whether it is extracted into its own column stream (`true`) or kept
/// inline with its row (`false`).
///
/// With fewer than 10 rows every column is extracted. Otherwise a column stays inline only
/// when its average value length is at least `SELECTIVE_MIN_AVG_LEN` bytes *and* its
/// distinct-value ratio exceeds `SELECTIVE_MAX_CARDINALITY`, i.e. long, mostly unique values.
///
/// `num_rows` is the divisor for both averages and is expected to equal each column's length.
fn classify_columns(columns: &[Vec<&[u8]>], num_rows: usize) -> Vec<bool> {
    use std::collections::HashSet;
    if num_rows < 10 {
        return vec![true; columns.len()];
    }
    columns
        .iter()
        .map(|col_values| {
            let avg_len = col_values.iter().map(|v| v.len()).sum::<usize>() / num_rows;
            // Short values are always extracted, so skip hashing the whole column for them.
            if avg_len < SELECTIVE_MIN_AVG_LEN {
                return true;
            }
            let unique: HashSet<&[u8]> = col_values.iter().copied().collect();
            let cardinality_ratio = unique.len() as f64 / num_rows as f64;
            // num_rows >= 10, so the ratio is never NaN and `<=` is the exact negation of `>`.
            cardinality_ratio <= SELECTIVE_MAX_CARDINALITY
        })
        .collect()
}
/// Encodes uniform-template rows, storing only the columns flagged in `extract_mask`
/// column-major and the remaining columns row-major ("inline").
///
/// Data, in order:
/// - the extracted section's byte length as a `u32` (little-endian);
/// - the extracted section: values joined by `VAL_SEP`, columns joined by `COL_SEP`;
/// - the inline section: each row's inline values joined by `VAL_SEP`, rows joined by
///   `COL_SEP`.
///
/// Metadata (little-endian), in order:
/// - `METADATA_VERSION_SELECTIVE`;
/// - `num_rows` as `u32`;
/// - the total column count as `u16`;
/// - the trailing-newline flag as a `u8`;
/// - the extracted-column count as `u16`, followed by each extracted index as `u16`;
/// - the part count as `u16`;
/// - each template part as a `u16` length followed by its bytes.
fn build_selective_columnar(
    template_parts: &[&[u8]],
    columns: &[Vec<&[u8]>],
    extract_mask: &[bool],
    num_rows: usize,
    has_trailing_newline: bool,
) -> (Vec<u8>, Vec<u8>) {
    let num_total_cols = columns.len();
    let (extracted, inline): (Vec<usize>, Vec<usize>) =
        (0..num_total_cols).partition(|&i| extract_mask[i]);
    let extracted_indices: Vec<u16> = extracted.iter().map(|&i| i as u16).collect();
    let inline_indices: Vec<u16> = inline.iter().map(|&i| i as u16).collect();

    let mut extracted_data = Vec::new();
    for (ei, &col_idx) in extracted_indices.iter().enumerate() {
        if ei > 0 {
            extracted_data.push(COL_SEP);
        }
        for (ri, val) in columns[usize::from(col_idx)].iter().enumerate() {
            extracted_data.extend_from_slice(val);
            if ri < num_rows - 1 {
                extracted_data.push(VAL_SEP);
            }
        }
    }

    let mut inline_data = Vec::new();
    if !inline_indices.is_empty() {
        #[allow(clippy::needless_range_loop)]
        for row in 0..num_rows {
            if row > 0 {
                inline_data.push(COL_SEP);
            }
            for (ii, &col_idx) in inline_indices.iter().enumerate() {
                if ii > 0 {
                    inline_data.push(VAL_SEP);
                }
                inline_data.extend_from_slice(columns[usize::from(col_idx)][row]);
            }
        }
    }

    let mut data = Vec::with_capacity(4 + extracted_data.len() + inline_data.len());
    data.extend_from_slice(&(extracted_data.len() as u32).to_le_bytes());
    data.extend_from_slice(&extracted_data);
    data.extend_from_slice(&inline_data);

    let mut metadata = vec![METADATA_VERSION_SELECTIVE];
    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
    metadata.extend_from_slice(&(num_total_cols as u16).to_le_bytes());
    metadata.push(u8::from(has_trailing_newline));
    metadata.extend_from_slice(&(extracted_indices.len() as u16).to_le_bytes());
    extracted_indices
        .iter()
        .for_each(|idx| metadata.extend_from_slice(&idx.to_le_bytes()));
    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
    for part in template_parts {
        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
        metadata.extend_from_slice(part);
    }
    (data, metadata)
}
/// Tries to encode all lines with one shared template (uniform or selective layout).
///
/// Every line must parse with `parse_line` and have byte-identical template parts to the
/// first line. Column classification picks uniform (all columns extracted) or selective.
///
/// Returns `None` when there are fewer than two lines, any line fails to parse or differs
/// in template, or the row count / template would not fit the `u32`/`u16` fields of the
/// metadata (which the builders narrow with `as` and would otherwise truncate).
fn preprocess_uniform(
    non_empty: &[&[u8]],
    has_trailing_newline: bool,
) -> Option<(Vec<u8>, Vec<u8>)> {
    if non_empty.len() < 2 {
        return None;
    }
    let (template_parts, first_values) = parse_line(non_empty[0])?;
    let num_cols = first_values.len();
    if template_parts.len() != num_cols + 1 {
        return None;
    }
    // Row count is stored as u32; column/part counts, part lengths and column indices as u16.
    // The template is shared by every row (checked below), so validating it once suffices.
    let fits_header = u32::try_from(non_empty.len()).is_ok()
        && u16::try_from(template_parts.len()).is_ok()
        && template_parts.iter().all(|p| u16::try_from(p.len()).is_ok());
    if !fits_header {
        return None;
    }

    let mut columns: Vec<Vec<&[u8]>> = first_values
        .iter()
        .map(|&v| {
            let mut col = Vec::with_capacity(non_empty.len());
            col.push(v);
            col
        })
        .collect();
    for &line in &non_empty[1..] {
        let (parts, values) = parse_line(line)?;
        if values.len() != num_cols || parts != template_parts {
            return None;
        }
        for (col, val) in columns.iter_mut().zip(values) {
            col.push(val);
        }
    }

    let extract_mask = classify_columns(&columns, non_empty.len());
    if extract_mask.iter().all(|&e| e) {
        Some(build_uniform_columnar(
            &template_parts,
            &columns,
            non_empty.len(),
            has_trailing_newline,
        ))
    } else {
        Some(build_selective_columnar(
            &template_parts,
            &columns,
            &extract_mask,
            non_empty.len(),
            has_trailing_newline,
        ))
    }
}
/// Picks a short, low-cardinality column whose value can split schema groups further.
///
/// Samples the first 200 successfully parsed lines (at least 10 are required). Among the
/// first `min(values per sample)` columns, it skips those whose average value length exceeds
/// 30 bytes. It then returns the column with the fewest distinct values, provided that count
/// is above 1 and below a third of the sample size. Ties keep the lowest column index.
fn find_discriminator<'a>(parsed: &[Option<ParsedLine<'a>>]) -> Option<usize> {
    use std::collections::HashSet;
    let samples: Vec<&ParsedLine<'a>> = parsed.iter().flatten().take(200).collect();
    if samples.len() < 10 {
        return None;
    }
    let num_cols = samples.iter().map(|s| s.1.len()).min().unwrap_or(0);
    if num_cols == 0 {
        return None;
    }

    let cardinality_limit = samples.len() / 3;
    // (column index, distinct-value count) of the best candidate so far.
    let mut best: Option<(usize, usize)> = None;
    for col_idx in 0..num_cols {
        let avg_len = samples.iter().map(|s| s.1[col_idx].len()).sum::<usize>() / samples.len();
        if avg_len > 30 {
            continue;
        }
        let cardinality = samples
            .iter()
            .map(|s| s.1[col_idx])
            .collect::<HashSet<&[u8]>>()
            .len();
        let improves = best.map_or(true, |(_, current)| cardinality < current);
        if cardinality > 1 && cardinality < cardinality_limit && improves {
            best = Some((col_idx, cardinality));
        }
    }
    best.map(|(col_idx, _)| col_idx)
}
/// Grouped-encoding entry point.
///
/// Always encodes with grouping by schema only. If a discriminator column is found, it also
/// encodes with grouping by schema plus that column's value, and keeps whichever result is
/// smaller (ties favour schema-only). Returns `None` for fewer than `MIN_GROUP_ROWS` lines or
/// when no attempt yields a group.
fn preprocess_grouped<'a>(
    non_empty: &[&'a [u8]],
    has_trailing_newline: bool,
) -> Option<(Vec<u8>, Vec<u8>)> {
    if non_empty.len() < MIN_GROUP_ROWS {
        return None;
    }
    // Parse only to choose the discriminator; the parsed lines are freed at the end of this
    // block, before the encoding passes run.
    let disc_col = {
        let parsed: Vec<Option<ParsedLine<'a>>> =
            non_empty.iter().map(|&l| parse_line(l)).collect();
        find_discriminator(&parsed)
    };

    let plain = preprocess_grouped_core(non_empty, has_trailing_newline, None);
    let Some(dc) = disc_col else {
        return plain;
    };
    let with_disc = preprocess_grouped_core(non_empty, has_trailing_newline, Some(dc));
    match (plain, with_disc) {
        (Some(a), Some(b)) => {
            if b.0.len() + b.1.len() < a.0.len() + a.1.len() {
                Some(b)
            } else {
                Some(a)
            }
        }
        (None, Some(b)) => Some(b),
        (a, _) => a,
    }
}
/// Encodes lines grouped by schema: their template parts, plus the value of column
/// `disc_col` when one is given.
///
/// A group with at least `MIN_GROUP_ROWS` rows whose template fits the `u16` metadata fields
/// is encoded columnar (uniform or selective). Nested-object flattening is attempted for
/// uniform groups. Every other line, including any `parse_line` rejects, is stored verbatim in
/// the residual.
///
/// Data: for each group, a `u32` LE length followed by its column data; then the residual
/// lines joined by `\n`.
///
/// Metadata, in order:
/// - the version byte and the trailing-newline flag;
/// - the line count as `u32` and the group count as `u16`;
/// - per group: the row count (`u32`), the row indices (`u32` each), and the group metadata
///   as a `u32` length followed by its bytes;
/// - the residual count (`u32`) and residual indices (`u32` each).
///
/// Returns `None` when no group qualifies or a count/length would not fit its field.
fn preprocess_grouped_core<'a>(
    non_empty: &[&'a [u8]],
    has_trailing_newline: bool,
    disc_col: Option<usize>,
) -> Option<(Vec<u8>, Vec<u8>)> {
    // The line count and every row index are written as u32.
    if u32::try_from(non_empty.len()).is_err() {
        return None;
    }

    let mut group_map: HashMap<Vec<u8>, SchemaGroup<'a>> = HashMap::new();
    let mut residual_indices: Vec<usize> = Vec::new();
    for (idx, &line) in non_empty.iter().enumerate() {
        let Some((parts, values)) = parse_line(line) else {
            residual_indices.push(idx);
            continue;
        };
        // Key = part count, then each part length-prefixed, then the optional discriminator.
        // The leading count keeps the key unambiguous once the discriminator is appended;
        // without it a longer template could collide with a shorter one plus a value.
        let mut key = Vec::new();
        key.extend_from_slice(&(parts.len() as u32).to_le_bytes());
        for part in &parts {
            key.extend_from_slice(&(part.len() as u32).to_le_bytes());
            key.extend_from_slice(part);
        }
        if let Some(dc) = disc_col {
            if dc < values.len() {
                key.push(0xFF);
                key.extend_from_slice(values[dc]);
            }
        }
        group_map
            .entry(key)
            .or_insert_with(|| (parts, Vec::new()))
            .1
            .push((idx, values));
    }

    let mut groups: Vec<SchemaGroup<'a>> = Vec::new();
    for (_key, (template_parts, rows)) in group_map {
        // Part count and part lengths are stored as u16 by the column builders.
        let template_fits = u16::try_from(template_parts.len()).is_ok()
            && template_parts.iter().all(|p| u16::try_from(p.len()).is_ok());
        if rows.len() >= MIN_GROUP_ROWS && template_fits {
            groups.push((template_parts, rows));
        } else {
            residual_indices.extend(rows.iter().map(|(idx, _)| *idx));
        }
    }
    // The group count is stored as u16; a discriminator chosen from a small sample can
    // produce far more groups across the whole input.
    if groups.is_empty() || u16::try_from(groups.len()).is_err() {
        return None;
    }
    // HashMap iteration order is arbitrary; order groups by their first line for determinism.
    groups.sort_by_key(|(_, rows)| rows[0].0);
    residual_indices.sort_unstable();

    struct GroupOutput {
        row_indices: Vec<u32>,
        col_data: Vec<u8>,
        group_metadata: Vec<u8>,
    }
    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
    for (template_parts, rows) in &groups {
        let num_cols = template_parts.len() - 1;
        let mut columns: Vec<Vec<&[u8]>> =
            (0..num_cols).map(|_| Vec::with_capacity(rows.len())).collect();
        let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
        for (idx, values) in rows {
            // Cannot truncate: non_empty.len() fits in u32 (checked above).
            row_indices.push(*idx as u32);
            for (col, val) in values.iter().enumerate() {
                columns[col].push(*val);
            }
        }

        let extract_mask = classify_columns(&columns, rows.len());
        let all_extracted = extract_mask.iter().all(|&e| e);
        let (mut col_data, mut group_metadata) = if all_extracted {
            build_uniform_columnar(template_parts, &columns, rows.len(), false)
        } else {
            build_selective_columnar(template_parts, &columns, &extract_mask, rows.len(), false)
        };
        if all_extracted {
            let mut flattened = false;
            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, rows.len())
            {
                let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
                let unflattened = unflatten_nested_columns(
                    &flat_data,
                    &nested_groups,
                    rows.len(),
                    total_flat_cols,
                );
                // Keep the flattening only if it reproduces the original columns exactly.
                if unflattened == col_data {
                    group_metadata.extend_from_slice(&serialize_nested_info(&nested_groups));
                    col_data = flat_data;
                    flattened = true;
                }
            }
            if !flattened {
                // A 0 byte takes the place of nested info.
                group_metadata.push(0u8);
            }
        }
        // Both lengths are written as u32 below.
        if u32::try_from(col_data.len()).is_err() || u32::try_from(group_metadata.len()).is_err()
        {
            return None;
        }
        group_outputs.push(GroupOutput {
            row_indices,
            col_data,
            group_metadata,
        });
    }

    let mut data_out = Vec::new();
    for group in &group_outputs {
        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
        data_out.extend_from_slice(&group.col_data);
    }
    for (i, &idx) in residual_indices.iter().enumerate() {
        if i > 0 {
            data_out.push(b'\n');
        }
        data_out.extend_from_slice(non_empty[idx]);
    }

    let mut metadata = Vec::new();
    metadata.push(METADATA_VERSION_GROUPED);
    metadata.push(u8::from(has_trailing_newline));
    metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
    for group in &group_outputs {
        metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
        for &idx in &group.row_indices {
            metadata.extend_from_slice(&idx.to_le_bytes());
        }
        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
        metadata.extend_from_slice(&group.group_metadata);
    }
    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
    for &idx in &residual_indices {
        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
    }
    Some((data_out, metadata))
}
/// Describes one original column whose values were all JSON objects (or `null`) and which
/// `flatten_nested_columns` split into one flat column per distinct sub-key.
pub(crate) struct NestedGroupInfo {
    /// Index of this column in the column list before flattening.
    pub(crate) original_col_index: u16,
    /// Distinct sub-keys in first-seen order, as the raw bytes between the quotes (escapes
    /// are not decoded); there is one flat column per key, in this order.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Literal segments surrounding the values of the first non-null object, as returned by
    /// `parse_nested_object_with_template`; used to rebuild objects byte-for-byte.
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Bit `sub_key_index * num_rows + row` (LSB-first within each byte) is set when that key
    /// was missing from that row's object; empty when no key was ever missing.
    pub(crate) absence_bitmap: Vec<u8>,
}
/// Splits columns whose values are all JSON objects (or `null`) into one flat column per
/// distinct sub-key.
///
/// `col_data` is column-major data (values joined by `VAL_SEP`, columns by `COL_SEP`) with
/// `num_rows` values per column. A column is copied unchanged if it is not all objects and
/// `null`, is entirely `null`, or has an object that fails to parse. For each flattened column
/// a [`NestedGroupInfo`] records:
/// - the sub-keys, in first-seen order;
/// - the template of the first non-null object;
/// - an absence bitmap for keys missing from individual rows (whose flat value is `null`).
///
/// Returns the re-encoded flat data and the descriptors. Returns `None` when nothing was
/// flattened, the data does not match `num_rows`, or the descriptors would not fit the
/// integer widths `serialize_nested_info` writes. Callers in this file confirm the result by
/// round-tripping through `unflatten_nested_columns` with the *in-memory* descriptors. A
/// descriptor that serialization would truncate would pass that check and still corrupt
/// the output, so it is rejected here.
pub(crate) fn flatten_nested_columns(
    col_data: &[u8],
    num_rows: usize,
) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
    type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;

    // True when every count/length fits `serialize_nested_info`'s fields: the group count
    // (u8); the sub-key count, key lengths, template part count and part lengths (u16); and
    // the bitmap length (u32).
    fn fits_nested_wire_format(groups: &[NestedGroupInfo]) -> bool {
        let fits_u16 = |n: usize| u16::try_from(n).is_ok();
        u8::try_from(groups.len()).is_ok()
            && groups.iter().all(|g| {
                fits_u16(g.sub_keys.len())
                    && g.sub_keys.iter().all(|k| fits_u16(k.len()))
                    && fits_u16(g.nested_template.len())
                    && g.nested_template.iter().all(|p| fits_u16(p.len()))
                    && u32::try_from(g.absence_bitmap.len()).is_ok()
            })
    }

    let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
    if columns.is_empty() || num_rows == 0 {
        return None;
    }
    let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
    for (col_idx, &col_chunk) in columns.iter().enumerate() {
        let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
        if values.len() != num_rows {
            return None;
        }

        // Candidate only if every non-null value looks like an object and one is non-null.
        let mut all_objects = true;
        let mut has_non_null = false;
        for val in &values {
            if *val == b"null" {
                continue;
            }
            has_non_null = true;
            if !val.starts_with(b"{") {
                all_objects = false;
                break;
            }
        }
        if !all_objects || !has_non_null {
            output_columns.push(values.iter().map(|v| v.to_vec()).collect());
            continue;
        }

        let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
        let mut nested_template: Vec<Vec<u8>> = Vec::new();
        let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
        for val in &values {
            if *val == b"null" {
                parsed_rows.push(None);
                continue;
            }
            // The first non-null object also supplies the byte-exact template.
            let parsed = if nested_template.is_empty() {
                parse_nested_object_with_template(val).map(|(template, kv_pairs)| {
                    nested_template = template;
                    kv_pairs
                })
            } else {
                parse_nested_object_kv(val)
            };
            match parsed {
                Some(kv_pairs) => {
                    for (key, _) in &kv_pairs {
                        if !all_sub_keys.iter().any(|k| k == key) {
                            all_sub_keys.push(key.clone());
                        }
                    }
                    parsed_rows.push(Some(kv_pairs));
                }
                None => {
                    // Any unparseable object disqualifies the whole column.
                    all_sub_keys.clear();
                    break;
                }
            }
        }
        if all_sub_keys.is_empty() {
            output_columns.push(values.iter().map(|v| v.to_vec()).collect());
            continue;
        }

        let num_sub_keys = all_sub_keys.len();
        let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
        let total_bits = num_sub_keys * num_rows;
        let mut absence_bitmap = vec![0u8; total_bits.div_ceil(8)];
        let mut has_any_absent = false;
        for (row_idx, parsed) in parsed_rows.iter().enumerate() {
            match parsed {
                Some(kv_pairs) => {
                    for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
                        match kv_pairs.iter().find(|(k, _)| k == sk) {
                            Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
                            None => {
                                // Missing key: store `null` and mark it absent so it is not
                                // re-emitted as an explicit `null` on reconstruction.
                                sub_columns[sk_idx].push(b"null".to_vec());
                                let bit_idx = sk_idx * num_rows + row_idx;
                                absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
                                has_any_absent = true;
                            }
                        }
                    }
                }
                None => {
                    for sc in &mut sub_columns {
                        sc.push(b"null".to_vec());
                    }
                }
            }
        }
        nested_groups.push(NestedGroupInfo {
            // The column index is serialized as u16.
            original_col_index: u16::try_from(col_idx).ok()?,
            sub_keys: all_sub_keys,
            nested_template,
            absence_bitmap: if has_any_absent {
                absence_bitmap
            } else {
                Vec::new()
            },
        });
        output_columns.extend(sub_columns);
    }
    if nested_groups.is_empty() || !fits_nested_wire_format(&nested_groups) {
        return None;
    }

    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }
    Some((out, nested_groups))
}
/// Parses a flat JSON object into its template parts and its `(key, value)` pairs.
///
/// The template parts are:
/// - part 0: from the start of `obj` through the first key's `:` and the whitespace after it;
/// - each middle part: from the end of one value through the next key's `:` and whitespace;
/// - the last part: from the end of the last value to the end of `obj`, including the closing
///   `}` and anything after it.
///
/// Hence `parts.len() == pairs.len() + 1`, and interleaving parts with the values reproduces
/// `obj`. Keys are the raw bytes between the quotes (escapes not decoded). Values are raw
/// bytes as delimited by `extract_value`.
///
/// Returns `None` if `obj` does not start (after whitespace) with `{`, is malformed or
/// truncated, or contains no pairs.
#[allow(clippy::type_complexity)]
pub(crate) fn parse_nested_object_with_template(
    obj: &[u8],
) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
    let mut pos = 0;
    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
        pos += 1;
    }
    if pos >= obj.len() || obj[pos] != b'{' {
        return None;
    }
    pos += 1;
    let mut parts: Vec<Vec<u8>> = Vec::new();
    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    // Start of the literal segment currently being accumulated.
    let mut part_start = 0;
    loop {
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        // `}` here means `{}` or a trailing comma; the rest of `obj` is the final part.
        if obj[pos] == b'}' {
            parts.push(obj[part_start..].to_vec());
            break;
        }
        if obj[pos] != b'"' {
            return None;
        }
        let key_str_start = pos + 1;
        pos += 1;
        // Find the key's closing quote, honouring backslash escapes.
        let mut escaped = false;
        while pos < obj.len() {
            if escaped {
                escaped = false;
            } else if obj[pos] == b'\\' {
                escaped = true;
            } else if obj[pos] == b'"' {
                break;
            }
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        let key = obj[key_str_start..pos].to_vec();
        pos += 1;
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() || obj[pos] != b':' {
            return None;
        }
        pos += 1;
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        // Everything since the previous value, up to this value's first byte.
        parts.push(obj[part_start..pos].to_vec());
        let value_start = pos;
        let (value, value_end) = extract_value(obj, value_start)?;
        pos = value_end;
        pairs.push((key, value.to_vec()));
        part_start = pos;
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        if obj[pos] == b',' {
            pos += 1;
        } else if obj[pos] == b'}' {
            parts.push(obj[part_start..].to_vec());
            break;
        } else {
            return None;
        }
    }
    if pairs.is_empty() {
        return None;
    }
    Some((parts, pairs))
}
/// Parses a flat JSON object into its `(key, value)` pairs, without building a template.
///
/// Scans the same way as `parse_nested_object_with_template`. Keys are the raw bytes between
/// the quotes (escapes not decoded); values are raw bytes as delimited by `extract_value`.
/// A trailing comma before `}` is accepted. Returns `None` if `obj` does not start (after
/// whitespace) with `{`, is malformed or truncated, or has no pairs.
pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
    let mut pos = 0;
    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
        pos += 1;
    }
    if pos >= obj.len() || obj[pos] != b'{' {
        return None;
    }
    pos += 1;
    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    loop {
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        if obj[pos] == b'}' {
            break;
        }
        if obj[pos] != b'"' {
            return None;
        }
        pos += 1;
        let key_start = pos;
        // Find the key's closing quote, honouring backslash escapes.
        let mut escaped = false;
        while pos < obj.len() {
            if escaped {
                escaped = false;
            } else if obj[pos] == b'\\' {
                escaped = true;
            } else if obj[pos] == b'"' {
                break;
            }
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        let key = obj[key_start..pos].to_vec();
        pos += 1;
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() || obj[pos] != b':' {
            return None;
        }
        pos += 1;
        // `extract_value` skips any whitespace before the value itself.
        let (value, value_end) = extract_value(obj, pos)?;
        pos = value_end;
        pairs.push((key, value.to_vec()));
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        if obj[pos] == b',' {
            pos += 1;
        } else if obj[pos] == b'}' {
            break;
        } else {
            return None;
        }
    }
    if pairs.is_empty() {
        return None;
    }
    Some(pairs)
}
/// Reverses `flatten_nested_columns`, merging each group's flat sub-columns back into one
/// column of JSON objects.
///
/// `flat_data` must split into `total_flat_cols` columns of `num_rows` values each. If it
/// does not, or the group descriptors are inconsistent with that column count, `flat_data`
/// is returned unchanged (copied).
///
/// For each row of a nested group:
/// - A row whose sub-values are all `null` becomes `null`, unless the absence bitmap shows
///   some of those `null`s were really present in the object.
/// - A row with no absent keys is rebuilt through the stored template when it has
///   `sub_keys.len() + 1` parts.
/// - Otherwise the object is rebuilt as a compact `{"key":value,...}` that skips absent keys.
pub(crate) fn unflatten_nested_columns(
    flat_data: &[u8],
    nested_groups: &[NestedGroupInfo],
    num_rows: usize,
    total_flat_cols: usize,
) -> Vec<u8> {
    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
    if flat_columns.len() != total_flat_cols {
        return flat_data.to_vec();
    }
    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
    for chunk in &flat_columns {
        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
        if vals.len() != num_rows {
            return flat_data.to_vec();
        }
        flat_col_values.push(vals);
    }

    // Each group replaced one original column with `sub_keys.len()` flat columns. The
    // descriptors may come from untrusted metadata (the reverse path), so use checked
    // arithmetic instead of underflowing: that would panic in debug builds and attempt a
    // huge allocation in release builds.
    let total_sub_cols: usize = nested_groups.iter().map(|g| g.sub_keys.len()).sum();
    let Some(original_num_cols) = total_flat_cols
        .checked_sub(total_sub_cols)
        .and_then(|n| n.checked_add(nested_groups.len()))
    else {
        return flat_data.to_vec();
    };
    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
    for (gi, group) in nested_groups.iter().enumerate() {
        if (group.original_col_index as usize) < original_num_cols {
            original_col_map[group.original_col_index as usize] = Some(gi);
        }
    }

    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
    let mut flat_idx = 0;
    for entry in &original_col_map {
        if let Some(gi) = entry {
            let group = &nested_groups[*gi];
            let num_sub = group.sub_keys.len();
            let is_absent = |si: usize, row: usize| -> bool {
                if group.absence_bitmap.is_empty() {
                    return false;
                }
                let bit_idx = si * num_rows + row;
                let byte_idx = bit_idx / 8;
                if byte_idx >= group.absence_bitmap.len() {
                    return false;
                }
                (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
            };
            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
            for row in 0..num_rows {
                let all_null = (0..num_sub).all(|si| {
                    flat_idx + si < flat_col_values.len()
                        && flat_col_values[flat_idx + si][row] == b"null"
                });
                if all_null {
                    // All-null sub-values mean the whole value was `null`, unless some of
                    // those nulls were present in the object (not flagged as absent).
                    // `all_null` guarantees every index below is in range.
                    let has_present_null = !group.absence_bitmap.is_empty()
                        && (0..num_sub).any(|si| {
                            flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
                        });
                    if !has_present_null {
                        merged_col.push(b"null".to_vec());
                        continue;
                    }
                }
                let has_absent = (0..num_sub).any(|si| is_absent(si, row));
                if !has_absent
                    && !group.nested_template.is_empty()
                    && group.nested_template.len() == num_sub + 1
                {
                    // Byte-exact rebuild: template parts interleaved with the sub-values.
                    let mut obj = Vec::new();
                    obj.extend_from_slice(&group.nested_template[0]);
                    if flat_idx < flat_col_values.len() {
                        obj.extend_from_slice(flat_col_values[flat_idx][row]);
                    }
                    for si in 1..num_sub {
                        obj.extend_from_slice(&group.nested_template[si]);
                        if flat_idx + si < flat_col_values.len() {
                            obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
                        }
                    }
                    obj.extend_from_slice(&group.nested_template[num_sub]);
                    merged_col.push(obj);
                } else {
                    // Compact rebuild that omits keys absent from this row.
                    let mut obj = Vec::new();
                    obj.push(b'{');
                    let mut first = true;
                    for si in 0..num_sub {
                        if flat_idx + si >= flat_col_values.len() {
                            break;
                        }
                        if is_absent(si, row) {
                            continue;
                        }
                        let val = flat_col_values[flat_idx + si][row];
                        if !first {
                            obj.push(b',');
                        }
                        first = false;
                        obj.push(b'"');
                        obj.extend_from_slice(&group.sub_keys[si]);
                        obj.push(b'"');
                        obj.push(b':');
                        obj.extend_from_slice(val);
                    }
                    obj.push(b'}');
                    merged_col.push(obj);
                }
            }
            output_columns.push(merged_col);
            flat_idx += num_sub;
        } else {
            if flat_idx < flat_col_values.len() {
                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
                    .iter()
                    .map(|v| v.to_vec())
                    .collect();
                output_columns.push(col);
            }
            flat_idx += 1;
        }
    }

    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }
    out
}
/// Serializes nested-group descriptors so they can be appended to column metadata.
///
/// The version is 3 if any group has an absence bitmap, else 2 if any has a template,
/// else 1.
///
/// Layout (little-endian): the version `u8` and group count `u8`, then per group:
/// - the column index (`u16`);
/// - the sub-key count (`u16`), then each key as a `u16` length followed by its bytes;
/// - for versions 2 and 3: the template part count (`u16`), then each part as a `u16` length
///   followed by its bytes;
/// - for version 3: the bitmap length (`u32`) followed by the bitmap bytes.
///
/// Counts and lengths are narrowed with `as`.
pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
    let any_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
    let any_template = groups.iter().any(|g| !g.nested_template.is_empty());
    let version: u8 = match (any_absence, any_template) {
        (true, _) => 3,
        (false, true) => 2,
        (false, false) => 1,
    };
    let write_template = version >= 2;
    let write_bitmap = version == 3;

    let push_u16 = |buf: &mut Vec<u8>, n: usize| buf.extend_from_slice(&(n as u16).to_le_bytes());
    let mut out = vec![version, groups.len() as u8];
    for g in groups {
        out.extend_from_slice(&g.original_col_index.to_le_bytes());
        push_u16(&mut out, g.sub_keys.len());
        for key in &g.sub_keys {
            push_u16(&mut out, key.len());
            out.extend_from_slice(key);
        }
        if write_template {
            push_u16(&mut out, g.nested_template.len());
            for part in &g.nested_template {
                push_u16(&mut out, part.len());
                out.extend_from_slice(part);
            }
        }
        if write_bitmap {
            out.extend_from_slice(&(g.absence_bitmap.len() as u32).to_le_bytes());
            out.extend_from_slice(&g.absence_bitmap);
        }
    }
    out
}
/// Parses nested-group descriptors in the layout written by `serialize_nested_info`.
///
/// Returns the decoded groups and the number of bytes of `data` consumed. Returns `None` when
/// `data` is empty, the version byte is not 1, 2 or 3, or the buffer ends before a field it
/// declares.
pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
    if data.is_empty() {
        return None;
    }
    let mut pos = 0;
    let version = data[pos];
    pos += 1;
    if version != 1 && version != 2 && version != 3 {
        return None;
    }
    // Version 1: keys only; version 2 adds templates; version 3 adds absence bitmaps as well.
    let has_template = version == 2 || version == 3;
    let has_absence = version == 3;
    if pos >= data.len() {
        return None;
    }
    // The group count is a single byte.
    let num_groups = data[pos] as usize;
    pos += 1;
    let mut groups = Vec::with_capacity(num_groups);
    for _ in 0..num_groups {
        // Column index (u16) followed by sub-key count (u16).
        if pos + 4 > data.len() {
            return None;
        }
        let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
        pos += 2;
        let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        // Each sub-key: u16 length + bytes.
        let mut sub_keys = Vec::with_capacity(num_sub_cols);
        for _ in 0..num_sub_cols {
            if pos + 2 > data.len() {
                return None;
            }
            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + key_len > data.len() {
                return None;
            }
            sub_keys.push(data[pos..pos + key_len].to_vec());
            pos += key_len;
        }
        // Template: u16 part count, then each part as u16 length + bytes.
        let nested_template = if has_template {
            if pos + 2 > data.len() {
                return None;
            }
            let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            let mut parts = Vec::with_capacity(num_parts);
            for _ in 0..num_parts {
                if pos + 2 > data.len() {
                    return None;
                }
                let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
                pos += 2;
                if pos + part_len > data.len() {
                    return None;
                }
                parts.push(data[pos..pos + part_len].to_vec());
                pos += part_len;
            }
            parts
        } else {
            Vec::new()
        };
        // Absence bitmap: u32 length + bytes.
        let absence_bitmap = if has_absence {
            if pos + 4 > data.len() {
                return None;
            }
            let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
            pos += 4;
            if pos + bm_len > data.len() {
                return None;
            }
            let bm = data[pos..pos + bm_len].to_vec();
            pos += bm_len;
            bm
        } else {
            Vec::new()
        };
        groups.push(NestedGroupInfo {
            original_col_index,
            sub_keys,
            nested_template,
            absence_bitmap,
        });
    }
    Some((groups, pos))
}
pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
if data.is_empty() {
return None;
}
let has_trailing_newline = data.last() == Some(&b'\n');
let lines = split_lines(data);
let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
if non_empty.len() < 2 {
return None;
}
if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
let is_selective = !metadata.is_empty() && metadata[0] == METADATA_VERSION_SELECTIVE;
let size_ok = if is_selective {
(col_data.len() + metadata.len()) * 100 <= data.len() * 105
} else {
col_data.len() + metadata.len() < data.len()
};
if size_ok {
if is_selective {
return Some(TransformResult {
data: col_data,
metadata,
});
}
let num_rows = non_empty.len();
if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
let unflattened =
unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
if unflattened == col_data {
let nested_bytes = serialize_nested_info(&nested_groups);
metadata.extend_from_slice(&nested_bytes);
return Some(TransformResult {
data: flat_data,
metadata,
});
}
}
metadata.push(0u8); return Some(TransformResult {
data: col_data,
metadata,
});
}
}
if let Some((grouped_data, grouped_metadata)) =
preprocess_grouped(&non_empty, has_trailing_newline)
{
if grouped_data.len() + grouped_metadata.len() < data.len() {
return Some(TransformResult {
data: grouped_data,
metadata: grouped_metadata,
});
}
}
None
}
pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
if metadata.is_empty() {
return data.to_vec();
}
match metadata[0] {
METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
METADATA_VERSION_SELECTIVE => reverse_selective(data, metadata),
_ => data.to_vec(),
}
}
/// Inverts `build_uniform_columnar`, first undoing nested-object flattening when nested
/// info follows the template parts in `metadata`.
///
/// Metadata header (little-endian), in order:
/// - the version `u8`, skipped here;
/// - the row count (`u32`) and column count (`u16`);
/// - the trailing-newline flag (`u8`);
/// - the part count (`u16`) and the `u16`-length-prefixed parts.
///
/// If the next byte is 1, 2 or 3 it is parsed with `deserialize_nested_info`. Malformed or
/// inconsistent metadata returns `data` unchanged (copied).
fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    // The fixed header is 10 bytes: 1 + 4 + 2 + 1 + 2.
    if metadata.len() < 10 {
        return data.to_vec();
    }
    let mut pos = 0;
    // Skip the version byte (already dispatched on by the caller).
    pos += 1;
    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
    pos += 4;
    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    let has_trailing_newline = metadata[pos] != 0;
    pos += 1;
    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        if pos + 2 > metadata.len() {
            return data.to_vec();
        }
        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        if pos + part_len > metadata.len() {
            return data.to_vec();
        }
        parts.push(metadata[pos..pos + part_len].to_vec());
        pos += part_len;
    }
    // A row is parts[0] v0 parts[1] ... v(n-1) parts[n], so there must be one more part than columns.
    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
        return data.to_vec();
    }
    // Anything after the parts is nested-flattening info (versions 1-3) or a 0 marker.
    let remaining_metadata = &metadata[pos..];
    if !remaining_metadata.is_empty()
        && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
    {
        if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
            let total_flat_cols = data.split(|&b| b == COL_SEP).count();
            let unflattened =
                unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
            return reverse_uniform_from_parts(
                &unflattened,
                &parts,
                num_rows,
                num_cols,
                has_trailing_newline,
            );
        }
    }
    reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
}
/// Rebuilds the original lines from column-major `data` and the shared template `parts`.
///
/// Each row is `parts[0] v0 parts[1] v1 ... parts[num_cols]`. Rows are separated by `\n`,
/// with a final `\n` only when `has_trailing_newline`. If `data` does not split into exactly
/// `num_cols` columns of `num_rows` values each, `data` is returned unchanged (copied).
/// Callers must guarantee `parts.len() == num_cols + 1`.
fn reverse_uniform_from_parts(
    data: &[u8],
    parts: &[Vec<u8>],
    num_rows: usize,
    num_cols: usize,
    has_trailing_newline: bool,
) -> Vec<u8> {
    let chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
    if chunks.len() != num_cols {
        return data.to_vec();
    }
    let Some(columns) = chunks
        .iter()
        .map(|&chunk| {
            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
            (vals.len() == num_rows).then_some(vals)
        })
        .collect::<Option<Vec<Vec<&[u8]>>>>()
    else {
        return data.to_vec();
    };

    // Exact output size. A split always yields at least one value, so num_rows >= 1 here.
    let template_len: usize = parts.iter().map(Vec::len).sum();
    let values_len: usize = columns.iter().flatten().map(|v| v.len()).sum();
    let newlines = if has_trailing_newline {
        num_rows
    } else {
        num_rows - 1
    };
    let mut output = Vec::with_capacity(template_len * num_rows + values_len + newlines);

    for row in 0..num_rows {
        if row > 0 {
            output.push(b'\n');
        }
        for (part, column) in parts.iter().zip(&columns) {
            output.extend_from_slice(part);
            output.extend_from_slice(column[row]);
        }
        output.extend_from_slice(&parts[num_cols]);
    }
    if has_trailing_newline {
        output.push(b'\n');
    }
    output
}
/// Decoded header of the selective encoding (see `parse_selective_metadata`).
struct SelectiveMetadata {
    /// Template parts; `num_total_cols + 1` of them.
    parts: Vec<Vec<u8>>,
    /// Number of lines encoded.
    num_rows: usize,
    /// Number of columns, extracted and inline together.
    num_total_cols: usize,
    /// Whether the original data ended with `\n`.
    has_trailing_newline: bool,
    /// Indices of the columns stored column-major; the rest are stored inline.
    extracted_col_indices: Vec<u16>,
}
/// Parses the metadata written by `build_selective_columnar`.
///
/// Returns `None` if:
/// - `metadata` is shorter than 12 bytes (the fixed fields plus the part count);
/// - it is truncated;
/// - the part count is not `num_total_cols + 1`;
/// - the row or column count is zero.
///
/// NOTE(review): extracted indices are not range-checked against `num_total_cols` here.
fn parse_selective_metadata(metadata: &[u8]) -> Option<SelectiveMetadata> {
    if metadata.len() < 12 {
        return None;
    }
    // Skip the version byte.
    let mut pos = 1;
    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
    pos += 4;
    let num_total_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    let has_trailing_newline = metadata[pos] != 0;
    pos += 1;
    // Extracted-column count (u16), then each extracted index (u16).
    let num_extracted = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    let mut extracted_col_indices = Vec::with_capacity(num_extracted);
    for _ in 0..num_extracted {
        if pos + 2 > metadata.len() {
            return None;
        }
        let idx = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap());
        pos += 2;
        extracted_col_indices.push(idx);
    }
    // Part count (u16), then each part as u16 length + bytes.
    if pos + 2 > metadata.len() {
        return None;
    }
    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    let mut parts = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        if pos + 2 > metadata.len() {
            return None;
        }
        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        if pos + part_len > metadata.len() {
            return None;
        }
        parts.push(metadata[pos..pos + part_len].to_vec());
        pos += part_len;
    }
    if parts.len() != num_total_cols + 1 || num_rows == 0 || num_total_cols == 0 {
        return None;
    }
    Some(SelectiveMetadata {
        parts,
        num_rows,
        num_total_cols,
        has_trailing_newline,
        extracted_col_indices,
    })
}
/// Reverses a selective-columnar (version 3) transform.
///
/// Parses `metadata` and rebuilds the rows from `data`. If the metadata cannot be
/// parsed, `data` is returned unchanged.
fn reverse_selective(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    parse_selective_metadata(metadata).map_or_else(
        || data.to_vec(),
        |parsed| reverse_selective_from_data(data, &parsed),
    )
}
/// Rebuilds NDJSON rows from a selective-columnar payload described by `sm`.
///
/// `data` layout, in order:
/// - a little-endian `u32` length `L`;
/// - `L` bytes of extracted columns: columns separated by `COL_SEP`, and the
///   `sm.num_rows` values of each column separated by `VAL_SEP`;
/// - the inline section, holding the remaining columns row by row: rows separated
///   by `COL_SEP`, values within a row separated by `VAL_SEP`. When only one
///   column is inline, its value is taken whole.
///
/// Output column `c` is extracted column `i` when `sm.extracted_col_indices[i] == c`.
/// Every other column is inline, in ascending column order.
///
/// Each output row is `parts[0]`, then for every column its value followed by
/// `parts[col + 1]`. Rows are separated by `\n`, with a final `\n` when
/// `sm.has_trailing_newline` is set.
///
/// If `data` and `sm` disagree, `data` is returned unchanged rather than panicking.
/// That covers:
/// - a truncated header;
/// - wrong column, row, or value counts;
/// - a short template;
/// - extracted indices that are out of range, repeated, or more numerous than the columns.
fn reverse_selective_from_data(data: &[u8], sm: &SelectiveMetadata) -> Vec<u8> {
    /// Splits `section` into `outer` chunks on `COL_SEP`, and each chunk into
    /// `inner` values on `VAL_SEP` (or keeps the chunk whole when `split_inner`
    /// is false). Returns `None` if any count differs.
    fn split_grid(
        section: &[u8],
        outer: usize,
        inner: usize,
        split_inner: bool,
    ) -> Option<Vec<Vec<&[u8]>>> {
        let chunks: Vec<&[u8]> = section.split(|&b| b == COL_SEP).collect();
        if chunks.len() != outer {
            return None;
        }
        let mut grid = Vec::with_capacity(outer);
        for chunk in chunks {
            let vals: Vec<&[u8]> = if split_inner {
                chunk.split(|&b| b == VAL_SEP).collect()
            } else {
                vec![chunk]
            };
            if vals.len() != inner {
                return None;
            }
            grid.push(vals);
        }
        Some(grid)
    }

    /// Where an output column's value comes from.
    #[derive(Clone, Copy)]
    enum Source {
        /// Index into the extracted columns.
        Extracted(usize),
        /// Index into the current row's inline values.
        Inline(usize),
    }

    let num_cols = sm.num_total_cols;
    let num_rows = sm.num_rows;
    // Each column's value is followed by `parts[col + 1]`.
    if sm.parts.len() <= num_cols || data.len() < 4 {
        return data.to_vec();
    }
    let extracted_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
    let body = &data[4..];
    // Compare against the remaining length so `4 + len` cannot overflow.
    if extracted_len > body.len() {
        return data.to_vec();
    }
    let (extracted_section, inline_section) = body.split_at(extracted_len);

    // Map every output column to its source. The extracted indices must name
    // distinct existing columns; otherwise the inline count derived below would
    // not match the columns left over, and indexing would go out of bounds.
    let num_extracted = sm.extracted_col_indices.len();
    let mut extracted_at: Vec<Option<usize>> = vec![None; num_cols];
    for (ei, &col) in sm.extracted_col_indices.iter().enumerate() {
        let col = usize::from(col);
        if col >= num_cols || extracted_at[col].is_some() {
            return data.to_vec();
        }
        extracted_at[col] = Some(ei);
    }
    // Distinct in-range indices guarantee `num_extracted <= num_cols`, so this
    // subtraction cannot underflow.
    let num_inline = num_cols - num_extracted;
    let mut next_inline = 0;
    let mut sources = Vec::with_capacity(num_cols);
    for slot in extracted_at {
        sources.push(match slot {
            Some(ei) => Source::Extracted(ei),
            None => {
                next_inline += 1;
                Source::Inline(next_inline - 1)
            }
        });
    }

    let extracted_columns = if num_extracted == 0 {
        Vec::new()
    } else if extracted_section.is_empty() {
        return data.to_vec();
    } else {
        match split_grid(extracted_section, num_extracted, num_rows, true) {
            Some(cols) => cols,
            None => return data.to_vec(),
        }
    };
    let inline_rows = if num_inline == 0 {
        Vec::new()
    } else if inline_section.is_empty() {
        return data.to_vec();
    } else {
        // A lone inline column is stored unsplit, so its value is taken whole.
        match split_grid(inline_section, num_rows, num_inline, num_inline > 1) {
            Some(rows) => rows,
            None => return data.to_vec(),
        }
    };

    let mut output = Vec::with_capacity(data.len() * 2);
    for row in 0..num_rows {
        output.extend_from_slice(&sm.parts[0]);
        for (source, tail) in sources.iter().zip(&sm.parts[1..]) {
            let value = match *source {
                Source::Extracted(ei) => extracted_columns[ei][row],
                Source::Inline(ii) => inline_rows[row][ii],
            };
            output.extend_from_slice(value);
            output.extend_from_slice(tail);
        }
        if row + 1 < num_rows || sm.has_trailing_newline {
            output.push(b'\n');
        }
    }
    output
}
/// Reverses a grouped (version 2) transform back into the original NDJSON bytes.
///
/// Metadata layout (little-endian), after the 1-byte version tag at offset 0:
/// - a `u8` trailing-newline flag, a `u32` total row count, and a `u16` group count;
/// - per group: a `u32` row count, that many `u32` original row indices, a `u32`
///   length, then that group's own metadata;
/// - a `u32` residual row count, followed by that many `u32` original row indices.
///
/// `data` holds, per group, a `u32` length followed by the group's transformed
/// bytes. Everything after the last group is the residual rows joined by `\n`.
///
/// Decoding:
/// - A group whose metadata starts with `METADATA_VERSION_SELECTIVE` goes through
///   `reverse_selective_from_data`; any other group goes through `reverse_uniform`.
/// - The decoded lines are written back to their original row positions.
///
/// `data` is returned unchanged if any of the following holds:
/// - the metadata or data is truncated;
/// - a row count is larger than the metadata could describe;
/// - the residual count does not match the residual lines;
/// - a row is left unfilled.
fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    /// Reads a little-endian `u32` at `*pos` and advances past it; `None` if truncated.
    fn read_u32(buf: &[u8], pos: &mut usize) -> Option<usize> {
        let end = pos.checked_add(4)?;
        let b = buf.get(*pos..end)?;
        *pos = end;
        Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]) as usize)
    }

    /// Reads `count` `u32` row indices. A count the remaining bytes cannot hold is
    /// rejected *before* allocating, so a corrupt count cannot force a huge allocation.
    fn read_indices(buf: &[u8], pos: &mut usize, count: usize) -> Option<Vec<usize>> {
        if count > buf.len().saturating_sub(*pos) / 4 {
            return None;
        }
        let mut indices = Vec::with_capacity(count);
        for _ in 0..count {
            indices.push(read_u32(buf, pos)?);
        }
        Some(indices)
    }

    /// Borrows the `len` bytes at `*pos` and advances past them; `None` if truncated.
    fn take<'a>(buf: &'a [u8], pos: &mut usize, len: usize) -> Option<&'a [u8]> {
        let end = pos.checked_add(len)?;
        let bytes = buf.get(*pos..end)?;
        *pos = end;
        Some(bytes)
    }

    /// Copies `lines[i]` into `slots[indices[i]]` for as many pairs as both slices
    /// provide. Indices outside `slots` are ignored.
    fn scatter(slots: &mut [Option<Vec<u8>>], lines: &[&[u8]], indices: &[usize]) {
        for (line, &row) in lines.iter().zip(indices) {
            if let Some(slot) = slots.get_mut(row) {
                *slot = Some(line.to_vec());
            }
        }
    }

    /// Decodes the whole payload; `None` on any inconsistency.
    fn decode(data: &[u8], metadata: &[u8]) -> Option<Vec<u8>> {
        if metadata.len() < 8 {
            return None;
        }
        // Byte 0 is the version tag, already dispatched on by the caller.
        let has_trailing_newline = metadata[1] != 0;
        let mut mpos = 2;
        let total_rows = read_u32(metadata, &mut mpos)?;
        let num_groups = usize::from(u16::from_le_bytes([metadata[6], metadata[7]]));
        mpos += 2;
        // Every output row is filled through its own 4-byte index stored in
        // `metadata`, so a larger row count is corrupt. Reject it before
        // allocating one slot per row.
        if total_rows > metadata.len() / 4 {
            return None;
        }
        let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];

        let mut dpos = 0;
        for _ in 0..num_groups {
            let group_row_count = read_u32(metadata, &mut mpos)?;
            let row_indices = read_indices(metadata, &mut mpos, group_row_count)?;
            let gm_len = read_u32(metadata, &mut mpos)?;
            let group_metadata = take(metadata, &mut mpos, gm_len)?;
            let gd_len = read_u32(data, &mut dpos)?;
            let group_data = take(data, &mut dpos, gd_len)?;

            let reconstructed = if group_metadata.first() == Some(&METADATA_VERSION_SELECTIVE) {
                let sm = parse_selective_metadata(group_metadata)?;
                if sm.num_rows != group_row_count {
                    return None;
                }
                reverse_selective_from_data(group_data, &sm)
            } else {
                reverse_uniform(group_data, group_metadata)
            };
            let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
            scatter(&mut output_lines, &lines, &row_indices);
        }

        let residual_count = read_u32(metadata, &mut mpos)?;
        let residual_indices = read_indices(metadata, &mut mpos, residual_count)?;
        if residual_count > 0 {
            // Residual rows are stored joined by '\n' after the last group, so the
            // split yields exactly one element per row. This includes a single
            // empty residual row, whose region is itself empty.
            let residual_lines: Vec<&[u8]> = data[dpos..].split(|&b| b == b'\n').collect();
            if residual_lines.len() != residual_count {
                return None;
            }
            scatter(&mut output_lines, &residual_lines, &residual_indices);
        }

        let mut output = Vec::with_capacity(data.len() * 2);
        for (i, line) in output_lines.into_iter().enumerate() {
            // Any row left unfilled means the payload was inconsistent.
            output.extend_from_slice(&line?);
            if i + 1 < total_rows || has_trailing_newline {
                output.push(b'\n');
            }
        }
        Some(output)
    }

    decode(data, metadata).unwrap_or_else(|| data.to_vec())
}
// Unit tests for value extraction and line parsing, plus byte-exact
// `reverse(preprocess(x)) == x` round trips covering the uniform, grouped,
// nested-flattening and selective-columnar layouts.
#[cfg(test)]
mod tests {
use super::*;
// --- extract_value: returns the value's bytes and the offset just past it. ---
#[test]
fn extract_value_string() {
let line = br#""hello","next""#;
let (val, end) = extract_value(line, 0).unwrap();
assert_eq!(val, b"\"hello\"");
assert_eq!(end, 7);
}
#[test]
fn extract_value_number() {
let line = b"42,next";
let (val, end) = extract_value(line, 0).unwrap();
assert_eq!(val, b"42");
assert_eq!(end, 2);
}
#[test]
fn extract_value_bool() {
let line = b"true,next";
let (val, end) = extract_value(line, 0).unwrap();
assert_eq!(val, b"true");
assert_eq!(end, 4);
}
#[test]
fn extract_value_null() {
let line = b"null,next";
let (val, end) = extract_value(line, 0).unwrap();
assert_eq!(val, b"null");
assert_eq!(end, 4);
}
#[test]
fn extract_value_object() {
let line = br#"{"a":1,"b":"x"},next"#;
let (val, end) = extract_value(line, 0).unwrap();
assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
assert_eq!(end, 15);
}
#[test]
fn extract_value_array() {
let line = b"[1,2,3],next";
let (val, end) = extract_value(line, 0).unwrap();
assert_eq!(val, b"[1,2,3]");
assert_eq!(end, 7);
}
#[test]
fn extract_value_string_with_escapes() {
let line = br#""he\"llo",next"#;
let (val, end) = extract_value(line, 0).unwrap();
assert_eq!(val, br#""he\"llo""#.to_vec());
assert_eq!(end, 9);
}
// parse_line splits a flat object into template parts around its values
// (here: 2 values and 3 parts).
#[test]
fn parse_line_simple() {
let line = br#"{"a":1,"b":"x"}"#;
let (parts, values) = parse_line(line).unwrap();
assert_eq!(parts.len(), 3); assert_eq!(values.len(), 2);
assert_eq!(values[0], b"1");
assert_eq!(values[1], b"\"x\"");
assert_eq!(parts[0], br#"{"a":"#.to_vec());
assert_eq!(parts[1], br#","b":"#.to_vec());
assert_eq!(parts[2], b"}");
}
// --- Round trips: reverse(preprocess(x)) must reproduce x byte for byte. ---
#[test]
fn roundtrip_simple() {
let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(
String::from_utf8_lossy(&restored),
String::from_utf8_lossy(data),
);
assert_eq!(restored, data.to_vec());
}
#[test]
fn roundtrip_no_trailing_newline() {
let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
}
#[test]
fn roundtrip_nested_values() {
let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
{"id":4,"meta":{"x":70,"y":80}}
{"id":5,"meta":{"x":90,"y":100}}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
}
#[test]
fn roundtrip_mixed_types() {
let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
}
// --- Inputs that preprocess is expected to decline (returns None). ---
#[test]
fn schema_mismatch_too_few_returns_none() {
let data = br#"{"a":1,"b":2}
{"a":1,"c":3}
"#;
assert!(preprocess(data).is_none());
}
#[test]
fn different_num_keys_too_few_returns_none() {
let data = br#"{"a":1,"b":2}
{"a":1}
"#;
assert!(preprocess(data).is_none());
}
#[test]
fn single_line_returns_none() {
let data = br#"{"a":1,"b":2}
"#;
assert!(preprocess(data).is_none());
}
#[test]
fn empty_returns_none() {
assert!(preprocess(b"").is_none());
}
// Uniform layout: one chunk per key separated by COL_SEP, each chunk holding that
// key's values in row order separated by VAL_SEP.
#[test]
fn column_layout_groups_similar_values() {
let data = br#"{"type":"page_view","user":"alice"}
{"type":"api_call","user":"alice"}
{"type":"click","user":"bob"}
"#;
let result = preprocess(data).unwrap();
let col_data = &result.data;
let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
assert_eq!(cols.len(), 2);
let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
assert_eq!(type_vals.len(), 3);
assert_eq!(type_vals[0], br#""page_view""#);
assert_eq!(type_vals[1], br#""api_call""#);
assert_eq!(type_vals[2], br#""click""#);
let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
assert_eq!(user_vals.len(), 3);
assert_eq!(user_vals[0], br#""alice""#);
assert_eq!(user_vals[1], br#""alice""#);
assert_eq!(user_vals[2], br#""bob""#);
}
#[test]
fn roundtrip_string_with_escaped_chars() {
let data = br#"{"msg":"he said \"hi\"","val":1}
{"msg":"line\nbreak","val":2}
{"msg":"tab\there","val":3}
{"msg":"back\\slash","val":4}
{"msg":"normal text","val":5}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
}
#[test]
fn roundtrip_negative_and_float_numbers() {
let data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
{"x":0.001,"y":999}
{"x":-100,"y":-200}
{"x":42.0,"y":7}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
}
// Despite its name, this round-trips 40 rows: a 2-line chunk repeated 20 times.
#[test]
fn reverse_roundtrip_small_data() {
let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
assert_eq!(vals.len(), 2);
assert_eq!(parts.len(), 3);
let big_data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
"#
.repeat(20);
let result = preprocess(&big_data).expect("should produce transform with 40 rows");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, big_data);
}
// --- Grouped layout (METADATA_VERSION_GROUPED): rows with different key sets are
// transformed per group; rows that fit no group are kept as residual lines. ---
#[test]
fn grouped_roundtrip_two_schemas() {
let mut data = Vec::new();
for i in 0..10 {
data.extend_from_slice(
format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
);
data.push(b'\n');
}
for i in 10..20 {
data.extend_from_slice(
format!(
r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
i, i, i
)
.as_bytes(),
);
data.push(b'\n');
}
let result = preprocess(&data).expect("should produce grouped transform");
assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data);
}
#[test]
fn grouped_roundtrip_interleaved_schemas() {
let mut data = Vec::new();
for i in 0..20 {
if i % 2 == 0 {
data.extend_from_slice(
format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
);
} else {
data.extend_from_slice(
format!(
r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
i, i, i
)
.as_bytes(),
);
}
data.push(b'\n');
}
let result = preprocess(&data).expect("should produce grouped transform");
assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data);
}
#[test]
fn grouped_roundtrip_with_residuals() {
let mut data = Vec::new();
for i in 0..8 {
data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
data.push(b'\n');
}
data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
data.push(b'\n');
data.extend_from_slice(br#"{"p":"q"}"#);
data.push(b'\n');
for i in 0..6 {
data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
data.push(b'\n');
}
let result = preprocess(&data).expect("should produce grouped transform");
assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
let restored = reverse(&result.data, &result.metadata);
assert_eq!(
String::from_utf8_lossy(&restored),
String::from_utf8_lossy(&data),
);
assert_eq!(restored, data);
}
#[test]
fn grouped_roundtrip_no_trailing_newline() {
let mut data = Vec::new();
for i in 0..6 {
data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
data.push(b'\n');
}
for i in 0..6 {
data.extend_from_slice(
format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
);
if i < 5 {
data.push(b'\n');
}
}
let result = preprocess(&data).expect("should produce grouped transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data);
}
// A single shared schema must still select the uniform layout, not grouped.
#[test]
fn uniform_still_preferred_over_grouped() {
let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
{"a":4,"b":"w"}
{"a":5,"b":"v"}
"#;
let result = preprocess(data).expect("should produce transform");
assert_eq!(
result.metadata[0], METADATA_VERSION_UNIFORM,
"uniform schema should use Strategy 1"
);
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
}
#[test]
fn grouped_gharchive_simulation() {
let mut data = Vec::new();
for i in 0..50 {
if i % 5 == 0 {
data.extend_from_slice(
format!(
r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
i, i, i, i
)
.as_bytes(),
);
} else {
data.extend_from_slice(
format!(
r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
i, i, i
)
.as_bytes(),
);
}
data.push(b'\n');
}
let result = preprocess(&data).expect("should produce grouped transform");
assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data);
}
#[test]
fn grouped_nested_flatten_per_group() {
let mut data = Vec::new();
for i in 0..30 {
if i % 3 == 0 {
data.extend_from_slice(
format!(
r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"b","extra":"yes"}}"#,
i,
i * 10,
i * 20
)
.as_bytes(),
);
} else {
data.extend_from_slice(
format!(
r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"a"}}"#,
i,
i * 10,
i * 20
)
.as_bytes(),
);
}
data.push(b'\n');
}
let result = preprocess(&data).expect("should produce grouped transform");
assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data, "grouped nested flatten roundtrip failed");
}
// Same keys on every row with two alternating "type" values; only the round trip
// is checked, not which layout preprocess picks.
#[test]
fn grouped_discriminator_two_pass() {
let mut data = Vec::new();
for i in 0..60 {
let etype = if i % 2 == 0 { "push" } else { "create" };
data.extend_from_slice(
format!(
r#"{{"id":{},"type":"{}","payload":{{"ref":"r{}","size":{}}}}}"#,
i,
etype,
i,
i * 10
)
.as_bytes(),
);
data.push(b'\n');
}
let result = preprocess(&data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data, "discriminator two-pass roundtrip failed");
}
// --- Nested-object flattening: the fields of a nested object become their own
// columns (here "meta" with keys x and y yields 3 columns in total). ---
#[test]
fn test_nested_decomposition_basic() {
let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
let result = preprocess(data).expect("should produce transform");
assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
assert_eq!(
cols.len(),
3,
"should have 3 columns after flattening: got {}",
cols.len()
);
let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
}
#[test]
fn test_nested_roundtrip() {
let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(
String::from_utf8_lossy(&restored),
String::from_utf8_lossy(data),
);
assert_eq!(restored, data.to_vec());
}
#[test]
fn test_nested_mixed_schemas() {
let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","meta":{"query":"pricing","results_count":25}}
{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
{"ts":"e","meta":{"query":"api docs","results_count":41}}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(
String::from_utf8_lossy(&restored),
String::from_utf8_lossy(data),
);
assert_eq!(restored, data.to_vec());
}
// Also checks that the final metadata byte, which the assert message calls the
// has_nested flag, is 0 for flat data.
#[test]
fn test_nested_no_nested_objects() {
let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
let meta = &result.metadata;
let last_byte = meta[meta.len() - 1];
assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
}
#[test]
fn test_nested_real_corpus() {
let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(
String::from_utf8_lossy(&restored),
String::from_utf8_lossy(data),
);
assert_eq!(restored, data.to_vec());
}
#[test]
fn test_nested_roundtrip_with_null_values() {
let data = br#"{"id":1,"meta":{"x":10}}
{"id":2,"meta":null}
{"id":3,"meta":{"x":30}}
{"id":4,"meta":null}
{"id":5,"meta":{"x":50}}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
}
#[test]
fn test_nested_string_values_preserved_exact() {
let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
{"id":2,"meta":{"name":"Bob","score":200}}
{"id":3,"meta":{"name":"Charlie","score":300}}
"#;
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data.to_vec());
}
// parse_nested_object_kv yields (key without quotes, raw value bytes) pairs in order.
#[test]
fn test_parse_nested_object_kv() {
let obj = br#"{"query":"benchmark","results_count":14}"#;
let pairs = parse_nested_object_kv(obj).unwrap();
assert_eq!(pairs.len(), 2);
assert_eq!(pairs[0].0, b"query");
assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
assert_eq!(pairs[1].0, b"results_count");
assert_eq!(pairs[1].1, b"14");
}
#[test]
fn test_nested_varying_subkeys_roundtrip() {
let mut lines = Vec::new();
for i in 0..50 {
let line = if i % 2 == 0 {
format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
} else {
format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
};
lines.push(line);
}
let ndjson = lines.join("\n") + "\n";
let data = ndjson.as_bytes();
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(
std::str::from_utf8(&restored).unwrap(),
std::str::from_utf8(data).unwrap(),
"varying sub-keys roundtrip must be byte-exact"
);
}
#[test]
fn test_nested_explicit_null_preserved() {
let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
{\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
{\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
let result = preprocess(data).expect("should produce transform");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(
std::str::from_utf8(&restored).unwrap(),
std::str::from_utf8(data).unwrap(),
"explicit null values must be preserved"
);
}
// --- Null-heavy inputs: preprocess may decline them, so the round trip is only
// asserted when it actually produces a transform. ---
#[test]
fn null_heavy_30_rows_roundtrip() {
let mut data = Vec::new();
for i in 0..30 {
data.extend_from_slice(format!("{{\"id\":{},\"val\":null}}\n", i).as_bytes());
}
let result = preprocess(&data);
if let Some(result) = result {
let restored = reverse(&result.data, &result.metadata);
assert_eq!(
restored,
data,
"null-heavy 30-row roundtrip failed.\nOriginal len={}, Restored len={}\nOrig first 200: {:?}\nRest first 200: {:?}",
data.len(),
restored.len(),
String::from_utf8_lossy(&data[..data.len().min(200)]),
String::from_utf8_lossy(&restored[..restored.len().min(200)])
);
}
}
#[test]
fn null_heavy_60_rows_roundtrip() {
let mut data = Vec::new();
for i in 0..60 {
let name = if i % 10 == 0 {
format!("\"user_{}\"", i)
} else {
"null".to_string()
};
data.extend_from_slice(
format!("{{\"id\":{},\"name\":{},\"email\":null,\"score\":null,\"active\":null,\"tags\":null}}\n", i, name).as_bytes(),
);
}
let result = preprocess(&data);
if let Some(result) = result {
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data, "null-heavy 60-row ndjson roundtrip failed");
}
}
// --- Selective columnar layout (METADATA_VERSION_SELECTIVE). The fixtures pad
// their long, per-row-unique values past the 128-byte SELECTIVE_MIN_AVG_LEN
// threshold. ---
#[test]
fn selective_columnar_roundtrip() {
let mut data = Vec::new();
for i in 0..50 {
let event_type = match i % 3 {
0 => "push",
1 => "pull_request",
_ => "create",
};
let payload = format!(
"{{\"commits\":[{{\"sha\":\"abc{:04}def\",\"message\":\"commit message number {} with extra text to make it long enough for selective columnar threshold of 128 bytes average value length\"}}]}}",
i, i
);
data.extend_from_slice(
format!(
"{{\"id\":{},\"type\":\"{}\",\"payload\":{}}}\n",
i, event_type, payload
)
.as_bytes(),
);
}
let result = preprocess(&data).expect("should preprocess");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data, "selective columnar roundtrip failed");
}
#[test]
fn selective_columnar_uses_version3() {
let mut data = Vec::new();
for i in 0..50 {
let payload = format!(
"{{\"data\":\"unique_payload_{:04}\",\"extra\":\"padding_text_to_make_this_value_long_enough_to_exceed_the_128_byte_threshold_for_selective_columnar_detection_{:04}\"}}",
i,
i * 7
);
data.extend_from_slice(
format!("{{\"type\":\"event\",\"payload\":{}}}\n", payload).as_bytes(),
);
}
let result = preprocess(&data).expect("should preprocess");
assert_eq!(
result.metadata[0], METADATA_VERSION_SELECTIVE,
"expected selective columnar (version=3), got version={}",
result.metadata[0]
);
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data, "selective columnar v3 roundtrip failed");
}
// Two schemas, each with a long per-row payload: grouped layout at the top level,
// with selective columnar expected inside groups (per the test name).
#[test]
fn selective_grouped_roundtrip() {
let mut data = Vec::new();
for i in 0..30 {
let payload = format!(
"{{\"sha\":\"hash_{:04}\",\"msg\":\"unique commit message number {} that is quite long and needs to exceed the 128 byte threshold for selective columnar to activate on this column\"}}",
i, i
);
data.extend_from_slice(
format!(
"{{\"id\":{},\"type\":\"push\",\"payload\":{}}}\n",
i, payload
)
.as_bytes(),
);
}
for i in 0..20 {
let body = format!(
"{{\"title\":\"PR title {}\",\"body\":\"This is a long unique pull request body number {} with details and extra text to exceed the 128 byte threshold for the selective columnar transform\"}}",
i, i
);
data.extend_from_slice(
format!(
"{{\"id\":{},\"type\":\"pr\",\"payload\":{},\"org\":\"myorg\"}}\n",
100 + i,
body
)
.as_bytes(),
);
}
let result = preprocess(&data).expect("should preprocess");
assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data, "selective grouped roundtrip failed");
}
// Short, two-valued columns must not trigger the selective layout.
#[test]
fn selective_all_low_cardinality_stays_uniform() {
let mut data = Vec::new();
for i in 0..50 {
let status = match i % 2 {
0 => "active",
_ => "inactive",
};
data.extend_from_slice(
format!("{{\"type\":\"event\",\"status\":\"{}\"}}\n", status).as_bytes(),
);
}
let result = preprocess(&data).expect("should preprocess");
assert_eq!(
result.metadata[0], METADATA_VERSION_UNIFORM,
"low-cardinality data should use uniform (version=1)"
);
}
// Meant to exercise a selective payload whose inline section holds a single column
// (stored without VAL_SEP splitting). NOTE(review): confirm which of type/level/msg
// the forward pass extracts for this input.
#[test]
fn selective_columnar_single_inline_column() {
let mut data = Vec::new();
for i in 0..30 {
let unique_msg = format!(
"A unique message for row {} with enough text to exceed the 128 byte threshold for selective columnar detection, adding padding here to be safe: extra_{:04}",
i,
i * 13
);
data.extend_from_slice(
format!(
"{{\"type\":\"log\",\"level\":\"info\",\"msg\":\"{}\"}}\n",
unique_msg
)
.as_bytes(),
);
}
let result = preprocess(&data).expect("should preprocess");
let restored = reverse(&result.data, &result.metadata);
assert_eq!(restored, data, "single-inline-column roundtrip failed");
}
}