1#![deny(missing_docs)]
8use bustools_core::{
9 io::{BusReader, BusRecord, BusWriter},
10 iterators::CbUmiGroupIterator,
11 merger::MultiIterator,
12};
13use itertools::Itertools;
14use std::collections::{BTreeMap, HashMap};
15use tempfile::tempdir;
16
17fn sort_into_btree<I: Iterator<Item = BusRecord>>(
21 iterator: I,
22) -> BTreeMap<(u64, u64, u32, u32), BusRecord> {
23 let mut in_mem_sort: BTreeMap<(u64, u64, u32, u32), BusRecord> = BTreeMap::new();
24
25 for record in iterator {
26 if let Some(r) = in_mem_sort.get_mut(&(record.CB, record.UMI, record.EC, record.FLAG)) {
27 r.COUNT += record.COUNT
28 }
29 else {
30 in_mem_sort.insert((record.CB, record.UMI, record.EC, record.FLAG), record);
31 }
32 }
33 in_mem_sort
34}
35
36#[allow(dead_code)]
43fn sort_in_memory(busfile: &str, outfile: &str) {
44 let reader = BusReader::new(busfile);
45 let params = reader.get_params().clone();
46
47 let in_mem_sort = sort_into_btree(reader);
48
49 let mut writer = BusWriter::new(outfile, params);
51
52 writer.write_iterator(
53 in_mem_sort.into_values()
55
56 );
57}
58
59pub (crate) fn merge_chunks(record_dict: HashMap<String, Vec<BusRecord>>) -> Vec<BusRecord>{
61 let records_from_all_chunks = record_dict.into_values().flatten();
62 let btree_sorted: Vec<BusRecord> = sort_into_btree(records_from_all_chunks).into_values().collect();
63 btree_sorted
64}
65pub fn sort_on_disk(busfile: &str, outfile: &str, chunksize: usize) {
79 let reader = BusReader::new(busfile);
80 let params = reader.get_params().clone();
81
82 let mut chunkfiles = Vec::new();
83
84 println!("Sorting chunks");
85 let tmpdir = tempdir().unwrap();
86
87 for (i, record_chunk) in (&reader.chunks(chunksize)).into_iter().enumerate() {
88 println!("Sorting {}th chunks", i);
89
90 let in_mem_sort = sort_into_btree(record_chunk);
92
93 let file_path = tmpdir.path().join(format!("tmp_{}.bus", i));
95 let tmpfilename = file_path.to_str().unwrap().to_string();
96
97 let mut tmpwriter = BusWriter::new(&tmpfilename, params.clone());
98 tmpwriter.write_iterator(
99 in_mem_sort.into_values()
101 );
102
103 chunkfiles.push(tmpfilename);
104 }
105
106 println!("Merging {} chunks", chunkfiles.len());
108 let mut writer = BusWriter::new(outfile, params);
109
110 let mut iterator_map = HashMap::new();
112 for file in chunkfiles.iter() {
113 let iter = BusReader::new(file).groupby_cbumi();
114 iterator_map.insert(file.to_string(), iter);
115 }
116
117 let mi = MultiIterator::new(iterator_map);
122 let it = mi
128 .flat_map(|(_cbumi, rdict)|
129 merge_chunks(rdict)
130 );
131
132 writer.write_iterator(it);
133
134 }
136
#[cfg(test)]
mod test {
    use std::collections::{HashMap, HashSet};

    use super::{sort_in_memory, sort_on_disk};
    use bustools_core::{
        io::{setup_busfile, BusReader, BusRecord, BusWriter},
        iterators::CbUmiGroupIterator,
    };
    use rand::distributions::{Distribution, Uniform};

    /// Records spread over two named chunks are pooled, aggregated and returned in key order.
    #[test]
    fn test_merge_sorted_aggregated() {
        let chunk_s1 = vec![
            BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 1, FLAG: 0 },
            BusRecord { CB: 0, UMI: 0, EC: 1, COUNT: 1, FLAG: 0 },
        ];
        let chunk_s2 = vec![
            BusRecord { CB: 0, UMI: 0, EC: 0, COUNT: 1, FLAG: 0 },
            BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 1, FLAG: 0 },
        ];

        let mut input: HashMap<String, Vec<BusRecord>> = HashMap::new();
        input.insert("s1".to_string(), chunk_s1);
        input.insert("s2".to_string(), chunk_s2);

        // (CB 0, UMI 1, EC 0) occurs in both chunks, hence COUNT 2.
        let expected = vec![
            BusRecord { CB: 0, UMI: 0, EC: 0, COUNT: 1, FLAG: 0 },
            BusRecord { CB: 0, UMI: 0, EC: 1, COUNT: 1, FLAG: 0 },
            BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 2, FLAG: 0 },
        ];

        assert_eq!(super::merge_chunks(input), expected);
    }

    /// A shuffled file comes back fully ordered from the in-memory sort.
    #[test]
    fn test_sort_in_memory() {
        // Listed in the order the sort is expected to produce.
        let sorted = vec![
            BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 12, FLAG: 0 },
            BusRecord { CB: 0, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 },
            BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 },
            BusRecord { CB: 1, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 },
            BusRecord { CB: 1, UMI: 2, EC: 1, COUNT: 2, FLAG: 0 },
            BusRecord { CB: 2, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 },
        ];

        // Shuffle by picking positions out of the sorted list.
        let unsorted_records: Vec<BusRecord> = [5usize, 3, 0, 1, 4, 2]
            .iter()
            .map(|&pos| sorted[pos].clone())
            .collect();

        let (busname, dir) = setup_busfile(&unsorted_records);
        let outpath = dir.path().join("bustools_test_sorted.bus");
        let outfile = outpath.to_str().unwrap();

        sort_in_memory(&busname, outfile);

        let written: Vec<BusRecord> = BusReader::new(outfile).collect();
        assert_eq!(written, sorted);
    }

    /// The on-disk sort (chunks of two records) keeps all seven input records.
    /// Only the total number of records is checked here, not their order.
    #[test]
    fn test_sort_on_disk() {
        let records = vec![
            BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 12, FLAG: 0 },
            BusRecord { CB: 0, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 },
            BusRecord { CB: 0, UMI: 2, EC: 0, COUNT: 12, FLAG: 0 },
            BusRecord { CB: 1, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 },
            BusRecord { CB: 1, UMI: 2, EC: 1, COUNT: 2, FLAG: 0 },
            BusRecord { CB: 2, UMI: 1, EC: 1, COUNT: 2, FLAG: 0 },
            BusRecord { CB: 2, UMI: 1, EC: 0, COUNT: 2, FLAG: 0 },
        ];

        // Shuffle by picking positions out of the list above.
        let unsorted_records: Vec<BusRecord> = [5usize, 3, 0, 6, 4, 2, 1]
            .iter()
            .map(|&pos| records[pos].clone())
            .collect();

        let (busname, dir) = setup_busfile(&unsorted_records);
        let outpath = dir.path().join("bustools_test_sorted.bus");
        let outfile = outpath.to_str().unwrap();

        sort_on_disk(&busname, outfile, 2);

        let n: usize = BusReader::new(outfile)
            .groupby_cbumi()
            .map(|(_, rlist)| rlist.len())
            .sum();
        assert_eq!(n, 7)
    }

    /// Sorts a file of randomly drawn, unique CB/UMI pairs on disk and checks that
    /// no record is lost.
    #[test]
    fn test_random_file_sort() {
        use tempfile::tempdir;

        let cb_len = 16;
        let umi_len = 12;
        let n_records = 10_000;
        let chunksize = 1_000;

        let cb_distr = Uniform::from(0..10000);
        let umi_distr = Uniform::from(0..10000);
        let mut rng = rand::thread_rng();

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test_bus_sort_random.bus");
        let outfile = file_path.to_str().unwrap();

        // Draw the pairs into a set, so every CB/UMI combination is written exactly once.
        let mut cbumi: HashSet<(u64, u64)> = HashSet::new();
        for _ in 0..n_records {
            cbumi.insert((cb_distr.sample(&mut rng), umi_distr.sample(&mut rng)));
        }

        let records: Vec<BusRecord> = cbumi
            .iter()
            .map(|&(cb, umi)| BusRecord { CB: cb, UMI: umi, EC: 0, COUNT: 1, FLAG: 0 })
            .collect();

        {
            // Scoped so the writer is dropped before the file is read back.
            let mut writer =
                BusWriter::new(outfile, bustools_core::io::BusParams { cb_len, umi_len });
            writer.write_iterator(records.into_iter());
        }

        let sorted_path = dir.path().join("test_bus_sort_random_sorted.bus");
        let sorted_out = sorted_path.to_str().unwrap();
        sort_on_disk(outfile, sorted_out, chunksize);

        let n: usize = BusReader::new(sorted_out)
            .groupby_cbumi()
            .map(|(_, rlist)| rlist.len())
            .sum();
        assert_eq!(n, cbumi.len())
    }

    mod sort_into_btree {
        use bustools_core::io::BusRecord;

        /// Distinct records stay distinct and come out ordered by CB first, then UMI.
        #[test]
        fn test_simple() {
            let input = vec![
                BusRecord { CB: 1, UMI: 0, EC: 0, COUNT: 1, FLAG: 0 },
                BusRecord { CB: 0, UMI: 0, EC: 0, COUNT: 1, FLAG: 0 },
                BusRecord { CB: 0, UMI: 1, EC: 0, COUNT: 1, FLAG: 0 },
            ];
            let sorted_map = crate::sort::sort_into_btree(input.into_iter());
            assert_eq!(sorted_map.len(), 3);

            let umis: Vec<_> = sorted_map.values().map(|r| r.UMI).collect();
            assert_eq!(umis, vec![0, 1, 0]);
        }

        /// Within one CB/UMI the records are ordered by EC.
        #[test]
        fn test_ec_sorted() {
            let input = vec![
                BusRecord { CB: 0, UMI: 0, EC: 100, COUNT: 1, FLAG: 0 },
                BusRecord { CB: 0, UMI: 0, EC: 10, COUNT: 1, FLAG: 0 },
                BusRecord { CB: 0, UMI: 0, EC: 1, COUNT: 1, FLAG: 0 },
            ];
            let sorted_map = crate::sort::sort_into_btree(input.into_iter());
            assert_eq!(sorted_map.len(), 3);

            let ecs: Vec<_> = sorted_map.values().map(|r| r.EC).collect();
            assert_eq!(ecs, vec![1, 10, 100]);
        }

        /// Identical records collapse into a single one whose COUNT is the sum.
        #[test]
        fn test_merge() {
            let input = vec![
                BusRecord { CB: 0, UMI: 0, EC: 0, COUNT: 1, FLAG: 0 },
                BusRecord { CB: 0, UMI: 0, EC: 0, COUNT: 1, FLAG: 0 },
                BusRecord { CB: 0, UMI: 0, EC: 0, COUNT: 1, FLAG: 0 },
            ];
            let sorted_map = crate::sort::sort_into_btree(input.into_iter());
            assert_eq!(sorted_map.len(), 1);

            let counts: Vec<_> = sorted_map.values().map(|r| r.COUNT).collect();
            assert_eq!(counts, vec![3]);
        }
    }
}