Skip to main content

oxirs_core/io/
async_streaming.rs

1//! Async streaming support for RDF parsing and serialization
2//!
3//! This module provides async/await compatible streaming interfaces for RDF data processing,
4//! with support for backpressure, progress reporting, and cancellation.
5//!
6//! # Examples
7//!
8//! ## Async Parsing with Progress Reporting
9//!
10//! ```no_run
11//! use oxirs_core::io::{AsyncRdfParser, AsyncStreamingParser, AsyncStreamingConfig};
12//! use oxirs_core::parser::RdfFormat;
13//! use tokio::fs::File;
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
17//!     let file = File::open("data.nt").await?;
18//!     let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
19//!     
20//!     let config = AsyncStreamingConfig {
21//!         chunk_size: 65536,  // 64KB chunks
22//!         ..Default::default()
23//!     };
24//!     
25//!     let progress = Box::new(|p: &oxirs_core::io::StreamingProgress| {
26//!         println!("Parsed {} quads ({} bytes)", p.items_processed, p.bytes_processed);
27//!     });
28//!     
29//!     let quads = parser.parse_async(file, config, Some(progress), None).await?;
30//!     println!("Total: {} quads", quads.len());
31//!     Ok(())
32//! }
33//! ```
34//!
35//! ## Async Serialization with Cancellation
36//!
37//! ```no_run
38//! use oxirs_core::io::{AsyncRdfSerializer, AsyncStreamingSerializer, AsyncStreamingConfig};
39//! use oxirs_core::parser::RdfFormat;
40//! use oxirs_core::model::*;
41//! use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
42//!
43//! #[tokio::main]
44//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
45//!     let serializer = AsyncStreamingSerializer::new(RdfFormat::NTriples);
46//!     let cancel_token = Arc::new(AtomicBool::new(false));
47//!     
48//!     // Create some quads
49//!     let quads = vec![
50//!         // ... your quads here
51//!     ];
52//!     
53//!     let mut output = Vec::new();
54//!     serializer.serialize_quads_async(
55//!         &mut output,
56//!         quads.into_iter(),
57//!         AsyncStreamingConfig::default(),
58//!         None,
59//!         Some(cancel_token.clone()),
60//!     ).await?;
61//!     
62//!     Ok(())
63//! }
64//! ```
65
66use crate::{
67    model::{Quad, Triple},
68    parser::{Parser, ParserConfig, RdfFormat},
69    serializer::Serializer,
70    OxirsError, Result,
71};
72use futures::future::BoxFuture;
73use std::pin::Pin;
74use std::sync::{
75    atomic::{AtomicBool, Ordering},
76    Arc,
77};
78use std::task::{Context, Poll};
79use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
80
/// Snapshot of how far an async parse or serialize operation has progressed.
///
/// A reference to this value is handed to the optional progress callback.
#[derive(Debug, Clone)]
pub struct StreamingProgress {
    /// Total bytes processed
    pub bytes_processed: usize,
    /// Total items (quads/triples) processed
    pub items_processed: usize,
    /// Errors encountered (if error tolerance is enabled)
    pub errors_encountered: usize,
    /// Estimated total bytes (if known)
    pub total_bytes: Option<usize>,
    /// Processing rate (items per second)
    pub items_per_second: Option<f64>,
}

impl StreamingProgress {
    /// Build a zeroed progress record with no known total size and no rate yet.
    pub fn new() -> Self {
        Self {
            bytes_processed: 0,
            items_processed: 0,
            errors_encountered: 0,
            total_bytes: None,
            items_per_second: None,
        }
    }

    /// Percentage of `total_bytes` already processed.
    ///
    /// Returns `None` when the total size is unknown. A known total of zero bytes is
    /// reported as `100.0` so that the division is never performed with a zero divisor.
    pub fn completion_percentage(&self) -> Option<f64> {
        let total = self.total_bytes?;
        if total == 0 {
            return Some(100.0);
        }
        let fraction = self.bytes_processed as f64 / total as f64;
        Some(fraction * 100.0)
    }
}

impl Default for StreamingProgress {
    /// Same as [`StreamingProgress::new`].
    fn default() -> Self {
        Self::new()
    }
}
125
/// Callback for progress reporting.
///
/// A boxed closure that receives a shared reference to the current
/// [`StreamingProgress`]. The `Send + Sync` bounds let it be carried inside the boxed
/// `Send` futures returned by the async parser and serializer in this module.
pub type ProgressCallback = Box<dyn Fn(&StreamingProgress) + Send + Sync>;
128
/// Configuration for async streaming operations.
///
/// Use [`Default::default`] together with struct-update syntax to override single fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AsyncStreamingConfig {
    /// Size of chunks to read/write at a time.
    ///
    /// The parser uses it as the size of its read buffer; the serializer flushes its
    /// output buffer once it holds at least this many bytes.
    pub chunk_size: usize,
    /// Size of the internal buffer (the serializer reserves this much capacity up front)
    pub buffer_size: usize,
    /// Whether to continue on parse errors
    pub ignore_errors: bool,
    /// Maximum number of errors before stopping
    pub max_errors: Option<usize>,
    /// Base IRI for resolving relative IRIs
    pub base_iri: Option<String>,
}

impl Default for AsyncStreamingConfig {
    /// 8 KiB chunks, a 64 KiB buffer, strict error handling and no base IRI.
    fn default() -> Self {
        AsyncStreamingConfig {
            chunk_size: 8192,   // 8KB chunks
            buffer_size: 65536, // 64KB buffer
            ignore_errors: false,
            max_errors: None,
            base_iri: None,
        }
    }
}
155
/// Trait for async RDF parsing with streaming input.
///
/// Both methods return a boxed future bound to the borrow of `self` (`'a`); the reader,
/// the handler and the handler's futures therefore only have to outlive that borrow
/// instead of being `'static`.
pub trait AsyncRdfParser: Send + Sync {
    /// Parse RDF data from an async reader.
    ///
    /// The future resolves to the parsed quads collected into a `Vec`, or to an error.
    ///
    /// * `reader` - source of the raw RDF bytes.
    /// * `config` - chunking, error-tolerance and base-IRI settings.
    /// * `progress` - optional callback that is handed progress snapshots.
    /// * `cancel_token` - optional shared flag used to request cancellation; the
    ///   implementation in this file fails with an `Interrupted` I/O error once it
    ///   observes the flag as `true`.
    fn parse_async<'a, R>(
        &'a self,
        reader: R,
        config: AsyncStreamingConfig,
        progress: Option<ProgressCallback>,
        cancel_token: Option<Arc<AtomicBool>>,
    ) -> BoxFuture<'a, Result<Vec<Quad>>>
    where
        R: AsyncRead + Unpin + Send + 'a;

    /// Parse RDF data from an async reader with a custom handler.
    ///
    /// Instead of collecting the quads, each parsed quad is passed to `handler`, which
    /// returns a future resolving to `Result<()>`. The remaining parameters have the same
    /// meaning as for [`AsyncRdfParser::parse_async`].
    fn parse_with_handler_async<'a, R, F, Fut>(
        &'a self,
        reader: R,
        handler: F,
        config: AsyncStreamingConfig,
        progress: Option<ProgressCallback>,
        cancel_token: Option<Arc<AtomicBool>>,
    ) -> BoxFuture<'a, Result<()>>
    where
        R: AsyncRead + Unpin + Send + 'a,
        F: FnMut(Quad) -> Fut + Send + 'a,
        Fut: std::future::Future<Output = Result<()>> + Send + 'a;
}
183
/// Trait for async RDF serialization with streaming output.
///
/// Both methods return a boxed future bound to the borrow of `self` (`'a`); the writer
/// and the input iterator only have to outlive that borrow.
pub trait AsyncRdfSerializer: Send + Sync {
    /// Serialize quads to an async writer.
    ///
    /// * `writer` - destination of the serialized bytes.
    /// * `quads` - iterator yielding the quads to write.
    /// * `config` - buffering settings (see [`AsyncStreamingConfig`]).
    /// * `progress` - optional callback that is handed progress snapshots.
    /// * `cancel_token` - optional shared flag used to request cancellation.
    fn serialize_quads_async<'a, W, I>(
        &'a self,
        writer: W,
        quads: I,
        config: AsyncStreamingConfig,
        progress: Option<ProgressCallback>,
        cancel_token: Option<Arc<AtomicBool>>,
    ) -> BoxFuture<'a, Result<()>>
    where
        W: AsyncWrite + Unpin + Send + 'a,
        I: Iterator<Item = Quad> + Send + 'a;

    /// Serialize triples to an async writer.
    ///
    /// Takes the same parameters as [`AsyncRdfSerializer::serialize_quads_async`], except
    /// that the iterator yields triples instead of quads.
    fn serialize_triples_async<'a, W, I>(
        &'a self,
        writer: W,
        triples: I,
        config: AsyncStreamingConfig,
        progress: Option<ProgressCallback>,
        cancel_token: Option<Arc<AtomicBool>>,
    ) -> BoxFuture<'a, Result<()>>
    where
        W: AsyncWrite + Unpin + Send + 'a,
        I: Iterator<Item = Triple> + Send + 'a;
}
212
213/// Async streaming parser implementation
214pub struct AsyncStreamingParser {
215    format: RdfFormat,
216}
217
218impl AsyncStreamingParser {
219    /// Create a new async streaming parser
220    pub fn new(format: RdfFormat) -> Self {
221        AsyncStreamingParser { format }
222    }
223
224    /// Check if cancellation was requested
225    fn check_cancelled(cancel_token: &Option<Arc<AtomicBool>>) -> Result<()> {
226        if let Some(token) = cancel_token {
227            if token.load(Ordering::Relaxed) {
228                return Err(std::io::Error::new(
229                    std::io::ErrorKind::Interrupted,
230                    "Operation cancelled",
231                )
232                .into());
233            }
234        }
235        Ok(())
236    }
237
238    /// Report progress
239    fn report_progress(
240        progress_callback: &Option<ProgressCallback>,
241        progress_info: &StreamingProgress,
242    ) {
243        if let Some(callback) = progress_callback {
244            callback(progress_info);
245        }
246    }
247
248    /// Parse line-based formats (N-Triples, N-Quads)
249    async fn parse_line_based<R, F, Fut>(
250        &self,
251        mut reader: R,
252        mut handler: F,
253        config: AsyncStreamingConfig,
254        progress_callback: Option<ProgressCallback>,
255        cancel_token: Option<Arc<AtomicBool>>,
256    ) -> Result<()>
257    where
258        R: AsyncRead + Unpin + Send,
259        F: FnMut(Quad) -> Fut + Send,
260        Fut: std::future::Future<Output = Result<()>> + Send,
261    {
262        let parser_config = ParserConfig {
263            base_iri: config.base_iri.clone(),
264            ignore_errors: config.ignore_errors,
265            max_errors: config.max_errors,
266        };
267        let parser = Parser::with_config(self.format, parser_config);
268
269        let mut buffer = vec![0u8; config.chunk_size];
270        let mut accumulated = Vec::new();
271        let mut line_buffer = String::new();
272        let mut progress = StreamingProgress::new();
273        let start_time = std::time::Instant::now();
274
275        loop {
276            Self::check_cancelled(&cancel_token)?;
277
278            let bytes_read = reader.read(&mut buffer).await?;
279            if bytes_read == 0 {
280                break; // EOF
281            }
282
283            progress.bytes_processed += bytes_read;
284            accumulated.extend_from_slice(&buffer[..bytes_read]);
285
286            // Process complete lines
287            while let Some(newline_pos) = accumulated.iter().position(|&b| b == b'\n') {
288                let line_bytes = accumulated.drain(..=newline_pos).collect::<Vec<_>>();
289
290                // Convert to string, handling UTF-8 errors gracefully
291                match String::from_utf8(line_bytes) {
292                    Ok(mut line) => {
293                        // Remove newline
294                        if line.ends_with('\n') {
295                            line.pop();
296                            if line.ends_with('\r') {
297                                line.pop();
298                            }
299                        }
300
301                        line_buffer.push_str(&line);
302
303                        // Try to parse the line
304                        match self.parse_single_line(&parser, &line_buffer) {
305                            Ok(Some(quad)) => {
306                                handler(quad).await?;
307                                progress.items_processed += 1;
308                                line_buffer.clear();
309                            }
310                            Ok(None) => {
311                                // Empty line or comment
312                                line_buffer.clear();
313                            }
314                            Err(e) => {
315                                if config.ignore_errors {
316                                    progress.errors_encountered += 1;
317                                    if let Some(max_errors) = config.max_errors {
318                                        if progress.errors_encountered >= max_errors {
319                                            return Err(e);
320                                        }
321                                    }
322                                    tracing::warn!("Parse error: {}", e);
323                                    line_buffer.clear();
324                                } else {
325                                    return Err(e);
326                                }
327                            }
328                        }
329                    }
330                    Err(e) => {
331                        if config.ignore_errors {
332                            progress.errors_encountered += 1;
333                            tracing::warn!("UTF-8 error: {}", e);
334                        } else {
335                            return Err(OxirsError::Parse(format!("UTF-8 error: {e}")));
336                        }
337                    }
338                }
339            }
340
341            // Calculate processing rate
342            let elapsed = start_time.elapsed().as_secs_f64();
343            if elapsed > 0.0 {
344                progress.items_per_second = Some(progress.items_processed as f64 / elapsed);
345            }
346
347            Self::report_progress(&progress_callback, &progress);
348        }
349
350        // Process any remaining data
351        if !accumulated.is_empty() {
352            match String::from_utf8(accumulated) {
353                Ok(remaining) => {
354                    line_buffer.push_str(&remaining);
355                    if !line_buffer.trim().is_empty() {
356                        if let Ok(Some(quad)) = self.parse_single_line(&parser, &line_buffer) {
357                            handler(quad).await?;
358                            progress.items_processed += 1;
359                        }
360                    }
361                }
362                Err(e) => {
363                    if !config.ignore_errors {
364                        return Err(OxirsError::Parse(format!("UTF-8 error: {e}")));
365                    }
366                }
367            }
368        }
369
370        Self::report_progress(&progress_callback, &progress);
371        Ok(())
372    }
373
374    /// Parse a single line for line-based formats
375    fn parse_single_line(&self, parser: &Parser, line: &str) -> Result<Option<Quad>> {
376        let line = line.trim();
377        if line.is_empty() || line.starts_with('#') {
378            return Ok(None);
379        }
380
381        match self.format {
382            RdfFormat::NTriples => parser.parse_ntriples_line(line),
383            RdfFormat::NQuads => parser.parse_nquads_line(line),
384            _ => Err(OxirsError::Parse(
385                "Format not supported for line-based parsing".to_string(),
386            )),
387        }
388    }
389
390    /// Parse document-based formats (Turtle, TriG, RDF/XML, JSON-LD)
391    async fn parse_document_based<R, F, Fut>(
392        &self,
393        mut reader: R,
394        mut handler: F,
395        config: AsyncStreamingConfig,
396        progress_callback: Option<ProgressCallback>,
397        cancel_token: Option<Arc<AtomicBool>>,
398    ) -> Result<()>
399    where
400        R: AsyncRead + Unpin + Send,
401        F: FnMut(Quad) -> Fut + Send,
402        Fut: std::future::Future<Output = Result<()>> + Send,
403    {
404        // For document-based formats, we need to read the entire document
405        let mut buffer = Vec::new();
406        let mut chunk = vec![0u8; config.chunk_size];
407        let mut progress = StreamingProgress::new();
408
409        loop {
410            Self::check_cancelled(&cancel_token)?;
411
412            let bytes_read = reader.read(&mut chunk).await?;
413            if bytes_read == 0 {
414                break;
415            }
416
417            buffer.extend_from_slice(&chunk[..bytes_read]);
418            progress.bytes_processed += bytes_read;
419
420            Self::report_progress(&progress_callback, &progress);
421        }
422
423        // Parse the complete document
424        let document = String::from_utf8(buffer)
425            .map_err(|e| OxirsError::Parse(format!("UTF-8 error: {e}")))?;
426
427        let parser_config = ParserConfig {
428            base_iri: config.base_iri,
429            ignore_errors: config.ignore_errors,
430            max_errors: config.max_errors,
431        };
432        let parser = Parser::with_config(self.format, parser_config);
433
434        // Parse the document and collect quads
435        let quads = parser.parse_str_to_quads(&document)?;
436
437        // Process each quad with the async handler
438        for quad in quads {
439            Self::check_cancelled(&cancel_token)?;
440            handler(quad).await?;
441            progress.items_processed += 1;
442        }
443
444        Self::report_progress(&progress_callback, &progress);
445        Ok(())
446    }
447}
448
449impl AsyncRdfParser for AsyncStreamingParser {
450    fn parse_async<'a, R>(
451        &'a self,
452        reader: R,
453        config: AsyncStreamingConfig,
454        progress: Option<ProgressCallback>,
455        cancel_token: Option<Arc<AtomicBool>>,
456    ) -> BoxFuture<'a, Result<Vec<Quad>>>
457    where
458        R: AsyncRead + Unpin + Send + 'a,
459    {
460        Box::pin(async move {
461            let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
462
463            // Spawn a task to collect quads
464            let collector = tokio::spawn(async move {
465                let mut quads = Vec::new();
466                while let Some(quad) = rx.recv().await {
467                    quads.push(quad);
468                }
469                quads
470            });
471
472            // Parse with handler that sends quads to the channel
473            let parse_result = self
474                .parse_with_handler_async(
475                    reader,
476                    |quad| {
477                        let tx = tx.clone();
478                        async move {
479                            tx.send(quad)
480                                .map_err(|_| OxirsError::Parse("Channel send error".to_string()))?;
481                            Ok(())
482                        }
483                    },
484                    config,
485                    progress,
486                    cancel_token,
487                )
488                .await;
489
490            // Close the channel
491            drop(tx);
492
493            // Check for parse errors
494            parse_result?;
495
496            // Collect the quads
497            let quads = collector
498                .await
499                .map_err(|_| OxirsError::Parse("Failed to collect quads".to_string()))?;
500            Ok(quads)
501        })
502    }
503
504    fn parse_with_handler_async<'a, R, F, Fut>(
505        &'a self,
506        reader: R,
507        handler: F,
508        config: AsyncStreamingConfig,
509        progress: Option<ProgressCallback>,
510        cancel_token: Option<Arc<AtomicBool>>,
511    ) -> BoxFuture<'a, Result<()>>
512    where
513        R: AsyncRead + Unpin + Send + 'a,
514        F: FnMut(Quad) -> Fut + Send + 'a,
515        Fut: std::future::Future<Output = Result<()>> + Send + 'a,
516    {
517        Box::pin(async move {
518            match self.format {
519                RdfFormat::NTriples | RdfFormat::NQuads => {
520                    self.parse_line_based(reader, handler, config, progress, cancel_token)
521                        .await
522                }
523                _ => {
524                    self.parse_document_based(reader, handler, config, progress, cancel_token)
525                        .await
526                }
527            }
528        })
529    }
530}
531
532/// Async streaming serializer implementation
533pub struct AsyncStreamingSerializer {
534    format: RdfFormat,
535}
536
537impl AsyncStreamingSerializer {
538    /// Create a new async streaming serializer
539    pub fn new(format: RdfFormat) -> Self {
540        AsyncStreamingSerializer { format }
541    }
542
543    /// Check if cancellation was requested
544    fn check_cancelled(cancel_token: &Option<Arc<AtomicBool>>) -> Result<()> {
545        if let Some(token) = cancel_token {
546            if token.load(Ordering::Relaxed) {
547                return Err(std::io::Error::new(
548                    std::io::ErrorKind::Interrupted,
549                    "Operation cancelled",
550                )
551                .into());
552            }
553        }
554        Ok(())
555    }
556
557    /// Serialize a single quad to a string
558    fn serialize_quad(&self, quad: &Quad) -> Result<String> {
559        let serializer = Serializer::new(self.format);
560        match self.format {
561            RdfFormat::NTriples => {
562                // For N-Triples, we only serialize if it's in the default graph
563                if quad.is_default_graph() {
564                    let mut graph = crate::model::Graph::new();
565                    graph.insert(quad.to_triple());
566                    serializer.serialize_graph(&graph)
567                } else {
568                    Ok(String::new())
569                }
570            }
571            RdfFormat::NQuads => serializer.serialize_quad_to_nquads(quad),
572            _ => Err(OxirsError::Serialize(
573                "Format not supported for streaming serialization".to_string(),
574            )),
575        }
576    }
577
578    /// Serialize a single triple to a string
579    #[allow(dead_code)]
580    fn serialize_triple(&self, triple: &Triple) -> Result<String> {
581        let quad = Quad::from_triple(triple.clone());
582        self.serialize_quad(&quad)
583    }
584}
585
586impl AsyncRdfSerializer for AsyncStreamingSerializer {
587    fn serialize_quads_async<'a, W, I>(
588        &'a self,
589        mut writer: W,
590        quads: I,
591        config: AsyncStreamingConfig,
592        progress: Option<ProgressCallback>,
593        cancel_token: Option<Arc<AtomicBool>>,
594    ) -> BoxFuture<'a, Result<()>>
595    where
596        W: AsyncWrite + Unpin + Send + 'a,
597        I: Iterator<Item = Quad> + Send + 'a,
598    {
599        Box::pin(async move {
600            let mut buffer = String::with_capacity(config.buffer_size);
601            let mut progress_info = StreamingProgress::new();
602            let start_time = std::time::Instant::now();
603
604            for quad in quads {
605                Self::check_cancelled(&cancel_token)?;
606
607                // Serialize the quad
608                let serialized = self.serialize_quad(&quad)?;
609                buffer.push_str(&serialized);
610                progress_info.items_processed += 1;
611
612                // Write buffer if it's getting full
613                if buffer.len() >= config.chunk_size {
614                    writer.write_all(buffer.as_bytes()).await?;
615                    progress_info.bytes_processed += buffer.len();
616                    buffer.clear();
617                }
618
619                // Update progress
620                let elapsed = start_time.elapsed().as_secs_f64();
621                if elapsed > 0.0 {
622                    progress_info.items_per_second =
623                        Some(progress_info.items_processed as f64 / elapsed);
624                }
625
626                if let Some(ref callback) = progress {
627                    callback(&progress_info);
628                }
629            }
630
631            // Write any remaining data
632            if !buffer.is_empty() {
633                writer.write_all(buffer.as_bytes()).await?;
634                progress_info.bytes_processed += buffer.len();
635
636                // Report final progress
637                if let Some(ref callback) = progress {
638                    callback(&progress_info);
639                }
640            }
641
642            // Flush the writer
643            writer.flush().await?;
644
645            Ok(())
646        })
647    }
648
649    fn serialize_triples_async<'a, W, I>(
650        &'a self,
651        writer: W,
652        triples: I,
653        config: AsyncStreamingConfig,
654        progress: Option<ProgressCallback>,
655        cancel_token: Option<Arc<AtomicBool>>,
656    ) -> BoxFuture<'a, Result<()>>
657    where
658        W: AsyncWrite + Unpin + Send + 'a,
659        I: Iterator<Item = Triple> + Send + 'a,
660    {
661        let quads = triples.map(Quad::from_triple);
662        self.serialize_quads_async(writer, quads, config, progress, cancel_token)
663    }
664}
665
666/// Buffered async reader with backpressure support
667pub struct BackpressureReader<R> {
668    inner: R,
669    buffer: Vec<u8>,
670    capacity: usize,
671    read_pos: usize,
672    write_pos: usize,
673}
674
675impl<R: AsyncRead + Unpin> BackpressureReader<R> {
676    /// Create a new backpressure reader with specified buffer capacity
677    pub fn new(inner: R, capacity: usize) -> Self {
678        BackpressureReader {
679            inner,
680            buffer: vec![0; capacity],
681            capacity,
682            read_pos: 0,
683            write_pos: 0,
684        }
685    }
686
687    /// Get the number of bytes available in the buffer
688    pub fn available(&self) -> usize {
689        self.write_pos - self.read_pos
690    }
691
692    /// Check if the buffer is full
693    pub fn is_full(&self) -> bool {
694        self.available() == self.capacity
695    }
696}
697
698impl<R: AsyncRead + Unpin> AsyncRead for BackpressureReader<R> {
699    fn poll_read(
700        self: Pin<&mut Self>,
701        cx: &mut Context<'_>,
702        buf: &mut tokio::io::ReadBuf<'_>,
703    ) -> Poll<std::io::Result<()>> {
704        let me = self.get_mut();
705
706        // If we have data in the buffer, return it
707        if me.available() > 0 {
708            let to_read = std::cmp::min(buf.remaining(), me.available());
709            buf.put_slice(&me.buffer[me.read_pos..me.read_pos + to_read]);
710            me.read_pos += to_read;
711
712            // Reset positions if buffer is empty
713            if me.read_pos == me.write_pos {
714                me.read_pos = 0;
715                me.write_pos = 0;
716            }
717
718            return Poll::Ready(Ok(()));
719        }
720
721        // Otherwise, try to fill the buffer
722        let write_pos = me.write_pos;
723        let mut read_buf = tokio::io::ReadBuf::new(&mut me.buffer[write_pos..]);
724        match Pin::new(&mut me.inner).poll_read(cx, &mut read_buf) {
725            Poll::Ready(Ok(())) => {
726                let bytes_read = read_buf.filled().len();
727                if bytes_read == 0 {
728                    // EOF
729                    return Poll::Ready(Ok(()));
730                }
731
732                me.write_pos += bytes_read;
733
734                // Now serve from buffer
735                let to_read = std::cmp::min(buf.remaining(), me.available());
736                buf.put_slice(&me.buffer[me.read_pos..me.read_pos + to_read]);
737                me.read_pos += to_read;
738
739                Poll::Ready(Ok(()))
740            }
741            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
742            Poll::Pending => Poll::Pending,
743        }
744    }
745}
746
747#[cfg(test)]
748mod tests {
749    use super::*;
750    use crate::model::*;
751    use std::sync::atomic::AtomicUsize;
752
    /// Parses two N-Triples statements from an in-memory cursor with the default
    /// configuration and expects two quads, both in the default graph.
    #[tokio::test]
    async fn test_async_ntriples_parsing() {
        let ntriples_data = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" .
<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" .
"#;

        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
        let reader = std::io::Cursor::new(ntriples_data.as_bytes());

        let quads = parser
            .parse_async(reader, AsyncStreamingConfig::default(), None, None)
            .await
            .expect("operation should succeed");

        assert_eq!(quads.len(), 2);
        assert!(quads[0].is_default_graph());
        assert!(quads[1].is_default_graph());
    }
771
    /// Parses three N-Triples statements with a progress callback installed and checks
    /// that the callback ran at least once and only ever saw plausible counters.
    #[tokio::test]
    async fn test_async_parsing_with_progress() {
        let ntriples_data = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" .
<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" .
<http://example.org/charlie> <http://xmlns.com/foaf/0.1/name> "Charlie" .
"#;

        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
        let reader = std::io::Cursor::new(ntriples_data.as_bytes());

        // Shared counter so the test can observe how often the callback was invoked.
        let progress_count = Arc::new(AtomicUsize::new(0));
        let progress_count_clone = progress_count.clone();

        let progress_callback = Box::new(move |progress: &StreamingProgress| {
            progress_count_clone.fetch_add(1, Ordering::Relaxed);
            assert!(progress.bytes_processed > 0);
            assert!(progress.items_processed <= 3);
        });

        let quads = parser
            .parse_async(
                reader,
                AsyncStreamingConfig::default(),
                Some(progress_callback),
                None,
            )
            .await
            .expect("operation should succeed");

        assert_eq!(quads.len(), 3);
        assert!(progress_count.load(Ordering::Relaxed) > 0);
    }
804
    /// Sets the cancel flag from inside the handler on the first quad and expects the
    /// parse to fail with an error whose message mentions "cancelled".
    #[tokio::test]
    async fn test_async_parsing_with_cancellation() {
        let ntriples_data = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" .
<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" .
<http://example.org/charlie> <http://xmlns.com/foaf/0.1/name> "Charlie" .
"#;

        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
        let reader = std::io::Cursor::new(ntriples_data.as_bytes());

        // One handle goes to the parser, the clone is flipped by the handler.
        let cancel_token = Arc::new(AtomicBool::new(false));
        let cancel_token_clone = cancel_token.clone();

        // Set up handler that cancels after first quad
        let count = Arc::new(AtomicUsize::new(0));
        let count_clone = count.clone();
        let result = parser
            .parse_with_handler_async(
                reader,
                |_quad| {
                    let token = cancel_token_clone.clone();
                    let count = count_clone.clone();
                    async move {
                        // `fetch_add` returns the previous value, so 0 marks the first quad.
                        let current = count.fetch_add(1, Ordering::Relaxed);
                        if current == 0 {
                            token.store(true, Ordering::Relaxed);
                        }
                        Ok(())
                    }
                },
                AsyncStreamingConfig::default(),
                None,
                Some(cancel_token),
            )
            .await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("cancelled"));
    }
845
    /// Serializes two default-graph quads as N-Triples into a `Vec<u8>` and checks that
    /// the output is valid UTF-8 containing both subjects and both literal values.
    #[tokio::test]
    async fn test_async_ntriples_serialization() {
        let mut quads = Vec::new();

        let alice = NamedNode::new("http://example.org/alice").expect("valid IRI");
        let name_pred = NamedNode::new("http://xmlns.com/foaf/0.1/name").expect("valid IRI");
        let alice_name = Literal::new("Alice");
        let triple1 = Triple::new(alice.clone(), name_pred.clone(), alice_name);
        quads.push(Quad::from_triple(triple1));

        let bob = NamedNode::new("http://example.org/bob").expect("valid IRI");
        let bob_name = Literal::new("Bob");
        let triple2 = Triple::new(bob, name_pred, bob_name);
        quads.push(Quad::from_triple(triple2));

        let serializer = AsyncStreamingSerializer::new(RdfFormat::NTriples);
        let mut output = Vec::new();

        serializer
            .serialize_quads_async(
                &mut output,
                quads.into_iter(),
                AsyncStreamingConfig::default(),
                None,
                None,
            )
            .await
            .expect("operation should succeed");

        let result = String::from_utf8(output).expect("bytes should be valid UTF-8");
        assert!(result.contains("http://example.org/alice"));
        assert!(result.contains("http://example.org/bob"));
        assert!(result.contains("\"Alice\""));
        assert!(result.contains("\"Bob\""));
    }
881
    /// Serializes ten triples with a progress callback installed and checks that the
    /// callback ran, never saw more than ten items, and never saw the byte counter
    /// decrease between invocations.
    #[tokio::test]
    async fn test_async_serialization_with_progress() {
        let mut triples = Vec::new();

        for i in 0..10 {
            let subject = NamedNode::new(format!("http://example.org/item{i}"))
                .expect("valid IRI from format");
            let pred = NamedNode::new("http://example.org/value").expect("valid IRI");
            let obj = Literal::new(i.to_string());
            triples.push(Triple::new(subject, pred, obj));
        }

        let serializer = AsyncStreamingSerializer::new(RdfFormat::NTriples);
        let mut output = Vec::new();

        // Invocation counter plus the byte count seen by the previous invocation.
        let progress_count = Arc::new(AtomicUsize::new(0));
        let progress_count_clone = progress_count.clone();
        let last_bytes = Arc::new(AtomicUsize::new(0));
        let last_bytes_clone = last_bytes.clone();

        let progress_callback = Box::new(move |progress: &StreamingProgress| {
            progress_count_clone.fetch_add(1, Ordering::Relaxed);
            assert!(progress.items_processed <= 10);
            // bytes should be monotonically increasing or stay the same
            let prev_bytes = last_bytes_clone.load(Ordering::Relaxed);
            assert!(progress.bytes_processed >= prev_bytes);
            last_bytes_clone.store(progress.bytes_processed, Ordering::Relaxed);
        });

        serializer
            .serialize_triples_async(
                &mut output,
                triples.into_iter(),
                AsyncStreamingConfig::default(),
                Some(progress_callback),
                None,
            )
            .await
            .expect("operation should succeed");

        assert!(progress_count.load(Ordering::Relaxed) > 0);
    }
924
925    #[tokio::test]
926    async fn test_async_error_tolerance() {
927        let invalid_ntriples = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" .
928INVALID LINE HERE
929<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" .
930"#;
931
932        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
933        let reader = std::io::Cursor::new(invalid_ntriples.as_bytes());
934
935        let config = AsyncStreamingConfig {
936            ignore_errors: true,
937            ..Default::default()
938        };
939
940        let quads = parser
941            .parse_async(reader, config, None, None)
942            .await
943            .expect("operation should succeed");
944
945        // Should parse the two valid lines
946        assert_eq!(quads.len(), 2);
947    }
948
949    #[tokio::test]
950    async fn test_backpressure_reader() {
951        let data = b"Hello, World!";
952        let cursor = std::io::Cursor::new(data);
953        let mut reader = BackpressureReader::new(cursor, 16); // Buffer size larger than data
954
955        let mut output = Vec::new();
956        reader
957            .read_to_end(&mut output)
958            .await
959            .expect("async operation should succeed");
960
961        assert_eq!(output, data);
962    }
963
964    #[tokio::test]
965    async fn test_async_nquads_parsing() {
966        let nquads_data = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" <http://example.org/graph1> .
967<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" <http://example.org/graph2> .
968"#;
969
970        let parser = AsyncStreamingParser::new(RdfFormat::NQuads);
971        let reader = std::io::Cursor::new(nquads_data.as_bytes());
972
973        let quads = parser
974            .parse_async(reader, AsyncStreamingConfig::default(), None, None)
975            .await
976            .expect("operation should succeed");
977
978        assert_eq!(quads.len(), 2);
979        assert!(!quads[0].is_default_graph());
980        assert!(!quads[1].is_default_graph());
981    }
982
983    #[tokio::test]
984    async fn test_large_chunk_parsing() {
985        // Create a large dataset
986        let mut ntriples_data = String::new();
987        for i in 0..1000 {
988            ntriples_data.push_str(&format!(
989                "<http://example.org/item{}> <http://example.org/value> \"{}\" .\n",
990                i, i
991            ));
992        }
993
994        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
995        let reader = std::io::Cursor::new(ntriples_data.as_bytes());
996
997        let config = AsyncStreamingConfig {
998            chunk_size: 1024, // Small chunks to test buffering
999            ..Default::default()
1000        };
1001
1002        let quads = parser
1003            .parse_async(reader, config, None, None)
1004            .await
1005            .expect("operation should succeed");
1006
1007        assert_eq!(quads.len(), 1000);
1008    }
1009
1010    #[tokio::test]
1011    async fn test_custom_base_iri() {
1012        let turtle_data = r#"@prefix ex: <http://example.org/> .
1013ex:alice ex:knows ex:bob ."#;
1014
1015        let parser = AsyncStreamingParser::new(RdfFormat::Turtle);
1016        let reader = std::io::Cursor::new(turtle_data.as_bytes());
1017
1018        let config = AsyncStreamingConfig {
1019            base_iri: Some("http://example.org/".to_string()),
1020            ..Default::default()
1021        };
1022
1023        let quads = parser
1024            .parse_async(reader, config, None, None)
1025            .await
1026            .expect("operation should succeed");
1027
1028        assert_eq!(quads.len(), 1);
1029        let triple = quads[0].to_triple();
1030
1031        // Should contain the example.org namespace
1032        if let Subject::NamedNode(subj) = triple.subject() {
1033            assert!(subj.as_str().contains("example.org"));
1034        }
1035    }
1036}