use crossbeam_channel::unbounded;
use rayon::prelude::*;
use std::{
cmp::min,
fs::File,
io::{prelude::*, BufReader, BufWriter},
path::PathBuf,
};
use crate::errors::*;
use crate::qgram::*;
use crate::verification::*;
#[cfg(feature = "cli")]
use crate::cli::ProgressBarBuilder;
#[cfg(feature = "cli")]
use indicatif::{ParallelProgressIterator, ProgressBar};
pub fn min_edit_errors(qgram_array: &[PosQGram], q: usize) -> usize {
let mut cnt = 0;
let mut loc = 0;
let mut array_clone: Vec<PosQGram> = vec![PosQGram::default(); qgram_array.len()];
array_clone[..].clone_from_slice(qgram_array);
array_clone.par_sort_unstable_by_key(|qgram| qgram.loc);
qgram_array.iter().for_each(|qgram| {
if qgram.loc > loc {
cnt += 1;
loc = qgram.loc + q - 1;
}
});
cnt
}
/// Determines how many leading q-grams of `qgram_array` form the prefix.
///
/// A binary search runs over candidate lengths in `[tau + 1, q * tau + 1)`.
/// Each probe asks [`min_edit_errors`] about the first `probe` q-grams
/// (clamped to the array length): if the answer exceeds `tau` the upper bound
/// moves down to the probe, otherwise the lower bound moves just past it.
/// The final lower bound, clamped to the array length, is returned.
///
/// # Arguments
///
/// * `qgram_array` - the q-grams to take the prefix from. It is only read
///   here. (Presumably already ordered by the caller, e.g. by frequency —
///   verify against the call site.)
/// * `q` - the q-gram length.
/// * `tau` - the edit-distance threshold.
///
/// # Returns
///
/// The prefix length, never larger than `qgram_array.len()`.
pub fn calc_prefix_len(qgram_array: &mut PosQGramArray, q: usize, tau: usize) -> usize {
    let total: usize = qgram_array.len();
    let (mut lo, mut hi): (usize, usize) = (tau + 1, q * tau + 1);

    while lo < hi {
        let probe: usize = (lo + hi) / 2;
        // Never slice past the end of the array, even if the probe does.
        let upto: usize = min(probe, total);
        let errors: usize = min_edit_errors(&qgram_array[0..upto], q);
        if errors > tau {
            hi = probe;
        } else {
            lo = probe + 1;
        }
    }

    let prefix_len: usize = min(lo, total);
    #[cfg(feature = "cli")]
    trace!(
        "CalcPrefix for `{}`: prefix length = {}",
        &qgram_array,
        &prefix_len
    );
    prefix_len
}
/// Joins the lines of `doc_x` against the lines of `doc_y` and writes every
/// verified pair to an output file.
///
/// For each line of `doc_x` the function builds its positional q-grams,
/// orders them with `sort_by_frequency`, takes the prefix computed by
/// [`calc_prefix_len`], and gathers candidate line ids from the inverted
/// index. A candidate is kept only if its byte length and the q-gram position
/// both differ from the `doc_x` side by at most `tau`. Surviving candidates
/// are passed to `verify`, and the results are collected through a channel,
/// sorted by the `doc_x` line id and written out.
///
/// When `doc_x == doc_y` (self-join) only candidates whose id is greater than
/// the current line id are considered, so each pair is examined once.
///
/// # Output
///
/// The result file is named `<stem>_out_q<q>_tau<tau>.<ext>`, where `<stem>`
/// and `<ext>` come from `doc_x` (`txt` if it has no extension). Because the
/// name carries no directory component it is created relative to the current
/// working directory. Each line has the form `id_x,id_y,ed`.
///
/// # Errors
///
/// Returns an error if `doc_x` or `doc_y` cannot be opened/read, if building
/// the inverted index fails, or if the output file cannot be created, written
/// or flushed.
///
/// # Panics
///
/// Panics if `doc_x` has no file stem or its stem/extension is not valid
/// UTF-8, if a line of `doc_x` cannot be read, if a prefix token is absent
/// from the inverted index, or if sending on the result channel fails.
pub fn ed_join(doc_x: &PathBuf, doc_y: &PathBuf, q: usize, tau: usize) -> Result<()> {
    let file_x: File = File::open(doc_x)?;
    let reader_x: BufReader<File> = BufReader::new(file_x);

    // The candidate side is looked up by y id, so it has to hold the lines of
    // `doc_y` (not `doc_x`); otherwise a non-self join compares against the
    // wrong document.
    let y_buffer: String = std::fs::read_to_string(doc_y)?;
    let y_vec: Vec<Vec<u8>> = y_buffer.par_lines().map(Vec::from).collect();

    let out_name: PathBuf = PathBuf::from(format!(
        "{}_out_q{}_tau{}.{}",
        doc_x.file_stem().unwrap().to_str().unwrap(),
        q,
        tau,
        doc_x
            .extension()
            .unwrap_or_else(|| std::ffi::OsStr::new("txt"))
            .to_str()
            .unwrap()
    ));
    // Creation failure is an ordinary I/O error: report it to the caller
    // instead of panicking.
    let doc_out: File = File::create(&out_name)?;
    let mut writer: BufWriter<File> = BufWriter::new(doc_out);

    let mut output_vec: Vec<(ID, Vec<(ID, usize)>)> = Vec::new();
    let (output_s, output_r) = unbounded::<Vec<(ID, Vec<(ID, usize)>)>>();

    let inverted_index: InvertedIndex = generate_inverted_index(doc_x, doc_y, q)?;
    #[cfg(feature = "cli")]
    debug!("InvertedList: {:?}", &inverted_index);

    // Loop-invariant: decide once whether this is a self-join.
    let self_join: bool = doc_x == doc_y;

    #[cfg(not(feature = "cli"))]
    let file_x_iter = reader_x.lines().enumerate().par_bridge();
    #[cfg(feature = "cli")]
    let file_x_iter;
    #[cfg(feature = "cli")]
    {
        // The progress bar needs a total, so the file is scanned once up
        // front just to count its lines.
        let file_x_len: usize = BufReader::new(File::open(doc_x)?).lines().count();
        let pbar: ProgressBar = ProgressBarBuilder::new(file_x_len, "Processing").build();
        file_x_iter = reader_x
            .lines()
            .enumerate()
            .par_bridge()
            .progress_with(pbar);
    }

    file_x_iter.for_each(|(x_id, line_x)| {
        let x_content = String::from(line_x.unwrap());
        #[cfg(feature = "cli")]
        trace!(
            "=====================\nCurrent line {}: {}",
            x_id,
            x_content
        );

        let mut qgram_array_x: PosQGramArray = PosQGramArray::from(&x_content, q);
        qgram_array_x.sort_by_frequency(&inverted_index);
        let prefix_len: usize = calc_prefix_len(&mut qgram_array_x, q, tau);

        // Candidate generation: probe the inverted list of every prefix q-gram.
        let mut candidates: Vec<ID> = qgram_array_x
            .par_iter()
            .take(prefix_len)
            .flat_map(|qgram| {
                let token_x: Token = qgram.token.clone();
                let loc_x: Loc = qgram.loc;
                let inverted_list: &Vec<(ID, Loc)> = &inverted_index[&token_x].0;
                #[cfg(feature = "cli")]
                trace!(
                    "**************\nI-list of `{}`: {:?}",
                    token_x,
                    inverted_list,
                );
                let mut filtered: Vec<ID> = inverted_list
                    .par_iter()
                    // In a self-join only look at later lines.
                    .filter(|(y_id, _loc_y)| !self_join || *y_id > x_id)
                    // Length filter and position filter, both bounded by tau.
                    .filter(|(y_id, loc_y)| {
                        (y_vec[*y_id].len() as isize - x_content.len() as isize).abs()
                            <= tau as isize
                            && (loc_x as isize - *loc_y as isize).abs() <= tau as isize
                    })
                    .map(|pair| pair.0)
                    .collect();
                filtered.par_sort_unstable();
                filtered.dedup();
                filtered
            })
            .collect();
        // The same id can arrive from several q-grams; sort so that `dedup`
        // removes all repeats.
        candidates.par_sort_unstable();
        candidates.dedup();
        #[cfg(feature = "cli")]
        debug!("Candidate of `{}: {}`: {:?}", x_id, x_content, candidates);

        let mut verified: Vec<(ID, Vec<(ID, usize)>)> = candidates
            .par_iter()
            .map(|y_id| {
                // `y_vec` was split out of a `String`, so the bytes are
                // valid UTF-8.
                let y_content = std::str::from_utf8(&y_vec[*y_id]).unwrap();
                let qgram_array_y = PosQGramArray::from(&y_content, q);
                (y_id, y_content, qgram_array_y)
            })
            .filter_map(|(y_id, y_content, mut qgram_array_y)| {
                verify(
                    qgram_array_x.to_vec(),
                    x_id,
                    &x_content,
                    &mut qgram_array_y,
                    *y_id,
                    &y_content,
                    &inverted_index,
                    q,
                    tau,
                )
            })
            .collect();
        verified.par_iter_mut().for_each(|(_x_id, yvec)| {
            yvec.par_sort_unstable_by(|(a_id, _a_ed), (b_id, _b_ed)| a_id.cmp(&b_id))
        });
        output_s.send(verified).unwrap();
    });

    // Closing the sending side lets the drain loop below terminate once the
    // channel is empty.
    drop(output_s);
    while let Ok(mut v) = output_r.recv() {
        output_vec.append(&mut v);
    }
    drop(output_r);

    // Lines were processed in parallel, so restore order by x id.
    output_vec.par_sort_by_key(|x| x.0);
    #[cfg(feature = "cli")]
    debug!("Mathes: {:?}", output_vec);

    for (id_x, pairs) in &output_vec {
        for (id_y, ed) in pairs {
            writeln!(writer, "{},{},{}", id_x, id_y, ed)?;
        }
    }
    // Dropping a `BufWriter` discards flush errors; flush explicitly so a
    // failed write is reported.
    writer.flush()?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::qgram::{PosQGram, PosQGramArray};

    /// `min_edit_errors` on the array built from `"hello"` with `q = 2`
    /// is expected to report 2.
    #[test]
    fn test_min_edit_error() {
        let grams: PosQGramArray = PosQGramArray::from("hello", 2);
        let errors = min_edit_errors(&grams, 2);
        assert_eq!(errors, 2);
    }

    /// `calc_prefix_len` on four hand-built q-grams (given out of position
    /// order) with `q = 2` and `tau = 2` is expected to return the full
    /// length, 4.
    #[test]
    fn test_calc_prefix_len() {
        // Small constructor to keep the fixture readable.
        let gram = |token: &str, loc| PosQGram {
            token: token.to_string(),
            loc,
        };
        let mut grams: PosQGramArray = PosQGramArray::from_vec(vec![
            gram("lo", 3),
            gram("he", 0),
            gram("el", 1),
            gram("ll", 2),
        ]);

        let prefix_len = calc_prefix_len(&mut grams, 2, 2);

        assert_eq!(prefix_len, 4);
    }
}