use crate::error::{TurtleParseError, TurtleResult};
use crate::formats::nquads::NQuadsParser;
use crate::formats::ntriples::NTriplesParser;
use crate::formats::trig::TriGParser;
use crate::formats::turtle::TurtleParser;
use crate::streaming::{StreamingConfig, StreamingParser};
use crate::toolkit::{FormatDetector, Parser, RdfFormat};
use oxirs_core::model::{Quad, Triple};
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
/// Parses the file at `path` with [`TurtleParser`] and returns the parsed triples.
///
/// The file is read through a [`BufReader`].
///
/// # Errors
///
/// Returns the I/O error wrapped by [`TurtleParseError::io`] when the file cannot
/// be opened, or the error reported by the parser.
pub fn parse_turtle_file<P: AsRef<Path>>(path: P) -> TurtleResult<Vec<Triple>> {
    let reader = File::open(path)
        .map(BufReader::new)
        .map_err(TurtleParseError::io)?;
    TurtleParser::new().parse(reader)
}
/// Parses the file at `path` with [`NTriplesParser`] and returns the parsed triples.
///
/// The file is read through a [`BufReader`].
///
/// # Errors
///
/// Returns the I/O error wrapped by [`TurtleParseError::io`] when the file cannot
/// be opened, or the error reported by the parser.
pub fn parse_ntriples_file<P: AsRef<Path>>(path: P) -> TurtleResult<Vec<Triple>> {
    let reader = File::open(path)
        .map(BufReader::new)
        .map_err(TurtleParseError::io)?;
    NTriplesParser::new().parse(reader)
}
/// Parses the file at `path` with [`NQuadsParser`] and returns the parsed quads.
///
/// The file is read through a [`BufReader`].
///
/// # Errors
///
/// Returns the I/O error wrapped by [`TurtleParseError::io`] when the file cannot
/// be opened, or the error reported by the parser.
pub fn parse_nquads_file<P: AsRef<Path>>(path: P) -> TurtleResult<Vec<Quad>> {
    let reader = File::open(path)
        .map(BufReader::new)
        .map_err(TurtleParseError::io)?;
    NQuadsParser::new().parse(reader)
}
/// Parses the file at `path` with [`TriGParser`] and returns the parsed quads.
///
/// The file is read through a [`BufReader`].
///
/// # Errors
///
/// Returns the I/O error wrapped by [`TurtleParseError::io`] when the file cannot
/// be opened, or the error reported by the parser.
pub fn parse_trig_file<P: AsRef<Path>>(path: P) -> TurtleResult<Vec<Quad>> {
    let reader = File::open(path)
        .map(BufReader::new)
        .map_err(TurtleParseError::io)?;
    TriGParser::new().parse(reader)
}
/// Detects the RDF format of `path` and parses the file into triples.
///
/// Detection is delegated to [`FormatDetector::detect_from_path`]. A Turtle result
/// is handed to [`parse_turtle_file`], an N-Triples result to
/// [`parse_ntriples_file`].
///
/// # Errors
///
/// Returns an `InvalidInput` I/O error (wrapped by [`TurtleParseError::io`]) when
/// no format can be detected, or when the detected format is neither Turtle nor
/// N-Triples; the latter message points callers to `parse_rdf_file_quads`.
/// Errors from the selected parsing function are passed through unchanged.
pub fn parse_rdf_file<P: AsRef<Path>>(path: P) -> TurtleResult<Vec<Triple>> {
    // Both failure paths report the same error kind and differ only in the message.
    let invalid_input = |message: String| {
        TurtleParseError::io(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            message,
        ))
    };

    let path_ref = path.as_ref();
    let detector = FormatDetector::new();
    let format = match detector.detect_from_path(path_ref) {
        Some(detection) => detection.format,
        None => {
            return Err(invalid_input(format!(
                "Could not detect format for file: {:?}",
                path_ref
            )));
        }
    };

    match format {
        RdfFormat::Turtle => parse_turtle_file(path),
        RdfFormat::NTriples => parse_ntriples_file(path),
        _ => Err(invalid_input(format!(
            "Format {:?} returns quads, not triples. Use parse_rdf_file_quads instead.",
            format
        ))),
    }
}
pub fn parse_rdf_file_quads<P: AsRef<Path>>(path: P) -> TurtleResult<Vec<Quad>> {
let path_ref = path.as_ref();
let detector = FormatDetector::new();
let format = detector
.detect_from_path(path_ref)
.ok_or_else(|| {
TurtleParseError::io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Could not detect format for file: {:?}", path_ref),
))
})?
.format;
match format {
RdfFormat::TriG => parse_trig_file(path),
RdfFormat::NQuads => parse_nquads_file(path),
RdfFormat::Turtle => {
let triples = parse_turtle_file(path)?;
Ok(triples
.into_iter()
.map(|t| {
Quad::new(
t.subject().clone(),
t.predicate().clone(),
t.object().clone(),
oxirs_core::model::GraphName::DefaultGraph,
)
})
.collect())
}
RdfFormat::NTriples => {
let triples = parse_ntriples_file(path)?;
Ok(triples
.into_iter()
.map(|t| {
Quad::new(
t.subject().clone(),
t.predicate().clone(),
t.object().clone(),
oxirs_core::model::GraphName::DefaultGraph,
)
})
.collect())
}
}
}
/// Streams the file at `path` through a [`StreamingParser`] and hands each batch
/// of triples to `callback`.
///
/// `batch_size` is forwarded to [`StreamingConfig::with_batch_size`] on top of the
/// default streaming configuration. Batches are delivered in the order the parser
/// yields them.
///
/// # Errors
///
/// Returns the I/O error wrapped by [`TurtleParseError::io`] when the file cannot
/// be opened. Processing stops at the first error yielded by the parser or
/// returned by `callback`, and that error is returned.
pub fn process_rdf_file_in_batches<P, F>(
    path: P,
    batch_size: usize,
    mut callback: F,
) -> TurtleResult<()>
where
    P: AsRef<Path>,
    F: FnMut(&[Triple]) -> TurtleResult<()>,
{
    // NOTE(review): the file is passed to the parser without a `BufReader`;
    // presumably `StreamingParser` buffers internally — confirm.
    let source = File::open(path).map_err(TurtleParseError::io)?;
    let streaming = StreamingParser::with_config(
        source,
        StreamingConfig::default().with_batch_size(batch_size),
    );

    for next_batch in streaming.batches() {
        callback(&next_batch?)?;
    }
    Ok(())
}
/// Counters describing a parsing run, with helpers to summarise them.
#[derive(Debug, Clone, Default)]
pub struct ParsingStatistics {
    /// Number of items counted over all batches.
    pub total_items: usize,
    /// Number of batches counted.
    pub batches_processed: usize,
    /// Number of bytes read, as recorded by the code filling in the statistics.
    pub bytes_read: usize,
    /// Number of errors, as recorded by the code filling in the statistics.
    pub errors: usize,
}

impl ParsingStatistics {
    /// Creates statistics with every counter set to zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns `total_items / batches_processed` as a float, or `0.0` when no
    /// batch has been counted.
    pub fn avg_batch_size(&self) -> f64 {
        match self.batches_processed {
            0 => 0.0,
            batches => self.total_items as f64 / batches as f64,
        }
    }

    /// Renders the counters as a multi-line, human-readable summary.
    ///
    /// The first line is a heading; each following line is one counter prefixed
    /// with `- `. The average batch size is printed with one decimal place. The
    /// text has no trailing newline.
    pub fn report(&self) -> String {
        let lines = [
            String::from("Parsing Statistics:"),
            format!("- Total items: {}", self.total_items),
            format!("- Batches: {}", self.batches_processed),
            format!("- Avg batch size: {:.1}", self.avg_batch_size()),
            format!("- Bytes read: {}", self.bytes_read),
            format!("- Errors: {}", self.errors),
        ];
        lines.join("\n")
    }
}
/// Streams the file at `path` through a [`StreamingParser`], hands each batch of
/// triples to `callback`, and returns statistics about the run.
///
/// `batch_size` is forwarded to [`StreamingConfig::with_batch_size`] on top of the
/// default streaming configuration.
///
/// In the returned [`ParsingStatistics`]:
/// - `total_items` is the sum of the lengths of all batches,
/// - `batches_processed` is the number of batches yielded by the parser,
/// - `bytes_read` is the size of the file as reported by its metadata when it was
///   opened (`0` if the metadata could not be read),
/// - `errors` stays `0`, because the first error aborts the run.
///
/// A batch is counted before `callback` runs for it.
///
/// # Errors
///
/// Returns the I/O error wrapped by [`TurtleParseError::io`] when the file cannot
/// be opened. Processing stops at the first error yielded by the parser or
/// returned by `callback`, and that error is returned instead of any statistics.
pub fn process_rdf_file_with_stats<P, F>(
    path: P,
    batch_size: usize,
    mut callback: F,
) -> TurtleResult<ParsingStatistics>
where
    P: AsRef<Path>,
    F: FnMut(&[Triple]) -> TurtleResult<()>,
{
    let file = File::open(path).map_err(TurtleParseError::io)?;

    // Statistics are only returned after every batch has been consumed, so the
    // on-disk size stands in for the bytes read (assumes a full pass reads the
    // whole file). This is best effort: a metadata failure must not fail a parse
    // that would otherwise succeed. The length saturates instead of truncating
    // where `usize` is narrower than `u64`.
    let file_len = file
        .metadata()
        .map(|meta| meta.len().min(usize::MAX as u64) as usize)
        .unwrap_or(0);

    let config = StreamingConfig::default().with_batch_size(batch_size);
    let parser = StreamingParser::with_config(file, config);
    let mut stats = ParsingStatistics::new();
    for batch_result in parser.batches() {
        let batch = batch_result?;
        stats.total_items += batch.len();
        stats.batches_processed += 1;
        callback(&batch)?;
    }
    stats.bytes_read = file_len;
    Ok(stats)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file holding `count` numbered N-Triples statements,
    /// one per line.
    fn numbered_ntriples_file(count: usize) -> TurtleResult<NamedTempFile> {
        let mut fixture = NamedTempFile::new().map_err(TurtleParseError::io)?;
        for i in 0..count {
            writeln!(fixture, "<http://s{}> <http://p> <http://o{}> .", i, i)
                .map_err(TurtleParseError::io)?;
        }
        Ok(fixture)
    }

    /// A prefixed Turtle document with one statement yields one triple.
    #[test]
    fn test_parse_turtle_file() -> TurtleResult<()> {
        let mut turtle = NamedTempFile::new().map_err(TurtleParseError::io)?;
        writeln!(
            turtle,
            "@prefix ex: <http://example.org/> .\nex:s ex:p ex:o ."
        )
        .map_err(TurtleParseError::io)?;

        let parsed = parse_turtle_file(turtle.path())?;
        assert_eq!(parsed.len(), 1);
        Ok(())
    }

    /// A single N-Triples line yields one triple.
    #[test]
    fn test_parse_ntriples_file() -> TurtleResult<()> {
        let mut ntriples = NamedTempFile::new().map_err(TurtleParseError::io)?;
        writeln!(ntriples, "<http://s> <http://p> <http://o> .")
            .map_err(TurtleParseError::io)?;

        let parsed = parse_ntriples_file(ntriples.path())?;
        assert_eq!(parsed.len(), 1);
        Ok(())
    }

    /// 100 statements with a batch size of 10 arrive in at least 10 batches,
    /// none larger than 10 triples.
    #[test]
    fn test_process_in_batches() -> TurtleResult<()> {
        let fixture = numbered_ntriples_file(100)?;

        let mut seen = 0;
        let mut calls = 0;
        process_rdf_file_in_batches(fixture.path(), 10, |batch| {
            seen += batch.len();
            calls += 1;
            assert!(batch.len() <= 10);
            Ok(())
        })?;

        assert_eq!(seen, 100);
        assert!(calls >= 10);
        Ok(())
    }

    /// The statistics count every item and batch and show up in the report.
    #[test]
    fn test_parsing_statistics() -> TurtleResult<()> {
        let fixture = numbered_ntriples_file(50)?;

        let stats = process_rdf_file_with_stats(fixture.path(), 10, |_batch| Ok(()))?;

        assert_eq!(stats.total_items, 50);
        assert!(stats.batches_processed >= 5);
        assert!(stats.avg_batch_size() > 0.0);
        assert!(stats.report().contains("Total items: 50"));
        Ok(())
    }
}