use std::num::NonZeroUsize;

use simd_csv::ByteRecord;

use crate::cmd::parallel::Args as ParallelArgs;
use crate::config::{Config, Delimiter};
use crate::moonblade::{
    AggregationProgram, GroupAggregationProgram, GroupAlongColumnsAggregationProgram,
};
use crate::select::SelectedColumns;
use crate::util;
use crate::CliResult;

static USAGE: &str = "
Group a CSV file by values contained in a column selection then aggregate data per
group using a custom aggregation expression.

For ungrouped aggregation, check the `xan agg` command instead.

The result of running the command will be a CSV file containing the grouped
columns and additional columns for each computed aggregation.

You can, for instance, compute the sum of a column per group:

    $ xan groupby user_name 'sum(retweet_count)' file.csv

You can use dynamic expressions to mangle the data before aggregating it:

    $ xan groupby user_name 'sum(retweet_count + replies_count)' file.csv

You can perform multiple aggregations at once:

    $ xan groupby user_name 'sum(retweet_count), mean(retweet_count), max(replies_count)' file.csv

You can rename the output columns using the 'as' syntax:

    $ xan groupby user_name 'sum(n) as sum, max(replies_count) as \"Max Replies\"' file.csv

You can group on multiple columns (read `xan select -h` for more information about column selection):

    $ xan groupby name,surname 'sum(count)' file.csv

# Computing a total aggregate in the same pass

This command can compute a total aggregate over the whole file in the same pass
as computing aggregates per group so you can easily pipe into other commands to
compute ratios and such.

For instance, given the following file:

user,count
marcy,5
john,2
marcy,6
john,4

Using the following command:

    $ xan groupby user 'sum(count) as count' -T 'sum(count) as total' file.csv

Will produce the following result:

user,count,total
marcy,11,17
john,6,17

You can then pipe this into e.g. `xan select -e` and get a ratio:

    $ <command-above> | xan select -e 'user, count, (count / total).to_fixed(2) as ratio'

To produce:

user,count,ratio
marcy,11,0.65
john,6,0.35

# Aggregating along columns

This command is also able to aggregate along columns that you can select using
the -C/--along-cols <cols> flag. In that case, aggregation functions accept the
anonymous `_` placeholder representing the currently processed column's value.

For instance, given the following file:

user,count1,count2
marcy,4,5
john,0,1
marcy,6,8
john,4,6

Using the following command:

    $ xan groupby user --along-cols count1,count2 'sum(_)' file.csv

Will produce the following result:

user,count1,count2
marcy,10,13
john,4,7

# Aggregating along matrix

This command can also aggregate over all values of a selection of columns, thus
representing a 2-dimensional matrix, using the -M/--along-matrix flag. In that
case, aggregation functions accept the anonymous `_` placeholder representing
the currently processed column's value.

For instance, given the following file:

user,count1,count2
marcy,4,5
john,0,1
marcy,6,8
john,4,6

Using the following command:

    $ xan groupby user --along-matrix count1,count2 'sum(_) as total' file.csv

Will produce the following result:

user,total
marcy,23
john,11

---

For a quick review of the capabilities of the expression language,
check out the `xan help cheatsheet` command.

For a list of available aggregation functions, use `xan help aggs`.

For a list of available functions, use `xan help functions`.

Aggregations can be computed in parallel using the -p/--parallel or -t/--threads flags.
However, this cannot work on streams or gzipped files, unless a `.gzi` index (as created
by `bgzip -i`) can be found beside the file. Parallelization is not compatible
with the -S/--sorted, -C/--along-cols, -M/--along-matrix and -T/--total flags.

Usage:
    xan groupby [options] <columns> <expression> [<input>]
    xan groupby --help

groupby options:
    --keep <cols>              Keep this selection of columns, in addition to
                               the ones representing groups, in the output. Only
                               values from the first seen row per group will be kept.
    -C, --along-cols <cols>    Perform the same aggregation over each of the selected columns
                               and create one column per selected column with the result in the output.
    -M, --along-matrix <cols>  Aggregate all values found in the given selection
                               of columns.
    -T, --total <expr>         Run an aggregation over the whole file in the same pass over
                               the data and add the resulting columns at the end of each group's
                               result. Can be useful to compute ratios over total etc in a single
                               pass when piping into `map`, `transform`, `select -e` etc.
    -S, --sorted               Use this flag to indicate that the file is already sorted on the
                               group columns, in which case the command will be able to considerably
                               optimize memory usage.
    -p, --parallel             Whether to use parallelization to speed up computation.
                               Will automatically select a suitable number of threads to use
                               based on your number of cores. Use -t, --threads if you want to
                               indicate the number of threads yourself.
    -t, --threads <threads>    Parallelize computations using this many threads. Use -p, --parallel
                               if you want the number of threads to be automatically chosen instead.

Common options:
    -h, --help               Display this message
    -o, --output <file>      Write output to <file> instead of stdout.
    -n, --no-headers         When set, the first row will not be considered
                             as headers.
    -d, --delimiter <arg>    The field delimiter for reading CSV data.
                             Must be a single character.
";

#[derive(Deserialize)]
struct Args {
    arg_columns: SelectedColumns,
    arg_expression: String,
    arg_input: Option<String>,
    flag_no_headers: bool,
    flag_output: Option<String>,
    flag_delimiter: Option<Delimiter>,
    flag_keep: Option<SelectedColumns>,
    flag_along_cols: Option<SelectedColumns>,
    flag_along_matrix: Option<SelectedColumns>,
    flag_total: Option<String>,
    flag_sorted: bool,
    flag_parallel: bool,
    flag_threads: Option<NonZeroUsize>,
}

pub fn run(argv: &[&str]) -> CliResult<()> {
    let mut args: Args = util::get_args(USAGE, argv)?;

    if args.flag_parallel || args.flag_threads.is_some() {
        if args.flag_along_cols.is_some() {
            Err("-p/--parallel or -t/--threads cannot be used with -C/--along-cols!")?;
        }

        if args.flag_along_matrix.is_some() {
            Err("-p/--parallel or -t/--threads cannot be used with -M/--along-matrix!")?;
        }

        if args.flag_sorted {
            Err("-p/--parallel or -t/--threads cannot be used with -S/--sorted!")?;
        }

        if args.flag_total.is_some() {
            Err("-p/--parallel or -t/--threads cannot be used with -T/--total!")?;
        }

        let mut parallel_args = ParallelArgs::single_file(&args.arg_input, args.flag_threads)?;

        parallel_args.cmd_groupby = true;
        parallel_args.arg_group = Some(args.arg_columns);
        parallel_args.arg_expr = Some(args.arg_expression);

        parallel_args.flag_no_headers = args.flag_no_headers;
        parallel_args.flag_output = args.flag_output;
        parallel_args.flag_delimiter = args.flag_delimiter;

        return parallel_args.run();
    }

    let rconf = Config::new(&args.arg_input)
        .delimiter(args.flag_delimiter)
        .no_headers(args.flag_no_headers)
        .select(args.arg_columns);

    let mut rdr = rconf.simd_reader()?;
    let mut wtr = Config::new(&args.flag_output).simd_writer()?;
    let headers = rdr.byte_headers()?;

    let sel = rconf.selection(headers)?;

    let mut total_program_opt = args
        .flag_total
        .as_ref()
        .map(|total| AggregationProgram::parse(total, headers))
        .transpose()?;

    // --along-cols
    if let Some(selection) = args.flag_along_cols.take() {
        if args.flag_sorted || args.flag_keep.is_some() {
            Err("-C/--along-cols does not work with -S/--sorted nor --keep!")?;
        }

        if args.flag_total.is_some() {
            Err("-T/--total does not work yet with -C/--along-cols!")?;
        }

        let mut pivot_sel = selection.selection(headers, !rconf.no_headers)?;
        pivot_sel.sort_and_dedup();

        let mut program = GroupAlongColumnsAggregationProgram::parse(
            &args.arg_expression,
            headers,
            pivot_sel.len(),
        )?;

        if !rconf.no_headers {
            let mut output_headers = sel.select(headers).collect::<ByteRecord>();

            for name in pivot_sel.select(headers) {
                output_headers.push_field(name);
            }

            wtr.write_byte_record(&output_headers)?;
        }

        let mut record = ByteRecord::new();
        let mut index: usize = 0;

        while rdr.read_byte_record(&mut record)? {
            let group = sel.collect(&record);

            program.run_with_cells(group, index, &record, pivot_sel.select(&record))?;

            index += 1;
        }

        for result in program.into_byte_records(false) {
            let (group, group_record) = result?;

            wtr.write_record(
                group
                    .iter()
                    .map(|cell| cell.as_slice())
                    .chain(group_record.iter()),
            )?;
        }

        return Ok(wtr.flush()?);
    }

    // --along-matrix
    if let Some(selection) = args.flag_along_matrix.take() {
        if args.flag_sorted || args.flag_keep.is_some() {
            Err("-M/--along-matrix does not work with -S/--sorted nor --keep!")?;
        }

        if args.flag_total.is_some() {
            Err("-T/--total does not work yet with -M/--along-matrix!")?;
        }

        let mut matrix_sel = selection.selection(headers, !rconf.no_headers)?;
        matrix_sel.sort_and_dedup();

        let mut program =
            GroupAggregationProgram::<ByteRecord>::parse(&args.arg_expression, headers)?;

        if !rconf.no_headers {
            wtr.write_record(sel.select(headers).chain(program.headers()))?;
        }

        let mut record = ByteRecord::new();
        let mut index: usize = 0;

        while rdr.read_byte_record(&mut record)? {
            let group = sel.select(&record).collect();

            program.run_with_cells(group, index, &record, matrix_sel.select(&record))?;

            index += 1;
        }

        for result in program.into_byte_records(false) {
            let (group, group_record) = result?;

            wtr.write_record(group.iter().chain(group_record.iter()))?;
        }

        return Ok(wtr.flush()?);
    }

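    // --keep: prepend a `first(col(i))` aggregation to the expression for each
    // kept column that is not already part of the group selection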
    if let Some(selection) = args.flag_keep.take() {
        let mut keep_sel = selection.selection(headers, !rconf.no_headers)?;
        keep_sel.dedup();

        let addendum = keep_sel
            .iter()
            .filter(|i| !sel.contains(**i))
            .copied()
            .map(|i| {
                format!(
                    "first(col({})) as \"{}\"",
                    i,
                    std::str::from_utf8(&headers[i]).unwrap()
                )
            })
            .collect::<Vec<_>>()
            .join(", ");

        if !addendum.is_empty() {
            args.arg_expression = addendum + ", " + &args.arg_expression;
        }
    }

    let mut record = ByteRecord::new();

    if args.flag_sorted {
        if args.flag_total.is_some() {
            Err("-T/--total cannot work with -S/--sorted!")?;
        }

        let mut program = AggregationProgram::parse(&args.arg_expression, headers)?;
        let mut current: Option<ByteRecord> = None;

        if !rconf.no_headers {
            wtr.write_record(sel.select(headers).chain(program.headers()))?;
        }

        let mut index: usize = 0;

        while rdr.read_byte_record(&mut record)? {
            let group = sel.select(&record).collect();

            match current.as_ref() {
                None => {
                    current = Some(group);
                }
                Some(current_group) => {
                    if current_group != &group {
                        wtr.write_record(current_group.iter().chain(&program.finalize(false)?))?;

                        program.clear();
                        current = Some(group);
                    }
                }
            };

            program.run_with_record(index, &record)?;

            index += 1;
        }

        // Flushing final group
        if let Some(current_group) = current {
            wtr.write_record(current_group.iter().chain(&program.finalize(false)?))?;
        }
    } else {
        let mut program = GroupAggregationProgram::parse(&args.arg_expression, headers)?;

        if !rconf.no_headers {
            if let Some(total_program) = &total_program_opt {
                wtr.write_record(
                    sel.select(headers)
                        .chain(program.headers())
                        .chain(total_program.headers()),
                )?;
            } else {
                wtr.write_record(sel.select(headers).chain(program.headers()))?;
            }
        }

        let mut index: usize = 0;

        while rdr.read_byte_record(&mut record)? {
            let group = sel.collect(&record);

            program.run_with_record(group, index, &record)?;

            if let Some(total_program) = total_program_opt.as_mut() {
                total_program.run_with_record(index, &record)?;
            }

            index += 1;
        }

        let total_record_opt = total_program_opt
            .map(|mut total_program| total_program.finalize(false))
            .transpose()?;

        for result in program.into_byte_records(false) {
            let (group, group_record) = result?;

            if let Some(total_record) = &total_record_opt {
                wtr.write_record(
                    group
                        .iter()
                        .map(|cell| cell.as_slice())
                        .chain(group_record.iter())
                        .chain(total_record.iter()),
                )?;
            } else {
                wtr.write_record(
                    group
                        .iter()
                        .map(|cell| cell.as_slice())
                        .chain(group_record.iter()),
                )?;
            }
        }
    }

    Ok(wtr.flush()?)
}