
oxirs_samm/parser/streaming.rs

//! Streaming SAMM Parser for Memory-Efficient Processing
//!
//! This module provides a streaming parser for SAMM models that processes
//! large Turtle/RDF files incrementally without loading the entire file into memory.
//!
//! ## Features
//!
//! - **Memory Efficient**: Processes files in configurable chunks (default: 64KB)
//! - **Large File Support**: Can handle files larger than available RAM
//! - **Incremental Parsing**: Emits model elements as they're parsed
//! - **Async Streaming**: Uses Rust async streams for efficient I/O
//! - **Configurable Buffer**: Adjust chunk size based on available memory
//!
//! ## Example
//!
//! ```rust,no_run
//! use oxirs_samm::parser::StreamingParser;
//! use oxirs_samm::metamodel::ModelElement;
//! use futures::{StreamExt, pin_mut};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Create streaming parser with 128KB chunks
//! let mut parser = StreamingParser::new()
//!     .with_chunk_size(128 * 1024);
//!
//! // Parse large file incrementally
//! let stream = parser.parse_file_streaming("large_model.ttl").await?;
//! pin_mut!(stream);
//!
//! // Process elements as they arrive
//! while let Some(element) = stream.next().await {
//!     match element {
//!         Ok(aspect) => println!("Parsed aspect: {}", aspect.name()),
//!         Err(e) => eprintln!("Parse error: {}", e),
//!     }
//! }
//! # Ok(())
//! # }
//! ```

use crate::error::{Result, SammError};
use crate::metamodel::Aspect;
use crate::parser::SammTurtleParser;
use futures::stream::{Stream, StreamExt};
use std::path::Path;
use std::pin::Pin;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};

/// Default chunk size for streaming (64KB)
const DEFAULT_CHUNK_SIZE: usize = 64 * 1024;

/// Maximum buffer size to prevent unbounded growth (16MB)
const MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Streaming parser for memory-efficient SAMM model processing
pub struct StreamingParser {
    /// Chunk size for reading file
    chunk_size: usize,
    /// Maximum buffer size before forcing a parse
    max_buffer_size: usize,
    /// Base URI for resolving relative references
    base_uri: Option<String>,
}

impl StreamingParser {
    /// Create a new streaming parser with default settings
    pub fn new() -> Self {
        Self {
            chunk_size: DEFAULT_CHUNK_SIZE,
            max_buffer_size: MAX_BUFFER_SIZE,
            base_uri: None,
        }
    }

    /// Set the chunk size for reading (in bytes)
    ///
    /// Smaller chunks use less memory but may be slower.
    /// Larger chunks are faster but use more memory.
    ///
    /// Default: 64KB
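    ///
    /// A minimal tuning sketch (the 32KB / 8MB values are arbitrary
    /// illustrations, not recommendations):
    ///
    /// ```rust
    /// use oxirs_samm::parser::StreamingParser;
    ///
    /// // Smaller chunks bound memory use; a larger parse buffer allows bigger documents.
    /// let parser = StreamingParser::new()
    ///     .with_chunk_size(32 * 1024)
    ///     .with_max_buffer_size(8 * 1024 * 1024);
    /// assert!(parser.config_summary().contains("32KB"));
    /// ```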
    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size.max(1024); // Minimum 1KB
        self
    }

    /// Set the maximum buffer size (in bytes)
    ///
    /// When the buffer grows beyond this size, the parser will
    /// attempt to flush and parse accumulated data.
    ///
    /// Default: 16MB
    pub fn with_max_buffer_size(mut self, size: usize) -> Self {
        self.max_buffer_size = size;
        self
    }

    /// Set the base URI for resolving relative references
    pub fn with_base_uri(mut self, base_uri: impl Into<String>) -> Self {
        self.base_uri = Some(base_uri.into());
        self
    }

    /// Parse a file using streaming for memory efficiency
    ///
    /// This method reads the file in chunks and emits parsed Aspect models
    /// as they become available. This is much more memory-efficient than
    /// loading the entire file at once.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the Turtle file
    ///
    /// # Returns
    ///
    /// A stream of `Result<Aspect, SammError>` that emits aspects as they're parsed
    pub async fn parse_file_streaming<P: AsRef<Path>>(
        &mut self,
        path: P,
    ) -> Result<impl Stream<Item = Result<Aspect>>> {
        let file = File::open(path.as_ref())
            .await
            .map_err(|e| SammError::ParseError(format!("Failed to open file: {}", e)))?;

        let base_uri = self
            .base_uri
            .clone()
            .unwrap_or_else(|| format!("file://{}", path.as_ref().to_string_lossy()));

        Ok(self.create_stream(file, base_uri))
    }

    /// Parse from an async reader using streaming
    ///
    /// This allows streaming from any async source (file, network, etc.)
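    ///
    /// A minimal sketch using an in-memory byte slice as the async source
    /// (the Turtle snippet and URN below are placeholders):
    ///
    /// ```rust,no_run
    /// use oxirs_samm::parser::StreamingParser;
    /// use oxirs_samm::metamodel::ModelElement;
    /// use futures::{StreamExt, pin_mut};
    ///
    /// # async fn example() {
    /// // Any `AsyncRead + Unpin + Send + 'static` source works here;
    /// // a static byte slice stands in for a file or network stream.
    /// let data: &'static [u8] = b"@prefix : <urn:samm:org.example:1.0.0#> .";
    /// let mut parser = StreamingParser::new();
    /// let stream = parser.parse_reader_streaming(data, "urn:samm:org.example:1.0.0#");
    /// pin_mut!(stream);
    ///
    /// while let Some(result) = stream.next().await {
    ///     match result {
    ///         Ok(aspect) => println!("Parsed aspect: {}", aspect.name()),
    ///         Err(e) => eprintln!("Parse error: {}", e),
    ///     }
    /// }
    /// # }
    /// ```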
    pub fn parse_reader_streaming<R>(
        &mut self,
        reader: R,
        base_uri: impl Into<String>,
    ) -> impl Stream<Item = Result<Aspect>>
    where
        R: AsyncReadExt + Unpin + Send + 'static,
    {
        self.create_stream(reader, base_uri.into())
    }

    /// Internal method to create the streaming parser
    fn create_stream<R>(&self, reader: R, base_uri: String) -> impl Stream<Item = Result<Aspect>>
    where
        R: AsyncReadExt + Unpin + Send + 'static,
    {
        let chunk_size = self.chunk_size;
        let max_buffer_size = self.max_buffer_size;

        async_stream::stream! {
            let mut reader = BufReader::with_capacity(chunk_size, reader);
            let mut buffer = String::new();

            // Read file line by line to find complete Turtle documents
            loop {
                let mut line = String::new();
                match reader.read_line(&mut line).await {
                    Ok(0) => {
                        // EOF - process any remaining content
                        if !buffer.is_empty() {
                            match try_parse_buffer(&buffer, &base_uri).await {
                                Ok(Some(aspect)) => yield Ok(aspect),
                                Ok(None) => {}, // Incomplete document
                                Err(e) => yield Err(e),
                            }
                        }
                        break;
                    }
                    Ok(_) => {
                        buffer.push_str(&line);

                        // Check if we have a complete document (ends with .)
                        // This is a simple heuristic - could be improved
                        if line.trim().ends_with('.') || buffer.len() > max_buffer_size {
                            match try_parse_buffer(&buffer, &base_uri).await {
                                Ok(Some(aspect)) => {
                                    yield Ok(aspect);
                                    buffer.clear();
                                }
                                Ok(None) => {
                                    // Keep accumulating
                                    if buffer.len() > max_buffer_size {
                                        // Force clear to prevent OOM
                                        tracing::warn!(
                                            "Buffer exceeded max size ({}MB), clearing incomplete document",
                                            buffer.len() / 1024 / 1024
                                        );
                                        buffer.clear();
                                        yield Err(SammError::ParseError(
                                            "Document too large for streaming parser".to_string()
                                        ));
                                    }
                                }
                                Err(e) => {
                                    // Parse error - clear buffer and continue
                                    tracing::debug!("Parse error in streaming: {}", e);
                                    buffer.clear();
                                }
                            }
                        }
                    }
                    Err(e) => {
                        yield Err(SammError::ParseError(format!("Read error: {}", e)));
                        break;
                    }
                }
            }
        }
    }

    /// Parse a string using line-by-line streaming
    ///
    /// This is useful for processing large in-memory strings incrementally,
    /// emitting aspects as complete documents are detected.
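    ///
    /// A minimal sketch (the Turtle content and URN are placeholders):
    ///
    /// ```rust,no_run
    /// use oxirs_samm::parser::StreamingParser;
    /// use oxirs_samm::metamodel::ModelElement;
    /// use futures::{StreamExt, pin_mut};
    ///
    /// # async fn example() {
    /// let ttl = String::from("@prefix : <urn:samm:org.example:1.0.0#> .");
    /// let parser = StreamingParser::new();
    /// let stream = parser.parse_string_streaming(ttl, "urn:samm:org.example:1.0.0#");
    /// pin_mut!(stream);
    ///
    /// while let Some(result) = stream.next().await {
    ///     if let Ok(aspect) = result {
    ///         println!("Parsed aspect: {}", aspect.name());
    ///     }
    /// }
    /// # }
    /// ```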
    pub fn parse_string_streaming(
        &self,
        content: String,
        base_uri: impl Into<String>,
    ) -> impl Stream<Item = Result<Aspect>> {
        let base_uri = base_uri.into();
        let lines: Vec<String> = content.lines().map(String::from).collect();

        async_stream::stream! {
            let mut buffer = String::new();
            let mut blank_line_count = 0;

            for line in lines {
                let trimmed = line.trim();

                // Track consecutive blank lines as potential document separators
                if trimmed.is_empty() {
                    blank_line_count += 1;
                } else {
                    blank_line_count = 0;
                }

                buffer.push_str(&line);
                buffer.push('\n');

                // Try to parse when we have a potentially complete document:
                // - Multiple blank lines suggest document boundary
                // - Or buffer is getting large
                if (blank_line_count >= 2 || buffer.len() > 10000) && !buffer.trim().is_empty() {
                    match try_parse_buffer(&buffer, &base_uri).await {
                        Ok(Some(aspect)) => {
                            yield Ok(aspect);
                            buffer.clear();
                            blank_line_count = 0;
                        }
                        Ok(None) => {
                            // Keep accumulating - document not complete yet
                        }
                        Err(e) => {
                            // Only clear if buffer is excessively large
                            if buffer.len() > 100000 {
                                tracing::debug!("Clearing large buffer after parse error: {}", e);
                                buffer.clear();
                                blank_line_count = 0;
                            }
                            // Otherwise keep accumulating
                        }
                    }
                }
            }

            // Parse any remaining content
            if !buffer.is_empty() {
                match try_parse_buffer(&buffer, &base_uri).await {
                    Ok(Some(aspect)) => yield Ok(aspect),
                    Ok(None) => {},
                    Err(e) => yield Err(e),
                }
            }
        }
    }

    /// Get current configuration as a summary string
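    ///
    /// With the default settings, the summary contains the values below:
    ///
    /// ```rust
    /// use oxirs_samm::parser::StreamingParser;
    ///
    /// let summary = StreamingParser::new().config_summary();
    /// assert!(summary.contains("64KB"));
    /// assert!(summary.contains("16MB"));
    /// assert!(summary.contains("auto")); // no base URI configured
    /// ```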
    pub fn config_summary(&self) -> String {
        format!(
            "StreamingParser {{ chunk_size: {}KB, max_buffer: {}MB, base_uri: {} }}",
            self.chunk_size / 1024,
            self.max_buffer_size / 1024 / 1024,
            self.base_uri.as_deref().unwrap_or("auto")
        )
    }
}

impl Default for StreamingParser {
    fn default() -> Self {
        Self::new()
    }
}

/// Try to parse accumulated buffer content
async fn try_parse_buffer(content: &str, base_uri: &str) -> Result<Option<Aspect>> {
    // Check if content looks complete (has @prefix and ends with .)
    let has_prefix = content.contains("@prefix");
    let ends_properly = content.trim_end().ends_with('.');

    if !has_prefix || !ends_properly {
        return Ok(None); // Incomplete document
    }

    // Try parsing
    let mut parser = SammTurtleParser::new();
    match parser.parse_string(content, base_uri).await {
        Ok(aspect) => Ok(Some(aspect)),
        Err(e) => {
            // If it's a genuine parse error, return it
            // If it's just incomplete, return None
            if content.len() < 100 {
                Ok(None) // Probably incomplete
            } else {
                Err(e)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::metamodel::ModelElement;
    use futures::StreamExt;

    #[tokio::test]
    async fn test_streaming_parser_string() {
        use futures::pin_mut;

        let ttl_content = r#"
@prefix samm: <urn:samm:org.eclipse.esmf.samm:meta-model:2.3.0#> .
@prefix : <urn:samm:org.example:1.0.0#> .

:TestAspect a samm:Aspect ;
    samm:preferredName "Test Aspect"@en ;
    samm:description "A test aspect"@en ;
    samm:properties ( :testProperty ) .

:testProperty a samm:Property ;
    samm:preferredName "Test Property"@en ;
    samm:description "Test description"@en ;
    samm:characteristic :TestCharacteristic .

:TestCharacteristic a samm:Characteristic ;
    samm:dataType <http://www.w3.org/2001/XMLSchema#string> .
        "#
        .to_string();

        let parser = StreamingParser::new();
        let stream = parser.parse_string_streaming(ttl_content, "urn:samm:org.example:1.0.0#");
        pin_mut!(stream);

        let mut count = 0;
        while let Some(result) = stream.next().await {
            match result {
                Ok(aspect) => {
                    assert_eq!(aspect.name(), "TestAspect");
                    count += 1;
                }
                Err(e) => panic!("Unexpected error: {}", e),
            }
        }

        assert!(count > 0, "Should have parsed at least one aspect");
    }

    #[tokio::test]
    async fn test_streaming_parser_config() {
        let parser = StreamingParser::new()
            .with_chunk_size(128 * 1024)
            .with_max_buffer_size(32 * 1024 * 1024)
            .with_base_uri("urn:test#");

        let summary = parser.config_summary();
        assert!(summary.contains("128KB"));
        assert!(summary.contains("32MB"));
        assert!(summary.contains("urn:test#"));
    }

    #[tokio::test]
    async fn test_streaming_parser_empty_input() {
        use futures::pin_mut;

        let parser = StreamingParser::new();
        let stream = parser.parse_string_streaming(String::new(), "urn:test#");
        pin_mut!(stream);

        let result = stream.next().await;
        assert!(result.is_none(), "Empty input should produce no results");
    }

    #[tokio::test]
    async fn test_streaming_parser_memory_efficiency() {
        use futures::pin_mut;

        // Test that streaming parser works with chunk size limits
        let parser = StreamingParser::new()
            .with_chunk_size(1024) // Small chunks
            .with_max_buffer_size(10 * 1024); // Small buffer

        let ttl_content = r#"
@prefix samm: <urn:samm:org.eclipse.esmf.samm:meta-model:2.3.0#> .
@prefix : <urn:samm:org.example:1.0.0#> .

:SmallAspect a samm:Aspect ;
    samm:preferredName "Small"@en ;
    samm:description "Small test"@en ;
    samm:properties () .
        "#
        .to_string();

        let stream = parser.parse_string_streaming(ttl_content, "urn:samm:org.example:1.0.0#");
        pin_mut!(stream);

        // Should still work with small buffer
        let result = stream.next().await;
        assert!(result.is_some());
424}