1pub mod json;
7pub mod json_array;
8pub mod ndjson;
9pub mod schema;
10pub mod transform;
11pub mod typed_encoding;
12pub mod value_dict;
13
14use crate::dcx::{FormatHint, Mode};
15use transform::{
16 TRANSFORM_JSON_ARRAY_COLUMNAR, TRANSFORM_JSON_KEY_INTERN, TRANSFORM_NDJSON_COLUMNAR,
17 TRANSFORM_NESTED_FLATTEN, TRANSFORM_TYPED_ENCODING, TRANSFORM_VALUE_DICT, TransformChain,
18};
19
20pub fn detect_format(data: &[u8]) -> FormatHint {
22 if data.is_empty() {
23 return FormatHint::Generic;
24 }
25
26 let trimmed = trim_leading_whitespace(data);
27
28 if starts_with_byte(trimmed, b'{') || starts_with_byte(trimmed, b'[') {
29 if is_ndjson(data) {
30 return FormatHint::Ndjson;
31 }
32 return FormatHint::Json;
33 }
34
35 FormatHint::Generic
36}
37
38pub fn detect_from_extension(path: &str) -> Option<FormatHint> {
40 let ext = path.rsplit('.').next()?.to_lowercase();
41 match ext.as_str() {
42 "json" => Some(FormatHint::Json),
43 "ndjson" | "jsonl" => Some(FormatHint::Ndjson),
44 _ => None,
45 }
46}
47
48pub fn preprocess(data: &[u8], format: FormatHint, mode: Mode) -> (Vec<u8>, TransformChain) {
56 let mut chain = TransformChain::new();
57 let mut current = data.to_vec();
58
59 let mut columnar_applied = false;
62 let mut ndjson_transform_applied = false;
64
65 if format == FormatHint::Ndjson {
70 if let Some(result) = ndjson::preprocess(¤t) {
71 let is_uniform_columnar = !result.metadata.is_empty() && result.metadata[0] == 1;
72 chain.push(TRANSFORM_NDJSON_COLUMNAR, result.metadata);
73 current = result.data;
74 ndjson_transform_applied = true;
75 columnar_applied = is_uniform_columnar;
76 }
77 }
78
79 let mut json_array_applied = false;
84 if !columnar_applied && !ndjson_transform_applied && format == FormatHint::Json {
85 if let Some(result) = json_array::preprocess(¤t) {
86 let is_uniform = !result.metadata.is_empty() && result.metadata[0] == 1;
87 chain.push(TRANSFORM_JSON_ARRAY_COLUMNAR, result.metadata);
88 current = result.data;
89 json_array_applied = true;
90 columnar_applied = is_uniform;
91 }
92 }
93
94 if columnar_applied && !ndjson_transform_applied {
98 let ja_meta = &chain.records.last().unwrap().metadata;
100 if ja_meta.len() >= 5 {
101 let num_rows = u32::from_le_bytes(ja_meta[1..5].try_into().unwrap()) as usize;
102 if let Some((flat_data, nested_groups)) =
103 ndjson::flatten_nested_columns(¤t, num_rows)
104 {
105 let total_flat_cols = flat_data.split(|&b| b == 0x00).count() as u16;
107
108 let unflattened = ndjson::unflatten_nested_columns(
113 &flat_data,
114 &nested_groups,
115 num_rows,
116 total_flat_cols as usize,
117 );
118 if unflattened == current {
119 let mut nested_meta = Vec::new();
120 nested_meta.extend_from_slice(&(num_rows as u32).to_le_bytes());
121 nested_meta.extend_from_slice(&total_flat_cols.to_le_bytes());
122 nested_meta.extend_from_slice(&ndjson::serialize_nested_info(&nested_groups));
123 chain.push(TRANSFORM_NESTED_FLATTEN, nested_meta);
124 current = flat_data;
125 }
126 }
130 }
131 }
132
133 if columnar_applied && mode == Mode::Fast {
137 if let Some(result) = typed_encoding::preprocess(¤t) {
138 chain.push(TRANSFORM_TYPED_ENCODING, result.metadata);
139 current = result.data;
140 }
141 }
142
143 if columnar_applied {
151 if let Some(result) = value_dict::preprocess(¤t) {
152 chain.push(TRANSFORM_VALUE_DICT, result.metadata);
153 current = result.data;
154 }
155 }
156
157 if columnar_applied || ndjson_transform_applied || json_array_applied {
158 return (current, chain);
159 }
160
161 if matches!(mode, Mode::Balanced | Mode::Max)
163 && matches!(format, FormatHint::Json | FormatHint::Ndjson)
164 && let Some(result) = json::preprocess(¤t)
165 {
166 chain.push(TRANSFORM_JSON_KEY_INTERN, result.metadata);
167 current = result.data;
168 }
169
170 (current, chain)
171}
172
173pub fn reverse_preprocess(data: &[u8], chain: &TransformChain) -> Vec<u8> {
175 let mut current = data.to_vec();
176
177 for record in chain.records.iter().rev() {
179 match record.id {
180 TRANSFORM_JSON_KEY_INTERN => {
181 current = json::reverse(¤t, &record.metadata);
182 }
183 TRANSFORM_NDJSON_COLUMNAR => {
184 current = ndjson::reverse(¤t, &record.metadata);
185 }
186 TRANSFORM_JSON_ARRAY_COLUMNAR => {
187 current = json_array::reverse(¤t, &record.metadata);
188 }
189 TRANSFORM_VALUE_DICT => {
190 current = value_dict::reverse(¤t, &record.metadata);
191 }
192 TRANSFORM_TYPED_ENCODING => {
193 current = typed_encoding::reverse(¤t, &record.metadata);
194 }
195 TRANSFORM_NESTED_FLATTEN => {
196 if record.metadata.len() >= 6 {
198 let num_rows =
199 u32::from_le_bytes(record.metadata[0..4].try_into().unwrap()) as usize;
200 let total_flat_cols =
201 u16::from_le_bytes(record.metadata[4..6].try_into().unwrap()) as usize;
202 if let Some((nested_groups, _)) =
203 ndjson::deserialize_nested_info(&record.metadata[6..])
204 {
205 current = ndjson::unflatten_nested_columns(
206 ¤t,
207 &nested_groups,
208 num_rows,
209 total_flat_cols,
210 );
211 }
212 }
213 }
214 _ => {} }
216 }
217
218 current
219}
220
/// Returns the suffix of `data` that starts at its first non-ASCII-whitespace byte.
///
/// The result is empty when `data` is empty or consists only of ASCII whitespace.
fn trim_leading_whitespace(data: &[u8]) -> &[u8] {
    let skipped = data
        .iter()
        .take_while(|byte| byte.is_ascii_whitespace())
        .count();
    &data[skipped..]
}
230
/// Reports whether the first byte of `data` equals `byte`; always `false` for empty
/// input.
fn starts_with_byte(data: &[u8], byte: u8) -> bool {
    matches!(data, [lead, ..] if *lead == byte)
}
234
235fn is_ndjson(data: &[u8]) -> bool {
236 let mut json_lines = 0;
237 let mut total_lines = 0;
238
239 for line in data.split(|&b| b == b'\n') {
240 let trimmed = trim_leading_whitespace(line);
241 if trimmed.is_empty() {
242 continue;
243 }
244 total_lines += 1;
245 if starts_with_byte(trimmed, b'{') {
246 json_lines += 1;
247 }
248 }
249
250 total_lines >= 2 && json_lines as f64 / total_lines as f64 > 0.8
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Three uniform NDJSON records shared by the NDJSON columnar tests.
    const TS_VAL_RECORDS: &[u8] = br#"{"ts":"a","val":1}
{"ts":"b","val":2}
{"ts":"c","val":3}
"#;

    /// Both `{` (after leading whitespace) and `[` openers are sniffed as JSON.
    #[test]
    fn detect_json() {
        let padded_object = b" {\"key\": \"value\"}";
        let bare_array = b"[1, 2, 3]";

        assert_eq!(detect_format(padded_object), FormatHint::Json);
        assert_eq!(detect_format(bare_array), FormatHint::Json);
    }

    /// Several object lines separated by newlines are sniffed as NDJSON.
    #[test]
    fn detect_ndjson() {
        let three_records = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
        assert_eq!(detect_format(three_records), FormatHint::Ndjson);
    }

    /// Empty input and plain text fall back to the generic format.
    #[test]
    fn detect_generic_fallback() {
        let inputs: [&[u8]; 2] = [b"", b"just some random text"];
        for input in inputs {
            assert_eq!(detect_format(input), FormatHint::Generic);
        }
    }

    /// Known extensions map to their format hint; unknown ones map to `None`.
    #[test]
    fn extension_detection() {
        let cases = [
            ("test.json", Some(FormatHint::Json)),
            ("data.ndjson", Some(FormatHint::Ndjson)),
            ("file.txt", None),
        ];
        for (path, expected) in cases {
            assert_eq!(detect_from_extension(path), expected);
        }
    }

    /// A flat object with repeated keys gets a non-empty chain, shrinks, and
    /// round-trips.
    #[test]
    fn preprocess_json_key_interning() {
        let input = br#"{"name":"Alice","age":30,"name":"Bob","age":25}"#;

        let (encoded, chain) = preprocess(input, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty(), "should have applied key interning");
        assert!(encoded.len() < input.len(), "preprocessed should be smaller");

        let decoded = reverse_preprocess(&encoded, &chain);
        assert_eq!(decoded, input.to_vec());
    }

    /// Uniform NDJSON goes through the NDJSON columnar transform first and round-trips.
    #[test]
    fn preprocess_ndjson_columnar() {
        let (encoded, chain) = preprocess(TS_VAL_RECORDS, FormatHint::Ndjson, Mode::Balanced);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use columnar transform"
        );

        let decoded = reverse_preprocess(&encoded, &chain);
        assert_eq!(decoded, TS_VAL_RECORDS.to_vec());
    }

    /// Same input in fast mode: still columnar first, round-trips, and the output
    /// splits into two NUL-separated columns.
    #[test]
    fn preprocess_ndjson_columnar_fast_mode() {
        let (encoded, chain) = preprocess(TS_VAL_RECORDS, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(chain.records[0].id, transform::TRANSFORM_NDJSON_COLUMNAR);

        let decoded = reverse_preprocess(&encoded, &chain);
        assert_eq!(decoded, TS_VAL_RECORDS.to_vec());

        let column_count = encoded.split(|&byte| byte == 0x00).count();
        assert_eq!(column_count, 2, "should have 2 columns");
    }

    /// A JSON document holding a five-element object array uses the JSON-array columnar
    /// transform and round-trips.
    #[test]
    fn preprocess_json_array_columnar() {
        let input = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}, {"id": 4, "type": "d"}, {"id": 5, "type": "e"}], "meta": {"count": 5}}"#;

        let (encoded, chain) = preprocess(input, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "JSON with array should use array columnar transform"
        );

        let decoded = reverse_preprocess(&encoded, &chain);
        assert_eq!(decoded, input.to_vec());
    }

    /// Arrays of only three elements must not start with the array columnar transform;
    /// whatever is applied instead still round-trips.
    #[test]
    fn preprocess_json_array_too_few_falls_through() {
        let input = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}], "meta": {"count": 3}, "data2": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}]}"#;

        let (encoded, chain) = preprocess(input, FormatHint::Json, Mode::Balanced);
        if !chain.is_empty() {
            assert_ne!(
                chain.records[0].id,
                transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
                "3 elements should NOT trigger array columnar"
            );
        }

        let decoded = reverse_preprocess(&encoded, &chain);
        assert_eq!(decoded, input.to_vec());
    }

    /// Generic input passes through untouched with an empty chain.
    #[test]
    fn preprocess_non_json_passthrough() {
        let input = b"just some plain text with no JSON keys";

        let (encoded, chain) = preprocess(input, FormatHint::Generic, Mode::Fast);
        assert!(chain.is_empty());
        assert_eq!(encoded, input.to_vec());
    }

    /// Ten array elements with a nested `meta` object: array columnar first, nested
    /// flatten somewhere in the chain, byte-exact round trip.
    #[test]
    fn test_json_array_nested_flatten_roundtrip() {
        let elements: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    r#"{{"id": {}, "name": "item_{}", "meta": {{"score": {}, "active": {}, "tag": "t{}"}}}}"#,
                    i,
                    i,
                    i * 10,
                    i % 2 == 0,
                    i
                )
            })
            .collect();
        let document = format!(r#"{{"data": [{}], "total": 10}}"#, elements.join(", "));
        let input = document.as_bytes();

        let (encoded, chain) = preprocess(input, FormatHint::Json, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "should apply json_array columnar first"
        );

        let flatten_recorded = chain
            .records
            .iter()
            .any(|record| record.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            flatten_recorded,
            "should apply nested flatten for objects with nested fields"
        );

        let decoded = reverse_preprocess(&encoded, &chain);
        // Compare as text first so a mismatch prints a readable diff.
        assert_eq!(
            String::from_utf8_lossy(&decoded),
            String::from_utf8_lossy(input),
        );
        assert_eq!(decoded, input.to_vec());
    }

    /// Fifty elements with a nested `user` object: nested flatten activates,
    /// round-trips, and yields more than two NUL-separated columns.
    #[test]
    fn test_json_array_nested_flatten_improves_ratio() {
        let elements: Vec<String> = (0..50)
            .map(|i| {
                format!(
                    r#"{{"id": {}, "user": {{"name": "user_{}", "role": "admin", "level": {}, "verified": true, "email": "user_{}@test.com"}}}}"#,
                    i,
                    i,
                    i % 5,
                    i
                )
            })
            .collect();
        let document = format!(r#"{{"items": [{}]}}"#, elements.join(", "));
        let input = document.as_bytes();

        let (preprocessed_with, chain_with) = preprocess(input, FormatHint::Json, Mode::Fast);
        let flatten_recorded = chain_with
            .records
            .iter()
            .any(|record| record.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(flatten_recorded, "nested flatten should activate");

        let decoded = reverse_preprocess(&preprocessed_with, &chain_with);
        assert_eq!(decoded, input.to_vec());

        let num_cols_with = preprocessed_with.split(|&byte| byte == 0x00).count();
        assert!(
            num_cols_with > 2,
            "nested flatten should produce more columns: got {}",
            num_cols_with
        );
    }

    /// NDJSON with nested objects uses the NDJSON columnar transform and never records
    /// a standalone nested-flatten step.
    #[test]
    fn test_ndjson_unaffected() {
        let lines: String = (0..10)
            .map(|i| {
                let mut line = format!(
                    r#"{{"id":{},"user":{{"name":"u{}","level":{}}}}}"#,
                    i,
                    i,
                    i % 3
                );
                line.push('\n');
                line
            })
            .collect();
        let input = lines.as_bytes();

        let (encoded, chain) = preprocess(input, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use its own columnar transform"
        );

        let flatten_recorded = chain
            .records
            .iter()
            .any(|record| record.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            !flatten_recorded,
            "NDJSON path should NOT use standalone nested flatten (it handles nesting internally)"
        );

        let decoded = reverse_preprocess(&encoded, &chain);
        assert_eq!(decoded, input.to_vec());
    }

    /// Integer columns cycling through boundary values must survive typed encoding in
    /// fast mode byte-for-byte.
    #[test]
    fn test_ndjson_large_delta_integer_roundtrip() {
        let edges: &[i64] = &[
            0,
            -1,
            1,
            -2147483648,
            2147483647,
            -9007199254740991,
            9007199254740991,
        ];
        let lines: String = edges
            .iter()
            .cycle()
            .take(203)
            .enumerate()
            .map(|(idx, val)| format!("{{\"val\":{},\"idx\":{}}}\n", val, idx))
            .collect();
        let input = lines.as_bytes();

        let (encoded, chain) = preprocess(input, FormatHint::Ndjson, Mode::Fast);

        let typed_encoding_recorded = chain
            .records
            .iter()
            .any(|record| record.id == transform::TRANSFORM_TYPED_ENCODING);
        assert!(
            typed_encoding_recorded,
            "typed encoding should be applied in Fast mode"
        );

        let decoded = reverse_preprocess(&encoded, &chain);
        assert_eq!(decoded, input.to_vec(), "byte-exact roundtrip failed");
    }

    /// 250 array elements whose nested objects vary in which sub-keys they carry (and
    /// in what order) must round-trip in both fast and balanced modes.
    #[test]
    fn test_nested_flatten_varying_subkeys_roundtrip() {
        let objects: Vec<String> = (0..250i32)
            .map(|i| {
                let license = if i >= 6 { r#","license":"MIT""# } else { "" };
                let links = match i % 5 {
                    0 => format!(
                        r#"{{"homepage":"h{i}","repository":"r{i}","bugs":"b{i}","npm":"n{i}"}}"#
                    ),
                    1 => format!(r#"{{"homepage":"h{i}","npm":"n{i}","repository":"r{i}"}}"#),
                    2 => format!(r#"{{"npm":"n{i}"}}"#),
                    3 => format!(r#"{{"bugs":"b{i}","homepage":"h{i}","npm":"n{i}"}}"#),
                    _ => format!(r#"{{"npm":"n{i}","repository":"r{i}"}}"#),
                };
                let publisher = if i % 3 == 0 {
                    format!(r#"{{"email":"u{i}@t.com","username":"u{i}","actor":"a{i}"}}"#)
                } else {
                    format!(r#"{{"email":"u{i}@t.com","username":"u{i}"}}"#)
                };
                format!(
                    r#"{{"dl":{{"m":{},"w":{}}},"dep":"{}","sc":{},"pkg":{{"name":"p{i}","kw":["j","t"],"ver":"{i}.0","pub":{publisher},"mnt":[{{"u":"u{i}"}}]{license},"links":{links}}},"score":{{"f":0.5,"d":{{"q":0.8}}}},"flags":{{"x":0}}}}"#,
                    1000 * (i + 1),
                    250 * (i + 1),
                    i * 5,
                    1697.0894 + i as f64 * 0.1,
                )
            })
            .collect();
        let document = format!(r#"{{"objects":[{}],"total":250}}"#, objects.join(","));
        let input = document.as_bytes();

        for mode in [Mode::Fast, Mode::Balanced] {
            let (encoded, chain) = preprocess(input, FormatHint::Json, mode);
            assert!(!chain.is_empty(), "should apply transforms in {mode} mode");

            let decoded = reverse_preprocess(&encoded, &chain);
            assert_eq!(decoded.len(), input.len(), "length mismatch in {mode} mode",);
            assert_eq!(decoded, input.to_vec(), "roundtrip failed in {mode} mode");
        }
    }
}