use super::ndjson;
use super::transform::TransformResult;
use std::collections::HashMap;
/// Byte written between consecutive columns of a columnar payload.
const COL_SEP: u8 = 0x00;
/// Byte written between consecutive values inside one column.
const VAL_SEP: u8 = 0x01;
/// First metadata byte of the uniform layout; `reverse` routes it to `reverse_uniform`.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte of the grouped layout; `reverse` routes it to `reverse_grouped`.
const METADATA_VERSION_GROUPED: u8 = 2;
/// Minimum number of elements an array of objects must have to be transformed.
const MIN_ROWS: usize = 5;
/// Placeholder stored in a flattened sub-column when a row's nested object has no such key.
const ABSENT_KEY: &[u8] = b"\x02";
/// Minimum number of elements sharing one template for them to form a group;
/// smaller sets are stored verbatim as residual elements.
const MIN_GROUP_ELEMENTS: usize = 3;
/// One schema group: the shared template parts, then `(element index, values)` per member.
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
/// Scans `data` for the `[ {...}, {...}, ... ]` array with the most elements.
///
/// String literals are skipped so brackets inside them are ignored. Only arrays
/// whose first non-whitespace byte after `[` is `{` are tried, and only those
/// with at least `MIN_ROWS` elements can win. On a tie the earliest array is
/// kept. Returns `None` when no array qualifies or when a string literal is
/// unterminated.
fn find_object_array(data: &[u8]) -> Option<ArraySpan> {
    let mut winner: Option<ArraySpan> = None;
    let mut cursor = 0;
    while let Some(&byte) = data.get(cursor) {
        if byte == b'"' {
            cursor = skip_string(data, cursor)?;
            continue;
        }
        cursor += 1;
        if byte != b'[' {
            continue;
        }
        let open_at = cursor - 1;
        while data.get(cursor).is_some_and(|b| b.is_ascii_whitespace()) {
            cursor += 1;
        }
        if data.get(cursor) != Some(&b'{') {
            continue;
        }
        // The cursor stays on the first `{`, so arrays nested inside the
        // elements are still visited by later iterations.
        let Some(candidate) = try_parse_array(data, open_at) else {
            continue;
        };
        let is_better = candidate.num_elements >= MIN_ROWS
            && winner
                .as_ref()
                .is_none_or(|w| candidate.num_elements > w.num_elements);
        if is_better {
            winner = Some(candidate);
        }
    }
    winner
}
/// Location of one array of objects inside the input buffer.
struct ArraySpan {
/// `(start, end)` byte offsets of each element; `end` is one past the closing `}`.
elements: Vec<(usize, usize)>,
/// Raw bytes between consecutive elements: whitespace, the comma, whitespace.
separators: Vec<Vec<u8>>,
/// Number of entries in `elements`.
num_elements: usize,
}
/// Parses the array whose `[` sits at `bracket_pos`, requiring every element to
/// be an object.
///
/// Returns the element spans and the raw separators between them, or `None` if
/// the array does not start with `{`, contains a non-object element, has a
/// trailing comma, or is cut off before the closing `]`.
fn try_parse_array(data: &[u8], bracket_pos: usize) -> Option<ArraySpan> {
    let skip_ws = |mut at: usize| -> usize {
        while at < data.len() && data[at].is_ascii_whitespace() {
            at += 1;
        }
        at
    };

    let mut cursor = skip_ws(bracket_pos + 1);
    if data.get(cursor) != Some(&b'{') {
        return None;
    }

    let mut elements = Vec::new();
    let mut separators = Vec::new();
    while data.get(cursor) == Some(&b'{') {
        let elem_end = skip_object(data, cursor)?;
        elements.push((cursor, elem_end));

        let delimiter_at = skip_ws(elem_end);
        match data.get(delimiter_at)? {
            b',' => {
                // The separator keeps all whitespace around the comma so the
                // original layout can be reproduced exactly.
                cursor = skip_ws(delimiter_at + 1);
                separators.push(data[elem_end..cursor].to_vec());
            }
            b']' => {
                return Some(ArraySpan {
                    num_elements: elements.len(),
                    elements,
                    separators,
                });
            }
            _ => return None,
        }
    }
    // A comma was followed by something other than an object.
    None
}
/// Given the offset of an opening `"`, returns the offset just past the
/// matching closing `"`.
///
/// A backslash escapes the byte that follows it. Returns `None` when `pos` is
/// out of range, does not point at `"`, or the string is never closed.
fn skip_string(data: &[u8], pos: usize) -> Option<usize> {
    if data.get(pos) != Some(&b'"') {
        return None;
    }
    let mut idx = pos + 1;
    while let Some(&byte) = data.get(idx) {
        match byte {
            b'"' => return Some(idx + 1),
            // Step over the backslash and whatever byte it escapes.
            b'\\' => idx += 2,
            _ => idx += 1,
        }
    }
    None
}
/// Given the offset of an opening `{`, returns the offset just past the
/// matching closing `}`.
///
/// String literals and arrays are skipped as units, so braces inside them do
/// not affect nesting. Returns `None` when `pos` is out of range, does not
/// point at `{`, a nested string or array is malformed, or the input ends
/// before the object is closed.
fn skip_object(data: &[u8], pos: usize) -> Option<usize> {
    if data.get(pos) != Some(&b'{') {
        return None;
    }
    let mut i = pos + 1;
    let mut depth = 1usize;
    while i < data.len() {
        match data[i] {
            b'"' => {
                i = skip_string(data, i)?;
                continue;
            }
            b'[' => {
                i = skip_array(data, i)?;
                continue;
            }
            b'{' => depth += 1,
            b'}' => {
                depth -= 1;
                if depth == 0 {
                    return Some(i + 1);
                }
            }
            _ => {}
        }
        i += 1;
    }
    // Ran out of input with the object still open: report failure rather
    // than pretending the object ends at the end of the buffer.
    None
}
/// Given the offset of an opening `[`, returns the offset just past the
/// matching closing `]`.
///
/// String literals and objects are skipped as units, so brackets inside them
/// do not affect nesting. Returns `None` when `pos` is out of range, does not
/// point at `[`, a nested string or object is malformed, or the input ends
/// before the array is closed.
fn skip_array(data: &[u8], pos: usize) -> Option<usize> {
    if data.get(pos) != Some(&b'[') {
        return None;
    }
    let mut i = pos + 1;
    let mut depth = 1usize;
    while i < data.len() {
        match data[i] {
            b'"' => {
                i = skip_string(data, i)?;
                continue;
            }
            b'{' => {
                i = skip_object(data, i)?;
                continue;
            }
            b'[' => depth += 1,
            b']' => {
                depth -= 1;
                if depth == 0 {
                    return Some(i + 1);
                }
            }
            _ => {}
        }
        i += 1;
    }
    // Ran out of input with the array still open: report failure rather
    // than pretending the array ends at the end of the buffer.
    None
}
/// `(template parts, values)` of one object; there is always one more part
/// than there are values.
type ParsedElement = (Vec<Vec<u8>>, Vec<Vec<u8>>);

/// Splits one JSON object into its template parts and its member values.
///
/// Part `i` holds every byte between value `i - 1` and value `i` (keys,
/// punctuation, whitespace); the last part runs from the end of the final
/// value to the end of `element`. Interleaving parts and values reproduces
/// `element` byte for byte.
///
/// Returns `None` if `element` is not an object, has no members, or does not
/// follow the `"key" : value` / `,` / `}` shape.
fn parse_element(element: &[u8]) -> Option<ParsedElement> {
    let skip_ws = |mut at: usize| -> usize {
        while element.get(at).is_some_and(|b| b.is_ascii_whitespace()) {
            at += 1;
        }
        at
    };

    let open_at = skip_ws(0);
    if element.get(open_at) != Some(&b'{') {
        return None;
    }

    let mut parts: Vec<Vec<u8>> = Vec::new();
    let mut values: Vec<Vec<u8>> = Vec::new();
    let mut part_start = 0;
    let mut cursor = open_at + 1;
    loop {
        // Expect either the closing brace or the opening quote of a key.
        cursor = skip_ws(cursor);
        match *element.get(cursor)? {
            b'}' => break,
            b'"' => {}
            _ => return None,
        }

        cursor = skip_ws(skip_string(element, cursor)?);
        if element.get(cursor) != Some(&b':') {
            return None;
        }
        cursor = skip_ws(cursor + 1);
        parts.push(element[part_start..cursor].to_vec());

        let (value, value_end) = extract_value_no_ws(element, cursor)?;
        values.push(value);
        part_start = value_end;

        cursor = skip_ws(value_end);
        match *element.get(cursor)? {
            b',' => cursor += 1,
            b'}' => break,
            _ => return None,
        }
    }
    // Both exits of the loop sit on a `}`; the tail becomes the final part.
    parts.push(element[part_start..].to_vec());

    if values.is_empty() {
        return None;
    }
    Some((parts, values))
}
/// Copies the JSON value that starts at `pos` and returns it together with the
/// offset just past it.
///
/// Strings, objects and arrays are delimited by the matching skip helper. Any
/// other value runs up to the next `,`, `}`, `]` or ASCII whitespace byte (or
/// the end of `data`). Returns `None` if `pos` is out of range, the delimited
/// value is malformed, or a bare value would be empty.
fn extract_value_no_ws(data: &[u8], pos: usize) -> Option<(Vec<u8>, usize)> {
    let first = *data.get(pos)?;
    let end = match first {
        b'"' => skip_string(data, pos)?,
        b'{' => skip_object(data, pos)?,
        b'[' => skip_array(data, pos)?,
        _ => {
            let is_terminator =
                |b: &u8| matches!(*b, b',' | b'}' | b']') || b.is_ascii_whitespace();
            let run = data[pos..]
                .iter()
                .position(is_terminator)
                .unwrap_or(data.len() - pos);
            if run == 0 {
                return None;
            }
            pos + run
        }
    };
    Some((data[pos..end].to_vec(), end))
}
/// Forward transform: finds the largest array of objects in `data` and
/// re-encodes it column by column.
///
/// The uniform layout is tried first; if it does not apply, the grouped layout
/// is tried. Returns `None` when the input is empty, no suitable array is
/// found, or neither layout produces a result.
pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
    if data.is_empty() {
        return None;
    }
    let span = find_object_array(data)?;
    preprocess_uniform(data, &span).or_else(|| preprocess_grouped(data, &span))
}
/// Serialises one group of same-template rows.
///
/// Returns `(col_data, metadata)`:
/// * `col_data`: the columns joined by `COL_SEP`, each column being its values
///   joined by `VAL_SEP`;
/// * `metadata`: version byte, `num_rows` (u32 LE), column count (u16 LE),
///   part count (u16 LE), then every template part as a u16 LE length followed
///   by its bytes.
fn build_uniform_columnar(
    template_parts: &[Vec<u8>],
    columns: &[Vec<Vec<u8>>],
    num_rows: usize,
) -> (Vec<u8>, Vec<u8>) {
    let mut col_data = Vec::new();
    for (col_idx, column) in columns.iter().enumerate() {
        if col_idx > 0 {
            col_data.push(COL_SEP);
        }
        for (row_idx, value) in column.iter().enumerate() {
            col_data.extend_from_slice(value);
            if row_idx < num_rows - 1 {
                col_data.push(VAL_SEP);
            }
        }
    }

    let mut metadata = vec![METADATA_VERSION_UNIFORM];
    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
    metadata.extend_from_slice(&(columns.len() as u16).to_le_bytes());
    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
    for part in template_parts {
        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
        metadata.extend_from_slice(part);
    }

    (col_data, metadata)
}
/// Encodes `span` in the uniform layout, which requires every element to share
/// exactly the same template (keys, punctuation and whitespace).
///
/// Output payload: the columns joined by `COL_SEP`, each column being its
/// values joined by `VAL_SEP`.
///
/// Metadata layout (all integers little-endian):
/// version byte, `num_rows` u32, `num_cols` u16, prefix (u32 length + bytes),
/// suffix (u32 length + bytes), part count u16, each part (u16 length + bytes),
/// separator flag (`1` = one separator shared by all rows, `0` = one separator
/// per gap), then the separator(s) as u16 length + bytes.
///
/// Returns `None` when:
/// * any element fails to parse or differs from the first element's template;
/// * a length or count does not fit the metadata field that stores it
///   (a plain `as` cast would silently truncate and break the reverse pass);
/// * a value contains a raw `COL_SEP` or `VAL_SEP` byte, which would make the
///   payload impossible to split back into the same cells;
/// * the encoded form is not strictly smaller than the input.
fn preprocess_uniform(data: &[u8], span: &ArraySpan) -> Option<TransformResult> {
    /// True if `bytes` contains one of the payload separator bytes.
    fn has_sentinel(bytes: &[u8]) -> bool {
        bytes.iter().any(|&b| b == COL_SEP || b == VAL_SEP)
    }

    const U16_LIMIT: usize = u16::MAX as usize;
    const U32_LIMIT: u64 = u32::MAX as u64;

    let &(first_start, first_end) = span.elements.first()?;
    let (template_parts, first_values) = parse_element(&data[first_start..first_end])?;
    let num_cols = first_values.len();
    if template_parts.len() != num_cols + 1 {
        return None;
    }
    let num_rows = span.elements.len();

    let prefix = &data[..first_start];
    let suffix = &data[span.elements[num_rows - 1].1..];

    // Refuse anything the fixed-width metadata fields cannot represent.
    let fits = num_rows as u64 <= U32_LIMIT
        && prefix.len() as u64 <= U32_LIMIT
        && suffix.len() as u64 <= U32_LIMIT
        && template_parts.len() <= U16_LIMIT
        && template_parts.iter().all(|p| p.len() <= U16_LIMIT)
        && span.separators.iter().all(|s| s.len() <= U16_LIMIT);
    if !fits {
        return None;
    }

    if first_values.iter().any(|v| has_sentinel(v)) {
        return None;
    }
    let mut columns: Vec<Vec<Vec<u8>>> = first_values
        .into_iter()
        .map(|value| {
            let mut column = Vec::with_capacity(num_rows);
            column.push(value);
            column
        })
        .collect();

    for &(elem_start, elem_end) in &span.elements[1..] {
        let (parts, values) = parse_element(&data[elem_start..elem_end])?;
        // Same number of values and byte-identical template, or give up.
        if values.len() != num_cols || parts != template_parts {
            return None;
        }
        if values.iter().any(|v| has_sentinel(v)) {
            return None;
        }
        for (column, value) in columns.iter_mut().zip(values) {
            column.push(value);
        }
    }

    let mut col_data = Vec::with_capacity(data.len());
    for (ci, column) in columns.iter().enumerate() {
        for (ri, value) in column.iter().enumerate() {
            col_data.extend_from_slice(value);
            if ri < num_rows - 1 {
                col_data.push(VAL_SEP);
            }
        }
        if ci < num_cols - 1 {
            col_data.push(COL_SEP);
        }
    }

    let mut metadata = Vec::new();
    metadata.push(METADATA_VERSION_UNIFORM);
    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
    metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
    metadata.extend_from_slice(&(prefix.len() as u32).to_le_bytes());
    metadata.extend_from_slice(prefix);
    metadata.extend_from_slice(&(suffix.len() as u32).to_le_bytes());
    metadata.extend_from_slice(suffix);
    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
    for part in &template_parts {
        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
        metadata.extend_from_slice(part);
    }

    // Store a single separator when every gap uses the same bytes.
    let all_same = span.separators.windows(2).all(|w| w[0] == w[1]);
    if all_same && !span.separators.is_empty() {
        metadata.push(1);
        let sep = &span.separators[0];
        metadata.extend_from_slice(&(sep.len() as u16).to_le_bytes());
        metadata.extend_from_slice(sep);
    } else {
        metadata.push(0);
        for sep in &span.separators {
            metadata.extend_from_slice(&(sep.len() as u16).to_le_bytes());
            metadata.extend_from_slice(sep);
        }
    }

    // Only worth keeping if it is strictly smaller than the input.
    if col_data.len() + metadata.len() >= data.len() {
        return None;
    }
    Some(TransformResult {
        data: col_data,
        metadata,
    })
}
fn flatten_group_nested(
col_data: &[u8],
num_rows: usize,
) -> Option<(Vec<u8>, Vec<ndjson::NestedGroupInfo>)> {
let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
if columns.is_empty() || num_rows == 0 {
return None;
}
let mut nested_groups: Vec<ndjson::NestedGroupInfo> = Vec::new();
let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
for (col_idx, &col_chunk) in columns.iter().enumerate() {
let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
if values.len() != num_rows {
return None;
}
let mut all_objects = true;
let mut has_non_null = false;
for val in &values {
if *val == b"null" {
continue;
}
has_non_null = true;
if !val.starts_with(b"{") {
all_objects = false;
break;
}
}
if !all_objects || !has_non_null {
let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
output_columns.push(col_values);
continue;
}
let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
let mut nested_template: Vec<Vec<u8>> = Vec::new();
type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
let mut parse_failed = false;
for val in &values {
if *val == b"null" {
parsed_rows.push(None);
continue;
}
if nested_template.is_empty() {
match ndjson::parse_nested_object_with_template(val) {
Some((template, kv_pairs)) => {
for (key, _) in &kv_pairs {
if !all_sub_keys.iter().any(|k| k == key) {
all_sub_keys.push(key.clone());
}
}
nested_template = template;
parsed_rows.push(Some(kv_pairs));
}
None => {
parse_failed = true;
break;
}
}
} else {
match ndjson::parse_nested_object_kv(val) {
Some(kv_pairs) => {
for (key, _) in &kv_pairs {
if !all_sub_keys.iter().any(|k| k == key) {
all_sub_keys.push(key.clone());
}
}
parsed_rows.push(Some(kv_pairs));
}
None => {
parse_failed = true;
break;
}
}
}
}
if parse_failed || all_sub_keys.is_empty() {
let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
output_columns.push(col_values);
continue;
}
let num_sub_keys = all_sub_keys.len();
let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
for parsed in &parsed_rows {
match parsed {
Some(kv_pairs) => {
for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
let found = kv_pairs.iter().find(|(k, _)| k == sk);
match found {
Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
None => sub_columns[sk_idx].push(ABSENT_KEY.to_vec()),
}
}
}
None => {
for sc in sub_columns.iter_mut() {
sc.push(b"null".to_vec());
}
}
}
}
nested_groups.push(ndjson::NestedGroupInfo {
original_col_index: col_idx as u16,
sub_keys: all_sub_keys,
nested_template,
absence_bitmap: Vec::new(),
});
for sc in sub_columns {
output_columns.push(sc);
}
}
if nested_groups.is_empty() {
return None;
}
let num_out_cols = output_columns.len();
let mut out = Vec::new();
for (ci, col) in output_columns.iter().enumerate() {
for (ri, val) in col.iter().enumerate() {
out.extend_from_slice(val);
if ri < num_rows - 1 {
out.push(VAL_SEP);
}
}
if ci < num_out_cols - 1 {
out.push(COL_SEP);
}
}
Some((out, nested_groups))
}
/// Inverse of `flatten_group_nested`: merges each run of sub-columns back into
/// one column of nested objects.
///
/// `flat_data` must contain `total_flat_cols` columns of `num_rows` values.
/// The original column count is derived as
/// `total_flat_cols + groups - sum(sub_keys per group)`.
///
/// For each row of an expanded column:
/// * if every sub-value is `null`, the cell is `null`;
/// * otherwise, if the group has a template with `sub_keys + 1` parts, the
///   cell is rebuilt by interleaving template parts and sub-values, leaving
///   out sub-values equal to `ABSENT_KEY`;
/// * otherwise the cell is rebuilt as `{"key":value,...}` from `sub_keys`,
///   leaving out `null` and `ABSENT_KEY` sub-values.
///
/// If the input is inconsistent (wrong column or row count, or more sub-keys
/// than the flat columns can account for) `flat_data` is returned unchanged.
fn unflatten_group_nested(
    flat_data: &[u8],
    nested_groups: &[ndjson::NestedGroupInfo],
    num_rows: usize,
    total_flat_cols: usize,
) -> Vec<u8> {
    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
    if flat_columns.len() != total_flat_cols {
        return flat_data.to_vec();
    }
    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
    for chunk in &flat_columns {
        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
        if vals.len() != num_rows {
            return flat_data.to_vec();
        }
        flat_col_values.push(vals);
    }

    // Each group replaced one original column with `sub_keys.len()` flat ones.
    // Checked arithmetic: group descriptions that claim more sub-columns than
    // exist must not underflow, they are treated like any other mismatch.
    let sub_key_total: usize = nested_groups.iter().map(|g| g.sub_keys.len()).sum();
    let original_num_cols =
        match (total_flat_cols + nested_groups.len()).checked_sub(sub_key_total) {
            Some(n) => n,
            None => return flat_data.to_vec(),
        };

    // original column index -> index into `nested_groups`, if that column was expanded.
    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
    for (gi, group) in nested_groups.iter().enumerate() {
        if (group.original_col_index as usize) < original_num_cols {
            original_col_map[group.original_col_index as usize] = Some(gi);
        }
    }

    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
    let mut flat_idx = 0;
    for entry in original_col_map.iter().take(original_num_cols) {
        if let Some(gi) = entry {
            let group = &nested_groups[*gi];
            let num_sub = group.sub_keys.len();
            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
            for row in 0..num_rows {
                let all_null = (0..num_sub).all(|si| {
                    flat_idx + si < flat_col_values.len()
                        && flat_col_values[flat_idx + si][row] == b"null"
                });
                if all_null {
                    merged_col.push(b"null".to_vec());
                } else if !group.nested_template.is_empty()
                    && group.nested_template.len() == num_sub + 1
                {
                    let has_absent = (0..num_sub).any(|si| {
                        flat_idx + si < flat_col_values.len()
                            && flat_col_values[flat_idx + si][row] == ABSENT_KEY
                    });
                    if !has_absent {
                        // Every key present: plain template/value interleave.
                        let mut obj = Vec::new();
                        obj.extend_from_slice(&group.nested_template[0]);
                        if flat_idx < flat_col_values.len() {
                            obj.extend_from_slice(flat_col_values[flat_idx][row]);
                        }
                        for si in 1..num_sub {
                            obj.extend_from_slice(&group.nested_template[si]);
                            if flat_idx + si < flat_col_values.len() {
                                obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
                            }
                        }
                        obj.extend_from_slice(&group.nested_template[num_sub]);
                        merged_col.push(obj);
                    } else {
                        // Some keys absent: emit only the present ones.
                        let mut obj = Vec::new();
                        let mut first_written = false;
                        for si in 0..num_sub {
                            if flat_idx + si >= flat_col_values.len() {
                                break;
                            }
                            let val = flat_col_values[flat_idx + si][row];
                            if val == ABSENT_KEY {
                                continue;
                            }
                            if !first_written {
                                if si == 0 {
                                    obj.extend_from_slice(&group.nested_template[0]);
                                } else {
                                    // The first present key is not key 0: take the
                                    // opening (through `{`) from part 0 and the key
                                    // text from part `si` after its first comma.
                                    let t0 = &group.nested_template[0];
                                    let ti = &group.nested_template[si];
                                    if let Some(brace_pos) = t0.iter().position(|&b| b == b'{') {
                                        obj.extend_from_slice(&t0[..brace_pos + 1]);
                                        if let Some(comma_pos) = ti.iter().position(|&b| b == b',')
                                        {
                                            obj.extend_from_slice(&ti[comma_pos + 1..]);
                                        } else {
                                            obj.extend_from_slice(ti);
                                        }
                                    } else {
                                        obj.extend_from_slice(ti);
                                    }
                                }
                                first_written = true;
                            } else {
                                obj.extend_from_slice(&group.nested_template[si]);
                            }
                            obj.extend_from_slice(val);
                        }
                        obj.extend_from_slice(&group.nested_template[num_sub]);
                        merged_col.push(obj);
                    }
                } else {
                    // No usable template: rebuild a compact object from the keys.
                    let mut obj = Vec::new();
                    obj.push(b'{');
                    let mut first = true;
                    for si in 0..num_sub {
                        if flat_idx + si >= flat_col_values.len() {
                            break;
                        }
                        let val = flat_col_values[flat_idx + si][row];
                        if val == b"null" || val == ABSENT_KEY {
                            continue;
                        }
                        if !first {
                            obj.push(b',');
                        }
                        first = false;
                        obj.push(b'"');
                        obj.extend_from_slice(&group.sub_keys[si]);
                        obj.push(b'"');
                        obj.push(b':');
                        obj.extend_from_slice(val);
                    }
                    obj.push(b'}');
                    merged_col.push(obj);
                }
            }
            output_columns.push(merged_col);
            flat_idx += num_sub;
        } else {
            if flat_idx < flat_col_values.len() {
                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
                    .iter()
                    .map(|v| v.to_vec())
                    .collect();
                output_columns.push(col);
            }
            flat_idx += 1;
        }
    }

    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }
    out
}
/// Encodes `span` in the grouped layout: elements are bucketed by template,
/// every bucket with at least `MIN_GROUP_ELEMENTS` members is stored
/// column-wise, and all remaining elements are stored verbatim as residuals.
///
/// Output payload: for each group a u32 LE length followed by its column data,
/// then the residual elements joined by a `0x00` byte.
///
/// Metadata layout (all integers little-endian):
/// version byte, prefix (u32 length + bytes), suffix (u32 length + bytes),
/// default separator (u16 length + bytes), separator flag (`1` = every gap
/// uses the default separator; `0` = u32 count followed by each separator as
/// u16 length + bytes), group count u16, then per group: element count u32,
/// the element indices (u32 each), group metadata (u32 length + bytes), a
/// nested flag byte and, when it is `1`, nested metadata (u32 length + bytes);
/// finally residual count u32, residual indices (u32 each) and the residual
/// payload length u32.
///
/// Returns `None` if the array has fewer than `MIN_ROWS` elements, no group
/// reaches the minimum size, or the encoded form is not strictly smaller than
/// the input.
fn preprocess_grouped(data: &[u8], span: &ArraySpan) -> Option<TransformResult> {
    let num_elements = span.elements.len();
    if num_elements < MIN_ROWS {
        return None;
    }

    // Bucket elements by template. The key length-prefixes every part so two
    // different part lists can never concatenate to the same key.
    let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
    let mut residual_indices: Vec<usize> = Vec::new();
    for (idx, &(start, end)) in span.elements.iter().enumerate() {
        match parse_element(&data[start..end]) {
            Some((parts, values)) => {
                let mut key = Vec::new();
                for part in &parts {
                    key.extend_from_slice(&(part.len() as u32).to_le_bytes());
                    key.extend_from_slice(part);
                }
                group_map
                    .entry(key)
                    .or_insert_with(|| (parts, Vec::new()))
                    .1
                    .push((idx, values));
            }
            None => residual_indices.push(idx),
        }
    }

    // Buckets that are too small are demoted to residual elements.
    let mut groups: Vec<SchemaGroup> = Vec::new();
    for (_key, (template_parts, rows)) in group_map {
        if rows.len() >= MIN_GROUP_ELEMENTS {
            groups.push((template_parts, rows));
        } else {
            residual_indices.extend(rows.iter().map(|(idx, _)| *idx));
        }
    }
    if groups.is_empty() {
        return None;
    }
    // HashMap iteration order is arbitrary; order groups by their first
    // element so the output is deterministic.
    groups.sort_by_key(|(_, rows)| rows[0].0);
    residual_indices.sort_unstable();

    struct GroupOutput {
        element_indices: Vec<u32>,
        col_data: Vec<u8>,
        group_metadata: Vec<u8>,
        nested_meta: Option<Vec<u8>>,
    }

    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
    for (template_parts, rows) in &groups {
        let num_cols = template_parts.len() - 1;
        let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
        let mut element_indices: Vec<u32> = Vec::with_capacity(rows.len());
        for (idx, values) in rows {
            element_indices.push(*idx as u32);
            for (col, val) in values.iter().enumerate() {
                columns[col].push(val.clone());
            }
        }
        let (col_data, group_metadata) =
            build_uniform_columnar(template_parts, &columns, rows.len());
        let num_rows = rows.len();

        // Flatten nested objects only when it shrinks the payload and
        // unflattening reproduces the column data exactly.
        let (final_col_data, nested_meta) =
            if let Some((flattened, nested_groups)) = flatten_group_nested(&col_data, num_rows) {
                if flattened.len() < col_data.len() {
                    let total_flat_cols = flattened.split(|&b| b == COL_SEP).count() as u16;
                    let unflattened = unflatten_group_nested(
                        &flattened,
                        &nested_groups,
                        num_rows,
                        total_flat_cols as usize,
                    );
                    if unflattened == col_data {
                        let serialized = ndjson::serialize_nested_info(&nested_groups);
                        let mut meta = Vec::new();
                        meta.extend_from_slice(&(num_rows as u32).to_le_bytes());
                        meta.extend_from_slice(&total_flat_cols.to_le_bytes());
                        meta.extend_from_slice(&serialized);
                        (flattened, Some(meta))
                    } else {
                        (col_data, None)
                    }
                } else {
                    (col_data, None)
                }
            } else {
                (col_data, None)
            };
        group_outputs.push(GroupOutput {
            element_indices,
            col_data: final_col_data,
            group_metadata,
            nested_meta,
        });
    }

    let separator = if span.separators.is_empty() {
        b",".to_vec()
    } else {
        span.separators[0].clone()
    };

    let mut residual_data = Vec::new();
    for (i, &idx) in residual_indices.iter().enumerate() {
        let (start, end) = span.elements[idx];
        residual_data.extend_from_slice(&data[start..end]);
        if i < residual_indices.len() - 1 {
            residual_data.push(0x00);
        }
    }

    let prefix = &data[..span.elements[0].0];
    let suffix_start = span.elements[num_elements - 1].1;
    let suffix = &data[suffix_start..];
    let all_same = span.separators.windows(2).all(|w| w[0] == w[1]);

    let mut data_out = Vec::new();
    for group in &group_outputs {
        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
        data_out.extend_from_slice(&group.col_data);
    }
    data_out.extend_from_slice(&residual_data);

    // The metadata is assembled exactly once; column data and residual bytes
    // live only in `data_out`.
    let mut metadata = Vec::new();
    metadata.push(METADATA_VERSION_GROUPED);
    metadata.extend_from_slice(&(prefix.len() as u32).to_le_bytes());
    metadata.extend_from_slice(prefix);
    metadata.extend_from_slice(&(suffix.len() as u32).to_le_bytes());
    metadata.extend_from_slice(suffix);
    metadata.extend_from_slice(&(separator.len() as u16).to_le_bytes());
    metadata.extend_from_slice(&separator);
    if all_same && !span.separators.is_empty() && span.separators[0] == separator {
        metadata.push(1);
    } else {
        metadata.push(0);
        metadata.extend_from_slice(&(span.separators.len() as u32).to_le_bytes());
        for sep in &span.separators {
            metadata.extend_from_slice(&(sep.len() as u16).to_le_bytes());
            metadata.extend_from_slice(sep);
        }
    }
    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
    for group in &group_outputs {
        metadata.extend_from_slice(&(group.element_indices.len() as u32).to_le_bytes());
        for &idx in &group.element_indices {
            metadata.extend_from_slice(&idx.to_le_bytes());
        }
        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
        metadata.extend_from_slice(&group.group_metadata);
        if let Some(ref nested) = group.nested_meta {
            metadata.push(1u8);
            metadata.extend_from_slice(&(nested.len() as u32).to_le_bytes());
            metadata.extend_from_slice(nested);
        } else {
            metadata.push(0u8);
        }
    }
    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
    for &idx in &residual_indices {
        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
    }
    metadata.extend_from_slice(&(residual_data.len() as u32).to_le_bytes());

    // Only worth keeping if it is strictly smaller than the input.
    if data_out.len() + metadata.len() >= data.len() {
        return None;
    }
    Some(TransformResult {
        data: data_out,
        metadata,
    })
}
/// Reverse transform: rebuilds the original bytes from `data` and `metadata`.
///
/// The first metadata byte selects the layout. If the metadata is empty or the
/// version byte is not recognised, `data` is returned unchanged.
pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    match metadata.first().copied() {
        Some(METADATA_VERSION_UNIFORM) => reverse_uniform(data, metadata),
        Some(METADATA_VERSION_GROUPED) => reverse_grouped(data, metadata),
        _ => data.to_vec(),
    }
}
fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
if metadata.len() < 7 {
return data.to_vec();
}
let mut mpos = 1; let num_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
mpos += 4;
let num_cols = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
mpos += 2;
if num_rows == 0 || num_cols == 0 {
return data.to_vec();
}
if mpos + 4 > metadata.len() {
return data.to_vec();
}
let prefix_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
mpos += 4;
if mpos + prefix_len > metadata.len() {
return data.to_vec();
}
let prefix = &metadata[mpos..mpos + prefix_len];
mpos += prefix_len;
if mpos + 4 > metadata.len() {
return data.to_vec();
}
let suffix_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
mpos += 4;
if mpos + suffix_len > metadata.len() {
return data.to_vec();
}
let suffix = &metadata[mpos..mpos + suffix_len];
mpos += suffix_len;
if mpos + 2 > metadata.len() {
return data.to_vec();
}
let num_parts = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
mpos += 2;
let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
for _ in 0..num_parts {
if mpos + 2 > metadata.len() {
return data.to_vec();
}
let part_len = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
mpos += 2;
if mpos + part_len > metadata.len() {
return data.to_vec();
}
parts.push(metadata[mpos..mpos + part_len].to_vec());
mpos += part_len;
}
if parts.len() != num_cols + 1 {
return data.to_vec();
}
if mpos >= metadata.len() {
return data.to_vec();
}
let sep_uniform = metadata[mpos] != 0;
mpos += 1;
let separators: Vec<Vec<u8>> = if sep_uniform {
if mpos + 2 > metadata.len() {
return data.to_vec();
}
let sep_len = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
mpos += 2;
if mpos + sep_len > metadata.len() {
return data.to_vec();
}
let sep = metadata[mpos..mpos + sep_len].to_vec();
vec![sep; num_rows.saturating_sub(1)]
} else {
let mut seps = Vec::with_capacity(num_rows.saturating_sub(1));
for _ in 0..num_rows.saturating_sub(1) {
if mpos + 2 > metadata.len() {
return data.to_vec();
}
let sep_len = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
mpos += 2;
if mpos + sep_len > metadata.len() {
return data.to_vec();
}
seps.push(metadata[mpos..mpos + sep_len].to_vec());
mpos += sep_len;
}
seps
};
let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
if col_chunks.len() != num_cols {
return data.to_vec();
}
let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
for chunk in &col_chunks {
let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
if vals.len() != num_rows {
return data.to_vec();
}
columns.push(vals);
}
let mut output = Vec::with_capacity(data.len() * 2);
output.extend_from_slice(prefix);
for row in 0..num_rows {
output.extend_from_slice(&parts[0]);
output.extend_from_slice(columns[0][row]);
for col in 1..num_cols {
output.extend_from_slice(&parts[col]);
output.extend_from_slice(columns[col][row]);
}
output.extend_from_slice(&parts[num_cols]);
if row < num_rows - 1 && row < separators.len() {
output.extend_from_slice(&separators[row]);
}
}
output.extend_from_slice(suffix);
output
}
/// Decodes the per-group metadata written by `build_uniform_columnar`.
///
/// Layout (little-endian): version byte (ignored here), `num_rows` u32,
/// `num_cols` u16, part count u16, then each part as u16 length + bytes.
///
/// Returns `(parts, num_rows, num_cols)`, or `None` if the buffer is shorter
/// than the 9-byte header, a part is truncated, the part count is not
/// `num_cols + 1`, or either count is zero.
fn parse_group_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize)> {
    if metadata.len() < 9 {
        return None;
    }
    let le16_at = |at: usize| usize::from(u16::from_le_bytes([metadata[at], metadata[at + 1]]));

    let num_rows =
        u32::from_le_bytes([metadata[1], metadata[2], metadata[3], metadata[4]]) as usize;
    let num_cols = le16_at(5);
    let num_parts = le16_at(7);

    let mut cursor = 9;
    let mut parts = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        metadata.get(cursor..cursor + 2)?;
        let part_len = le16_at(cursor);
        cursor += 2;
        let part = metadata.get(cursor..cursor + part_len)?;
        parts.push(part.to_vec());
        cursor += part_len;
    }

    let consistent = parts.len() == num_cols + 1 && num_rows != 0 && num_cols != 0;
    consistent.then_some((parts, num_rows, num_cols))
}
/// Rebuilds the original bytes from a grouped-layout payload.
///
/// Metadata layout (all integers little-endian): version byte, prefix (u32
/// length + bytes), suffix (u32 length + bytes), default separator (u16 length
/// + bytes), separator flag (`1` = use the default separator in every gap,
/// otherwise a u32 count followed by each separator as u16 length + bytes),
/// group count u16, then per group: element count u32, element indices (u32
/// each), group metadata (u32 length + bytes), nested flag byte and, when it
/// is `1`, nested metadata (u32 length + bytes); finally residual count u32,
/// residual indices (u32 each) and residual payload length u32.
///
/// `data` holds, per group, a u32 length followed by that group's column data,
/// then the residual elements joined by `0x00`.
///
/// Every rebuilt element is tagged with its original index; elements are
/// sorted by that index and written between the prefix and the suffix with a
/// separator after each element but the last.
///
/// Any truncation or inconsistency makes the function return `data` unchanged.
/// Vector capacities derived from counts in the metadata are capped by the
/// metadata length, so a corrupt count cannot request an arbitrarily large
/// allocation.
fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    if metadata.len() < 2 {
        return data.to_vec();
    }
    let mut mpos = 1;

    // --- prefix ---
    if mpos + 4 > metadata.len() {
        return data.to_vec();
    }
    let prefix_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;
    if mpos + prefix_len > metadata.len() {
        return data.to_vec();
    }
    let prefix = metadata[mpos..mpos + prefix_len].to_vec();
    mpos += prefix_len;

    // --- suffix ---
    if mpos + 4 > metadata.len() {
        return data.to_vec();
    }
    let suffix_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;
    if mpos + suffix_len > metadata.len() {
        return data.to_vec();
    }
    let suffix = metadata[mpos..mpos + suffix_len].to_vec();
    mpos += suffix_len;

    // --- default separator ---
    if mpos + 2 > metadata.len() {
        return data.to_vec();
    }
    let sep_len = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
    mpos += 2;
    if mpos + sep_len > metadata.len() {
        return data.to_vec();
    }
    let default_separator = metadata[mpos..mpos + sep_len].to_vec();
    mpos += sep_len;

    // --- optional per-gap separators ---
    if mpos >= metadata.len() {
        return data.to_vec();
    }
    let sep_flag = metadata[mpos];
    mpos += 1;
    let per_element_separators: Option<Vec<Vec<u8>>> = if sep_flag == 1 {
        None
    } else {
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let sep_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        // Capacity is capped: the count comes straight from the metadata.
        let mut seps = Vec::with_capacity(sep_count.min(metadata.len()));
        for _ in 0..sep_count {
            if mpos + 2 > metadata.len() {
                return data.to_vec();
            }
            let s_len = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
            mpos += 2;
            if mpos + s_len > metadata.len() {
                return data.to_vec();
            }
            seps.push(metadata[mpos..mpos + s_len].to_vec());
            mpos += s_len;
        }
        Some(seps)
    };

    // --- groups ---
    if mpos + 2 > metadata.len() {
        return data.to_vec();
    }
    let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
    mpos += 2;
    // (original element index, rebuilt element bytes)
    let mut element_slots: Vec<(usize, Vec<u8>)> = Vec::new();
    let mut dpos: usize = 0;
    for _ in 0..num_groups {
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let group_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        // Capacity is capped: the count comes straight from the metadata.
        let mut element_indices = Vec::with_capacity(group_count.min(metadata.len()));
        for _ in 0..group_count {
            if mpos + 4 > metadata.len() {
                return data.to_vec();
            }
            let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
            mpos += 4;
            element_indices.push(idx);
        }
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        if mpos + gm_len > metadata.len() {
            return data.to_vec();
        }
        let group_metadata = &metadata[mpos..mpos + gm_len];
        mpos += gm_len;
        if mpos >= metadata.len() {
            return data.to_vec();
        }
        let has_nested = metadata[mpos];
        mpos += 1;
        // (row count, flat column count, nested group descriptions)
        let nested_info: Option<(usize, u16, Vec<ndjson::NestedGroupInfo>)> = if has_nested == 1 {
            if mpos + 4 > metadata.len() {
                return data.to_vec();
            }
            let nested_meta_len =
                u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
            mpos += 4;
            if mpos + nested_meta_len > metadata.len() {
                return data.to_vec();
            }
            let nested_meta_bytes = &metadata[mpos..mpos + nested_meta_len];
            mpos += nested_meta_len;
            if nested_meta_bytes.len() < 6 {
                return data.to_vec();
            }
            let nested_num_rows =
                u32::from_le_bytes(nested_meta_bytes[0..4].try_into().unwrap()) as usize;
            let total_flat_cols = u16::from_le_bytes(nested_meta_bytes[4..6].try_into().unwrap());
            match ndjson::deserialize_nested_info(&nested_meta_bytes[6..]) {
                Some((groups, _)) => Some((nested_num_rows, total_flat_cols, groups)),
                None => return data.to_vec(),
            }
        } else {
            None
        };

        // This group's column data: u32 length + bytes in `data`.
        if dpos + 4 > data.len() {
            return data.to_vec();
        }
        let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
        dpos += 4;
        if dpos + gd_len > data.len() {
            return data.to_vec();
        }
        let group_data_raw = &data[dpos..dpos + gd_len];
        dpos += gd_len;

        // Undo nested flattening first, if it was applied to this group.
        let group_data_owned: Vec<u8>;
        let group_data: &[u8] =
            if let Some((nested_num_rows, total_flat_cols, ref nested_groups)) = nested_info {
                group_data_owned = unflatten_group_nested(
                    group_data_raw,
                    nested_groups,
                    nested_num_rows,
                    total_flat_cols as usize,
                );
                &group_data_owned
            } else {
                group_data_raw
            };

        let (parts, num_rows, num_cols) = match parse_group_metadata(group_metadata) {
            Some(v) => v,
            None => return data.to_vec(),
        };
        if num_rows != group_count {
            return data.to_vec();
        }
        let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
        if col_chunks.len() != num_cols {
            return data.to_vec();
        }
        let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
        for chunk in &col_chunks {
            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
            if vals.len() != num_rows {
                return data.to_vec();
            }
            columns.push(vals);
        }

        // Interleave template parts and values to rebuild each element.
        for (row_within_group, &original_idx) in element_indices.iter().enumerate() {
            let mut elem = Vec::new();
            elem.extend_from_slice(&parts[0]);
            elem.extend_from_slice(columns[0][row_within_group]);
            for col in 1..num_cols {
                elem.extend_from_slice(&parts[col]);
                elem.extend_from_slice(columns[col][row_within_group]);
            }
            elem.extend_from_slice(&parts[num_cols]);
            element_slots.push((original_idx, elem));
        }
    }

    // --- residual elements ---
    if mpos + 4 > metadata.len() {
        return data.to_vec();
    }
    let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;
    // Capacity is capped: the count comes straight from the metadata.
    let mut residual_indices = Vec::with_capacity(residual_count.min(metadata.len()));
    for _ in 0..residual_count {
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        residual_indices.push(idx);
    }
    if mpos + 4 > metadata.len() {
        return data.to_vec();
    }
    let residual_data_len =
        u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;
    let _ = mpos;
    // The residual payload follows the group data; its length is clamped to
    // what is actually left in `data`.
    let residual_data = &data[dpos..dpos + residual_data_len.min(data.len() - dpos)];
    if residual_count > 0 && !residual_data.is_empty() {
        let residual_elements: Vec<&[u8]> = residual_data.split(|&b| b == 0x00).collect();
        if residual_elements.len() != residual_count {
            return data.to_vec();
        }
        for (i, &idx) in residual_indices.iter().enumerate() {
            element_slots.push((idx, residual_elements[i].to_vec()));
        }
    }

    // --- reassemble in original order ---
    element_slots.sort_by_key(|(idx, _)| *idx);
    let total_elements = element_slots.len();
    let separators: Vec<&[u8]> = if let Some(ref per_elem) = per_element_separators {
        per_elem.iter().map(|s| s.as_slice()).collect()
    } else {
        vec![default_separator.as_slice(); total_elements.saturating_sub(1)]
    };
    let mut output = Vec::with_capacity(data.len() * 2);
    output.extend_from_slice(&prefix);
    for (i, (_idx, elem)) in element_slots.iter().enumerate() {
        output.extend_from_slice(elem);
        if i < total_elements - 1 && i < separators.len() {
            output.extend_from_slice(separators[i]);
        }
    }
    output.extend_from_slice(&suffix);
    output
}
#[cfg(test)]
mod tests {
use super::*;
/// `skip_string` on a plain quoted string: starting at the opening quote
/// (index 0), the returned position is 7, the index just past the closing quote.
#[test]
fn skip_string_basic() {
    let input = br#""hello" rest"#;
    let end = skip_string(input, 0);
    assert_eq!(end, Some(7));
}
/// An escaped quote (`\"`) inside the string must not end it: the returned
/// position is 9, the index just past the real closing quote.
#[test]
fn skip_string_escaped() {
    let input = br#""he\"llo" rest"#;
    let end = skip_string(input, 0);
    assert_eq!(end, Some(9));
}
/// `skip_object` on a flat object: starting at the opening brace (index 0),
/// the returned position is 18, the index just past the closing brace.
#[test]
fn skip_object_basic() {
    let input = br#"{"a": 1, "b": "x"} rest"#;
    let end = skip_object(input, 0);
    assert_eq!(end, Some(18));
}
/// `skip_object` with a nested object and array inside: the returned position
/// is 19, the index just past the outermost closing brace.
#[test]
fn skip_object_nested() {
    let input = br#"{"a": {"b": [1,2]}} rest"#;
    let end = skip_object(input, 0);
    assert_eq!(end, Some(19));
}
/// A wrapper object holding an array of five objects: `find_object_array`
/// must return a span, and that span must report five elements.
#[test]
fn find_array_basic() {
    let data = br#"{"data": [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}]}"#;
    // `expect` replaces the separate `is_some()` assertion + `unwrap()`, so a
    // failure reports what was expected instead of a bare unwrap panic.
    let span = find_object_array(data).expect("an array of five objects should be detected");
    assert_eq!(span.num_elements, 5);
}
/// An array of only three objects is below `MIN_ROWS`, so
/// `find_object_array` must report nothing.
#[test]
fn find_array_too_few() {
    let input = br#"{"data": [{"id": 1}, {"id": 2}, {"id": 3}]}"#;
    let found = find_object_array(input);
    assert!(found.is_none());
}
/// `parse_element` on an object with two key/value pairs: expects two values
/// (`1` and `"test"`, in that order) and three `parts`, i.e. one more part
/// than there are values.
#[test]
fn parse_element_simple() {
    let object = br#"{"id": 1, "name": "test"}"#;
    let (fragments, field_values) = parse_element(object).unwrap();
    assert_eq!(field_values.len(), 2);
    assert_eq!(field_values[0], b"1");
    assert_eq!(field_values[1], br#""test""#);
    assert_eq!(fragments.len(), 3);
}
/// `preprocess` followed by `reverse` must reproduce a five-element array
/// wrapped in an object byte-for-byte.
#[test]
fn roundtrip_simple_array() {
    let original: &[u8] = br#"{"items": [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}, {"id": 3, "name": "gamma"}, {"id": 4, "name": "delta"}, {"id": 5, "name": "epsilon"}]}"#;
    let transformed = preprocess(original).expect("should produce transform");
    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    // Text comparison first: on mismatch the assertion output shows strings
    // rather than lists of byte values.
    let rebuilt_text = String::from_utf8_lossy(&rebuilt);
    let original_text = String::from_utf8_lossy(original);
    assert_eq!(rebuilt_text, original_text);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Round-trip where the array is followed by a sibling `"meta"` key in the
/// wrapper object; the restored bytes must equal the input exactly.
#[test]
fn roundtrip_with_wrapper() {
    let original: &[u8] = br#"{"data": [{"id": 1, "type": "repo"}, {"id": 2, "type": "repo"}, {"id": 3, "type": "repo"}, {"id": 4, "type": "repo"}, {"id": 5, "type": "repo"}], "meta": {"total": 5}}"#;
    let transformed = preprocess(original).expect("should produce transform");
    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Round-trip of a multi-line document: newlines between elements and around
/// the array must survive `preprocess` + `reverse` unchanged.
#[test]
fn roundtrip_pretty_printed() {
    // The literal's interior lines are part of the test input; keep them as-is.
    let original: &[u8] = br#"{
"data": [
{"id": 1, "type": "a", "val": 10},
{"id": 2, "type": "b", "val": 20},
{"id": 3, "type": "c", "val": 30},
{"id": 4, "type": "d", "val": 40},
{"id": 5, "type": "e", "val": 50}
],
"meta": {"count": 5}
}"#;
    let transformed = preprocess(original).expect("should produce transform");
    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    // Text comparison first: on mismatch the assertion output shows strings
    // rather than lists of byte values.
    let rebuilt_text = String::from_utf8_lossy(&rebuilt);
    let original_text = String::from_utf8_lossy(original);
    assert_eq!(rebuilt_text, original_text);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Round-trip where each element carries a nested object value (`"meta"`);
/// the restored bytes must equal the input exactly.
#[test]
fn roundtrip_nested_values() {
    let original: &[u8] = br#"{"repos": [{"id": 1, "meta": {"stars": 10}}, {"id": 2, "meta": {"stars": 20}}, {"id": 3, "meta": {"stars": 30}}, {"id": 4, "meta": {"stars": 40}}, {"id": 5, "meta": {"stars": 50}}]}"#;
    let transformed = preprocess(original).expect("should produce transform");
    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// `preprocess` must decline (return `None`) for an array of only three
/// objects — presumably the `MIN_ROWS` threshold; confirm against `preprocess`.
#[test]
fn too_few_elements_returns_none() {
    let input: &[u8] = br#"{"data": [{"id": 1}, {"id": 2}, {"id": 3}]}"#;
    let outcome = preprocess(input);
    assert!(outcome.is_none());
}
/// Five elements where one uses key `"b"` and the rest use `"a"`:
/// `preprocess` must return `None` for this input.
#[test]
fn schema_mismatch_returns_none() {
    let input: &[u8] = br#"{"data": [{"id": 1, "a": 1}, {"id": 2, "b": 2}, {"id": 3, "a": 3}, {"id": 4, "a": 4}, {"id": 5, "a": 5}]}"#;
    let outcome = preprocess(input);
    assert!(outcome.is_none());
}
/// A document containing no array at all must make `preprocess` return `None`.
#[test]
fn no_array_returns_none() {
    let input: &[u8] = br#"{"key": "value", "num": 42}"#;
    let outcome = preprocess(input);
    assert!(outcome.is_none());
}
/// Empty input must make `preprocess` return `None`.
#[test]
fn empty_returns_none() {
    let empty: &[u8] = b"";
    let outcome = preprocess(empty);
    assert!(outcome.is_none());
}
/// Checks the layout of the transformed payload for a two-field schema:
/// splitting `result.data` on `COL_SEP` must give two columns, and each column
/// split on `VAL_SEP` must give five values — the `type` values in the first
/// column and the `score` values in the second, in element order.
#[test]
fn column_layout_groups_values() {
    /// Splits `bytes` on every occurrence of `sep`.
    fn split_on(bytes: &[u8], sep: u8) -> Vec<&[u8]> {
        bytes.split(|&b| b == sep).collect()
    }

    let input: &[u8] = br#"{"items": [{"type": "a", "score": 10}, {"type": "b", "score": 20}, {"type": "c", "score": 30}, {"type": "d", "score": 40}, {"type": "e", "score": 50}]}"#;
    let transformed = preprocess(input).unwrap();

    let columns = split_on(&transformed.data, COL_SEP);
    assert_eq!(columns.len(), 2);

    let type_column = split_on(columns[0], VAL_SEP);
    assert_eq!(type_column.len(), 5);
    assert_eq!(type_column[0], br#""a""#);
    assert_eq!(type_column[4], br#""e""#);

    let score_column = split_on(columns[1], VAL_SEP);
    assert_eq!(score_column.len(), 5);
    assert_eq!(score_column[0], b"10");
    assert_eq!(score_column[4], b"50");
}
/// Round-trip where the array is the whole document (no wrapper object);
/// the restored bytes must equal the input exactly.
#[test]
fn roundtrip_top_level_array() {
    let original: &[u8] = br#"[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 3, "name": "c"}, {"id": 4, "name": "d"}, {"id": 5, "name": "e"}]"#;
    let transformed = preprocess(original).expect("should handle top-level arrays");
    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Round-trip of a generated 20-element array with five fields per element
/// (numbers, strings and booleans), followed by a `"meta"` object.
#[test]
fn roundtrip_larger_dataset() {
    // One JSON object per index; elements are joined with ", " below.
    let rows: Vec<String> = (0..20)
        .map(|i| {
            let active = if i % 2 == 0 { "true" } else { "false" };
            format!(
                r#"{{"id": {}, "type": "item", "name": "item_{}", "score": {}, "active": {}}}"#,
                i,
                i,
                i * 10 + 5,
                active
            )
        })
        .collect();

    let mut json = String::from(r#"{"data": ["#);
    json.push_str(&rows.join(", "));
    json.push_str(r#"], "meta": {"total": 20, "page": 1}}"#);

    let original = json.as_bytes();
    let transformed = preprocess(original).expect("should transform 20-element array");
    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    // Text comparison first: on mismatch the assertion output shows strings
    // rather than lists of byte values.
    let rebuilt_text = String::from_utf8_lossy(&rebuilt);
    let original_text = String::from_utf8_lossy(original);
    assert_eq!(rebuilt_text, original_text);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Thirty generated elements alternating between two schemas (every third
/// element has `id/tag/active`, the others `id/name/score`). Asserts that the
/// first metadata byte is `METADATA_VERSION_GROUPED` and that the round-trip
/// is byte-exact.
#[test]
fn test_grouped_json_array_diverse_schemas() {
    let rows: Vec<String> = (0..30)
        .map(|i| {
            if i % 3 == 0 {
                let active = if i % 2 == 0 { "true" } else { "false" };
                format!(
                    r#"{{"id": {}, "tag": "tag_{}", "active": {}}}"#,
                    i, i, active
                )
            } else {
                format!(
                    r#"{{"id": {}, "name": "item_{}", "score": {}}}"#,
                    i,
                    i,
                    i * 10
                )
            }
        })
        .collect();

    let mut json = String::from(r#"{"data": ["#);
    json.push_str(&rows.join(", "));
    json.push_str(r#"], "meta": {"count": 30}}"#);

    let original = json.as_bytes();
    let transformed =
        preprocess(original).expect("should produce grouped transform for diverse schemas");
    let version_byte = transformed.metadata[0];
    assert_eq!(
        version_byte, METADATA_VERSION_GROUPED,
        "should use grouped strategy"
    );

    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    // Text comparison first: on mismatch the assertion output shows strings
    // rather than lists of byte values.
    let rebuilt_text = String::from_utf8_lossy(&rebuilt);
    let original_text = String::from_utf8_lossy(original);
    assert_eq!(rebuilt_text, original_text);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Round-trip of a hand-written ten-element array mixing two schemas
/// (`id/text/user` and `id/text/retweet`), one element per line.
#[test]
fn test_grouped_json_array_roundtrip() {
    // The literal's interior lines are part of the test input; keep them as-is.
    let original: &[u8] = br#"{"statuses": [
{"id": 1, "text": "hello", "user": "alice"},
{"id": 2, "text": "world", "user": "bob"},
{"id": 3, "text": "foo", "user": "charlie"},
{"id": 4, "text": "bar", "retweet": true},
{"id": 5, "text": "baz", "retweet": false},
{"id": 6, "text": "qux", "retweet": true},
{"id": 7, "text": "quux", "user": "dave"},
{"id": 8, "text": "corge", "user": "eve"},
{"id": 9, "text": "grault", "user": "frank"},
{"id": 10, "text": "garply", "user": "grace"}
], "count": 10}"#;
    let transformed = preprocess(original).expect("should produce grouped transform");
    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    // Text comparison first: on mismatch the assertion output shows strings
    // rather than lists of byte values.
    let rebuilt_text = String::from_utf8_lossy(&rebuilt);
    let original_text = String::from_utf8_lossy(original);
    assert_eq!(rebuilt_text, original_text);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Thirty generated elements in two recurring schemas plus two one-off
/// elements (indices 10 and 20) with a third schema. Asserts that the first
/// metadata byte is `METADATA_VERSION_GROUPED` and that the round-trip is
/// byte-exact.
#[test]
fn test_grouped_json_array_with_residuals() {
    let mut json = String::from(r#"{"items": ["#);
    for i in 0..30 {
        if i > 0 {
            json.push_str(", ");
        }
        // Indices 10 and 20 take priority over the `i % 3` rule.
        let element = match i {
            10 | 20 => format!(
                r#"{{"id": {}, "special": true, "data": "unique_{}", "extra": {}}}"#,
                i,
                i,
                i * 100
            ),
            _ if i % 3 == 0 => format!(
                r#"{{"id": {}, "category": "cat_{}", "weight": {}}}"#,
                i,
                i,
                i as f64 * 1.5
            ),
            _ => format!(
                r#"{{"id": {}, "name": "item_{}", "value": {}}}"#,
                i,
                i,
                i * 10
            ),
        };
        json.push_str(&element);
    }
    json.push_str(r#"]}"#);

    let original = json.as_bytes();
    let transformed =
        preprocess(original).expect("should produce grouped transform with residuals");
    let version_byte = transformed.metadata[0];
    assert_eq!(version_byte, METADATA_VERSION_GROUPED);

    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    // Text comparison first: on mismatch the assertion output shows strings
    // rather than lists of byte values.
    let rebuilt_text = String::from_utf8_lossy(&rebuilt);
    let original_text = String::from_utf8_lossy(original);
    assert_eq!(rebuilt_text, original_text);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Round-trip of the `twitter.json` corpus file, located relative to
/// `CARGO_MANIFEST_DIR`. If the file cannot be read the test prints a notice
/// to stderr and returns without asserting anything. Otherwise it asserts the
/// grouped metadata version, a byte-exact round-trip, and that data plus
/// metadata together are smaller than the input.
#[test]
fn test_twitter_json_roundtrip() {
    let corpus_path = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/json-bench/twitter.json"
    );
    let Ok(original) = std::fs::read(corpus_path) else {
        eprintln!("Skipping test_twitter_json_roundtrip: twitter.json not found");
        return;
    };

    let transformed = preprocess(&original)
        .expect("twitter.json should be transformable with grouped strategy");
    let version_byte = transformed.metadata[0];
    assert_eq!(
        version_byte, METADATA_VERSION_GROUPED,
        "twitter.json should use grouped strategy"
    );

    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    // Length is checked first so a size mismatch is reported with both sizes
    // in the message before the content comparison runs.
    assert_eq!(
        rebuilt.len(),
        original.len(),
        "roundtrip length mismatch: got {}, expected {}",
        rebuilt.len(),
        original.len()
    );
    assert_eq!(rebuilt, original, "twitter.json roundtrip is not byte-exact");

    let transformed_size = transformed.data.len() + transformed.metadata.len();
    assert!(
        transformed_size < original.len(),
        "grouped transform should be smaller: {} vs {}",
        transformed_size,
        original.len()
    );
}
/// Thirty generated elements in two schemas, one of which carries a nested
/// `"user"` object. Asserts that the first metadata byte is
/// `METADATA_VERSION_GROUPED` and that the round-trip is byte-exact.
#[test]
fn test_grouped_with_nested_flatten() {
    let rows: Vec<String> = (0..30)
        .map(|i| {
            let flag = if i % 2 == 0 { "true" } else { "false" };
            if i % 3 == 0 {
                format!(
                    r#"{{"id": {}, "tag": "tag_{}", "active": {}}}"#,
                    i, i, flag
                )
            } else {
                format!(
                    r#"{{"id": {}, "name": "user_{}", "user": {{"role": "admin", "level": {}, "verified": {}}}}}"#,
                    i,
                    i,
                    i % 5,
                    flag
                )
            }
        })
        .collect();

    let mut json = String::from(r#"{"data": ["#);
    json.push_str(&rows.join(", "));
    json.push_str(r#"], "meta": {"count": 30}}"#);

    let original = json.as_bytes();
    let transformed =
        preprocess(original).expect("should produce grouped transform with nested flatten");
    let version_byte = transformed.metadata[0];
    assert_eq!(version_byte, METADATA_VERSION_GROUPED);

    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    // Text comparison first: on mismatch the assertion output shows strings
    // rather than lists of byte values.
    let rebuilt_text = String::from_utf8_lossy(&rebuilt);
    let original_text = String::from_utf8_lossy(original);
    assert_eq!(rebuilt_text, original_text);
    let expected = original.to_vec();
    assert_eq!(rebuilt, expected);
}
/// Size check on the `twitter.json` corpus file, located relative to
/// `CARGO_MANIFEST_DIR`. If the file cannot be read the test prints a notice
/// to stderr and returns without asserting anything. Otherwise it asserts a
/// byte-exact round-trip, prints the size ratio, and requires
/// original size / (data + metadata size) to exceed 1.0.
#[test]
fn test_twitter_json_improved() {
    let corpus_path = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/json-bench/twitter.json"
    );
    let Ok(original) = std::fs::read(corpus_path) else {
        eprintln!("Skipping test_twitter_json_improved: twitter.json not found");
        return;
    };

    let transformed = preprocess(&original).expect("twitter.json should transform");
    let rebuilt = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(rebuilt, original, "roundtrip must be byte-exact");

    let original_size = original.len();
    let pre_transform_size = transformed.data.len() + transformed.metadata.len();
    let ratio = original_size as f64 / pre_transform_size as f64;
    eprintln!(
        "twitter.json: original={} pre_transform={} ratio={:.2}x",
        original_size, pre_transform_size, ratio
    );
    assert!(
        ratio > 1.0,
        "pre-transform should be smaller than original: ratio={:.2}x",
        ratio,
    );
}
}