use crate::rdf_integration::{NamespaceManager, RdfRuleAtom, RdfTerm};
use crate::{RuleAtom, Term as RuleTerm};
use anyhow::{anyhow, Result};
use oxirs_core::model::{Dataset, Graph, Quad, Triple, NamedNode};
use oxirs_core::Store;
use regex::Regex;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt};
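/// Supported RDF serialization formats for import into the rule engine.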
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RdfFormat {
RdfXml,
Turtle,
NTriples,
NQuads,
JsonLd,
TriG,
}
impl RdfFormat {
pub fn to_graph_format(&self) -> Option<oxirs_core::GraphFormat> {
match self {
RdfFormat::RdfXml => Some(oxirs_core::GraphFormat::RdfXml),
RdfFormat::Turtle => Some(oxirs_core::GraphFormat::Turtle),
RdfFormat::NTriples => Some(oxirs_core::GraphFormat::NTriples),
_ => None,
}
}
pub fn to_dataset_format(&self) -> Option<oxirs_core::DatasetFormat> {
match self {
RdfFormat::NQuads => Some(oxirs_core::DatasetFormat::NQuads),
RdfFormat::TriG => Some(oxirs_core::DatasetFormat::TriG),
_ => None,
}
}
pub fn from_extension(path: &Path) -> Option<Self> {
let ext = path.extension()?.to_str()?.to_lowercase();
match ext.as_str() {
"rdf" | "xml" => Some(RdfFormat::RdfXml),
"ttl" | "turtle" => Some(RdfFormat::Turtle),
"nt" => Some(RdfFormat::NTriples),
"nq" => Some(RdfFormat::NQuads),
"jsonld" | "json" => Some(RdfFormat::JsonLd),
"trig" => Some(RdfFormat::TriG),
_ => None,
}
}
pub fn mime_type(&self) -> &'static str {
match self {
RdfFormat::RdfXml => "application/rdf+xml",
RdfFormat::Turtle => "text/turtle",
RdfFormat::NTriples => "application/n-triples",
RdfFormat::NQuads => "application/n-quads",
RdfFormat::JsonLd => "application/ld+json",
RdfFormat::TriG => "application/trig",
}
}
}
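/// Loads RDF data in several serializations into a [`Store`] and converts the
/// stored triples into rule atoms for the reasoning engine.
///
/// A minimal usage sketch (marked `ignore` because it assumes a concrete
/// `Store` constructor and a Tokio runtime, as in the tests below):
///
/// ```ignore
/// let store = Arc::new(Store::new()?);
/// let mut processor = RdfProcessor::new(store.clone());
/// let nt = b"<http://example.org/s> <http://example.org/p> <http://example.org/o> .";
/// let stats = processor.process_data(nt, RdfFormat::NTriples).await?;
/// assert_eq!(stats.triples_processed, 1);
/// ```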
pub struct RdfProcessor {
store: Arc<dyn Store>,
namespaces: NamespaceManager,
config: ProcessingConfig,
}
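/// Configuration options for [`RdfProcessor`]. A `max_file_size` of 0 disables
/// the size check; inputs larger than `streaming_threshold` bytes use the
/// streaming path when `use_streaming` is enabled.
///
/// ```ignore
/// // Force streaming for every file, keeping the remaining defaults.
/// let config = ProcessingConfig { streaming_threshold: 0, ..Default::default() };
/// ```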
#[derive(Debug, Clone)]
pub struct ProcessingConfig {
pub validate_iris: bool,
pub resolve_relative: bool,
pub base_iri: Option<String>,
pub max_file_size: usize,
pub use_streaming: bool,
pub streaming_threshold: usize,
pub collect_stats: bool,
}
impl Default for ProcessingConfig {
fn default() -> Self {
Self {
validate_iris: true,
resolve_relative: true,
base_iri: None,
max_file_size: 0, // 0 disables the file size check
use_streaming: true,
streaming_threshold: 10 * 1024 * 1024, // 10 MiB
collect_stats: false,
}
}
}
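/// Statistics gathered while processing RDF input. Triple/quad counts are
/// always maintained; the subject/predicate/object/graph sets and parse
/// errors are only populated when `collect_stats` is enabled.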
#[derive(Debug, Default)]
pub struct ProcessingStats {
pub triples_processed: usize,
pub quads_processed: usize,
pub subjects: HashSet<String>,
pub predicates: HashSet<String>,
pub objects: HashSet<String>,
pub graphs: HashSet<String>,
pub parse_errors: Vec<String>,
pub processing_time: std::time::Duration,
}
impl RdfProcessor {
pub fn new(store: Arc<dyn Store>) -> Self {
Self {
store,
namespaces: NamespaceManager::new(),
config: ProcessingConfig::default(),
}
}
pub fn with_config(store: Arc<dyn Store>, config: ProcessingConfig) -> Self {
let mut namespaces = NamespaceManager::new();
if let Some(base) = &config.base_iri {
namespaces.set_base(base.clone());
}
Self {
store,
namespaces,
config,
}
}
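/// Processes an RDF file, detecting the format from its extension and
/// choosing between in-memory and streaming parsing based on file size.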
pub async fn process_file(&mut self, path: &Path) -> Result<ProcessingStats> {
let format = RdfFormat::from_extension(path)
.ok_or_else(|| anyhow!("Unknown RDF format for file: {:?}", path))?;
let metadata = tokio::fs::metadata(path).await?;
if self.config.max_file_size > 0 && metadata.len() > self.config.max_file_size as u64 {
return Err(anyhow!("File size exceeds maximum: {} bytes", metadata.len()));
}
let use_streaming = self.config.use_streaming &&
metadata.len() > self.config.streaming_threshold as u64;
let file = tokio::fs::File::open(path).await?;
if use_streaming {
self.process_stream(file, format).await
} else {
let mut contents = Vec::new();
let mut reader = tokio::io::BufReader::new(file);
reader.read_to_end(&mut contents).await?;
self.process_data(&contents, format).await
}
}
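/// Parses an in-memory buffer in the given format and inserts the resulting
/// triples or quads into the store.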
pub async fn process_data(&mut self, data: &[u8], format: RdfFormat) -> Result<ProcessingStats> {
let start_time = std::time::Instant::now();
let mut stats = ProcessingStats::default();
match format {
RdfFormat::RdfXml | RdfFormat::Turtle | RdfFormat::NTriples => {
self.process_graph_format(data, format, &mut stats)?;
}
RdfFormat::NQuads | RdfFormat::TriG => {
self.process_dataset_format(data, format, &mut stats)?;
}
RdfFormat::JsonLd => {
self.process_jsonld(data, &mut stats).await?;
}
}
stats.processing_time = start_time.elapsed();
Ok(stats)
}
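/// Processes RDF from an async reader. Line-oriented formats (N-Triples,
/// N-Quads) are parsed line by line; all other formats are buffered in full
/// and delegated to `process_data`.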
pub async fn process_stream(
&mut self,
mut reader: impl AsyncRead + Unpin,
format: RdfFormat,
) -> Result<ProcessingStats> {
let start_time = std::time::Instant::now();
let mut stats = ProcessingStats::default();
match format {
RdfFormat::NTriples | RdfFormat::NQuads => {
let reader = tokio::io::BufReader::new(reader);
self.process_line_based_stream(reader, format, &mut stats).await?;
}
_ => {
// Non-line-oriented formats are buffered in full and delegated to
// process_data, which reports its own statistics.
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;
return self.process_data(&buffer, format).await;
}
}
stats.processing_time = start_time.elapsed();
Ok(stats)
}
fn process_graph_format(
&mut self,
data: &[u8],
format: RdfFormat,
stats: &mut ProcessingStats,
) -> Result<()> {
let graph_format = format.to_graph_format()
.ok_or_else(|| anyhow!("Not a graph format: {:?}", format))?;
let graph = Graph::parse(
std::io::Cursor::new(data),
graph_format,
self.config.base_iri.as_deref(),
)?;
for triple in graph.iter() {
self.store.insert(&Quad::from(triple.clone()))?;
stats.triples_processed += 1;
// Detailed term sets are only collected on request, since they can grow
// large on big inputs.
if self.config.collect_stats {
stats.subjects.insert(triple.subject.to_string());
stats.predicates.insert(triple.predicate.to_string());
stats.objects.insert(triple.object.to_string());
}
}
if format == RdfFormat::Turtle {
self.extract_turtle_prefixes(std::str::from_utf8(data)?)?;
}
Ok(())
}
fn process_dataset_format(
&mut self,
data: &[u8],
format: RdfFormat,
stats: &mut ProcessingStats,
) -> Result<()> {
let dataset_format = format.to_dataset_format()
.ok_or_else(|| anyhow!("Not a dataset format: {:?}", format))?;
let dataset = Dataset::parse(
std::io::Cursor::new(data),
dataset_format,
self.config.base_iri.as_deref(),
)?;
for quad in dataset.iter() {
self.store.insert(quad)?;
stats.quads_processed += 1;
if self.config.collect_stats {
stats.subjects.insert(quad.subject.to_string());
stats.predicates.insert(quad.predicate.to_string());
stats.objects.insert(quad.object.to_string());
if let Some(graph) = &quad.graph_name {
stats.graphs.insert(graph.to_string());
}
}
}
Ok(())
}
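/// Minimal JSON-LD handling: extracts an inline `@context`, expands terms, and
/// emits triples for `@id`, `@type`, and simple property values. Remote
/// contexts and full JSON-LD expansion are not supported.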
async fn process_jsonld(&mut self, data: &[u8], stats: &mut ProcessingStats) -> Result<()> {
use serde_json::Value;
let json: Value = serde_json::from_slice(data)?;
let context = if let Some(ctx) = json.get("@context") {
self.extract_context(ctx)?
} else {
HashMap::new()
};
match &json {
Value::Object(obj) => {
self.process_jsonld_object(obj, &context, stats)?;
}
Value::Array(arr) => {
for item in arr {
if let Value::Object(obj) = item {
self.process_jsonld_object(obj, &context, stats)?;
}
}
}
_ => return Err(anyhow!("Invalid JSON-LD: expected object or array")),
}
Ok(())
}
fn extract_context(&self, ctx: &serde_json::Value) -> Result<HashMap<String, String>> {
use serde_json::Value;
let mut context = HashMap::new();
match ctx {
Value::String(url) => {
tracing::debug!("Skipping remote context: {}", url);
}
Value::Object(obj) => {
for (key, value) in obj {
if let Value::String(iri) = value {
context.insert(key.clone(), iri.clone());
}
}
}
Value::Array(arr) => {
for item in arr {
if let Value::Object(obj) = item {
for (key, value) in obj {
if let Value::String(iri) = value {
context.insert(key.clone(), iri.clone());
}
}
}
}
}
_ => {}
}
Ok(context)
}
fn process_jsonld_object(
&mut self,
obj: &serde_json::Map<String, serde_json::Value>,
context: &HashMap<String, String>,
stats: &mut ProcessingStats,
) -> Result<()> {
use serde_json::Value;
// Objects that consist solely of a standalone `@context` or `@graph` wrapper
// are skipped; recursive `@graph` expansion is not supported here.
if obj.len() == 1 && (obj.contains_key("@context") || obj.contains_key("@graph")) {
return Ok(());
}
let subject = if let Some(Value::String(id)) = obj.get("@id") {
self.expand_term(id, context)
} else {
format!("_:b{}", stats.triples_processed)
};
if let Some(type_value) = obj.get("@type") {
match type_value {
Value::String(type_str) => {
let type_iri = self.expand_term(type_str, context);
self.add_triple_from_jsonld(&subject, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", &type_iri, stats)?;
}
Value::Array(types) => {
for t in types {
if let Value::String(type_str) = t {
let type_iri = self.expand_term(type_str, context);
self.add_triple_from_jsonld(&subject, "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", &type_iri, stats)?;
}
}
}
_ => {}
}
}
for (key, value) in obj {
if key.starts_with('@') {
continue;
}
let predicate = self.expand_term(key, context);
match value {
Value::String(s) => {
// Heuristic: absolute http(s) IRIs become named nodes; everything
// else is treated as a plain string literal.
let object = if s.starts_with("http://") || s.starts_with("https://") {
s.clone()
} else {
format!("\"{}\"", s)
};
self.add_triple_from_jsonld(&subject, &predicate, &object, stats)?;
}
Value::Number(n) => {
self.add_triple_from_jsonld(&subject, &predicate, &format!("\"{}\"", n), stats)?;
}
Value::Bool(b) => {
self.add_triple_from_jsonld(&subject, &predicate, &format!("\"{}\"", b), stats)?;
}
Value::Object(nested) => {
if let Some(Value::String(nested_id)) = nested.get("@id") {
let nested_iri = self.expand_term(nested_id, context);
self.add_triple_from_jsonld(&subject, &predicate, &nested_iri, stats)?;
}
}
Value::Array(arr) => {
for item in arr {
if let Value::String(s) = item {
let object = if s.starts_with("http://") || s.starts_with("https://") {
s.clone()
} else {
format!("\"{}\"", s)
};
self.add_triple_from_jsonld(&subject, &predicate, &object, stats)?;
}
}
}
_ => {}
}
}
Ok(())
}
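/// Expands a JSON-LD term to a full IRI using the extracted context, handling
/// absolute IRIs, `prefix:local` CURIEs, and direct term mappings.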
fn expand_term(&self, term: &str, context: &HashMap<String, String>) -> String {
if term.starts_with("http://") || term.starts_with("https://") {
return term.to_string();
}
if let Some((prefix, local)) = term.split_once(':') {
if let Some(base) = context.get(prefix) {
return format!("{}{}", base, local);
}
}
if let Some(iri) = context.get(term) {
return iri.clone();
}
term.to_string()
}
fn add_triple_from_jsonld(
&mut self,
subject: &str,
predicate: &str,
object: &str,
stats: &mut ProcessingStats,
) -> Result<()> {
use oxirs_core::model::{Literal, Object, Predicate, Subject};
let subj = if subject.starts_with("_:") {
Subject::BlankNode(oxirs_core::BlankNode::new(&subject[2..])?)
} else {
Subject::NamedNode(NamedNode::new(subject)?)
};
let pred = Predicate::NamedNode(NamedNode::new(predicate)?);
let obj = if object.starts_with('"') {
Object::Literal(Literal::new(object.trim_matches('"')))
} else if object.starts_with("_:") {
Object::BlankNode(oxirs_core::BlankNode::new(&object[2..])?)
} else {
Object::NamedNode(NamedNode::new(object)?)
};
let triple = Triple::new(subj, pred, obj);
let quad = Quad::from(triple);
self.store.insert(&quad)?;
stats.triples_processed += 1;
Ok(())
}
async fn process_line_based_stream(
&mut self,
reader: tokio::io::BufReader<impl AsyncRead + Unpin>,
format: RdfFormat,
stats: &mut ProcessingStats,
) -> Result<()> {
let mut lines = reader.lines();
let mut line_number = 0;
while let Some(line) = lines.next_line().await? {
line_number += 1;
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with('#') {
continue;
}
match format {
RdfFormat::NTriples => {
match self.parse_ntriples_line(&line) {
Ok(triple) => {
self.store.insert(&Quad::from(triple))?;
stats.triples_processed += 1;
}
Err(e) => {
if self.config.collect_stats {
stats.parse_errors.push(format!("Line {}: {}", line_number, e));
}
}
}
}
RdfFormat::NQuads => {
match self.parse_nquads_line(&line) {
Ok(quad) => {
self.store.insert(&quad)?;
stats.quads_processed += 1;
}
Err(e) => {
if self.config.collect_stats {
stats.parse_errors.push(format!("Line {}: {}", line_number, e));
}
}
}
}
_ => unreachable!(),
}
}
Ok(())
}
fn parse_ntriples_line(&self, line: &str) -> Result<Triple> {
let mut cursor = std::io::Cursor::new(line.as_bytes());
let graph = Graph::parse(&mut cursor, oxirs_core::GraphFormat::NTriples, None)?;
let triple = graph.iter().next()
.ok_or_else(|| anyhow!("No triple parsed from line"))?;
Ok(triple.clone())
}
fn parse_nquads_line(&self, line: &str) -> Result<Quad> {
let mut cursor = std::io::Cursor::new(line.as_bytes());
let dataset = Dataset::parse(&mut cursor, oxirs_core::DatasetFormat::NQuads, None)?;
let quad = dataset.iter().next()
.ok_or_else(|| anyhow!("No quad parsed from line"))?;
Ok(quad.clone())
}
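/// Extracts `@prefix` and `@base` declarations from Turtle source with simple
/// regexes so the prefixes can later be used to compact IRIs.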
fn extract_turtle_prefixes(&mut self, content: &str) -> Result<()> {
// (\w*) also matches the empty default prefix, e.g. `@prefix : <...> .`
let prefix_regex = Regex::new(r"@prefix\s+(\w*):\s*<([^>]+)>\s*\.")?;
for cap in prefix_regex.captures_iter(content) {
if let (Some(prefix), Some(namespace)) = (cap.get(1), cap.get(2)) {
self.namespaces.add_prefix(
prefix.as_str().to_string(),
namespace.as_str().to_string(),
);
}
}
let base_regex = Regex::new(r"@base\s*<([^>]+)>\s*\.")?;
if let Some(cap) = base_regex.captures(content) {
if let Some(base) = cap.get(1) {
self.namespaces.set_base(base.as_str().to_string());
}
}
Ok(())
}
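/// Converts all triples in the default graph of the store into [`RuleAtom`]s,
/// compacting IRIs with the registered namespaces.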
pub fn to_rule_atoms(&self) -> Result<Vec<RuleAtom>> {
let mut atoms = Vec::new();
for quad in self.store.query_quads(None, None, None, None)? {
if quad.graph_name() != &oxirs_core::model::GraphName::DefaultGraph {
continue;
}
let atom = RuleAtom::Triple {
subject: self.term_to_rule_term(&quad.subject().clone().into())?,
predicate: self.term_to_rule_term(&quad.predicate().clone().into())?,
object: self.term_to_rule_term(&quad.object().clone().into())?,
};
atoms.push(atom);
}
Ok(atoms)
}
fn term_to_rule_term(&self, term: &oxirs_core::model::Term) -> Result<RuleTerm> {
use oxirs_core::model::Term;
match term {
Term::NamedNode(n) => Ok(RuleTerm::Constant(self.namespaces.compact(n.as_str()))),
Term::BlankNode(b) => Ok(RuleTerm::Constant(format!("_:{}", b.as_str()))),
Term::Literal(l) => {
if let Some(lang) = l.language() {
Ok(RuleTerm::Literal(format!("{}@{}", l.value(), lang)))
} else {
let dt = l.datatype();
if dt.as_str() != "http://www.w3.org/2001/XMLSchema#string" {
Ok(RuleTerm::Literal(format!("{}^^{}", l.value(), self.namespaces.compact(dt.as_str()))))
} else {
Ok(RuleTerm::Literal(l.value().to_string()))
}
}
}
Term::Variable(v) => Ok(RuleTerm::Variable(v.name().to_string())),
Term::QuotedTriple(_) => Err(anyhow!("Quoted triples not yet supported in rules")),
}
}
pub fn namespaces(&self) -> &NamespaceManager {
&self.namespaces
}
pub fn store(&self) -> &Arc<dyn Store> {
&self.store
}
}
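/// Holds inferred facts in memory up to `max_memory_facts`. Once the limit is
/// reached, facts are spilled to `storage_path` if one is configured (spilling
/// is currently a stub), otherwise `add_fact` returns an error.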
pub struct FactManager {
memory_facts: Vec<RdfRuleAtom>,
storage_path: Option<std::path::PathBuf>,
max_memory_facts: usize,
total_facts: usize,
}
impl FactManager {
pub fn new(max_memory_facts: usize) -> Self {
Self {
memory_facts: Vec::new(),
storage_path: None,
max_memory_facts,
total_facts: 0,
}
}
pub fn with_storage(mut self, path: std::path::PathBuf) -> Self {
self.storage_path = Some(path);
self
}
pub fn add_fact(&mut self, fact: RdfRuleAtom) -> Result<()> {
self.total_facts += 1;
if self.memory_facts.len() < self.max_memory_facts {
self.memory_facts.push(fact);
} else if let Some(path) = &self.storage_path {
self.spill_to_disk(&fact)?;
} else {
return Err(anyhow!("Memory limit reached and no storage path configured"));
}
Ok(())
}
fn spill_to_disk(&self, _fact: &RdfRuleAtom) -> Result<()> {
// Placeholder: on-disk spilling is not implemented yet, so facts beyond the
// in-memory limit are currently dropped when a storage path is configured.
Ok(())
}
pub fn iter(&self) -> impl Iterator<Item = &RdfRuleAtom> {
self.memory_facts.iter()
}
pub fn total_count(&self) -> usize {
self.total_facts
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_format_detection() {
assert_eq!(
RdfFormat::from_extension(Path::new("test.ttl")),
Some(RdfFormat::Turtle)
);
assert_eq!(
RdfFormat::from_extension(Path::new("test.rdf")),
Some(RdfFormat::RdfXml)
);
assert_eq!(
RdfFormat::from_extension(Path::new("test.jsonld")),
Some(RdfFormat::JsonLd)
);
}
#[tokio::test]
async fn test_process_turtle() -> anyhow::Result<()> {
let store = Arc::new(Store::new()?);
let mut processor = RdfProcessor::new(store.clone());
let turtle_data = r#"
@prefix ex: <http://example.org/> .
ex:subject ex:predicate ex:object .
"#;
let stats = processor.process_data(turtle_data.as_bytes(), RdfFormat::Turtle).await?;
assert_eq!(stats.triples_processed, 1);
assert_eq!(store.len()?, 1);
Ok(())
}
#[tokio::test]
async fn test_process_ntriples() -> anyhow::Result<()> {
let store = Arc::new(Store::new()?);
let mut processor = RdfProcessor::new(store.clone());
let ntriples_data = r#"<http://example.org/subject> <http://example.org/predicate> <http://example.org/object> ."#;
let stats = processor.process_data(ntriples_data.as_bytes(), RdfFormat::NTriples).await?;
assert_eq!(stats.triples_processed, 1);
assert_eq!(store.len()?, 1);
Ok(())
}
#[tokio::test]
async fn test_streaming_processing() -> anyhow::Result<()> {
let store = Arc::new(Store::new()?);
let config = ProcessingConfig {
use_streaming: true,
streaming_threshold: 0,
..Default::default()
};
let mut processor = RdfProcessor::with_config(store.clone(), config);
// The file needs an .nt extension so process_file can detect the format.
let mut temp_file: NamedTempFile = tempfile::Builder::new().suffix(".nt").tempfile()?;
writeln!(temp_file, "<http://example.org/s1> <http://example.org/p> <http://example.org/o1> .")?;
writeln!(temp_file, "<http://example.org/s2> <http://example.org/p> <http://example.org/o2> .")?;
let stats = processor.process_file(temp_file.path()).await?;
assert_eq!(stats.triples_processed, 2);
assert_eq!(store.len()?, 2);
Ok(())
}
#[test]
fn test_fact_manager() -> anyhow::Result<()> {
let mut manager = FactManager::new(2);
let fact1 = RdfRuleAtom::Triple {
subject: RdfTerm::NamedNode(NamedNode::new("http://example.org/s1")?),
predicate: RdfTerm::NamedNode(NamedNode::new("http://example.org/p")?),
object: RdfTerm::NamedNode(NamedNode::new("http://example.org/o1")?),
};
manager.add_fact(fact1.clone())?;
assert_eq!(manager.total_count(), 1);
manager.add_fact(fact1.clone())?;
assert_eq!(manager.total_count(), 2);
assert!(manager.add_fact(fact1).is_err());
Ok(())
}
}