tensorlogic_oxirs_bridge/schema/
streaming.rs1use anyhow::Result;
41use oxrdf::{Graph, Triple};
42use oxttl::TurtleParser;
43use std::io::BufRead;
44use std::time::{Duration, Instant};
45
/// Counters and timing collected while streaming RDF data.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Number of triples accepted (for `StreamingRdfLoader`: those that passed its filters).
    pub triples_processed: usize,
    /// Number of batches flushed, including a trailing partial batch.
    pub batches_processed: usize,
    /// Elapsed time measured since processing started.
    pub processing_time: Duration,
    /// Number of errors reported by the parser while streaming.
    pub errors_encountered: usize,
    /// Peak memory usage in bytes.
    /// NOTE(review): nothing in this module writes this field; it stays 0 unless a caller sets it.
    pub peak_memory_bytes: usize,
}

impl StreamStats {
    /// Average throughput in triples per second.
    ///
    /// Returns `0.0` when no time has been recorded, avoiding a division by zero.
    pub fn triples_per_second(&self) -> f64 {
        let elapsed_secs = self.processing_time.as_secs_f64();
        if elapsed_secs <= 0.0 {
            return 0.0;
        }
        self.triples_processed as f64 / elapsed_secs
    }
}
71
72pub type TripleHandler = Box<dyn FnMut(&str, &str, &str) + Send>;
74
75pub type BatchHandler = Box<dyn FnMut(&[Triple]) + Send>;
77
78pub type ProgressHandler = Box<dyn FnMut(&StreamStats) + Send>;
80
81pub struct StreamingRdfLoader {
86 batch_size: usize,
88 triple_handler: Option<TripleHandler>,
90 batch_handler: Option<BatchHandler>,
92 progress_handler: Option<ProgressHandler>,
94 progress_interval: usize,
96 collect_graph: bool,
98 predicate_filter: Option<Vec<String>>,
100 subject_prefix_filter: Option<String>,
102}
103
104impl StreamingRdfLoader {
105 pub fn new() -> Self {
107 StreamingRdfLoader {
108 batch_size: 1000,
109 triple_handler: None,
110 batch_handler: None,
111 progress_handler: None,
112 progress_interval: 10000,
113 collect_graph: false,
114 predicate_filter: None,
115 subject_prefix_filter: None,
116 }
117 }
118
119 pub fn with_batch_size(mut self, size: usize) -> Self {
124 self.batch_size = size.max(1);
125 self
126 }
127
128 pub fn on_triple<F>(mut self, handler: F) -> Self
132 where
133 F: FnMut(&str, &str, &str) + Send + 'static,
134 {
135 self.triple_handler = Some(Box::new(handler));
136 self
137 }
138
139 pub fn on_batch<F>(mut self, handler: F) -> Self
143 where
144 F: FnMut(&[Triple]) + Send + 'static,
145 {
146 self.batch_handler = Some(Box::new(handler));
147 self
148 }
149
150 pub fn on_progress<F>(mut self, handler: F) -> Self
154 where
155 F: FnMut(&StreamStats) + Send + 'static,
156 {
157 self.progress_handler = Some(Box::new(handler));
158 self
159 }
160
161 pub fn with_progress_interval(mut self, interval: usize) -> Self {
165 self.progress_interval = interval.max(1);
166 self
167 }
168
169 pub fn collect_into_graph(mut self) -> Self {
174 self.collect_graph = true;
175 self
176 }
177
178 pub fn filter_predicates(mut self, predicates: Vec<String>) -> Self {
180 self.predicate_filter = Some(predicates);
181 self
182 }
183
184 pub fn filter_subject_prefix(mut self, prefix: String) -> Self {
186 self.subject_prefix_filter = Some(prefix);
187 self
188 }
189
190 pub fn process_turtle(&mut self, data: &str) -> Result<(StreamStats, Option<Graph>)> {
192 let reader = std::io::Cursor::new(data);
193 self.process_turtle_reader(reader)
194 }
195
196 pub fn process_turtle_reader<R: BufRead>(
198 &mut self,
199 reader: R,
200 ) -> Result<(StreamStats, Option<Graph>)> {
201 let start = Instant::now();
202 let mut stats = StreamStats::default();
203 let mut graph = if self.collect_graph {
204 Some(Graph::new())
205 } else {
206 None
207 };
208 let mut batch: Vec<Triple> = Vec::with_capacity(self.batch_size);
209
210 let parser = TurtleParser::new().for_reader(reader);
211
212 for result in parser {
213 match result {
214 Ok(triple) => {
215 if !self.should_process_triple(&triple) {
217 continue;
218 }
219
220 stats.triples_processed += 1;
221
222 if self.triple_handler.is_some() {
224 let subject = self.subject_to_string(&triple.subject);
225 let predicate = triple.predicate.as_str().to_string();
226 let object = self.term_to_string(triple.object.as_ref());
227 if let Some(ref mut handler) = self.triple_handler {
228 handler(&subject, &predicate, &object);
229 }
230 }
231
232 batch.push(triple);
234
235 if batch.len() >= self.batch_size {
237 self.process_batch(&batch, &mut graph, &mut stats);
238 batch.clear();
239 stats.batches_processed += 1;
240 }
241
242 if stats.triples_processed % self.progress_interval == 0 {
244 stats.processing_time = start.elapsed();
245 if let Some(ref mut handler) = self.progress_handler {
246 handler(&stats);
247 }
248 }
249 }
250 Err(e) => {
251 stats.errors_encountered += 1;
252 eprintln!("Parse error: {}", e);
254 }
255 }
256 }
257
258 if !batch.is_empty() {
260 self.process_batch(&batch, &mut graph, &mut stats);
261 stats.batches_processed += 1;
262 }
263
264 stats.processing_time = start.elapsed();
265 Ok((stats, graph))
266 }
267
268 fn should_process_triple(&self, triple: &Triple) -> bool {
270 if let Some(ref predicates) = self.predicate_filter {
272 let pred_str = triple.predicate.as_str();
273 if !predicates.iter().any(|p| pred_str.contains(p)) {
274 return false;
275 }
276 }
277
278 if let Some(ref prefix) = self.subject_prefix_filter {
280 let subject_str = self.subject_to_string(&triple.subject);
281 if !subject_str.starts_with(prefix) {
282 return false;
283 }
284 }
285
286 true
287 }
288
289 fn subject_to_string(&self, subject: &oxrdf::NamedOrBlankNode) -> String {
291 match subject {
292 oxrdf::NamedOrBlankNode::NamedNode(n) => n.as_str().to_string(),
293 oxrdf::NamedOrBlankNode::BlankNode(b) => format!("_:{}", b.as_str()),
294 }
295 }
296
297 fn process_batch(
299 &mut self,
300 batch: &[Triple],
301 graph: &mut Option<Graph>,
302 _stats: &mut StreamStats,
303 ) {
304 if let Some(ref mut handler) = self.batch_handler {
306 handler(batch);
307 }
308
309 if let Some(ref mut g) = graph {
311 for triple in batch {
312 g.insert(triple);
313 }
314 }
315 }
316
317 fn term_to_string(&self, term: oxrdf::TermRef) -> String {
319 match term {
320 oxrdf::TermRef::NamedNode(n) => n.as_str().to_string(),
321 oxrdf::TermRef::BlankNode(b) => format!("_:{}", b.as_str()),
322 oxrdf::TermRef::Literal(l) => {
323 if let Some(lang) = l.language() {
324 format!("\"{}\"@{}", l.value(), lang)
325 } else if l.datatype() != oxrdf::vocab::xsd::STRING {
326 format!("\"{}\"^^{}", l.value(), l.datatype().as_str())
327 } else {
328 format!("\"{}\"", l.value())
329 }
330 }
331 #[allow(unreachable_patterns)]
332 _ => "[triple]".to_string(),
333 }
334 }
335}
336
337impl Default for StreamingRdfLoader {
338 fn default() -> Self {
339 Self::new()
340 }
341}
342
/// Accumulates summary statistics over a stream of `(subject, predicate, object)` strings.
///
/// `process_triple` has the same shape as the [`StreamingRdfLoader::on_triple`] callback.
/// It records per-predicate counts, distinct subjects, and namespaces of the predicates seen.
pub struct StreamAnalyzer {
    /// Number of occurrences of each predicate IRI.
    predicate_counts: std::collections::HashMap<String, usize>,
    /// Total number of triples passed to `process_triple`.
    triple_count: usize,
    /// Distinct subject strings seen so far.
    unique_subjects: std::collections::HashSet<String>,
    /// Namespaces extracted from predicate IRIs (see `extract_namespace`).
    namespaces: std::collections::HashSet<String>,
}

impl StreamAnalyzer {
    /// Creates an empty analyzer.
    pub fn new() -> Self {
        StreamAnalyzer {
            predicate_counts: std::collections::HashMap::new(),
            triple_count: 0,
            unique_subjects: std::collections::HashSet::new(),
            namespaces: std::collections::HashSet::new(),
        }
    }

    /// Records one triple. The object is currently not inspected.
    pub fn process_triple(&mut self, subject: &str, predicate: &str, _object: &str) {
        self.triple_count += 1;

        // Allocate owned keys only the first time a subject/predicate/namespace is seen.
        if !self.unique_subjects.contains(subject) {
            self.unique_subjects.insert(subject.to_string());
        }

        match self.predicate_counts.get_mut(predicate) {
            Some(count) => *count += 1,
            None => {
                self.predicate_counts.insert(predicate.to_string(), 1);
            }
        }

        if let Some(ns) = Self::extract_namespace(predicate) {
            if !self.namespaces.contains(ns) {
                self.namespaces.insert(ns.to_string());
            }
        }
    }

    /// Returns the IRI up to and including its last `#`, or failing that its last `/`;
    /// `None` if it contains neither.
    fn extract_namespace(iri: &str) -> Option<&str> {
        if let Some(hash_pos) = iri.rfind('#') {
            Some(&iri[..=hash_pos])
        } else if let Some(slash_pos) = iri.rfind('/') {
            Some(&iri[..=slash_pos])
        } else {
            None
        }
    }

    /// Per-predicate occurrence counts.
    pub fn predicate_stats(&self) -> &std::collections::HashMap<String, usize> {
        &self.predicate_counts
    }

    /// Number of distinct subjects seen.
    pub fn unique_subject_count(&self) -> usize {
        self.unique_subjects.len()
    }

    /// Total number of triples processed.
    pub fn total_triples(&self) -> usize {
        self.triple_count
    }

    /// Namespaces extracted from the predicates seen.
    pub fn namespaces(&self) -> &std::collections::HashSet<String> {
        &self.namespaces
    }

    /// Returns up to `n` `(predicate, count)` pairs, most frequent first.
    ///
    /// Ties are broken by ascending predicate IRI, so the result is deterministic even though
    /// the underlying `HashMap` iteration order is not.
    pub fn top_predicates(&self, n: usize) -> Vec<(&str, usize)> {
        let mut ranked: Vec<(&str, usize)> = self
            .predicate_counts
            .iter()
            .map(|(predicate, &count)| (predicate.as_str(), count))
            .collect();
        ranked.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
        ranked.truncate(n);
        ranked
    }
}

impl Default for StreamAnalyzer {
    fn default() -> Self {
        Self::new()
    }
}
432
433pub fn process_ntriples_lines<F>(data: &str, mut handler: F) -> Result<usize>
437where
438 F: FnMut(&str, &str, &str),
439{
440 let mut count = 0;
441
442 for line in data.lines() {
443 let line = line.trim();
444 if line.is_empty() || line.starts_with('#') {
445 continue;
446 }
447
448 if let Some((subject, rest)) = parse_ntriples_term(line) {
450 let rest = rest.trim_start();
451 if let Some((predicate, rest)) = parse_ntriples_term(rest) {
452 let rest = rest.trim_start();
453 if let Some((object, _)) = parse_ntriples_term(rest) {
454 handler(subject, predicate, object);
455 count += 1;
456 }
457 }
458 }
459 }
460
461 Ok(count)
462}
463
/// Splits the leading N-Triples term off `s`, returning `(term, rest)`.
///
/// Leading whitespace is skipped first. Recognised forms:
/// * `<iri>` — returned without the angle brackets;
/// * `"literal"` with an optional `@lang` or `^^<datatype>` suffix — returned verbatim,
///   quotes and suffix included, with escape sequences left undecoded;
/// * `_:label` — returned with its `_:` prefix; the label ends at whitespace or `.`.
///
/// Returns `None` for empty input, an unterminated IRI or literal, or any other form.
///
/// Every delimiter is ASCII, and ASCII bytes never occur inside a multi-byte UTF-8 sequence.
/// Scanning bytes is therefore safe, and every slice index lands on a char boundary.
fn parse_ntriples_term(s: &str) -> Option<(&str, &str)> {
    let s = s.trim_start();
    let bytes = s.as_bytes();

    match *bytes.first()? {
        b'<' => {
            let end = s.find('>')?;
            Some((&s[1..end], &s[end + 1..]))
        }
        b'"' => {
            // Find the closing quote. Each backslash escape is skipped as a unit, so `\"`
            // does not end the literal but the quote in `\\"` does.
            let mut i = 1;
            let close = loop {
                match *bytes.get(i)? {
                    b'\\' => i += 2,
                    b'"' => break i,
                    _ => i += 1,
                }
            };

            let mut end = close + 1;
            if bytes.get(end) == Some(&b'@') {
                // Language tag: ASCII letters, digits and '-' (e.g. `en-US`); stops before a
                // statement-terminating '.' even without intervening whitespace.
                end += 1;
                while matches!(bytes.get(end), Some(b) if b.is_ascii_alphanumeric() || *b == b'-')
                {
                    end += 1;
                }
            } else if bytes[end..].starts_with(b"^^") {
                end += 2;
                if bytes.get(end) == Some(&b'<') {
                    end = match bytes[end..].iter().position(|&b| b == b'>') {
                        Some(offset) => end + offset + 1,
                        None => bytes.len(),
                    };
                }
            }
            Some((&s[..end], &s[end..]))
        }
        _ => {
            let label = s.strip_prefix("_:")?;
            let end = label
                .find(|c: char| c.is_whitespace() || c == '.')
                .map_or(s.len(), |i| i + 2);
            Some((&s[..end], &s[end..]))
        }
    }
}
513
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Five-triple chain used by the batch and progress tests.
    const CHAIN: &str = r#"
        @prefix ex: <http://example.org/> .
        ex:A ex:p ex:B .
        ex:B ex:p ex:C .
        ex:C ex:p ex:D .
        ex:D ex:p ex:E .
        ex:E ex:p ex:F .
    "#;

    #[test]
    fn test_streaming_basic() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:Alice ex:knows ex:Bob .
            ex:Bob ex:knows ex:Charlie .
            ex:Charlie ex:knows ex:Alice .
        "#;

        // Capture what the triple handler receives so the test checks it is actually invoked.
        let seen = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        let mut loader = StreamingRdfLoader::new().on_triple(move |s, p, o| {
            sink.lock()
                .unwrap()
                .push((s.to_string(), p.to_string(), o.to_string()));
        });

        let (stats, graph) = loader.process_turtle(turtle).unwrap();
        assert_eq!(stats.triples_processed, 3);
        assert_eq!(stats.errors_encountered, 0);
        assert!(graph.is_none());

        let captured = seen.lock().unwrap();
        assert_eq!(captured.len(), 3);
        assert_eq!(captured[0].0, "http://example.org/Alice");
        assert_eq!(captured[0].1, "http://example.org/knows");
        assert_eq!(captured[0].2, "http://example.org/Bob");
    }

    #[test]
    fn test_streaming_with_batch() {
        let sizes = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&sizes);
        let mut loader = StreamingRdfLoader::new()
            .with_batch_size(2)
            .on_batch(move |batch| sink.lock().unwrap().push(batch.len()));

        let (stats, _) = loader.process_turtle(CHAIN).unwrap();
        assert_eq!(stats.triples_processed, 5);
        assert_eq!(stats.batches_processed, 3);
        assert_eq!(*sizes.lock().unwrap(), vec![2, 2, 1]);
    }

    #[test]
    fn test_streaming_progress() {
        let reports = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&reports);
        let mut loader = StreamingRdfLoader::new()
            .with_progress_interval(2)
            .on_progress(move |s| sink.lock().unwrap().push(s.triples_processed));

        loader.process_turtle(CHAIN).unwrap();
        assert_eq!(*reports.lock().unwrap(), vec![2, 4]);
    }

    #[test]
    fn test_streaming_collect_graph() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:Alice ex:knows ex:Bob .
            ex:Bob ex:knows ex:Charlie .
        "#;

        let mut loader = StreamingRdfLoader::new().collect_into_graph();

        let (stats, graph) = loader.process_turtle(turtle).unwrap();
        assert_eq!(stats.triples_processed, 2);
        let graph = graph.expect("collect_into_graph should return a graph");
        assert_eq!(graph.len(), 2);
    }

    #[test]
    fn test_streaming_filter_predicate() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            @prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
            ex:Alice ex:knows ex:Bob .
            ex:Alice rdfs:label "Alice" .
            ex:Bob ex:knows ex:Charlie .
        "#;

        let mut loader = StreamingRdfLoader::new().filter_predicates(vec!["knows".to_string()]);

        let (stats, _) = loader.process_turtle(turtle).unwrap();
        assert_eq!(stats.triples_processed, 2);
    }

    #[test]
    fn test_streaming_filter_subject_prefix() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            @prefix other: <http://other.org/> .
            ex:Alice ex:knows ex:Bob .
            ex:Bob ex:knows ex:Charlie .
            other:X ex:knows ex:Alice .
        "#;

        let mut loader = StreamingRdfLoader::new()
            .filter_subject_prefix("http://example.org/A".to_string());

        let (stats, _) = loader.process_turtle(turtle).unwrap();
        assert_eq!(stats.triples_processed, 1);
    }

    #[test]
    fn test_stream_analyzer() {
        let mut analyzer = StreamAnalyzer::new();

        analyzer.process_triple(
            "http://example.org/Alice",
            "http://example.org/knows",
            "http://example.org/Bob",
        );
        analyzer.process_triple(
            "http://example.org/Bob",
            "http://example.org/knows",
            "http://example.org/Charlie",
        );
        analyzer.process_triple("http://example.org/Alice", "http://example.org/age", "30");

        assert_eq!(analyzer.unique_subject_count(), 2);
        assert_eq!(analyzer.total_triples(), 3);
        assert_eq!(analyzer.predicate_stats().len(), 2);
        assert_eq!(analyzer.predicate_stats()["http://example.org/knows"], 2);
        assert_eq!(
            analyzer.top_predicates(1),
            vec![("http://example.org/knows", 2)]
        );
    }

    #[test]
    fn test_ntriples_processing() {
        let ntriples = r#"
            # comment lines and blank lines are skipped

            <http://example.org/Alice> <http://example.org/knows> <http://example.org/Bob> .
            <http://example.org/Bob> <http://example.org/knows> <http://example.org/Charlie> .
            _:b1 <http://example.org/name> "Bob"@en .
        "#;

        let mut rows = Vec::new();
        let count = process_ntriples_lines(ntriples, |s, p, o| {
            rows.push((s.to_string(), p.to_string(), o.to_string()));
        })
        .unwrap();

        assert_eq!(count, 3);
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0].0, "http://example.org/Alice");
        assert_eq!(rows[2].0, "_:b1");
        assert_eq!(rows[2].1, "http://example.org/name");
        assert_eq!(rows[2].2, "\"Bob\"@en");
    }

    #[test]
    fn test_stats_rate() {
        let stats = StreamStats {
            triples_processed: 10000,
            batches_processed: 10,
            processing_time: Duration::from_secs(2),
            errors_encountered: 0,
            peak_memory_bytes: 0,
        };

        assert_eq!(stats.triples_per_second(), 5000.0);
        assert_eq!(StreamStats::default().triples_per_second(), 0.0);
    }
}