1pub mod errors;
2pub mod stats;
3
4use crate::io_helpers::buf_reader::get_bufreader;
5use crate::json::paths::ValuePaths;
6use crate::json::{Value, ValueType};
7use crate::{io_helpers, Cli, Settings};
8
9use self::errors::collection::{
10 Errors, ErrorsPar, IndexedNDJSONError, IntoEnumeratedErrFiltered, IntoErrFiltered,
11 NDJSONProcessingErrors,
12};
13use self::errors::NDJSONError;
14pub use self::stats::{FileStats, Stats};
15
16use dashmap::DashMap;
17use indicatif::{ProgressBar, ProgressStyle};
18use rayon::iter::ParallelBridge;
19use rayon::prelude::ParallelIterator;
20
21use std::error::Error;
22use std::fs::File;
23use std::io::{self, prelude::*};
24use std::iter::Zip;
25use std::ops::RangeFrom;
26use std::path::PathBuf;
27use std::sync::atomic::{AtomicUsize, Ordering};
28use std::sync::mpsc::Receiver;
29
/// A raw, not-yet-parsed line paired with its 1-based position in the input.
type IJSONCandidate = (usize, String);
/// A parsed JSON value paired with the string identifier of its origin.
type IdJSON = (String, Value);
/// Boxed, type-erased sequential iterator over [`IdJSON`] items.
type IdJSONIter<'a> = Box<dyn Iterator<Item = IdJSON> + 'a>;
/// Error collection used by the sequential processing functions.
type NDJSONErrors = Errors<IndexedNDJSONError>;
/// Error collection used by the parallel (`_par`) processing functions.
type NDJSONErrorsPar = ErrorsPar<IndexedNDJSONError>;
36
/// Extension trait that numbers the items of any iterator, starting at one.
trait Indexed: Iterator {
    /// Pairs every item with a running counter: the first item gets `1`,
    /// the second `2`, and so on.
    fn indexed(self) -> Zip<RangeFrom<usize>, Self>
    where
        Self: Sized,
    {
        let counter = RangeFrom { start: 1usize };
        counter.zip(self)
    }
}

/// Blanket implementation: every iterator gets `indexed`.
impl<I: Iterator> Indexed for I {}
47
48pub fn parse_ndjson_receiver<'a>(
54 _args: &Cli,
55 receiver: Receiver<String>,
56 errors: &NDJSONErrors,
57) -> Result<IdJSONIter<'a>, Box<dyn Error>> {
58 let json_iter = receiver
59 .into_iter()
60 .indexed()
61 .map(|(i, json_candidate)| {
62 (
63 i.to_string(),
64 serde_json::from_str::<Value>(&json_candidate),
65 )
66 })
67 .to_err_filtered(errors.new_ref());
68
69 Ok(Box::new(json_iter))
70}
71
72pub fn parse_ndjson_receiver_par<'a>(
78 args: &Cli,
79 receiver: Receiver<String>,
80 errors: &'a NDJSONErrorsPar,
81) -> impl ParallelIterator<Item = IdJSON> + 'a {
82 let receiver = receiver.into_iter().indexed();
83 parse_ndjson_iter_par(args, receiver, errors)
84}
85
86pub fn parse_ndjson_bufreader_par<'a>(
94 args: &Cli,
95 file_path: &PathBuf,
96 errors: &'a NDJSONErrorsPar,
97) -> Result<impl ParallelIterator<Item = IdJSON> + 'a, NDJSONError> {
98 let reader = get_bufreader(args, file_path)?;
99
100 let iter = reader.lines().enumerate();
101 let iter = iter.filter_map(|(i, line)| {
102 let i = i + 1; let io_errors = errors.new_ref();
104 match line {
105 Err(e) => {
106 io_errors.push(IndexedNDJSONError {
107 location: i.to_string(),
108 error: NDJSONError::IOError(e),
109 });
110 None
111 }
112 Ok(json) => Some((i, json)),
113 }
114 });
115
116 Ok(parse_ndjson_iter_par(args, iter, errors))
117}
118
119pub fn parse_ndjson_iter_par<'a>(
126 args: &Cli,
127 iter: impl Iterator<Item = IJSONCandidate> + Send + 'a,
128 errors: &'a NDJSONErrorsPar,
129) -> impl ParallelIterator<Item = IdJSON> + 'a {
130 let iter = iter.take(args.lines.unwrap_or(usize::MAX));
131
132 let json_iter = iter.par_bridge().map(|(i, json_candidate)| {
133 (
134 i.to_string(),
135 serde_json::from_str::<Value>(&json_candidate),
136 )
137 });
138
139 json_iter.filter_map(|(id, json)| {
140 let json_parse_errors = errors.new_ref();
141 match json {
142 Err(e) => {
143 json_parse_errors.push(IndexedNDJSONError {
144 location: id,
145 error: NDJSONError::JSONParsingError(e),
146 });
147 None
148 }
149 Ok(json) => Some((id, json)),
150 }
151 })
152}
153
154pub fn parse_ndjson_bufreader<'a>(
160 _args: &Cli,
161 reader: impl BufRead + 'a,
162 errors: &NDJSONErrors,
163) -> IdJSONIter<'a> {
164 let json_iter = reader.lines();
165
166 let json_iter = json_iter.to_enumerated_err_filtered(errors.new_ref());
167
168 let json_iter = json_iter.map(|(i, json_candidate)| {
169 (
170 i.to_string(),
171 serde_json::from_str::<Value>(&json_candidate),
172 )
173 });
174 let json_iter = json_iter.to_err_filtered(errors.new_ref());
175
176 Box::new(json_iter)
177}
178
179pub fn parse_ndjson_file<'a>(args: &Cli, file: File, errors: &NDJSONErrors) -> IdJSONIter<'a> {
185 let reader = io::BufReader::new(file);
186 parse_ndjson_bufreader(args, reader, errors)
187}
188
189pub fn parse_ndjson_file_path<'a>(
195 args: &Cli,
196 file_path: &PathBuf,
197 errors: &NDJSONErrors,
198) -> Result<IdJSONIter<'a>, NDJSONError> {
199 let reader = get_bufreader(args, file_path)?;
200 Ok(parse_ndjson_bufreader(args, reader, errors))
201}
202
203pub fn expand_jsonpath_query<'a>(
207 settings: &'a Settings,
208 json_iter: impl Iterator<Item = IdJSON> + 'a,
209 errors: &NDJSONErrors,
210) -> IdJSONIter<'a> {
211 let missing = errors.new_ref();
212 let json_iter_out: IdJSONIter<'a>;
213 if let Some(ref selector) = settings.jsonpath_selector {
214 let path = settings.args.jsonpath.to_owned();
215 let path = path.expect("must exist for jsonpath_selector to exist");
216 let expanded = json_iter.flat_map(move |(ref id, ref json)| {
217 let selected = selector.query(json);
218 if selected.is_empty() {
219 missing.push(IndexedNDJSONError::new(
220 id.to_owned(),
221 NDJSONError::EmptyQuery,
222 ))
223 }
224 selected
225 .into_iter()
226 .enumerate()
227 .map(|(i, json)| (format!("{id}:{path}[{i}]"), json.to_owned()))
228 .collect::<Vec<_>>()
229 });
230 json_iter_out = Box::new(expanded);
231 } else {
232 json_iter_out = Box::new(json_iter);
233 }
234 json_iter_out
235}
236
237pub fn expand_jsonpath_query_par<'a>(
241 settings: &'a Settings,
242 json_iter: impl ParallelIterator<Item = IdJSON> + 'a,
243 errors: &NDJSONErrorsPar,
244) -> impl ParallelIterator<Item = IdJSON> + 'a {
245 let missing = errors.new_ref();
246
247 json_iter.flat_map(move |(id, json)| {
248 if let Some(ref selector) = settings.jsonpath_selector {
249 let path = settings.args.jsonpath.to_owned();
250 let path = path.expect("must exist for jsonpath_selector to exist");
251
252 let selected = selector.query(&json);
253 if selected.is_empty() {
254 missing.push(IndexedNDJSONError::new(
255 id.to_owned(),
256 NDJSONError::EmptyQuery,
257 ))
258 }
259 selected
260 .into_iter()
261 .enumerate()
262 .map(|(i, json)| (format!("{id}:{path}[{i}]"), json.to_owned()))
263 .collect::<Vec<_>>()
264 } else {
265 vec![(id, json)]
266 }
267 })
268}
269
270pub fn apply_settings<'a>(
274 settings: &'a Settings,
275 json_iter: impl Iterator<Item = IdJSON> + 'a,
276 errors: &NDJSONErrors,
277) -> IdJSONIter<'a> {
278 let args = &settings.args;
279
280 let json_iter = limit(args, json_iter);
281 expand_jsonpath_query(settings, json_iter, errors)
282}
283
284pub fn apply_settings_par<'a>(
288 settings: &'a Settings,
289 json_iter: impl ParallelIterator<Item = IdJSON> + 'a,
290 errors: &NDJSONErrorsPar,
291) -> impl ParallelIterator<Item = IdJSON> + 'a {
292 expand_jsonpath_query_par(settings, json_iter, errors)
293}
294
295pub fn process_json_iterable(
300 settings: &Settings,
301 json_iter: impl Iterator<Item = IdJSON>,
302 errors: &NDJSONErrors,
303) -> Stats {
304 let mut fs = Stats::new();
305 let args = &settings.args;
306
307 let json_iter = apply_settings(settings, json_iter, errors);
308
309 let spinner = ProgressBar::new_spinner().with_style(
310 ProgressStyle::with_template("{spinner} {elapsed_precise} Lines: {pos:>10}\t{per_sec}\n")
311 .unwrap(),
312 );
313
314 for (_id, json) in json_iter {
315 spinner.inc(1);
316 fs.line_count += 1;
317
318 for value_path in json.value_paths(args.explode_arrays, args.inspect_arrays) {
319 let path = value_path.jsonpath();
320 let counter = fs.keys_count.entry(path.to_owned()).or_insert(0);
321 *counter += 1;
322
323 let type_ = value_path.value.value_type();
324 let path_type = format!("{}::{}", path, type_);
325 let counter = fs.keys_types_count.entry(path_type).or_insert(0);
326 *counter += 1;
327 }
328 }
329 spinner.finish();
330
331 for indexed_error in errors.container.borrow().as_slice() {
332 let IndexedNDJSONError { location, error } = indexed_error;
333 let location = location.to_owned();
334 match error {
335 NDJSONError::JSONParsingError(_) => fs.bad_lines.push(location),
337 NDJSONError::EmptyQuery => fs.empty_lines.push(location),
338 NDJSONError::IOError(_) => fs.bad_lines.push(location),
339 }
340 }
341 fs
342}
343
344pub fn process_json_iterable_par<'a>(
349 settings: &Settings,
350 json_iter: impl ParallelIterator<Item = IdJSON> + 'a,
351 errors: &'a NDJSONErrorsPar,
352) -> Stats {
353 let mut fs = Stats::new();
354 let args = &settings.args;
355
356 let keys_count: DashMap<String, usize> = DashMap::new();
357 let keys_types_count: DashMap<String, usize> = DashMap::new();
358 let line_count = AtomicUsize::new(0);
359
360 let json_iter = apply_settings_par(settings, json_iter, errors);
361
362 let spinner = ProgressBar::new_spinner().with_style(
363 ProgressStyle::with_template("{spinner} {elapsed_precise} Lines: {pos:>10}\t{per_sec}\n")
364 .unwrap(),
365 );
366
367 json_iter.for_each(|(_id, json)| {
368 line_count.fetch_add(1, Ordering::Release);
369
370 for value_path in json.value_paths(args.explode_arrays, args.inspect_arrays) {
371 let path = value_path.jsonpath();
372 let mut counter = keys_count.entry(path.to_owned()).or_insert(0);
373 *counter.value_mut() += 1;
374
375 let type_ = value_path.value.value_type();
376 let path_type = format!("{}::{}", path, type_);
377 let mut counter = keys_types_count.entry(path_type).or_insert(0);
378 *counter.value_mut() += 1;
379 }
380 spinner.inc(1);
381 });
382
383 spinner.finish();
384
385 for indexed_error in errors.container.lock().unwrap().as_slice() {
386 let IndexedNDJSONError { location, error } = indexed_error;
387 let location = location.to_owned();
388 match error {
389 NDJSONError::JSONParsingError(_) => fs.bad_lines.push(location),
390 NDJSONError::EmptyQuery => fs.empty_lines.push(location),
391 NDJSONError::IOError(_) => fs.bad_lines.push(location),
392 }
393 }
394
395 fs.keys_count = keys_count
396 .into_read_only()
397 .iter()
398 .map(|(k, v)| (k.to_owned(), v.to_owned()))
399 .collect();
400 fs.line_count = line_count.load(Ordering::Acquire);
401 fs.keys_types_count = keys_types_count
402 .into_read_only()
403 .iter()
404 .map(|(k, v)| (k.to_owned(), v.to_owned()))
405 .collect();
406 fs
407}
408
409pub fn limit<'a, I, T>(args: &Cli, iter: I) -> Box<dyn Iterator<Item = T> + 'a>
413where
414 I: Iterator<Item = T> + 'a,
415{
416 if let Some(n) = args.lines {
417 Box::new(iter.take(n))
418 } else {
419 Box::new(iter)
420 }
421}
422
423#[deprecated(note = "Superseded by `apply_settings`")]
429pub fn parse_iter<E, I>(args: &Cli, iter: I) -> impl Iterator<Item = Result<String, E>>
430where
431 I: Iterator<Item = Result<String, E>>,
432{
433 if let Some(n) = args.lines {
434 iter.take(n)
435 } else {
436 iter.take(usize::MAX)
437 }
438}
439
/// Outcome of analysing one input: the aggregated statistics together with
/// the error collection that was filled while producing them.
pub struct StatsResult {
    /// Aggregated counts for the input.
    pub stats: Stats,
    /// The error collection used during processing, type-erased so that the
    /// sequential and the parallel collection can both be stored here.
    pub errors: Box<dyn NDJSONProcessingErrors>,
}
444
/// Implemented by input sources that can be analysed into a [`StatsResult`].
pub trait JSONStats {
    /// Consumes the source and computes its statistics according to `settings`.
    fn json_stats(self, settings: &Settings) -> Result<StatsResult, NDJSONError>;
}
448
449impl JSONStats for io::Stdin {
451 fn json_stats(self, settings: &Settings) -> Result<StatsResult, NDJSONError> {
452 let stats;
453 let errors: Box<dyn NDJSONProcessingErrors>;
454 if settings.args.parallel {
455 let stdin = io_helpers::stdin::spawn_stdin_channel(self, 1_000_000);
456 let _errors = ErrorsPar::default();
457 let json_iter = parse_ndjson_receiver_par(&settings.args, stdin, &_errors);
458 stats = process_json_iterable_par(settings, json_iter, &_errors);
459 errors = Box::new(_errors);
460 } else {
461 let stdin = self.lock();
462 let _errors = Errors::default();
463 let json_iter = parse_ndjson_bufreader(&settings.args, stdin, &_errors);
464 stats = process_json_iterable(settings, json_iter, &_errors);
465 errors = Box::new(_errors);
466 }
467 Ok(StatsResult { stats, errors })
468 }
469}
470
471impl JSONStats for &PathBuf {
472 fn json_stats(self, settings: &Settings) -> Result<StatsResult, NDJSONError> {
473 let stats;
474 let errors: Box<dyn NDJSONProcessingErrors>;
475 if settings.args.parallel {
476 let _errors = ErrorsPar::default();
477 let json_iter = parse_ndjson_bufreader_par(&settings.args, self, &_errors)?;
478 stats = process_json_iterable_par(settings, json_iter, &_errors);
479 errors = Box::new(_errors);
480 } else {
481 let _errors = Errors::default();
482 let json_iter = parse_ndjson_file_path(&settings.args, self, &_errors)?;
483 stats = process_json_iterable(settings, json_iter, &_errors);
484 errors = Box::new(_errors);
485 }
486 Ok(StatsResult { stats, errors })
487 }
488}
489
490#[cfg(test)]
491mod tests {
492 use crate::json::IndexMap;
493 use serde_json::json;
494
495 use super::*;
496 use std::fs::File;
497 use std::io::{Seek, SeekFrom, Write};
498
499 #[test]
502 fn line_read() {
503 let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
504 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
505 writeln!(tmpfile, r#"{{"key2": 123}}"#).unwrap();
506 tmpfile.seek(SeekFrom::Start(0)).unwrap();
507 let path = tmpfile.path().to_path_buf();
508
509 let args = Cli::default();
510 let buf_reader: Box<dyn BufRead> = get_bufreader(&args, &path).unwrap();
511 let mut indexed = buf_reader.lines().indexed();
512
513 let (i, s) = indexed.next().unwrap();
514 assert_eq!(1, i);
515 assert_eq!(r#"{"key1": 123}"#.to_string(), s.unwrap());
516 let (i, s) = indexed.next().unwrap();
517 assert_eq!(2, i);
518 assert_eq!(r#"{"key2": 123}"#.to_string(), s.unwrap());
519
520 assert!(indexed.next().is_none());
521 }
522
523 #[test]
524 fn simple_json_stats() {
525 let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
526 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
527 writeln!(tmpfile, r#"{{"key2": 123}}"#).unwrap();
528 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
529 tmpfile.seek(SeekFrom::Start(0)).unwrap();
530 let path = tmpfile.path().to_path_buf();
531
532 let expected = StatsResult {
533 stats: Stats {
534 keys_count: IndexMap::from([("$.key1".to_string(), 2), ("$.key2".to_string(), 1)]),
535 line_count: 3,
536 bad_lines: vec![],
537 keys_types_count: IndexMap::from([
538 ("$.key1::Number".to_string(), 2),
539 ("$.key2::Number".to_string(), 1),
540 ]),
541 empty_lines: vec![],
542 },
543 errors: Box::new(Errors::<NDJSONError>::default()),
544 };
545
546 let args = Cli::default();
547 let settings = Settings::init(args).unwrap();
548
549 let actual = path.json_stats(&settings).unwrap();
550 assert_eq!(expected.stats, actual.stats);
551 }
552
553 #[test]
554 fn simple_json_stats_par() {
555 let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
556 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
557 writeln!(tmpfile, r#"{{"key2": 123}}"#).unwrap();
558 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
559 tmpfile.seek(SeekFrom::Start(0)).unwrap();
560 let path = tmpfile.path().to_path_buf();
561
562 let expected = StatsResult {
563 stats: Stats {
564 keys_count: IndexMap::from([("$.key1".to_string(), 2), ("$.key2".to_string(), 1)]),
565 line_count: 3,
566 bad_lines: vec![],
567 keys_types_count: IndexMap::from([
568 ("$.key1::Number".to_string(), 2),
569 ("$.key2::Number".to_string(), 1),
570 ]),
571 empty_lines: vec![],
572 },
573 errors: Box::new(Errors::<NDJSONErrorsPar>::default()),
574 };
575
576 let mut args = Cli::default();
577 args.parallel = true;
578 let settings = Settings::init(args).unwrap();
579
580 let actual = path.json_stats(&settings).unwrap();
581 assert_eq!(expected.stats, actual.stats);
582 }
583
584 #[test]
585 fn simple_ndjson() {
586 let mut tmpfile: File = tempfile::tempfile().unwrap();
587 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
588 writeln!(tmpfile, r#"{{"key2": 123}}"#).unwrap();
589 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
590 tmpfile.seek(SeekFrom::Start(0)).unwrap();
591 let reader = io::BufReader::new(tmpfile);
592
593 let expected: Vec<IdJSON> = vec![
594 (1.to_string(), json!({"key1": 123})),
595 (2.to_string(), json!({"key2": 123})),
596 (3.to_string(), json!({"key1": 123})),
597 ];
598
599 let args = Cli::default();
600 let errors = Errors::default();
601
602 let json_iter = parse_ndjson_bufreader(&args, reader, &errors);
603 assert_eq!(expected, json_iter.collect::<Vec<IdJSON>>());
604 assert!(errors.container.borrow().is_empty())
605 }
606
607 #[test]
608 fn bad_ndjson_file() {
609 let mut tmpfile: File = tempfile::tempfile().unwrap();
610 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
611 writeln!(tmpfile, r#"not valid json"#).unwrap();
612 writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
613 tmpfile.seek(SeekFrom::Start(0)).unwrap();
614 let reader = io::BufReader::new(tmpfile);
615
616 let expected: Vec<IdJSON> = vec![
617 (1.to_string(), json!({"key1": 123})),
618 (3.to_string(), json!({"key1": 123})),
619 ];
620
621 let args = Cli::default();
622 let errors = Errors::default();
623
624 let json_iter = parse_ndjson_bufreader(&args, reader, &errors);
625 assert_eq!(expected, json_iter.collect::<Vec<IdJSON>>());
626 assert!(errors.container.borrow().len() == 1)
627 }
628
629 #[test]
630 fn simple_expand_jsonpath_query() {
631 let json_iter_in: Vec<IdJSON> = vec![
632 (1.to_string(), json!({"key1": [1, 2, 3]})),
633 (2.to_string(), json!({"key2": 123})),
634 (3.to_string(), json!({"key1": [4, 5]})),
635 ];
636 let json_iter_in = json_iter_in.iter().cloned();
637
638 let mut args = Cli::default();
639 args.jsonpath = Some("$.key1[*]".to_string());
640 let settings = Settings::init(args).unwrap();
641 let errors = Errors::default();
642
643 let expected: Vec<IdJSON> = vec![
644 ("1:$.key1[*][0]".to_string(), json!(1)),
645 ("1:$.key1[*][1]".to_string(), json!(2)),
646 ("1:$.key1[*][2]".to_string(), json!(3)),
647 ("3:$.key1[*][0]".to_string(), json!(4)),
648 ("3:$.key1[*][1]".to_string(), json!(5)),
649 ];
650
651 let json_iter = expand_jsonpath_query(&settings, json_iter_in, &errors);
652 assert_eq!(expected, json_iter.collect::<Vec<IdJSON>>());
653 assert!(errors.container.borrow().len() == 1)
654 }
655
656 #[test]
657 fn simple_process_json_iterable() {
658 let json_iter_in: Vec<IdJSON> = vec![
659 (1.to_string(), json!({"key1": 123})),
660 (2.to_string(), json!({"key2": 123})),
661 (3.to_string(), json!({"key1": 123})),
662 ];
663 let json_iter_in = json_iter_in.iter().cloned();
664
665 let args = Cli::default();
666 let settings = Settings::init(args).unwrap();
667 let errors = Errors::default();
668
669 let expected = Stats {
670 keys_count: IndexMap::from([("$.key1".to_string(), 2), ("$.key2".to_string(), 1)]),
671 line_count: 3,
672 keys_types_count: IndexMap::from([
673 ("$.key1::Number".to_string(), 2),
674 ("$.key2::Number".to_string(), 1),
675 ]),
676 ..Default::default()
677 };
678
679 let stats = process_json_iterable(&settings, json_iter_in, &errors);
680 assert_eq!(expected, stats);
681 assert!(errors.container.borrow().is_empty())
682 }
683
684 #[test]
685 fn bad_process_json_iterable_path_query() {
686 let json_iter_in: Vec<IdJSON> = vec![
687 (1.to_string(), json!({"key1": 123})),
688 (2.to_string(), json!({"key2": 123})),
689 (3.to_string(), json!({"key1": 123})),
690 ];
691 let json_iter_in = json_iter_in.iter().cloned();
692
693 let mut args = Cli::default();
694 args.jsonpath = Some("$.key1".to_string());
695 let settings = Settings::init(args).unwrap();
696 let errors = Errors::default();
697
698 let expected = Stats {
699 keys_count: IndexMap::from([("$".to_string(), 2)]),
700 line_count: 2,
701 keys_types_count: IndexMap::from([("$::Number".to_string(), 2)]),
702 empty_lines: vec![2.to_string()],
703 ..Default::default()
704 };
705
706 let stats = process_json_iterable(&settings, json_iter_in, &errors);
707 assert_eq!(expected, stats);
708 assert!(errors.container.borrow().len() == 1)
709 }
710
711 #[test]
712 fn simple_process_json_iterable_par() {
713 let iter: Vec<(String, Value)> = vec![
714 (1.to_string(), json!({"key1": 123})),
715 (2.to_string(), json!({"key2": 123})),
716 (3.to_string(), json!({"key1": 123})),
717 ];
718 let iter = iter.into_iter().par_bridge();
719
720 let expected = Stats {
721 keys_count: IndexMap::from([("$.key1".to_string(), 2), ("$.key2".to_string(), 1)]),
722 line_count: 3,
723 keys_types_count: IndexMap::from([
724 ("$.key1::Number".to_string(), 2),
725 ("$.key2::Number".to_string(), 1),
726 ]),
727 ..Default::default()
728 };
729
730 let args = Cli::default();
731 let settings = Settings::init(args).unwrap();
732 let errors = ErrorsPar::default();
733 let stats = process_json_iterable_par(&settings, iter, &errors);
734 assert_eq!(expected, stats);
735 }
736
737 #[test]
738 fn simple_process_json_iterable_par_jsonpath() {
739 let iter: Vec<(String, Value)> = vec![
740 (1.to_string(), json!({"key1": 123})),
741 (2.to_string(), json!({"a": {"key2": 123}})),
742 (3.to_string(), json!({"key1": 123})),
743 ];
744 let iter = iter.into_iter().par_bridge();
745
746 let expected = Stats {
747 keys_count: IndexMap::from([("$.key2".to_string(), 1)]),
748 line_count: 1,
749 keys_types_count: IndexMap::from([("$.key2::Number".to_string(), 1)]),
750 empty_lines: vec![1.to_string(), 3.to_string()],
751 ..Default::default()
752 };
753
754 let mut args = Cli::default();
755 args.jsonpath = Some("$.a".to_string());
756 let settings = Settings::init(args).unwrap();
757 let errors = ErrorsPar::default();
758 let stats = process_json_iterable_par(&settings, iter, &errors);
759 assert_eq!(expected, stats);
760 }
761
762 #[test]
763 fn add_filestats() {
764 let lhs = stats::FileStats {
765 file_path: "file/1.json".to_string(),
766 stats: Stats {
767 keys_count: IndexMap::from([("$.key1".to_string(), 3), ("$.key2".to_string(), 2)]),
768 line_count: 5,
769 keys_types_count: IndexMap::from([
770 ("$.key1::Number".to_string(), 3),
771 ("$.key2::Number".to_string(), 2),
772 ]),
773 bad_lines: vec!["4".to_string()],
774 empty_lines: vec!["5".to_string()],
775 },
776 };
777 let rhs = stats::FileStats {
778 file_path: "file/2.json".to_string(),
779 stats: Stats {
780 keys_count: IndexMap::from([("$.key3".to_string(), 3), ("$.key2".to_string(), 2)]),
781 line_count: 7,
782 keys_types_count: IndexMap::from([
783 ("$.key3::Number".to_string(), 3),
784 ("$.key2::Number".to_string(), 2),
785 ]),
786 bad_lines: vec!["1".to_string()],
787 empty_lines: vec!["2".to_string()],
788 },
789 };
790 let expected = Stats {
791 keys_count: IndexMap::from([
792 ("$.key1".to_string(), 3),
793 ("$.key2".to_string(), 4),
794 ("$.key3".to_string(), 3),
795 ]),
796 line_count: 12,
797 keys_types_count: IndexMap::from([
798 ("$.key1::Number".to_string(), 3),
799 ("$.key2::Number".to_string(), 4),
800 ("$.key3::Number".to_string(), 3),
801 ]),
802 bad_lines: vec!["file/1.json:4".to_string(), "file/2.json:1".to_string()],
803 empty_lines: vec!["file/1.json:5".to_string(), "file/2.json:2".to_string()],
804 };
805
806 let vec_of_file_stats = vec![lhs.clone(), rhs.clone()];
807 let actual_ref = lhs.clone() + &rhs;
808 let actual = lhs + rhs;
809
810 assert_eq!(actual, expected);
811 assert_eq!(actual_ref, expected);
812 assert_eq!(vec_of_file_stats.iter().sum::<Stats>(), expected);
813 }
814}