1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
use std::io::{stdout, Write};
use std::str;

use jiff::Unit;
use simd_csv::ByteRecord;

use crate::collections::ClusteredInsertHashmap;
use crate::config::{Config, Delimiter};
use crate::dates;
use crate::scales::{Extent, ExtentBuilder};
use crate::select::SelectedColumns;
use crate::util;
use crate::CliResult;

/// Help text of the `complete` command. It is handed to `util::get_args` in
/// `run` to parse the command line, so the "Usage:" and options sections must
/// stay in sync with the fields of `Args`.
static USAGE: &str = r#"
Complete CSV data by adding rows for missing values of a given column.

This command is able to handle either integer or partial dates (year-month-day,
year-month or just year).

A --min and/or --max flag can be used to specify a range to complete. Note that
if input contains values outside of the specified range, they will be filtered
out from the output.

If you know your input is already sorted on the column to complete, you can
leverage the -S/--sorted flag to make the command work faster and use less
memory.

This command is also able to check whether the given column is complete using
the --check flag.

Examples:

Complete integer column named "score" from 1 to 10:
    $ xan complete -m 1 -M 10 score input.csv

Complete already sorted date values in column named "date":
    $ xan complete -D --sorted date input.csv

Check completeness of values (already sorted in descending order) in "score" column:
    $ xan complete --check --sorted --reverse score input.csv

Complete integer column named "score" within groups defined by columns "name" and "category":
    $ xan complete --groupby name,category score input.csv

Usage:
    xan complete [options] <column> [<input>]
    xan complete --help

complete options:
    --check                  Check that the input is complete. When used with
                             either --min or --max, only checks completeness
                             within the specified range.
    -m, --min <value>        Minimum value of range to complete. Note that values
                             less than this minimum value in the input will be
                             filtered out.
    -M, --max <value>        Maximum value of range to complete. Note that values
                             greater than this maximum value in the input will be
                             filtered out.
    -D, --dates              Set to indicate your values are dates (supporting
                             year, year-month or year-month-day).
    -S, --sorted             Indicate that the input is already sorted.
    -R, --reverse            Whether to consider the data in reverse order.
    -g, --groupby <cols>     Select columns to group by. The completion will be
                             done independently within each group.

Common options:
    -h, --help               Display this message
    -o, --output <file>      Write output to <file> instead of stdout.
    -n, --no-headers         When set, the first row will not be evaled
                             as headers.
    -d, --delimiter <arg>    The field delimiter for reading CSV data.
                             Must be a single character.

"#;

/// Command-line arguments of `xan complete`, deserialized from `USAGE` by
/// `util::get_args`. `arg_*` fields are positional arguments and `flag_*`
/// fields are the options of the same name.
#[derive(Deserialize, Debug)]
struct Args {
    /// `<column>`: the column whose missing values are completed or checked.
    arg_column: SelectedColumns,
    /// `[<input>]`: optional input path, passed as-is to `Config::new`.
    arg_input: Option<String>,
    /// `-m/--min`: lower bound of the range, still unparsed.
    flag_min: Option<String>,
    /// `-M/--max`: upper bound of the range, still unparsed.
    flag_max: Option<String>,
    /// `-o/--output`: optional output path, passed as-is to `Config::new`.
    flag_output: Option<String>,
    /// `-n/--no-headers`: the first row is not a header row.
    flag_no_headers: bool,
    /// `-d/--delimiter`: field delimiter used when reading.
    flag_delimiter: Option<Delimiter>,
    /// `--check`: only verify completeness, write no CSV output.
    flag_check: bool,
    /// `-D/--dates`: parse values as partial dates instead of integers.
    flag_dates: bool,
    /// `-S/--sorted`: the input is already sorted on the column to complete.
    flag_sorted: bool,
    /// `-R/--reverse`: consider the data in descending order.
    flag_reverse: bool,
    /// `-g/--groupby`: columns defining the groups completed independently.
    flag_groupby: Option<SelectedColumns>,
}

impl Args {
    /// Parses `cell` as a partial date when `--dates` is set, and as an
    /// integer otherwise.
    ///
    /// Returns the parser's error when `cell` is not valid for the chosen kind.
    fn get_value_from_str(&self, cell: &str) -> CliResult<Value> {
        if self.flag_dates {
            Value::new_date(cell)
        } else {
            Value::new_integer(cell)
        }
    }

    /// Same as `get_value_from_str`, but for a raw CSV cell.
    ///
    /// Cells come straight from user input, so invalid UTF-8 is reported as
    /// an error instead of panicking.
    fn get_value_from_bytes(&self, cell: &[u8]) -> CliResult<Value> {
        let text = str::from_utf8(cell)
            .map_err(|_| format!("Invalid UTF-8 value: {}", String::from_utf8_lossy(cell)))?;

        self.get_value_from_str(text)
    }
}

/// Kind of a `Value`, used by `check_type` to ensure every processed value
/// (including --min/--max) is of the same kind.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ValueType {
    /// A plain integer.
    Integer,
    /// A partial date, tagged with the unit reported by its `as_unit()`
    /// (presumably its precision: year, month or day -- confirm in `dates`).
    Date(Unit),
}

/// A value of the column to complete.
///
/// Ordering is derived, so values compare by variant first (in declaration
/// order) and then by payload; `check_type` keeps a run from mixing kinds.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum Value {
    /// Integer value, parsed with `str::parse::<i64>`.
    Integer(i64),
    /// Partial date, parsed with `dates::parse_partial_date`.
    Date(dates::PartialDate),
}

impl Value {
    /// Builds a date value from `s`, or fails with an "Invalid date format"
    /// error when `dates::parse_partial_date` returns nothing.
    fn new_date(s: &str) -> CliResult<Self> {
        let parsed =
            dates::parse_partial_date(s).ok_or_else(|| format!("Invalid date format: {}", s))?;

        Ok(Self::Date(parsed))
    }

    /// Builds an integer value from `s`, or fails with an "Invalid integer
    /// format" error when `s` is not a valid `i64`.
    fn new_integer(s: &str) -> CliResult<Self> {
        let parsed = s
            .parse::<i64>()
            .map_err(|_| format!("Invalid integer format: {}", s))?;

        Ok(Self::Integer(parsed))
    }

    /// Value that comes right after this one.
    fn next(&self) -> Self {
        match self {
            Self::Date(date) => Self::Date(date.next()),
            Self::Integer(number) => Self::Integer(number + 1),
        }
    }

    /// Value that comes right before this one.
    fn previous(&self) -> Self {
        match self {
            Self::Date(date) => Self::Date(date.previous()),
            Self::Integer(number) => Self::Integer(number - 1),
        }
    }

    /// Steps one value in the processing direction: backwards when `reverse`
    /// is set, forwards otherwise.
    fn advance(&self, reverse: bool) -> Self {
        match reverse {
            true => self.previous(),
            false => self.next(),
        }
    }

    /// Serializes the value as the bytes of the CSV cell to emit.
    fn to_bytes(self) -> Vec<u8> {
        match self {
            Self::Date(ref date) => {
                let formatted = dates::format_partial_date(date.as_unit(), date.as_date());
                formatted.into_bytes()
            }
            Self::Integer(number) => number.to_string().into_bytes(),
        }
    }

    /// Kind of this value, carrying the date unit for dates.
    fn as_type(&self) -> ValueType {
        match self {
            Self::Date(date) => ValueType::Date(date.as_unit()),
            Self::Integer(_) => ValueType::Integer,
        }
    }
}

/// Ensures `value_type` agrees with the first type ever seen.
///
/// The first call records `value_type` in `expected_value_type`; later calls
/// succeed only when given that same type and return an "Inconsistent value
/// units" error otherwise.
fn check_type(expected_value_type: &mut Option<ValueType>, value_type: ValueType) -> CliResult<()> {
    match *expected_value_type {
        // Nothing seen yet: this type becomes the reference.
        None => {
            *expected_value_type = Some(value_type);
            Ok(())
        }
        Some(first_seen) if first_seen == value_type => Ok(()),
        Some(first_seen) => Err(format!(
            "Inconsistent value units: first seen was {:?} and then found {:?}",
            first_seen, value_type,
        ))?,
    }
}

/// Rebuilds `record` in place as a filler row of `len` cells.
///
/// Cells flagged in `group_mask` receive the successive fields of
/// `group_opt`, the cell at index `completed.0` receives `completed.1`, and
/// every other cell is left empty.
///
/// Panics if `group_mask` flags a cell while `group_opt` is `None`.
fn mutate_record_to_emit(
    record: &mut ByteRecord,
    len: usize,
    completed: (usize, &[u8]),
    group_mask: Option<&Vec<bool>>,
    group_opt: Option<&ByteRecord>,
) {
    let (completed_index, completed_cell) = completed;

    // Next field of the group key to copy into the row
    let mut next_group_field: usize = 0;

    record.clear();

    for position in 0..len {
        let belongs_to_group = match group_mask {
            Some(mask) => mask[position],
            None => false,
        };

        if belongs_to_group {
            let group = group_opt.unwrap();
            record.push_field(&group[next_group_field]);
            next_group_field += 1;
            continue;
        }

        if position == completed_index {
            record.push_field(completed_cell);
        } else {
            record.push_field(b"");
        }
    }
}

/// Entry point of the `complete` command.
///
/// Parses `argv` against `USAGE`, reads the input CSV and then, for each group
/// of rows (a single group when --groupby is not used), walks the values of
/// the selected column in order. In the default mode, every input row within
/// range is written back and a filler row is emitted for each missing value.
/// With --check nothing is written: the first missing value is reported as an
/// error, and "file is complete!" is printed on success.
///
/// # Errors
///
/// Fails when --groupby is combined with --sorted, when --min/--max are
/// invalid, of different kinds or inverted, when the completed column is also
/// a group column, when a cell cannot be parsed or is of a different kind
/// than the previous ones, when reading or writing CSV data fails, and, with
/// --check, when a value is missing.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = util::get_args(USAGE, argv)?;

    if args.flag_groupby.is_some() && args.flag_sorted {
        Err("--groupby cannot be used with --sorted")?;
    }

    let min: Option<Value> = args
        .flag_min
        .as_ref()
        .map(|m| args.get_value_from_str(m))
        .transpose()?;
    let max: Option<Value> = args
        .flag_max
        .as_ref()
        .map(|m| args.get_value_from_str(m))
        .transpose()?;

    // All values must have the same type
    let mut expected_value_type: Option<ValueType> = None;

    if let (Some(min_v), Some(max_v)) = (&min, &max) {
        if min_v.as_type() != max_v.as_type() {
            Err(format!(
                "min and max have different units: {:?} vs {:?}",
                min_v.as_type(),
                max_v.as_type(),
            ))?;
        }

        if min_v > max_v {
            Err("min cannot be greater than max")?;
        }

        check_type(&mut expected_value_type, min_v.as_type())?;
    }

    let mut extent_builder = ExtentBuilder::<Value>::new();

    if let Some(m) = min {
        extent_builder.clamp_min(m);
    }
    if let Some(m) = max {
        extent_builder.clamp_max(m);
    }

    // Will be equal to None if either min or max is not specified.
    // If both min and max are specified, then when processing the input values,
    // there will be no need to find the extreme values of the input range.
    let mut extent: Option<Extent<Value>> = extent_builder.clone().build();

    let rconf = Config::new(&args.arg_input)
        .no_headers(args.flag_no_headers)
        .select(args.arg_column.clone())
        .delimiter(args.flag_delimiter);

    // No writer at all in --check mode: nothing but the verdict is printed
    let mut wtr_opt = (!(args.flag_check))
        .then(|| Config::new(&args.flag_output).simd_writer())
        .transpose()?;

    // Buffer reused for every filler row
    let mut output_record = ByteRecord::new();

    let mut rdr = rconf.simd_reader()?;
    let headers = rdr.byte_headers()?.clone();

    let column_to_complete_index = rconf.single_selection(&headers)?;

    let groupby_sel_opt = args
        .flag_groupby
        .as_ref()
        .map(|sel| sel.selection(&headers, !rconf.no_headers))
        .transpose()?;

    let groupby_mask_opt = groupby_sel_opt.as_ref().map(|sel| sel.mask(headers.len()));

    if matches!(&groupby_sel_opt, Some(sel) if sel.contains(column_to_complete_index)) {
        Err("Cannot complete a column that is also used in --groupby!")?;
    }

    if !rconf.no_headers {
        if let Some(wtr) = wtr_opt.as_mut() {
            wtr.write_byte_record(&headers)?;
        }
    }

    let mut record = ByteRecord::new();

    // reading records and grouping them if needed
    let mut records_per_group: ClusteredInsertHashmap<ByteRecord, Vec<ByteRecord>> =
        ClusteredInsertHashmap::new();

    if !args.flag_sorted {
        // Collecting records per group
        if let Some(flag_groupby) = &args.flag_groupby {
            let group_sel = flag_groupby.selection(&headers, !args.flag_no_headers)?;
            while rdr.read_byte_record(&mut record)? {
                let value: Value = args.get_value_from_bytes(&record[column_to_complete_index])?;
                check_type(&mut expected_value_type, value.as_type())?;
                // Meaning we need to find the extent from the input values
                // (either min or max or both were not specified)
                if extent.is_none() {
                    extent_builder.process(value);
                // Skipping the value if outside of the specified range
                } else if matches!(extent, Some(e) if value < e.min() || value > e.max()) {
                    continue;
                }

                let key = group_sel.select(&record).collect::<ByteRecord>();
                records_per_group.insert_with_or_else(
                    key,
                    || vec![record.clone()],
                    |v| v.push(record.clone()),
                );
            }
        // Collecting all records together (artificial single group)
        } else {
            // Read errors are propagated to the caller instead of panicking
            // on malformed input.
            let mut all_records = rdr.byte_records().collect::<Result<Vec<_>, _>>()?;

            // `mem::take` hands the vector over whatever closure kind
            // `insert_with` expects.
            records_per_group.insert_with(ByteRecord::new(), || {
                std::mem::take(&mut all_records)
            });
        };
    // Input is sorted, no need to collect nor group records (will process them
    // directly later, here just creating an empty group to enter the processing loop)
    } else {
        records_per_group.insert_with(ByteRecord::new(), Vec::new);
    }

    // if extent was not determined yet, do it now
    if extent.is_none() {
        extent = extent_builder.build();
    }

    let min = min.or_else(|| extent.as_ref().map(|e| e.min()));
    let max = max.or_else(|| extent.as_ref().map(|e| e.max()));

    // Can be None if min is None when not using --reverse or max is None when
    // using --reverse (with -S/--sorted), meaning we start completing
    // from the first value in the input
    let current_value: Option<Value> = if args.flag_reverse { max } else { min };

    // closure to process ALREADY SORTED records in a group
    let mut process_records_in_group = |records: &mut dyn Iterator<
        Item = Result<ByteRecord, simd_csv::Error>,
    >,
                                        group_key: &ByteRecord|
     -> CliResult<()> {
        // Next value expected in this group (None until the first record when
        // no starting bound is known)
        let mut local_current_value: Option<Value> = current_value;

        for record in records {
            let record = record?;
            let value: Value = args.get_value_from_bytes(&record[column_to_complete_index])?;
            check_type(&mut expected_value_type, value.as_type())?;

            if matches!(min, Some(m) if value < m) {
                if args.flag_reverse {
                    // stop completing or checking if we go below min of the range
                    break;
                } else {
                    // skip values below min of the range
                    continue;
                }
            }
            if matches!(max, Some(m) if value > m) {
                if args.flag_reverse {
                    // skip values over max of the range
                    continue;
                } else {
                    // stop completing or checking if we go over max of the range
                    break;
                }
            }

            if local_current_value.is_some() {
                // writing missing values
                if let Some(wtr) = wtr_opt.as_mut() {
                    // until we reach the input value, complete missing values
                    while match (args.flag_reverse, local_current_value) {
                        (true, Some(cv)) => cv > value,
                        (false, Some(cv)) => cv < value,
                        _ => false,
                    } {
                        mutate_record_to_emit(
                            &mut output_record,
                            headers.len(),
                            (
                                column_to_complete_index,
                                &local_current_value.unwrap().to_bytes(),
                            ),
                            groupby_mask_opt.as_ref(),
                            Some(group_key),
                        );

                        wtr.write_byte_record(&output_record)?;

                        local_current_value =
                            local_current_value.map(|v| v.advance(args.flag_reverse));
                    }
                // checking for completeness
                } else if value != local_current_value.unwrap() {
                    // in case of using min flag (or max flag when flag_reverse is true)
                    // and having 'value' outside of that range,
                    // or if there are repeated values in the input
                    if match (args.flag_reverse, local_current_value) {
                        // If using max flag, this condition being true means
                        // the current value is out of range, ignoring it.
                        // Otherwise, it means there are repeated values in the input
                        (true, Some(cv)) => cv < value,
                        // If using min flag, this condition being true means
                        // the current value is out of range, ignoring it.
                        // Otherwise, it means there are repeated values in the input
                        (false, Some(cv)) => cv > value,
                        _ => false,
                    } {
                        continue;
                    }
                    Err(format!(
                        "file is not complete: missing value {:?}",
                        local_current_value.unwrap()
                    ))?;
                }
            // meaning we are at the first record of the group
            } else {
                local_current_value = Some(value);
            }

            // Move past `value` only while the cursor still sits on it. When
            // the input repeats a value the cursor is already one step ahead,
            // and advancing again would silently skip a missing value.
            if local_current_value == Some(value) {
                local_current_value = Some(value.advance(args.flag_reverse));
            }

            if let Some(wtr) = wtr_opt.as_mut() {
                wtr.write_byte_record(&record)?;
            }
        }

        // No more input records in the group, but we may need to complete/check
        // to min or max (if set by the flag) depending on the direction
        if (args.flag_reverse && min.is_some()) || (!args.flag_reverse && max.is_some()) {
            // completing/writing missing values
            if let Some(wtr) = wtr_opt.as_mut() {
                // while being within the specified range, complete values
                while local_current_value.is_some()
                    && match (args.flag_reverse, local_current_value) {
                        (true, Some(cv)) if cv >= min.unwrap() => true,
                        (false, Some(cv)) if cv <= max.unwrap() => true,
                        _ => false,
                    }
                {
                    mutate_record_to_emit(
                        &mut output_record,
                        headers.len(),
                        (
                            column_to_complete_index,
                            &local_current_value.unwrap().to_bytes(),
                        ),
                        groupby_mask_opt.as_ref(),
                        Some(group_key),
                    );

                    wtr.write_byte_record(&output_record)?;

                    local_current_value = local_current_value.map(|v| v.advance(args.flag_reverse));
                }
            // checking for completeness
            } else if match (args.flag_reverse, local_current_value) {
                // if after processing all input records in the group and
                // 'advancing' to the next value, we are still within the
                // specified range, then the input is not complete
                (true, Some(cv)) if cv >= min.unwrap() => true,
                (false, Some(cv)) if cv <= max.unwrap() => true,
                _ => false,
            } {
                Err(format!(
                    "file is not complete: missing value {:?}",
                    local_current_value.unwrap()
                ))?;
            }
        }

        Ok(())
    };

    // process all records
    for (group_key, records) in records_per_group.iter() {
        if args.flag_sorted && args.flag_groupby.is_none() {
            // NOTE: group_key is empty here, and will be ignored in the processing
            // QUESTION: Am I not allowing memory for every record here (in case input from stdin)?
            process_records_in_group(&mut rdr.byte_records(), group_key)?;
        } else {
            // sorting records in the group
            let mut values_and_records = records
                .iter()
                .map(|record| -> CliResult<(Value, ByteRecord)> {
                    let value_and_record = (
                        args.get_value_from_bytes(&record[column_to_complete_index])?,
                        record.clone(),
                    );
                    Ok(value_and_record)
                })
                .collect::<Result<Vec<_>, _>>()?;
            // `sort_by` is stable: rows sharing a value keep their input order
            values_and_records.sort_by(|a, b| {
                if args.flag_reverse {
                    b.0.cmp(&a.0)
                } else {
                    a.0.cmp(&b.0)
                }
            });
            let records = values_and_records.iter().map(|(_, r)| r);
            let mut records: &mut dyn Iterator<Item = Result<ByteRecord, simd_csv::Error>> =
                &mut records.map(|r| Ok(r.clone()));

            process_records_in_group(&mut records, group_key)?;
        }
    }

    if let Some(wtr) = wtr_opt.as_mut() {
        Ok(wtr.flush()?)
    } else {
        writeln!(&mut stdout(), "file is complete!")?;
        Ok(())
    }
}