oxirs_samm/parser/
streaming.rs1use crate::error::{Result, SammError};
42use crate::metamodel::Aspect;
43use crate::parser::SammTurtleParser;
44use futures::stream::{Stream, StreamExt};
45use std::path::Path;
46use std::pin::Pin;
47use tokio::fs::File;
48use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
49
/// Default capacity (64 KiB) of the buffered reader wrapped around streamed input.
const DEFAULT_CHUNK_SIZE: usize = 1 << 16;

/// Default upper bound (16 MiB) on the text buffered for a single document.
const MAX_BUFFER_SIZE: usize = 1 << 24;
55
/// Configuration for incrementally parsing SAMM Turtle input into aspects.
///
/// Create one with `StreamingParser::new()` (or `Default`) and adjust it with the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct StreamingParser {
    /// Capacity, in bytes, of the buffered reader used by the reader/file APIs.
    chunk_size: usize,
    /// Upper bound, in bytes, on the text buffered for one document by the reader/file APIs.
    max_buffer_size: usize,
    /// Base URI override; `None` lets the file API derive a `file://` URI from the path.
    base_uri: Option<String>,
}
65
66impl StreamingParser {
67 pub fn new() -> Self {
69 Self {
70 chunk_size: DEFAULT_CHUNK_SIZE,
71 max_buffer_size: MAX_BUFFER_SIZE,
72 base_uri: None,
73 }
74 }
75
76 pub fn with_chunk_size(mut self, size: usize) -> Self {
83 self.chunk_size = size.max(1024); self
85 }
86
87 pub fn with_max_buffer_size(mut self, size: usize) -> Self {
94 self.max_buffer_size = size;
95 self
96 }
97
98 pub fn with_base_uri(mut self, base_uri: impl Into<String>) -> Self {
100 self.base_uri = Some(base_uri.into());
101 self
102 }
103
104 pub async fn parse_file_streaming<P: AsRef<Path>>(
118 &mut self,
119 path: P,
120 ) -> Result<impl Stream<Item = Result<Aspect>>> {
121 let file = File::open(path.as_ref())
122 .await
123 .map_err(|e| SammError::ParseError(format!("Failed to open file: {}", e)))?;
124
125 let base_uri = self
126 .base_uri
127 .clone()
128 .unwrap_or_else(|| format!("file://{}", path.as_ref().to_string_lossy()));
129
130 Ok(self.create_stream(file, base_uri))
131 }
132
133 pub fn parse_reader_streaming<R>(
137 &mut self,
138 reader: R,
139 base_uri: impl Into<String>,
140 ) -> impl Stream<Item = Result<Aspect>>
141 where
142 R: AsyncReadExt + Unpin + Send + 'static,
143 {
144 self.create_stream(reader, base_uri.into())
145 }
146
147 fn create_stream<R>(&self, reader: R, base_uri: String) -> impl Stream<Item = Result<Aspect>>
149 where
150 R: AsyncReadExt + Unpin + Send + 'static,
151 {
152 let chunk_size = self.chunk_size;
153 let max_buffer_size = self.max_buffer_size;
154
155 async_stream::stream! {
156 let mut reader = BufReader::with_capacity(chunk_size, reader);
157 let mut buffer = String::new();
158
159 loop {
161 let mut line = String::new();
162 match reader.read_line(&mut line).await {
163 Ok(0) => {
164 if !buffer.is_empty() {
166 match try_parse_buffer(&buffer, &base_uri).await {
167 Ok(Some(aspect)) => yield Ok(aspect),
168 Ok(None) => {}, Err(e) => yield Err(e),
170 }
171 }
172 break;
173 }
174 Ok(_) => {
175 buffer.push_str(&line);
176
177 if line.trim().ends_with('.') || buffer.len() > max_buffer_size {
180 match try_parse_buffer(&buffer, &base_uri).await {
181 Ok(Some(aspect)) => {
182 yield Ok(aspect);
183 buffer.clear();
184 }
185 Ok(None) => {
186 if buffer.len() > max_buffer_size {
188 tracing::warn!(
190 "Buffer exceeded max size ({}MB), clearing incomplete document",
191 buffer.len() / 1024 / 1024
192 );
193 buffer.clear();
194 yield Err(SammError::ParseError(
195 "Document too large for streaming parser".to_string()
196 ));
197 }
198 }
199 Err(e) => {
200 tracing::debug!("Parse error in streaming: {}", e);
202 buffer.clear();
203 }
204 }
205 }
206 }
207 Err(e) => {
208 yield Err(SammError::ParseError(format!("Read error: {}", e)));
209 break;
210 }
211 }
212 }
213 }
214 }
215
216 pub fn parse_string_streaming(
221 &self,
222 content: String,
223 base_uri: impl Into<String>,
224 ) -> impl Stream<Item = Result<Aspect>> {
225 let base_uri = base_uri.into();
226 let lines: Vec<String> = content.lines().map(String::from).collect();
227
228 async_stream::stream! {
229 let mut buffer = String::new();
230 let mut blank_line_count = 0;
231
232 for line in lines {
233 let trimmed = line.trim();
234
235 if trimmed.is_empty() {
237 blank_line_count += 1;
238 } else {
239 blank_line_count = 0;
240 }
241
242 buffer.push_str(&line);
243 buffer.push('\n');
244
245 if (blank_line_count >= 2 || buffer.len() > 10000) && !buffer.trim().is_empty() {
249 match try_parse_buffer(&buffer, &base_uri).await {
250 Ok(Some(aspect)) => {
251 yield Ok(aspect);
252 buffer.clear();
253 blank_line_count = 0;
254 }
255 Ok(None) => {
256 }
258 Err(e) => {
259 if buffer.len() > 100000 {
261 tracing::debug!("Clearing large buffer after parse error: {}", e);
262 buffer.clear();
263 blank_line_count = 0;
264 }
265 }
267 }
268 }
269 }
270
271 if !buffer.is_empty() {
273 match try_parse_buffer(&buffer, &base_uri).await {
274 Ok(Some(aspect)) => yield Ok(aspect),
275 Ok(None) => {},
276 Err(e) => yield Err(e),
277 }
278 }
279 }
280 }
281
282 pub fn config_summary(&self) -> String {
284 format!(
285 "StreamingParser {{ chunk_size: {}KB, max_buffer: {}MB, base_uri: {} }}",
286 self.chunk_size / 1024,
287 self.max_buffer_size / 1024 / 1024,
288 self.base_uri.as_deref().unwrap_or("auto")
289 )
290 }
291}
292
293impl Default for StreamingParser {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
299async fn try_parse_buffer(content: &str, base_uri: &str) -> Result<Option<Aspect>> {
301 let has_prefix = content.contains("@prefix");
303 let ends_properly = content.trim_end().ends_with('.');
304
305 if !has_prefix || !ends_properly {
306 return Ok(None); }
308
309 let mut parser = SammTurtleParser::new();
311 match parser.parse_string(content, base_uri).await {
312 Ok(aspect) => Ok(Some(aspect)),
313 Err(e) => {
314 if content.len() < 100 {
317 Ok(None) } else {
319 Err(e)
320 }
321 }
322 }
323}
324
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metamodel::ModelElement;
    use futures::StreamExt;

    /// A complete single-aspect document should yield only `TestAspect`, at least once.
    #[tokio::test]
    async fn test_streaming_parser_string() {
        let ttl_content = String::from(
            r#"
@prefix samm: <urn:samm:org.eclipse.esmf.samm:meta-model:2.3.0#> .
@prefix : <urn:samm:org.example:1.0.0#> .

:TestAspect a samm:Aspect ;
    samm:preferredName "Test Aspect"@en ;
    samm:description "A test aspect"@en ;
    samm:properties ( :testProperty ) .

:testProperty a samm:Property ;
    samm:preferredName "Test Property"@en ;
    samm:description "Test description"@en ;
    samm:characteristic :TestCharacteristic .

:TestCharacteristic a samm:Characteristic ;
    samm:dataType <http://www.w3.org/2001/XMLSchema#string> .
    "#,
        );

        let parser = StreamingParser::new();
        let outcomes = parser
            .parse_string_streaming(ttl_content, "urn:samm:org.example:1.0.0#")
            .collect::<Vec<_>>()
            .await;

        let mut parsed = 0;
        for outcome in outcomes {
            let aspect = outcome.unwrap_or_else(|e| panic!("Unexpected error: {}", e));
            assert_eq!(aspect.name(), "TestAspect");
            parsed += 1;
        }

        assert!(parsed > 0, "Should have parsed at least one aspect");
    }

    /// Builder settings must be reflected in the configuration summary.
    #[tokio::test]
    async fn test_streaming_parser_config() {
        let summary = StreamingParser::new()
            .with_chunk_size(128 * 1024)
            .with_max_buffer_size(32 * 1024 * 1024)
            .with_base_uri("urn:test#")
            .config_summary();

        for expected in ["128KB", "32MB", "urn:test#"] {
            assert!(summary.contains(expected));
        }
    }

    /// An empty input string produces an empty stream.
    #[tokio::test]
    async fn test_streaming_parser_empty_input() {
        let parser = StreamingParser::new();
        let mut stream = Box::pin(parser.parse_string_streaming(String::new(), "urn:test#"));

        assert!(
            stream.next().await.is_none(),
            "Empty input should produce no results"
        );
    }

    /// A small document under tight buffer limits still produces a stream item.
    #[tokio::test]
    async fn test_streaming_parser_memory_efficiency() {
        let parser = StreamingParser::new()
            .with_chunk_size(1024)
            .with_max_buffer_size(10 * 1024);

        let ttl_content = String::from(
            r#"
@prefix samm: <urn:samm:org.eclipse.esmf.samm:meta-model:2.3.0#> .
@prefix : <urn:samm:org.example:1.0.0#> .

:SmallAspect a samm:Aspect ;
    samm:preferredName "Small"@en ;
    samm:description "Small test"@en ;
    samm:properties () .
    "#,
        );

        let mut stream =
            Box::pin(parser.parse_string_streaming(ttl_content, "urn:samm:org.example:1.0.0#"));

        assert!(stream.next().await.is_some());
    }
}