1pub mod json;
7pub mod json_array;
8pub mod ndjson;
9pub mod schema;
10pub mod transform;
11pub mod typed_encoding;
12pub mod value_dict;
13
14use crate::dcx::{FormatHint, Mode};
15use transform::{
16 TRANSFORM_JSON_ARRAY_COLUMNAR, TRANSFORM_JSON_KEY_INTERN, TRANSFORM_NDJSON_COLUMNAR,
17 TRANSFORM_NESTED_FLATTEN, TRANSFORM_TYPED_ENCODING, TRANSFORM_VALUE_DICT, TransformChain,
18};
19
20pub fn detect_format(data: &[u8]) -> FormatHint {
22 if data.is_empty() {
23 return FormatHint::Generic;
24 }
25
26 let trimmed = trim_leading_whitespace(data);
27
28 if starts_with_byte(trimmed, b'{') || starts_with_byte(trimmed, b'[') {
29 if is_ndjson(data) {
30 return FormatHint::Ndjson;
31 }
32 return FormatHint::Json;
33 }
34
35 FormatHint::Generic
36}
37
38pub fn detect_from_extension(path: &str) -> Option<FormatHint> {
40 let ext = path.rsplit('.').next()?.to_lowercase();
41 match ext.as_str() {
42 "json" => Some(FormatHint::Json),
43 "ndjson" | "jsonl" => Some(FormatHint::Ndjson),
44 _ => None,
45 }
46}
47
48pub fn preprocess(data: &[u8], format: FormatHint, mode: Mode) -> (Vec<u8>, TransformChain) {
56 let mut chain = TransformChain::new();
57 let mut current = data.to_vec();
58
59 let mut columnar_applied = false;
62 let mut ndjson_transform_applied = false;
64
65 if format == FormatHint::Ndjson {
70 if let Some(result) = ndjson::preprocess(¤t) {
71 let is_uniform_columnar = !result.metadata.is_empty() && result.metadata[0] == 1;
72 chain.push(TRANSFORM_NDJSON_COLUMNAR, result.metadata);
73 current = result.data;
74 ndjson_transform_applied = true;
75 columnar_applied = is_uniform_columnar;
76 }
77 }
78
79 let mut json_array_applied = false;
84 if !columnar_applied && !ndjson_transform_applied && format == FormatHint::Json {
85 if let Some(result) = json_array::preprocess(¤t) {
86 let is_uniform = !result.metadata.is_empty() && result.metadata[0] == 1;
87 chain.push(TRANSFORM_JSON_ARRAY_COLUMNAR, result.metadata);
88 current = result.data;
89 json_array_applied = true;
90 columnar_applied = is_uniform;
91 }
92 }
93
94 if columnar_applied && !ndjson_transform_applied {
98 let ja_meta = &chain.records.last().unwrap().metadata;
100 if ja_meta.len() >= 5 {
101 let num_rows = u32::from_le_bytes(ja_meta[1..5].try_into().unwrap()) as usize;
102 if let Some((flat_data, nested_groups)) =
103 ndjson::flatten_nested_columns(¤t, num_rows)
104 {
105 let total_flat_cols = flat_data.split(|&b| b == 0x00).count() as u16;
107
108 let unflattened = ndjson::unflatten_nested_columns(
113 &flat_data,
114 &nested_groups,
115 num_rows,
116 total_flat_cols as usize,
117 );
118 if unflattened == current {
119 let mut nested_meta = Vec::new();
120 nested_meta.extend_from_slice(&(num_rows as u32).to_le_bytes());
121 nested_meta.extend_from_slice(&total_flat_cols.to_le_bytes());
122 nested_meta
123 .extend_from_slice(&ndjson::serialize_nested_info(&nested_groups));
124 chain.push(TRANSFORM_NESTED_FLATTEN, nested_meta);
125 current = flat_data;
126 }
127 }
131 }
132 }
133
134 if columnar_applied && mode == Mode::Fast {
138 if let Some(result) = typed_encoding::preprocess(¤t) {
139 chain.push(TRANSFORM_TYPED_ENCODING, result.metadata);
140 current = result.data;
141 }
142 }
143
144 if columnar_applied {
152 if let Some(result) = value_dict::preprocess(¤t) {
153 chain.push(TRANSFORM_VALUE_DICT, result.metadata);
154 current = result.data;
155 }
156 }
157
158 if columnar_applied || ndjson_transform_applied || json_array_applied {
159 return (current, chain);
160 }
161
162 if matches!(mode, Mode::Balanced | Mode::Max)
164 && matches!(format, FormatHint::Json | FormatHint::Ndjson)
165 && let Some(result) = json::preprocess(¤t)
166 {
167 chain.push(TRANSFORM_JSON_KEY_INTERN, result.metadata);
168 current = result.data;
169 }
170
171 (current, chain)
172}
173
174pub fn reverse_preprocess(data: &[u8], chain: &TransformChain) -> Vec<u8> {
176 let mut current = data.to_vec();
177
178 for record in chain.records.iter().rev() {
180 match record.id {
181 TRANSFORM_JSON_KEY_INTERN => {
182 current = json::reverse(¤t, &record.metadata);
183 }
184 TRANSFORM_NDJSON_COLUMNAR => {
185 current = ndjson::reverse(¤t, &record.metadata);
186 }
187 TRANSFORM_JSON_ARRAY_COLUMNAR => {
188 current = json_array::reverse(¤t, &record.metadata);
189 }
190 TRANSFORM_VALUE_DICT => {
191 current = value_dict::reverse(¤t, &record.metadata);
192 }
193 TRANSFORM_TYPED_ENCODING => {
194 current = typed_encoding::reverse(¤t, &record.metadata);
195 }
196 TRANSFORM_NESTED_FLATTEN => {
197 if record.metadata.len() >= 6 {
199 let num_rows =
200 u32::from_le_bytes(record.metadata[0..4].try_into().unwrap()) as usize;
201 let total_flat_cols =
202 u16::from_le_bytes(record.metadata[4..6].try_into().unwrap()) as usize;
203 if let Some((nested_groups, _)) =
204 ndjson::deserialize_nested_info(&record.metadata[6..])
205 {
206 current = ndjson::unflatten_nested_columns(
207 ¤t,
208 &nested_groups,
209 num_rows,
210 total_flat_cols,
211 );
212 }
213 }
214 }
215 _ => {} }
217 }
218
219 current
220}
221
/// Returns `data` without its leading ASCII whitespace, as classified by
/// [`u8::is_ascii_whitespace`]. Trailing bytes are left untouched, and
/// all-whitespace input yields an empty slice.
fn trim_leading_whitespace(data: &[u8]) -> &[u8] {
    let mut rest = data;
    while let [head, tail @ ..] = rest {
        if !head.is_ascii_whitespace() {
            break;
        }
        rest = tail;
    }
    rest
}
231
/// Reports whether the first byte of `data` equals `byte`; empty input never matches.
fn starts_with_byte(data: &[u8], byte: u8) -> bool {
    matches!(data, [head, ..] if *head == byte)
}
235
/// Heuristically decides whether `data` is newline-delimited JSON (NDJSON).
///
/// The input is split on `\n`, and each line is trimmed of ASCII whitespace
/// on both ends, which also removes the `\r` of CRLF line endings. Blank
/// lines are ignored. A remaining line counts as a JSON record only when it
/// both starts with `{` and ends with `}`. Requiring the closing brace stops
/// a pretty-printed JSON array with one object per line (`{...},`) from being
/// mistaken for NDJSON.
///
/// Returns `true` when there are at least two non-blank lines and strictly
/// more than 80% of them are JSON records.
fn is_ndjson(data: &[u8]) -> bool {
    let mut json_lines: usize = 0;
    let mut total_lines: usize = 0;

    for line in data.split(|&b| b == b'\n') {
        let Some(first) = line.iter().position(|b| !b.is_ascii_whitespace()) else {
            continue; // blank or whitespace-only line
        };
        // A non-whitespace byte exists, so `rposition` always finds one.
        let last = line
            .iter()
            .rposition(|b| !b.is_ascii_whitespace())
            .unwrap_or(first);
        total_lines += 1;
        if line[first] == b'{' && line[last] == b'}' {
            json_lines += 1;
        }
    }

    // Exact integer form of `json_lines / total_lines > 0.8`.
    total_lines >= 2 && json_lines * 5 > total_lines * 4
}
253
#[cfg(test)]
mod tests {
    use super::*;

    // Content sniffing: leading whitespace is skipped, and both `{` and `[`
    // openers classify single-document input as JSON.
    #[test]
    fn detect_json() {
        assert_eq!(detect_format(b" {\"key\": \"value\"}"), FormatHint::Json);
        assert_eq!(detect_format(b"[1, 2, 3]"), FormatHint::Json);
    }

    // Three object-per-line records satisfy the NDJSON heuristic in `is_ndjson`.
    #[test]
    fn detect_ndjson() {
        let data = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
        assert_eq!(detect_format(data), FormatHint::Ndjson);
    }

    // Empty input and input not opening with `{`/`[` fall back to Generic.
    #[test]
    fn detect_generic_fallback() {
        assert_eq!(detect_format(b""), FormatHint::Generic);
        assert_eq!(detect_format(b"just some random text"), FormatHint::Generic);
    }

    #[test]
    fn extension_detection() {
        assert_eq!(detect_from_extension("test.json"), Some(FormatHint::Json));
        assert_eq!(
            detect_from_extension("data.ndjson"),
            Some(FormatHint::Ndjson)
        );
        assert_eq!(detect_from_extension("file.txt"), None);
    }

    // A single JSON object is not array-columnar, so in Balanced mode the
    // key-interning fallback is the transform expected to apply. Repeated
    // keys give it something to shrink.
    #[test]
    fn preprocess_json_key_interning() {
        let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty(), "should have applied key interning");
        assert!(
            preprocessed.len() < data.len(),
            "preprocessed should be smaller"
        );

        // Every transform must round-trip byte-exactly.
        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn preprocess_ndjson_columnar() {
        let data = br#"{"ts":"a","val":1}
{"ts":"b","val":2}
{"ts":"c","val":3}
"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Balanced);
        assert!(!chain.is_empty());
        // The NDJSON columnar transform is always the first step on this path.
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use columnar transform"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    // Same fixture as above, in Fast mode.
    #[test]
    fn preprocess_ndjson_columnar_fast_mode() {
        let data = br#"{"ts":"a","val":1}
{"ts":"b","val":2}
{"ts":"c","val":3}
"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(chain.records[0].id, transform::TRANSFORM_NDJSON_COLUMNAR);

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());

        // The fixture has two keys (`ts`, `val`); the final output is expected
        // to hold two 0x00-separated columns.
        let cols: Vec<&[u8]> = preprocessed.split(|&b| b == 0x00).collect();
        assert_eq!(cols.len(), 2, "should have 2 columns");
    }

    // An embedded array of five uniform objects should trigger the
    // JSON-array columnar transform as the first step.
    #[test]
    fn preprocess_json_array_columnar() {
        let data = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}, {"id": 4, "type": "d"}, {"id": 5, "type": "e"}], "meta": {"count": 5}}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "JSON with array should use array columnar transform"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    // Arrays of only three elements must not trigger array-columnar; the data
    // must still round-trip through whatever fallback (if any) applies.
    #[test]
    fn preprocess_json_array_too_few_falls_through() {
        let data = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}], "meta": {"count": 3}, "data2": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}]}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        // Another transform (e.g. key interning in Balanced mode) may apply;
        // only the array-columnar transform is ruled out.
        if !chain.is_empty() {
            assert_ne!(
                chain.records[0].id,
                transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
                "3 elements should NOT trigger array columnar"
            );
        }

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    // Generic input gets no transforms and is returned unchanged.
    #[test]
    fn preprocess_non_json_passthrough() {
        let data = b"just some plain text with no JSON keys";
        let (preprocessed, chain) = preprocess(data, FormatHint::Generic, Mode::Fast);
        assert!(chain.is_empty());
        assert_eq!(preprocessed, data.to_vec());
    }

    // Builds an array of 10 objects, each with a nested `meta` object, so the
    // nested-flatten step can follow the JSON-array columnar transform.
    #[test]
    fn test_json_array_nested_flatten_roundtrip() {
        let mut json = String::from(r#"{"data": ["#);
        for i in 0..10 {
            if i > 0 {
                json.push_str(", ");
            }
            json.push_str(&format!(
                r#"{{"id": {}, "name": "item_{}", "meta": {{"score": {}, "active": {}, "tag": "t{}"}}}}"#,
                i, i, i * 10, if i % 2 == 0 { "true" } else { "false" }, i
            ));
        }
        json.push_str(r#"], "total": 10}"#);

        let data = json.as_bytes();
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "should apply json_array columnar first"
        );

        let has_nested_flatten = chain
            .records
            .iter()
            .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            has_nested_flatten,
            "should apply nested flatten for objects with nested fields"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        // Compare as text first so a failure prints a readable diff.
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    // Despite the name, no compression ratio is measured here. The test checks
    // that nested flatten activates, round-trips, and yields more than two
    // 0x00-separated segments in the final output.
    #[test]
    fn test_json_array_nested_flatten_improves_ratio() {
        let mut json = String::from(r#"{"items": ["#);
        for i in 0..50 {
            if i > 0 {
                json.push_str(", ");
            }
            json.push_str(&format!(
                r#"{{"id": {}, "user": {{"name": "user_{}", "role": "admin", "level": {}, "verified": true, "email": "user_{}@test.com"}}}}"#,
                i, i, i % 5, i
            ));
        }
        json.push_str(r#"]}"#);

        let data = json.as_bytes();

        let (preprocessed_with, chain_with) = preprocess(data, FormatHint::Json, Mode::Fast);
        assert!(
            chain_with
                .records
                .iter()
                .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN),
            "nested flatten should activate"
        );

        let restored = reverse_preprocess(&preprocessed_with, &chain_with);
        assert_eq!(restored, data.to_vec());

        let num_cols_with = preprocessed_with.split(|&b| b == 0x00).count();
        assert!(
            num_cols_with > 2,
            "nested flatten should produce more columns: got {}",
            num_cols_with
        );
    }

    // NDJSON with nested objects must stay on its own columnar path.
    // `preprocess` only runs the standalone nested-flatten step after the
    // JSON-array transform.
    #[test]
    fn test_ndjson_unaffected() {
        let mut ndjson = String::new();
        for i in 0..10 {
            ndjson.push_str(&format!(
                r#"{{"id":{},"user":{{"name":"u{}","level":{}}}}}"#,
                i,
                i,
                i % 3
            ));
            ndjson.push('\n');
        }

        let data = ndjson.as_bytes();
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use its own columnar transform"
        );

        let has_standalone_nested = chain
            .records
            .iter()
            .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            !has_standalone_nested,
            "NDJSON path should NOT use standalone nested flatten (it handles nesting internally)"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    // Cycles through boundary integers, including i32::MIN/MAX and
    // ±(2^53 - 1), so consecutive values differ by very large deltas. The
    // test checks that typed encoding applies in Fast mode and still
    // round-trips byte-exactly.
    #[test]
    fn test_ndjson_large_delta_integer_roundtrip() {
        let edges: &[i64] = &[
            0, -1, 1, -2147483648, 2147483647, -9007199254740991, 9007199254740991,
        ];
        let mut ndjson = String::new();
        for i in 0..203 {
            ndjson.push_str(&format!(
                "{{\"val\":{},\"idx\":{}}}\n",
                edges[i % 7],
                i
            ));
        }

        let data = ndjson.as_bytes();

        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);

        assert!(
            chain
                .records
                .iter()
                .any(|r| r.id == transform::TRANSFORM_TYPED_ENCODING),
            "typed encoding should be applied in Fast mode"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec(), "byte-exact roundtrip failed");
    }

    // Objects whose nested sub-objects vary between rows:
    // - `links` has 5 different key sets and key orders;
    // - `pub` sometimes has an extra key;
    // - `license` is absent for the first 6 rows.
    // All transforms must still round-trip exactly in both Fast and Balanced mode.
    #[test]
    fn test_nested_flatten_varying_subkeys_roundtrip() {
        let mut json = String::from(r#"{"objects":["#);
        for i in 0..250 {
            if i > 0 {
                json.push(',');
            }
            let license = if i >= 6 {
                r#","license":"MIT""#
            } else {
                ""
            };
            let links = match i % 5 {
                0 => format!(r#"{{"homepage":"h{i}","repository":"r{i}","bugs":"b{i}","npm":"n{i}"}}"#),
                1 => format!(r#"{{"homepage":"h{i}","npm":"n{i}","repository":"r{i}"}}"#),
                2 => format!(r#"{{"npm":"n{i}"}}"#),
                3 => format!(r#"{{"bugs":"b{i}","homepage":"h{i}","npm":"n{i}"}}"#),
                _ => format!(r#"{{"npm":"n{i}","repository":"r{i}"}}"#),
            };
            let publisher = if i % 3 == 0 {
                format!(r#"{{"email":"u{i}@t.com","username":"u{i}","actor":"a{i}"}}"#)
            } else {
                format!(r#"{{"email":"u{i}@t.com","username":"u{i}"}}"#)
            };
            json.push_str(&format!(
                r#"{{"dl":{{"m":{},"w":{}}},"dep":"{}","sc":{},"pkg":{{"name":"p{i}","kw":["j","t"],"ver":"{i}.0","pub":{publisher},"mnt":[{{"u":"u{i}"}}]{license},"links":{links}}},"score":{{"f":0.5,"d":{{"q":0.8}}}},"flags":{{"x":0}}}}"#,
                1000 * (i + 1),
                250 * (i + 1),
                i * 5,
                1697.0894 + i as f64 * 0.1,
            ));
        }
        json.push_str(r#"],"total":250}"#);

        let data = json.as_bytes();

        for mode in [Mode::Fast, Mode::Balanced] {
            let (preprocessed, chain) = preprocess(data, FormatHint::Json, mode);
            assert!(!chain.is_empty(), "should apply transforms in {mode} mode");
            let restored = reverse_preprocess(&preprocessed, &chain);
            // Length first: a cheaper, clearer failure than a full byte diff.
            assert_eq!(
                restored.len(),
                data.len(),
                "length mismatch in {mode} mode",
            );
            assert_eq!(restored, data.to_vec(), "roundtrip failed in {mode} mode");
        }
    }
}