use super::{Parser, ParserConfig, RdfFormat};
use crate::model::Quad;
use crate::{OxirsError, Result};
use std::future::Future;
use std::pin::Pin;
/// Streaming RDF parser that pulls its input from an asynchronous reader.
///
/// Instances are configured through the `with_*` builder methods and hand every
/// parsed [`Quad`] to a caller-supplied asynchronous handler.
#[cfg(feature = "async")]
pub struct AsyncStreamingParser {
/// Serialization being parsed; N-Triples and N-Quads are processed line by
/// line, every other format is handed to [`Parser`] as one document.
format: RdfFormat,
/// Configuration passed to every [`Parser`] this type constructs; its
/// `ignore_errors` flag is set by `with_error_tolerance`.
config: ParserConfig,
/// Optional callback invoked after each chunk with the cumulative number of
/// bytes read so far.
progress_callback: Option<Box<dyn Fn(usize) + Send + Sync>>,
/// Size in bytes of the buffer used for each read (8192 unless overridden).
chunk_size: usize,
}
#[cfg(feature = "async")]
impl AsyncStreamingParser {
    /// Creates a parser for `format` using the default [`ParserConfig`], no
    /// progress callback, and an 8192-byte read chunk.
    pub fn new(format: RdfFormat) -> Self {
        AsyncStreamingParser {
            format,
            config: ParserConfig::default(),
            progress_callback: None,
            chunk_size: 8192,
        }
    }

    /// Registers `callback`, which [`parse_stream`](Self::parse_stream) invokes
    /// after every chunk with the cumulative number of bytes read so far.
    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(usize) + Send + Sync + 'static,
    {
        self.progress_callback = Some(Box::new(callback));
        self
    }

    /// Sets the size, in bytes, of the buffer used for each read.
    ///
    /// A value of `0` is treated as `1` when reading, because a zero-length
    /// read cannot be told apart from end of input.
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Sets the `ignore_errors` flag of the parser configuration.
    ///
    /// When enabled, lines that fail to parse in the line-oriented formats are
    /// skipped instead of aborting the stream.
    pub fn with_error_tolerance(mut self, ignore_errors: bool) -> Self {
        self.config.ignore_errors = ignore_errors;
        self
    }

    /// Reads `reader` to the end and passes every parsed quad to `handler`.
    ///
    /// N-Triples and N-Quads input is parsed incrementally, one line at a
    /// time, as chunks arrive. Every other format is buffered in full and
    /// parsed once the reader reports end of input; the resulting quads are
    /// then delivered to `handler` in document order.
    ///
    /// Bytes that are not valid UTF-8 are replaced with U+FFFD. A multi-byte
    /// character that straddles two reads is reassembled rather than replaced.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by the reader, by the parser (unless
    /// tolerated through [`with_error_tolerance`](Self::with_error_tolerance)
    /// for line-oriented formats), or by `handler`.
    pub async fn parse_stream<R, F, Fut>(&self, mut reader: R, mut handler: F) -> Result<()>
    where
        R: tokio::io::AsyncRead + Unpin,
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        use tokio::io::AsyncReadExt;

        let line_oriented = matches!(self.format, RdfFormat::NTriples | RdfFormat::NQuads);
        // A zero-length buffer would make `read` return 0, which is
        // indistinguishable from end of input, so always read at least a byte.
        let mut buffer = vec![0u8; self.chunk_size.max(1)];
        // Raw bytes not yet decoded: at most one incomplete UTF-8 sequence is
        // carried from one read to the next.
        let mut pending: Vec<u8> = Vec::new();
        let mut accumulated_data = String::new();
        let mut line_buffer = String::new();
        let mut bytes_processed = 0usize;

        loop {
            let bytes_read = reader.read(&mut buffer).await?;
            if bytes_read == 0 {
                break;
            }
            bytes_processed += bytes_read;

            // Decode everything except a trailing, still-incomplete character
            // so that chunk boundaries never corrupt multi-byte text.
            pending.extend_from_slice(&buffer[..bytes_read]);
            let decodable = Self::decodable_prefix_len(&pending);
            accumulated_data.push_str(&String::from_utf8_lossy(&pending[..decodable]));
            pending.drain(..decodable);

            if line_oriented {
                self.process_lines_async(&mut accumulated_data, &mut line_buffer, &mut handler)
                    .await?;
            }
            if let Some(callback) = &self.progress_callback {
                callback(bytes_processed);
            }
        }

        // End of input: whatever is still pending is a truncated sequence and
        // can only be decoded lossily.
        if !pending.is_empty() {
            accumulated_data.push_str(&String::from_utf8_lossy(&pending));
        }

        if line_oriented {
            // The line processor only emits newline-terminated lines, so
            // terminate a trailing record explicitly; otherwise a final line
            // without a newline would be silently dropped.
            if !accumulated_data.is_empty() || !line_buffer.is_empty() {
                accumulated_data.push('\n');
                self.process_lines_async(&mut accumulated_data, &mut line_buffer, &mut handler)
                    .await?;
            }
        } else if !accumulated_data.is_empty() {
            // Collect synchronously, then await the handler. Driving the
            // handler from inside the synchronous callback would require
            // `block_in_place`, which panics on a current-thread runtime.
            let mut quads: Vec<Quad> = Vec::new();
            let outcome = {
                let parser = Parser::with_config(self.format, self.config.clone());
                parser.parse_str_with_handler(&accumulated_data, |quad: Quad| -> Result<()> {
                    quads.push(quad);
                    Ok(())
                })
            };
            // Deliver the quads parsed before a failure first, so a parse
            // error is still reported after the quads that preceded it.
            for quad in quads {
                handler(quad).await?;
            }
            outcome?;
        }
        Ok(())
    }

    /// Returns the length of the longest prefix of `bytes` that does not end
    /// in the middle of a UTF-8 sequence.
    ///
    /// Invalid sequences inside the prefix are left in place (they are
    /// replaced during lossy decoding); only an incomplete sequence at the
    /// very end is excluded.
    fn decodable_prefix_len(bytes: &[u8]) -> usize {
        let mut checked = 0;
        loop {
            match std::str::from_utf8(&bytes[checked..]) {
                Ok(_) => return bytes.len(),
                Err(err) => match err.error_len() {
                    // `None` means the input ended inside a sequence.
                    None => return checked + err.valid_up_to(),
                    Some(invalid) => checked += err.valid_up_to() + invalid,
                },
            }
        }
    }

    /// Parses every complete line in `line_buffer` followed by
    /// `accumulated_data` and passes the resulting quads to `handler`.
    ///
    /// On return `accumulated_data` is empty and `line_buffer` holds the text
    /// after the last newline, to be completed by a later call.
    ///
    /// # Errors
    ///
    /// Returns the first handler error, or the first parse error unless
    /// `ignore_errors` is set in the configuration.
    async fn process_lines_async<F, Fut>(
        &self,
        accumulated_data: &mut String,
        line_buffer: &mut String,
        handler: &mut F,
    ) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        // Move the carried-over fragment instead of cloning it.
        let mut text = std::mem::take(line_buffer);
        text.push_str(accumulated_data);
        accumulated_data.clear();

        let mut line_start = 0;
        for (newline_at, _) in text.match_indices('\n') {
            let line = &text[line_start..newline_at];
            line_start = newline_at + 1;
            match self.parse_line(line) {
                Ok(Some(quad)) => handler(quad).await?,
                Ok(None) => {}
                // Honor `with_error_tolerance(true)`: skip the malformed line.
                Err(_) if self.config.ignore_errors => {}
                Err(err) => return Err(err),
            }
        }
        // Keep the unterminated tail for the next call.
        line_buffer.push_str(&text[line_start..]);
        Ok(())
    }

    /// Parses a single line of a line-oriented format.
    ///
    /// # Errors
    ///
    /// Returns [`OxirsError::Parse`] when the configured format is neither
    /// N-Triples nor N-Quads, or whatever error the line parser reports.
    fn parse_line(&self, line: &str) -> Result<Option<Quad>> {
        let parser = Parser::with_config(self.format, self.config.clone());
        match self.format {
            // NOTE(review): N-Quads lines are routed through the N-Triples
            // line parser; confirm it accepts the optional graph term.
            RdfFormat::NTriples | RdfFormat::NQuads => parser.parse_ntriples_line(line),
            _ => Err(OxirsError::Parse(
                "Unsupported format for line parsing".to_string(),
            )),
        }
    }

    /// Parses an in-memory byte slice; see [`parse_stream`](Self::parse_stream).
    pub async fn parse_bytes<F, Fut>(&self, data: &[u8], handler: F) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        use std::io::Cursor;

        self.parse_stream(Cursor::new(data), handler).await
    }

    /// Parses an in-memory string; see [`parse_stream`](Self::parse_stream).
    pub async fn parse_str_async<F, Fut>(&self, data: &str, handler: F) -> Result<()>
    where
        F: FnMut(Quad) -> Fut,
        Fut: Future<Output = Result<()>>,
    {
        self.parse_bytes(data.as_bytes(), handler).await
    }

    /// Parses `data` and returns all quads in the order they were produced.
    ///
    /// # Errors
    ///
    /// Returns the first error reported while parsing.
    pub async fn parse_str_to_quads_async(&self, data: &str) -> Result<Vec<Quad>> {
        let mut quads = Vec::new();
        // Storing a quad never has to wait, so the handler pushes directly and
        // returns an already-completed future; no lock or final copy needed.
        self.parse_str_async(data, |quad| {
            quads.push(quad);
            std::future::ready(Ok(()))
        })
        .await?;
        Ok(quads)
    }
}
/// Snapshot of counters describing how far a parse has progressed.
#[cfg(feature = "async")]
#[derive(Debug, Clone)]
pub struct ParseProgress {
/// Bytes processed so far; the numerator of `completion_percentage`.
pub bytes_processed: usize,
/// Presumably the number of quads parsed so far; not read in this file view.
pub quads_parsed: usize,
/// Presumably the number of errors seen so far; not read in this file view.
pub errors_encountered: usize,
/// Expected total input size, when known; the denominator of
/// `completion_percentage`.
pub estimated_total_bytes: Option<usize>,
}
#[cfg(feature = "async")]
impl ParseProgress {
    /// Returns `bytes_processed` as a percentage of `estimated_total_bytes`.
    ///
    /// Yields `None` when no total estimate is available, and `100.0` when the
    /// estimate is zero.
    pub fn completion_percentage(&self) -> Option<f64> {
        let total = self.estimated_total_bytes?;
        let percent = match total {
            // Nothing was expected, so report the work as complete and avoid
            // dividing by zero.
            0 => 100.0,
            _ => (self.bytes_processed as f64 / total as f64) * 100.0,
        };
        Some(percent)
    }
}
/// Asynchronous consumer of RDF quads.
///
/// Both methods return a boxed, `Send` future that borrows the sink mutably
/// and resolves to `Result<()>`.
#[cfg(feature = "async")]
pub trait AsyncRdfSink: Send + Sync {
/// Accepts one quad; the returned future resolves once the sink has handled it.
fn process_quad(&mut self, quad: Quad)
-> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
/// Completes the sink's work.
///
/// NOTE(review): presumably called once after the last quad; no caller is
/// visible in this file view, so confirm the intended contract.
fn finalize(&mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
}
/// [`AsyncRdfSink`] that keeps every received quad in memory.
#[cfg(feature = "async")]
#[derive(Default)]
pub struct MemoryAsyncSink {
    /// Quads received so far, in the order they were pushed.
    quads: Vec<Quad>,
}

#[cfg(feature = "async")]
impl MemoryAsyncSink {
    /// Creates a sink that holds no quads.
    pub fn new() -> Self {
        Self::default()
    }

    /// Consumes the sink and returns the quads it collected.
    pub fn into_quads(self) -> Vec<Quad> {
        let Self { quads } = self;
        quads
    }
}

#[cfg(feature = "async")]
impl AsyncRdfSink for MemoryAsyncSink {
    /// Returns a future that appends `quad` to the in-memory list when polled.
    fn process_quad(
        &mut self,
        quad: Quad,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        let store = &mut self.quads;
        Box::pin(async move {
            store.push(quad);
            Ok(())
        })
    }

    /// Returns a future that resolves to `Ok(())` without doing any work.
    fn finalize(&mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        Box::pin(async { Ok(()) })
    }
}