#![doc(html_root_url = "https://docs.rs/ciff/0.3.1")]
#![warn(
missing_docs,
trivial_casts,
trivial_numeric_casts,
unused_import_braces,
unused_qualifications
)]
#![warn(clippy::all, clippy::pedantic)]
#![allow(
clippy::module_name_repetitions,
clippy::default_trait_access,
clippy::cast_possible_wrap,
clippy::cast_possible_truncation,
clippy::copy_iterator
)]
use anyhow::{anyhow, Context};
use indicatif::ProgressIterator;
use indicatif::{ProgressBar, ProgressStyle};
use memmap::Mmap;
use num_traits::ToPrimitive;
use protobuf::{CodedInputStream, CodedOutputStream};
use std::borrow::Borrow;
use std::convert::TryFrom;
use std::ffi::{OsStr, OsString};
use std::fmt;
use std::fs::File;
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use tempfile::TempDir;
mod proto;
pub use proto::{DocRecord, Posting, PostingsList};
mod binary_collection;
pub use binary_collection::{
BinaryCollection, BinarySequence, InvalidFormat, RandomAccessBinaryCollection,
};
mod payload_vector;
pub use payload_vector::{build_lexicon, PayloadIter, PayloadSlice, PayloadVector};
// Crate-wide result alias: all fallible operations report errors through `anyhow`.
type Result<T> = anyhow::Result<T>;

// Template shared by every progress bar in this crate.
// NOTE(review): indicatif's built-in placeholders are `{pos}`/`{len}`;
// `{count}`/`{total}` may not be substituted by the pinned indicatif
// version — confirm against the version in Cargo.toml.
const DEFAULT_PROGRESS_TEMPLATE: &str =
    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {count}/{total} ({eta})";
/// Parsed CIFF header: the raw protobuf message plus its two counts
/// pre-validated and converted to `u32`.
#[derive(PartialEq, Clone, Default)]
struct Header {
    // Number of posting lists that follow the header in the stream.
    num_postings_lists: u32,
    // Number of document records that follow the posting lists.
    num_documents: u32,
    // Raw protobuf header, kept for `Display` and the remaining fields.
    protobuf_header: proto::Header,
}
impl Header {
    /// Reads and validates the CIFF protobuf header from `input`.
    ///
    /// Both counts are converted to `u32`; an error is returned if either
    /// value in the protobuf message is negative.
    fn from_stream(input: &mut CodedInputStream<'_>) -> Result<Self> {
        let header = input.read_message::<proto::Header>()?;
        let num_documents = u32::try_from(header.get_num_docs())
            .context("Number of documents must be non-negative.")?;
        // Fixed copy-paste bug: this context previously repeated the
        // "Number of documents" message from the conversion above.
        let num_postings_lists = u32::try_from(header.get_num_postings_lists())
            .context("Number of postings lists must be non-negative.")?;
        Ok(Self {
            protobuf_header: header,
            num_documents,
            num_postings_lists,
        })
    }
}
impl fmt::Display for Header {
    /// Delegates directly to the protobuf header's `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.protobuf_header, f)
    }
}
/// Builds the progress-bar style used by every progress bar in this crate.
fn pb_style() -> ProgressStyle {
    let base = ProgressStyle::default_bar();
    base.template(DEFAULT_PROGRESS_TEMPLATE)
        .progress_chars("=> ")
}
/// Writes `sequence` to `writer` as little-endian `u32`s, prefixed by the
/// length `len` (also encoded as a little-endian `u32`).
///
/// # Errors
///
/// Returns any I/O error raised while writing to `writer`.
pub fn encode_u32_sequence<N, S, W>(writer: &mut W, len: u32, sequence: S) -> io::Result<()>
where
    N: Borrow<u32>,
    S: IntoIterator<Item = N>,
    W: Write,
{
    writer.write_all(&len.to_le_bytes())?;
    sequence
        .into_iter()
        .try_for_each(|element| writer.write_all(&element.borrow().to_le_bytes()))
}
/// Writes one posting list to the PISA `.docs`, `.freqs`, and terms outputs.
///
/// CIFF stores gap-encoded document IDs; PISA expects absolute IDs, so the
/// gaps are prefix-summed before encoding.
///
/// # Errors
///
/// Returns an error if the document frequency does not fit in `u32`, if any
/// doc-ID gap or term frequency is negative (previously these panicked via
/// `expect`), or if writing to any of the outputs fails.
fn write_posting_list<DW, FW, TW>(
    posting_list: &PostingsList,
    documents: &mut DW,
    frequencies: &mut FW,
    terms: &mut TW,
) -> Result<()>
where
    DW: Write,
    FW: Write,
    TW: Write,
{
    // Document frequency doubles as the length of both encoded sequences.
    let length = posting_list
        .get_df()
        .to_u32()
        .ok_or_else(|| anyhow!("Cannot cast to u32: {}", posting_list.get_df()))?;
    let postings = posting_list.get_postings();
    // Prefix-sum the gaps into absolute document IDs, propagating (rather
    // than panicking on) negative gaps.
    let mut docids = Vec::with_capacity(postings.len());
    let mut prev = 0_u32;
    for posting in postings {
        let gap = u32::try_from(posting.get_docid())
            .with_context(|| format!("Negative doc ID gap: {}", posting.get_docid()))?;
        prev += gap;
        docids.push(prev);
    }
    let tfs = postings
        .iter()
        .map(|p| {
            u32::try_from(p.get_tf())
                .with_context(|| format!("Negative frequency: {}", p.get_tf()))
        })
        .collect::<Result<Vec<_>>>()?;
    encode_u32_sequence(documents, length, docids)?;
    encode_u32_sequence(frequencies, length, tfs)?;
    writeln!(terms, "{}", posting_list.get_term())?;
    Ok(())
}
fn check_lines_sorted<R: BufRead>(reader: R) -> io::Result<bool> {
let mut prev = String::from("");
for line in reader.lines() {
let line = line?;
if line < prev {
return Ok(false);
}
prev = line;
}
Ok(true)
}
/// Appends `suffix` to `path`, returning a new `OsString`.
///
/// Unlike `Path::join`, this concatenates onto the final component without
/// inserting a path separator (e.g. `"idx"` + `".docs"` → `"idx.docs"`).
pub fn concat<S1, S2>(path: S1, suffix: S2) -> OsString
where
    S1: AsRef<OsStr>,
    S2: AsRef<OsStr>,
{
    let mut joined = OsString::from(path.as_ref());
    joined.push(suffix);
    joined
}
/// Paths of the three binary files that make up a PISA uncompressed index.
#[derive(Debug, Clone, Default)]
struct PisaIndexPaths {
    documents: PathBuf,
    frequencies: PathBuf,
    sizes: PathBuf,
}

impl PisaIndexPaths {
    /// Derives all three index paths from a common base path by appending
    /// the standard PISA suffixes.
    #[must_use]
    fn from_base_path<P: AsRef<OsStr>>(path: P) -> Self {
        let with_suffix = |suffix: &str| {
            let mut joined = path.as_ref().to_owned();
            joined.push(suffix);
            PathBuf::from(joined)
        };
        Self {
            documents: with_suffix(".docs"),
            frequencies: with_suffix(".freqs"),
            sizes: with_suffix(".sizes"),
        }
    }
}
/// Full set of PISA output paths: the binary index files plus the term and
/// title text files and the optional lexicon files.
#[derive(Debug, Clone, Default)]
struct PisaPaths {
    index: PisaIndexPaths,
    terms: PathBuf,
    titles: PathBuf,
    // `None` disables term-lexicon generation.
    termlex: Option<PathBuf>,
    // `None` disables document-lexicon generation.
    doclex: Option<PathBuf>,
}
impl PisaPaths {
    /// Derives every PISA artifact path from a common base path.
    #[must_use]
    fn from_base_path<P: AsRef<OsStr>>(path: P) -> Self {
        let with_suffix = |suffix: &str| PathBuf::from(concat(&path, suffix));
        Self {
            index: PisaIndexPaths::from_base_path(&path),
            terms: with_suffix(".terms"),
            titles: with_suffix(".documents"),
            termlex: Some(with_suffix(".termlex")),
            doclex: Some(with_suffix(".doclex")),
        }
    }
}
/// Rewrites the binary collection at `path` with its sequences permuted by
/// `order`. When `skip_first` is true, the first sequence stays in place
/// (used for `.docs`, whose leading sequence holds the document count) and
/// `order` applies to the remaining sequences.
fn reorder_postings(path: &Path, order: &[usize], skip_first: bool) -> Result<()> {
    // Move the original file into a temp dir so a fresh file can be written
    // back at `path`; `temp` must stay alive until copying finishes.
    let temp = TempDir::new()?;
    let tmp_path = temp.path().join("coll");
    std::fs::rename(path, &tmp_path)?;
    // SAFETY (memmap contract): the mapped file must not be modified while
    // mapped; nothing else touches the renamed temp file here.
    let mmap = unsafe { Mmap::map(&File::open(tmp_path)?)? };
    let coll = RandomAccessBinaryCollection::try_from(mmap.as_ref())?;
    let mut writer = BufWriter::new(File::create(path)?);
    if skip_first {
        // Shift every index by one and pin index 0 to the front.
        let order: Vec<_> = std::iter::once(0)
            .chain(order.iter().map(|&i| i + 1))
            .collect();
        binary_collection::reorder(&coll, &order, &mut writer)?;
    } else {
        binary_collection::reorder(&coll, order, &mut writer)?;
    }
    writer.flush()?;
    Ok(())
}
/// Sorts the term file lexicographically and permutes the document and
/// frequency postings to match the new term order.
///
/// # Errors
///
/// Returns an error if any of the involved files cannot be read, rewritten,
/// or flushed.
fn reorder_pisa_index(paths: &PisaPaths) -> Result<()> {
    let terms = BufReader::new(File::open(&paths.terms)?)
        .lines()
        .collect::<io::Result<Vec<_>>>()?;
    // Stable sort keeps the relative order of any duplicate terms.
    let mut order: Vec<_> = (0..terms.len()).collect();
    order.sort_by_key(|&i| &terms[i]);
    // `.docs` carries a leading document-count sequence that must stay first.
    reorder_postings(&paths.index.documents, &order, true)?;
    reorder_postings(&paths.index.frequencies, &order, false)?;
    let mut term_writer = BufWriter::new(File::create(&paths.terms)?);
    for index in order {
        writeln!(&mut term_writer, "{}", terms[index])?;
    }
    // Flush explicitly: `BufWriter`'s `Drop` silently swallows write errors.
    term_writer.flush()?;
    Ok(())
}
/// Builder-style converter from a CIFF file to a PISA uncompressed index.
///
/// Set the input with [`CiffToPisa::input_path`] and the outputs with
/// [`CiffToPisa::output_paths`], then call [`CiffToPisa::convert`].
#[derive(Debug, Default, Clone)]
pub struct CiffToPisa {
    // All paths stay `None` until configured; `convert` reports which ones
    // are missing.
    input: Option<PathBuf>,
    documents_path: Option<PathBuf>,
    frequencies_path: Option<PathBuf>,
    sizes_path: Option<PathBuf>,
    terms_path: Option<PathBuf>,
    titles_path: Option<PathBuf>,
    termlex_path: Option<PathBuf>,
    doclex_path: Option<PathBuf>,
}
impl CiffToPisa {
pub fn input_path<P: Into<PathBuf>>(&mut self, path: P) -> &mut Self {
self.input = Some(path.into());
self
}
pub fn output_paths<P: AsRef<OsStr>>(&mut self, base_path: P) -> &mut Self {
let paths = PisaPaths::from_base_path(base_path);
self.documents_path = Some(paths.index.documents);
self.frequencies_path = Some(paths.index.frequencies);
self.sizes_path = Some(paths.index.sizes);
self.terms_path = Some(paths.terms);
self.titles_path = Some(paths.titles);
self.termlex_path = paths.termlex;
self.doclex_path = paths.doclex;
self
}
pub fn skip_lexicons(&mut self) -> &mut Self {
self.termlex_path = None;
self.doclex_path = None;
self
}
pub fn convert(&self) -> Result<()> {
let input = self
.input
.as_ref()
.ok_or_else(|| anyhow!("input path undefined"))?;
let index_output = PisaIndexPaths {
documents: self
.documents_path
.clone()
.ok_or_else(|| anyhow!("document postings path undefined"))?,
frequencies: self
.frequencies_path
.clone()
.ok_or_else(|| anyhow!("frequency postings path undefined"))?,
sizes: self
.sizes_path
.clone()
.ok_or_else(|| anyhow!("document sizes path undefined"))?,
};
let output = PisaPaths {
index: index_output,
terms: self
.terms_path
.clone()
.ok_or_else(|| anyhow!("terms path undefined"))?,
titles: self
.titles_path
.clone()
.ok_or_else(|| anyhow!("terms path undefined"))?,
termlex: self.termlex_path.clone(),
doclex: self.doclex_path.clone(),
};
convert_to_pisa(input, &output)
}
}
/// Converts the CIFF file at `input` into a PISA index at `output`,
/// optionally generating term and document lexicons.
///
/// # Errors
///
/// Propagates any error from the underlying [`CiffToPisa`] conversion.
#[deprecated = "use CiffToPisa instead"]
pub fn ciff_to_pisa(input: &Path, output: &Path, generate_lexicons: bool) -> Result<()> {
    let mut converter = CiffToPisa::default();
    converter.input_path(input).output_paths(output);
    if !generate_lexicons {
        converter.skip_lexicons();
    }
    converter.convert()
}
/// Converts a CIFF index at `input` into PISA artifacts at `output`.
///
/// Reads the CIFF header, then the posting lists, then the document records,
/// producing the `.docs`/`.freqs`/`.sizes` binary files plus term and title
/// text files; re-sorts the index when the terms are not already sorted, and
/// finally builds any requested lexicons.
fn convert_to_pisa(input: &Path, output: &PisaPaths) -> Result<()> {
    // Removed a leftover debug `println!("{:?}", output)` that dumped the
    // internal path struct to stdout on every conversion.
    let mut ciff_reader =
        File::open(input).with_context(|| format!("Unable to open {}", input.display()))?;
    let mut input = CodedInputStream::new(&mut ciff_reader);
    let mut documents = BufWriter::new(File::create(&output.index.documents)?);
    let mut frequencies = BufWriter::new(File::create(&output.index.frequencies)?);
    let mut terms = BufWriter::new(File::create(&output.terms)?);
    let header = Header::from_stream(&mut input)?;
    println!("{}", header);
    eprintln!("Processing postings");
    // The `.docs` file begins with a singleton sequence: the document count.
    encode_u32_sequence(&mut documents, 1, [header.num_documents].iter())?;
    let progress = ProgressBar::new(u64::try_from(header.num_postings_lists)?);
    progress.set_style(pb_style());
    progress.set_draw_delta(10);
    for _ in 0..header.num_postings_lists {
        write_posting_list(
            &input.read_message::<PostingsList>()?,
            &mut documents,
            &mut frequencies,
            &mut terms,
        )?;
        progress.inc(1);
    }
    progress.finish();
    documents.flush()?;
    frequencies.flush()?;
    terms.flush()?;
    eprintln!("Processing document lengths");
    let mut sizes = BufWriter::new(File::create(&output.index.sizes)?);
    let mut trecids = BufWriter::new(File::create(&output.titles)?);
    let progress = ProgressBar::new(u64::from(header.num_documents));
    progress.set_style(pb_style());
    progress.set_draw_delta(u64::from(header.num_documents) / 100);
    sizes.write_all(&header.num_documents.to_le_bytes())?;
    sizes.flush()?;
    for docs_seen in 0..header.num_documents {
        let doc_record = input.read_message::<DocRecord>()?;
        let docid: u32 = doc_record
            .get_docid()
            .to_u32()
            .ok_or_else(|| anyhow!("Cannot cast docid to u32: {}", doc_record.get_docid()))?;
        let trecid = doc_record.get_collection_docid();
        let length: u32 = doc_record.get_doclength().to_u32().ok_or_else(|| {
            anyhow!(
                "Cannot cast doc length to u32: {}",
                doc_record.get_doclength()
            )
        })?;
        // Sizes are written positionally, so records must arrive in ID order.
        if docid != docs_seen {
            anyhow::bail!("Document sizes must come in order");
        }
        sizes.write_all(&length.to_le_bytes())?;
        writeln!(trecids, "{}", trecid)?;
        progress.inc(1);
    }
    trecids.flush()?;
    progress.finish();
    // PISA expects lexicographically sorted terms; fix up the files if not.
    if !check_lines_sorted(BufReader::new(File::open(&output.terms)?))? {
        reorder_pisa_index(output)?;
    }
    eprintln!("Generating the document and term lexicons...");
    // Close the titles writer before the lexicon builder re-reads the file.
    drop(trecids);
    if let Some(termlex) = output.termlex.as_ref() {
        build_lexicon(&output.terms, termlex)?;
    }
    if let Some(doclex) = output.doclex.as_ref() {
        build_lexicon(&output.titles, doclex)?;
    }
    Ok(())
}
/// Reads the leading document count from the `.docs` collection.
///
/// The count is stored as the single element of the collection's first
/// sequence; the iterator is advanced past that sequence as a side effect.
fn read_document_count(
    documents: &mut BinaryCollection,
) -> std::result::Result<u32, InvalidFormat> {
    let invalid = || InvalidFormat::new("Unable to read document count");
    documents
        .next()
        // Double `?`: the first unwraps the missing-sequence error created by
        // `ok_or_else`, the second unwraps the sequence's own parse result.
        .ok_or_else(invalid)??
        .get(0)
        .ok_or_else(invalid)
}
/// Builds the CIFF protobuf header from the raw `.docs` and `.sizes` bytes.
///
/// Scans the document collection to count posting lists, and sums document
/// lengths to derive the total term count and average document length.
fn header(documents_bytes: &[u8], sizes_bytes: &[u8], description: &str) -> Result<proto::Header> {
    let mut num_postings_lists = 0;
    eprintln!("Collecting posting lists statistics");
    let progress = ProgressBar::new(documents_bytes.len() as u64);
    progress.set_style(pb_style());
    progress.set_draw_delta(100_000);
    let mut collection = BinaryCollection::try_from(documents_bytes)?;
    // Consume the leading document-count sequence so the loop below sees
    // only real posting lists.
    let num_documents = read_document_count(&mut collection)?;
    for sequence in collection {
        num_postings_lists += 1;
        let sequence = sequence?;
        // Progress is measured in bytes: payload plus the 4-byte length prefix.
        progress.inc((sequence.bytes().len() + 4) as u64);
    }
    progress.finish();
    eprintln!("Computing average document length");
    let progress = ProgressBar::new(u64::from(num_documents));
    progress.set_style(pb_style());
    let doclen_sum: i64 = sizes(sizes_bytes)?
        .iter()
        .map(i64::from)
        .progress_with(progress)
        .sum();
    let mut header = proto::Header::default();
    header.set_version(1);
    header.set_description(description.into());
    header.set_num_postings_lists(num_postings_lists);
    header.set_total_postings_lists(num_postings_lists);
    header.set_total_terms_in_collection(doclen_sum);
    // NOTE(review): `as i32` wraps for counts above `i32::MAX`; the protobuf
    // fields are signed — confirm collections stay within range.
    header.set_num_docs(num_documents as i32);
    header.set_total_docs(num_documents as i32);
    #[allow(clippy::cast_precision_loss)]
    header.set_average_doclength(doclen_sum as f64 / f64::from(num_documents));
    Ok(header)
}
/// Interprets `memory` as a binary collection and returns its first
/// sequence: the per-document sizes.
fn sizes(memory: &[u8]) -> std::result::Result<BinarySequence<'_>, InvalidFormat> {
    // `next()` yields `Option<Result<_, _>>`; `ok_or_else` handles the empty
    // collection, the trailing `?` unwraps that outer layer, and the inner
    // parse result becomes the return value.
    BinaryCollection::try_from(memory)?
        .next()
        .ok_or_else(|| InvalidFormat::new("sizes collection is empty"))?
}
/// Writes one CIFF `DocRecord` per document, pairing each size with the
/// title on the corresponding line of the titles file.
///
/// NOTE(review): `zip` stops at the shorter input, so a mismatch between the
/// number of sizes and titles is silently truncated — confirm callers
/// guarantee the counts match.
fn write_sizes(sizes_mmap: &Mmap, titles_file: &File, out: &mut CodedOutputStream) -> Result<()> {
    let titles = BufReader::new(titles_file);
    for ((docid, size), title) in sizes(sizes_mmap)?.iter().enumerate().zip(titles.lines()) {
        let mut document = DocRecord::default();
        document.set_docid(docid as i32);
        document.set_collection_docid(title?);
        document.set_doclength(size as i32);
        out.write_message_no_tag(&document)?;
    }
    Ok(())
}
/// Writes one CIFF `PostingsList` per term, converting PISA's absolute
/// document IDs back into CIFF's gap encoding.
fn write_postings(
    documents_mmap: &Mmap,
    frequencies_mmap: &Mmap,
    terms_file: &File,
    out: &mut CodedOutputStream,
) -> Result<()> {
    let mut documents = BinaryCollection::try_from(&documents_mmap[..])?;
    // Consume the leading document-count sequence before iterating lists.
    let num_documents = u64::from(read_document_count(&mut documents)?);
    let frequencies = BinaryCollection::try_from(&frequencies_mmap[..])?;
    let terms = BufReader::new(terms_file);
    eprintln!("Writing postings");
    // NOTE(review): the bar length is the document count, but the loop below
    // advances once per posting list, so the displayed total appears wrong —
    // confirm intended behavior.
    let progress = ProgressBar::new(num_documents);
    progress.set_style(pb_style());
    progress.set_draw_delta(num_documents / 100);
    for ((term_documents, term_frequencies), term) in documents
        .zip(frequencies)
        .zip(terms.lines())
        .progress_with(progress)
    {
        let mut posting_list = PostingsList::default();
        posting_list.set_term(term?);
        let mut count = 0;
        let mut sum = 0;
        let mut last_doc = 0;
        for (docid, frequency) in term_documents?.iter().zip(term_frequencies?.iter()) {
            let mut posting = Posting::default();
            // CIFF stores each ID as a gap from the previous posting.
            posting.set_docid(docid as i32 - last_doc);
            posting.set_tf(frequency as i32);
            posting_list.postings.push(posting);
            count += 1;
            sum += i64::from(frequency);
            last_doc = docid as i32;
        }
        posting_list.set_df(count);
        posting_list.set_cf(sum);
        out.write_message_no_tag(&posting_list)?;
    }
    Ok(())
}
/// Builder-style converter from a PISA uncompressed index to a CIFF file.
///
/// Configure paths with [`PisaToCiff::pisa_paths`] (or the individual path
/// setters) and [`PisaToCiff::output_path`], then call [`PisaToCiff::convert`].
#[derive(Debug, Default, Clone)]
pub struct PisaToCiff {
    documents_path: Option<PathBuf>,
    frequencies_path: Option<PathBuf>,
    sizes_path: Option<PathBuf>,
    terms_path: Option<PathBuf>,
    titles_path: Option<PathBuf>,
    output_path: Option<PathBuf>,
    // Free-text description stored in the CIFF header; defaults to empty.
    description: String,
}
impl PisaToCiff {
    /// Sets the description stored in the CIFF header.
    pub fn description<S: Into<String>>(&mut self, description: S) -> &mut Self {
        self.description = description.into();
        self
    }
    /// Derives index, terms, and titles paths from a PISA base path.
    pub fn pisa_paths<P: AsRef<OsStr>>(&mut self, base_path: P) -> &mut Self {
        let paths = PisaPaths::from_base_path(base_path);
        self.documents_path = Some(paths.index.documents);
        self.frequencies_path = Some(paths.index.frequencies);
        self.sizes_path = Some(paths.index.sizes);
        self.terms_path = Some(paths.terms);
        self.titles_path = Some(paths.titles);
        self
    }
    /// Derives only the binary index paths (docs/freqs/sizes) from a base
    /// path; terms and titles must be set separately.
    pub fn index_paths<P: AsRef<OsStr>>(&mut self, base_path: P) -> &mut Self {
        let PisaIndexPaths {
            documents,
            frequencies,
            sizes,
        } = PisaIndexPaths::from_base_path(base_path);
        self.documents_path = Some(documents);
        self.frequencies_path = Some(frequencies);
        self.sizes_path = Some(sizes);
        self
    }
    /// Sets the path of the term file.
    pub fn terms_path<P: Into<PathBuf>>(&mut self, path: P) -> &mut Self {
        self.terms_path = Some(path.into());
        self
    }
    /// Sets the path of the document titles file.
    pub fn titles_path<P: Into<PathBuf>>(&mut self, path: P) -> &mut Self {
        self.titles_path = Some(path.into());
        self
    }
    /// Sets the CIFF output file path.
    pub fn output_path<P: Into<PathBuf>>(&mut self, path: P) -> &mut Self {
        self.output_path = Some(path.into());
        self
    }
    /// Runs the conversion.
    ///
    /// # Errors
    ///
    /// Returns an error if any required path is unset, or if reading,
    /// encoding, or writing fails.
    pub fn convert(&self) -> Result<()> {
        pisa_to_ciff_from_paths(
            self.documents_path
                .as_ref()
                .ok_or_else(|| anyhow!("undefined document postings path"))?,
            self.frequencies_path
                .as_ref()
                .ok_or_else(|| anyhow!("undefined frequency postings path"))?,
            self.sizes_path
                .as_ref()
                .ok_or_else(|| anyhow!("undefined document sizes path"))?,
            self.terms_path
                .as_ref()
                .ok_or_else(|| anyhow!("undefined terms path"))?,
            self.titles_path
                .as_ref()
                .ok_or_else(|| anyhow!("undefined titles path"))?,
            self.output_path
                .as_ref()
                .ok_or_else(|| anyhow!("undefined output path"))?,
            &self.description,
        )
    }
}
/// Converts a PISA index (given by its base path plus separate term and
/// title files) into a CIFF file at `output`.
///
/// # Errors
///
/// Propagates any error from the underlying [`PisaToCiff`] conversion.
#[deprecated = "use PisaToCiff instead"]
pub fn pisa_to_ciff(
    collection_input: &Path,
    terms_input: &Path,
    titles_input: &Path,
    output: &Path,
    description: &str,
) -> Result<()> {
    PisaToCiff::default()
        .description(description)
        .index_paths(collection_input)
        .terms_path(terms_input)
        .titles_path(titles_input)
        .output_path(output)
        .convert()
}
/// Converts a PISA index, given by its individual file paths, into a single
/// CIFF file at `output`: header first, then posting lists, then document
/// records.
fn pisa_to_ciff_from_paths(
    documents_path: &Path,
    frequencies_path: &Path,
    sizes_path: &Path,
    terms_path: &Path,
    titles_path: &Path,
    output: &Path,
    description: &str,
) -> Result<()> {
    let documents_file = File::open(documents_path)?;
    let frequencies_file = File::open(frequencies_path)?;
    let sizes_file = File::open(sizes_path)?;
    let terms_file = File::open(terms_path)?;
    let titles_file = File::open(titles_path)?;
    // SAFETY (memmap contract): the mapped files must not be modified while
    // mapped; this function only reads them.
    let documents_mmap = unsafe { Mmap::map(&documents_file)? };
    let frequencies_mmap = unsafe { Mmap::map(&frequencies_file)? };
    let sizes_mmap = unsafe { Mmap::map(&sizes_file)? };
    let mut writer = BufWriter::new(File::create(output)?);
    let mut out = CodedOutputStream::new(&mut writer);
    let header = header(&documents_mmap[..], &sizes_mmap[..], description)?;
    out.write_message_no_tag(&header)?;
    write_postings(&documents_mmap, &frequencies_mmap, &terms_file, &mut out)?;
    write_sizes(&sizes_mmap, &titles_file, &mut out)?;
    // Flush the protobuf stream's internal buffer into the BufWriter.
    out.flush()?;
    Ok(())
}
#[cfg(test)]
mod test {
    use super::*;
    // Covers both the empty-input error message and decoding of a small,
    // hand-built size sequence (length prefix 5, then values 1..=5).
    #[test]
    fn test_size_sequence() {
        let empty_memory = Vec::<u8>::new();
        let sizes = sizes(&empty_memory);
        assert!(sizes.is_err());
        assert_eq!(
            "Invalid binary collection format: sizes collection is empty",
            &format!("{}", sizes.err().unwrap())
        );
        let valid_memory: Vec<u8> = [
            5_u32.to_le_bytes(),
            1_u32.to_le_bytes(),
            2_u32.to_le_bytes(),
            3_u32.to_le_bytes(),
            4_u32.to_le_bytes(),
            5_u32.to_le_bytes(),
        ]
        .iter()
        .flatten()
        .copied()
        .collect();
        let sizes = super::sizes(&valid_memory);
        assert!(sizes.is_ok());
        assert_eq!(
            sizes.unwrap().iter().collect::<Vec<u32>>(),
            vec![1_u32, 2, 3, 4, 5]
        );
    }
    // Helper: serializes a protobuf header into an in-memory buffer so the
    // tests below can read it back through `CodedInputStream`.
    fn header_to_buf(header: &proto::Header) -> Result<Vec<u8>> {
        let mut buffer = Vec::<u8>::new();
        let mut out = CodedOutputStream::vec(&mut buffer);
        out.write_message_no_tag(header)?;
        out.flush()?;
        Ok(buffer)
    }
    // Round-trips a valid header and checks the converted counts.
    #[test]
    fn test_read_default_header() -> Result<()> {
        let mut proto_header = proto::Header::default();
        proto_header.set_num_docs(17);
        proto_header.set_num_postings_lists(1234);
        let buffer = header_to_buf(&proto_header)?;
        let mut input = CodedInputStream::from_bytes(&buffer);
        let header = Header::from_stream(&mut input)?;
        assert_eq!(header.protobuf_header, proto_header);
        assert_eq!(header.num_documents, 17);
        assert_eq!(header.num_postings_lists, 1234);
        Ok(())
    }
    // A negative document count must be rejected during validation.
    #[test]
    fn test_read_negative_num_documents() -> Result<()> {
        let mut proto_header = proto::Header::default();
        proto_header.set_num_docs(-17);
        let buffer = header_to_buf(&proto_header)?;
        let mut input = CodedInputStream::from_bytes(&buffer);
        assert!(Header::from_stream(&mut input).is_err());
        Ok(())
    }
    // A negative posting-list count must be rejected during validation.
    #[test]
    fn test_read_negative_num_posting_lists() -> Result<()> {
        let mut proto_header = proto::Header::default();
        proto_header.set_num_postings_lists(-1234);
        let buffer = header_to_buf(&proto_header)?;
        let mut input = CodedInputStream::from_bytes(&buffer);
        assert!(Header::from_stream(&mut input).is_err());
        Ok(())
    }
}