1pub mod json;
7pub mod json_array;
8pub mod ndjson;
9pub mod schema;
10pub mod transform;
11pub mod typed_encoding;
12pub mod value_dict;
13
14use crate::dcx::{FormatHint, Mode};
15use transform::{
16 TRANSFORM_JSON_ARRAY_COLUMNAR, TRANSFORM_JSON_KEY_INTERN, TRANSFORM_NDJSON_COLUMNAR,
17 TRANSFORM_NESTED_FLATTEN, TRANSFORM_TYPED_ENCODING, TRANSFORM_VALUE_DICT, TransformChain,
18};
19
20pub fn detect_format(data: &[u8]) -> FormatHint {
22 if data.is_empty() {
23 return FormatHint::Generic;
24 }
25
26 let trimmed = trim_leading_whitespace(data);
27
28 if starts_with_byte(trimmed, b'{') || starts_with_byte(trimmed, b'[') {
29 if is_ndjson(data) {
30 return FormatHint::Ndjson;
31 }
32 return FormatHint::Json;
33 }
34
35 FormatHint::Generic
36}
37
38pub fn detect_from_extension(path: &str) -> Option<FormatHint> {
40 let ext = path.rsplit('.').next()?.to_lowercase();
41 match ext.as_str() {
42 "json" => Some(FormatHint::Json),
43 "ndjson" | "jsonl" => Some(FormatHint::Ndjson),
44 _ => None,
45 }
46}
47
48pub fn preprocess(data: &[u8], format: FormatHint, mode: Mode) -> (Vec<u8>, TransformChain) {
56 let mut chain = TransformChain::new();
57 let mut current = data.to_vec();
58
59 let mut columnar_applied = false;
62 let mut ndjson_transform_applied = false;
64
65 if format == FormatHint::Ndjson {
70 if let Some(result) = ndjson::preprocess(¤t) {
71 let is_uniform_columnar = !result.metadata.is_empty() && result.metadata[0] == 1;
72 chain.push(TRANSFORM_NDJSON_COLUMNAR, result.metadata);
73 current = result.data;
74 ndjson_transform_applied = true;
75 columnar_applied = is_uniform_columnar;
76 }
77 }
78
79 let mut json_array_applied = false;
84 if !columnar_applied && !ndjson_transform_applied && format == FormatHint::Json {
85 if let Some(result) = json_array::preprocess(¤t) {
86 let is_uniform = !result.metadata.is_empty() && result.metadata[0] == 1;
87 chain.push(TRANSFORM_JSON_ARRAY_COLUMNAR, result.metadata);
88 current = result.data;
89 json_array_applied = true;
90 columnar_applied = is_uniform;
91 }
92 }
93
94 if columnar_applied && !ndjson_transform_applied {
98 let ja_meta = &chain.records.last().unwrap().metadata;
100 if ja_meta.len() >= 5 {
101 let num_rows = u32::from_le_bytes(ja_meta[1..5].try_into().unwrap()) as usize;
102 if let Some((flat_data, nested_groups)) =
103 ndjson::flatten_nested_columns(¤t, num_rows)
104 {
105 let total_flat_cols = flat_data.split(|&b| b == 0x00).count() as u16;
107 let mut nested_meta = Vec::new();
108 nested_meta.extend_from_slice(&(num_rows as u32).to_le_bytes());
109 nested_meta.extend_from_slice(&total_flat_cols.to_le_bytes());
110 nested_meta.extend_from_slice(&ndjson::serialize_nested_info(&nested_groups));
111 chain.push(TRANSFORM_NESTED_FLATTEN, nested_meta);
112 current = flat_data;
113 }
114 }
115 }
116
117 if columnar_applied && mode == Mode::Fast {
121 if let Some(result) = typed_encoding::preprocess(¤t) {
122 chain.push(TRANSFORM_TYPED_ENCODING, result.metadata);
123 current = result.data;
124 }
125 }
126
127 if columnar_applied {
135 if let Some(result) = value_dict::preprocess(¤t) {
136 chain.push(TRANSFORM_VALUE_DICT, result.metadata);
137 current = result.data;
138 }
139 }
140
141 if columnar_applied || ndjson_transform_applied || json_array_applied {
142 return (current, chain);
143 }
144
145 if matches!(mode, Mode::Balanced | Mode::Max)
147 && matches!(format, FormatHint::Json | FormatHint::Ndjson)
148 && let Some(result) = json::preprocess(¤t)
149 {
150 chain.push(TRANSFORM_JSON_KEY_INTERN, result.metadata);
151 current = result.data;
152 }
153
154 (current, chain)
155}
156
157pub fn reverse_preprocess(data: &[u8], chain: &TransformChain) -> Vec<u8> {
159 let mut current = data.to_vec();
160
161 for record in chain.records.iter().rev() {
163 match record.id {
164 TRANSFORM_JSON_KEY_INTERN => {
165 current = json::reverse(¤t, &record.metadata);
166 }
167 TRANSFORM_NDJSON_COLUMNAR => {
168 current = ndjson::reverse(¤t, &record.metadata);
169 }
170 TRANSFORM_JSON_ARRAY_COLUMNAR => {
171 current = json_array::reverse(¤t, &record.metadata);
172 }
173 TRANSFORM_VALUE_DICT => {
174 current = value_dict::reverse(¤t, &record.metadata);
175 }
176 TRANSFORM_TYPED_ENCODING => {
177 current = typed_encoding::reverse(¤t, &record.metadata);
178 }
179 TRANSFORM_NESTED_FLATTEN => {
180 if record.metadata.len() >= 6 {
182 let num_rows =
183 u32::from_le_bytes(record.metadata[0..4].try_into().unwrap()) as usize;
184 let total_flat_cols =
185 u16::from_le_bytes(record.metadata[4..6].try_into().unwrap()) as usize;
186 if let Some((nested_groups, _)) =
187 ndjson::deserialize_nested_info(&record.metadata[6..])
188 {
189 current = ndjson::unflatten_nested_columns(
190 ¤t,
191 &nested_groups,
192 num_rows,
193 total_flat_cols,
194 );
195 }
196 }
197 }
198 _ => {} }
200 }
201
202 current
203}
204
205fn trim_leading_whitespace(data: &[u8]) -> &[u8] {
208 let start = data
209 .iter()
210 .position(|&b| !b.is_ascii_whitespace())
211 .unwrap_or(data.len());
212 &data[start..]
213}
214
215fn starts_with_byte(data: &[u8], byte: u8) -> bool {
216 data.first() == Some(&byte)
217}
218
219fn is_ndjson(data: &[u8]) -> bool {
220 let mut json_lines = 0;
221 let mut total_lines = 0;
222
223 for line in data.split(|&b| b == b'\n') {
224 let trimmed = trim_leading_whitespace(line);
225 if trimmed.is_empty() {
226 continue;
227 }
228 total_lines += 1;
229 if starts_with_byte(trimmed, b'{') {
230 json_lines += 1;
231 }
232 }
233
234 total_lines >= 2 && json_lines as f64 / total_lines as f64 > 0.8
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240
241 #[test]
242 fn detect_json() {
243 assert_eq!(detect_format(b" {\"key\": \"value\"}"), FormatHint::Json);
244 assert_eq!(detect_format(b"[1, 2, 3]"), FormatHint::Json);
245 }
246
247 #[test]
248 fn detect_ndjson() {
249 let data = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
250 assert_eq!(detect_format(data), FormatHint::Ndjson);
251 }
252
253 #[test]
254 fn detect_generic_fallback() {
255 assert_eq!(detect_format(b""), FormatHint::Generic);
256 assert_eq!(detect_format(b"just some random text"), FormatHint::Generic);
257 }
258
259 #[test]
260 fn extension_detection() {
261 assert_eq!(detect_from_extension("test.json"), Some(FormatHint::Json));
262 assert_eq!(
263 detect_from_extension("data.ndjson"),
264 Some(FormatHint::Ndjson)
265 );
266 assert_eq!(detect_from_extension("file.txt"), None);
267 }
268
269 #[test]
270 fn preprocess_json_key_interning() {
271 let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25}"#;
272 let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
273 assert!(!chain.is_empty(), "should have applied key interning");
274 assert!(
275 preprocessed.len() < data.len(),
276 "preprocessed should be smaller"
277 );
278
279 let restored = reverse_preprocess(&preprocessed, &chain);
281 assert_eq!(restored, data.to_vec());
282 }
283
284 #[test]
285 fn preprocess_ndjson_columnar() {
286 let data = br#"{"ts":"a","val":1}
287{"ts":"b","val":2}
288{"ts":"c","val":3}
289"#;
290 let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Balanced);
291 assert!(!chain.is_empty());
292 assert_eq!(
294 chain.records[0].id,
295 transform::TRANSFORM_NDJSON_COLUMNAR,
296 "NDJSON should use columnar transform"
297 );
298
299 let restored = reverse_preprocess(&preprocessed, &chain);
300 assert_eq!(restored, data.to_vec());
301 }
302
303 #[test]
304 fn preprocess_ndjson_columnar_fast_mode() {
305 let data = br#"{"ts":"a","val":1}
307{"ts":"b","val":2}
308{"ts":"c","val":3}
309"#;
310 let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);
311 assert!(!chain.is_empty());
312 assert_eq!(chain.records[0].id, transform::TRANSFORM_NDJSON_COLUMNAR);
313
314 let restored = reverse_preprocess(&preprocessed, &chain);
315 assert_eq!(restored, data.to_vec());
316
317 let cols: Vec<&[u8]> = preprocessed.split(|&b| b == 0x00).collect();
319 assert_eq!(cols.len(), 2, "should have 2 columns");
320 }
321
322 #[test]
323 fn preprocess_json_array_columnar() {
324 let data = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}, {"id": 4, "type": "d"}, {"id": 5, "type": "e"}], "meta": {"count": 5}}"#;
325 let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
326 assert!(!chain.is_empty());
327 assert_eq!(
328 chain.records[0].id,
329 transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
330 "JSON with array should use array columnar transform"
331 );
332
333 let restored = reverse_preprocess(&preprocessed, &chain);
334 assert_eq!(restored, data.to_vec());
335 }
336
337 #[test]
338 fn preprocess_json_array_too_few_falls_through() {
339 let data = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}], "meta": {"count": 3}, "data2": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}]}"#;
341 let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
342 if !chain.is_empty() {
344 assert_ne!(
345 chain.records[0].id,
346 transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
347 "3 elements should NOT trigger array columnar"
348 );
349 }
350
351 let restored = reverse_preprocess(&preprocessed, &chain);
352 assert_eq!(restored, data.to_vec());
353 }
354
355 #[test]
356 fn preprocess_non_json_passthrough() {
357 let data = b"just some plain text with no JSON keys";
358 let (preprocessed, chain) = preprocess(data, FormatHint::Generic, Mode::Fast);
359 assert!(chain.is_empty());
360 assert_eq!(preprocessed, data.to_vec());
361 }
362
363 #[test]
364 fn test_json_array_nested_flatten_roundtrip() {
365 let mut json = String::from(r#"{"data": ["#);
367 for i in 0..10 {
368 if i > 0 {
369 json.push_str(", ");
370 }
371 json.push_str(&format!(
372 r#"{{"id": {}, "name": "item_{}", "meta": {{"score": {}, "active": {}, "tag": "t{}"}}}}"#,
373 i, i, i * 10, if i % 2 == 0 { "true" } else { "false" }, i
374 ));
375 }
376 json.push_str(r#"], "total": 10}"#);
377
378 let data = json.as_bytes();
379 let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Fast);
380 assert!(!chain.is_empty());
381 assert_eq!(
382 chain.records[0].id,
383 transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
384 "should apply json_array columnar first"
385 );
386
387 let has_nested_flatten = chain
389 .records
390 .iter()
391 .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN);
392 assert!(
393 has_nested_flatten,
394 "should apply nested flatten for objects with nested fields"
395 );
396
397 let restored = reverse_preprocess(&preprocessed, &chain);
399 assert_eq!(
400 String::from_utf8_lossy(&restored),
401 String::from_utf8_lossy(data),
402 );
403 assert_eq!(restored, data.to_vec());
404 }
405
406 #[test]
407 fn test_json_array_nested_flatten_improves_ratio() {
408 let mut json = String::from(r#"{"items": ["#);
411 for i in 0..50 {
412 if i > 0 {
413 json.push_str(", ");
414 }
415 json.push_str(&format!(
416 r#"{{"id": {}, "user": {{"name": "user_{}", "role": "admin", "level": {}, "verified": true, "email": "user_{}@test.com"}}}}"#,
417 i, i, i % 5, i
418 ));
419 }
420 json.push_str(r#"]}"#);
421
422 let data = json.as_bytes();
423
424 let (preprocessed_with, chain_with) = preprocess(data, FormatHint::Json, Mode::Fast);
426 assert!(
427 chain_with
428 .records
429 .iter()
430 .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN),
431 "nested flatten should activate"
432 );
433
434 let restored = reverse_preprocess(&preprocessed_with, &chain_with);
436 assert_eq!(restored, data.to_vec());
437
438 let num_cols_with = preprocessed_with.split(|&b| b == 0x00).count();
440 assert!(
443 num_cols_with > 2,
444 "nested flatten should produce more columns: got {}",
445 num_cols_with
446 );
447 }
448
449 #[test]
450 fn test_ndjson_unaffected() {
451 let mut ndjson = String::new();
453 for i in 0..10 {
454 ndjson.push_str(&format!(
455 r#"{{"id":{},"user":{{"name":"u{}","level":{}}}}}"#,
456 i,
457 i,
458 i % 3
459 ));
460 ndjson.push('\n');
461 }
462
463 let data = ndjson.as_bytes();
464 let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);
465 assert!(!chain.is_empty());
466 assert_eq!(
467 chain.records[0].id,
468 transform::TRANSFORM_NDJSON_COLUMNAR,
469 "NDJSON should use its own columnar transform"
470 );
471
472 let has_standalone_nested = chain
474 .records
475 .iter()
476 .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN);
477 assert!(
478 !has_standalone_nested,
479 "NDJSON path should NOT use standalone nested flatten (it handles nesting internally)"
480 );
481
482 let restored = reverse_preprocess(&preprocessed, &chain);
484 assert_eq!(restored, data.to_vec());
485 }
486}