1use crate::error::{TurtleParseError, TurtleResult};
79use oxirs_core::model::Triple;
80use std::io::{BufRead, BufReader, Read};
81
/// Tuning knobs for [`StreamingParser`].
///
/// Build one with [`StreamingConfig::default`] / [`StreamingConfig::new`] and the
/// `with_*` / `lenient` builder methods.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Batch-size hint. The parser derives a line budget of `batch_size / 10`
    /// lines per batch from it; it is not an exact triple count.
    pub batch_size: usize,
    /// When `true`, a batch that fails to parse yields an empty batch instead of
    /// an error, and the underlying parser is created in lenient mode.
    pub lenient: bool,
    /// Upper bound (in bytes) on the text buffered for a single batch; reading for
    /// the current batch stops once the buffer reaches this size.
    pub max_buffer_size: usize,
}
105
106impl Default for StreamingConfig {
107 fn default() -> Self {
108 Self {
109 batch_size: 10_000,
110 lenient: false,
111 max_buffer_size: 100 * 1024 * 1024, }
113 }
114}
115
116impl StreamingConfig {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_batch_size(mut self, size: usize) -> Self {
124 self.batch_size = size;
125 self
126 }
127
128 pub fn lenient(mut self, lenient: bool) -> Self {
130 self.lenient = lenient;
131 self
132 }
133
134 pub fn with_max_buffer_size(mut self, size: usize) -> Self {
136 self.max_buffer_size = size;
137 self
138 }
139}
140
/// Incremental Turtle parser that reads its input in line-based batches.
///
/// Each call to `next_batch` buffers a chunk of lines ending on a statement
/// boundary and parses it, prepending `@prefix`/`@base` directives remembered from
/// earlier batches so prefixed names keep resolving.
pub struct StreamingParser<R: BufRead> {
    /// Source of input lines.
    reader: R,
    /// Batch sizing and leniency settings.
    config: StreamingConfig,
    /// Raw text of the batch currently being assembled; cleared per batch.
    buffer: String,
    /// Running total of triples returned by successful batches.
    triples_parsed: usize,
    /// Running total of bytes consumed from `reader`.
    bytes_read: usize,
    /// Newline-separated `@prefix`/`@base` lines collected from earlier batches.
    prefix_declarations: String,
}
151
152impl<R: Read> StreamingParser<BufReader<R>> {
153 pub fn new(reader: R) -> Self {
155 Self::with_config(reader, StreamingConfig::default())
156 }
157
158 pub fn with_config(reader: R, config: StreamingConfig) -> Self {
160 Self {
161 reader: BufReader::new(reader),
162 config,
163 buffer: String::new(),
164 triples_parsed: 0,
165 bytes_read: 0,
166 prefix_declarations: String::new(),
167 }
168 }
169}
170
171impl<R: BufRead> StreamingParser<R> {
172 pub fn from_buf_reader(reader: R, config: StreamingConfig) -> Self {
174 Self {
175 reader,
176 config,
177 buffer: String::new(),
178 triples_parsed: 0,
179 bytes_read: 0,
180 prefix_declarations: String::new(),
181 }
182 }
183
184 pub fn triples_parsed(&self) -> usize {
186 self.triples_parsed
187 }
188
189 pub fn bytes_read(&self) -> usize {
191 self.bytes_read
192 }
193
194 pub fn next_batch(&mut self) -> TurtleResult<Option<Vec<Triple>>> {
196 use crate::formats::trig::TriGParser;
197 use crate::toolkit::Parser;
198 use crate::turtle::TurtleParser;
199 use oxirs_core::model::Quad;
200
201 self.buffer.clear();
204 let mut lines_read = 0;
205 let target_lines = self.config.batch_size / 10; let mut in_multiline_string = false;
207 let mut last_line_ended_statement = false;
208
209 while lines_read < target_lines && self.buffer.len() < self.config.max_buffer_size {
210 let mut line = String::new();
211 match self.reader.read_line(&mut line) {
212 Ok(0) => break, Ok(n) => {
214 self.bytes_read += n;
215 self.buffer.push_str(&line);
216 lines_read += 1;
217
218 let triple_quotes =
220 line.matches("\"\"\"").count() + line.matches("'''").count();
221 if triple_quotes % 2 == 1 {
222 in_multiline_string = !in_multiline_string;
223 }
224
225 let trimmed = line.trim();
227 if !in_multiline_string && (trimmed.ends_with('.') || trimmed == "}") {
228 last_line_ended_statement = true;
229 if lines_read >= target_lines {
231 break;
232 }
233 } else {
234 last_line_ended_statement = false;
235 }
236 }
237 Err(e) => return Err(TurtleParseError::io(e)),
238 }
239 }
240
241 while !last_line_ended_statement
244 && self.buffer.len() < self.config.max_buffer_size
245 && !in_multiline_string
246 {
247 let mut line = String::new();
248 match self.reader.read_line(&mut line) {
249 Ok(0) => break, Ok(n) => {
251 self.bytes_read += n;
252 self.buffer.push_str(&line);
253
254 let triple_quotes =
256 line.matches("\"\"\"").count() + line.matches("'''").count();
257 if triple_quotes % 2 == 1 {
258 in_multiline_string = !in_multiline_string;
259 }
260
261 let trimmed = line.trim();
262 if !in_multiline_string && (trimmed.ends_with('.') || trimmed == "}") {
263 break;
264 }
265 }
266 Err(e) => return Err(TurtleParseError::io(e)),
267 }
268 }
269
270 if self.buffer.is_empty() {
271 return Ok(None); }
273
274 for line in self.buffer.lines() {
276 let trimmed = line.trim();
277 if trimmed.starts_with("@prefix") || trimmed.starts_with("@base") {
278 if !self.prefix_declarations.contains(trimmed) {
280 self.prefix_declarations.push_str(trimmed);
281 self.prefix_declarations.push('\n');
282 }
283 }
284 }
285
286 let document = format!("{}{}", self.prefix_declarations, self.buffer);
288
289 let is_trig = document.contains('{') || document.contains("GRAPH");
291
292 if is_trig {
293 let mut complete_document = document.clone();
296 loop {
297 let mut line = String::new();
298 match self.reader.read_line(&mut line) {
299 Ok(0) => break, Ok(n) => {
301 self.bytes_read += n;
302 complete_document.push_str(&line);
303 }
304 Err(e) => return Err(TurtleParseError::io(e)),
305 }
306 }
307
308 let mut parser = TriGParser::new();
310 if self.config.lenient {
311 parser.lenient = true;
312 }
313
314 match parser.parse(complete_document.as_bytes()) {
315 Ok(quads) => {
316 let triples: Vec<Triple> = quads
318 .into_iter()
319 .map(|q: Quad| {
320 Triple::new(
321 q.subject().clone(),
322 q.predicate().clone(),
323 q.object().clone(),
324 )
325 })
326 .collect();
327 self.triples_parsed += triples.len();
328 self.buffer.clear();
330 Ok(Some(triples))
331 }
332 Err(_e) if self.config.lenient => {
333 self.buffer.clear();
336 Ok(Some(Vec::new()))
337 }
338 Err(e) => Err(e),
339 }
340 } else {
341 let parser = if self.config.lenient {
343 TurtleParser::new_lenient()
344 } else {
345 TurtleParser::new()
346 };
347
348 match parser.parse_document(&document) {
349 Ok(triples) => {
350 self.triples_parsed += triples.len();
351 Ok(Some(triples))
352 }
353 Err(_e) if self.config.lenient => {
354 Ok(Some(Vec::new()))
356 }
357 Err(e) => Err(e),
358 }
359 }
360 }
361
362 pub fn batches(self) -> StreamingBatchIterator<R> {
364 StreamingBatchIterator { parser: self }
365 }
366
367 pub fn triples(self) -> StreamingTripleIterator<R> {
369 StreamingTripleIterator {
370 parser: self,
371 current_batch: Vec::new(),
372 batch_index: 0,
373 }
374 }
375}
376
/// Iterator over whole batches, created by `StreamingParser::batches`.
///
/// Each item is the result of one `next_batch` call; iteration ends when the
/// parser reports end of input.
pub struct StreamingBatchIterator<R: BufRead> {
    /// Parser that produces the batches.
    parser: StreamingParser<R>,
}
381
382impl<R: BufRead> Iterator for StreamingBatchIterator<R> {
383 type Item = TurtleResult<Vec<Triple>>;
384
385 fn next(&mut self) -> Option<Self::Item> {
386 match self.parser.next_batch() {
387 Ok(Some(batch)) => Some(Ok(batch)),
388 Ok(None) => None,
389 Err(e) => Some(Err(e)),
390 }
391 }
392}
393
/// Iterator over individual triples, created by `StreamingParser::triples`.
///
/// Batches are fetched lazily from the parser and handed out one triple at a time.
pub struct StreamingTripleIterator<R: BufRead> {
    /// Parser that produces the batches.
    parser: StreamingParser<R>,
    /// Batch currently being drained.
    current_batch: Vec<Triple>,
    /// Index of the next triple to return from `current_batch`.
    batch_index: usize,
}
400
401impl<R: BufRead> Iterator for StreamingTripleIterator<R> {
402 type Item = TurtleResult<Triple>;
403
404 fn next(&mut self) -> Option<Self::Item> {
405 if self.batch_index < self.current_batch.len() {
407 let triple = self.current_batch[self.batch_index].clone();
408 self.batch_index += 1;
409 return Some(Ok(triple));
410 }
411
412 match self.parser.next_batch() {
414 Ok(Some(batch)) => {
415 self.current_batch = batch;
416 self.batch_index = 0;
417 self.next() }
419 Ok(None) => None, Err(e) => Some(Err(e)),
421 }
422 }
423}
424
/// Receives progress notifications during parsing.
///
/// Implementors must be `Send`.
pub trait ProgressCallback: Send {
    /// Called after a batch with a triple count and a byte count.
    /// NOTE(review): `PrintProgress` treats these as running totals — confirm with callers.
    fn on_batch(&mut self, triples_count: usize, bytes_read: usize);

    /// Called with an error that occurred while parsing.
    fn on_error(&mut self, error: &TurtleParseError);
}
433
/// [`ProgressCallback`] that writes progress lines and warnings to stderr.
pub struct PrintProgress {
    /// Triple count at which the previous progress line was printed.
    last_report: usize,
    /// Minimum number of additional triples between two progress lines.
    report_interval: usize,
}
439
440impl PrintProgress {
441 pub fn new(report_interval: usize) -> Self {
443 Self {
444 last_report: 0,
445 report_interval,
446 }
447 }
448}
449
450impl Default for PrintProgress {
451 fn default() -> Self {
452 Self::new(10_000)
453 }
454}
455
456impl ProgressCallback for PrintProgress {
457 fn on_batch(&mut self, triples_count: usize, bytes_read: usize) {
458 if triples_count - self.last_report >= self.report_interval {
459 eprintln!(
460 "Parsed {} triples ({:.2} MB)",
461 triples_count,
462 bytes_read as f64 / 1_024_000.0
463 );
464 self.last_report = triples_count;
465 }
466 }
467
468 fn on_error(&mut self, error: &TurtleParseError) {
469 eprintln!("Warning: {}", error);
470 }
471}
472
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_streaming_parser_basic() {
        let turtle = r#"
    @prefix ex: <http://example.org/> .
    ex:alice ex:name "Alice" .
    ex:bob ex:name "Bob" .
    ex:charlie ex:name "Charlie" .
    "#;

        let reader = Cursor::new(turtle);
        let mut parser = StreamingParser::new(reader);

        let batch = parser.next_batch().unwrap();
        assert!(batch.is_some());

        let triples = batch.unwrap();
        assert_eq!(triples.len(), 3);
    }

    #[test]
    fn test_batch_iterator() {
        let turtle = r#"
    @prefix ex: <http://example.org/> .
    ex:alice ex:name "Alice" .
    ex:bob ex:name "Bob" .
    "#;

        let reader = Cursor::new(turtle);
        let parser = StreamingParser::new(reader);

        let batches: Vec<_> = parser.batches().collect();
        assert_eq!(batches.len(), 1);
        assert!(batches[0].is_ok());
    }

    #[test]
    fn test_triple_iterator() {
        let turtle = r#"
    @prefix ex: <http://example.org/> .
    ex:alice ex:name "Alice" .
    ex:bob ex:name "Bob" .
    "#;

        let reader = Cursor::new(turtle);
        let parser = StreamingParser::new(reader);

        let triples: Result<Vec<_>, _> = parser.triples().collect();
        assert!(triples.is_ok());
        assert_eq!(triples.unwrap().len(), 2);
    }

    #[test]
    fn test_large_document_streaming() {
        let mut turtle = String::from("@prefix ex: <http://example.org/> .\n");
        for i in 0..1000 {
            turtle.push_str(&format!("ex:subject{} ex:predicate \"object{}\" .\n", i, i));
        }

        let reader = Cursor::new(turtle);
        let config = StreamingConfig::default().with_batch_size(100);
        let mut parser = StreamingParser::with_config(reader, config);

        let mut total_triples = 0;
        while let Some(batch) = parser.next_batch().unwrap() {
            total_triples += batch.len();
        }

        assert_eq!(total_triples, 1000);
    }

    /// A batch size below 10 yields a zero-line budget, so every batch holds a
    /// single statement; prefixes must still carry over between batches.
    #[test]
    fn test_tiny_batch_size_carries_prefixes() {
        let turtle = r#"
    @prefix ex: <http://example.org/> .
    ex:alice ex:name "Alice" .
    ex:bob ex:name "Bob" .
    "#;

        let reader = Cursor::new(turtle);
        let config = StreamingConfig::default().with_batch_size(1);
        let parser = StreamingParser::with_config(reader, config);

        let triples: Result<Vec<_>, _> = parser.triples().collect();
        assert_eq!(triples.unwrap().len(), 2);
    }

    #[test]
    fn test_lenient_mode() {
        let turtle = r#"
    @prefix ex: <http://example.org/> .
    ex:alice ex:name "Alice" .
    invalid syntax here
    ex:bob ex:name "Bob" .
    "#;

        let reader = Cursor::new(turtle);
        let config = StreamingConfig::default().lenient(true);
        let parser = StreamingParser::with_config(reader, config);

        let triples: Vec<_> = parser.triples().collect();
        // Lenient mode swallows parse failures instead of surfacing them as errors.
        assert!(triples.iter().all(|t| t.is_ok()));
    }

    #[test]
    fn test_progress_tracking() {
        let turtle = r#"
    @prefix ex: <http://example.org/> .
    ex:alice ex:name "Alice" .
    ex:bob ex:name "Bob" .
    "#;

        let reader = Cursor::new(turtle);
        let mut parser = StreamingParser::new(reader);

        parser.next_batch().unwrap();

        assert!(parser.triples_parsed() > 0);
        assert!(parser.bytes_read() > 0);
    }
}