bustools_cli/
sort.rs

1//! `bustools sort` code. Sorts busfiles by CB/UMI/EC
2//!
3//! # Merging records
4//! Note that this not only sorts records according to CB/UMI/EC,
5//! but also merges records with the same CB/UMI/EC/FLAG (adding up their counts)
6//!
7#![deny(missing_docs)]
8use bustools_core::{
9    io::{BusReader, BusRecord, BusWriter},
10    iterators::CbUmiGroupIterator,
11    merger::MultiIterator,
12};
13use itertools::Itertools;
14use std::collections::{BTreeMap, HashMap};
15use tempfile::tempdir;
16
17/// sorts/inserts an Iterator over records into a BTreeMap,
18/// (CB,UMI,EC, FLAG) -> records
19/// This effectively sorts the records in memory and aggregates records with the same CB/UMI/EC/FLAG
20fn sort_into_btree<I: Iterator<Item = BusRecord>>(
21    iterator: I,
22) -> BTreeMap<(u64, u64, u32, u32), BusRecord> {
23    let mut in_mem_sort: BTreeMap<(u64, u64, u32, u32), BusRecord> = BTreeMap::new();
24
25    for record in iterator {
26        if let Some(r) = in_mem_sort.get_mut(&(record.CB, record.UMI, record.EC, record.FLAG)) {
27            r.COUNT += record.COUNT
28        }
29        else {
30            in_mem_sort.insert((record.CB, record.UMI, record.EC, record.FLAG), record);
31        }
32    }
33    in_mem_sort
34}
35
36/// Sort a busfile (via CB/UMI/EC) in memory, using BTreeMap's internal sorting!
37/// This gets quite bad for larger files!
38///
39/// # Parameters
40/// * `busfile`: file to be sorted in memory
41/// * `outfile`: file to be sorted into
42#[allow(dead_code)]
43fn sort_in_memory(busfile: &str, outfile: &str) {
44    let reader = BusReader::new(busfile);
45    let params = reader.get_params().clone();
46
47    let in_mem_sort = sort_into_btree(reader);
48
49    // write out
50    let mut writer = BusWriter::new(outfile, params);
51
52    writer.write_iterator(
53        // in_mem_sort.into_iter().map(|(_, rec)| rec )
54        in_mem_sort.into_values()
55
56    );
57}
58
59/// Merges records (CB/UMI/EC) that got split over different chunks
60pub (crate) fn merge_chunks(record_dict: HashMap<String, Vec<BusRecord>>) -> Vec<BusRecord>{
61    let records_from_all_chunks = record_dict.into_values().flatten();
62    let btree_sorted: Vec<BusRecord> = sort_into_btree(records_from_all_chunks).into_values().collect();
63    btree_sorted
64}
65/// Sort a busfile on disk (i.e. without loading the entire thing into memory)
66/// Works via `mergesort`:
67/// 1. split the busfile into separate chunks on disk: Temporary directory is used
68/// 2. sort the chunks (in memory) individually
69/// 3. merge the chunks: iterate over all chunks in parallel via [bustools::merger]
70/// and aggregate records that might have been split across chunks
71///
72/// # Parameters:
73/// * `busfile`: file to be sorted
74/// * `outfile`: file to be sorted into
75/// * `chunksize`: number of busrecords per chunk (this is how much is loaded into mem at any point).
76///    `chunksize=10_000_000` is roughly a 300MB chunk on disk
77/// 
78pub fn sort_on_disk(busfile: &str, outfile: &str, chunksize: usize) {
79    let reader = BusReader::new(busfile);
80    let params = reader.get_params().clone();
81
82    let mut chunkfiles = Vec::new();
83
84    println!("Sorting chunks");
85    let tmpdir = tempdir().unwrap();
86
87    for (i, record_chunk) in (&reader.chunks(chunksize)).into_iter().enumerate() {
88        println!("Sorting {}th chunks", i);
89
90        // sort the chunk in memory
91        let in_mem_sort = sort_into_btree(record_chunk);
92
93        //write current sorted file to disk
94        let file_path = tmpdir.path().join(format!("tmp_{}.bus", i));
95        let tmpfilename = file_path.to_str().unwrap().to_string();
96
97        let mut tmpwriter = BusWriter::new(&tmpfilename, params.clone());
98        tmpwriter.write_iterator(
99            // in_mem_sort.into_iter().map(|(_, rec)| rec )
100            in_mem_sort.into_values()
101        );
102
103        chunkfiles.push(tmpfilename);
104    }
105
106    // merge all chunks
107    println!("Merging {} chunks", chunkfiles.len());
108    let mut writer = BusWriter::new(outfile, params);
109
110    // gather the individual iterators for each chunk
111    let mut iterator_map = HashMap::new();
112    for file in chunkfiles.iter() {
113        let iter = BusReader::new(file).groupby_cbumi();
114        iterator_map.insert(file.to_string(), iter);
115    }
116
117    // each file itself is sorted
118    // now we only have to merge them
119    // if a single cb/umi is split over multiple records, this will put them back together
120    // however, we need to aggregate their counts and sort them by EC
121    let mi = MultiIterator::new(iterator_map);
122    // for (_cbumi, record_dict) in mi {
123    //     let merged_records = merge_chunks(record_dict);  //takes care of aggregating across chunks and sorting
124    //     writer.write_records(&merged_records);
125    // }
126
127    let it = mi
128        .flat_map(|(_cbumi, rdict)|
129            merge_chunks(rdict)
130        );
131
132    writer.write_iterator(it);
133
134    //tmpfiles get clean up once tmpdir is dropped!
135}
136
137#[cfg(test)]
138mod test {
139    use std::collections::{HashMap, HashSet};
140
141    use super::{sort_in_memory, sort_on_disk};
142    use bustools_core::{
143        io::{setup_busfile, BusReader, BusRecord, BusWriter},
144        iterators::CbUmiGroupIterator,
145    };
146
147    #[test]
148    fn test_merge_sorted_aggregated(){
149        let input: HashMap<String, Vec<BusRecord>> = HashMap::from(
150            [
151                ("s1".to_string(), vec![
152                    BusRecord {CB:0 , UMI: 1, EC:0, COUNT:1 , FLAG:0},
153                    BusRecord {CB:0 , UMI: 0, EC:1, COUNT:1 , FLAG:0},
154                ]),
155                ("s2".to_string(), vec![
156                    BusRecord {CB:0 , UMI: 0, EC:0, COUNT:1 , FLAG:0},
157                    BusRecord {CB:0 , UMI: 1, EC:0, COUNT:1 , FLAG:0},
158                ]),                
159            ]);
160        let merged_records = super::merge_chunks(input);
161
162        assert_eq!(merged_records, vec![
163            BusRecord {CB:0 , UMI: 0, EC:0, COUNT:1 , FLAG:0},
164            BusRecord {CB:0 , UMI: 0, EC:1, COUNT:1 , FLAG:0},
165            BusRecord {CB:0 , UMI: 1, EC:0, COUNT:2 , FLAG:0}
166        ])
167    }
168
169    #[test]
170    fn test_sort_in_memory() {
171        // this is the correct order here:
172        let r1 = BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 12, FLAG: 0 };
173        let r2 = BusRecord { CB: 0, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
174        let r3 = BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
175        let r4 = BusRecord { CB: 1, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
176        let r5 = BusRecord { CB: 1, UMI: 2, EC: 1, COUNT: 2, FLAG: 0 };
177        let r6 = BusRecord { CB: 2, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
178
179        let unsorted_records = vec![
180            r6.clone(),
181            r4.clone(),
182            r1.clone(),
183            r2.clone(),
184            r5.clone(),
185            r3.clone(),
186        ];
187        let (busname, _dir) = setup_busfile(&unsorted_records);
188
189        let outpath = _dir.path().join("bustools_test_sorted.bus");
190        let outfile = outpath.to_str().unwrap();
191
192        sort_in_memory(&busname, outfile);
193
194        let b = BusReader::new(outfile);
195        let v: Vec<BusRecord> = b.collect();
196
197        assert_eq!(v, vec![r1, r2, r3, r4, r5, r6]);
198    }
199
200    #[test]
201    fn test_sort_on_disk() {
202        // lets use chunksize 2 and split records over chunks on purpose
203
204        let r1 = BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 12, FLAG: 0 };
205        let r2 = BusRecord { CB: 0, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
206        let r3 = BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 };
207        let r4 = BusRecord { CB: 1, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
208        let r5 = BusRecord { CB: 1, UMI: 2, EC: 1, COUNT: 2, FLAG: 0 };
209        let r6 = BusRecord { CB: 2, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 };
210        let r7 = BusRecord { CB: 2, UMI: 1, EC: 0, COUNT: 2, FLAG: 0 };
211
212        let unsorted_records = vec![
213            // chunk 1
214            r6.clone(),
215            r4.clone(),
216            // chunk 2
217            r1.clone(),
218            r7.clone(),
219            // chunk 3
220            r5.clone(),
221            r3.clone(),
222            // chunk 4
223            r2.clone(),
224        ];
225
226        let (busname, _dir) = setup_busfile(&unsorted_records);
227        let outpath = _dir.path().join("bustools_test_sorted.bus");
228        let outfile = outpath.to_str().unwrap();
229
230        sort_on_disk(&busname, outfile, 2);
231
232        let b = BusReader::new(outfile);
233
234        // the followug doesnt work: r1 and r2 are both (0,1) and hence their order is arbitray
235        // assert_eq!(b.collect(), vec![r1, r2, r3, r4, r5, r6, r7]);
236
237        // instead check the sorting of the file implicitely
238        let n: usize = b.groupby_cbumi().map(|(_, rlist)| rlist.len()).sum();
239        assert_eq!(n, 7)
240    }
241
242    use rand::distributions::{Distribution, Uniform};
243
244    #[test]
245    fn test_random_file_sort() {
246        let cb_len = 16;
247        let umi_len = 12;
248        // let n_records = 10_000_000;
249        // let chunksize = 1_000_000;
250
251        let n_records = 10_000;
252        let chunksize = 1_000;
253
254        let cb_distr = Uniform::from(0..10000);
255        let umi_distr = Uniform::from(0..10000);
256        let mut rng = rand::thread_rng();
257
258        use tempfile::tempdir;
259        let dir = tempdir().unwrap();
260        let file_path = dir.path().join("test_bus_sort_random.bus");
261        let outfile = file_path.to_str().unwrap();
262
263        let mut writer = BusWriter::new(outfile, bustools_core::io::BusParams {cb_len, umi_len});
264        let mut records = vec![];
265
266        // actually,  make sure we dont sample the same CB/UMI multiple times
267        let mut cbumi:HashSet<(u64, u64)> = HashSet::new();
268        for _ in 0..n_records {
269            let cb = cb_distr.sample(&mut rng);
270            let umi = umi_distr.sample(&mut rng);         
271            cbumi.insert((cb, umi));
272        }
273
274        for (cb,umi) in &cbumi {
275            let r = BusRecord { CB: *cb, UMI: *umi, EC: 0, COUNT: 1, FLAG: 0 };
276            records.push(r);
277        }
278        writer.write_iterator(records.into_iter());
279
280        drop(writer); //make sure everything is written
281
282        // sort it
283        let sortec_path = dir.path().join("test_bus_sort_random_sorted.bus");
284        let sorted_out = sortec_path.to_str().unwrap();
285        sort_on_disk(&outfile, sorted_out, chunksize);
286
287        // check if sorted
288        let b = BusReader::new(sorted_out);
289        let n: usize = b.groupby_cbumi().map(|(_, rlist)| rlist.len()).sum();
290        assert_eq!(n, cbumi.len())
291    }
292
293    mod sort_into_btree {
294        use bustools_core::io::BusRecord;
295
296        #[test]
297        fn test_simple(){
298            let v = vec![
299                BusRecord {CB: 1, UMI: 0, EC: 0, COUNT:1, FLAG: 0},
300                BusRecord {CB: 0, UMI: 0, EC: 0, COUNT:1, FLAG: 0},
301                BusRecord {CB: 0, UMI: 1, EC: 0, COUNT:1, FLAG: 0},
302                ];
303            let sorted_set = crate::sort::sort_into_btree(v.into_iter(), );
304            assert_eq!(sorted_set.len(), 3);
305
306            let umis: Vec<_> = sorted_set.iter().map(|(_,r)| r.UMI).collect();
307            assert_eq!(umis, vec![0,1,0]);
308        }
309        #[test]
310        fn test_ec_sorted(){
311            let v = vec![
312                BusRecord {CB: 0, UMI: 0, EC: 100, COUNT:1, FLAG: 0},
313                BusRecord {CB: 0, UMI: 0, EC: 10, COUNT:1, FLAG: 0},
314                BusRecord {CB: 0, UMI: 0, EC: 1, COUNT:1, FLAG: 0},
315                ];
316            let sorted_set = crate::sort::sort_into_btree(v.into_iter(), );
317            assert_eq!(sorted_set.len(), 3);
318
319            let ecs: Vec<_> = sorted_set.iter().map(|(_,r)| r.EC).collect();
320            assert_eq!(ecs, vec![1,10,100]);
321        }
322
323        #[test]
324        fn test_merge(){
325            let v = vec![
326                BusRecord {CB: 0, UMI: 0, EC: 0, COUNT:1, FLAG: 0},
327                BusRecord {CB: 0, UMI: 0, EC: 0, COUNT:1, FLAG: 0},
328                BusRecord {CB: 0, UMI: 0, EC: 0, COUNT:1, FLAG: 0},
329                ];
330            let sorted_set = crate::sort::sort_into_btree(v.into_iter(), );
331            assert_eq!(sorted_set.len(), 1);
332
333            let counts: Vec<_> = sorted_set.iter().map(|(_,r)| r.COUNT).collect();
334            assert_eq!(counts, vec![3]);
335        }        
336    }
337}