1use crate::{
67 model::{Quad, Triple},
68 parser::{Parser, ParserConfig, RdfFormat},
69 serializer::Serializer,
70 OxirsError, Result,
71};
72use futures::future::BoxFuture;
73use std::pin::Pin;
74use std::sync::{
75 atomic::{AtomicBool, Ordering},
76 Arc,
77};
78use std::task::{Context, Poll};
79use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
80
/// Snapshot of how far a streaming parse or serialization has progressed.
#[derive(Debug, Clone, Default)]
pub struct StreamingProgress {
    /// Bytes read from the input (parsing) or written to the output (serialization).
    pub bytes_processed: usize,
    /// Number of quads/triples handled so far.
    pub items_processed: usize,
    /// Number of errors counted while error tolerance was enabled.
    pub errors_encountered: usize,
    /// Total input size in bytes, if the caller knows it.
    pub total_bytes: Option<usize>,
    /// Throughput estimate; `None` until it has been computed.
    pub items_per_second: Option<f64>,
}

impl StreamingProgress {
    /// Creates an all-zero snapshot with no known total and no throughput yet.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the percentage of `total_bytes` processed, or `None` when the
    /// total is unknown.
    ///
    /// A known total of zero counts as fully processed (`100.0`). The result is
    /// not clamped, so it exceeds `100.0` if more bytes than `total_bytes` were seen.
    pub fn completion_percentage(&self) -> Option<f64> {
        let total = self.total_bytes?;
        let percentage = match total {
            0 => 100.0,
            nonzero => self.bytes_processed as f64 / nonzero as f64 * 100.0,
        };
        Some(percentage)
    }
}
125
126pub type ProgressCallback = Box<dyn Fn(&StreamingProgress) + Send + Sync>;
128
/// Tuning and error-handling options for the async streaming parser and serializer.
///
/// `Debug` is derived so configurations can be logged and inspected, matching
/// [`StreamingProgress`].
#[derive(Debug, Clone)]
pub struct AsyncStreamingConfig {
    /// Parsing: size of each read from the input.
    /// Serialization: number of buffered output bytes that triggers a write.
    pub chunk_size: usize,
    /// Serialization: initial capacity of the output buffer.
    pub buffer_size: usize,
    /// Skip and count malformed input instead of failing on the first error.
    pub ignore_errors: bool,
    /// With `ignore_errors` set, abort once this many errors have been counted.
    pub max_errors: Option<usize>,
    /// Base IRI forwarded to the underlying parser configuration.
    pub base_iri: Option<String>,
}

impl Default for AsyncStreamingConfig {
    /// 8 KiB chunks, a 64 KiB output buffer, strict error handling, no base IRI.
    fn default() -> Self {
        AsyncStreamingConfig {
            chunk_size: 8192,
            buffer_size: 65536,
            ignore_errors: false,
            max_errors: None,
            base_iri: None,
        }
    }
}
155
156pub trait AsyncRdfParser: Send + Sync {
158 fn parse_async<'a, R>(
160 &'a self,
161 reader: R,
162 config: AsyncStreamingConfig,
163 progress: Option<ProgressCallback>,
164 cancel_token: Option<Arc<AtomicBool>>,
165 ) -> BoxFuture<'a, Result<Vec<Quad>>>
166 where
167 R: AsyncRead + Unpin + Send + 'a;
168
169 fn parse_with_handler_async<'a, R, F, Fut>(
171 &'a self,
172 reader: R,
173 handler: F,
174 config: AsyncStreamingConfig,
175 progress: Option<ProgressCallback>,
176 cancel_token: Option<Arc<AtomicBool>>,
177 ) -> BoxFuture<'a, Result<()>>
178 where
179 R: AsyncRead + Unpin + Send + 'a,
180 F: FnMut(Quad) -> Fut + Send + 'a,
181 Fut: std::future::Future<Output = Result<()>> + Send + 'a;
182}
183
184pub trait AsyncRdfSerializer: Send + Sync {
186 fn serialize_quads_async<'a, W, I>(
188 &'a self,
189 writer: W,
190 quads: I,
191 config: AsyncStreamingConfig,
192 progress: Option<ProgressCallback>,
193 cancel_token: Option<Arc<AtomicBool>>,
194 ) -> BoxFuture<'a, Result<()>>
195 where
196 W: AsyncWrite + Unpin + Send + 'a,
197 I: Iterator<Item = Quad> + Send + 'a;
198
199 fn serialize_triples_async<'a, W, I>(
201 &'a self,
202 writer: W,
203 triples: I,
204 config: AsyncStreamingConfig,
205 progress: Option<ProgressCallback>,
206 cancel_token: Option<Arc<AtomicBool>>,
207 ) -> BoxFuture<'a, Result<()>>
208 where
209 W: AsyncWrite + Unpin + Send + 'a,
210 I: Iterator<Item = Triple> + Send + 'a;
211}
212
/// Streaming RDF parser bound to one [`RdfFormat`].
///
/// Its `AsyncRdfParser` implementation parses N-Triples and N-Quads line by
/// line; every other format is buffered completely and parsed as one document.
pub struct AsyncStreamingParser {
    /// Format used for every parse performed by this instance.
    format: RdfFormat,
}
217
218impl AsyncStreamingParser {
219 pub fn new(format: RdfFormat) -> Self {
221 AsyncStreamingParser { format }
222 }
223
224 fn check_cancelled(cancel_token: &Option<Arc<AtomicBool>>) -> Result<()> {
226 if let Some(token) = cancel_token {
227 if token.load(Ordering::Relaxed) {
228 return Err(std::io::Error::new(
229 std::io::ErrorKind::Interrupted,
230 "Operation cancelled",
231 )
232 .into());
233 }
234 }
235 Ok(())
236 }
237
238 fn report_progress(
240 progress_callback: &Option<ProgressCallback>,
241 progress_info: &StreamingProgress,
242 ) {
243 if let Some(callback) = progress_callback {
244 callback(progress_info);
245 }
246 }
247
248 async fn parse_line_based<R, F, Fut>(
250 &self,
251 mut reader: R,
252 mut handler: F,
253 config: AsyncStreamingConfig,
254 progress_callback: Option<ProgressCallback>,
255 cancel_token: Option<Arc<AtomicBool>>,
256 ) -> Result<()>
257 where
258 R: AsyncRead + Unpin + Send,
259 F: FnMut(Quad) -> Fut + Send,
260 Fut: std::future::Future<Output = Result<()>> + Send,
261 {
262 let parser_config = ParserConfig {
263 base_iri: config.base_iri.clone(),
264 ignore_errors: config.ignore_errors,
265 max_errors: config.max_errors,
266 };
267 let parser = Parser::with_config(self.format, parser_config);
268
269 let mut buffer = vec![0u8; config.chunk_size];
270 let mut accumulated = Vec::new();
271 let mut line_buffer = String::new();
272 let mut progress = StreamingProgress::new();
273 let start_time = std::time::Instant::now();
274
275 loop {
276 Self::check_cancelled(&cancel_token)?;
277
278 let bytes_read = reader.read(&mut buffer).await?;
279 if bytes_read == 0 {
280 break; }
282
283 progress.bytes_processed += bytes_read;
284 accumulated.extend_from_slice(&buffer[..bytes_read]);
285
286 while let Some(newline_pos) = accumulated.iter().position(|&b| b == b'\n') {
288 let line_bytes = accumulated.drain(..=newline_pos).collect::<Vec<_>>();
289
290 match String::from_utf8(line_bytes) {
292 Ok(mut line) => {
293 if line.ends_with('\n') {
295 line.pop();
296 if line.ends_with('\r') {
297 line.pop();
298 }
299 }
300
301 line_buffer.push_str(&line);
302
303 match self.parse_single_line(&parser, &line_buffer) {
305 Ok(Some(quad)) => {
306 handler(quad).await?;
307 progress.items_processed += 1;
308 line_buffer.clear();
309 }
310 Ok(None) => {
311 line_buffer.clear();
313 }
314 Err(e) => {
315 if config.ignore_errors {
316 progress.errors_encountered += 1;
317 if let Some(max_errors) = config.max_errors {
318 if progress.errors_encountered >= max_errors {
319 return Err(e);
320 }
321 }
322 tracing::warn!("Parse error: {}", e);
323 line_buffer.clear();
324 } else {
325 return Err(e);
326 }
327 }
328 }
329 }
330 Err(e) => {
331 if config.ignore_errors {
332 progress.errors_encountered += 1;
333 tracing::warn!("UTF-8 error: {}", e);
334 } else {
335 return Err(OxirsError::Parse(format!("UTF-8 error: {e}")));
336 }
337 }
338 }
339 }
340
341 let elapsed = start_time.elapsed().as_secs_f64();
343 if elapsed > 0.0 {
344 progress.items_per_second = Some(progress.items_processed as f64 / elapsed);
345 }
346
347 Self::report_progress(&progress_callback, &progress);
348 }
349
350 if !accumulated.is_empty() {
352 match String::from_utf8(accumulated) {
353 Ok(remaining) => {
354 line_buffer.push_str(&remaining);
355 if !line_buffer.trim().is_empty() {
356 if let Ok(Some(quad)) = self.parse_single_line(&parser, &line_buffer) {
357 handler(quad).await?;
358 progress.items_processed += 1;
359 }
360 }
361 }
362 Err(e) => {
363 if !config.ignore_errors {
364 return Err(OxirsError::Parse(format!("UTF-8 error: {e}")));
365 }
366 }
367 }
368 }
369
370 Self::report_progress(&progress_callback, &progress);
371 Ok(())
372 }
373
374 fn parse_single_line(&self, parser: &Parser, line: &str) -> Result<Option<Quad>> {
376 let line = line.trim();
377 if line.is_empty() || line.starts_with('#') {
378 return Ok(None);
379 }
380
381 match self.format {
382 RdfFormat::NTriples => parser.parse_ntriples_line(line),
383 RdfFormat::NQuads => parser.parse_nquads_line(line),
384 _ => Err(OxirsError::Parse(
385 "Format not supported for line-based parsing".to_string(),
386 )),
387 }
388 }
389
390 async fn parse_document_based<R, F, Fut>(
392 &self,
393 mut reader: R,
394 mut handler: F,
395 config: AsyncStreamingConfig,
396 progress_callback: Option<ProgressCallback>,
397 cancel_token: Option<Arc<AtomicBool>>,
398 ) -> Result<()>
399 where
400 R: AsyncRead + Unpin + Send,
401 F: FnMut(Quad) -> Fut + Send,
402 Fut: std::future::Future<Output = Result<()>> + Send,
403 {
404 let mut buffer = Vec::new();
406 let mut chunk = vec![0u8; config.chunk_size];
407 let mut progress = StreamingProgress::new();
408
409 loop {
410 Self::check_cancelled(&cancel_token)?;
411
412 let bytes_read = reader.read(&mut chunk).await?;
413 if bytes_read == 0 {
414 break;
415 }
416
417 buffer.extend_from_slice(&chunk[..bytes_read]);
418 progress.bytes_processed += bytes_read;
419
420 Self::report_progress(&progress_callback, &progress);
421 }
422
423 let document = String::from_utf8(buffer)
425 .map_err(|e| OxirsError::Parse(format!("UTF-8 error: {e}")))?;
426
427 let parser_config = ParserConfig {
428 base_iri: config.base_iri,
429 ignore_errors: config.ignore_errors,
430 max_errors: config.max_errors,
431 };
432 let parser = Parser::with_config(self.format, parser_config);
433
434 let quads = parser.parse_str_to_quads(&document)?;
436
437 for quad in quads {
439 Self::check_cancelled(&cancel_token)?;
440 handler(quad).await?;
441 progress.items_processed += 1;
442 }
443
444 Self::report_progress(&progress_callback, &progress);
445 Ok(())
446 }
447}
448
449impl AsyncRdfParser for AsyncStreamingParser {
450 fn parse_async<'a, R>(
451 &'a self,
452 reader: R,
453 config: AsyncStreamingConfig,
454 progress: Option<ProgressCallback>,
455 cancel_token: Option<Arc<AtomicBool>>,
456 ) -> BoxFuture<'a, Result<Vec<Quad>>>
457 where
458 R: AsyncRead + Unpin + Send + 'a,
459 {
460 Box::pin(async move {
461 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
462
463 let collector = tokio::spawn(async move {
465 let mut quads = Vec::new();
466 while let Some(quad) = rx.recv().await {
467 quads.push(quad);
468 }
469 quads
470 });
471
472 let parse_result = self
474 .parse_with_handler_async(
475 reader,
476 |quad| {
477 let tx = tx.clone();
478 async move {
479 tx.send(quad)
480 .map_err(|_| OxirsError::Parse("Channel send error".to_string()))?;
481 Ok(())
482 }
483 },
484 config,
485 progress,
486 cancel_token,
487 )
488 .await;
489
490 drop(tx);
492
493 parse_result?;
495
496 let quads = collector
498 .await
499 .map_err(|_| OxirsError::Parse("Failed to collect quads".to_string()))?;
500 Ok(quads)
501 })
502 }
503
504 fn parse_with_handler_async<'a, R, F, Fut>(
505 &'a self,
506 reader: R,
507 handler: F,
508 config: AsyncStreamingConfig,
509 progress: Option<ProgressCallback>,
510 cancel_token: Option<Arc<AtomicBool>>,
511 ) -> BoxFuture<'a, Result<()>>
512 where
513 R: AsyncRead + Unpin + Send + 'a,
514 F: FnMut(Quad) -> Fut + Send + 'a,
515 Fut: std::future::Future<Output = Result<()>> + Send + 'a,
516 {
517 Box::pin(async move {
518 match self.format {
519 RdfFormat::NTriples | RdfFormat::NQuads => {
520 self.parse_line_based(reader, handler, config, progress, cancel_token)
521 .await
522 }
523 _ => {
524 self.parse_document_based(reader, handler, config, progress, cancel_token)
525 .await
526 }
527 }
528 })
529 }
530}
531
/// Streaming RDF serializer bound to one [`RdfFormat`].
///
/// Only N-Triples and N-Quads are handled by `serialize_quad`; any other
/// format yields a serialization error.
pub struct AsyncStreamingSerializer {
    /// Output format used for every quad serialized by this instance.
    format: RdfFormat,
}
536
537impl AsyncStreamingSerializer {
538 pub fn new(format: RdfFormat) -> Self {
540 AsyncStreamingSerializer { format }
541 }
542
543 fn check_cancelled(cancel_token: &Option<Arc<AtomicBool>>) -> Result<()> {
545 if let Some(token) = cancel_token {
546 if token.load(Ordering::Relaxed) {
547 return Err(std::io::Error::new(
548 std::io::ErrorKind::Interrupted,
549 "Operation cancelled",
550 )
551 .into());
552 }
553 }
554 Ok(())
555 }
556
557 fn serialize_quad(&self, quad: &Quad) -> Result<String> {
559 let serializer = Serializer::new(self.format);
560 match self.format {
561 RdfFormat::NTriples => {
562 if quad.is_default_graph() {
564 let mut graph = crate::model::Graph::new();
565 graph.insert(quad.to_triple());
566 serializer.serialize_graph(&graph)
567 } else {
568 Ok(String::new())
569 }
570 }
571 RdfFormat::NQuads => serializer.serialize_quad_to_nquads(quad),
572 _ => Err(OxirsError::Serialize(
573 "Format not supported for streaming serialization".to_string(),
574 )),
575 }
576 }
577
578 #[allow(dead_code)]
580 fn serialize_triple(&self, triple: &Triple) -> Result<String> {
581 let quad = Quad::from_triple(triple.clone());
582 self.serialize_quad(&quad)
583 }
584}
585
586impl AsyncRdfSerializer for AsyncStreamingSerializer {
587 fn serialize_quads_async<'a, W, I>(
588 &'a self,
589 mut writer: W,
590 quads: I,
591 config: AsyncStreamingConfig,
592 progress: Option<ProgressCallback>,
593 cancel_token: Option<Arc<AtomicBool>>,
594 ) -> BoxFuture<'a, Result<()>>
595 where
596 W: AsyncWrite + Unpin + Send + 'a,
597 I: Iterator<Item = Quad> + Send + 'a,
598 {
599 Box::pin(async move {
600 let mut buffer = String::with_capacity(config.buffer_size);
601 let mut progress_info = StreamingProgress::new();
602 let start_time = std::time::Instant::now();
603
604 for quad in quads {
605 Self::check_cancelled(&cancel_token)?;
606
607 let serialized = self.serialize_quad(&quad)?;
609 buffer.push_str(&serialized);
610 progress_info.items_processed += 1;
611
612 if buffer.len() >= config.chunk_size {
614 writer.write_all(buffer.as_bytes()).await?;
615 progress_info.bytes_processed += buffer.len();
616 buffer.clear();
617 }
618
619 let elapsed = start_time.elapsed().as_secs_f64();
621 if elapsed > 0.0 {
622 progress_info.items_per_second =
623 Some(progress_info.items_processed as f64 / elapsed);
624 }
625
626 if let Some(ref callback) = progress {
627 callback(&progress_info);
628 }
629 }
630
631 if !buffer.is_empty() {
633 writer.write_all(buffer.as_bytes()).await?;
634 progress_info.bytes_processed += buffer.len();
635
636 if let Some(ref callback) = progress {
638 callback(&progress_info);
639 }
640 }
641
642 writer.flush().await?;
644
645 Ok(())
646 })
647 }
648
649 fn serialize_triples_async<'a, W, I>(
650 &'a self,
651 writer: W,
652 triples: I,
653 config: AsyncStreamingConfig,
654 progress: Option<ProgressCallback>,
655 cancel_token: Option<Arc<AtomicBool>>,
656 ) -> BoxFuture<'a, Result<()>>
657 where
658 W: AsyncWrite + Unpin + Send + 'a,
659 I: Iterator<Item = Triple> + Send + 'a,
660 {
661 let quads = triples.map(Quad::from_triple);
662 self.serialize_quads_async(writer, quads, config, progress, cancel_token)
663 }
664}
665
/// [`AsyncRead`] adapter that stages data from `inner` in a fixed-size buffer.
///
/// Bytes in `buffer[read_pos..write_pos]` have been read from `inner` but not
/// yet handed to the caller.
pub struct BackpressureReader<R> {
    /// Underlying reader that the staging buffer is refilled from.
    inner: R,
    /// Staging storage; allocated with `capacity` bytes by `new`.
    buffer: Vec<u8>,
    /// Size of `buffer` as requested at construction.
    capacity: usize,
    /// Index of the next staged byte to return to the caller.
    read_pos: usize,
    /// Index one past the last staged byte.
    write_pos: usize,
}
674
675impl<R: AsyncRead + Unpin> BackpressureReader<R> {
676 pub fn new(inner: R, capacity: usize) -> Self {
678 BackpressureReader {
679 inner,
680 buffer: vec![0; capacity],
681 capacity,
682 read_pos: 0,
683 write_pos: 0,
684 }
685 }
686
687 pub fn available(&self) -> usize {
689 self.write_pos - self.read_pos
690 }
691
692 pub fn is_full(&self) -> bool {
694 self.available() == self.capacity
695 }
696}
697
698impl<R: AsyncRead + Unpin> AsyncRead for BackpressureReader<R> {
699 fn poll_read(
700 self: Pin<&mut Self>,
701 cx: &mut Context<'_>,
702 buf: &mut tokio::io::ReadBuf<'_>,
703 ) -> Poll<std::io::Result<()>> {
704 let me = self.get_mut();
705
706 if me.available() > 0 {
708 let to_read = std::cmp::min(buf.remaining(), me.available());
709 buf.put_slice(&me.buffer[me.read_pos..me.read_pos + to_read]);
710 me.read_pos += to_read;
711
712 if me.read_pos == me.write_pos {
714 me.read_pos = 0;
715 me.write_pos = 0;
716 }
717
718 return Poll::Ready(Ok(()));
719 }
720
721 let write_pos = me.write_pos;
723 let mut read_buf = tokio::io::ReadBuf::new(&mut me.buffer[write_pos..]);
724 match Pin::new(&mut me.inner).poll_read(cx, &mut read_buf) {
725 Poll::Ready(Ok(())) => {
726 let bytes_read = read_buf.filled().len();
727 if bytes_read == 0 {
728 return Poll::Ready(Ok(()));
730 }
731
732 me.write_pos += bytes_read;
733
734 let to_read = std::cmp::min(buf.remaining(), me.available());
736 buf.put_slice(&me.buffer[me.read_pos..me.read_pos + to_read]);
737 me.read_pos += to_read;
738
739 Poll::Ready(Ok(()))
740 }
741 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
742 Poll::Pending => Poll::Pending,
743 }
744 }
745}
746
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::*;
    use std::sync::atomic::AtomicUsize;

    #[tokio::test]
    async fn test_async_ntriples_parsing() {
        let ntriples_data = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" .
<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" .
"#;

        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
        let reader = std::io::Cursor::new(ntriples_data.as_bytes());

        let quads = parser
            .parse_async(reader, AsyncStreamingConfig::default(), None, None)
            .await
            .expect("operation should succeed");

        assert_eq!(quads.len(), 2);
        assert!(quads[0].is_default_graph());
        assert!(quads[1].is_default_graph());
    }

    #[tokio::test]
    async fn test_async_parsing_with_progress() {
        let ntriples_data = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" .
<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" .
<http://example.org/charlie> <http://xmlns.com/foaf/0.1/name> "Charlie" .
"#;

        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
        let reader = std::io::Cursor::new(ntriples_data.as_bytes());

        let progress_count = Arc::new(AtomicUsize::new(0));
        let progress_count_clone = progress_count.clone();

        let progress_callback = Box::new(move |progress: &StreamingProgress| {
            progress_count_clone.fetch_add(1, Ordering::Relaxed);
            assert!(progress.bytes_processed > 0);
            assert!(progress.items_processed <= 3);
        });

        let quads = parser
            .parse_async(
                reader,
                AsyncStreamingConfig::default(),
                Some(progress_callback),
                None,
            )
            .await
            .expect("operation should succeed");

        assert_eq!(quads.len(), 3);
        assert!(progress_count.load(Ordering::Relaxed) > 0);
    }

    #[tokio::test]
    async fn test_async_parsing_with_cancellation() {
        let ntriples_data = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" .
<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" .
<http://example.org/charlie> <http://xmlns.com/foaf/0.1/name> "Charlie" .
"#;

        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
        let reader = std::io::Cursor::new(ntriples_data.as_bytes());

        let cancel_token = Arc::new(AtomicBool::new(false));
        let cancel_token_clone = cancel_token.clone();

        // The handler requests cancellation as soon as it sees the first quad.
        let count = Arc::new(AtomicUsize::new(0));
        let count_clone = count.clone();
        let result = parser
            .parse_with_handler_async(
                reader,
                |_quad| {
                    let token = cancel_token_clone.clone();
                    let count = count_clone.clone();
                    async move {
                        let current = count.fetch_add(1, Ordering::Relaxed);
                        if current == 0 {
                            token.store(true, Ordering::Relaxed);
                        }
                        Ok(())
                    }
                },
                AsyncStreamingConfig::default(),
                None,
                Some(cancel_token),
            )
            .await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("cancelled"));
    }

    #[tokio::test]
    async fn test_async_ntriples_serialization() {
        let mut quads = Vec::new();

        let alice = NamedNode::new("http://example.org/alice").expect("valid IRI");
        let name_pred = NamedNode::new("http://xmlns.com/foaf/0.1/name").expect("valid IRI");
        let alice_name = Literal::new("Alice");
        let triple1 = Triple::new(alice.clone(), name_pred.clone(), alice_name);
        quads.push(Quad::from_triple(triple1));

        let bob = NamedNode::new("http://example.org/bob").expect("valid IRI");
        let bob_name = Literal::new("Bob");
        let triple2 = Triple::new(bob, name_pred, bob_name);
        quads.push(Quad::from_triple(triple2));

        let serializer = AsyncStreamingSerializer::new(RdfFormat::NTriples);
        let mut output = Vec::new();

        serializer
            .serialize_quads_async(
                &mut output,
                quads.into_iter(),
                AsyncStreamingConfig::default(),
                None,
                None,
            )
            .await
            .expect("operation should succeed");

        let result = String::from_utf8(output).expect("bytes should be valid UTF-8");
        assert!(result.contains("http://example.org/alice"));
        assert!(result.contains("http://example.org/bob"));
        assert!(result.contains("\"Alice\""));
        assert!(result.contains("\"Bob\""));
    }

    #[tokio::test]
    async fn test_async_serialization_with_progress() {
        let mut triples = Vec::new();

        for i in 0..10 {
            let subject = NamedNode::new(format!("http://example.org/item{i}"))
                .expect("valid IRI from format");
            let pred = NamedNode::new("http://example.org/value").expect("valid IRI");
            let obj = Literal::new(i.to_string());
            triples.push(Triple::new(subject, pred, obj));
        }

        let serializer = AsyncStreamingSerializer::new(RdfFormat::NTriples);
        let mut output = Vec::new();

        let progress_count = Arc::new(AtomicUsize::new(0));
        let progress_count_clone = progress_count.clone();
        let last_bytes = Arc::new(AtomicUsize::new(0));
        let last_bytes_clone = last_bytes.clone();

        let progress_callback = Box::new(move |progress: &StreamingProgress| {
            progress_count_clone.fetch_add(1, Ordering::Relaxed);
            assert!(progress.items_processed <= 10);
            // Written byte counts must never go backwards between reports.
            let prev_bytes = last_bytes_clone.load(Ordering::Relaxed);
            assert!(progress.bytes_processed >= prev_bytes);
            last_bytes_clone.store(progress.bytes_processed, Ordering::Relaxed);
        });

        serializer
            .serialize_triples_async(
                &mut output,
                triples.into_iter(),
                AsyncStreamingConfig::default(),
                Some(progress_callback),
                None,
            )
            .await
            .expect("operation should succeed");

        assert!(progress_count.load(Ordering::Relaxed) > 0);
    }

    #[tokio::test]
    async fn test_async_error_tolerance() {
        let invalid_ntriples = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" .
INVALID LINE HERE
<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" .
"#;

        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
        let reader = std::io::Cursor::new(invalid_ntriples.as_bytes());

        let config = AsyncStreamingConfig {
            ignore_errors: true,
            ..Default::default()
        };

        let quads = parser
            .parse_async(reader, config, None, None)
            .await
            .expect("operation should succeed");

        // The malformed middle line is skipped; both valid statements survive.
        assert_eq!(quads.len(), 2);
    }

    #[tokio::test]
    async fn test_backpressure_reader() {
        let data = b"Hello, World!";
        let cursor = std::io::Cursor::new(data);
        let mut reader = BackpressureReader::new(cursor, 16);
        let mut output = Vec::new();

        reader
            .read_to_end(&mut output)
            .await
            .expect("async operation should succeed");

        assert_eq!(output, data);
    }

    #[tokio::test]
    async fn test_async_nquads_parsing() {
        let nquads_data = r#"<http://example.org/alice> <http://xmlns.com/foaf/0.1/name> "Alice" <http://example.org/graph1> .
<http://example.org/bob> <http://xmlns.com/foaf/0.1/name> "Bob" <http://example.org/graph2> .
"#;

        let parser = AsyncStreamingParser::new(RdfFormat::NQuads);
        let reader = std::io::Cursor::new(nquads_data.as_bytes());

        let quads = parser
            .parse_async(reader, AsyncStreamingConfig::default(), None, None)
            .await
            .expect("operation should succeed");

        assert_eq!(quads.len(), 2);
        assert!(!quads[0].is_default_graph());
        assert!(!quads[1].is_default_graph());
    }

    #[tokio::test]
    async fn test_large_chunk_parsing() {
        // 1000 lines read in 1 KiB chunks, so many lines straddle chunk boundaries.
        let mut ntriples_data = String::new();
        for i in 0..1000 {
            ntriples_data.push_str(&format!(
                "<http://example.org/item{}> <http://example.org/value> \"{}\" .\n",
                i, i
            ));
        }

        let parser = AsyncStreamingParser::new(RdfFormat::NTriples);
        let reader = std::io::Cursor::new(ntriples_data.as_bytes());

        let config = AsyncStreamingConfig {
            chunk_size: 1024,
            ..Default::default()
        };

        let quads = parser
            .parse_async(reader, config, None, None)
            .await
            .expect("operation should succeed");

        assert_eq!(quads.len(), 1000);
    }

    #[tokio::test]
    async fn test_custom_base_iri() {
        let turtle_data = r#"@prefix ex: <http://example.org/> .
ex:alice ex:knows ex:bob ."#;

        let parser = AsyncStreamingParser::new(RdfFormat::Turtle);
        let reader = std::io::Cursor::new(turtle_data.as_bytes());

        let config = AsyncStreamingConfig {
            base_iri: Some("http://example.org/".to_string()),
            ..Default::default()
        };

        let quads = parser
            .parse_async(reader, config, None, None)
            .await
            .expect("operation should succeed");

        assert_eq!(quads.len(), 1);
        let triple = quads[0].to_triple();

        // Fail loudly instead of silently skipping the check when the subject
        // is not a named node.
        match triple.subject() {
            Subject::NamedNode(subj) => {
                assert_eq!(subj.as_str(), "http://example.org/alice");
            }
            _ => panic!("expected a named-node subject"),
        }
    }
}