use anyhow::Result;
use oxrdf::{Graph, Triple};
use oxttl::TurtleParser;
use std::io::BufRead;
use std::time::{Duration, Instant};
/// Counters and timing gathered while streaming RDF data.
#[derive(Clone, Debug, Default)]
pub struct StreamStats {
    /// Number of triples that passed the loader's filters.
    pub triples_processed: usize,
    /// Number of batches handed on (including a final, shorter batch).
    pub batches_processed: usize,
    /// Elapsed time, measured by the loader with `Instant`.
    pub processing_time: Duration,
    /// Number of parse errors reported by the parser.
    pub errors_encountered: usize,
    /// Peak memory usage in bytes.
    /// NOTE(review): the loader in this file never writes this field, so it
    /// stays at its default of 0.
    pub peak_memory_bytes: usize,
}

impl StreamStats {
    /// Average throughput: `triples_processed / processing_time` in triples
    /// per second, or `0.0` when `processing_time` is zero (avoids dividing
    /// by zero).
    pub fn triples_per_second(&self) -> f64 {
        let elapsed_secs = self.processing_time.as_secs_f64();
        if elapsed_secs <= 0.0 {
            return 0.0;
        }
        self.triples_processed as f64 / elapsed_secs
    }
}
/// Boxed callback receiving one accepted triple as `(subject, predicate, object)`
/// strings. The loader passes IRIs without angle brackets, blank nodes as
/// `_:label`, and literals in quoted form with any `@lang` / `^^datatype` suffix.
pub type TripleHandler = Box<dyn FnMut(&str, &str, &str) + Send>;
/// Boxed callback receiving a batch of accepted triples.
pub type BatchHandler = Box<dyn FnMut(&[Triple]) + Send>;
/// Boxed callback receiving the loader's running statistics.
pub type ProgressHandler = Box<dyn FnMut(&StreamStats) + Send>;
/// Streams Turtle input through optional filters into per-triple, per-batch
/// and progress callbacks, optionally collecting accepted triples into a
/// [`Graph`]. Configure it with the builder-style `with_*` / `on_*` /
/// `filter_*` / `collect_into_graph` methods.
pub struct StreamingRdfLoader {
/// Accepted triples per batch (default 1000; kept >= 1 by `with_batch_size`).
batch_size: usize,
/// Optional callback invoked once per accepted triple.
triple_handler: Option<TripleHandler>,
/// Optional callback invoked once per batch.
batch_handler: Option<BatchHandler>,
/// Optional callback invoked every `progress_interval` accepted triples.
progress_handler: Option<ProgressHandler>,
/// Accepted triples between progress callbacks (default 10000; kept >= 1).
progress_interval: usize,
/// Whether accepted triples are also inserted into a returned `Graph`.
collect_graph: bool,
/// If set, a triple is accepted only if its predicate IRI contains one of
/// these strings as a substring.
predicate_filter: Option<Vec<String>>,
/// If set, a triple is accepted only if its subject (bare IRI or `_:label`)
/// starts with this prefix.
subject_prefix_filter: Option<String>,
}
impl StreamingRdfLoader {
    /// Creates a loader with the default configuration: batches of 1000
    /// triples, a progress callback every 10000 accepted triples, no
    /// handlers, no filters and no graph collection.
    pub fn new() -> Self {
        StreamingRdfLoader {
            batch_size: 1000,
            triple_handler: None,
            batch_handler: None,
            progress_handler: None,
            progress_interval: 10000,
            collect_graph: false,
            predicate_filter: None,
            subject_prefix_filter: None,
        }
    }

    /// Sets how many accepted triples are grouped into one batch before it is
    /// handed to the batch handler and the collected graph. `0` is clamped
    /// to `1`.
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size.max(1);
        self
    }

    /// Registers a callback invoked for every accepted triple with
    /// `(subject, predicate, object)` rendered as strings. IRIs are bare (no
    /// angle brackets) and blank nodes are `_:label`. Literals are quoted,
    /// with an `@lang` or `^^datatype-iri` suffix; `xsd:string` literals get
    /// no suffix. Literal values are not escaped.
    pub fn on_triple<F>(mut self, handler: F) -> Self
    where
        F: FnMut(&str, &str, &str) + Send + 'static,
    {
        self.triple_handler = Some(Box::new(handler));
        self
    }

    /// Registers a callback invoked with each full batch of accepted triples
    /// and with the final, possibly shorter, batch.
    pub fn on_batch<F>(mut self, handler: F) -> Self
    where
        F: FnMut(&[Triple]) + Send + 'static,
    {
        self.batch_handler = Some(Box::new(handler));
        self
    }

    /// Registers a callback invoked with the running statistics each time the
    /// number of accepted triples reaches a multiple of the progress interval.
    pub fn on_progress<F>(mut self, handler: F) -> Self
    where
        F: FnMut(&StreamStats) + Send + 'static,
    {
        self.progress_handler = Some(Box::new(handler));
        self
    }

    /// Sets the progress interval, in accepted triples. `0` is clamped to `1`.
    pub fn with_progress_interval(mut self, interval: usize) -> Self {
        self.progress_interval = interval.max(1);
        self
    }

    /// Makes `process_turtle*` also return the accepted triples as a [`Graph`].
    pub fn collect_into_graph(mut self) -> Self {
        self.collect_graph = true;
        self
    }

    /// Accepts only triples whose predicate IRI *contains* at least one of
    /// `predicates` as a substring (e.g. `"knows"` matches
    /// `http://example.org/knows`). This is not an exact-IRI match.
    pub fn filter_predicates(mut self, predicates: Vec<String>) -> Self {
        self.predicate_filter = Some(predicates);
        self
    }

    /// Accepts only triples whose subject starts with `prefix`. The subject
    /// is compared as a bare IRI, or as `_:label` for blank nodes.
    pub fn filter_subject_prefix(mut self, prefix: String) -> Self {
        self.subject_prefix_filter = Some(prefix);
        self
    }

    /// Parses Turtle from an in-memory string; see
    /// [`Self::process_turtle_reader`].
    pub fn process_turtle(&mut self, data: &str) -> Result<(StreamStats, Option<Graph>)> {
        self.process_turtle_reader(data.as_bytes())
    }

    /// Streams Turtle from `reader` through the configured filters and
    /// handlers.
    ///
    /// Each error yielded by the parser is counted in
    /// `StreamStats::errors_encountered` and printed to stderr. Iteration then
    /// continues with whatever the parser yields next.
    /// `StreamStats::peak_memory_bytes` is not measured and stays `0`.
    ///
    /// Returns the final statistics and, if [`Self::collect_into_graph`] was
    /// called, the graph of accepted triples.
    ///
    /// # Errors
    ///
    /// Currently never returns `Err`.
    pub fn process_turtle_reader<R: BufRead>(
        &mut self,
        reader: R,
    ) -> Result<(StreamStats, Option<Graph>)> {
        let start = Instant::now();
        let mut stats = StreamStats::default();
        let mut graph = self.collect_graph.then(Graph::new);
        let mut batch: Vec<Triple> = Vec::with_capacity(self.batch_size);

        for result in TurtleParser::new().for_reader(reader) {
            let triple = match result {
                Ok(triple) => triple,
                Err(e) => {
                    stats.errors_encountered += 1;
                    eprintln!("Parse error: {}", e);
                    continue;
                }
            };
            if !self.should_process_triple(&triple) {
                continue;
            }
            stats.triples_processed += 1;
            self.dispatch_triple(&triple);

            batch.push(triple);
            if batch.len() >= self.batch_size {
                self.flush_batch(&mut batch, graph.as_mut(), &mut stats);
            }

            // progress_interval is always >= 1 (see new/with_progress_interval).
            if stats.triples_processed % self.progress_interval == 0 {
                stats.processing_time = start.elapsed();
                if let Some(handler) = self.progress_handler.as_mut() {
                    handler(&stats);
                }
            }
        }

        if !batch.is_empty() {
            self.flush_batch(&mut batch, graph.as_mut(), &mut stats);
        }
        stats.processing_time = start.elapsed();
        Ok((stats, graph))
    }

    /// Invokes the per-triple handler, if one is registered. The string forms
    /// are only built when a handler exists.
    fn dispatch_triple(&mut self, triple: &Triple) {
        if let Some(handler) = self.triple_handler.as_mut() {
            let subject = Self::subject_to_string(&triple.subject);
            let object = Self::term_to_string(triple.object.as_ref());
            handler(&subject, triple.predicate.as_str(), &object);
        }
    }

    /// Hands `batch` to the batch handler and the collected graph (if any),
    /// then empties it and counts it in `stats.batches_processed`.
    fn flush_batch(
        &mut self,
        batch: &mut Vec<Triple>,
        graph: Option<&mut Graph>,
        stats: &mut StreamStats,
    ) {
        if let Some(handler) = self.batch_handler.as_mut() {
            handler(batch.as_slice());
        }
        if let Some(g) = graph {
            for triple in batch.iter() {
                g.insert(triple);
            }
        }
        batch.clear();
        stats.batches_processed += 1;
    }

    /// Returns whether `triple` passes both the predicate-substring filter
    /// and the subject-prefix filter (an unset filter always passes).
    fn should_process_triple(&self, triple: &Triple) -> bool {
        if let Some(predicates) = &self.predicate_filter {
            let predicate = triple.predicate.as_str();
            if !predicates.iter().any(|p| predicate.contains(p.as_str())) {
                return false;
            }
        }
        if let Some(prefix) = &self.subject_prefix_filter {
            // Named-node subjects are checked without allocating; blank nodes
            // are compared in their `_:label` form.
            let matches = match &triple.subject {
                oxrdf::NamedOrBlankNode::NamedNode(n) => n.as_str().starts_with(prefix.as_str()),
                subject => Self::subject_to_string(subject).starts_with(prefix.as_str()),
            };
            if !matches {
                return false;
            }
        }
        true
    }

    /// Renders a subject as a bare IRI or as `_:label` for a blank node.
    fn subject_to_string(subject: &oxrdf::NamedOrBlankNode) -> String {
        match subject {
            oxrdf::NamedOrBlankNode::NamedNode(n) => n.as_str().to_string(),
            oxrdf::NamedOrBlankNode::BlankNode(b) => format!("_:{}", b.as_str()),
        }
    }

    /// Renders an object term. Named nodes are bare IRIs and blank nodes are
    /// `_:label`. Literals are rendered as `"value"@lang`, as
    /// `"value"^^datatype-iri` (no angle brackets), or as `"value"` for
    /// `xsd:string`. The value is not escaped.
    fn term_to_string(term: oxrdf::TermRef<'_>) -> String {
        match term {
            oxrdf::TermRef::NamedNode(n) => n.as_str().to_string(),
            oxrdf::TermRef::BlankNode(b) => format!("_:{}", b.as_str()),
            oxrdf::TermRef::Literal(l) => {
                if let Some(lang) = l.language() {
                    format!("\"{}\"@{}", l.value(), lang)
                } else if l.datatype() != oxrdf::vocab::xsd::STRING {
                    format!("\"{}\"^^{}", l.value(), l.datatype().as_str())
                } else {
                    format!("\"{}\"", l.value())
                }
            }
            // NOTE(review): only reachable if oxrdf is built with extra
            // `TermRef` variants (presumably quoted triples). In other builds
            // the arm is unreachable, hence the allow.
            #[allow(unreachable_patterns)]
            _ => "[triple]".to_string(),
        }
    }
}
impl Default for StreamingRdfLoader {
fn default() -> Self {
Self::new()
}
}
/// Accumulates simple statistics over a stream of `(subject, predicate,
/// object)` string triples: per-predicate counts, distinct subjects, the set
/// of predicate namespaces, and the total number of triples seen.
#[derive(Debug, Clone, Default)]
pub struct StreamAnalyzer {
    /// Occurrences of each predicate string.
    predicate_counts: std::collections::HashMap<String, usize>,
    /// Total number of triples passed to `process_triple`.
    triple_count: usize,
    /// Distinct subject strings seen.
    unique_subjects: std::collections::HashSet<String>,
    /// Distinct predicate namespaces (see `extract_namespace`).
    namespaces: std::collections::HashSet<String>,
}

impl StreamAnalyzer {
    /// Creates an empty analyzer.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one triple. The object is ignored.
    ///
    /// Keys that have already been seen are updated without allocating a new
    /// `String`, which matters when this runs once per triple of a large
    /// stream.
    pub fn process_triple(&mut self, subject: &str, predicate: &str, _object: &str) {
        self.triple_count += 1;

        if !self.unique_subjects.contains(subject) {
            self.unique_subjects.insert(subject.to_owned());
        }

        if let Some(count) = self.predicate_counts.get_mut(predicate) {
            *count += 1;
        } else {
            self.predicate_counts.insert(predicate.to_owned(), 1);
        }

        if let Some(ns) = Self::extract_namespace(predicate) {
            if !self.namespaces.contains(ns) {
                self.namespaces.insert(ns.to_owned());
            }
        }
    }

    /// Returns the IRI up to and including its last `#`, or, if it has no
    /// `#`, up to and including its last `/`. Returns `None` if neither
    /// occurs (e.g. `urn:x`).
    fn extract_namespace(iri: &str) -> Option<&str> {
        if let Some(hash_pos) = iri.rfind('#') {
            Some(&iri[..=hash_pos])
        } else if let Some(slash_pos) = iri.rfind('/') {
            Some(&iri[..=slash_pos])
        } else {
            None
        }
    }

    /// Per-predicate occurrence counts.
    pub fn predicate_stats(&self) -> &std::collections::HashMap<String, usize> {
        &self.predicate_counts
    }

    /// Number of distinct subject strings seen.
    pub fn unique_subject_count(&self) -> usize {
        self.unique_subjects.len()
    }

    /// Total number of triples recorded.
    pub fn total_triples(&self) -> usize {
        self.triple_count
    }

    /// Distinct predicate namespaces seen.
    pub fn namespaces(&self) -> &std::collections::HashSet<String> {
        &self.namespaces
    }

    /// Returns up to `n` predicates with their counts, most frequent first.
    ///
    /// Ties are broken by predicate string (ascending), so the result is
    /// deterministic. `HashMap` iteration order alone would make it vary
    /// from run to run.
    pub fn top_predicates(&self, n: usize) -> Vec<(&str, usize)> {
        let mut ranked: Vec<(&str, usize)> = self
            .predicate_counts
            .iter()
            .map(|(predicate, &count)| (predicate.as_str(), count))
            .collect();
        // Keys are unique, so an unstable sort is still deterministic.
        ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
        ranked.truncate(n);
        ranked
    }
}
/// Splits simple N-Triples text line by line and calls
/// `handler(subject, predicate, object)` for every line from which three
/// terms can be read.
///
/// Lines are trimmed. Blank lines and lines starting with `#` are skipped.
/// Lines that do not yield three terms are silently ignored, and the
/// trailing `.` is not checked. Terms are passed as `parse_ntriples_term`
/// returns them: IRIs without angle brackets, literals with their quotes
/// and any suffix, and blank nodes as `_:label`.
///
/// Returns the number of triples passed to `handler`; never returns `Err`.
pub fn process_ntriples_lines<F>(data: &str, mut handler: F) -> Result<usize>
where
    F: FnMut(&str, &str, &str),
{
    /// Reads subject, predicate and object from the start of `line`.
    fn split_triple(line: &str) -> Option<(&str, &str, &str)> {
        let (subject, rest) = parse_ntriples_term(line)?;
        let (predicate, rest) = parse_ntriples_term(rest.trim_start())?;
        let (object, _) = parse_ntriples_term(rest.trim_start())?;
        Some((subject, predicate, object))
    }

    let candidates = data
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(split_triple);

    let mut count = 0;
    for (subject, predicate, object) in candidates {
        handler(subject, predicate, object);
        count += 1;
    }
    Ok(count)
}
/// Reads one N-Triples term from the start of `s` and returns
/// `(term, rest)`, where `rest` is everything after the term. Leading
/// whitespace is skipped.
///
/// * IRIs (`<...>`) are returned without the angle brackets.
/// * Literals are returned verbatim, including the quotes and any `@lang`
///   or `^^<datatype>` suffix. Backslash escapes are skipped over, not
///   decoded: `\"` does not end the literal, but the quote in `\\"` does.
/// * Blank nodes are returned as `_:label`. The label ends at the first
///   whitespace or `.`.
///
/// Returns `None` for empty input, an unterminated IRI or literal, or any
/// other leading character.
fn parse_ntriples_term(s: &str) -> Option<(&str, &str)> {
    let s = s.trim_start();
    // Every delimiter below is ASCII. In UTF-8, ASCII bytes never occur
    // inside a multi-byte sequence, so scanning bytes is safe. Every index we
    // slice at therefore lies on a char boundary. Char positions (as from a
    // `Vec<char>`) would be wrong byte offsets for non-ASCII text.
    let bytes = s.as_bytes();
    match *bytes.first()? {
        b'<' => {
            let end = s.find('>')?;
            Some((&s[1..end], &s[end + 1..]))
        }
        b'"' => {
            // Locate the closing quote, treating each `\x` escape as a unit.
            let mut i = 1;
            let close = loop {
                match *bytes.get(i)? {
                    b'\\' => i += 2,
                    b'"' => break i,
                    _ => i += 1,
                }
            };
            let mut end = close + 1;
            let suffix = &bytes[end..];
            if suffix.first() == Some(&b'@') {
                // LANGTAG consists of letters, digits and '-' only, so a
                // directly following '.' is not swallowed into the tag.
                end += 1 + suffix[1..]
                    .iter()
                    .take_while(|b| b.is_ascii_alphanumeric() || **b == b'-')
                    .count();
            } else if suffix.starts_with(b"^^") {
                end += 2;
                if bytes.get(end) == Some(&b'<') {
                    // Include the datatype IRI through '>', or through the
                    // end of input if it is unterminated.
                    let iri_start = end;
                    end = bytes[iri_start..]
                        .iter()
                        .position(|&b| b == b'>')
                        .map_or(bytes.len(), |offset| iri_start + offset + 1);
                }
            }
            Some((&s[..end], &s[end..]))
        }
        _ => {
            let label = s.strip_prefix("_:")?;
            let end = label
                .find(|c: char| c.is_whitespace() || c == '.')
                .map_or(s.len(), |i| i + 2);
            Some((&s[..end], &s[end..]))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Checks that the triple handler actually receives every triple, in
    /// document order, in its string form.
    #[test]
    fn test_streaming_basic() {
        let turtle = r#"
@prefix ex: <http://example.org/> .
ex:Alice ex:knows ex:Bob .
ex:Bob ex:knows ex:Charlie .
ex:Charlie ex:knows ex:Alice .
"#;
        let seen = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        let mut loader = StreamingRdfLoader::new().on_triple(move |s, p, o| {
            sink.lock()
                .expect("lock")
                .push((s.to_string(), p.to_string(), o.to_string()));
        });
        let (stats, graph) = loader.process_turtle(turtle).expect("unwrap");
        assert_eq!(stats.triples_processed, 3);
        assert_eq!(stats.errors_encountered, 0);
        assert!(graph.is_none());

        let seen = seen.lock().expect("lock");
        assert_eq!(seen.len(), 3);
        assert_eq!(seen[0].0, "http://example.org/Alice");
        assert_eq!(seen[0].1, "http://example.org/knows");
        assert_eq!(seen[0].2, "http://example.org/Bob");
    }

    #[test]
    fn test_streaming_with_batch() {
        let turtle = r#"
@prefix ex: <http://example.org/> .
ex:A ex:p ex:B .
ex:B ex:p ex:C .
ex:C ex:p ex:D .
ex:D ex:p ex:E .
ex:E ex:p ex:F .
"#;
        let sizes = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&sizes);
        let mut loader = StreamingRdfLoader::new()
            .with_batch_size(2)
            .on_batch(move |batch| sink.lock().expect("lock").push(batch.len()));
        let (stats, _) = loader.process_turtle(turtle).expect("unwrap");
        assert_eq!(stats.triples_processed, 5);
        assert_eq!(stats.batches_processed, 3);
        assert_eq!(*sizes.lock().expect("lock"), vec![2, 2, 1]);
    }

    #[test]
    fn test_streaming_progress() {
        let turtle = r#"
@prefix ex: <http://example.org/> .
ex:A ex:p ex:B .
ex:B ex:p ex:C .
ex:C ex:p ex:D .
ex:D ex:p ex:E .
ex:E ex:p ex:F .
"#;
        let calls = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&calls);
        let mut loader = StreamingRdfLoader::new()
            .with_progress_interval(2)
            .on_progress(move |s| sink.lock().expect("lock").push(s.triples_processed));
        loader.process_turtle(turtle).expect("unwrap");
        assert_eq!(*calls.lock().expect("lock"), vec![2, 4]);
    }

    #[test]
    fn test_streaming_collect_graph() {
        let turtle = r#"
@prefix ex: <http://example.org/> .
ex:Alice ex:knows ex:Bob .
ex:Bob ex:knows ex:Charlie .
"#;
        let mut loader = StreamingRdfLoader::new().collect_into_graph();
        let (stats, graph) = loader.process_turtle(turtle).expect("unwrap");
        assert_eq!(stats.triples_processed, 2);
        assert!(graph.is_some());
        assert_eq!(graph.expect("unwrap").len(), 2);
    }

    #[test]
    fn test_streaming_filter_predicate() {
        let turtle = r#"
@prefix ex: <http://example.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
ex:Alice ex:knows ex:Bob .
ex:Alice rdfs:label "Alice" .
ex:Bob ex:knows ex:Charlie .
"#;
        let mut loader = StreamingRdfLoader::new().filter_predicates(vec!["knows".to_string()]);
        let (stats, _) = loader.process_turtle(turtle).expect("unwrap");
        assert_eq!(stats.triples_processed, 2);
    }

    #[test]
    fn test_streaming_filter_subject_prefix() {
        let turtle = r#"
@prefix ex: <http://example.org/> .
ex:A ex:p ex:B .
ex:B ex:p ex:A .
"#;
        let mut loader =
            StreamingRdfLoader::new().filter_subject_prefix("http://example.org/A".to_string());
        let (stats, _) = loader.process_turtle(turtle).expect("unwrap");
        assert_eq!(stats.triples_processed, 1);
    }

    #[test]
    fn test_streaming_literal_rendering() {
        let turtle = r#"
@prefix ex: <http://example.org/> .
ex:Alice ex:name "Alice" .
ex:Alice ex:label "Alice"@en .
ex:Alice ex:age 30 .
"#;
        let objects = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&objects);
        let mut loader = StreamingRdfLoader::new()
            .on_triple(move |_s, _p, o| sink.lock().expect("lock").push(o.to_string()));
        loader.process_turtle(turtle).expect("unwrap");
        assert_eq!(
            *objects.lock().expect("lock"),
            vec![
                "\"Alice\"",
                "\"Alice\"@en",
                "\"30\"^^http://www.w3.org/2001/XMLSchema#integer",
            ]
        );
    }

    #[test]
    fn test_stream_analyzer() {
        let mut analyzer = StreamAnalyzer::new();
        analyzer.process_triple(
            "http://example.org/Alice",
            "http://example.org/knows",
            "http://example.org/Bob",
        );
        analyzer.process_triple(
            "http://example.org/Bob",
            "http://example.org/knows",
            "http://example.org/Charlie",
        );
        analyzer.process_triple("http://example.org/Alice", "http://example.org/age", "30");
        assert_eq!(analyzer.unique_subject_count(), 2);
        assert_eq!(analyzer.total_triples(), 3);
        assert_eq!(analyzer.predicate_stats().len(), 2);
        assert_eq!(analyzer.predicate_stats()["http://example.org/knows"], 2);
        assert_eq!(analyzer.top_predicates(1), vec![("http://example.org/knows", 2)]);
        assert!(analyzer.namespaces().contains("http://example.org/"));
    }

    #[test]
    fn test_ntriples_processing() {
        let ntriples = r#"
# a comment line
<http://example.org/Alice> <http://example.org/knows> <http://example.org/Bob> .
<http://example.org/Bob> <http://example.org/knows> <http://example.org/Charlie> .
_:b1 <http://example.org/name> "Bob"@en .
"#;
        let mut triples = Vec::new();
        let count = process_ntriples_lines(ntriples, |s, p, o| {
            triples.push((s.to_string(), p.to_string(), o.to_string()));
        })
        .expect("unwrap");
        assert_eq!(count, 3);
        assert_eq!(triples.len(), 3);
        assert_eq!(triples[0].0, "http://example.org/Alice");
        assert_eq!(triples[2].0, "_:b1");
        assert_eq!(triples[2].1, "http://example.org/name");
        assert_eq!(triples[2].2, "\"Bob\"@en");
    }

    #[test]
    fn test_stats_rate() {
        let stats = StreamStats {
            triples_processed: 10000,
            batches_processed: 10,
            processing_time: Duration::from_secs(2),
            errors_encountered: 0,
            peak_memory_bytes: 0,
        };
        assert_eq!(stats.triples_per_second(), 5000.0);
        // Zero elapsed time must not divide by zero.
        let idle = StreamStats {
            triples_processed: 5,
            ..StreamStats::default()
        };
        assert_eq!(idle.triples_per_second(), 0.0);
    }
}