#![allow(missing_docs)] use crate::containers::vbyte::{decode_vbyte_delta, encode_vbyte, read_vbyte};
use crate::containers::{Sequence, sequence};
use crate::triples::Id;
use bytesize::ByteSize;
use std::cmp::{Ordering, min};
use std::collections::BTreeSet;
use std::io::{BufRead, Write};
use std::sync::Arc;
#[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
use std::thread::{JoinHandle, spawn};
use std::{fmt, str};
use thiserror::Error;
/// Result type used when reading and writing dictionary sections.
pub type Result<T> = core::result::Result<T, Error>;
/// Values parsed by `read_internal` before the CRC32 check runs:
/// (number of strings, block size, block offset sequence, packed string data,
/// stored CRC32 checksum bytes in little-endian order).
type ReadInternalResult = (usize, usize, Sequence, Arc<[u8]>, [u8; 4]);
/// Dictionary section encoded with plain front coding (PFC).
///
/// Strings are grouped into blocks of `block_size`. The first string of each block is stored in full;
/// every following string in the block is stored as a vbyte-encoded length of the prefix it shares
/// with its predecessor, followed by the remaining suffix. Every entry is NUL-terminated.
#[cfg_attr(test, derive(PartialEq))]
pub struct DictSectPFC {
/// Number of strings stored in this section; string IDs are 1-based, so valid IDs are `1..=num_strings`.
pub num_strings: usize,
/// Number of strings per block; the first string of each block is stored uncompressed.
pub block_size: usize,
/// Byte offset of the start of each block inside `packed_data`. As built by `compress`, a trailing
/// entry holding the end offset follows the block offsets.
pub sequence: Sequence,
/// The front-coded, NUL-terminated string data.
pub packed_data: Arc<[u8]>,
}
/// Errors that can occur while reading or writing a dictionary section.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// The underlying reader or writer failed.
#[error("IO error")]
Io(#[from] std::io::Error),
/// The header checksum did not match; fields are (computed, stored).
#[error("Invalid CRC8-CCIT checksum {0}, expected {1}")]
InvalidCrc8Checksum(u8, u8),
/// The checksum over the packed string data did not match; fields are (computed, stored).
#[error("Invalid CRC32-C checksum {0}, expected {1}")]
InvalidCrc32Checksum(u32, u32),
/// The section type byte was not 2 (plain front coding); the field holds the type byte found.
#[error("implementation only supports plain front coded dictionary section type 2, found type {0}")]
DictSectNotPfc(u8),
/// Reading the block offset sequence failed.
#[error("sequence read error")]
Sequence(#[from] sequence::Error),
}
impl fmt::Debug for DictSectPFC {
    /// Summarizes the section: human-readable total size, string count, the offset sequence
    /// and the size of the packed string data.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let total = ByteSize(self.size_in_bytes() as u64);
        let packed = ByteSize(self.packed_data.len() as u64);
        write!(
            f,
            "total size {total}, {} strings, sequence {:?}, packed data {packed:?}",
            self.num_strings, self.sequence
        )
    }
}
/// Errors returned when extracting a string from a dictionary section.
#[derive(Error, Debug)]
pub enum ExtractError {
/// The requested ID does not refer to a string stored in the section.
#[error("index out of bounds: id {id} > dictionary section len {len}")]
IdOutOfBounds { id: Id, len: usize },
/// The stored bytes are not valid UTF-8. `data` holds the raw bytes and `recovered` a lossy
/// conversion with invalid sequences replaced.
#[error("read invalid UTF-8 sequence in {data:?}, recovered: '{recovered}'")]
InvalidUtf8 { source: std::str::Utf8Error, data: Vec<u8>, recovered: String },
}
impl DictSectPFC {
    /// Size of the block offset sequence plus the packed string data, in bytes.
    pub fn size_in_bytes(&self) -> usize {
        self.sequence.size_in_bytes() + self.packed_data.len()
    }

    /// Raw bytes of the first (uncompressed) string of block `block`.
    ///
    /// Bytes are returned instead of `&str` so that lookups never panic on sections containing
    /// invalid UTF-8; [`Self::extract`] reports such data as a recoverable error instead.
    fn block_head(&self, block: usize) -> &[u8] {
        let position: usize = self.sequence.get(block);
        let length = self.strlen(position);
        &self.packed_data[position..position + length]
    }

    /// Returns the 1-based ID of `element`, or 0 if it is not contained in this section.
    ///
    /// Binary-searches the uncompressed block headers, then scans the single candidate block.
    pub fn string_to_id(&self, element: &str) -> Id {
        if self.num_strings == 0 {
            return 0;
        }
        let needle = element.as_bytes();
        let mut low: usize = 0;
        // The sequence holds one offset per block plus a trailing end offset,
        // so the index of the last block is `entries - 2`.
        let mut high = self.sequence.entries.saturating_sub(2);
        while low <= high {
            let mid = usize::midpoint(low, high);
            // Byte-wise comparison is exactly the ordering `str::cmp` uses.
            match needle.cmp(self.block_head(mid)) {
                Ordering::Less => {
                    if mid == 0 {
                        // Smaller than the very first string: not contained.
                        return 0;
                    }
                    high = mid - 1;
                }
                Ordering::Greater => low = mid + 1,
                Ordering::Equal => return ((mid * self.block_size) + 1) as Id,
            }
        }
        // The loop ends with low == high + 1, so `high` is the last block whose header
        // is smaller than `element`: the only block that can contain it.
        let block = high;
        let id_in_block = self.locate_in_block(block, element);
        if id_in_block == 0 {
            return 0;
        }
        ((block * self.block_size) + id_in_block + 1) as Id
    }

    /// Length of the common prefix of `a` and `b`.
    fn longest_common_prefix(a: &[u8], b: &[u8]) -> usize {
        let len = min(a.len(), b.len());
        let mut delta = 0;
        while delta < len && a[delta] == b[delta] {
            delta += 1;
        }
        delta
    }

    /// Scans block `block` for `element` and returns its 0-based position within the block,
    /// or 0 if it is not found.
    ///
    /// Position 0 (the block header) is never reported as a match, because
    /// [`Self::string_to_id`] already compares headers during its binary search.
    fn locate_in_block(&self, block: usize, element: &str) -> usize {
        if block >= self.sequence.entries {
            return 0;
        }
        let element = element.as_bytes();
        let mut pos = self.sequence.get(block);
        let slen = self.strlen(pos);
        let mut temp_string: Vec<u8> = self.packed_data[pos..pos + slen].to_vec();
        // Invariant: `cshared` is the length of the common prefix of `temp_string` and `element`.
        let mut cshared = Self::longest_common_prefix(&temp_string, element);
        pos += slen + 1;
        let mut id_in_block = 1;
        while (id_in_block < self.block_size) && (pos < self.packed_data.len()) {
            let (delta, vbyte_bytes) = decode_vbyte_delta(&self.packed_data, pos);
            pos += vbyte_bytes;
            let slen = self.strlen(pos);
            temp_string.truncate(delta);
            temp_string.extend_from_slice(&self.packed_data[pos..pos + slen]);
            if delta < cshared {
                // This string diverges from its (sorted, smaller) predecessor inside the prefix that
                // predecessor shared with `element`, so it and every later string are greater than
                // `element`: not found.
                id_in_block = 0;
                break;
            }
            // The first `cshared` bytes are unchanged, so only the remainder has to be compared.
            cshared += Self::longest_common_prefix(&temp_string[cshared..], &element[cshared..]);
            if cshared == element.len() && temp_string.len() == element.len() {
                break;
            }
            pos += slen + 1;
            id_in_block += 1;
        }
        // Ran off the end of the data or scanned the whole block without a match.
        if pos >= self.packed_data.len() || id_in_block == self.block_size {
            id_in_block = 0;
        }
        id_in_block
    }

    /// Returns the string with the given 1-based `id`.
    ///
    /// # Errors
    /// - [`ExtractError::IdOutOfBounds`] if `id` is 0 (the "not found" result of
    ///   [`Self::string_to_id`]) or greater than [`Self::num_strings`].
    /// - [`ExtractError::InvalidUtf8`] if the stored bytes are not valid UTF-8.
    pub fn extract(&self, id: Id) -> core::result::Result<String, ExtractError> {
        let index = id as usize;
        if index == 0 || index > self.num_strings {
            return Err(ExtractError::IdOutOfBounds { id, len: self.num_strings });
        }
        let block_index = (index - 1) / self.block_size;
        let string_index = (index - 1) % self.block_size;
        let mut position = self.sequence.get(block_index);
        let mut slen = self.strlen(position);
        let mut bytes: Vec<u8> = self.packed_data[position..position + slen].to_vec();
        // Rebuild each following string from the shared prefix of its predecessor plus its suffix.
        for _ in 0..string_index {
            position += slen + 1;
            let (delta, vbyte_bytes) = decode_vbyte_delta(&self.packed_data, position);
            position += vbyte_bytes;
            slen = self.strlen(position);
            bytes.truncate(delta);
            bytes.extend_from_slice(&self.packed_data[position..position + slen]);
        }
        match str::from_utf8(&bytes) {
            Ok(text) => Ok(text.to_owned()),
            Err(source) => {
                let recovered = String::from_utf8_lossy(&bytes).into_owned();
                Err(ExtractError::InvalidUtf8 { source, data: bytes, recovered })
            }
        }
    }

    /// Length of the NUL-terminated string starting at `offset`.
    ///
    /// If no terminator follows, this counts up to the end of the data. It returns 0 when `offset`
    /// is at or past the end.
    fn strlen(&self, offset: usize) -> usize {
        self.packed_data
            .get(offset..)
            .map_or(0, |rest| rest.iter().position(|&byte| byte == 0).unwrap_or(rest.len()))
    }

    /// Number of strings stored in this section.
    pub const fn num_strings(&self) -> usize {
        self.num_strings
    }

    /// Reads the header, offset sequence, packed data and stored CRC32 without verifying the CRC32.
    ///
    /// # Errors
    /// Fails on I/O errors, a type byte other than 2, a header CRC8 mismatch or an invalid sequence.
    fn read_internal<R: BufRead>(reader: &mut R) -> Result<ReadInternalResult> {
        let mut preamble = [0_u8];
        reader.read_exact(&mut preamble)?;
        if preamble[0] != 2 {
            return Err(Error::DictSectNotPfc(preamble[0]));
        }
        // The header CRC8 covers the type byte and the three vbyte-encoded header fields.
        let crc8 = crc::Crc::<u8>::new(&crc::CRC_8_SMBUS);
        let mut digest8 = crc8.digest();
        digest8.update(&[0x02]);
        let (num_strings, bytes_read) = read_vbyte(reader)?;
        digest8.update(&bytes_read);
        let (packed_length, bytes_read) = read_vbyte(reader)?;
        digest8.update(&bytes_read);
        let (block_size, bytes_read) = read_vbyte(reader)?;
        digest8.update(&bytes_read);
        let mut crc_code8 = [0_u8];
        reader.read_exact(&mut crc_code8)?;
        let crc_code8 = crc_code8[0];
        let crc_calculated8 = digest8.finalize();
        if crc_calculated8 != crc_code8 {
            return Err(Error::InvalidCrc8Checksum(crc_calculated8, crc_code8));
        }
        let sequence = Sequence::read(reader)?;
        let mut packed_data = vec![0u8; packed_length];
        reader.read_exact(&mut packed_data)?;
        let packed_data = Arc::<[u8]>::from(packed_data);
        let mut crc_code = [0u8; 4];
        reader.read_exact(&mut crc_code)?;
        Ok((num_strings, block_size, sequence, packed_data, crc_code))
    }

    /// Checks the CRC32-C of `packed_data` against the stored little-endian `crc_code` and builds the section.
    ///
    /// # Errors
    /// [`Error::InvalidCrc32Checksum`] with (computed, stored) if the checksums differ.
    fn verify_and_construct(
        num_strings: usize, block_size: usize, sequence: Sequence, packed_data: Arc<[u8]>, crc_code: [u8; 4],
    ) -> Result<Self> {
        let crc32 = crc::Crc::<u32>::new(&crc::CRC_32_ISCSI);
        let mut digest32 = crc32.digest();
        digest32.update(&packed_data[..]);
        let crc_calculated32 = digest32.finalize();
        let crc_code32 = u32::from_le_bytes(crc_code);
        if crc_calculated32 != crc_code32 {
            return Err(Error::InvalidCrc32Checksum(crc_calculated32, crc_code32));
        }
        Ok(DictSectPFC { num_strings, block_size, sequence, packed_data })
    }

    /// Reads a section from `reader`, verifying the CRC32 of the packed data on a spawned thread.
    ///
    /// Join the returned handle to obtain the section or the verification error.
    ///
    /// # Errors
    /// Returns read errors immediately; CRC32 errors are returned through the join handle.
    #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))]
    pub fn read<R: BufRead>(reader: &mut R) -> Result<JoinHandle<Result<Self>>> {
        let (num_strings, block_size, sequence, packed_data, crc_code) = Self::read_internal(reader)?;
        Ok(spawn(move || Self::verify_and_construct(num_strings, block_size, sequence, packed_data, crc_code)))
    }

    /// Reads a section from `reader` and verifies the CRC32 of the packed data on the current thread.
    ///
    /// # Errors
    /// Fails on read errors or checksum mismatches.
    #[cfg(any(target_arch = "wasm32", target_arch = "wasm64"))]
    pub fn read<R: BufRead>(reader: &mut R) -> Result<Self> {
        let (num_strings, block_size, sequence, packed_data, crc_code) = Self::read_internal(reader)?;
        Self::verify_and_construct(num_strings, block_size, sequence, packed_data, crc_code)
    }

    /// Writes the section in the layout [`Self::read`] expects.
    ///
    /// The layout is: type byte 2, then vbyte-encoded string count, packed length and block size,
    /// a CRC8 over those header bytes, the offset sequence, the packed data, and a little-endian
    /// CRC32-C of the packed data.
    ///
    /// # Errors
    /// Fails if writing to `dest_writer` fails.
    pub fn write(&self, dest_writer: &mut impl Write) -> Result<()> {
        let crc8 = crc::Crc::<u8>::new(&crc::CRC_8_SMBUS);
        let mut digest8 = crc8.digest();
        let seq_type: [u8; 1] = [2];
        dest_writer.write_all(&seq_type)?;
        digest8.update(&seq_type);
        let mut buf: Vec<u8> = vec![];
        buf.extend_from_slice(&encode_vbyte(self.num_strings));
        buf.extend_from_slice(&encode_vbyte(self.packed_data.len()));
        buf.extend_from_slice(&encode_vbyte(self.block_size));
        dest_writer.write_all(&buf)?;
        digest8.update(&buf);
        let checksum8: u8 = digest8.finalize();
        dest_writer.write_all(&[checksum8])?;
        self.sequence.write(dest_writer)?;
        let crc32 = crc::Crc::<u32>::new(&crc::CRC_32_ISCSI);
        let mut digest32 = crc32.digest();
        dest_writer.write_all(&self.packed_data)?;
        digest32.update(&self.packed_data);
        let checksum32 = digest32.finalize();
        let checksum_bytes: [u8; 4] = checksum32.to_le_bytes();
        dest_writer.write_all(&checksum_bytes)?;
        dest_writer.flush()?;
        Ok(())
    }

    /// Builds a front-coded section from `terms`, which a `BTreeSet` yields in sorted order.
    ///
    /// # Panics
    /// Panics if `block_size` is 0 and `terms` is not empty.
    pub fn compress(terms: &BTreeSet<&str>, block_size: usize) -> Self {
        let mut compressed_terms = Vec::new();
        let mut offsets = Vec::new();
        let mut last_term: &[u8] = &[];
        let num_terms = terms.len();
        for (i, term) in terms.iter().enumerate() {
            let term = term.as_bytes();
            if i % block_size == 0 {
                // Block header: remember where it starts and store it uncompressed.
                offsets.push(compressed_terms.len());
                compressed_terms.extend_from_slice(term);
            } else {
                let common_prefix_len = Self::longest_common_prefix(last_term, term);
                compressed_terms.extend_from_slice(&encode_vbyte(common_prefix_len));
                compressed_terms.extend_from_slice(&term[common_prefix_len..]);
            }
            compressed_terms.push(0);
            last_term = term;
        }
        // Trailing end offset, so the last block index is `entries - 2` (see `string_to_id`).
        if num_terms > 0 {
            offsets.push(compressed_terms.len());
        }
        DictSectPFC {
            num_strings: num_terms,
            block_size,
            sequence: Sequence::new(&offsets),
            packed_data: Arc::from(compressed_terms),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ControlInfo;
    use crate::hdt::tests::snikmeta;
    use crate::header::Header;
    use crate::tests::init;
    use fs_err::File;
    use pretty_assertions::assert_eq;
    use std::io::BufReader;

    /// Reads the shared and subject sections of the test file and checks string/ID round trips.
    #[test]
    fn read_section_read() -> color_eyre::Result<()> {
        init();
        let file = File::open("tests/resources/snikmeta.hdt")?;
        let mut reader = BufReader::new(file);
        ControlInfo::read(&mut reader)?;
        Header::read(&mut reader)?;
        let dict_ci = ControlInfo::read(&mut reader)?;
        assert!(
            dict_ci.format == "<http://purl.org/HDT/hdt#dictionaryFour>",
            "invalid dictionary type: {:?}",
            dict_ci.format
        );
        let shared = DictSectPFC::read(&mut reader)?.join().unwrap()?;
        assert_eq!(shared.num_strings, 43);
        assert_eq!(shared.packed_data.len(), 614);
        assert_eq!(shared.block_size, 16);
        for term in ["http://www.snik.eu/ontology/meta/Top", "http://www.snik.eu/ontology/meta/Function", "_:b1"] {
            let id = shared.string_to_id(term);
            let back = shared.extract(id)?;
            assert_eq!(term, back, "term does not translate back to itself {} -> {} -> {}", term, id, back);
        }
        let sequence = shared.sequence;
        let data_size = (sequence.bits_per_entry * sequence.entries).div_ceil(usize::BITS as usize);
        assert_eq!(sequence.data.len(), data_size);
        let subjects = DictSectPFC::read(&mut reader)?.join().unwrap()?;
        assert_eq!(subjects.num_strings, 6);
        for term in [
            "http://www.snik.eu/ontology/meta", "http://www.snik.eu/ontology/meta/feature",
            "http://www.snik.eu/ontology/meta/homonym", "http://www.snik.eu/ontology/meta/master",
            "http://www.snik.eu/ontology/meta/typicalFeature",
        ] {
            let id = subjects.string_to_id(term);
            let back = subjects.extract(id)?;
            assert_eq!(term, back, "term does not translate back to itself {} -> {} -> {}", term, id, back);
        }
        let sequence = subjects.sequence;
        let data_size = (sequence.bits_per_entry * sequence.entries).div_ceil(usize::BITS as usize);
        assert_eq!(sequence.data.len(), data_size);
        Ok(())
    }

    /// Writes every dictionary section of the test file and checks it reads back identically.
    #[test]
    fn write() -> color_eyre::Result<()> {
        init();
        let file = File::open("tests/resources/snikmeta.hdt")?;
        let mut reader = BufReader::new(file);
        ControlInfo::read(&mut reader)?;
        Header::read(&mut reader)?;
        let _ = ControlInfo::read(&mut reader)?;
        let shared = DictSectPFC::read(&mut reader)?.join().unwrap()?;
        assert_eq!(shared.num_strings, 43);
        assert_eq!(shared.packed_data.len(), 614);
        assert_eq!(shared.block_size, 16);
        let subjects = DictSectPFC::read(&mut reader)?.join().unwrap()?;
        let predicates = DictSectPFC::read(&mut reader)?.join().unwrap()?;
        let objects = DictSectPFC::read(&mut reader)?.join().unwrap()?;
        for sect in [shared, subjects, predicates, objects] {
            let mut buf = Vec::<u8>::new();
            sect.write(&mut buf)?;
            let mut cursor = std::io::Cursor::new(buf);
            let sect2 = DictSectPFC::read(&mut cursor)?.join().unwrap()?;
            assert_eq!(sect.num_strings, sect2.num_strings);
            assert_eq!(sect.sequence, sect2.sequence);
            assert_eq!(sect.packed_data.len(), sect2.packed_data.len());
            assert_eq!(sect.block_size, sect2.block_size);
            assert_eq!(sect.packed_data, sect2.packed_data);
        }
        Ok(())
    }

    /// Compresses known strings and the test file's sections and checks every string extracts unchanged.
    #[test]
    fn compress() -> color_eyre::Result<()> {
        const BLOCK_SIZE: usize = 16;
        init();
        let strings = [
            "http://www.snik.eu/ontology/meta", "http://www.snik.eu/ontology/meta/feature",
            "http://www.snik.eu/ontology/meta/homonym", "http://www.snik.eu/ontology/meta/master",
            "http://www.snik.eu/ontology/meta/typicalFeature", "http://www.snik.eu/ontology/meta/хобби-N-0",
        ];
        let string_vec = Vec::from(strings);
        let set: BTreeSet<&str> = BTreeSet::from(strings);
        let dict = DictSectPFC::compress(&set, BLOCK_SIZE);
        let sect_items =
            |ds: &DictSectPFC| -> Vec<String> { (1..=ds.num_strings()).map(|i| ds.extract(i).unwrap()).collect() };
        let items = sect_items(&dict);
        assert_eq!(string_vec, items);
        let hdt = snikmeta()?;
        let dict = hdt.dict;
        let names = ["shared", "subject", "predicate", "object"];
        let sects = [dict.shared, dict.subjects, dict.predicates, dict.objects];
        for (sect, name) in sects.iter().zip(names) {
            let items1 = sect_items(sect);
            let set1: BTreeSet<&str> = items1.iter().map(std::ops::Deref::deref).collect();
            let sect2 = DictSectPFC::compress(&set1, BLOCK_SIZE);
            // Borrow the recompressed section (previously mis-encoded as `§2`, which does not compile).
            let items2 = sect_items(&sect2);
            assert_eq!(items1, items2, "error compressing {name} section");
        }
        assert_eq!(0, DictSectPFC::compress(&BTreeSet::new(), BLOCK_SIZE).num_strings);
        Ok(())
    }
}