rust_parallelfastx/
lib.rs

1#![feature(bench_black_box)]
2use memmap::Mmap;
3use seq_io::BaseRecord;
4
5pub fn parallel_fastx<F>(filename: &str, nb_threads: usize, task: F)
6    where F: Send + Sync + Fn(&[u8],&str)
7{
8    let f = std::fs::File::open(filename).expect("Error: file not found");
9    let mmap = unsafe { Mmap::map(&f).expect(&format!("Error mapping file {}", filename)) };
10
11    // Determine chunks 
12    let len_file = mmap.len();
13    let mut start_pos : Vec<usize> = Vec::new();
14    let mut end_pos : Vec<usize> = Vec::new();
15    let mut mmap_chunk = Vec::new();
16    start_pos.push(0);
17    for i in 1..nb_threads
18    {
19	let mut start = i*len_file/nb_threads;
20	// adjust starting position of chunk to align with fastq or fasta 
21	while ! ((mmap[start] == '@' as u8 || mmap[start] == '>' as u8) && (mmap[start-1] == '\n' as u8 || mmap[start-1] == '\r' as u8)) { start += 1};
22	end_pos.push(start-1);
23	start_pos.push(start);
24    }
25    end_pos.push(len_file-1);
26    for i in 0..nb_threads {
27	mmap_chunk.push(mmap[start_pos[i]..end_pos[i]].as_ptr() as usize);
28    }
29
30    // Start FASTX parsing threads
31    std::thread::scope(|scope|  {// since rust 1.63
32        let task = &task;
33        let mut threads = vec![];
34        for i in 0..nb_threads {
35            // the things rust make us do..
36            let start_pos_i = start_pos[i];
37            let end_pos_i = end_pos[i];
38            let mmap_chunk_i = mmap_chunk[i];		
39            threads.push(scope.spawn(move || {
40                unsafe{
41                    let mut reader = seq_io::fastx::Reader::new(std::slice::from_raw_parts(mmap_chunk_i as *const u8,end_pos_i-start_pos_i));
42                    while let Some(result) = reader.next() {
43                        let rec = result.unwrap();
44                        let seq_str = rec.seq(); 
45                        let seq_id = rec.id().unwrap().to_string();
46                        task(&seq_str,&seq_id); 
47                    }
48                }
49            }));
50        }
51        for thread in threads {
52            let _ = thread.join();
53        }
54    });
55
56}
57
58