1pub mod json;
7pub mod json_array;
8pub mod ndjson;
9pub mod schema;
10pub mod transform;
11pub mod typed_encoding;
12pub mod value_dict;
13
14use std::borrow::Cow;
15
16use crate::dcx::{FormatHint, Mode};
17use transform::{
18 TRANSFORM_JSON_ARRAY_COLUMNAR, TRANSFORM_JSON_KEY_INTERN, TRANSFORM_NDJSON_COLUMNAR,
19 TRANSFORM_NESTED_FLATTEN, TRANSFORM_TYPED_ENCODING, TRANSFORM_VALUE_DICT, TransformChain,
20};
21
22pub fn detect_format(data: &[u8]) -> FormatHint {
24 if data.is_empty() {
25 return FormatHint::Generic;
26 }
27
28 let trimmed = trim_leading_whitespace(data);
29
30 if starts_with_byte(trimmed, b'{') || starts_with_byte(trimmed, b'[') {
31 if is_ndjson(data) {
32 return FormatHint::Ndjson;
33 }
34 return FormatHint::Json;
35 }
36
37 FormatHint::Generic
38}
39
40pub fn detect_from_extension(path: &str) -> Option<FormatHint> {
42 let ext = path.rsplit('.').next()?.to_lowercase();
43 match ext.as_str() {
44 "json" => Some(FormatHint::Json),
45 "ndjson" | "jsonl" => Some(FormatHint::Ndjson),
46 _ => None,
47 }
48}
49
50pub fn preprocess(data: &[u8], format: FormatHint, mode: Mode) -> (Vec<u8>, TransformChain) {
58 let mut chain = TransformChain::new();
59 let mut current: Cow<'_, [u8]> = Cow::Borrowed(data);
60
61 let mut columnar_applied = false;
64 let mut ndjson_transform_applied = false;
66
67 if format == FormatHint::Ndjson {
72 if let Some(result) = ndjson::preprocess(¤t) {
73 let is_uniform_columnar = !result.metadata.is_empty() && result.metadata[0] == 1;
74 chain.push(TRANSFORM_NDJSON_COLUMNAR, result.metadata);
75 current = Cow::Owned(result.data);
76 ndjson_transform_applied = true;
77 columnar_applied = is_uniform_columnar;
78 }
79 }
80
81 let mut json_array_applied = false;
86 if !columnar_applied && !ndjson_transform_applied && format == FormatHint::Json {
87 if let Some(result) = json_array::preprocess(¤t) {
88 let is_uniform = !result.metadata.is_empty() && result.metadata[0] == 1;
89 chain.push(TRANSFORM_JSON_ARRAY_COLUMNAR, result.metadata);
90 current = Cow::Owned(result.data);
91 json_array_applied = true;
92 columnar_applied = is_uniform;
93 }
94 }
95
96 if columnar_applied && !ndjson_transform_applied {
100 let ja_meta = &chain.records.last().unwrap().metadata;
102 if ja_meta.len() >= 5 {
103 let num_rows = u32::from_le_bytes(ja_meta[1..5].try_into().unwrap()) as usize;
104 if let Some((flat_data, nested_groups)) =
105 ndjson::flatten_nested_columns(¤t, num_rows)
106 {
107 let total_flat_cols = flat_data.split(|&b| b == 0x00).count() as u16;
109
110 let unflattened = ndjson::unflatten_nested_columns(
115 &flat_data,
116 &nested_groups,
117 num_rows,
118 total_flat_cols as usize,
119 );
120 if unflattened == current.as_ref() {
121 let mut nested_meta = Vec::new();
122 nested_meta.extend_from_slice(&(num_rows as u32).to_le_bytes());
123 nested_meta.extend_from_slice(&total_flat_cols.to_le_bytes());
124 nested_meta.extend_from_slice(&ndjson::serialize_nested_info(&nested_groups));
125 chain.push(TRANSFORM_NESTED_FLATTEN, nested_meta);
126 current = Cow::Owned(flat_data);
127 }
128 }
132 }
133 }
134
135 if columnar_applied && mode == Mode::Fast {
139 if let Some(result) = typed_encoding::preprocess(¤t) {
140 chain.push(TRANSFORM_TYPED_ENCODING, result.metadata);
141 current = Cow::Owned(result.data);
142 }
143 }
144
145 if columnar_applied {
153 if let Some(result) = value_dict::preprocess(¤t) {
154 chain.push(TRANSFORM_VALUE_DICT, result.metadata);
155 current = Cow::Owned(result.data);
156 }
157 }
158
159 if columnar_applied || ndjson_transform_applied || json_array_applied {
160 return (current.into_owned(), chain);
161 }
162
163 if matches!(mode, Mode::Balanced | Mode::Max)
165 && matches!(format, FormatHint::Json | FormatHint::Ndjson)
166 && let Some(result) = json::preprocess(¤t)
167 {
168 chain.push(TRANSFORM_JSON_KEY_INTERN, result.metadata);
169 current = Cow::Owned(result.data);
170 }
171
172 (current.into_owned(), chain)
173}
174
175pub fn reverse_preprocess(data: &[u8], chain: &TransformChain) -> Vec<u8> {
177 let mut current: Cow<'_, [u8]> = Cow::Borrowed(data);
178
179 for record in chain.records.iter().rev() {
181 match record.id {
182 TRANSFORM_JSON_KEY_INTERN => {
183 current = Cow::Owned(json::reverse(¤t, &record.metadata));
184 }
185 TRANSFORM_NDJSON_COLUMNAR => {
186 current = Cow::Owned(ndjson::reverse(¤t, &record.metadata));
187 }
188 TRANSFORM_JSON_ARRAY_COLUMNAR => {
189 current = Cow::Owned(json_array::reverse(¤t, &record.metadata));
190 }
191 TRANSFORM_VALUE_DICT => {
192 current = Cow::Owned(value_dict::reverse(¤t, &record.metadata));
193 }
194 TRANSFORM_TYPED_ENCODING => {
195 current = Cow::Owned(typed_encoding::reverse(¤t, &record.metadata));
196 }
197 TRANSFORM_NESTED_FLATTEN => {
198 if record.metadata.len() >= 6 {
200 let num_rows =
201 u32::from_le_bytes(record.metadata[0..4].try_into().unwrap()) as usize;
202 let total_flat_cols =
203 u16::from_le_bytes(record.metadata[4..6].try_into().unwrap()) as usize;
204 if let Some((nested_groups, _)) =
205 ndjson::deserialize_nested_info(&record.metadata[6..])
206 {
207 current = Cow::Owned(ndjson::unflatten_nested_columns(
208 ¤t,
209 &nested_groups,
210 num_rows,
211 total_flat_cols,
212 ));
213 }
214 }
215 }
216 _ => {} }
218 }
219
220 current.into_owned()
221}
222
/// Returns `data` with its leading ASCII whitespace removed. Whitespace follows the
/// definition of `u8::is_ascii_whitespace`. Trailing bytes are left untouched, and
/// all-whitespace input yields an empty slice.
fn trim_leading_whitespace(data: &[u8]) -> &[u8] {
    data.trim_ascii_start()
}
232
/// Reports whether the first byte of `data` equals `byte`. Always `false` for empty input.
fn starts_with_byte(data: &[u8], byte: u8) -> bool {
    matches!(data, [head, ..] if *head == byte)
}
236
/// Heuristically decides whether `data` is newline-delimited JSON.
///
/// Lines are split on `\n` and trimmed of ASCII whitespace, which also handles `\r\n`
/// endings. Blank lines are ignored. A line counts as a JSON object line when it starts
/// with `{` and ends with `}`. The result is `true` when there are at least two non-blank
/// lines and strictly more than 80% of them are object lines.
///
/// Requiring the closing `}` keeps pretty-printed JSON arrays that put one object per line
/// (`  {...},`) from being mistaken for NDJSON once they hold enough elements.
fn is_ndjson(data: &[u8]) -> bool {
    let mut object_lines = 0usize;
    let mut total_lines = 0usize;

    for line in data.split(|&b| b == b'\n') {
        let line = line.trim_ascii();
        if line.is_empty() {
            continue;
        }
        total_lines += 1;
        if line.first() == Some(&b'{') && line.last() == Some(&b'}') {
            object_lines += 1;
        }
    }

    // Exact integer form of `object_lines / total_lines > 0.8`.
    total_lines >= 2 && object_lines * 5 > total_lines * 4
}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Leading whitespace is skipped, and both `{` and `[` openers classify single-line
    /// input as JSON.
    #[test]
    fn detect_json() {
        assert_eq!(detect_format(b" {\"key\": \"value\"}"), FormatHint::Json);
        assert_eq!(detect_format(b"[1, 2, 3]"), FormatHint::Json);
    }

    /// Several single-line JSON objects, one per line, are classified as NDJSON.
    #[test]
    fn detect_ndjson() {
        let data = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
        assert_eq!(detect_format(data), FormatHint::Ndjson);
    }

    /// Empty input and input without a JSON opener fall back to `Generic`.
    #[test]
    fn detect_generic_fallback() {
        assert_eq!(detect_format(b""), FormatHint::Generic);
        assert_eq!(detect_format(b"just some random text"), FormatHint::Generic);
    }

    /// Known extensions map to their hint; unknown ones yield `None`.
    #[test]
    fn extension_detection() {
        assert_eq!(detect_from_extension("test.json"), Some(FormatHint::Json));
        assert_eq!(
            detect_from_extension("data.ndjson"),
            Some(FormatHint::Ndjson)
        );
        assert_eq!(detect_from_extension("file.txt"), None);
    }

    /// A JSON object with repeated keys should get key interning in `Balanced` mode,
    /// shrink, and still round-trip.
    #[test]
    fn preprocess_json_key_interning() {
        let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty(), "should have applied key interning");
        assert!(
            preprocessed.len() < data.len(),
            "preprocessed should be smaller"
        );

        // Reversal must reproduce the input byte-for-byte.
        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    /// Uniform NDJSON records in `Balanced` mode start the chain with the NDJSON
    /// columnar transform and round-trip exactly.
    #[test]
    fn preprocess_ndjson_columnar() {
        let data = br#"{"ts":"a","val":1}
{"ts":"b","val":2}
{"ts":"c","val":3}
"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Balanced);
        assert!(!chain.is_empty());
        // The first (earliest-applied) record must be the NDJSON columnar transform.
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use columnar transform"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    /// Same NDJSON input in `Fast` mode, where typed encoding may also run on the
    /// columnar output.
    #[test]
    fn preprocess_ndjson_columnar_fast_mode() {
        let data = br#"{"ts":"a","val":1}
{"ts":"b","val":2}
{"ts":"c","val":3}
"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(chain.records[0].id, transform::TRANSFORM_NDJSON_COLUMNAR);

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());

        // Columns in the output are 0x00-separated; expect one per field (`ts`, `val`).
        let cols: Vec<&[u8]> = preprocessed.split(|&b| b == 0x00).collect();
        assert_eq!(cols.len(), 2, "should have 2 columns");
    }

    /// An object holding a 5-element array of uniform objects should trigger the
    /// JSON-array columnar transform first and round-trip exactly.
    #[test]
    fn preprocess_json_array_columnar() {
        let data = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}, {"id": 4, "type": "d"}, {"id": 5, "type": "e"}], "meta": {"count": 5}}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "JSON with array should use array columnar transform"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    /// Two arrays of only 3 elements each: per the assertion message, that is too few for
    /// the array columnar transform. Whatever happens, the data must round-trip.
    #[test]
    fn preprocess_json_array_too_few_falls_through() {
        let data = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}], "meta": {"count": 3}, "data2": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}]}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        if !chain.is_empty() {
            // Some other transform (e.g. key interning) may apply, but not array columnar.
            assert_ne!(
                chain.records[0].id,
                transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
                "3 elements should NOT trigger array columnar"
            );
        }

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    /// `Generic` input in `Fast` mode is returned unchanged with an empty chain.
    #[test]
    fn preprocess_non_json_passthrough() {
        let data = b"just some plain text with no JSON keys";
        let (preprocessed, chain) = preprocess(data, FormatHint::Generic, Mode::Fast);
        assert!(chain.is_empty());
        assert_eq!(preprocessed, data.to_vec());
    }

    /// Ten objects each carrying a nested `meta` object: array columnar is applied first,
    /// then nested flattening, and the result must round-trip exactly.
    #[test]
    fn test_json_array_nested_flatten_roundtrip() {
        let mut json = String::from(r#"{"data": ["#);
        // Build `{"id": i, "name": "item_i", "meta": {score, active, tag}}` for i in 0..10.
        for i in 0..10 {
            if i > 0 {
                json.push_str(", ");
            }
            json.push_str(&format!(
                r#"{{"id": {}, "name": "item_{}", "meta": {{"score": {}, "active": {}, "tag": "t{}"}}}}"#,
                i, i, i * 10, if i % 2 == 0 { "true" } else { "false" }, i
            ));
        }
        json.push_str(r#"], "total": 10}"#);

        let data = json.as_bytes();
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "should apply json_array columnar first"
        );

        // Nested flattening may appear anywhere after the first record.
        let has_nested_flatten = chain
            .records
            .iter()
            .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            has_nested_flatten,
            "should apply nested flatten for objects with nested fields"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        // Compare as lossy UTF-8 first so a mismatch prints readable text, then compare
        // the raw bytes.
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    /// 50 objects with a nested `user` object: nested flattening should activate and
    /// round-trip exactly.
    /// NOTE(review): despite the name, this checks the output column count, not a
    /// compression ratio.
    #[test]
    fn test_json_array_nested_flatten_improves_ratio() {
        let mut json = String::from(r#"{"items": ["#);
        for i in 0..50 {
            if i > 0 {
                json.push_str(", ");
            }
            json.push_str(&format!(
                r#"{{"id": {}, "user": {{"name": "user_{}", "role": "admin", "level": {}, "verified": true, "email": "user_{}@test.com"}}}}"#,
                i, i, i % 5, i
            ));
        }
        json.push_str(r"]}");

        let data = json.as_bytes();

        let (preprocessed_with, chain_with) = preprocess(data, FormatHint::Json, Mode::Fast);
        assert!(
            chain_with
                .records
                .iter()
                .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN),
            "nested flatten should activate"
        );

        let restored = reverse_preprocess(&preprocessed_with, &chain_with);
        assert_eq!(restored, data.to_vec());

        // With `user` flattened, the output should have more than the 2 top-level
        // columns (`id`, `user`).
        let num_cols_with = preprocessed_with.split(|&b| b == 0x00).count();
        assert!(
            num_cols_with > 2,
            "nested flatten should produce more columns: got {}",
            num_cols_with
        );
    }

    /// NDJSON with nested objects uses its own columnar transform and must not get a
    /// standalone nested-flatten record.
    #[test]
    fn test_ndjson_unaffected() {
        let mut ndjson = String::new();
        for i in 0..10 {
            ndjson.push_str(&format!(
                r#"{{"id":{},"user":{{"name":"u{}","level":{}}}}}"#,
                i,
                i,
                i % 3
            ));
            ndjson.push('\n');
        }

        let data = ndjson.as_bytes();
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use its own columnar transform"
        );

        let has_standalone_nested = chain
            .records
            .iter()
            .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            !has_standalone_nested,
            "NDJSON path should NOT use standalone nested flatten (it handles nesting internally)"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    /// Integer column cycling through extreme values, so consecutive rows differ by very
    /// large amounts. Typed encoding must be applied in `Fast` mode and still round-trip
    /// byte-exactly.
    #[test]
    fn test_ndjson_large_delta_integer_roundtrip() {
        // Zero, ±1, the i32 bounds, and ±(2^53 - 1), the largest integers an f64
        // represents exactly.
        let edges: &[i64] = &[
            0,
            -1,
            1,
            -2147483648,
            2147483647,
            -9007199254740991,
            9007199254740991,
        ];
        let mut ndjson = String::new();
        for i in 0..203 {
            ndjson.push_str(&format!("{{\"val\":{},\"idx\":{}}}\n", edges[i % 7], i));
        }

        let data = ndjson.as_bytes();

        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);

        // Confirm the typed-encoding path is actually exercised.
        assert!(
            chain
                .records
                .iter()
                .any(|r| r.id == transform::TRANSFORM_TYPED_ENCODING),
            "typed encoding should be applied in Fast mode"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec(), "byte-exact roundtrip failed");
    }

    /// 250 objects whose nested objects vary in shape:
    /// - `links` sub-keys and their order cycle through 5 variants (`i % 5`);
    /// - `pub` gains an extra `actor` key every third row;
    /// - `license` is present only for `i >= 6`.
    /// Every transform combination must round-trip in both `Fast` and `Balanced` mode.
    #[test]
    fn test_nested_flatten_varying_subkeys_roundtrip() {
        let mut json = String::from(r#"{"objects":["#);
        for i in 0..250 {
            if i > 0 {
                json.push(',');
            }
            let license = if i >= 6 { r#","license":"MIT""# } else { "" };
            // Sub-key set and order vary with `i % 5`.
            let links = match i % 5 {
                0 => format!(
                    r#"{{"homepage":"h{i}","repository":"r{i}","bugs":"b{i}","npm":"n{i}"}}"#
                ),
                1 => format!(r#"{{"homepage":"h{i}","npm":"n{i}","repository":"r{i}"}}"#),
                2 => format!(r#"{{"npm":"n{i}"}}"#),
                3 => format!(r#"{{"bugs":"b{i}","homepage":"h{i}","npm":"n{i}"}}"#),
                _ => format!(r#"{{"npm":"n{i}","repository":"r{i}"}}"#),
            };
            let publisher = if i % 3 == 0 {
                format!(r#"{{"email":"u{i}@t.com","username":"u{i}","actor":"a{i}"}}"#)
            } else {
                format!(r#"{{"email":"u{i}@t.com","username":"u{i}"}}"#)
            };
            json.push_str(&format!(
                r#"{{"dl":{{"m":{},"w":{}}},"dep":"{}","sc":{},"pkg":{{"name":"p{i}","kw":["j","t"],"ver":"{i}.0","pub":{publisher},"mnt":[{{"u":"u{i}"}}]{license},"links":{links}}},"score":{{"f":0.5,"d":{{"q":0.8}}}},"flags":{{"x":0}}}}"#,
                1000 * (i + 1),
                250 * (i + 1),
                i * 5,
                1697.0894 + i as f64 * 0.1,
            ));
        }
        json.push_str(r#"],"total":250}"#);

        let data = json.as_bytes();

        for mode in [Mode::Fast, Mode::Balanced] {
            let (preprocessed, chain) = preprocess(data, FormatHint::Json, mode);
            assert!(!chain.is_empty(), "should apply transforms in {mode} mode");
            let restored = reverse_preprocess(&preprocessed, &chain);
            // Check the length first for a clearer failure than a full byte diff.
            assert_eq!(restored.len(), data.len(), "length mismatch in {mode} mode",);
            assert_eq!(restored, data.to_vec(), "roundtrip failed in {mode} mode");
        }
    }
}