oxirs_ttl/
streaming.rs

1//! Streaming support for memory-efficient parsing of large RDF files
2//!
3//! This module provides streaming parsers that can process very large RDF files
4//! (>1GB) with minimal memory usage by parsing in batches.
5//!
6//! # Features
7//!
8//! - **Batch Processing**: Process triples in configurable batch sizes
9//! - **Memory Efficient**: Stream files larger than available RAM
10//! - **Progress Tracking**: Monitor parsing progress
11//! - **Error Recovery**: Optional lenient mode to skip errors
12//! - **Prefix Preservation**: Maintains prefix declarations across batches
13//!
14//! # Example: Basic Streaming
15//!
16//! ```rust
//! use oxirs_ttl::StreamingParser;
18//! use std::io::Cursor;
19//!
20//! let data = Cursor::new(b"<http://s> <http://p> <http://o> .");
21//! let parser = StreamingParser::new(data);
22//!
23//! let mut total = 0;
24//! for batch in parser.batches() {
25//!     let triples = batch?;
26//!     total += triples.len();
27//! }
28//! assert_eq!(total, 1);
29//! # Ok::<(), Box<dyn std::error::Error>>(())
30//! ```
31//!
32//! # Example: Custom Batch Size
33//!
34//! ```rust
35//! use oxirs_ttl::{StreamingParser, StreamingConfig};
36//! use std::io::Cursor;
37//!
38//! let config = StreamingConfig::default()
39//!     .with_batch_size(5000)  // 5K triples per batch
40//!     .with_max_buffer_size(50 * 1024 * 1024);  // 50MB buffer
41//!
42//! let data = Cursor::new(b"<http://s> <http://p> <http://o> .");
43//! let parser = StreamingParser::with_config(data, config);
44//!
45//! for batch in parser.batches() {
46//!     let triples = batch?;
47//!     // Process batch
48//! }
49//! # Ok::<(), Box<dyn std::error::Error>>(())
50//! ```
51//!
52//! # Example: Processing Large Files
53//!
54//! ```rust,no_run
55//! use oxirs_ttl::{StreamingParser, StreamingConfig};
56//! use std::fs::File;
57//!
58//! let config = StreamingConfig::default()
59//!     .with_batch_size(10_000);
60//!
61//! let file = File::open("large_dataset.ttl")?;
62//! let parser = StreamingParser::with_config(file, config);
63//!
64//! let mut total = 0;
65//! for batch in parser.batches() {
66//!     let triples = batch?;
67//!     total += triples.len();
68//!
69//!     // Process each batch (e.g., insert into database)
70//!     if total % 100_000 == 0 {
71//!         println!("Processed {} triples", total);
72//!     }
73//! }
74//! println!("Total: {} triples", total);
75//! # Ok::<(), Box<dyn std::error::Error>>(())
76//! ```
77
78use crate::error::{TurtleParseError, TurtleResult};
79use oxirs_core::model::Triple;
80use std::io::{BufRead, BufReader, Read};
81
/// Configuration for streaming parser
///
/// Controls batch size, memory limits, and error handling for streaming parsers.
///
/// # Example
///
/// ```rust
/// use oxirs_ttl::StreamingConfig;
///
/// let config = StreamingConfig::default()
///     .with_batch_size(5000)
///     .lenient(true)
///     .with_max_buffer_size(100 * 1024 * 1024);
/// ```
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Number of triples to buffer before yielding a batch (default: 10_000)
    pub batch_size: usize,
    /// Whether to continue parsing after errors instead of failing
    /// (default: false)
    pub lenient: bool,
    /// Maximum memory to use for buffering a single batch, in bytes
    /// (default: 100 MiB); reading stops once the batch buffer reaches this size
    pub max_buffer_size: usize,
}
105
106impl Default for StreamingConfig {
107    fn default() -> Self {
108        Self {
109            batch_size: 10_000,
110            lenient: false,
111            max_buffer_size: 100 * 1024 * 1024, // 100 MB
112        }
113    }
114}
115
116impl StreamingConfig {
117    /// Create a new streaming configuration
118    pub fn new() -> Self {
119        Self::default()
120    }
121
122    /// Set the batch size
123    pub fn with_batch_size(mut self, size: usize) -> Self {
124        self.batch_size = size;
125        self
126    }
127
128    /// Enable lenient mode (continue on errors)
129    pub fn lenient(mut self, lenient: bool) -> Self {
130        self.lenient = lenient;
131        self
132    }
133
134    /// Set maximum buffer size
135    pub fn with_max_buffer_size(mut self, size: usize) -> Self {
136        self.max_buffer_size = size;
137        self
138    }
139}
140
/// Streaming parser that yields batches of triples
///
/// Wraps a [`BufRead`] source and parses it incrementally, so documents larger
/// than available memory can be processed batch by batch. Prefix/base
/// declarations seen in earlier batches are re-applied to later ones.
pub struct StreamingParser<R: BufRead> {
    // Underlying buffered input source
    reader: R,
    // Batch size / leniency / buffer-cap settings
    config: StreamingConfig,
    // Scratch buffer holding the raw text of the batch currently being read
    buffer: String,
    // Running total of triples produced so far
    triples_parsed: usize,
    // Running total of bytes consumed from `reader`
    bytes_read: usize,
    /// Accumulated prefix declarations to preserve across batches
    prefix_declarations: String,
}
151
152impl<R: Read> StreamingParser<BufReader<R>> {
153    /// Create a new streaming parser from a reader
154    pub fn new(reader: R) -> Self {
155        Self::with_config(reader, StreamingConfig::default())
156    }
157
158    /// Create a streaming parser with custom configuration
159    pub fn with_config(reader: R, config: StreamingConfig) -> Self {
160        Self {
161            reader: BufReader::new(reader),
162            config,
163            buffer: String::new(),
164            triples_parsed: 0,
165            bytes_read: 0,
166            prefix_declarations: String::new(),
167        }
168    }
169}
170
171impl<R: BufRead> StreamingParser<R> {
172    /// Create from an existing BufRead
173    pub fn from_buf_reader(reader: R, config: StreamingConfig) -> Self {
174        Self {
175            reader,
176            config,
177            buffer: String::new(),
178            triples_parsed: 0,
179            bytes_read: 0,
180            prefix_declarations: String::new(),
181        }
182    }
183
184    /// Get the number of triples parsed so far
185    pub fn triples_parsed(&self) -> usize {
186        self.triples_parsed
187    }
188
189    /// Get the number of bytes read so far
190    pub fn bytes_read(&self) -> usize {
191        self.bytes_read
192    }
193
194    /// Parse the next batch of triples
195    pub fn next_batch(&mut self) -> TurtleResult<Option<Vec<Triple>>> {
196        use crate::formats::trig::TriGParser;
197        use crate::toolkit::Parser;
198        use crate::turtle::TurtleParser;
199        use oxirs_core::model::Quad;
200
201        // Read up to batch_size lines or until buffer limit
202        // But always complete the current statement (read until we see a '.' or '}')
203        self.buffer.clear();
204        let mut lines_read = 0;
205        let target_lines = self.config.batch_size / 10; // Rough estimate: ~10 triples per line
206        let mut in_multiline_string = false;
207        let mut last_line_ended_statement = false;
208
209        while lines_read < target_lines && self.buffer.len() < self.config.max_buffer_size {
210            let mut line = String::new();
211            match self.reader.read_line(&mut line) {
212                Ok(0) => break, // EOF
213                Ok(n) => {
214                    self.bytes_read += n;
215                    self.buffer.push_str(&line);
216                    lines_read += 1;
217
218                    // Track multiline strings
219                    let triple_quotes =
220                        line.matches("\"\"\"").count() + line.matches("'''").count();
221                    if triple_quotes % 2 == 1 {
222                        in_multiline_string = !in_multiline_string;
223                    }
224
225                    // Check if this line ends a statement (only if not in multiline string)
226                    let trimmed = line.trim();
227                    if !in_multiline_string && (trimmed.ends_with('.') || trimmed == "}") {
228                        last_line_ended_statement = true;
229                        // If we've read enough lines and found a statement boundary, stop here
230                        if lines_read >= target_lines {
231                            break;
232                        }
233                    } else {
234                        last_line_ended_statement = false;
235                    }
236                }
237                Err(e) => return Err(TurtleParseError::io(e)),
238            }
239        }
240
241        // Continue reading until we complete the current statement
242        // (unless we're at EOF or hit the buffer limit)
243        while !last_line_ended_statement
244            && self.buffer.len() < self.config.max_buffer_size
245            && !in_multiline_string
246        {
247            let mut line = String::new();
248            match self.reader.read_line(&mut line) {
249                Ok(0) => break, // EOF
250                Ok(n) => {
251                    self.bytes_read += n;
252                    self.buffer.push_str(&line);
253
254                    // Track multiline strings
255                    let triple_quotes =
256                        line.matches("\"\"\"").count() + line.matches("'''").count();
257                    if triple_quotes % 2 == 1 {
258                        in_multiline_string = !in_multiline_string;
259                    }
260
261                    let trimmed = line.trim();
262                    if !in_multiline_string && (trimmed.ends_with('.') || trimmed == "}") {
263                        break;
264                    }
265                }
266                Err(e) => return Err(TurtleParseError::io(e)),
267            }
268        }
269
270        if self.buffer.is_empty() {
271            return Ok(None); // EOF
272        }
273
274        // Extract prefix and base declarations from this batch
275        for line in self.buffer.lines() {
276            let trimmed = line.trim();
277            if trimmed.starts_with("@prefix") || trimmed.starts_with("@base") {
278                // Save prefix/base declarations for future batches
279                if !self.prefix_declarations.contains(trimmed) {
280                    self.prefix_declarations.push_str(trimmed);
281                    self.prefix_declarations.push('\n');
282                }
283            }
284        }
285
286        // Prepend accumulated prefix declarations to this batch
287        let document = format!("{}{}", self.prefix_declarations, self.buffer);
288
289        // Detect if this is TriG (contains named graphs) or Turtle
290        let is_trig = document.contains('{') || document.contains("GRAPH");
291
292        if is_trig {
293            // For TriG format with named graphs, read the entire document
294            // (proper streaming would require graph-aware state management)
295            let mut complete_document = document.clone();
296            loop {
297                let mut line = String::new();
298                match self.reader.read_line(&mut line) {
299                    Ok(0) => break, // EOF
300                    Ok(n) => {
301                        self.bytes_read += n;
302                        complete_document.push_str(&line);
303                    }
304                    Err(e) => return Err(TurtleParseError::io(e)),
305                }
306            }
307
308            // Use TriG parser for documents with named graphs
309            let mut parser = TriGParser::new();
310            if self.config.lenient {
311                parser.lenient = true;
312            }
313
314            match parser.parse(complete_document.as_bytes()) {
315                Ok(quads) => {
316                    // Extract triples from quads
317                    let triples: Vec<Triple> = quads
318                        .into_iter()
319                        .map(|q: Quad| {
320                            Triple::new(
321                                q.subject().clone(),
322                                q.predicate().clone(),
323                                q.object().clone(),
324                            )
325                        })
326                        .collect();
327                    self.triples_parsed += triples.len();
328                    // Clear buffer to signal EOF on next call
329                    self.buffer.clear();
330                    Ok(Some(triples))
331                }
332                Err(_e) if self.config.lenient => {
333                    // In lenient mode, return empty batch on error
334                    // Clear buffer to signal EOF on next call
335                    self.buffer.clear();
336                    Ok(Some(Vec::new()))
337                }
338                Err(e) => Err(e),
339            }
340        } else {
341            // Use Turtle parser for plain triples
342            let parser = if self.config.lenient {
343                TurtleParser::new_lenient()
344            } else {
345                TurtleParser::new()
346            };
347
348            match parser.parse_document(&document) {
349                Ok(triples) => {
350                    self.triples_parsed += triples.len();
351                    Ok(Some(triples))
352                }
353                Err(_e) if self.config.lenient => {
354                    // In lenient mode, return empty batch on error
355                    Ok(Some(Vec::new()))
356                }
357                Err(e) => Err(e),
358            }
359        }
360    }
361
362    /// Get an iterator over batches
363    pub fn batches(self) -> StreamingBatchIterator<R> {
364        StreamingBatchIterator { parser: self }
365    }
366
367    /// Get an iterator over individual triples
368    pub fn triples(self) -> StreamingTripleIterator<R> {
369        StreamingTripleIterator {
370            parser: self,
371            current_batch: Vec::new(),
372            batch_index: 0,
373        }
374    }
375}
376
/// Iterator over batches of triples
///
/// Created by [`StreamingParser::batches`]; each item is one parsed batch,
/// or the error that aborted it.
pub struct StreamingBatchIterator<R: BufRead> {
    // Owning parser that produces the batches
    parser: StreamingParser<R>,
}
381
382impl<R: BufRead> Iterator for StreamingBatchIterator<R> {
383    type Item = TurtleResult<Vec<Triple>>;
384
385    fn next(&mut self) -> Option<Self::Item> {
386        match self.parser.next_batch() {
387            Ok(Some(batch)) => Some(Ok(batch)),
388            Ok(None) => None,
389            Err(e) => Some(Err(e)),
390        }
391    }
392}
393
/// Iterator over individual triples
///
/// Created by [`StreamingParser::triples`]; flattens the batches produced by
/// the underlying parser into single triples.
pub struct StreamingTripleIterator<R: BufRead> {
    // Owning parser that supplies batches
    parser: StreamingParser<R>,
    // Batch currently being drained
    current_batch: Vec<Triple>,
    // Index of the next triple to yield from `current_batch`
    batch_index: usize,
}
400
401impl<R: BufRead> Iterator for StreamingTripleIterator<R> {
402    type Item = TurtleResult<Triple>;
403
404    fn next(&mut self) -> Option<Self::Item> {
405        // If we have triples in the current batch, return the next one
406        if self.batch_index < self.current_batch.len() {
407            let triple = self.current_batch[self.batch_index].clone();
408            self.batch_index += 1;
409            return Some(Ok(triple));
410        }
411
412        // Need to load next batch
413        match self.parser.next_batch() {
414            Ok(Some(batch)) => {
415                self.current_batch = batch;
416                self.batch_index = 0;
417                self.next() // Recursively get first item from new batch
418            }
419            Ok(None) => None, // EOF
420            Err(e) => Some(Err(e)),
421        }
422    }
423}
424
/// Progress callback for streaming operations
///
/// Implementors receive notifications as a stream is consumed. `Send` is
/// required so a callback can be handed to a worker thread.
pub trait ProgressCallback: Send {
    /// Called when a batch is parsed, with cumulative totals of triples
    /// parsed and bytes read so far.
    fn on_batch(&mut self, triples_count: usize, bytes_read: usize);

    /// Called when an error occurs (in lenient mode)
    fn on_error(&mut self, error: &TurtleParseError);
}
433
/// Simple progress printer
///
/// A [`ProgressCallback`] implementation that writes periodic progress lines
/// to stderr.
pub struct PrintProgress {
    // Triple count at the time of the last report
    last_report: usize,
    // Minimum number of new triples between reports
    report_interval: usize,
}
439
440impl PrintProgress {
441    /// Create a new progress printer
442    pub fn new(report_interval: usize) -> Self {
443        Self {
444            last_report: 0,
445            report_interval,
446        }
447    }
448}
449
450impl Default for PrintProgress {
451    fn default() -> Self {
452        Self::new(10_000)
453    }
454}
455
456impl ProgressCallback for PrintProgress {
457    fn on_batch(&mut self, triples_count: usize, bytes_read: usize) {
458        if triples_count - self.last_report >= self.report_interval {
459            eprintln!(
460                "Parsed {} triples ({:.2} MB)",
461                triples_count,
462                bytes_read as f64 / 1_024_000.0
463            );
464            self.last_report = triples_count;
465        }
466    }
467
468    fn on_error(&mut self, error: &TurtleParseError) {
469        eprintln!("Warning: {}", error);
470    }
471}
472
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // A small document fits in one batch and yields all of its triples.
    #[test]
    fn test_streaming_parser_basic() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:alice ex:name "Alice" .
            ex:bob ex:name "Bob" .
            ex:charlie ex:name "Charlie" .
        "#;

        let reader = Cursor::new(turtle);
        let mut parser = StreamingParser::new(reader);

        let batch = parser.next_batch().unwrap();
        assert!(batch.is_some());

        let triples = batch.unwrap();
        assert_eq!(triples.len(), 3);
    }

    // The batch iterator yields exactly one Ok batch for a small document.
    #[test]
    fn test_batch_iterator() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:alice ex:name "Alice" .
            ex:bob ex:name "Bob" .
        "#;

        let reader = Cursor::new(turtle);
        let parser = StreamingParser::new(reader);

        let batches: Vec<_> = parser.batches().collect();
        assert_eq!(batches.len(), 1);
        assert!(batches[0].is_ok());
    }

    // The triple iterator flattens batches into individual triples.
    #[test]
    fn test_triple_iterator() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:alice ex:name "Alice" .
            ex:bob ex:name "Bob" .
        "#;

        let reader = Cursor::new(turtle);
        let parser = StreamingParser::new(reader);

        let triples: Result<Vec<_>, _> = parser.triples().collect();
        assert!(triples.is_ok());
        assert_eq!(triples.unwrap().len(), 2);
    }

    // 1000 triples with a 100-triple batch size: all triples are recovered
    // across multiple batches, none lost at batch boundaries.
    #[test]
    fn test_large_document_streaming() {
        // Generate a large document
        let mut turtle = String::from("@prefix ex: <http://example.org/> .\n");
        for i in 0..1000 {
            turtle.push_str(&format!("ex:subject{} ex:predicate \"object{}\" .\n", i, i));
        }

        let reader = Cursor::new(turtle);
        let config = StreamingConfig::default().with_batch_size(100);
        let mut parser = StreamingParser::with_config(reader, config);

        let mut total_triples = 0;
        while let Some(batch) = parser.next_batch().unwrap() {
            total_triples += batch.len();
        }

        assert_eq!(total_triples, 1000);
    }

    // Lenient mode must not panic or return Err on malformed input.
    #[test]
    fn test_lenient_mode() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:alice ex:name "Alice" .
            invalid syntax here
            ex:bob ex:name "Bob" .
        "#;

        let reader = Cursor::new(turtle);
        let config = StreamingConfig::default().lenient(true);
        let parser = StreamingParser::with_config(reader, config);

        // Should not panic in lenient mode
        let _triples: Vec<_> = parser.triples().collect();
    }

    // Progress counters advance after a batch is parsed.
    #[test]
    fn test_progress_tracking() {
        let turtle = r#"
            @prefix ex: <http://example.org/> .
            ex:alice ex:name "Alice" .
            ex:bob ex:name "Bob" .
        "#;

        let reader = Cursor::new(turtle);
        let mut parser = StreamingParser::new(reader);

        let _ = parser.next_batch();

        assert!(parser.triples_parsed() > 0);
        assert!(parser.bytes_read() > 0);
    }
}