1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#![feature(bench_black_box)]
use memmap::Mmap;
use seq_io::BaseRecord;
/// Parses a FASTA or FASTQ file in parallel and calls `task(sequence, id)` once per record.
///
/// The file is memory-mapped and cut into up to `nb_threads` contiguous chunks, each starting at
/// a record header. Every non-empty chunk is parsed with `seq_io::fastx::Reader` on its own scoped
/// thread. Records are visited in file order within a chunk, but chunks run concurrently, so
/// `task` must be `Sync` and must not rely on a global record order.
///
/// `nb_threads == 0` is treated as 1. An empty file produces no calls to `task`.
///
/// Split points are located using the header byte of the file's format (`>` for FASTA, `@` for
/// FASTQ, taken from the first non-whitespace byte). For FASTQ a candidate is only accepted if the
/// line two below it starts with `+`, because `@` can also begin a quality line; this assumes
/// 4-line (single-line) FASTQ records. When no acceptable split point is found, the data stays in
/// the preceding chunk. A file starting with neither `>` nor `@` is parsed as a single chunk.
///
/// NOTE(review): for multi-line FASTA, confirm whether `seq()` yields the sequence with embedded
/// line breaks.
///
/// # Panics
///
/// Panics if the file cannot be opened or mapped, if a record is malformed, if a record ID is not
/// valid UTF-8, or if `task` panics. A panic in a worker thread is re-raised on the calling thread
/// (after the remaining workers have been joined) instead of being discarded.
pub fn parallel_fastx(
    filename: &str,
    nb_threads: usize,
    task: &(dyn Fn(&[u8], &str) + 'static + Sync),
) {
    let file = std::fs::File::open(filename)
        .unwrap_or_else(|e| panic!("Error: cannot open file {}: {:?}", filename, e));

    // An empty file has no records; return before mapping it, since a zero-length mapping may be
    // rejected.
    if file.metadata().map_or(false, |m| m.len() == 0) {
        return;
    }

    // SAFETY: `Mmap::map` is unsafe because the mapped bytes can change if another process
    // modifies or truncates the file. This function assumes the input file is not modified for the
    // duration of the call; the mapping itself is only read.
    let mmap = unsafe { Mmap::map(&file) }
        .unwrap_or_else(|e| panic!("Error mapping file {}: {:?}", filename, e));
    let data: &[u8] = &mmap;

    let bounds = fastx_chunk_boundaries(data, nb_threads.max(1));

    // Scoped threads may borrow `data` directly, so no raw pointers are needed.
    std::thread::scope(|scope| {
        let mut handles = Vec::with_capacity(bounds.len() - 1);
        for pair in bounds.windows(2) {
            let chunk = &data[pair[0]..pair[1]];
            if chunk.is_empty() {
                continue;
            }
            handles.push(scope.spawn(move || {
                let mut reader = seq_io::fastx::Reader::new(chunk);
                while let Some(result) = reader.next() {
                    let rec = result.expect("Error: invalid FASTA/FASTQ record");
                    let id = rec.id().expect("Error: record ID is not valid UTF-8");
                    task(rec.seq(), id);
                }
            }));
        }
        // Re-raise a worker panic with its original payload rather than ignoring it, so a failed
        // parse is never mistaken for a fully processed file.
        for handle in handles {
            if let Err(payload) = handle.join() {
                std::panic::resume_unwind(payload);
            }
        }
    });
}

/// Returns `nb_chunks + 1` non-decreasing byte offsets from 0 to `data.len()`; every interior
/// offset is either a record start or `data.len()`.
fn fastx_chunk_boundaries(data: &[u8], nb_chunks: usize) -> Vec<usize> {
    let len = data.len();
    // The format is fixed by the first non-blank byte; only that header byte can start a record.
    let header = match data.iter().find(|b| !b.is_ascii_whitespace()) {
        Some(&b) if b == b'>' || b == b'@' => Some(b),
        _ => None,
    };

    let mut bounds = Vec::with_capacity(nb_chunks + 1);
    bounds.push(0);
    for i in 1..nb_chunks {
        let prev = bounds[i - 1];
        let bound = match header {
            Some(h) => {
                // Start at the evenly spaced target (computed without overflow), but never before
                // the previous boundary, so chunks cannot overlap when a record spans targets.
                let target = (len / nb_chunks * i).max(prev);
                (target..len)
                    .find(|&pos| is_fastx_record_start(data, pos, h))
                    .unwrap_or(len)
            }
            // Unknown format: do not split; the parser receives the whole input as one chunk.
            None => len,
        };
        bounds.push(bound);
    }
    bounds.push(len);
    bounds
}

/// Returns whether a record whose header byte is `header` starts at `data[pos]` (`pos < data.len()`).
fn is_fastx_record_start(data: &[u8], pos: usize, header: u8) -> bool {
    // A header byte only counts as the first byte of a line.
    if data[pos] != header || (pos > 0 && data[pos - 1] != b'\n') {
        return false;
    }
    if header != b'@' {
        // In FASTA every line-initial '>' is a header.
        return true;
    }
    // '@' is also a valid quality character, so a FASTQ quality line can begin with it. Two lines
    // below a real header is the '+' separator; two lines below a quality line is the next
    // record's sequence line, which does not start with '+'. If the data ends before that line,
    // reject the candidate: leaving the bytes in the preceding chunk is always safe.
    let mut line_start = pos;
    for _ in 0..2 {
        match data[line_start..].iter().position(|&b| b == b'\n') {
            Some(offset) => line_start += offset + 1,
            None => return false,
        }
    }
    data.get(line_start) == Some(&b'+')
}