// tensorlogic_oxirs_bridge/schema/streaming.rs

1//! Streaming RDF processing for large graphs.
2//!
3//! This module provides memory-efficient streaming support for processing
4//! large RDF datasets without loading everything into memory at once.
5//!
6//! # Features
7//!
8//! - **Chunked Processing**: Process RDF in configurable batch sizes
9//! - **Callback-based**: Register handlers for different triple patterns
10//! - **Statistics Tracking**: Monitor progress during streaming
11//! - **Memory Efficient**: Avoid loading entire graph into memory
12//!
13//! # Example
14//!
15//! ```no_run
16//! use tensorlogic_oxirs_bridge::schema::streaming::{StreamingRdfLoader, StreamStats};
17//! use anyhow::Result;
18//!
19//! fn main() -> Result<()> {
20//!     let turtle = r#"
21//!         @prefix ex: <http://example.org/> .
22//!         ex:Alice ex:knows ex:Bob .
23//!         ex:Bob ex:knows ex:Charlie .
24//!     "#;
25//!
26//!     let mut loader = StreamingRdfLoader::new();
27//!
28//!     // Register a handler for all triples
29//!     loader = loader.on_triple(|subject, predicate, object| {
30//!         println!("{} {} {}", subject, predicate, object);
31//!     });
32//!
33//!     // Process the data
34//!     let (stats, _graph) = loader.process_turtle(turtle)?;
35//!     println!("Processed {} triples", stats.triples_processed);
36//!     Ok(())
37//! }
38//! ```
39
40use anyhow::Result;
41use oxrdf::{Graph, Triple};
42use oxttl::TurtleParser;
43use std::io::BufRead;
44use std::time::{Duration, Instant};
45
/// Statistics from streaming RDF processing.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Total number of triples processed
    pub triples_processed: usize,
    /// Number of batches processed
    pub batches_processed: usize,
    /// Total processing time
    pub processing_time: Duration,
    /// Number of errors encountered
    pub errors_encountered: usize,
    /// Peak memory usage (estimated; note: not populated anywhere in this module yet)
    pub peak_memory_bytes: usize,
}

impl StreamStats {
    /// Get processing rate in triples per second.
    ///
    /// Returns 0.0 when no time has elapsed, avoiding a division by zero.
    pub fn triples_per_second(&self) -> f64 {
        let elapsed = self.processing_time.as_secs_f64();
        if elapsed > 0.0 {
            self.triples_processed as f64 / elapsed
        } else {
            0.0
        }
    }
}
71
/// Handler function type for processing individual triples.
///
/// Receives the stringified `(subject, predicate, object)` of one triple.
pub type TripleHandler = Box<dyn FnMut(&str, &str, &str) + Send>;

/// Handler function type for batch processing.
///
/// Receives a slice of at most `batch_size` parsed triples.
pub type BatchHandler = Box<dyn FnMut(&[Triple]) + Send>;

/// Handler function type for progress updates.
///
/// Receives a snapshot of the running `StreamStats`.
pub type ProgressHandler = Box<dyn FnMut(&StreamStats) + Send>;
80
/// Streaming RDF loader for memory-efficient processing.
///
/// This loader processes RDF data in chunks, allowing you to handle
/// large datasets without loading everything into memory.
///
/// Configure with the builder-style `with_*`/`on_*`/`filter_*` methods,
/// then run `process_turtle` or `process_turtle_reader`.
pub struct StreamingRdfLoader {
    /// Batch size for chunked processing (default 1000, clamped to >= 1)
    batch_size: usize,
    /// Handler invoked once per triple with stringified terms
    triple_handler: Option<TripleHandler>,
    /// Handler invoked once per completed (or final partial) batch
    batch_handler: Option<BatchHandler>,
    /// Handler invoked for periodic progress updates
    progress_handler: Option<ProgressHandler>,
    /// Progress update interval in triples (default 10000, clamped to >= 1)
    progress_interval: usize,
    /// Whether to also collect all processed triples into a graph
    collect_graph: bool,
    /// If set, only process triples whose predicate IRI contains one of these substrings
    predicate_filter: Option<Vec<String>>,
    /// If set, only process triples whose stringified subject starts with this prefix
    subject_prefix_filter: Option<String>,
}
103
104impl StreamingRdfLoader {
105    /// Create a new streaming loader with default settings.
106    pub fn new() -> Self {
107        StreamingRdfLoader {
108            batch_size: 1000,
109            triple_handler: None,
110            batch_handler: None,
111            progress_handler: None,
112            progress_interval: 10000,
113            collect_graph: false,
114            predicate_filter: None,
115            subject_prefix_filter: None,
116        }
117    }
118
119    /// Set the batch size for chunked processing.
120    ///
121    /// Larger batches are more efficient but use more memory.
122    /// Default is 1000 triples per batch.
123    pub fn with_batch_size(mut self, size: usize) -> Self {
124        self.batch_size = size.max(1);
125        self
126    }
127
128    /// Set a handler for individual triples.
129    ///
130    /// The handler receives (subject, predicate, object) as strings.
131    pub fn on_triple<F>(mut self, handler: F) -> Self
132    where
133        F: FnMut(&str, &str, &str) + Send + 'static,
134    {
135        self.triple_handler = Some(Box::new(handler));
136        self
137    }
138
139    /// Set a handler for triple batches.
140    ///
141    /// The handler receives a slice of Triple objects.
142    pub fn on_batch<F>(mut self, handler: F) -> Self
143    where
144        F: FnMut(&[Triple]) + Send + 'static,
145    {
146        self.batch_handler = Some(Box::new(handler));
147        self
148    }
149
150    /// Set a handler for progress updates.
151    ///
152    /// The handler is called every `progress_interval` triples.
153    pub fn on_progress<F>(mut self, handler: F) -> Self
154    where
155        F: FnMut(&StreamStats) + Send + 'static,
156    {
157        self.progress_handler = Some(Box::new(handler));
158        self
159    }
160
161    /// Set the interval for progress updates.
162    ///
163    /// Default is every 10000 triples.
164    pub fn with_progress_interval(mut self, interval: usize) -> Self {
165        self.progress_interval = interval.max(1);
166        self
167    }
168
169    /// Enable collecting triples into a graph.
170    ///
171    /// This is useful when you need the complete graph after streaming.
172    /// Note: This increases memory usage.
173    pub fn collect_into_graph(mut self) -> Self {
174        self.collect_graph = true;
175        self
176    }
177
178    /// Filter to only process triples with specific predicates.
179    pub fn filter_predicates(mut self, predicates: Vec<String>) -> Self {
180        self.predicate_filter = Some(predicates);
181        self
182    }
183
184    /// Filter to only process triples whose subject starts with a prefix.
185    pub fn filter_subject_prefix(mut self, prefix: String) -> Self {
186        self.subject_prefix_filter = Some(prefix);
187        self
188    }
189
190    /// Process Turtle data from a string.
191    pub fn process_turtle(&mut self, data: &str) -> Result<(StreamStats, Option<Graph>)> {
192        let reader = std::io::Cursor::new(data);
193        self.process_turtle_reader(reader)
194    }
195
196    /// Process Turtle data from a reader.
197    pub fn process_turtle_reader<R: BufRead>(
198        &mut self,
199        reader: R,
200    ) -> Result<(StreamStats, Option<Graph>)> {
201        let start = Instant::now();
202        let mut stats = StreamStats::default();
203        let mut graph = if self.collect_graph {
204            Some(Graph::new())
205        } else {
206            None
207        };
208        let mut batch: Vec<Triple> = Vec::with_capacity(self.batch_size);
209
210        let parser = TurtleParser::new().for_reader(reader);
211
212        for result in parser {
213            match result {
214                Ok(triple) => {
215                    // Apply filters
216                    if !self.should_process_triple(&triple) {
217                        continue;
218                    }
219
220                    stats.triples_processed += 1;
221
222                    // Call triple handler
223                    if self.triple_handler.is_some() {
224                        let subject = self.subject_to_string(&triple.subject);
225                        let predicate = triple.predicate.as_str().to_string();
226                        let object = self.term_to_string(triple.object.as_ref());
227                        if let Some(ref mut handler) = self.triple_handler {
228                            handler(&subject, &predicate, &object);
229                        }
230                    }
231
232                    // Add to batch
233                    batch.push(triple);
234
235                    // Process batch if full
236                    if batch.len() >= self.batch_size {
237                        self.process_batch(&batch, &mut graph, &mut stats);
238                        batch.clear();
239                        stats.batches_processed += 1;
240                    }
241
242                    // Progress update
243                    if stats.triples_processed % self.progress_interval == 0 {
244                        stats.processing_time = start.elapsed();
245                        if let Some(ref mut handler) = self.progress_handler {
246                            handler(&stats);
247                        }
248                    }
249                }
250                Err(e) => {
251                    stats.errors_encountered += 1;
252                    // Continue processing on error
253                    eprintln!("Parse error: {}", e);
254                }
255            }
256        }
257
258        // Process remaining batch
259        if !batch.is_empty() {
260            self.process_batch(&batch, &mut graph, &mut stats);
261            stats.batches_processed += 1;
262        }
263
264        stats.processing_time = start.elapsed();
265        Ok((stats, graph))
266    }
267
268    /// Check if a triple should be processed based on filters.
269    fn should_process_triple(&self, triple: &Triple) -> bool {
270        // Check predicate filter
271        if let Some(ref predicates) = self.predicate_filter {
272            let pred_str = triple.predicate.as_str();
273            if !predicates.iter().any(|p| pred_str.contains(p)) {
274                return false;
275            }
276        }
277
278        // Check subject prefix filter
279        if let Some(ref prefix) = self.subject_prefix_filter {
280            let subject_str = self.subject_to_string(&triple.subject);
281            if !subject_str.starts_with(prefix) {
282                return false;
283            }
284        }
285
286        true
287    }
288
289    /// Convert subject to string.
290    fn subject_to_string(&self, subject: &oxrdf::NamedOrBlankNode) -> String {
291        match subject {
292            oxrdf::NamedOrBlankNode::NamedNode(n) => n.as_str().to_string(),
293            oxrdf::NamedOrBlankNode::BlankNode(b) => format!("_:{}", b.as_str()),
294        }
295    }
296
297    /// Process a batch of triples.
298    fn process_batch(
299        &mut self,
300        batch: &[Triple],
301        graph: &mut Option<Graph>,
302        _stats: &mut StreamStats,
303    ) {
304        // Call batch handler
305        if let Some(ref mut handler) = self.batch_handler {
306            handler(batch);
307        }
308
309        // Add to graph if collecting
310        if let Some(ref mut g) = graph {
311            for triple in batch {
312                g.insert(triple);
313            }
314        }
315    }
316
317    /// Convert an RDF term to string.
318    fn term_to_string(&self, term: oxrdf::TermRef) -> String {
319        match term {
320            oxrdf::TermRef::NamedNode(n) => n.as_str().to_string(),
321            oxrdf::TermRef::BlankNode(b) => format!("_:{}", b.as_str()),
322            oxrdf::TermRef::Literal(l) => {
323                if let Some(lang) = l.language() {
324                    format!("\"{}\"@{}", l.value(), lang)
325                } else if l.datatype() != oxrdf::vocab::xsd::STRING {
326                    format!("\"{}\"^^{}", l.value(), l.datatype().as_str())
327                } else {
328                    format!("\"{}\"", l.value())
329                }
330            }
331            #[allow(unreachable_patterns)]
332            _ => "[triple]".to_string(),
333        }
334    }
335}
336
337impl Default for StreamingRdfLoader {
338    fn default() -> Self {
339        Self::new()
340    }
341}
342
/// Stream processor for analyzing large RDF datasets.
///
/// Accumulates lightweight statistics (predicate frequencies, distinct
/// subjects, discovered namespaces) from a stream of triples without
/// retaining the triples themselves.
pub struct StreamAnalyzer {
    /// Frequency of each predicate IRI observed
    predicate_counts: std::collections::HashMap<String, usize>,
    /// Total number of triples processed (incremented once per triple)
    subject_count: usize,
    /// Distinct subject strings observed
    unique_subjects: std::collections::HashSet<String>,
    /// Namespaces discovered from predicate IRIs
    namespaces: std::collections::HashSet<String>,
}

impl StreamAnalyzer {
    /// Create a new, empty stream analyzer.
    pub fn new() -> Self {
        StreamAnalyzer {
            predicate_counts: std::collections::HashMap::new(),
            subject_count: 0,
            unique_subjects: std::collections::HashSet::new(),
            namespaces: std::collections::HashSet::new(),
        }
    }

    /// Process a triple for analysis.
    ///
    /// Updates the triple count, the distinct-subject set, the predicate
    /// frequency table, and the namespace set. The object is currently
    /// not analyzed.
    pub fn process_triple(&mut self, subject: &str, predicate: &str, _object: &str) {
        self.subject_count += 1;
        self.unique_subjects.insert(subject.to_string());

        // Entry API: a single lookup instead of contains/get_mut.
        *self
            .predicate_counts
            .entry(predicate.to_string())
            .or_default() += 1;

        // Record the predicate's namespace, if one can be extracted.
        if let Some(ns) = Self::extract_namespace(predicate) {
            self.namespaces.insert(ns.to_string());
        }
    }

    /// Extract the namespace portion of an IRI: everything up to and
    /// including the last `#`, or failing that the last `/`; `None` if
    /// the IRI contains neither.
    fn extract_namespace(iri: &str) -> Option<&str> {
        if let Some(hash_pos) = iri.rfind('#') {
            Some(&iri[..=hash_pos])
        } else if let Some(slash_pos) = iri.rfind('/') {
            Some(&iri[..=slash_pos])
        } else {
            None
        }
    }

    /// Get predicate frequency statistics (predicate IRI -> count).
    pub fn predicate_stats(&self) -> &std::collections::HashMap<String, usize> {
        &self.predicate_counts
    }

    /// Get the number of distinct subjects seen.
    pub fn unique_subject_count(&self) -> usize {
        self.unique_subjects.len()
    }

    /// Get total triple count.
    pub fn total_triples(&self) -> usize {
        self.subject_count
    }

    /// Get discovered namespaces.
    pub fn namespaces(&self) -> &std::collections::HashSet<String> {
        &self.namespaces
    }

    /// Get the top N predicates by frequency, most frequent first.
    ///
    /// Ordering among predicates with equal counts is unspecified
    /// (it follows HashMap iteration order).
    pub fn top_predicates(&self, n: usize) -> Vec<(&str, usize)> {
        let mut predicates: Vec<_> = self.predicate_counts.iter().collect();
        // Unstable sort: ties carry no meaningful order here, and it
        // avoids the allocation a stable sort performs.
        predicates.sort_unstable_by(|a, b| b.1.cmp(a.1));
        predicates
            .into_iter()
            .take(n)
            .map(|(k, v)| (k.as_str(), *v))
            .collect()
    }
}

impl Default for StreamAnalyzer {
    fn default() -> Self {
        Self::new()
    }
}
432
433/// Process an N-Triples file line by line.
434///
435/// This is more efficient than the Turtle parser for simple N-Triples format.
436pub fn process_ntriples_lines<F>(data: &str, mut handler: F) -> Result<usize>
437where
438    F: FnMut(&str, &str, &str),
439{
440    let mut count = 0;
441
442    for line in data.lines() {
443        let line = line.trim();
444        if line.is_empty() || line.starts_with('#') {
445            continue;
446        }
447
448        // Parse N-Triples line: <subject> <predicate> <object> .
449        if let Some((subject, rest)) = parse_ntriples_term(line) {
450            let rest = rest.trim_start();
451            if let Some((predicate, rest)) = parse_ntriples_term(rest) {
452                let rest = rest.trim_start();
453                if let Some((object, _)) = parse_ntriples_term(rest) {
454                    handler(subject, predicate, object);
455                    count += 1;
456                }
457            }
458        }
459    }
460
461    Ok(count)
462}
463
/// Parse a single N-Triples term from the start of `s`.
///
/// Returns `Some((term, rest))` where `term` is the term's source text
/// (IRIs are returned without their angle brackets; literals and blank
/// nodes keep their delimiters) and `rest` is the remaining input, or
/// `None` if `s` does not begin with a recognizable term.
///
/// Scanning is done over bytes: every delimiter this parser looks for
/// (`<`, `>`, `"`, `@`, `^`, ASCII whitespace) is a single ASCII byte
/// that can never occur inside a UTF-8 multi-byte sequence, so every
/// computed offset is a valid `&str` slice boundary. (The previous
/// implementation mixed char counts with byte slicing, which produced
/// wrong slices or panics for literals containing non-ASCII text.)
fn parse_ntriples_term(s: &str) -> Option<(&str, &str)> {
    let s = s.trim_start();
    let bytes = s.as_bytes();

    if s.starts_with('<') {
        // IRI: everything up to the closing '>'.
        if let Some(end) = s.find('>') {
            return Some((&s[1..end], &s[end + 1..]));
        }
    } else if s.starts_with('"') {
        // Literal: find the closing unescaped quote, then swallow an
        // optional language tag (@...) or datatype (^^<...>) suffix.
        let mut i = 1;
        while i < bytes.len() {
            if bytes[i] == b'"' {
                // Count the backslashes immediately preceding this quote
                // (not crossing the opening quote at index 0); an even
                // count means the quote itself is unescaped. This handles
                // both \" and an escaped backslash \\ before a real quote.
                let mut backslashes = 0;
                while i > backslashes + 1 && bytes[i - 1 - backslashes] == b'\\' {
                    backslashes += 1;
                }
                if backslashes % 2 == 0 {
                    let mut end = i + 1;
                    if end < bytes.len() && bytes[end] == b'@' {
                        // Language tag: runs until the next whitespace.
                        while end < bytes.len() && !bytes[end].is_ascii_whitespace() {
                            end += 1;
                        }
                    } else if end + 1 < bytes.len() && bytes[end] == b'^' && bytes[end + 1] == b'^'
                    {
                        // Datatype IRI: ^^<...>.
                        end += 2;
                        if end < bytes.len() && bytes[end] == b'<' {
                            while end < bytes.len() && bytes[end] != b'>' {
                                end += 1;
                            }
                            if end < bytes.len() {
                                end += 1;
                            }
                        }
                    }
                    return Some((&s[..end], &s[end..]));
                }
            }
            i += 1;
        }
    } else if let Some(stripped) = s.strip_prefix("_:") {
        // Blank node: label runs until whitespace or the statement dot.
        // The +2 re-adds the byte length of the "_:" prefix.
        let end = stripped
            .find(|c: char| c.is_whitespace() || c == '.')
            .map(|i| i + 2)
            .unwrap_or(s.len());
        return Some((&s[..end], &s[end..]));
    }

    None
}
513
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end: three Turtle triples flow through the per-triple handler.
    #[test]
    fn test_streaming_basic() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:Alice ex:knows ex:Bob .
            ex:Bob ex:knows ex:Charlie .
            ex:Charlie ex:knows ex:Alice .
        "#;

        let mut loader = StreamingRdfLoader::new();

        loader = loader.on_triple(|_s, _p, _o| {
            // Handler receives each triple
        });

        let (stats, _) = loader.process_turtle(turtle).unwrap();
        assert_eq!(stats.triples_processed, 3);
    }

    // Five triples with batch size 2 -> two full batches plus a final partial one.
    #[test]
    fn test_streaming_with_batch() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:A ex:p ex:B .
            ex:B ex:p ex:C .
            ex:C ex:p ex:D .
            ex:D ex:p ex:E .
            ex:E ex:p ex:F .
        "#;

        let mut loader = StreamingRdfLoader::new().with_batch_size(2);

        let (stats, _) = loader.process_turtle(turtle).unwrap();
        assert_eq!(stats.triples_processed, 5);
        assert_eq!(stats.batches_processed, 3); // 2 + 2 + 1
    }

    // With collect_into_graph enabled, the returned Option<Graph> holds all triples.
    #[test]
    fn test_streaming_collect_graph() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:Alice ex:knows ex:Bob .
            ex:Bob ex:knows ex:Charlie .
        "#;

        let mut loader = StreamingRdfLoader::new().collect_into_graph();

        let (stats, graph) = loader.process_turtle(turtle).unwrap();
        assert_eq!(stats.triples_processed, 2);
        assert!(graph.is_some());
        assert_eq!(graph.unwrap().len(), 2);
    }

    // Predicate filter is a substring match: only the two ex:knows triples pass,
    // the rdfs:label triple is skipped.
    #[test]
    fn test_streaming_filter_predicate() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            @prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
            ex:Alice ex:knows ex:Bob .
            ex:Alice rdfs:label "Alice" .
            ex:Bob ex:knows ex:Charlie .
        "#;

        let mut loader = StreamingRdfLoader::new().filter_predicates(vec!["knows".to_string()]);

        let (stats, _) = loader.process_turtle(turtle).unwrap();
        assert_eq!(stats.triples_processed, 2);
    }

    // Analyzer tallies triples, distinct subjects, and predicate frequencies.
    #[test]
    fn test_stream_analyzer() {
        let mut analyzer = StreamAnalyzer::new();

        analyzer.process_triple(
            "http://example.org/Alice",
            "http://example.org/knows",
            "http://example.org/Bob",
        );
        analyzer.process_triple(
            "http://example.org/Bob",
            "http://example.org/knows",
            "http://example.org/Charlie",
        );
        analyzer.process_triple("http://example.org/Alice", "http://example.org/age", "30");

        assert_eq!(analyzer.unique_subject_count(), 2);
        assert_eq!(analyzer.total_triples(), 3);
        assert_eq!(analyzer.predicate_stats().len(), 2);
        assert_eq!(analyzer.predicate_stats()["http://example.org/knows"], 2);
    }

    // Line-based N-Triples fast path: both statements reach the handler.
    #[test]
    fn test_ntriples_processing() {
        let ntriples = r#"
            <http://example.org/Alice> <http://example.org/knows> <http://example.org/Bob> .
            <http://example.org/Bob> <http://example.org/knows> <http://example.org/Charlie> .
        "#;

        let mut count = 0;
        process_ntriples_lines(ntriples, |_s, _p, _o| {
            count += 1;
        })
        .unwrap();

        assert_eq!(count, 2);
    }

    // 10000 triples over 2 seconds -> 5000 triples/second.
    #[test]
    fn test_stats_rate() {
        let stats = StreamStats {
            triples_processed: 10000,
            batches_processed: 10,
            processing_time: Duration::from_secs(2),
            errors_encountered: 0,
            peak_memory_bytes: 0,
        };

        assert_eq!(stats.triples_per_second(), 5000.0);
    }
}