use crate::MIN_PARALLEL_CHUNK_SIZE;
use crate::chunker::{get_ntriples_file_chunks, get_ntriples_slice_chunks};
use crate::line_formats::NQuadsRecognizer;
#[cfg(feature = "async-tokio")]
use crate::toolkit::TokioAsyncReaderIterator;
use crate::toolkit::{Parser, ReaderIterator, SliceIterator, TurtleParseError, TurtleSyntaxError};
use oxrdf::{Triple, TripleRef};
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Take, Write};
use std::path::Path;
#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
/// Builder for N-Triples parsers.
///
/// Create one with [`NTriplesParser::new`], optionally switch it to lenient mode with
/// [`NTriplesParser::lenient`], then turn it into a concrete parser with one of the
/// `for_*` methods or with [`NTriplesParser::low_level`].
#[derive(Default, Clone)]
#[must_use]
pub struct NTriplesParser {
/// Passed as the last argument of `NQuadsRecognizer::new_parser` by every constructor.
/// NOTE(review): presumably relaxes validation of the input — confirm against the recognizer.
lenient: bool,
}
impl NTriplesParser {
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn lenient(mut self) -> Self {
self.lenient = true;
self
}
#[deprecated(note = "Use `lenient()` instead", since = "0.2.0")]
#[inline]
pub fn unchecked(self) -> Self {
self.lenient()
}
pub fn for_reader<R: Read>(self, reader: R) -> ReaderNTriplesParser<R> {
ReaderNTriplesParser {
inner: self.low_level().parser.for_reader(reader),
}
}
#[cfg(feature = "async-tokio")]
pub fn for_tokio_async_reader<R: AsyncRead + Unpin>(
self,
reader: R,
) -> TokioAsyncReaderNTriplesParser<R> {
TokioAsyncReaderNTriplesParser {
inner: self.low_level().parser.for_tokio_async_reader(reader),
}
}
pub fn for_slice(self, slice: &(impl AsRef<[u8]> + ?Sized)) -> SliceNTriplesParser<'_> {
SliceNTriplesParser {
inner: NQuadsRecognizer::new_parser(slice.as_ref(), true, false, self.lenient)
.into_iter(),
}
}
pub fn split_slice_for_parallel_parsing(
self,
slice: &(impl AsRef<[u8]> + ?Sized),
target_parallelism: usize,
) -> Vec<SliceNTriplesParser<'_>> {
let slice = slice.as_ref();
let n_chunks = (slice.len() / MIN_PARALLEL_CHUNK_SIZE).clamp(1, target_parallelism);
get_ntriples_slice_chunks(slice, n_chunks)
.into_iter()
.map(|(start, end)| self.clone().for_slice(&slice[start..end]))
.collect()
}
pub fn split_file_for_parallel_parsing(
self,
path: impl AsRef<Path>,
target_parallelism: usize,
) -> io::Result<Vec<ReaderNTriplesParser<Take<File>>>> {
let path = path.as_ref();
let mut file = File::open(path)?;
let file_size = file.metadata()?.len();
let n_chunks = usize::try_from(
file_size / u64::try_from(MIN_PARALLEL_CHUNK_SIZE).map_err(io::Error::other)?,
)
.map_err(io::Error::other)?
.clamp(1, target_parallelism);
get_ntriples_file_chunks(&mut file, file_size, n_chunks)?
.into_iter()
.map(|(start, end)| {
let mut file = File::open(path)?;
file.seek(SeekFrom::Start(start))?;
Ok(self.clone().for_reader(file.take(end - start)))
})
.collect()
}
pub fn low_level(self) -> LowLevelNTriplesParser {
LowLevelNTriplesParser {
parser: NQuadsRecognizer::new_parser(Vec::new(), false, false, self.lenient),
}
}
}
/// Parser reading N-Triples from a [`Read`] implementation.
///
/// Built by `NTriplesParser::for_reader`; iterating over it yields
/// `Result<Triple, TurtleParseError>` items.
#[must_use]
pub struct ReaderNTriplesParser<R: Read> {
/// Underlying reader-driven iterator; its items are converted into [`Triple`]s.
inner: ReaderIterator<R, NQuadsRecognizer>,
}
impl<R: Read> Iterator for ReaderNTriplesParser<R> {
    type Item = Result<Triple, TurtleParseError>;

    /// Pulls the next item from the wrapped iterator, converting a successful
    /// value into a [`Triple`] and passing errors through untouched.
    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.next() {
            Some(Ok(parsed)) => Some(Ok(parsed.into())),
            Some(Err(error)) => Some(Err(error)),
            None => None,
        }
    }
}
/// Parser reading N-Triples from a Tokio [`AsyncRead`] implementation.
///
/// Built by `NTriplesParser::for_tokio_async_reader`; only compiled with the
/// `async-tokio` feature.
#[cfg(feature = "async-tokio")]
#[must_use]
pub struct TokioAsyncReaderNTriplesParser<R: AsyncRead + Unpin> {
/// Underlying asynchronous iterator; its items are converted into [`Triple`]s.
inner: TokioAsyncReaderIterator<R, NQuadsRecognizer>,
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> TokioAsyncReaderNTriplesParser<R> {
    /// Awaits the next item of the wrapped asynchronous iterator.
    ///
    /// Returns `None` when the wrapped iterator returns `None`; otherwise a
    /// successful value is converted into a [`Triple`] and an error is returned as is.
    pub async fn next(&mut self) -> Option<Result<Triple, TurtleParseError>> {
        let pending = self.inner.next().await;
        pending.map(|outcome| outcome.map(Into::into))
    }
}
/// Parser reading N-Triples from a borrowed byte slice.
///
/// Built by `NTriplesParser::for_slice`; iterating over it yields
/// `Result<Triple, TurtleSyntaxError>` items.
#[must_use]
pub struct SliceNTriplesParser<'a> {
/// Underlying slice-driven iterator; its items are converted into [`Triple`]s.
inner: SliceIterator<'a, NQuadsRecognizer>,
}
impl Iterator for SliceNTriplesParser<'_> {
    type Item = Result<Triple, TurtleSyntaxError>;

    /// Pulls the next item from the wrapped slice iterator and converts a
    /// successful value into a [`Triple`]; errors are forwarded unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner
            .next()
            .map(|outcome| outcome.map(Into::into))
    }
}
/// Push-based N-Triples parser, built by `NTriplesParser::low_level`.
///
/// The caller supplies input bytes itself and then asks for the parsed triples.
pub struct LowLevelNTriplesParser {
/// Underlying parser, created over an initially empty `Vec<u8>`.
parser: Parser<Vec<u8>, NQuadsRecognizer>,
}
impl LowLevelNTriplesParser {
    /// Forwards `other` to the wrapped parser's `extend_from_slice`.
    /// NOTE(review): presumably appends the bytes to the pending input — confirm.
    pub fn extend_from_slice(&mut self, other: &[u8]) {
        self.parser.extend_from_slice(other);
    }

    /// Forwards to the wrapped parser's `end`.
    /// NOTE(review): presumably signals that no more input will be supplied — confirm.
    pub fn end(&mut self) {
        self.parser.end();
    }

    /// Returns what the wrapped parser's `is_end` reports.
    pub fn is_end(&self) -> bool {
        let finished = self.parser.is_end();
        finished
    }

    /// Asks the wrapped parser for its next item.
    ///
    /// Returns `None` when the wrapped parser returns `None`; otherwise a
    /// successful value is converted into a [`Triple`] and an error is returned as is.
    pub fn parse_next(&mut self) -> Option<Result<Triple, TurtleSyntaxError>> {
        match self.parser.parse_next() {
            Some(Ok(parsed)) => Some(Ok(parsed.into())),
            Some(Err(error)) => Some(Err(error)),
            None => None,
        }
    }
}
/// Builder for N-Triples serializers.
///
/// It carries no configuration; use one of its `for_*` methods or `low_level` to get
/// a concrete serializer.
#[derive(Default, Clone)]
#[must_use]
#[expect(clippy::empty_structs_with_brackets)]
pub struct NTriplesSerializer {}
impl NTriplesSerializer {
    /// Creates a serializer builder.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Builds a serializer that writes triples to a [`Write`] implementation.
    pub fn for_writer<W: Write>(self, writer: W) -> WriterNTriplesSerializer<W> {
        let low_level_writer = self.low_level();
        WriterNTriplesSerializer {
            writer,
            low_level_writer,
        }
    }

    /// Builds a serializer that writes triples to a Tokio [`AsyncWrite`] implementation.
    ///
    /// Only available with the `async-tokio` feature.
    #[cfg(feature = "async-tokio")]
    pub fn for_tokio_async_writer<W: AsyncWrite + Unpin>(
        self,
        writer: W,
    ) -> TokioAsyncWriterNTriplesSerializer<W> {
        let low_level_writer = self.low_level();
        TokioAsyncWriterNTriplesSerializer {
            writer,
            low_level_writer,
            buffer: Vec::new(),
        }
    }

    /// Builds the low-level serializer, which writes each triple to a writer
    /// supplied on every call.
    #[expect(clippy::unused_self)]
    pub fn low_level(self) -> LowLevelNTriplesSerializer {
        LowLevelNTriplesSerializer {}
    }
}
/// Serializer writing N-Triples to a [`Write`] implementation.
///
/// Built by `NTriplesSerializer::for_writer`; call `finish` to get the writer back.
#[must_use]
pub struct WriterNTriplesSerializer<W: Write> {
/// Destination of the serialized triples.
writer: W,
/// Serializer that does the actual formatting.
low_level_writer: LowLevelNTriplesSerializer,
}
impl<W: Write> WriterNTriplesSerializer<W> {
    /// Serializes one triple to the wrapped writer.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported while writing, if any.
    pub fn serialize_triple<'a>(&mut self, t: impl Into<TripleRef<'a>>) -> io::Result<()> {
        let Self {
            writer,
            low_level_writer,
        } = self;
        low_level_writer.serialize_triple(t, writer)
    }

    /// Consumes the serializer and gives the wrapped writer back.
    pub fn finish(self) -> W {
        let Self { writer, .. } = self;
        writer
    }
}
/// Serializer writing N-Triples to a Tokio [`AsyncWrite`] implementation.
///
/// Built by `NTriplesSerializer::for_tokio_async_writer`; only compiled with the
/// `async-tokio` feature.
#[cfg(feature = "async-tokio")]
#[must_use]
pub struct TokioAsyncWriterNTriplesSerializer<W: AsyncWrite + Unpin> {
/// Destination of the serialized triples.
writer: W,
/// Serializer that does the actual formatting.
low_level_writer: LowLevelNTriplesSerializer,
/// Staging area: each triple is formatted here before being written to `writer`.
buffer: Vec<u8>,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> TokioAsyncWriterNTriplesSerializer<W> {
    /// Serializes one triple to the wrapped asynchronous writer.
    ///
    /// The triple is first formatted into an internal buffer, then written in one
    /// `write_all` call.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported while formatting or writing, if any. After an
    /// error the amount of data that reached the writer is unspecified.
    pub async fn serialize_triple<'a>(&mut self, t: impl Into<TripleRef<'a>>) -> io::Result<()> {
        // Start from an empty buffer: if a previous call failed or its future was
        // dropped during the write, its bytes would otherwise still be here and
        // be emitted a second time in front of this triple.
        self.buffer.clear();
        self.low_level_writer
            .serialize_triple(t, &mut self.buffer)?;
        self.writer.write_all(&self.buffer).await
    }

    /// Consumes the serializer and gives the wrapped writer back.
    pub fn finish(self) -> W {
        self.writer
    }
}
/// Low-level N-Triples serializer, built by `NTriplesSerializer::low_level`.
///
/// It holds no state: the destination writer is supplied on every call.
#[expect(clippy::empty_structs_with_brackets)]
pub struct LowLevelNTriplesSerializer {}
impl LowLevelNTriplesSerializer {
    /// Writes one triple to `writer` as a single line: the triple's `Display`
    /// output, a space, a dot and a line break.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `writer`, if any.
    #[expect(clippy::unused_self)]
    pub fn serialize_triple<'a>(
        &mut self,
        t: impl Into<TripleRef<'a>>,
        writer: impl Write,
    ) -> io::Result<()> {
        let mut sink = writer;
        writeln!(sink, "{} .", t.into())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use oxrdf::{Literal, NamedNode};

    /// A lenient parser must accept the language tag `toolonglangtag`
    /// and return the literal carrying it.
    #[test]
    fn lenient_parsing() {
        let input = r#"<foo> <bar> "baz"@toolonglangtag ."#;
        let expected = Triple::new(
            NamedNode::new_unchecked("foo"),
            NamedNode::new_unchecked("bar"),
            Literal::new_language_tagged_literal_unchecked("baz", "toolonglangtag"),
        );

        let parser = NTriplesParser::new().lenient().for_reader(input.as_bytes());
        let parsed = parser.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(parsed, [expected]);
    }
}