use crate::{StarError, StarResult, StarStore, StarTerm, StarTriple};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{Read, Write};
use tracing::{info, instrument, warn};
use scirs2_core::profiling::Profiler;
#[path = "hdt_star/simd_compression.rs"]
pub mod simd_compression;
pub use simd_compression::{SimdBitmapOps, SimdCompressionAnalyzer, SimdStringComparator};
/// On-disk format version stamped into every HDT-star header.
pub const HDT_STAR_VERSION: u8 = 1;
/// Magic bytes at the start of every HDT-star file ("HDT*RDF" + NUL).
pub const HDT_STAR_MAGIC: [u8; 8] = *b"HDT*RDF\0";
/// Tunable options controlling how an HDT-star file is built and laid out.
///
/// The config is serialized inside the header, so readers can recover the
/// settings (e.g. whether sections are compressed) from the file itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HdtStarConfig {
    /// zstd-compress the dictionary and triples sections when writing.
    pub enable_compression: bool,
    /// zstd compression level used when `enable_compression` is set.
    pub compression_level: u8,
    /// Keep a dedicated dictionary for quoted (embedded) triples.
    /// NOTE(review): not consulted by the code visible here — confirm use.
    pub enable_quoted_dict: bool,
    /// Maximum allowed quoted-triple nesting depth.
    /// NOTE(review): not enforced by the code visible here — confirm use.
    pub max_nesting_depth: usize,
    /// Block size hint stored in the triples component.
    pub block_size: usize,
    /// Enable memory-mapped access.
    /// NOTE(review): not used by the code visible here — confirm use.
    pub enable_mmap: bool,
    /// Which triple orderings to sort during index construction.
    pub index_strategy: IndexStrategy,
}
impl Default for HdtStarConfig {
fn default() -> Self {
Self {
enable_compression: true,
compression_level: 6,
enable_quoted_dict: true,
max_nesting_depth: 10,
block_size: 1024,
enable_mmap: true,
index_strategy: IndexStrategy::All,
}
}
}
/// Which triple orderings get sorted into usable indices at build time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexStrategy {
    /// Subject-predicate-object ordering only.
    Spo,
    /// Predicate-object-subject ordering only.
    Pos,
    /// Object-subject-predicate ordering only.
    Osp,
    /// All three orderings (default).
    All,
}
/// Metadata block written right after the magic bytes of an HDT-star file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HdtStarHeader {
    /// Format version (see [`HDT_STAR_VERSION`]).
    pub version: u8,
    /// Optional dataset base URI.
    pub base_uri: Option<String>,
    /// Number of asserted (top-level) triples.
    pub triple_count: u64,
    /// Distinct subject entries in the dictionary.
    pub subject_count: u64,
    /// Distinct predicate entries in the dictionary.
    pub predicate_count: u64,
    /// Distinct object entries in the dictionary.
    pub object_count: u64,
    /// Quoted-triple occurrences recorded while encoding.
    pub quoted_triple_count: u64,
    /// Deepest quoted-triple nesting seen.
    /// NOTE(review): never updated by the builder shown here — stays 0.
    pub max_nesting_depth: u8,
    /// Build configuration; readers need it to know whether the
    /// dictionary/triples sections are compressed.
    pub config: HdtStarConfig,
    /// Free-form key/value metadata.
    pub metadata: HashMap<String, String>,
    /// Creation time in Unix seconds (0 if the clock was before the epoch).
    pub created_at: u64,
}
impl HdtStarHeader {
    /// Creates a fresh header with zeroed counts, stamping the current
    /// Unix time in seconds (falls back to 0 if the clock predates the epoch).
    pub fn new(config: HdtStarConfig) -> Self {
        let created_at = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        };
        Self {
            version: HDT_STAR_VERSION,
            base_uri: None,
            triple_count: 0,
            subject_count: 0,
            predicate_count: 0,
            object_count: 0,
            quoted_triple_count: 0,
            max_nesting_depth: 0,
            config,
            metadata: HashMap::new(),
            created_at,
        }
    }

    /// Serializes the header with oxicode's standard configuration.
    pub fn to_bytes(&self) -> StarResult<Vec<u8>> {
        oxicode::serde::encode_to_vec(self, oxicode::config::standard())
            .map_err(|e| StarError::serialization_error(format!("Header encoding failed: {e}")))
    }

    /// Deserializes a header previously produced by [`Self::to_bytes`].
    pub fn from_bytes(bytes: &[u8]) -> StarResult<Self> {
        match oxicode::serde::decode_from_slice(bytes, oxicode::config::standard()) {
            Ok((header, _consumed)) => Ok(header),
            Err(e) => Err(StarError::parse_error(format!("Header decoding failed: {e}"))),
        }
    }
}
/// A single dictionary-encoded RDF-star term.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DictionaryEntry {
    /// IRI of a named node.
    Iri(String),
    /// Literal with optional datatype IRI and optional language tag.
    Literal {
        value: String,
        datatype: Option<String>,
        language: Option<String>,
    },
    /// Blank node identifier.
    BlankNode(String),
    /// Variable name.
    Variable(String),
    /// Reference into the quoted-triple dictionary by numeric id.
    QuotedTripleRef(u64),
}
impl DictionaryEntry {
    /// Builds a dictionary entry mirroring `term`.
    ///
    /// Quoted triples become a placeholder `QuotedTripleRef(0)`: real
    /// quoted-triple ids are assigned by the dictionary during encoding,
    /// not by this function.
    pub fn from_star_term(term: &StarTerm) -> Self {
        match term {
            StarTerm::NamedNode(node) => Self::Iri(node.iri.clone()),
            StarTerm::Literal(lit) => Self::Literal {
                value: lit.value.clone(),
                datatype: lit.datatype.as_ref().map(|dt| dt.iri.clone()),
                language: lit.language.clone(),
            },
            StarTerm::BlankNode(node) => Self::BlankNode(node.id.clone()),
            StarTerm::Variable(v) => Self::Variable(v.name.clone()),
            StarTerm::QuotedTriple(_) => Self::QuotedTripleRef(0),
        }
    }

    /// Converts this entry back into a [`StarTerm`].
    ///
    /// For literals a language tag takes precedence: when both `language`
    /// and `datatype` are present only the language is applied (matching
    /// the original behavior). `QuotedTripleRef` cannot be converted here —
    /// the referenced triple lives in the dictionary, not in this entry.
    pub fn to_star_term(&self) -> StarResult<StarTerm> {
        match self {
            Self::Iri(iri) => StarTerm::iri(iri),
            Self::Literal {
                value,
                datatype,
                language,
            } => {
                let mut term = StarTerm::literal(value)?;
                if let StarTerm::Literal(lit) = &mut term {
                    if let Some(lang) = language {
                        lit.language = Some(lang.clone());
                    } else if let Some(dt) = datatype {
                        lit.datatype = Some(crate::model::NamedNode { iri: dt.clone() });
                    }
                }
                Ok(term)
            }
            Self::BlankNode(id) => Ok(StarTerm::BlankNode(crate::model::BlankNode {
                id: id.clone(),
            })),
            Self::Variable(name) => Ok(StarTerm::Variable(crate::model::Variable {
                name: name.clone(),
            })),
            Self::QuotedTripleRef(_) => Err(StarError::invalid_term_type(
                "Cannot convert QuotedTripleRef directly to StarTerm",
            )),
        }
    }
}
/// Dictionary mapping RDF-star terms to dense numeric ids per section,
/// with reverse vectors for id -> entry lookup.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct HdtStarDictionary {
    // Forward maps: entry -> id. Ids are dense and sequential per section,
    // so reverse_vec[id] recovers the entry.
    subjects: HashMap<DictionaryEntry, u64>,
    predicates: HashMap<DictionaryEntry, u64>,
    objects: HashMap<DictionaryEntry, u64>,
    // NOTE(review): never populated by the code visible here; statistics()
    // always reports shared_count == 0. Confirm whether an HDT-style
    // shared subject/object section is planned or this field is dead.
    shared: HashMap<DictionaryEntry, u64>,
    // Quoted triples interned by id; this map backs get_quoted_triple,
    // while quoted_triple_reverse mirrors insertion order.
    quoted_triples: HashMap<u64, EncodedTriple>,
    subject_reverse: Vec<DictionaryEntry>,
    predicate_reverse: Vec<DictionaryEntry>,
    object_reverse: Vec<DictionaryEntry>,
    quoted_triple_reverse: Vec<EncodedTriple>,
    // Next id to hand out in each section.
    next_subject_id: u64,
    next_predicate_id: u64,
    next_object_id: u64,
    next_quoted_id: u64,
}
impl HdtStarDictionary {
    /// Creates an empty dictionary with all id counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Interns a subject entry, returning the existing id when already
    /// present. Ids are dense, so `subject_reverse[id]` is the entry.
    pub fn add_subject(&mut self, entry: DictionaryEntry) -> u64 {
        match self.subjects.get(&entry) {
            Some(&existing) => existing,
            None => {
                let id = self.next_subject_id;
                self.next_subject_id += 1;
                self.subject_reverse.push(entry.clone());
                self.subjects.insert(entry, id);
                id
            }
        }
    }

    /// Interns a predicate entry (same id semantics as [`Self::add_subject`]).
    pub fn add_predicate(&mut self, entry: DictionaryEntry) -> u64 {
        match self.predicates.get(&entry) {
            Some(&existing) => existing,
            None => {
                let id = self.next_predicate_id;
                self.next_predicate_id += 1;
                self.predicate_reverse.push(entry.clone());
                self.predicates.insert(entry, id);
                id
            }
        }
    }

    /// Interns an object entry (same id semantics as [`Self::add_subject`]).
    pub fn add_object(&mut self, entry: DictionaryEntry) -> u64 {
        match self.objects.get(&entry) {
            Some(&existing) => existing,
            None => {
                let id = self.next_object_id;
                self.next_object_id += 1;
                self.object_reverse.push(entry.clone());
                self.objects.insert(entry, id);
                id
            }
        }
    }

    /// Interns an already-encoded quoted triple, deduplicating by value.
    ///
    /// NOTE(review): dedup is a linear scan — O(n) per insert, O(n^2)
    /// overall. Fine for modest quoted-triple counts; a value -> id lookup
    /// map would fix it but changes the serialized struct layout.
    pub fn add_quoted_triple(&mut self, triple: EncodedTriple) -> u64 {
        if let Some((&existing, _)) = self
            .quoted_triples
            .iter()
            .find(|(_, stored)| **stored == triple)
        {
            return existing;
        }
        let id = self.next_quoted_id;
        self.next_quoted_id += 1;
        self.quoted_triple_reverse.push(triple.clone());
        self.quoted_triples.insert(id, triple);
        id
    }

    /// Looks up a subject entry by id.
    pub fn get_subject(&self, id: u64) -> Option<&DictionaryEntry> {
        self.subject_reverse.get(id as usize)
    }

    /// Looks up a predicate entry by id.
    pub fn get_predicate(&self, id: u64) -> Option<&DictionaryEntry> {
        self.predicate_reverse.get(id as usize)
    }

    /// Looks up an object entry by id.
    pub fn get_object(&self, id: u64) -> Option<&DictionaryEntry> {
        self.object_reverse.get(id as usize)
    }

    /// Looks up an interned quoted triple by id.
    pub fn get_quoted_triple(&self, id: u64) -> Option<&EncodedTriple> {
        self.quoted_triples.get(&id)
    }

    /// Snapshot of per-section entry counts.
    pub fn statistics(&self) -> DictionaryStats {
        DictionaryStats {
            subject_count: self.subjects.len(),
            predicate_count: self.predicates.len(),
            object_count: self.objects.len(),
            shared_count: self.shared.len(),
            quoted_triple_count: self.quoted_triples.len(),
        }
    }

    /// Serializes the whole dictionary with oxicode's standard config.
    pub fn to_bytes(&self) -> StarResult<Vec<u8>> {
        match oxicode::serde::encode_to_vec(self, oxicode::config::standard()) {
            Ok(bytes) => Ok(bytes),
            Err(e) => Err(StarError::serialization_error(format!(
                "Dictionary encoding failed: {e}"
            ))),
        }
    }

    /// Deserializes a dictionary produced by [`Self::to_bytes`].
    pub fn from_bytes(bytes: &[u8]) -> StarResult<Self> {
        match oxicode::serde::decode_from_slice(bytes, oxicode::config::standard()) {
            Ok((dict, _consumed)) => Ok(dict),
            Err(e) => Err(StarError::parse_error(format!(
                "Dictionary decoding failed: {e}"
            ))),
        }
    }
}
/// Entry counts per dictionary section, as reported by
/// `HdtStarDictionary::statistics`.
#[derive(Debug, Clone, Default)]
pub struct DictionaryStats {
    pub subject_count: usize,
    pub predicate_count: usize,
    pub object_count: usize,
    // Always 0 in the code visible here (the shared section is never filled).
    pub shared_count: usize,
    pub quoted_triple_count: usize,
}
/// A triple with all terms replaced by dictionary ids.
/// Predicates are always plain ids (IRIs only); subject/object may
/// reference quoted triples.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EncodedTriple {
    pub subject: EncodedTerm,
    pub predicate: u64,
    pub object: EncodedTerm,
}
/// A subject/object slot: either an id into the subject/object dictionary
/// or an id into the quoted-triple dictionary.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub enum EncodedTerm {
    Regular(u64),
    QuotedTriple(u64),
}
/// Triple component: the same triples kept in up to three sort orders,
/// plus a presence bitmap.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct HdtStarTriples {
    // Subject-predicate-object ordering (primary; len()/iter() use this).
    spo_index: Vec<EncodedTriple>,
    // Predicate-object-subject ordering.
    pos_index: Vec<EncodedTriple>,
    // Object-subject-predicate ordering.
    osp_index: Vec<EncodedTriple>,
    // One bit per triple position; rebuilt by build_indices.
    bitmap: Vec<u64>,
    // Block size hint from the config (stored, not otherwise used here).
    block_size: usize,
}
impl HdtStarTriples {
    /// Creates an empty triples component; `block_size` is recorded for the
    /// on-disk layout (nothing is pre-allocated here).
    pub fn new(block_size: usize) -> Self {
        Self {
            spo_index: Vec::new(),
            pos_index: Vec::new(),
            osp_index: Vec::new(),
            bitmap: Vec::new(),
            block_size,
        }
    }

    /// Appends a triple to all three orderings; sorting is deferred to
    /// [`Self::build_indices`].
    pub fn add(&mut self, triple: EncodedTriple) {
        self.spo_index.push(triple.clone());
        self.pos_index.push(triple.clone());
        self.osp_index.push(triple);
    }

    /// Sorts the orderings selected by `strategy` (others stay in insertion
    /// order) and rebuilds the presence bitmap.
    pub fn build_indices(&mut self, strategy: IndexStrategy) {
        if matches!(strategy, IndexStrategy::Spo | IndexStrategy::All) {
            self.spo_index.sort_by(|a, b| {
                (&a.subject, a.predicate, &a.object).cmp(&(&b.subject, b.predicate, &b.object))
            });
        }
        if matches!(strategy, IndexStrategy::Pos | IndexStrategy::All) {
            self.pos_index.sort_by(|a, b| {
                (a.predicate, &a.object, &a.subject).cmp(&(b.predicate, &b.object, &b.subject))
            });
        }
        if matches!(strategy, IndexStrategy::Osp | IndexStrategy::All) {
            self.osp_index.sort_by(|a, b| {
                (&a.object, &a.subject, a.predicate).cmp(&(&b.object, &b.subject, b.predicate))
            });
        }
        self.build_bitmap();
    }

    /// Rebuilds the presence bitmap: bit `i` is set for every triple
    /// position `i < len`.
    ///
    /// Every position below `len` is occupied, so whole 64-bit words are
    /// filled and only the trailing partial word is masked — O(len/64)
    /// instead of the previous per-bit loop, whose `block < bitmap.len()`
    /// check was always true by construction. The resulting bits are
    /// identical to the old implementation.
    fn build_bitmap(&mut self) {
        let len = self.spo_index.len();
        let num_blocks = (len + 63) / 64;
        self.bitmap = vec![u64::MAX; num_blocks];
        let tail_bits = len % 64;
        if tail_bits != 0 {
            if let Some(last) = self.bitmap.last_mut() {
                // Keep only the low `tail_bits` bits of the final word.
                *last = (1u64 << tail_bits) - 1;
            }
        }
    }

    /// Number of stored triples.
    pub fn len(&self) -> usize {
        self.spo_index.len()
    }

    /// True when no triples have been added.
    pub fn is_empty(&self) -> bool {
        self.spo_index.is_empty()
    }

    /// Iterates triples in SPO order (insertion order until
    /// [`Self::build_indices`] has run).
    pub fn iter(&self) -> impl Iterator<Item = &EncodedTriple> {
        self.spo_index.iter()
    }

    /// Serializes all indices and the bitmap with oxicode's standard config.
    pub fn to_bytes(&self) -> StarResult<Vec<u8>> {
        let encoded = oxicode::serde::encode_to_vec(self, oxicode::config::standard())
            .map_err(|e| StarError::serialization_error(format!("Triples encoding failed: {e}")))?;
        Ok(encoded)
    }

    /// Deserializes a triples component produced by [`Self::to_bytes`].
    pub fn from_bytes(bytes: &[u8]) -> StarResult<Self> {
        let (decoded, _) = oxicode::serde::decode_from_slice(bytes, oxicode::config::standard())
            .map_err(|e| StarError::parse_error(format!("Triples decoding failed: {e}")))?;
        Ok(decoded)
    }
}
/// Incrementally encodes RDF-star triples into dictionary + index form
/// and writes the final HDT-star byte stream.
pub struct HdtStarBuilder {
    config: HdtStarConfig,
    header: HdtStarHeader,
    dictionary: HdtStarDictionary,
    triples: HdtStarTriples,
    // Reserved for build-time profiling; currently never used.
    #[allow(dead_code)]
    profiler: Profiler,
}
impl HdtStarBuilder {
    /// Creates a builder seeded with `config`; the header timestamp is set
    /// now and the counts are filled in later by `build()`.
    pub fn new(config: HdtStarConfig) -> Self {
        let header = HdtStarHeader::new(config.clone());
        let triples = HdtStarTriples::new(config.block_size);
        Self {
            config,
            header,
            dictionary: HdtStarDictionary::new(),
            triples,
            profiler: Profiler::new(),
        }
    }

    /// Records the dataset base URI in the header.
    pub fn set_base_uri(&mut self, base_uri: impl Into<String>) {
        self.header.base_uri = Some(base_uri.into());
    }

    /// Adds (or overwrites) one header metadata key/value pair.
    pub fn add_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.header.metadata.insert(key.into(), value.into());
    }

    /// Encodes every triple from `store`, stopping at the first failure.
    #[instrument(skip(self, store), fields(store_size = store.len()))]
    pub fn add_store(&mut self, store: &StarStore) -> StarResult<()> {
        info!(
            "Adding store with {} triples to HDT-star builder",
            store.len()
        );
        for triple in store.iter() {
            self.add_triple(&triple)?;
        }
        Ok(())
    }

    /// Encodes one triple (recursing into quoted triples) and bumps the
    /// header's asserted-triple count.
    pub fn add_triple(&mut self, triple: &StarTriple) -> StarResult<()> {
        let encoded = self.encode_triple(triple)?;
        self.triples.add(encoded);
        self.header.triple_count += 1;
        Ok(())
    }

    /// Translates a triple's terms into dictionary ids. Predicates must be
    /// IRIs (NamedNode); anything else is rejected.
    fn encode_triple(&mut self, triple: &StarTriple) -> StarResult<EncodedTriple> {
        let subject = self.encode_term(&triple.subject, TermPosition::Subject)?;
        let predicate = match &triple.predicate {
            StarTerm::NamedNode(_) => {
                let entry = DictionaryEntry::from_star_term(&triple.predicate);
                self.dictionary.add_predicate(entry)
            }
            _ => {
                return Err(StarError::invalid_term_type(
                    "Predicate must be a NamedNode",
                ))
            }
        };
        let object = self.encode_term(&triple.object, TermPosition::Object)?;
        Ok(EncodedTriple {
            subject,
            predicate,
            object,
        })
    }

    /// Encodes a subject/object term. Quoted triples are encoded
    /// recursively and interned in the quoted-triple dictionary.
    ///
    /// NOTE(review): `quoted_triple_count` is incremented per occurrence,
    /// even when the dictionary dedups to an existing id, so the header
    /// count can exceed the number of distinct quoted triples — confirm
    /// this is the intended semantics.
    fn encode_term(&mut self, term: &StarTerm, position: TermPosition) -> StarResult<EncodedTerm> {
        match term {
            StarTerm::QuotedTriple(qt) => {
                let encoded_qt = self.encode_triple(qt)?;
                let qt_id = self.dictionary.add_quoted_triple(encoded_qt);
                self.header.quoted_triple_count += 1;
                Ok(EncodedTerm::QuotedTriple(qt_id))
            }
            _ => {
                let entry = DictionaryEntry::from_star_term(term);
                let id = match position {
                    TermPosition::Subject => self.dictionary.add_subject(entry),
                    TermPosition::Object => self.dictionary.add_object(entry),
                };
                Ok(EncodedTerm::Regular(id))
            }
        }
    }

    /// Sorts the triple indices per the configured strategy and copies
    /// final dictionary counts into the header. Called by `write()`.
    #[instrument(skip(self))]
    pub fn build(&mut self) -> StarResult<()> {
        info!("Building HDT-star indices...");
        self.triples.build_indices(self.config.index_strategy);
        let dict_stats = self.dictionary.statistics();
        self.header.subject_count = dict_stats.subject_count as u64;
        self.header.predicate_count = dict_stats.predicate_count as u64;
        self.header.object_count = dict_stats.object_count as u64;
        info!(
            "HDT-star built: {} triples, {} subjects, {} predicates, {} objects, {} quoted",
            self.header.triple_count,
            self.header.subject_count,
            self.header.predicate_count,
            self.header.object_count,
            self.header.quoted_triple_count
        );
        Ok(())
    }

    /// Builds and serializes the file.
    ///
    /// Layout: magic | header_len(u64 LE) | header | dict_len |
    /// [compressed_len | compressed] dict | triples_len |
    /// [compressed_len | compressed] triples.
    ///
    /// Note the plain (uncompressed) section length is written even when
    /// compression is on; `HdtStarReader::read` reads and ignores it in
    /// that case — both sides must keep this framing in sync.
    #[instrument(skip(self, writer))]
    pub fn write<W: Write>(&mut self, writer: &mut W) -> StarResult<()> {
        self.build()?;
        info!("Writing HDT-star to output...");
        writer.write_all(&HDT_STAR_MAGIC).map_err(|e| {
            StarError::serialization_error(format!("Failed to write magic bytes: {e}"))
        })?;
        let header_bytes = self.header.to_bytes()?;
        let header_len = (header_bytes.len() as u64).to_le_bytes();
        writer.write_all(&header_len).map_err(|e| {
            StarError::serialization_error(format!("Failed to write header length: {e}"))
        })?;
        writer
            .write_all(&header_bytes)
            .map_err(|e| StarError::serialization_error(format!("Failed to write header: {e}")))?;
        // Dictionary section.
        let dict_bytes = self.dictionary.to_bytes()?;
        let dict_len = (dict_bytes.len() as u64).to_le_bytes();
        writer.write_all(&dict_len).map_err(|e| {
            StarError::serialization_error(format!("Failed to write dictionary length: {e}"))
        })?;
        if self.config.enable_compression {
            let compressed = compress_data(&dict_bytes, self.config.compression_level)?;
            let compressed_len = (compressed.len() as u64).to_le_bytes();
            writer.write_all(&compressed_len).map_err(|e| {
                StarError::serialization_error(format!("Failed to write compressed length: {e}"))
            })?;
            writer.write_all(&compressed).map_err(|e| {
                StarError::serialization_error(format!("Failed to write dictionary: {e}"))
            })?;
        } else {
            writer.write_all(&dict_bytes).map_err(|e| {
                StarError::serialization_error(format!("Failed to write dictionary: {e}"))
            })?;
        }
        // Triples section, same framing as the dictionary.
        let triples_bytes = self.triples.to_bytes()?;
        let triples_len = (triples_bytes.len() as u64).to_le_bytes();
        writer.write_all(&triples_len).map_err(|e| {
            StarError::serialization_error(format!("Failed to write triples length: {e}"))
        })?;
        if self.config.enable_compression {
            let compressed = compress_data(&triples_bytes, self.config.compression_level)?;
            let compressed_len = (compressed.len() as u64).to_le_bytes();
            writer.write_all(&compressed_len).map_err(|e| {
                StarError::serialization_error(format!("Failed to write compressed length: {e}"))
            })?;
            writer.write_all(&compressed).map_err(|e| {
                StarError::serialization_error(format!("Failed to write triples: {e}"))
            })?;
        } else {
            writer.write_all(&triples_bytes).map_err(|e| {
                StarError::serialization_error(format!("Failed to write triples: {e}"))
            })?;
        }
        info!("HDT-star write complete");
        Ok(())
    }

    /// Snapshot of build statistics.
    ///
    /// NOTE(review): `build_time_us` is always 0 — timing is never
    /// measured (the `profiler` field is unused).
    pub fn statistics(&self) -> HdtStarBuildStats {
        let dict_stats = self.dictionary.statistics();
        HdtStarBuildStats {
            triple_count: self.header.triple_count,
            quoted_triple_count: self.header.quoted_triple_count,
            subject_count: dict_stats.subject_count as u64,
            predicate_count: dict_stats.predicate_count as u64,
            object_count: dict_stats.object_count as u64,
            build_time_us: 0,
        }
    }
}
/// Which triple slot a term occupies — selects the dictionary section used
/// when encoding/decoding (predicates are handled separately).
#[derive(Debug, Clone, Copy)]
enum TermPosition {
    Subject,
    Object,
}
/// Counters reported by `HdtStarBuilder::statistics`.
#[derive(Debug, Clone)]
pub struct HdtStarBuildStats {
    pub triple_count: u64,
    pub quoted_triple_count: u64,
    pub subject_count: u64,
    pub predicate_count: u64,
    pub object_count: u64,
    // Always 0 in the code visible here; build timing is not measured.
    pub build_time_us: u64,
}
/// Fully-parsed, in-memory view of an HDT-star file, supporting iteration
/// and simple single-component queries.
pub struct HdtStarReader {
    header: HdtStarHeader,
    dictionary: HdtStarDictionary,
    triples: HdtStarTriples,
}
impl HdtStarReader {
    /// Opens and fully parses an HDT-star file from disk.
    #[instrument(skip(path))]
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> StarResult<Self> {
        let mut file = std::fs::File::open(path.as_ref())
            .map_err(|e| StarError::resource_error(format!("Failed to open file: {e}")))?;
        Self::read(&mut file)
    }

    /// Parses a complete HDT-star stream.
    ///
    /// Layout (must mirror `HdtStarBuilder::write` exactly): magic |
    /// header_len(u64 LE) | header | dict_len | [compressed_len |
    /// compressed] dict | triples_len | [compressed_len | compressed]
    /// triples. The plain section lengths are read but ignored when the
    /// header says compression is enabled.
    pub fn read<R: Read>(reader: &mut R) -> StarResult<Self> {
        let mut magic = [0u8; 8];
        reader
            .read_exact(&mut magic)
            .map_err(|e| StarError::parse_error(format!("Failed to read magic bytes: {e}")))?;
        if magic != HDT_STAR_MAGIC {
            return Err(StarError::parse_error(
                "Invalid HDT-star file: magic bytes mismatch",
            ));
        }
        let mut header_len_bytes = [0u8; 8];
        reader
            .read_exact(&mut header_len_bytes)
            .map_err(|e| StarError::parse_error(format!("Failed to read header length: {e}")))?;
        let header_len = u64::from_le_bytes(header_len_bytes) as usize;
        let mut header_bytes = vec![0u8; header_len];
        reader
            .read_exact(&mut header_bytes)
            .map_err(|e| StarError::parse_error(format!("Failed to read header: {e}")))?;
        let header = HdtStarHeader::from_bytes(&header_bytes)?;
        // Dictionary section. The uncompressed length is always present;
        // when compressed, the actual payload length follows it.
        let mut dict_len_bytes = [0u8; 8];
        reader.read_exact(&mut dict_len_bytes).map_err(|e| {
            StarError::parse_error(format!("Failed to read dictionary length: {e}"))
        })?;
        let _dict_len = u64::from_le_bytes(dict_len_bytes) as usize;
        let dict_bytes = if header.config.enable_compression {
            let mut compressed_len_bytes = [0u8; 8];
            reader.read_exact(&mut compressed_len_bytes).map_err(|e| {
                StarError::parse_error(format!("Failed to read compressed length: {e}"))
            })?;
            let compressed_len = u64::from_le_bytes(compressed_len_bytes) as usize;
            let mut compressed = vec![0u8; compressed_len];
            reader
                .read_exact(&mut compressed)
                .map_err(|e| StarError::parse_error(format!("Failed to read dictionary: {e}")))?;
            decompress_data(&compressed)?
        } else {
            let mut dict_bytes = vec![0u8; _dict_len];
            reader
                .read_exact(&mut dict_bytes)
                .map_err(|e| StarError::parse_error(format!("Failed to read dictionary: {e}")))?;
            dict_bytes
        };
        let dictionary = HdtStarDictionary::from_bytes(&dict_bytes)?;
        // Triples section, same framing as the dictionary.
        let mut triples_len_bytes = [0u8; 8];
        reader
            .read_exact(&mut triples_len_bytes)
            .map_err(|e| StarError::parse_error(format!("Failed to read triples length: {e}")))?;
        let _triples_len = u64::from_le_bytes(triples_len_bytes) as usize;
        let triples_bytes = if header.config.enable_compression {
            let mut compressed_len_bytes = [0u8; 8];
            reader.read_exact(&mut compressed_len_bytes).map_err(|e| {
                StarError::parse_error(format!("Failed to read compressed length: {e}"))
            })?;
            let compressed_len = u64::from_le_bytes(compressed_len_bytes) as usize;
            let mut compressed = vec![0u8; compressed_len];
            reader
                .read_exact(&mut compressed)
                .map_err(|e| StarError::parse_error(format!("Failed to read triples: {e}")))?;
            decompress_data(&compressed)?
        } else {
            let mut triples_bytes = vec![0u8; _triples_len];
            reader
                .read_exact(&mut triples_bytes)
                .map_err(|e| StarError::parse_error(format!("Failed to read triples: {e}")))?;
            triples_bytes
        };
        let triples = HdtStarTriples::from_bytes(&triples_bytes)?;
        info!(
            "Loaded HDT-star file: {} triples, {} quoted triples",
            header.triple_count, header.quoted_triple_count
        );
        Ok(Self {
            header,
            dictionary,
            triples,
        })
    }

    /// The parsed file header.
    pub fn header(&self) -> &HdtStarHeader {
        &self.header
    }

    /// The term dictionary.
    pub fn dictionary(&self) -> &HdtStarDictionary {
        &self.dictionary
    }

    /// Number of asserted triples.
    pub fn len(&self) -> usize {
        self.triples.len()
    }

    /// True when the file contains no triples.
    pub fn is_empty(&self) -> bool {
        self.triples.is_empty()
    }

    /// Decodes every triple (SPO order), yielding one Result per triple.
    pub fn iter_triples(&self) -> impl Iterator<Item = StarResult<StarTriple>> + '_ {
        self.triples
            .iter()
            .map(|encoded| self.decode_triple(encoded))
    }

    /// Decodes one encoded triple back into StarTerms via the dictionary.
    fn decode_triple(&self, encoded: &EncodedTriple) -> StarResult<StarTriple> {
        let subject = self.decode_term(&encoded.subject, TermPosition::Subject)?;
        let predicate = self
            .dictionary
            .get_predicate(encoded.predicate)
            .ok_or_else(|| StarError::parse_error("Invalid predicate ID"))?
            .to_star_term()?;
        let object = self.decode_term(&encoded.object, TermPosition::Object)?;
        Ok(StarTriple::new(subject, predicate, object))
    }

    /// Decodes a subject/object slot; quoted triples recurse through the
    /// quoted-triple dictionary.
    fn decode_term(&self, encoded: &EncodedTerm, position: TermPosition) -> StarResult<StarTerm> {
        match encoded {
            EncodedTerm::Regular(id) => {
                let entry = match position {
                    TermPosition::Subject => self.dictionary.get_subject(*id),
                    TermPosition::Object => self.dictionary.get_object(*id),
                }
                .ok_or_else(|| StarError::parse_error("Invalid term ID"))?;
                entry.to_star_term()
            }
            EncodedTerm::QuotedTriple(qt_id) => {
                let encoded_qt = self
                    .dictionary
                    .get_quoted_triple(*qt_id)
                    .ok_or_else(|| StarError::parse_error("Invalid quoted triple ID"))?;
                let decoded_qt = self.decode_triple(encoded_qt)?;
                Ok(StarTerm::quoted_triple(decoded_qt))
            }
        }
    }

    /// Materializes all triples into a new StarStore.
    pub fn to_store(&self) -> StarResult<StarStore> {
        let store = StarStore::new();
        for result in self.iter_triples() {
            let triple = result?;
            store.insert(&triple)?;
        }
        Ok(store)
    }

    /// All triples whose subject equals `subject`.
    ///
    /// NOTE: a quoted-triple subject always yields an empty result:
    /// `DictionaryEntry::from_star_term` maps quoted triples to a
    /// placeholder ref that is never present in the subject map.
    pub fn query_by_subject(&self, subject: &StarTerm) -> Vec<StarResult<StarTriple>> {
        let target_entry = DictionaryEntry::from_star_term(subject);
        let subject_id = match self.dictionary.subjects.get(&target_entry) {
            Some(&id) => id,
            None => return Vec::new(),
        };
        self.triples
            .spo_index
            .iter()
            .filter(|t| matches!(&t.subject, EncodedTerm::Regular(id) if *id == subject_id))
            .map(|t| self.decode_triple(t))
            .collect()
    }

    /// All triples with the given predicate (linear scan of the POS index).
    pub fn query_by_predicate(&self, predicate: &StarTerm) -> Vec<StarResult<StarTriple>> {
        let target_entry = DictionaryEntry::from_star_term(predicate);
        let predicate_id = match self.dictionary.predicates.get(&target_entry) {
            Some(&id) => id,
            None => return Vec::new(),
        };
        self.triples
            .pos_index
            .iter()
            .filter(|t| t.predicate == predicate_id)
            .map(|t| self.decode_triple(t))
            .collect()
    }

    /// All triples whose object equals `object` (same quoted-triple
    /// limitation as `query_by_subject`).
    pub fn query_by_object(&self, object: &StarTerm) -> Vec<StarResult<StarTriple>> {
        let target_entry = DictionaryEntry::from_star_term(object);
        let object_id = match self.dictionary.objects.get(&target_entry) {
            Some(&id) => id,
            None => return Vec::new(),
        };
        self.triples
            .osp_index
            .iter()
            .filter(|t| matches!(&t.object, EncodedTerm::Regular(id) if *id == object_id))
            .map(|t| self.decode_triple(t))
            .collect()
    }
}
/// Compresses `data` with zstd at the given level (delegates to oxiarc_zstd).
fn compress_data(data: &[u8], level: u8) -> StarResult<Vec<u8>> {
    match oxiarc_zstd::encode_all(data, i32::from(level)) {
        Ok(compressed) => Ok(compressed),
        Err(e) => Err(StarError::serialization_error(format!(
            "Compression failed: {e}"
        ))),
    }
}
/// Decompresses a zstd frame produced by [`compress_data`].
fn decompress_data(data: &[u8]) -> StarResult<Vec<u8>> {
    match oxiarc_zstd::decode_all(data) {
        Ok(raw) => Ok(raw),
        Err(e) => Err(StarError::parse_error(format!("Decompression failed: {e}"))),
    }
}
/// Convenience conversions between [`StarStore`] and HDT-star byte buffers.
pub struct HdtStarConverter;

impl HdtStarConverter {
    /// Serializes `store` into an in-memory HDT-star byte buffer.
    pub fn store_to_hdt(store: &StarStore, config: HdtStarConfig) -> StarResult<Vec<u8>> {
        let mut builder = HdtStarBuilder::new(config);
        builder.add_store(store)?;
        let mut out = Vec::new();
        builder.write(&mut out)?;
        Ok(out)
    }

    /// Parses HDT-star bytes and materializes them as a [`StarStore`].
    pub fn hdt_to_store(data: &[u8]) -> StarResult<StarStore> {
        let mut cursor = std::io::Cursor::new(data);
        HdtStarReader::read(&mut cursor)?.to_store()
    }

    /// Reads just the magic bytes and header — a cheap metadata inspection
    /// that does not load the dictionary or triples sections.
    pub fn get_statistics<R: Read>(reader: &mut R) -> StarResult<HdtStarHeader> {
        // Shared error formatter so the three reads produce the same
        // message shape as before.
        let read_err = |what: &str, e: std::io::Error| {
            StarError::parse_error(format!("Failed to read {what}: {e}"))
        };
        let mut magic = [0u8; 8];
        reader
            .read_exact(&mut magic)
            .map_err(|e| read_err("magic bytes", e))?;
        if magic != HDT_STAR_MAGIC {
            return Err(StarError::parse_error(
                "Invalid HDT-star file: magic bytes mismatch",
            ));
        }
        let mut len_buf = [0u8; 8];
        reader
            .read_exact(&mut len_buf)
            .map_err(|e| read_err("header length", e))?;
        let header_len = u64::from_le_bytes(len_buf) as usize;
        let mut header_buf = vec![0u8; header_len];
        reader
            .read_exact(&mut header_buf)
            .map_err(|e| read_err("header", e))?;
        HdtStarHeader::from_bytes(&header_buf)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Default config has compression on at level 6 with quoted dict enabled.
    #[test]
    fn test_hdt_star_config_default() {
        let config = HdtStarConfig::default();
        assert!(config.enable_compression);
        assert_eq!(config.compression_level, 6);
        assert!(config.enable_quoted_dict);
        assert_eq!(config.max_nesting_depth, 10);
    }

    // Header survives an encode/decode round trip.
    #[test]
    fn test_hdt_star_header_serialization() {
        let config = HdtStarConfig::default();
        let header = HdtStarHeader::new(config);
        let bytes = header.to_bytes().unwrap();
        let decoded = HdtStarHeader::from_bytes(&bytes).unwrap();
        assert_eq!(decoded.version, HDT_STAR_VERSION);
        assert_eq!(decoded.triple_count, 0);
    }

    // IRI terms convert to dictionary entries and back.
    #[test]
    fn test_dictionary_entry_conversion() {
        let iri = StarTerm::iri("http://example.org/test").unwrap();
        let entry = DictionaryEntry::from_star_term(&iri);
        assert!(matches!(entry, DictionaryEntry::Iri(_)));
        let back = entry.to_star_term().unwrap();
        assert!(matches!(back, StarTerm::NamedNode(_)));
    }

    // Re-adding an entry returns the same id; distinct entries get new ids.
    #[test]
    fn test_dictionary_operations() {
        let mut dict = HdtStarDictionary::new();
        let entry1 = DictionaryEntry::Iri("http://example.org/s1".to_string());
        let entry2 = DictionaryEntry::Iri("http://example.org/s2".to_string());
        let id1 = dict.add_subject(entry1.clone());
        let id2 = dict.add_subject(entry2);
        let id1_again = dict.add_subject(entry1);
        assert_eq!(id1, id1_again);
        assert_ne!(id1, id2);
    }

    // One plain triple is counted by the builder.
    #[test]
    fn test_hdt_star_builder_simple() {
        let config = HdtStarConfig::default();
        let mut builder = HdtStarBuilder::new(config);
        let triple = StarTriple::new(
            StarTerm::iri("http://example.org/s").unwrap(),
            StarTerm::iri("http://example.org/p").unwrap(),
            StarTerm::iri("http://example.org/o").unwrap(),
        );
        builder.add_triple(&triple).unwrap();
        let stats = builder.statistics();
        assert_eq!(stats.triple_count, 1);
    }

    // A quoted-triple subject bumps the quoted count but not the triple count.
    #[test]
    fn test_hdt_star_builder_with_quoted_triple() {
        let config = HdtStarConfig::default();
        let mut builder = HdtStarBuilder::new(config);
        let inner = StarTriple::new(
            StarTerm::iri("http://example.org/alice").unwrap(),
            StarTerm::iri("http://example.org/age").unwrap(),
            StarTerm::literal("30").unwrap(),
        );
        let outer = StarTriple::new(
            StarTerm::quoted_triple(inner),
            StarTerm::iri("http://example.org/certainty").unwrap(),
            StarTerm::literal("0.9").unwrap(),
        );
        builder.add_triple(&outer).unwrap();
        let stats = builder.statistics();
        assert_eq!(stats.triple_count, 1);
        assert_eq!(stats.quoted_triple_count, 1);
    }

    // Full write/read round trip including one quoted triple.
    #[test]
    fn test_hdt_star_roundtrip() {
        let config = HdtStarConfig::default();
        let mut builder = HdtStarBuilder::new(config);
        for i in 0..10 {
            let s = format!("http://example.org/s{}", i);
            let v = format!("value{}", i);
            let triple = StarTriple::new(
                StarTerm::iri(&s).unwrap(),
                StarTerm::iri("http://example.org/p").unwrap(),
                StarTerm::literal(&v).unwrap(),
            );
            builder.add_triple(&triple).unwrap();
        }
        let quoted = StarTriple::new(
            StarTerm::iri("http://example.org/x").unwrap(),
            StarTerm::iri("http://example.org/y").unwrap(),
            StarTerm::iri("http://example.org/z").unwrap(),
        );
        let meta = StarTriple::new(
            StarTerm::quoted_triple(quoted),
            StarTerm::iri("http://example.org/meta").unwrap(),
            StarTerm::literal("test").unwrap(),
        );
        builder.add_triple(&meta).unwrap();
        let mut buffer = Vec::new();
        builder.write(&mut buffer).unwrap();
        let reader = HdtStarReader::read(&mut std::io::Cursor::new(&buffer)).unwrap();
        assert_eq!(reader.len(), 11);
        assert_eq!(reader.header().quoted_triple_count, 1);
        let decoded: Vec<_> = reader.iter_triples().collect();
        assert_eq!(decoded.len(), 11);
        for result in decoded {
            assert!(result.is_ok());
        }
    }

    // Subject query returns only alice's 5 triples, not bob's 3.
    #[test]
    fn test_hdt_star_query_by_subject() {
        let config = HdtStarConfig::default();
        let mut builder = HdtStarBuilder::new(config);
        let subject = StarTerm::iri("http://example.org/alice").unwrap();
        for i in 0..5 {
            let p = format!("http://example.org/p{}", i);
            let v = format!("value{}", i);
            let triple = StarTriple::new(
                subject.clone(),
                StarTerm::iri(&p).unwrap(),
                StarTerm::literal(&v).unwrap(),
            );
            builder.add_triple(&triple).unwrap();
        }
        for i in 0..3 {
            let p = format!("http://example.org/p{}", i);
            let triple = StarTriple::new(
                StarTerm::iri("http://example.org/bob").unwrap(),
                StarTerm::iri(&p).unwrap(),
                StarTerm::literal("other").unwrap(),
            );
            builder.add_triple(&triple).unwrap();
        }
        let mut buffer = Vec::new();
        builder.write(&mut buffer).unwrap();
        let reader = HdtStarReader::read(&mut std::io::Cursor::new(&buffer)).unwrap();
        let results = reader.query_by_subject(&subject);
        assert_eq!(results.len(), 5);
    }

    // Store -> HDT bytes -> store keeps the triple count.
    #[test]
    fn test_hdt_star_converter() {
        let store = StarStore::new();
        for i in 0..10 {
            let s = format!("http://example.org/s{}", i);
            let v = format!("{}", i);
            let triple = StarTriple::new(
                StarTerm::iri(&s).unwrap(),
                StarTerm::iri("http://example.org/p").unwrap(),
                StarTerm::literal(&v).unwrap(),
            );
            store.insert(&triple).unwrap();
        }
        let config = HdtStarConfig::default();
        let hdt_bytes = HdtStarConverter::store_to_hdt(&store, config).unwrap();
        let restored = HdtStarConverter::hdt_to_store(&hdt_bytes).unwrap();
        assert_eq!(restored.len(), store.len());
    }

    // Compressed output of the same data is smaller than uncompressed.
    #[test]
    fn test_hdt_star_compression() {
        let config = HdtStarConfig {
            enable_compression: true,
            compression_level: 9,
            ..HdtStarConfig::default()
        };
        let mut builder = HdtStarBuilder::new(config.clone());
        for i in 0..1000 {
            let s = format!("http://example.org/subject{}", i);
            let v = format!("This is a test value number {}", i);
            let triple = StarTriple::new(
                StarTerm::iri(&s).unwrap(),
                StarTerm::iri("http://example.org/predicate").unwrap(),
                StarTerm::literal(&v).unwrap(),
            );
            builder.add_triple(&triple).unwrap();
        }
        let mut compressed_buffer = Vec::new();
        builder.write(&mut compressed_buffer).unwrap();
        let config_uncompressed = HdtStarConfig {
            enable_compression: false,
            ..config
        };
        let mut builder_uncompressed = HdtStarBuilder::new(config_uncompressed);
        for i in 0..1000 {
            let s = format!("http://example.org/subject{}", i);
            let v = format!("This is a test value number {}", i);
            let triple = StarTriple::new(
                StarTerm::iri(&s).unwrap(),
                StarTerm::iri("http://example.org/predicate").unwrap(),
                StarTerm::literal(&v).unwrap(),
            );
            builder_uncompressed.add_triple(&triple).unwrap();
        }
        let mut uncompressed_buffer = Vec::new();
        builder_uncompressed
            .write(&mut uncompressed_buffer)
            .unwrap();
        assert!(
            compressed_buffer.len() < uncompressed_buffer.len(),
            "Compressed: {}, Uncompressed: {}",
            compressed_buffer.len(),
            uncompressed_buffer.len()
        );
    }

    // Two levels of quoting round-trip: counts and decoding both work.
    #[test]
    fn test_hdt_star_nested_quoted_triples() {
        let config = HdtStarConfig::default();
        let mut builder = HdtStarBuilder::new(config);
        let level1 = StarTriple::new(
            StarTerm::iri("http://example.org/a").unwrap(),
            StarTerm::iri("http://example.org/b").unwrap(),
            StarTerm::iri("http://example.org/c").unwrap(),
        );
        let level2 = StarTriple::new(
            StarTerm::quoted_triple(level1),
            StarTerm::iri("http://example.org/meta1").unwrap(),
            StarTerm::literal("level2").unwrap(),
        );
        let level3 = StarTriple::new(
            StarTerm::quoted_triple(level2),
            StarTerm::iri("http://example.org/meta2").unwrap(),
            StarTerm::literal("level3").unwrap(),
        );
        builder.add_triple(&level3).unwrap();
        let stats = builder.statistics();
        assert_eq!(stats.triple_count, 1);
        assert_eq!(stats.quoted_triple_count, 2);
        let mut buffer = Vec::new();
        builder.write(&mut buffer).unwrap();
        let reader = HdtStarReader::read(&mut std::io::Cursor::new(&buffer)).unwrap();
        let triples: Vec<_> = reader.iter_triples().collect();
        assert_eq!(triples.len(), 1);
        assert!(triples[0].is_ok());
    }
}