rust_parallelfastx/
lib.rs1#![feature(bench_black_box)]
2use memmap::Mmap;
3use seq_io::BaseRecord;
4
5pub fn parallel_fastx<F>(filename: &str, nb_threads: usize, task: F)
6 where F: Send + Sync + Fn(&[u8],&str)
7{
8 let f = std::fs::File::open(filename).expect("Error: file not found");
9 let mmap = unsafe { Mmap::map(&f).expect(&format!("Error mapping file {}", filename)) };
10
11 let len_file = mmap.len();
13 let mut start_pos : Vec<usize> = Vec::new();
14 let mut end_pos : Vec<usize> = Vec::new();
15 let mut mmap_chunk = Vec::new();
16 start_pos.push(0);
17 for i in 1..nb_threads
18 {
19 let mut start = i*len_file/nb_threads;
20 while ! ((mmap[start] == '@' as u8 || mmap[start] == '>' as u8) && (mmap[start-1] == '\n' as u8 || mmap[start-1] == '\r' as u8)) { start += 1};
22 end_pos.push(start-1);
23 start_pos.push(start);
24 }
25 end_pos.push(len_file-1);
26 for i in 0..nb_threads {
27 mmap_chunk.push(mmap[start_pos[i]..end_pos[i]].as_ptr() as usize);
28 }
29
30 std::thread::scope(|scope| {let task = &task;
33 let mut threads = vec![];
34 for i in 0..nb_threads {
35 let start_pos_i = start_pos[i];
37 let end_pos_i = end_pos[i];
38 let mmap_chunk_i = mmap_chunk[i];
39 threads.push(scope.spawn(move || {
40 unsafe{
41 let mut reader = seq_io::fastx::Reader::new(std::slice::from_raw_parts(mmap_chunk_i as *const u8,end_pos_i-start_pos_i));
42 while let Some(result) = reader.next() {
43 let rec = result.unwrap();
44 let seq_str = rec.seq();
45 let seq_id = rec.id().unwrap().to_string();
46 task(&seq_str,&seq_id);
47 }
48 }
49 }));
50 }
51 for thread in threads {
52 let _ = thread.join();
53 }
54 });
55
56}
57
58