analyse_json/json/
ndjson.rs

1pub mod errors;
2pub mod stats;
3
4use crate::io_helpers::buf_reader::get_bufreader;
5use crate::json::paths::ValuePaths;
6use crate::json::{Value, ValueType};
7use crate::{io_helpers, Cli, Settings};
8
9use self::errors::collection::{
10    Errors, ErrorsPar, IndexedNDJSONError, IntoEnumeratedErrFiltered, IntoErrFiltered,
11    NDJSONProcessingErrors,
12};
13use self::errors::NDJSONError;
14pub use self::stats::{FileStats, Stats};
15
16use dashmap::DashMap;
17use indicatif::{ProgressBar, ProgressStyle};
18use rayon::iter::ParallelBridge;
19use rayon::prelude::ParallelIterator;
20
21use std::error::Error;
22use std::fs::File;
23use std::io::{self, prelude::*};
24use std::iter::Zip;
25use std::ops::RangeFrom;
26use std::path::PathBuf;
27use std::sync::atomic::{AtomicUsize, Ordering};
28use std::sync::mpsc::Receiver;
29
// Reusable types for function signatures
/// A not-yet-parsed record: `(index, raw text that may or may not be valid JSON)`.
type IJSONCandidate = (usize, String);
/// A parsed record: `(identifier, JSON value)`.
type IdJSON = (String, Value);
/// Boxed, single-threaded iterator over [`IdJSON`] records.
type IdJSONIter<'a> = Box<dyn Iterator<Item = IdJSON> + 'a>;
/// Error collection used by the single-threaded functions in this file.
type NDJSONErrors = Errors<IndexedNDJSONError>;
/// Error collection used by the `_par` (rayon based) functions in this file.
type NDJSONErrorsPar = ErrorsPar<IndexedNDJSONError>;
36
/// Extension trait that pairs every item of an iterator with a 1-based position.
trait Indexed: Iterator {
    /// Zips `self` with the open-ended counter `1, 2, 3, …`, yielding `(index, item)` tuples.
    fn indexed(self) -> Zip<RangeFrom<usize>, Self>
    where
        Self: Sized,
    {
        let counter: RangeFrom<usize> = 1..;
        counter.zip(self)
    }
}

/// Blanket implementation so that every iterator gains `indexed`.
impl<I: Iterator> Indexed for I {}
47
48// TODO: add or switch to method on `Receiver<String>`?
49/// Indexes data from the mpsc channel, converts it to serde JSON `Value`s and filters out data that does not
50/// parse as JSON to the `errors` container. Single threaded.
51///
52/// See also: [`parse_ndjson_receiver_par`]
53pub fn parse_ndjson_receiver<'a>(
54    _args: &Cli,
55    receiver: Receiver<String>,
56    errors: &NDJSONErrors,
57) -> Result<IdJSONIter<'a>, Box<dyn Error>> {
58    let json_iter = receiver
59        .into_iter()
60        .indexed()
61        .map(|(i, json_candidate)| {
62            (
63                i.to_string(),
64                serde_json::from_str::<Value>(&json_candidate),
65            )
66        })
67        .to_err_filtered(errors.new_ref());
68
69    Ok(Box::new(json_iter))
70}
71
72// TODO: add or switch to method on `Receiver<String>`?
73/// Indexes data from the mpsc channel, converts it to serde JSON `Value`s and filters out data that does not
74/// parse as JSON to the `errors` container. Multithreaded version of [`parse_ndjson_receiver`].
75///
76/// See also: [`parse_ndjson_receiver`], [`parse_ndjson_bufreader_par`] & [`parse_ndjson_iter_par`]
77pub fn parse_ndjson_receiver_par<'a>(
78    args: &Cli,
79    receiver: Receiver<String>,
80    errors: &'a NDJSONErrorsPar,
81) -> impl ParallelIterator<Item = IdJSON> + 'a {
82    let receiver = receiver.into_iter().indexed();
83    parse_ndjson_iter_par(args, receiver, errors)
84}
85
86// TODO: rename or switch function args?
87// TODO: add or switch to method on `&PathBuf`?
88/// Indexes data from the file_path with a bufreader, converts it to serde JSON `Value`s
89/// and filters out data that does not
90/// parse as JSON to the `errors` container. Multithreaded version of [`parse_ndjson_bufreader`].
91///
92/// See also: [`parse_ndjson_bufreader`], [`parse_ndjson_receiver_par`] & [`parse_ndjson_iter_par`]
93pub fn parse_ndjson_bufreader_par<'a>(
94    args: &Cli,
95    file_path: &PathBuf,
96    errors: &'a NDJSONErrorsPar,
97) -> Result<impl ParallelIterator<Item = IdJSON> + 'a, NDJSONError> {
98    let reader = get_bufreader(args, file_path)?;
99
100    let iter = reader.lines().enumerate();
101    let iter = iter.filter_map(|(i, line)| {
102        let i = i + 1; // count lines from 1
103        let io_errors = errors.new_ref();
104        match line {
105            Err(e) => {
106                io_errors.push(IndexedNDJSONError {
107                    location: i.to_string(),
108                    error: NDJSONError::IOError(e),
109                });
110                None
111            }
112            Ok(json) => Some((i, json)),
113        }
114    });
115
116    Ok(parse_ndjson_iter_par(args, iter, errors))
117}
118
119// https://github.com/rayon-rs/rayon/issues/628
120// https://users.rust-lang.org/t/how-to-wrap-a-non-object-safe-trait-in-an-object-safe-one/33904
121/// Processes indexed data from the Iterator, converts it to serde JSON `Value`s
122/// and filters out data that does not parse as JSON to the `errors` container.
123///
124/// See also: [`parse_ndjson_receiver_par`] & [`parse_ndjson_bufreader_par`]
125pub fn parse_ndjson_iter_par<'a>(
126    args: &Cli,
127    iter: impl Iterator<Item = IJSONCandidate> + Send + 'a,
128    errors: &'a NDJSONErrorsPar,
129) -> impl ParallelIterator<Item = IdJSON> + 'a {
130    let iter = iter.take(args.lines.unwrap_or(usize::MAX));
131
132    let json_iter = iter.par_bridge().map(|(i, json_candidate)| {
133        (
134            i.to_string(),
135            serde_json::from_str::<Value>(&json_candidate),
136        )
137    });
138
139    json_iter.filter_map(|(id, json)| {
140        let json_parse_errors = errors.new_ref();
141        match json {
142            Err(e) => {
143                json_parse_errors.push(IndexedNDJSONError {
144                    location: id,
145                    error: NDJSONError::JSONParsingError(e),
146                });
147                None
148            }
149            Ok(json) => Some((id, json)),
150        }
151    })
152}
153
154/// Indexes data from the bufreader, converts it to serde JSON `Value`s
155/// and filters out data that does not
156/// parse as JSON to the `errors` container. Single threaded version of [`parse_ndjson_bufreader_par`].
157///
158/// See also: [`parse_ndjson_bufreader_par`], [`parse_ndjson_file`], [`parse_ndjson_file_path`] & [`parse_ndjson_receiver`]
159pub fn parse_ndjson_bufreader<'a>(
160    _args: &Cli,
161    reader: impl BufRead + 'a,
162    errors: &NDJSONErrors,
163) -> IdJSONIter<'a> {
164    let json_iter = reader.lines();
165
166    let json_iter = json_iter.to_enumerated_err_filtered(errors.new_ref());
167
168    let json_iter = json_iter.map(|(i, json_candidate)| {
169        (
170            i.to_string(),
171            serde_json::from_str::<Value>(&json_candidate),
172        )
173    });
174    let json_iter = json_iter.to_err_filtered(errors.new_ref());
175
176    Box::new(json_iter)
177}
178
179/// Indexes data from the file, converts it to serde JSON `Value`s
180/// and filters out data that does not
181/// parse as JSON to the `errors` container. Single threaded.
182///
183/// See also: [`parse_ndjson_bufreader`], [`parse_ndjson_file_path`] & [`parse_ndjson_receiver`]
184pub fn parse_ndjson_file<'a>(args: &Cli, file: File, errors: &NDJSONErrors) -> IdJSONIter<'a> {
185    let reader = io::BufReader::new(file);
186    parse_ndjson_bufreader(args, reader, errors)
187}
188
189/// Indexes data from the file_path, converts it to serde JSON `Value`s
190/// and filters out data that does not
191/// parse as JSON to the `errors` container. Single threaded.
192///
193/// See also: [`parse_ndjson_bufreader`], [`parse_ndjson_file`] & [`parse_ndjson_receiver`]
194pub fn parse_ndjson_file_path<'a>(
195    args: &Cli,
196    file_path: &PathBuf,
197    errors: &NDJSONErrors,
198) -> Result<IdJSONIter<'a>, NDJSONError> {
199    let reader = get_bufreader(args, file_path)?;
200    Ok(parse_ndjson_bufreader(args, reader, errors))
201}
202
203/// Handles the jsonpath query expansion of the Iterators values. Single threaded
204///
205/// See also [`expand_jsonpath_query_par`]
206pub fn expand_jsonpath_query<'a>(
207    settings: &'a Settings,
208    json_iter: impl Iterator<Item = IdJSON> + 'a,
209    errors: &NDJSONErrors,
210) -> IdJSONIter<'a> {
211    let missing = errors.new_ref();
212    let json_iter_out: IdJSONIter<'a>;
213    if let Some(ref selector) = settings.jsonpath_selector {
214        let path = settings.args.jsonpath.to_owned();
215        let path = path.expect("must exist for jsonpath_selector to exist");
216        let expanded = json_iter.flat_map(move |(ref id, ref json)| {
217            let selected = selector.query(json);
218            if selected.is_empty() {
219                missing.push(IndexedNDJSONError::new(
220                    id.to_owned(),
221                    NDJSONError::EmptyQuery,
222                ))
223            }
224            selected
225                .into_iter()
226                .enumerate()
227                .map(|(i, json)| (format!("{id}:{path}[{i}]"), json.to_owned()))
228                .collect::<Vec<_>>()
229        });
230        json_iter_out = Box::new(expanded);
231    } else {
232        json_iter_out = Box::new(json_iter);
233    }
234    json_iter_out
235}
236
237/// Handles the jsonpath query expansion of the Iterators values. Multi-threaded.
238///
239/// See also [`expand_jsonpath_query`]
240pub fn expand_jsonpath_query_par<'a>(
241    settings: &'a Settings,
242    json_iter: impl ParallelIterator<Item = IdJSON> + 'a,
243    errors: &NDJSONErrorsPar,
244) -> impl ParallelIterator<Item = IdJSON> + 'a {
245    let missing = errors.new_ref();
246
247    json_iter.flat_map(move |(id, json)| {
248        if let Some(ref selector) = settings.jsonpath_selector {
249            let path = settings.args.jsonpath.to_owned();
250            let path = path.expect("must exist for jsonpath_selector to exist");
251
252            let selected = selector.query(&json);
253            if selected.is_empty() {
254                missing.push(IndexedNDJSONError::new(
255                    id.to_owned(),
256                    NDJSONError::EmptyQuery,
257                ))
258            }
259            selected
260                .into_iter()
261                .enumerate()
262                .map(|(i, json)| (format!("{id}:{path}[{i}]"), json.to_owned()))
263                .collect::<Vec<_>>()
264        } else {
265            vec![(id, json)]
266        }
267    })
268}
269
270/// Apply pre-processing based on settings from CLI args. Single threaded.
271///
272/// See also [`apply_settings_par`]
273pub fn apply_settings<'a>(
274    settings: &'a Settings,
275    json_iter: impl Iterator<Item = IdJSON> + 'a,
276    errors: &NDJSONErrors,
277) -> IdJSONIter<'a> {
278    let args = &settings.args;
279
280    let json_iter = limit(args, json_iter);
281    expand_jsonpath_query(settings, json_iter, errors)
282}
283
284/// Apply pre-processing based on settings from CLI args. Multi-threaded.
285///
286/// See also [`apply_settings`]
287pub fn apply_settings_par<'a>(
288    settings: &'a Settings,
289    json_iter: impl ParallelIterator<Item = IdJSON> + 'a,
290    errors: &NDJSONErrorsPar,
291) -> impl ParallelIterator<Item = IdJSON> + 'a {
292    expand_jsonpath_query_par(settings, json_iter, errors)
293}
294
295/// Main function processing the JSON data, collecting key infomation about the content.
296/// Single threaded.
297///
298/// See also [`process_json_iterable_par`]
299pub fn process_json_iterable(
300    settings: &Settings,
301    json_iter: impl Iterator<Item = IdJSON>,
302    errors: &NDJSONErrors,
303) -> Stats {
304    let mut fs = Stats::new();
305    let args = &settings.args;
306
307    let json_iter = apply_settings(settings, json_iter, errors);
308
309    let spinner = ProgressBar::new_spinner().with_style(
310        ProgressStyle::with_template("{spinner} {elapsed_precise} Lines: {pos:>10}\t{per_sec}\n")
311            .unwrap(),
312    );
313
314    for (_id, json) in json_iter {
315        spinner.inc(1);
316        fs.line_count += 1;
317
318        for value_path in json.value_paths(args.explode_arrays, args.inspect_arrays) {
319            let path = value_path.jsonpath();
320            let counter = fs.keys_count.entry(path.to_owned()).or_insert(0);
321            *counter += 1;
322
323            let type_ = value_path.value.value_type();
324            let path_type = format!("{}::{}", path, type_);
325            let counter = fs.keys_types_count.entry(path_type).or_insert(0);
326            *counter += 1;
327        }
328    }
329    spinner.finish();
330
331    for indexed_error in errors.container.borrow().as_slice() {
332        let IndexedNDJSONError { location, error } = indexed_error;
333        let location = location.to_owned();
334        match error {
335            // TODO: use or syntax here?
336            NDJSONError::JSONParsingError(_) => fs.bad_lines.push(location),
337            NDJSONError::EmptyQuery => fs.empty_lines.push(location),
338            NDJSONError::IOError(_) => fs.bad_lines.push(location),
339        }
340    }
341    fs
342}
343
344/// Main function processing the JSON data, collecting key infomation about the content.
345/// Mulit-threaded version of [`process_json_iterable`].
346///
347/// See also [`process_json_iterable_par`]
348pub fn process_json_iterable_par<'a>(
349    settings: &Settings,
350    json_iter: impl ParallelIterator<Item = IdJSON> + 'a,
351    errors: &'a NDJSONErrorsPar,
352) -> Stats {
353    let mut fs = Stats::new();
354    let args = &settings.args;
355
356    let keys_count: DashMap<String, usize> = DashMap::new();
357    let keys_types_count: DashMap<String, usize> = DashMap::new();
358    let line_count = AtomicUsize::new(0);
359
360    let json_iter = apply_settings_par(settings, json_iter, errors);
361
362    let spinner = ProgressBar::new_spinner().with_style(
363        ProgressStyle::with_template("{spinner} {elapsed_precise} Lines: {pos:>10}\t{per_sec}\n")
364            .unwrap(),
365    );
366
367    json_iter.for_each(|(_id, json)| {
368        line_count.fetch_add(1, Ordering::Release);
369
370        for value_path in json.value_paths(args.explode_arrays, args.inspect_arrays) {
371            let path = value_path.jsonpath();
372            let mut counter = keys_count.entry(path.to_owned()).or_insert(0);
373            *counter.value_mut() += 1;
374
375            let type_ = value_path.value.value_type();
376            let path_type = format!("{}::{}", path, type_);
377            let mut counter = keys_types_count.entry(path_type).or_insert(0);
378            *counter.value_mut() += 1;
379        }
380        spinner.inc(1);
381    });
382
383    spinner.finish();
384
385    for indexed_error in errors.container.lock().unwrap().as_slice() {
386        let IndexedNDJSONError { location, error } = indexed_error;
387        let location = location.to_owned();
388        match error {
389            NDJSONError::JSONParsingError(_) => fs.bad_lines.push(location),
390            NDJSONError::EmptyQuery => fs.empty_lines.push(location),
391            NDJSONError::IOError(_) => fs.bad_lines.push(location),
392        }
393    }
394
395    fs.keys_count = keys_count
396        .into_read_only()
397        .iter()
398        .map(|(k, v)| (k.to_owned(), v.to_owned()))
399        .collect();
400    fs.line_count = line_count.load(Ordering::Acquire);
401    fs.keys_types_count = keys_types_count
402        .into_read_only()
403        .iter()
404        .map(|(k, v)| (k.to_owned(), v.to_owned()))
405        .collect();
406    fs
407}
408
409/// Apply line limiting from the arg to the Iterator
410///
411/// See also [`parse_iter`]
412pub fn limit<'a, I, T>(args: &Cli, iter: I) -> Box<dyn Iterator<Item = T> + 'a>
413where
414    I: Iterator<Item = T> + 'a,
415{
416    if let Some(n) = args.lines {
417        Box::new(iter.take(n))
418    } else {
419        Box::new(iter)
420    }
421}
422
423// TODO: Rename?
424/// Early version of [`apply_settings`], kept as an example of alternative version of
425/// [`limit`] that could be used without the need to `Box` the return value
426///
427/// See also [`limit`]
428#[deprecated(note = "Superseded by `apply_settings`")]
429pub fn parse_iter<E, I>(args: &Cli, iter: I) -> impl Iterator<Item = Result<String, E>>
430where
431    I: Iterator<Item = Result<String, E>>,
432{
433    if let Some(n) = args.lines {
434        iter.take(n)
435    } else {
436        iter.take(usize::MAX)
437    }
438}
439
/// Outcome of a [`JSONStats::json_stats`] run: the gathered statistics together with the
/// error collection that was filled while producing them.
pub struct StatsResult {
    /// Statistics returned by `process_json_iterable` / `process_json_iterable_par`.
    pub stats: Stats,
    /// Boxed error collection (`Errors` or `ErrorsPar`, depending on the code path taken).
    pub errors: Box<dyn NDJSONProcessingErrors>,
}
444
/// Implemented for input sources whose NDJSON content can be summarised into a
/// [`StatsResult`].
pub trait JSONStats {
    /// Consumes the source, processes it according to `settings` and returns the statistics
    /// together with the errors collected along the way.
    fn json_stats(self, settings: &Settings) -> Result<StatsResult, NDJSONError>;
}
448
449// TODO: Add tests
450impl JSONStats for io::Stdin {
451    fn json_stats(self, settings: &Settings) -> Result<StatsResult, NDJSONError> {
452        let stats;
453        let errors: Box<dyn NDJSONProcessingErrors>;
454        if settings.args.parallel {
455            let stdin = io_helpers::stdin::spawn_stdin_channel(self, 1_000_000);
456            let _errors = ErrorsPar::default();
457            let json_iter = parse_ndjson_receiver_par(&settings.args, stdin, &_errors);
458            stats = process_json_iterable_par(settings, json_iter, &_errors);
459            errors = Box::new(_errors);
460        } else {
461            let stdin = self.lock();
462            let _errors = Errors::default();
463            let json_iter = parse_ndjson_bufreader(&settings.args, stdin, &_errors);
464            stats = process_json_iterable(settings, json_iter, &_errors);
465            errors = Box::new(_errors);
466        }
467        Ok(StatsResult { stats, errors })
468    }
469}
470
471impl JSONStats for &PathBuf {
472    fn json_stats(self, settings: &Settings) -> Result<StatsResult, NDJSONError> {
473        let stats;
474        let errors: Box<dyn NDJSONProcessingErrors>;
475        if settings.args.parallel {
476            let _errors = ErrorsPar::default();
477            let json_iter = parse_ndjson_bufreader_par(&settings.args, self, &_errors)?;
478            stats = process_json_iterable_par(settings, json_iter, &_errors);
479            errors = Box::new(_errors);
480        } else {
481            let _errors = Errors::default();
482            let json_iter = parse_ndjson_file_path(&settings.args, self, &_errors)?;
483            stats = process_json_iterable(settings, json_iter, &_errors);
484            errors = Box::new(_errors);
485        }
486        Ok(StatsResult { stats, errors })
487    }
488}
489
490#[cfg(test)]
491mod tests {
492    use crate::json::IndexMap;
493    use serde_json::json;
494
495    use super::*;
496    use std::fs::File;
497    use std::io::{Seek, SeekFrom, Write};
498
499    // TODO: How to test stdin?
500
501    #[test]
502    fn line_read() {
503        let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
504        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
505        writeln!(tmpfile, r#"{{"key2": 123}}"#).unwrap();
506        tmpfile.seek(SeekFrom::Start(0)).unwrap();
507        let path = tmpfile.path().to_path_buf();
508
509        let args = Cli::default();
510        let buf_reader: Box<dyn BufRead> = get_bufreader(&args, &path).unwrap();
511        let mut indexed = buf_reader.lines().indexed();
512
513        let (i, s) = indexed.next().unwrap();
514        assert_eq!(1, i);
515        assert_eq!(r#"{"key1": 123}"#.to_string(), s.unwrap());
516        let (i, s) = indexed.next().unwrap();
517        assert_eq!(2, i);
518        assert_eq!(r#"{"key2": 123}"#.to_string(), s.unwrap());
519
520        assert!(indexed.next().is_none());
521    }
522
523    #[test]
524    fn simple_json_stats() {
525        let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
526        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
527        writeln!(tmpfile, r#"{{"key2": 123}}"#).unwrap();
528        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
529        tmpfile.seek(SeekFrom::Start(0)).unwrap();
530        let path = tmpfile.path().to_path_buf();
531
532        let expected = StatsResult {
533            stats: Stats {
534                keys_count: IndexMap::from([("$.key1".to_string(), 2), ("$.key2".to_string(), 1)]),
535                line_count: 3,
536                bad_lines: vec![],
537                keys_types_count: IndexMap::from([
538                    ("$.key1::Number".to_string(), 2),
539                    ("$.key2::Number".to_string(), 1),
540                ]),
541                empty_lines: vec![],
542            },
543            errors: Box::new(Errors::<NDJSONError>::default()),
544        };
545
546        let args = Cli::default();
547        let settings = Settings::init(args).unwrap();
548
549        let actual = path.json_stats(&settings).unwrap();
550        assert_eq!(expected.stats, actual.stats);
551    }
552
553    #[test]
554    fn simple_json_stats_par() {
555        let mut tmpfile = tempfile::NamedTempFile::new().unwrap();
556        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
557        writeln!(tmpfile, r#"{{"key2": 123}}"#).unwrap();
558        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
559        tmpfile.seek(SeekFrom::Start(0)).unwrap();
560        let path = tmpfile.path().to_path_buf();
561
562        let expected = StatsResult {
563            stats: Stats {
564                keys_count: IndexMap::from([("$.key1".to_string(), 2), ("$.key2".to_string(), 1)]),
565                line_count: 3,
566                bad_lines: vec![],
567                keys_types_count: IndexMap::from([
568                    ("$.key1::Number".to_string(), 2),
569                    ("$.key2::Number".to_string(), 1),
570                ]),
571                empty_lines: vec![],
572            },
573            errors: Box::new(Errors::<NDJSONErrorsPar>::default()),
574        };
575
576        let mut args = Cli::default();
577        args.parallel = true;
578        let settings = Settings::init(args).unwrap();
579
580        let actual = path.json_stats(&settings).unwrap();
581        assert_eq!(expected.stats, actual.stats);
582    }
583
584    #[test]
585    fn simple_ndjson() {
586        let mut tmpfile: File = tempfile::tempfile().unwrap();
587        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
588        writeln!(tmpfile, r#"{{"key2": 123}}"#).unwrap();
589        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
590        tmpfile.seek(SeekFrom::Start(0)).unwrap();
591        let reader = io::BufReader::new(tmpfile);
592
593        let expected: Vec<IdJSON> = vec![
594            (1.to_string(), json!({"key1": 123})),
595            (2.to_string(), json!({"key2": 123})),
596            (3.to_string(), json!({"key1": 123})),
597        ];
598
599        let args = Cli::default();
600        let errors = Errors::default();
601
602        let json_iter = parse_ndjson_bufreader(&args, reader, &errors);
603        assert_eq!(expected, json_iter.collect::<Vec<IdJSON>>());
604        assert!(errors.container.borrow().is_empty())
605    }
606
607    #[test]
608    fn bad_ndjson_file() {
609        let mut tmpfile: File = tempfile::tempfile().unwrap();
610        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
611        writeln!(tmpfile, r#"not valid json"#).unwrap();
612        writeln!(tmpfile, r#"{{"key1": 123}}"#).unwrap();
613        tmpfile.seek(SeekFrom::Start(0)).unwrap();
614        let reader = io::BufReader::new(tmpfile);
615
616        let expected: Vec<IdJSON> = vec![
617            (1.to_string(), json!({"key1": 123})),
618            (3.to_string(), json!({"key1": 123})),
619        ];
620
621        let args = Cli::default();
622        let errors = Errors::default();
623
624        let json_iter = parse_ndjson_bufreader(&args, reader, &errors);
625        assert_eq!(expected, json_iter.collect::<Vec<IdJSON>>());
626        assert!(errors.container.borrow().len() == 1)
627    }
628
629    #[test]
630    fn simple_expand_jsonpath_query() {
631        let json_iter_in: Vec<IdJSON> = vec![
632            (1.to_string(), json!({"key1": [1, 2, 3]})),
633            (2.to_string(), json!({"key2": 123})),
634            (3.to_string(), json!({"key1": [4, 5]})),
635        ];
636        let json_iter_in = json_iter_in.iter().cloned();
637
638        let mut args = Cli::default();
639        args.jsonpath = Some("$.key1[*]".to_string());
640        let settings = Settings::init(args).unwrap();
641        let errors = Errors::default();
642
643        let expected: Vec<IdJSON> = vec![
644            ("1:$.key1[*][0]".to_string(), json!(1)),
645            ("1:$.key1[*][1]".to_string(), json!(2)),
646            ("1:$.key1[*][2]".to_string(), json!(3)),
647            ("3:$.key1[*][0]".to_string(), json!(4)),
648            ("3:$.key1[*][1]".to_string(), json!(5)),
649        ];
650
651        let json_iter = expand_jsonpath_query(&settings, json_iter_in, &errors);
652        assert_eq!(expected, json_iter.collect::<Vec<IdJSON>>());
653        assert!(errors.container.borrow().len() == 1)
654    }
655
656    #[test]
657    fn simple_process_json_iterable() {
658        let json_iter_in: Vec<IdJSON> = vec![
659            (1.to_string(), json!({"key1": 123})),
660            (2.to_string(), json!({"key2": 123})),
661            (3.to_string(), json!({"key1": 123})),
662        ];
663        let json_iter_in = json_iter_in.iter().cloned();
664
665        let args = Cli::default();
666        let settings = Settings::init(args).unwrap();
667        let errors = Errors::default();
668
669        let expected = Stats {
670            keys_count: IndexMap::from([("$.key1".to_string(), 2), ("$.key2".to_string(), 1)]),
671            line_count: 3,
672            keys_types_count: IndexMap::from([
673                ("$.key1::Number".to_string(), 2),
674                ("$.key2::Number".to_string(), 1),
675            ]),
676            ..Default::default()
677        };
678
679        let stats = process_json_iterable(&settings, json_iter_in, &errors);
680        assert_eq!(expected, stats);
681        assert!(errors.container.borrow().is_empty())
682    }
683
684    #[test]
685    fn bad_process_json_iterable_path_query() {
686        let json_iter_in: Vec<IdJSON> = vec![
687            (1.to_string(), json!({"key1": 123})),
688            (2.to_string(), json!({"key2": 123})),
689            (3.to_string(), json!({"key1": 123})),
690        ];
691        let json_iter_in = json_iter_in.iter().cloned();
692
693        let mut args = Cli::default();
694        args.jsonpath = Some("$.key1".to_string());
695        let settings = Settings::init(args).unwrap();
696        let errors = Errors::default();
697
698        let expected = Stats {
699            keys_count: IndexMap::from([("$".to_string(), 2)]),
700            line_count: 2,
701            keys_types_count: IndexMap::from([("$::Number".to_string(), 2)]),
702            empty_lines: vec![2.to_string()],
703            ..Default::default()
704        };
705
706        let stats = process_json_iterable(&settings, json_iter_in, &errors);
707        assert_eq!(expected, stats);
708        assert!(errors.container.borrow().len() == 1)
709    }
710
711    #[test]
712    fn simple_process_json_iterable_par() {
713        let iter: Vec<(String, Value)> = vec![
714            (1.to_string(), json!({"key1": 123})),
715            (2.to_string(), json!({"key2": 123})),
716            (3.to_string(), json!({"key1": 123})),
717        ];
718        let iter = iter.into_iter().par_bridge();
719
720        let expected = Stats {
721            keys_count: IndexMap::from([("$.key1".to_string(), 2), ("$.key2".to_string(), 1)]),
722            line_count: 3,
723            keys_types_count: IndexMap::from([
724                ("$.key1::Number".to_string(), 2),
725                ("$.key2::Number".to_string(), 1),
726            ]),
727            ..Default::default()
728        };
729
730        let args = Cli::default();
731        let settings = Settings::init(args).unwrap();
732        let errors = ErrorsPar::default();
733        let stats = process_json_iterable_par(&settings, iter, &errors);
734        assert_eq!(expected, stats);
735    }
736
737    #[test]
738    fn simple_process_json_iterable_par_jsonpath() {
739        let iter: Vec<(String, Value)> = vec![
740            (1.to_string(), json!({"key1": 123})),
741            (2.to_string(), json!({"a": {"key2": 123}})),
742            (3.to_string(), json!({"key1": 123})),
743        ];
744        let iter = iter.into_iter().par_bridge();
745
746        let expected = Stats {
747            keys_count: IndexMap::from([("$.key2".to_string(), 1)]),
748            line_count: 1,
749            keys_types_count: IndexMap::from([("$.key2::Number".to_string(), 1)]),
750            empty_lines: vec![1.to_string(), 3.to_string()],
751            ..Default::default()
752        };
753
754        let mut args = Cli::default();
755        args.jsonpath = Some("$.a".to_string());
756        let settings = Settings::init(args).unwrap();
757        let errors = ErrorsPar::default();
758        let stats = process_json_iterable_par(&settings, iter, &errors);
759        assert_eq!(expected, stats);
760    }
761
762    #[test]
763    fn add_filestats() {
764        let lhs = stats::FileStats {
765            file_path: "file/1.json".to_string(),
766            stats: Stats {
767                keys_count: IndexMap::from([("$.key1".to_string(), 3), ("$.key2".to_string(), 2)]),
768                line_count: 5,
769                keys_types_count: IndexMap::from([
770                    ("$.key1::Number".to_string(), 3),
771                    ("$.key2::Number".to_string(), 2),
772                ]),
773                bad_lines: vec!["4".to_string()],
774                empty_lines: vec!["5".to_string()],
775            },
776        };
777        let rhs = stats::FileStats {
778            file_path: "file/2.json".to_string(),
779            stats: Stats {
780                keys_count: IndexMap::from([("$.key3".to_string(), 3), ("$.key2".to_string(), 2)]),
781                line_count: 7,
782                keys_types_count: IndexMap::from([
783                    ("$.key3::Number".to_string(), 3),
784                    ("$.key2::Number".to_string(), 2),
785                ]),
786                bad_lines: vec!["1".to_string()],
787                empty_lines: vec!["2".to_string()],
788            },
789        };
790        let expected = Stats {
791            keys_count: IndexMap::from([
792                ("$.key1".to_string(), 3),
793                ("$.key2".to_string(), 4),
794                ("$.key3".to_string(), 3),
795            ]),
796            line_count: 12,
797            keys_types_count: IndexMap::from([
798                ("$.key1::Number".to_string(), 3),
799                ("$.key2::Number".to_string(), 4),
800                ("$.key3::Number".to_string(), 3),
801            ]),
802            bad_lines: vec!["file/1.json:4".to_string(), "file/2.json:1".to_string()],
803            empty_lines: vec!["file/1.json:5".to_string(), "file/2.json:2".to_string()],
804        };
805
806        let vec_of_file_stats = vec![lhs.clone(), rhs.clone()];
807        let actual_ref = lhs.clone() + &rhs;
808        let actual = lhs + rhs;
809
810        assert_eq!(actual, expected);
811        assert_eq!(actual_ref, expected);
812        assert_eq!(vec_of_file_stats.iter().sum::<Stats>(), expected);
813    }
814}