// oxirs_core/parser/async_parser.rs
//! Async RDF streaming parser for high-performance large file processing
2
3use super::{Parser, ParserConfig, RdfFormat};
4use crate::model::Quad;
5use crate::{OxirsError, Result};
6use std::future::Future;
7use std::pin::Pin;
8
/// Async RDF streaming parser for high-performance large file processing.
///
/// Construct with `AsyncStreamingParser::new` and adjust through the
/// `with_*` builder methods.
#[cfg(feature = "async")]
pub struct AsyncStreamingParser {
    /// RDF serialization format of the input.
    format: RdfFormat,
    /// Options handed to the underlying synchronous `Parser`.
    config: ParserConfig,
    /// Optional observer that receives the cumulative number of bytes read.
    progress_callback: Option<Box<dyn Fn(usize) + Send + Sync>>,
    /// Number of bytes requested from the reader per read call.
    chunk_size: usize,
}
17
#[cfg(feature = "async")]
impl AsyncStreamingParser {
    /// Create a new async streaming parser for `format`.
    ///
    /// Starts with `ParserConfig::default()`, no progress callback and an
    /// 8 KiB read chunk.
    pub fn new(format: RdfFormat) -> Self {
        AsyncStreamingParser {
            format,
            config: ParserConfig::default(),
            progress_callback: None,
            chunk_size: 8192, // 8 KiB default chunk size
        }
    }

    /// Set a progress callback.
    ///
    /// The callback is invoked after every successful read with the cumulative
    /// number of bytes consumed from the input so far.
    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(usize) + Send + Sync + 'static,
    {
        self.progress_callback = Some(Box::new(callback));
        self
    }

    /// Set the number of bytes requested from the reader per read call.
    ///
    /// A value of `0` is treated as `1` while parsing. A zero-length read
    /// cannot be told apart from end-of-stream.
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Configure error tolerance by setting `ParserConfig::ignore_errors`.
    pub fn with_error_tolerance(mut self, ignore_errors: bool) -> Self {
        self.config.ignore_errors = ignore_errors;
        self
    }

    /// Parse RDF from an async readable stream, calling `handler` once per quad.
    ///
    /// N-Triples and N-Quads input is processed incrementally. Every
    /// `\n`-terminated line is parsed and handed to `handler` as soon as the
    /// chunk containing its newline has been read. A last line without a
    /// trailing newline is parsed at end of stream. A trailing `\r` (CRLF input)
    /// is removed before parsing.
    ///
    /// All other formats are buffered completely. They are parsed once the
    /// stream is exhausted, and the resulting quads are then passed to
    /// `handler` in order.
    ///
    /// Bytes are decoded as UTF-8. Invalid sequences are replaced with U+FFFD.
    /// Decoding happens on whole lines (or the whole document), so characters
    /// split across read boundaries are preserved.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by reading the stream, by parsing, or
    /// by `handler`. For non-line-based formats, a parse error is reported
    /// before any quad reaches `handler`.
    pub async fn parse_stream<R, F, Fut>(&self, mut reader: R, mut handler: F) -> Result<()>
    where
        R: tokio::io::AsyncRead + Unpin,
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        use tokio::io::AsyncReadExt;

        let line_based = matches!(self.format, RdfFormat::NTriples | RdfFormat::NQuads);

        // A zero-length buffer would make `read` return 0 immediately, which is
        // indistinguishable from end-of-stream.
        let mut chunk = vec![0u8; self.chunk_size.max(1)];
        // Bytes read but not yet parsed. They are kept as raw bytes rather than
        // a String, so a multi-byte UTF-8 character split across two reads is
        // decoded intact.
        let mut pending: Vec<u8> = Vec::new();
        let mut bytes_processed = 0usize;

        loop {
            let bytes_read = reader.read(&mut chunk).await?;
            if bytes_read == 0 {
                break; // End of stream
            }

            let fresh = &chunk[..bytes_read];
            bytes_processed += bytes_read;
            pending.extend_from_slice(fresh);

            // `pending` never holds a '\n' from earlier chunks (they have been
            // consumed already). Only new data can complete a line.
            if line_based && fresh.contains(&b'\n') {
                self.process_lines_async(&mut pending, &mut handler).await?;
            }

            if let Some(callback) = &self.progress_callback {
                callback(bytes_processed);
            }
        }

        if pending.is_empty() {
            return Ok(());
        }

        if line_based {
            // Whatever is left is a final line without a terminating '\n'. It
            // is still a statement and must not be dropped.
            let line = String::from_utf8_lossy(&pending);
            if let Some(quad) = self.parse_line(Self::strip_cr(&line))? {
                handler(quad).await?;
            }
        } else {
            // Parse synchronously into a buffer first, then await the handler
            // for each quad. Driving the async handler from inside the
            // synchronous parser callback would need `block_in_place`, which
            // panics on a current-thread tokio runtime.
            let quads = {
                let document = String::from_utf8_lossy(&pending);
                let parser = Parser::with_config(self.format, self.config.clone());
                let mut quads = Vec::new();
                parser.parse_str_with_handler(&document, |quad| -> Result<()> {
                    quads.push(quad);
                    Ok(())
                })?;
                quads
            };
            for quad in quads {
                handler(quad).await?;
            }
        }

        Ok(())
    }

    /// Parse and dispatch every complete (`\n`-terminated) line in `pending`.
    ///
    /// The consumed bytes, up to and including the last `\n`, are removed from
    /// `pending`. A trailing partial line is left in place for the next call.
    async fn process_lines_async<F, Fut>(
        &self,
        pending: &mut Vec<u8>,
        handler: &mut F,
    ) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        let last_newline = match pending.iter().rposition(|&byte| byte == b'\n') {
            Some(pos) => pos,
            None => return Ok(()), // no complete line yet
        };

        // Splitting raw bytes on '\n' is UTF-8 safe. The byte 0x0A never occurs
        // inside a multi-byte sequence, so each line decodes independently.
        for raw_line in pending[..last_newline].split(|&byte| byte == b'\n') {
            let line = String::from_utf8_lossy(raw_line);
            if let Some(quad) = self.parse_line(Self::strip_cr(&line))? {
                handler(quad).await?;
            }
        }

        pending.drain(..=last_newline);
        Ok(())
    }

    /// Remove one trailing carriage return, so CRLF input parses like LF input.
    fn strip_cr(line: &str) -> &str {
        line.strip_suffix('\r').unwrap_or(line)
    }

    /// Parse a single N-Triples / N-Quads line.
    ///
    /// A fresh `Parser` is built for each call from `self.format` and a clone of
    /// `self.config`.
    ///
    /// # Errors
    ///
    /// Returns `OxirsError::Parse` when `self.format` is not line based.
    /// Otherwise, propagates the line parser's error.
    fn parse_line(&self, line: &str) -> Result<Option<Quad>> {
        let parser = Parser::with_config(self.format, self.config.clone());

        match self.format {
            RdfFormat::NTriples => parser.parse_ntriples_line(line),
            RdfFormat::NQuads => {
                // NOTE(review): N-Quads lines go through the N-Triples line
                // parser. Whether a graph label is honoured depends on
                // `parse_ntriples_line`; confirm.
                parser.parse_ntriples_line(line)
            }
            _ => Err(OxirsError::Parse(
                "Unsupported format for line parsing".to_string(),
            )),
        }
    }

    /// Parse from an in-memory byte slice, streaming it through `parse_stream`.
    pub async fn parse_bytes<F, Fut>(&self, data: &[u8], handler: F) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        use std::io::Cursor;
        let cursor = Cursor::new(data);
        self.parse_stream(cursor, handler).await
    }

    /// Parse from a string slice, treating it as its UTF-8 bytes.
    pub async fn parse_str_async<F, Fut>(&self, data: &str, handler: F) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        let bytes = data.as_bytes();
        self.parse_bytes(bytes, handler).await
    }

    /// Convenience method: parse `data` and return all quads in order.
    pub async fn parse_str_to_quads_async(&self, data: &str) -> Result<Vec<Quad>> {
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let quads = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&quads);

        self.parse_str_async(data, move |quad| {
            let sink = Arc::clone(&sink);
            async move {
                sink.lock().await.push(quad);
                Ok(())
            }
        })
        .await?;

        // Move the collected quads out instead of cloning the whole vector.
        let collected = std::mem::take(&mut *quads.lock().await);
        Ok(collected)
    }
}
221
/// Progress information for async parsing.
///
/// A plain snapshot of counters. See `ParseProgress::completion_percentage`
/// for how `bytes_processed` relates to `estimated_total_bytes`.
#[cfg(feature = "async")]
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Number of input bytes processed so far.
    pub bytes_processed: usize,
    /// Number of quads parsed so far.
    pub quads_parsed: usize,
    /// Number of errors encountered so far.
    pub errors_encountered: usize,
    /// Expected total input size in bytes, or `None` when it is unknown.
    pub estimated_total_bytes: Option<usize>,
}
231
#[cfg(feature = "async")]
impl ParseProgress {
    /// Percentage of `estimated_total_bytes` covered by `bytes_processed`.
    ///
    /// Returns `None` when no total is known and `Some(100.0)` when the total
    /// is zero. The value is not clamped, so it exceeds 100 when more bytes
    /// were processed than estimated.
    pub fn completion_percentage(&self) -> Option<f64> {
        let total = self.estimated_total_bytes?;
        let percentage = match total {
            0 => 100.0,
            _ => self.bytes_processed as f64 / total as f64 * 100.0,
        };
        Some(percentage)
    }
}
245
/// Async streaming sink for writing parsed RDF data.
///
/// Implementors receive quads one at a time through `process_quad`. They get a
/// closing `finalize` call intended to run once parsing is complete. Both
/// methods return boxed `Send` futures that borrow the sink, which keeps the
/// trait usable as a trait object.
#[cfg(feature = "async")]
pub trait AsyncRdfSink: Send + Sync {
    /// Asynchronously handle one parsed quad.
    fn process_quad(
        &mut self,
        quad: Quad,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Finish processing after the last quad has been delivered.
    fn finalize(
        &mut self,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
}
256
/// Memory-based async sink that collects quads.
///
/// Quads accumulate in a `Vec` in the order they are processed.
#[cfg(feature = "async")]
pub struct MemoryAsyncSink {
    /// Quads collected so far, in arrival order.
    quads: Vec<Quad>,
}
262
#[cfg(feature = "async")]
impl MemoryAsyncSink {
    /// Create a sink with no collected quads.
    pub fn new() -> Self {
        Self { quads: Vec::new() }
    }

    /// Consume the sink and hand back every quad it collected.
    pub fn into_quads(self) -> Vec<Quad> {
        let Self { quads } = self;
        quads
    }
}
273
#[cfg(feature = "async")]
impl Default for MemoryAsyncSink {
    /// Same as `MemoryAsyncSink::new()`: an empty sink.
    fn default() -> Self {
        MemoryAsyncSink::new()
    }
}
280
#[cfg(feature = "async")]
impl AsyncRdfSink for MemoryAsyncSink {
    /// Append `quad` to the in-memory buffer.
    ///
    /// The push happens when the returned future is first polled. The future
    /// always resolves to `Ok(())`.
    fn process_quad(
        &mut self,
        quad: Quad,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        let collected = &mut self.quads;
        Box::pin(async move {
            collected.push(quad);
            Ok(())
        })
    }

    /// Nothing to flush for an in-memory sink; resolves with `Ok(())`.
    fn finalize(&mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        Box::pin(async { Ok(()) })
    }
}