oxirs_core/parser/
async_parser.rs1use super::{Parser, ParserConfig, RdfFormat};
4use crate::model::Quad;
5use crate::{OxirsError, Result};
6use std::future::Future;
7use std::pin::Pin;
8
/// RDF parser that consumes its input asynchronously, one read chunk at a time.
///
/// Instances are configured through the consuming `with_*` builder methods and
/// then driven with one of the `parse_*` methods.
#[cfg(feature = "async")]
pub struct AsyncStreamingParser {
    /// RDF serialization the input is interpreted as.
    format: RdfFormat,
    /// Configuration handed to every underlying [`Parser`] that gets created.
    config: ParserConfig,
    /// Optional hook invoked with the cumulative number of bytes read so far.
    progress_callback: Option<Box<dyn Fn(usize) + Send + Sync>>,
    /// Size, in bytes, of the buffer used for each read from the input.
    chunk_size: usize,
}
17
#[cfg(feature = "async")]
impl AsyncStreamingParser {
    /// Creates a parser for `format` using the default [`ParserConfig`], no
    /// progress callback and a read chunk size of 8192 bytes.
    pub fn new(format: RdfFormat) -> Self {
        Self {
            format,
            config: ParserConfig::default(),
            progress_callback: None,
            chunk_size: 8192,
        }
    }

    /// Registers `callback`, which [`parse_stream`](Self::parse_stream) invokes
    /// after every chunk with the total number of bytes read so far.
    ///
    /// Any previously registered callback is replaced.
    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(usize) + Send + Sync + 'static,
    {
        let boxed: Box<dyn Fn(usize) + Send + Sync> = Box::new(callback);
        self.progress_callback = Some(boxed);
        self
    }

    /// Sets the number of bytes requested from the reader per read.
    ///
    /// A value of `0` is treated as `1` when parsing, because a zero-length
    /// read buffer is indistinguishable from end of input.
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Sets the `ignore_errors` flag of the parser configuration.
    pub fn with_error_tolerance(mut self, ignore_errors: bool) -> Self {
        self.config.ignore_errors = ignore_errors;
        self
    }

    /// Reads `reader` to the end and passes every parsed quad to `handler`.
    ///
    /// * N-Triples / N-Quads input is parsed incrementally: each complete line
    ///   is parsed and handed to `handler` as soon as its terminating newline
    ///   has been read. A final line without a trailing newline is parsed once
    ///   the end of input is reached.
    /// * Every other format is buffered completely and parsed in one go after
    ///   the end of input.
    ///
    /// Raw bytes are buffered and only decoded at newline boundaries (or at end
    /// of input), so a multi-byte UTF-8 character that straddles two reads is
    /// decoded correctly. Invalid UTF-8 is still replaced lossily with U+FFFD.
    ///
    /// The progress callback, if any, is invoked after each chunk with the
    /// cumulative byte count.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by the reader, by the parser, or by
    /// `handler`; parsing stops at that point.
    ///
    /// # Panics
    ///
    /// For formats other than N-Triples / N-Quads the handler is driven through
    /// `tokio::task::block_in_place`, which tokio documents as panicking when
    /// called from a current-thread runtime.
    pub async fn parse_stream<R, F, Fut>(&self, mut reader: R, mut handler: F) -> Result<()>
    where
        R: tokio::io::AsyncRead + Unpin,
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        use tokio::io::AsyncReadExt;

        let line_based = matches!(self.format, RdfFormat::NTriples | RdfFormat::NQuads);

        // Allocate the read buffer once. It must hold at least one byte: with an
        // empty buffer `read` returns 0, which would be mistaken for end of input.
        let mut buffer = vec![0u8; self.chunk_size.max(1)];
        // Raw bytes received but not yet decoded (an unterminated line for the
        // line-based formats, the whole document otherwise).
        let mut pending: Vec<u8> = Vec::new();
        let mut line_buffer = String::new();
        let mut bytes_processed = 0usize;

        loop {
            let bytes_read = reader.read(&mut buffer).await?;
            if bytes_read == 0 {
                break;
            }
            bytes_processed += bytes_read;
            let chunk = &buffer[..bytes_read];

            let last_newline = if line_based {
                chunk.iter().rposition(|&byte| byte == b'\n')
            } else {
                None
            };

            match last_newline {
                Some(newline) => {
                    // Everything up to and including the newline consists of whole
                    // lines. The byte 0x0A never occurs inside a multi-byte UTF-8
                    // sequence, so decoding this prefix cannot split a character.
                    pending.extend_from_slice(&chunk[..=newline]);
                    let mut complete_lines = String::from_utf8_lossy(&pending).into_owned();
                    pending.clear();
                    pending.extend_from_slice(&chunk[newline + 1..]);

                    self.process_lines_async(&mut complete_lines, &mut line_buffer, &mut handler)
                        .await?;
                }
                None => pending.extend_from_slice(chunk),
            }

            if let Some(ref callback) = self.progress_callback {
                callback(bytes_processed);
            }
        }

        if line_based {
            // Whatever is left is a final line that was not newline-terminated.
            line_buffer.push_str(&String::from_utf8_lossy(&pending));
            if !line_buffer.is_empty() {
                if let Some(quad) = self.parse_line(&line_buffer)? {
                    handler(quad).await?;
                }
            }
        } else if !pending.is_empty() {
            let document = String::from_utf8_lossy(&pending).into_owned();
            let parser = Parser::with_config(self.format, self.config.clone());
            // The underlying parser takes a synchronous callback, so each handler
            // future is driven to completion on the current runtime.
            parser.parse_str_with_handler(&document, |quad| {
                tokio::task::block_in_place(|| {
                    tokio::runtime::Handle::current().block_on(handler(quad))
                })
            })?;
        }

        Ok(())
    }

    /// Parses every newline-terminated line contained in `line_buffer` followed
    /// by `accumulated_data` and forwards the resulting quads to `handler`.
    ///
    /// On success `accumulated_data` is emptied and `line_buffer` holds the
    /// trailing text after the last newline (empty if there is none). On error
    /// both buffers are left untouched.
    async fn process_lines_async<F, Fut>(
        &self,
        accumulated_data: &mut String,
        line_buffer: &mut String,
        handler: &mut F,
    ) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        let mut combined = String::with_capacity(line_buffer.len() + accumulated_data.len());
        combined.push_str(line_buffer);
        combined.push_str(accumulated_data);

        // Peel off one line per iteration; `remaining` ends up as the
        // unterminated tail.
        let mut remaining = combined.as_str();
        while let Some((line, tail)) = remaining.split_once('\n') {
            remaining = tail;
            if let Some(quad) = self.parse_line(line)? {
                handler(quad).await?;
            }
        }

        line_buffer.clear();
        line_buffer.push_str(remaining);
        accumulated_data.clear();
        Ok(())
    }

    /// Parses a single line of N-Triples or N-Quads input.
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Parse`] when the configured format is not
    /// line-based, and forwards any error from the underlying line parser.
    fn parse_line(&self, line: &str) -> Result<Option<Quad>> {
        let line_parser = Parser::with_config(self.format, self.config.clone());

        if matches!(self.format, RdfFormat::NTriples | RdfFormat::NQuads) {
            // NOTE(review): N-Quads lines are routed through the N-Triples line
            // parser; confirm that it handles the optional graph term.
            line_parser.parse_ntriples_line(line)
        } else {
            Err(OxirsError::Parse(
                "Unsupported format for line parsing".to_string(),
            ))
        }
    }

    /// Parses an in-memory byte slice; see [`parse_stream`](Self::parse_stream).
    pub async fn parse_bytes<F, Fut>(&self, data: &[u8], handler: F) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        self.parse_stream(std::io::Cursor::new(data), handler).await
    }

    /// Parses an in-memory string; see [`parse_stream`](Self::parse_stream).
    pub async fn parse_str_async<F, Fut>(&self, data: &str, handler: F) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        self.parse_bytes(data.as_bytes(), handler).await
    }

    /// Parses `data` and returns all quads in the order they were produced.
    ///
    /// # Errors
    ///
    /// Returns the first parse error; quads collected up to that point are
    /// discarded.
    pub async fn parse_str_to_quads_async(&self, data: &str) -> Result<Vec<Quad>> {
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let collected = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&collected);

        self.parse_str_async(data, move |quad| {
            let sink = Arc::clone(&sink);
            async move {
                sink.lock().await.push(quad);
                Ok(())
            }
        })
        .await?;

        // Move the collected quads out instead of cloning the whole vector.
        let quads = std::mem::take(&mut *collected.lock().await);
        Ok(quads)
    }
}
221
/// Snapshot of counters describing how far a parse has advanced.
///
/// All fields are public and plain values; see
/// `completion_percentage` for the one derived quantity.
#[cfg(feature = "async")]
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Number of input bytes processed so far.
    pub bytes_processed: usize,
    /// Number of quads parsed so far.
    pub quads_parsed: usize,
    /// Number of errors encountered so far.
    pub errors_encountered: usize,
    /// Expected total input size in bytes, when known.
    pub estimated_total_bytes: Option<usize>,
}
231
#[cfg(feature = "async")]
impl ParseProgress {
    /// Returns `bytes_processed` as a percentage of `estimated_total_bytes`.
    ///
    /// Yields `None` when no total estimate is available, and `100.0` when the
    /// estimate is zero (avoiding a division by zero).
    pub fn completion_percentage(&self) -> Option<f64> {
        let total = self.estimated_total_bytes?;
        let percent = match total {
            0 => 100.0,
            _ => (self.bytes_processed as f64 / total as f64) * 100.0,
        };
        Some(percent)
    }
}
245
/// Destination that receives parsed quads asynchronously.
///
/// Both methods return boxed futures that may borrow the sink for their
/// lifetime, which keeps the trait usable as a trait object.
#[cfg(feature = "async")]
pub trait AsyncRdfSink: Send + Sync {
    /// Consumes one quad; the returned future resolves once it has been handled.
    fn process_quad(
        &mut self,
        quad: Quad,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Finishes the sink; the returned future resolves once finalization is done.
    fn finalize(&mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
}
256
/// Sink that keeps every quad it receives in an in-memory vector.
#[cfg(feature = "async")]
pub struct MemoryAsyncSink {
    /// Quads received so far, in arrival order.
    quads: Vec<Quad>,
}
262
#[cfg(feature = "async")]
impl MemoryAsyncSink {
    /// Creates a sink that holds no quads yet.
    pub fn new() -> Self {
        Self { quads: Vec::new() }
    }

    /// Consumes the sink and returns the quads it stored.
    pub fn into_quads(self) -> Vec<Quad> {
        let Self { quads } = self;
        quads
    }
}
273
/// The default sink is the empty sink produced by `MemoryAsyncSink::new`.
#[cfg(feature = "async")]
impl Default for MemoryAsyncSink {
    fn default() -> Self {
        MemoryAsyncSink::new()
    }
}
280
#[cfg(feature = "async")]
impl AsyncRdfSink for MemoryAsyncSink {
    /// Appends `quad` to the stored quads once the returned future is polled.
    fn process_quad(
        &mut self,
        quad: Quad,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        let stored = &mut self.quads;
        Box::pin(async move {
            stored.push(quad);
            Ok(())
        })
    }

    /// Nothing needs flushing for an in-memory sink; resolves to `Ok(())`.
    fn finalize(&mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        Box::pin(async { Ok(()) })
    }
}