use crate::header::Header;
use crate::triples::{TripleId, TriplesBitmap};
use crate::{DictSectPFC, FourSectDict, Hdt, IdKind};
use bitset_core::BitSet;
use bytesize::ByteSize;
use lasso::{Key, Spur, ThreadedRodeo};
use log::{debug, error};
use oxttl::NTriplesParser;
use rayon::prelude::*;
use std::collections::BTreeSet;
use std::path::Path;
use std::sync::Arc;
use std::thread;
/// I/O-flavoured result type returned by the N-Triples import functions in this module.
pub type Result<T> = std::result::Result<T, std::io::Error>;
/// A single 256-bit block of a bit set, stored as four 64-bit words.
type Simd = [u64; 4];
/// A bit set over interned string indices, built from consecutive 256-bit blocks.
type Indices = Vec<Simd>;
impl Hdt {
    /// Converts an N-Triples file into an in-memory [`Hdt`].
    ///
    /// The terms of `f` are split into a four-section dictionary (`DictSectPFC` sections with
    /// block size 16). The triples are encoded as dictionary IDs, sorted and stored as a
    /// [`TriplesBitmap`], and a header with VoID/HDT metadata about the result is generated.
    ///
    /// # Errors
    /// Returns an I/O error in three cases:
    /// - the input file cannot be opened or split for parsing;
    /// - a helper thread cannot be spawned;
    /// - the header cannot be built, for example when `f` cannot be canonicalized or its
    ///   metadata cannot be read.
    pub fn read_nt(f: &Path) -> Result<Self> {
        // Block size passed to every dictionary section.
        const BLOCK_SIZE: usize = 16;
        let (dict, mut encoded_triples) = read_dict_triples(f, BLOCK_SIZE)?;
        let num_triples = encoded_triples.len();
        // The encoded triples arrive ordered by interned string index; re-sort them
        // lexicographically by ID (SPO) before building the bitmap.
        encoded_triples.sort_unstable();
        let triples = TriplesBitmap::from_triples(&encoded_triples);
        let header = Header { format: "ntriples".to_owned(), length: 0, body: BTreeSet::new() };
        let mut hdt = Hdt { header, dict, triples };
        hdt.fill_header(f, BLOCK_SIZE, num_triples)?;
        debug!("HDT size in memory {}, details:", ByteSize(hdt.size_in_bytes() as u64));
        debug!("{hdt:#?}");
        Ok(hdt)
    }

    /// Fills `self.header.body` with metadata triples about the converted file.
    /// It then sets `self.header.length` to the byte size of that body written one
    /// triple per line.
    ///
    /// Parameters:
    /// - `path`: the original N-Triples file, used for the base IRI and the original size;
    /// - `block_size`: the dictionary block size;
    /// - `num_triples`: the number of encoded triples.
    fn fill_header(&mut self, path: &Path, block_size: usize, num_triples: usize) -> Result<()> {
        use crate::containers::rdf::Term::Literal as Lit;
        use crate::containers::rdf::{Id, Literal, Term, Triple};
        use crate::vocab::*;
        use std::io::Write;
        const ORDER: &str = "SPO";
        // Inserts the triple (`$s`, `$p`, literal `$o`); `$o` is stringified via `Display`.
        macro_rules! literal {
            ($s:expr, $p:expr, $o:expr) => {
                self.header.body.insert(Triple::new($s.clone(), $p.to_owned(), Lit(Literal::new($o.to_string()))));
            };
        }
        // Inserts the triple (`$s`, `$p`, node `$o`).
        macro_rules! insert_id {
            ($s:expr, $p:expr, $o:expr) => {
                self.header.body.insert(Triple::new($s.clone(), $p.to_owned(), Term::Id($o.clone())));
            };
        }
        let file_iri = format!("file://{}", path.canonicalize()?.display());
        let base = Id::Named(file_iri);
        // NOTE(review): the rdf:type objects below are stored as literals rather than IRIs —
        // confirm this is intended.
        literal!(base, RDF_TYPE, HDT_CONTAINER);
        literal!(base, RDF_TYPE, VOID_DATASET);
        literal!(base, VOID_TRIPLES, num_triples);
        literal!(base, VOID_PROPERTIES, self.dict.predicates.num_strings);
        // Distinct subjects/objects = their own section plus the shared section.
        let [d_s, d_o] =
            [&self.dict.subjects, &self.dict.objects].map(|s| s.num_strings + self.dict.shared.num_strings);
        literal!(base, VOID_DISTINCT_SUBJECTS, d_s);
        literal!(base, VOID_DISTINCT_OBJECTS, d_o);
        let stats_id = Id::Blank("statistics".to_owned());
        let pub_id = Id::Blank("publicationInformation".to_owned());
        let format_id = Id::Blank("format".to_owned());
        let dict_id = Id::Blank("dictionary".to_owned());
        let triples_id = Id::Blank("triples".to_owned());
        insert_id!(base, HDT_STATISTICAL_INFORMATION, stats_id);
        // NOTE(review): the publication-information node is linked with the
        // statistical-information predicate; a dedicated publication predicate may be
        // intended — confirm against `crate::vocab`.
        insert_id!(base, HDT_STATISTICAL_INFORMATION, pub_id);
        insert_id!(base, HDT_FORMAT_INFORMATION, format_id);
        insert_id!(format_id, HDT_DICTIONARY, dict_id);
        insert_id!(format_id, HDT_TRIPLES, triples_id);
        literal!(dict_id, HDT_DICT_SHARED_SO, self.dict.shared.num_strings);
        literal!(dict_id, HDT_DICT_MAPPING, "1");
        literal!(dict_id, HDT_DICT_SIZE_STRINGS, ByteSize(self.dict.size_in_bytes() as u64));
        literal!(dict_id, HDT_DICT_BLOCK_SIZE, block_size);
        literal!(triples_id, DC_TERMS_FORMAT, HDT_TYPE_BITMAP);
        literal!(triples_id, HDT_NUM_TRIPLES, num_triples);
        literal!(triples_id, HDT_TRIPLES_ORDER, ORDER);
        // Only the size is needed, so stat the file instead of opening a handle to it.
        let original_size = std::fs::metadata(path)?.len();
        literal!(stats_id, HDT_ORIGINAL_SIZE, original_size);
        literal!(stats_id, HDT_SIZE, ByteSize(self.size_in_bytes() as u64));
        // Header length = byte size of the body serialized one triple per line.
        let mut buf = Vec::<u8>::new();
        for triple in &self.header.body {
            writeln!(buf, "{triple}")?;
        }
        self.header.length = buf.len();
        Ok(())
    }
}
/// Intermediate result of parsing an N-Triples file: the interned terms plus, for every
/// term, in which triple positions it occurs.
struct IndexPool {
    /// Interned terms; the position of a string is the index used by all other fields.
    strings: Vec<String>,
    /// Parsed triples as `[subject, predicate, object]` string indices (may contain duplicates).
    triples: Vec<[usize; 3]>,
    /// Bit set of string indices that occur as a subject.
    subjects: Indices,
    /// Bit set of string indices that occur as a predicate.
    predicates: Indices,
    /// Bit set of string indices that occur as an object.
    objects: Indices,
}
/// Parses the N-Triples file at `path` and builds the four-section dictionary together with
/// the dictionary-encoded triples.
///
/// The interned triples are sorted and deduplicated on a dedicated thread while the
/// dictionary sections are being built, since neither step depends on the other. The
/// returned triples are deduplicated and ordered by interned string index, not by ID.
///
/// # Errors
/// Propagates errors from [`parse_nt_terms`] and from spawning the sorter thread.
///
/// # Panics
/// Re-raises a panic that occurred on the sorter thread.
fn read_dict_triples(path: &Path, block_size: usize) -> Result<(FourSectDict, Vec<TripleId>)> {
    let mut pool = parse_nt_terms(path)?;
    // Move the triples out so the sorter thread can own them while `pool` stays borrowed
    // for dictionary construction.
    let mut triples = std::mem::take(&mut pool.triples);
    let sorter = thread::Builder::new().name("sorter".to_owned()).spawn(move || {
        triples.sort_unstable();
        triples.dedup();
        triples
    })?;
    let dict = build_dict_from_terms(&pool, block_size);
    // Re-raise the sorter's own panic payload instead of an opaque `unwrap` on `Any`.
    let sorted_triple_indices = sorter.join().unwrap_or_else(|e| std::panic::resume_unwind(e));
    let encoded_triples: Vec<TripleId> = sorted_triple_indices
        .par_iter()
        .map(|&[s_idx, p_idx, o_idx]| {
            let s = &pool.strings[s_idx];
            let p = &pool.strings[p_idx];
            let o = &pool.strings[o_idx];
            let triple = [
                dict.string_to_id(s, IdKind::Subject),
                dict.string_to_id(p, IdKind::Predicate),
                dict.string_to_id(o, IdKind::Object),
            ];
            // An ID of 0 is logged as "not found in the dictionary"; the triple is still kept.
            if triple[0] == 0 || triple[1] == 0 || triple[2] == 0 {
                error!("{triple:?} contains 0, part of ({s}, {p}, {o}) not found in the dictionary");
            }
            triple
        })
        .collect();
    Ok((dict, encoded_triples))
}
fn parse_nt_terms(path: &Path) -> Result<IndexPool> {
let lasso: Arc<ThreadedRodeo<Spur>> = Arc::new(ThreadedRodeo::new());
let num_parsers = std::cmp::min(16, thread::available_parallelism().map_or(2, std::num::NonZero::get));
let readers = NTriplesParser::new().split_file_for_parallel_parsing(path, num_parsers)?;
let triples: Vec<[usize; 3]> = readers
.into_par_iter()
.flat_map_iter(|reader| {
reader.map(|q| {
let clean = |s: &mut String| {
let mut chars = s.chars();
if chars.next() == Some('<') && chars.nth_back(0) == Some('>') {
s.remove(0);
s.pop();
}
};
let q = q.unwrap(); let mut subj_str = q.subject.to_string();
clean(&mut subj_str);
let mut pred_str = q.predicate.to_string();
clean(&mut pred_str);
let mut obj_str = q.object.to_string();
clean(&mut obj_str);
let s_idx = lasso.get_or_intern(subj_str).into_usize();
let p_idx = lasso.get_or_intern(pred_str).into_usize();
let o_idx = lasso.get_or_intern(obj_str).into_usize();
[s_idx, p_idx, o_idx]
})
})
.collect();
let lasso = Arc::try_unwrap(lasso).unwrap(); let block = [0u64; 4];
let blocks = lasso.len().div_ceil(256);
let mut subjects = vec![block; blocks];
let mut objects = vec![block; blocks];
let mut predicates = vec![block; blocks];
for [s, p, o] in &triples {
subjects.bit_set(*s);
predicates.bit_set(*p);
objects.bit_set(*o);
}
let strings: Vec<String> = lasso.into_resolver().strings().map(String::from).collect();
Ok(IndexPool { triples, subjects, objects, predicates, strings })
}
fn build_dict_from_terms(pool: &IndexPool, block_size: usize) -> FourSectDict {
use log::warn;
use std::collections::BTreeSet;
if pool.predicates.is_empty() {
warn!("no triples found in provided RDF");
}
let externalize = |idx: &Indices| {
let mut v = BTreeSet::<&str>::new();
#[allow(clippy::needless_range_loop)]
for i in 0..idx.bit_len() {
if idx.bit_test(i) {
v.insert(&pool.strings[i]);
}
}
v
};
macro_rules! nspawn {
($s:expr, $n:expr, $f:expr) => {
thread::Builder::new().name($n.to_owned()).spawn_scoped($s, $f).unwrap()
};
}
let [shared, subjects, predicates, objects]: [DictSectPFC; 4] = thread::scope(|s| {
[
nspawn!(s, "shared", || {
let mut shared_indices: Indices = pool.subjects.clone();
shared_indices.bit_and(&pool.objects); DictSectPFC::compress(&externalize(&shared_indices), block_size)
}),
nspawn!(s, "unique subjects", || {
let mut unique_subject_indices: Indices = pool.subjects.clone();
unique_subject_indices.bit_andnot(&pool.objects);
DictSectPFC::compress(&externalize(&unique_subject_indices), block_size)
}),
nspawn!(s, "predicates", || DictSectPFC::compress(&externalize(&pool.predicates), block_size)),
nspawn!(s, "unique objects", || {
let mut unique_object_indices = pool.objects.clone();
unique_object_indices.bit_andnot(&pool.subjects);
DictSectPFC::compress(&externalize(&unique_object_indices), block_size)
}),
]
.map(|t| t.join().unwrap())
});
FourSectDict { shared, subjects, predicates, objects }
}
#[cfg(test)]
pub mod tests {
    use super::super::StringTriple;
    use super::super::tests::snikmeta_check;
    use super::Hdt;
    use crate::hdt::tests::snikmeta;
    use crate::tests::init;
    use color_eyre::Result;
    use fs_err::File;
    use std::io::Write;
    use std::path::Path;

    /// Runs two checks on [`Hdt::read_nt`]:
    /// - Converting `snikmeta.nt` yields the same triples and `bitmap_y` sequence as the
    ///   snikmeta HDT fixture. The `.nt` file is created from that fixture if missing.
    /// - An empty N-Triples file converts into an HDT that can be written and read back.
    #[test]
    fn read_nt() -> Result<()> {
        init();
        let path = Path::new("tests/resources/snikmeta.nt");
        if !path.exists() {
            log::info!("Creating test resource snikmeta.nt.");
            let mut writer = std::io::BufWriter::new(File::create(path)?);
            snikmeta()?.write_nt(&mut writer)?;
            // Flush explicitly: dropping a BufWriter silently discards write errors, which
            // could leave a truncated fixture behind for every later test run.
            writer.flush()?;
        }
        let snikmeta_nt = Hdt::read_nt(path)?;
        let snikmeta = snikmeta()?;
        let hdt_triples: Vec<StringTriple> = snikmeta.triples_all().collect();
        let nt_triples: Vec<StringTriple> = snikmeta_nt.triples_all().collect();
        assert_eq!(nt_triples, hdt_triples);
        assert_eq!(snikmeta.triples.bitmap_y.dict, snikmeta_nt.triples.bitmap_y.dict);
        snikmeta_check(&snikmeta_nt)?;
        let path = Path::new("tests/resources/empty.nt");
        let hdt_empty = Hdt::read_nt(path)?;
        let mut buf = Vec::<u8>::new();
        hdt_empty.write(&mut buf)?;
        Hdt::read(std::io::Cursor::new(buf))?;
        Ok(())
    }
}