Skip to main content

rsomics_tsv_crosstab/
lib.rs

1use std::collections::BTreeMap;
2use std::fs::File;
3use std::io::{BufWriter, Read, Write};
4use std::path::Path;
5
6use rsomics_common::{Result, RsomicsError};
7
8mod fmtg;
9mod ops;
10
11pub use ops::Op;
12
13pub struct Config {
14    pub delim: u8,
15    /// 1-based field indices selecting the x (row) and y (column) keys.
16    pub x: usize,
17    pub y: usize,
18    /// Aggregate to apply, with the 1-based field it reads. `None` = count co-occurrences.
19    pub agg: Option<(Op, usize)>,
20    /// Sort input by the key fields first. Without it, like datamash, only
21    /// consecutive runs of a key are grouped (input is assumed pre-sorted).
22    pub sort: bool,
23    /// First input line is column headers and is dropped (datamash --header-in).
24    pub header_in: bool,
25    pub filler: Vec<u8>,
26    pub collapse_delim: u8,
27}
28
29pub fn run(input: &Path, cfg: &Config, out: &mut dyn Write) -> Result<()> {
30    let mut raw = Vec::new();
31    open(input)?.read_to_end(&mut raw)?;
32    let mut bw = BufWriter::with_capacity(1 << 20, out);
33    crosstab(&raw, cfg, &mut bw)?;
34    bw.flush().map_err(RsomicsError::Io)
35}
36
37fn open(input: &Path) -> Result<Box<dyn Read>> {
38    if input.as_os_str() == "-" {
39        Ok(Box::new(std::io::stdin().lock()))
40    } else {
41        Ok(Box::new(File::open(input).map_err(|e| {
42            RsomicsError::InvalidInput(format!("{}: {e}", input.display()))
43        })?))
44    }
45}
46
47// datamash groups only *consecutive* runs of a key (the input is assumed
48// pre-sorted); when a cell's key reappears in a later run the first run's value
49// is kept and the rest discarded. `--sort` reorders the rows by key first, which
50// makes every key one contiguous run and so aggregates the whole input. Cells
51// are keyed by interned (x,y) indices so the run loop touches only integers.
52fn crosstab(raw: &[u8], cfg: &Config, out: &mut dyn Write) -> Result<()> {
53    // datamash's getline drops a single trailing newline; empty input still
54    // emits the (empty) header line.
55    let body = raw.strip_suffix(b"\n").unwrap_or(raw);
56    if body.is_empty() {
57        return out.write_all(b"\n").map_err(RsomicsError::Io);
58    }
59
60    let need = cfg.x.max(cfg.y).max(cfg.agg.map_or(0, |(_, f)| f));
61
62    let mut state = RunState {
63        cfg,
64        need,
65        xs: BTreeMap::new(),
66        ys: BTreeMap::new(),
67        cells: BTreeMap::new(),
68        field_buf: Vec::with_capacity(need + 1),
69        run: None,
70    };
71
72    let skip = usize::from(cfg.header_in);
73    if cfg.sort {
74        let mut lines: Vec<&[u8]> = body.split(|&b| b == b'\n').skip(skip).collect();
75        lines.sort_by_key(|line| key(line, cfg));
76        for (lineno, line) in lines.into_iter().enumerate() {
77            state.feed(lineno + skip, line)?;
78        }
79    } else {
80        for (lineno, line) in body.split(|&b| b == b'\n').enumerate().skip(skip) {
81            state.feed(lineno, line)?;
82        }
83    }
84    state.finish();
85
86    emit(cfg, &state.xs, &state.ys, &state.cells, out)
87}
88
89struct RunState<'a> {
90    cfg: &'a Config,
91    need: usize,
92    xs: BTreeMap<Vec<u8>, usize>,
93    ys: BTreeMap<Vec<u8>, usize>,
94    cells: BTreeMap<(usize, usize), ops::Acc>,
95    field_buf: Vec<(usize, usize)>,
96    run: Option<Run>,
97}
98
99// The open run holds the current cell's interned key, the label bytes (so a
100// row continuing the run is recognised by a byte compare, with no map lookup or
101// allocation), and the accumulator.
102struct Run {
103    xi: usize,
104    yi: usize,
105    xlabel: Vec<u8>,
106    ylabel: Vec<u8>,
107    acc: ops::Acc,
108}
109
110impl RunState<'_> {
111    fn feed(&mut self, lineno: usize, line: &[u8]) -> Result<()> {
112        split_fields(line, self.cfg.delim, &mut self.field_buf);
113        if self.field_buf.len() < self.need {
114            return Err(RsomicsError::InvalidInput(format!(
115                "invalid input: field {} requested, line {} has only {} fields",
116                self.need,
117                lineno + 1,
118                self.field_buf.len(),
119            )));
120        }
121        let xv = field(line, &self.field_buf, self.cfg.x);
122        let yv = field(line, &self.field_buf, self.cfg.y);
123
124        let continues = matches!(&self.run, Some(r) if r.xlabel == xv && r.ylabel == yv);
125        if !continues {
126            if let Some(r) = self.run.take() {
127                self.cells.entry((r.xi, r.yi)).or_insert(r.acc);
128            }
129            let xi = intern(&mut self.xs, xv);
130            let yi = intern(&mut self.ys, yv);
131            self.run = Some(Run {
132                xi,
133                yi,
134                xlabel: xv.to_vec(),
135                ylabel: yv.to_vec(),
136                acc: ops::Acc::new(self.cfg.agg.map(|(op, _)| op)),
137            });
138        }
139        let acc = &mut self.run.as_mut().unwrap().acc;
140        match self.cfg.agg {
141            None => acc.bump(),
142            Some((_, f)) => acc.push(field(line, &self.field_buf, f), lineno + 1)?,
143        }
144        Ok(())
145    }
146
147    fn finish(&mut self) {
148        if let Some(r) = self.run.take() {
149            self.cells.entry((r.xi, r.yi)).or_insert(r.acc);
150        }
151    }
152}
153
154fn key(line: &[u8], cfg: &Config) -> (Vec<u8>, Vec<u8>) {
155    let mut buf = Vec::new();
156    split_fields(line, cfg.delim, &mut buf);
157    let g = |idx1: usize| -> Vec<u8> {
158        buf.get(idx1 - 1)
159            .map(|&(s, e)| line[s..e].to_vec())
160            .unwrap_or_default()
161    };
162    (g(cfg.x), g(cfg.y))
163}
164
/// Fills `out` with the `(start, end)` byte span of every `delim`-separated
/// field of `line`, replacing whatever `out` held before.
///
/// An empty line has zero fields, matching datamash's getline-based parser; a
/// non-empty line has (delimiter count + 1) fields, including empty ones.
fn split_fields(line: &[u8], delim: u8, out: &mut Vec<(usize, usize)>) {
    out.clear();
    if line.is_empty() {
        return;
    }
    let mut offset = 0;
    out.extend(line.split(|&b| b == delim).map(|piece| {
        let span = (offset, offset + piece.len());
        // Skip past this field and the delimiter that ended it.
        offset = span.1 + 1;
        span
    }));
}
181
/// Returns the bytes of the 1-based field `idx1` of `line`, using spans
/// produced by `split_fields`.
///
/// Panics if `idx1` is zero or beyond `spans`; callers check the field count first.
fn field<'a>(line: &'a [u8], spans: &[(usize, usize)], idx1: usize) -> &'a [u8] {
    let span = spans[idx1 - 1];
    &line[span.0..span.1]
}
186
/// Returns the index interned for `key`. The first time `key` is seen it is
/// assigned the next free index (0, 1, 2, ... in first-seen order).
///
/// Lookup uses the borrowed slice first, so the common case of an already
/// known label does not allocate. Only a new label is copied into the map.
fn intern(table: &mut BTreeMap<Vec<u8>, usize>, key: &[u8]) -> usize {
    if let Some(&idx) = table.get(key) {
        return idx;
    }
    let idx = table.len();
    table.insert(key.to_vec(), idx);
    idx
}
191
192fn emit(
193    cfg: &Config,
194    xs: &BTreeMap<Vec<u8>, usize>,
195    ys: &BTreeMap<Vec<u8>, usize>,
196    cells: &BTreeMap<(usize, usize), ops::Acc>,
197    out: &mut dyn Write,
198) -> Result<()> {
199    // BTreeMap iterates keys in sorted byte order; map each label to its row /
200    // column slot index in that order.
201    let x_slot: Vec<(usize, &[u8])> = xs.iter().map(|(k, &i)| (i, k.as_slice())).collect();
202    let y_slot: Vec<(usize, &[u8])> = ys.iter().map(|(k, &i)| (i, k.as_slice())).collect();
203
204    let mut line = Vec::with_capacity(256);
205    for (_, label) in &y_slot {
206        line.push(cfg.delim);
207        line.extend_from_slice(label);
208    }
209    line.push(b'\n');
210    out.write_all(&line).map_err(RsomicsError::Io)?;
211
212    for (xi, xlabel) in &x_slot {
213        line.clear();
214        line.extend_from_slice(xlabel);
215        for (yi, _) in &y_slot {
216            line.push(cfg.delim);
217            match cells.get(&(*xi, *yi)) {
218                Some(acc) => acc.render(cfg.collapse_delim, &mut line),
219                None => line.extend_from_slice(&cfg.filler),
220            }
221        }
222        line.push(b'\n');
223        out.write_all(&line).map_err(RsomicsError::Io)?;
224    }
225    Ok(())
226}