use crate::io::{RdfParseError, RdfParser, RdfSerializer};
use crate::model::*;
#[expect(deprecated)]
use crate::sparql::{
Query, QueryEvaluationError, QueryExplanation, QueryResults, SparqlEvaluator, Update,
UpdateEvaluationError,
};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
use crate::storage::StorageOptions;
#[cfg(not(target_family = "wasm"))]
use crate::storage::map_thread_result;
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError};
use crate::storage::{
DEFAULT_BULK_LOAD_BATCH_SIZE, DecodingGraphIterator, DecodingQuadIterator, Storage,
StorageBulkLoader, StorageReadableTransaction, StorageReader,
};
#[cfg(not(target_family = "wasm"))]
use std::cmp::max;
use std::fmt;
#[cfg(not(target_family = "wasm"))]
use std::fs::File;
use std::io::{Read, Write};
use std::mem::swap;
#[cfg(not(target_family = "wasm"))]
use std::num::NonZero;
#[cfg(not(target_family = "wasm"))]
use std::path::Path;
use std::sync::Arc;
#[cfg(not(target_family = "wasm"))]
use std::sync::mpsc;
#[cfg(not(target_family = "wasm"))]
use std::thread;
#[cfg(not(target_family = "wasm"))]
use std::thread::available_parallelism;
/// A store of RDF quads, queryable with SPARQL.
///
/// Thin handle over the underlying [`Storage`]; it is `Clone`, so it can be
/// shared cheaply between call sites (see `test_send_sync`: it is also
/// `Send + Sync`).
#[derive(Clone)]
pub struct Store {
    // Backing storage engine (in-memory or RocksDB depending on how the
    // store was built/opened).
    storage: Storage,
}
/// Options applied when opening a RocksDB-backed [`Store`]
/// (see `Store::open_with_options`).
///
/// Build with the `with_*` methods; unset fields (`None`) fall back to the
/// storage backend's defaults.
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[derive(Clone, Debug, Default)]
pub struct StoreOptions {
    // Limit (or explicit absence of a limit) on simultaneously open files.
    max_open_files: Option<StoreMaxOpenFiles>,
    // Number of file descriptors to keep in reserve — presumably for uses
    // other than the store itself (TODO confirm against StorageOptions docs).
    fd_reserve: Option<u32>,
}
/// Internal representation of the "max open files" option: either a concrete
/// limit or explicitly unlimited (mapped to `-1` when converted to
/// [`StorageOptions`]).
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StoreMaxOpenFiles {
    /// At most this many files may be open at once.
    Limited(u32),
    /// No limit on the number of open files.
    Unlimited,
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
impl StoreOptions {
    /// Limits the number of files the store may keep open simultaneously to
    /// `max_open_files`.
    #[must_use]
    pub fn with_max_open_files(self, max_open_files: u32) -> Self {
        Self {
            max_open_files: Some(StoreMaxOpenFiles::Limited(max_open_files)),
            ..self
        }
    }

    /// Explicitly removes any limit on the number of simultaneously open files.
    #[must_use]
    pub fn with_unlimited_max_open_files(self) -> Self {
        Self {
            max_open_files: Some(StoreMaxOpenFiles::Unlimited),
            ..self
        }
    }

    /// Sets the number of file descriptors to keep in reserve.
    #[must_use]
    pub fn with_fd_reserve(self, fd_reserve: u32) -> Self {
        Self {
            fd_reserve: Some(fd_reserve),
            ..self
        }
    }
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
impl From<StoreOptions> for StorageOptions {
    /// Lowers the public [`StoreOptions`] into the backend's [`StorageOptions`].
    ///
    /// `Unlimited` is encoded as `-1`; a `Limited` value too large for `i32`
    /// is clamped to `i32::MAX`.
    fn from(value: StoreOptions) -> Self {
        let max_open_files = value.max_open_files.map(|limit| {
            if let StoreMaxOpenFiles::Limited(n) = limit {
                i32::try_from(n).unwrap_or(i32::MAX)
            } else {
                -1
            }
        });
        Self::new(max_open_files, value.fd_reserve)
    }
}
impl Store {
    /// Creates a new in-memory [`Store`].
    pub fn new() -> Result<Self, StorageError> {
        Ok(Self {
            storage: Storage::new()?,
        })
    }

    /// Opens (or creates) a RocksDB-backed [`Store`] at `path`.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StorageError> {
        Ok(Self {
            storage: Storage::open(path.as_ref())?,
        })
    }

    /// Same as [`Store::open`], with extra [`StoreOptions`] forwarded to the
    /// storage backend.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    pub fn open_with_options(
        path: impl AsRef<Path>,
        options: StoreOptions,
    ) -> Result<Self, StorageError> {
        Ok(Self {
            storage: Storage::open_with_options(path.as_ref(), options.into())?,
        })
    }

    /// Opens the RocksDB-backed [`Store`] at `path` in read-only mode.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, StorageError> {
        Ok(Self {
            storage: Storage::open_read_only(path.as_ref())?,
        })
    }

    /// Evaluates a SPARQL query against the store with default evaluator options.
    ///
    /// Deprecated: build a [`SparqlEvaluator`] and call `.on_store(self)` instead.
    #[deprecated(note = "Use `SparqlEvaluator` interface instead", since = "0.5.0")]
    #[expect(deprecated)]
    pub fn query(
        &self,
        query: impl TryInto<Query, Error = impl Into<QueryEvaluationError>>,
    ) -> Result<QueryResults<'static>, QueryEvaluationError> {
        self.query_opt(query, SparqlEvaluator::new())
    }

    /// Evaluates a SPARQL query with an explicit evaluator configuration.
    ///
    /// Deprecated: use the [`SparqlEvaluator`] interface directly.
    #[deprecated(note = "Use `SparqlEvaluator` interface instead", since = "0.5.0")]
    #[expect(deprecated)]
    pub fn query_opt(
        &self,
        query: impl TryInto<Query, Error = impl Into<QueryEvaluationError>>,
        options: SparqlEvaluator,
    ) -> Result<QueryResults<'static>, QueryEvaluationError> {
        self.query_opt_with_substituted_variables(query, options, [])
    }

    /// Evaluates a SPARQL query, binding each `(variable, term)` pair from
    /// `substitutions` before execution.
    ///
    /// Deprecated: use the [`SparqlEvaluator`] interface directly.
    #[deprecated(note = "Use `SparqlEvaluator` interface instead", since = "0.5.0")]
    #[expect(deprecated)]
    pub fn query_opt_with_substituted_variables(
        &self,
        query: impl TryInto<Query, Error = impl Into<QueryEvaluationError>>,
        options: SparqlEvaluator,
        substitutions: impl IntoIterator<Item = (Variable, Term)>,
    ) -> Result<QueryResults<'static>, QueryEvaluationError> {
        let mut evaluator = options.for_query(query.try_into().map_err(Into::into)?);
        for (variable, term) in substitutions {
            evaluator = evaluator.substitute_variable(variable, term);
        }
        evaluator.on_store(self).execute()
    }

    /// Evaluates a SPARQL query and also returns a [`QueryExplanation`];
    /// when `with_stats` is true, evaluation statistics are computed too.
    ///
    /// Deprecated: use the [`SparqlEvaluator`] interface directly.
    #[deprecated(note = "Use `SparqlEvaluator` interface instead", since = "0.5.0")]
    #[expect(deprecated)]
    pub fn explain_query_opt(
        &self,
        query: impl TryInto<Query, Error = impl Into<QueryEvaluationError>>,
        options: SparqlEvaluator,
        with_stats: bool,
    ) -> Result<
        (
            Result<QueryResults<'static>, QueryEvaluationError>,
            QueryExplanation,
        ),
        QueryEvaluationError,
    > {
        let mut prepared = options
            .for_query(query.try_into().map_err(Into::into)?)
            .on_store(self);
        if with_stats {
            prepared = prepared.compute_statistics();
        }
        Ok(prepared.explain())
    }

    /// Like [`Store::explain_query_opt`] but with variable substitutions
    /// applied before execution.
    ///
    /// Deprecated: use the [`SparqlEvaluator`] interface directly.
    #[deprecated(note = "Use `SparqlEvaluator` interface instead", since = "0.5.0")]
    #[expect(deprecated)]
    pub fn explain_query_opt_with_substituted_variables(
        &self,
        query: impl TryInto<Query, Error = impl Into<QueryEvaluationError>>,
        options: SparqlEvaluator,
        with_stats: bool,
        substitutions: impl IntoIterator<Item = (Variable, Term)>,
    ) -> Result<
        (
            Result<QueryResults<'static>, QueryEvaluationError>,
            QueryExplanation,
        ),
        QueryEvaluationError,
    > {
        let mut prepared = options
            .for_query(query.try_into().map_err(Into::into)?)
            .on_store(self);
        if with_stats {
            prepared = prepared.compute_statistics();
        }
        for (variable, term) in substitutions {
            prepared = prepared.substitute_variable(variable, term);
        }
        Ok(prepared.explain())
    }

    /// Returns all quads matching the given pattern; `None` components act
    /// as wildcards. The iterator works on a snapshot taken at call time.
    pub fn quads_for_pattern(
        &self,
        subject: Option<NamedOrBlankNodeRef<'_>>,
        predicate: Option<NamedNodeRef<'_>>,
        object: Option<TermRef<'_>>,
        graph_name: Option<GraphNameRef<'_>>,
    ) -> QuadIter<'static> {
        let reader = self.storage.snapshot();
        QuadIter {
            iter: reader.quads_for_pattern(
                subject.map(EncodedTerm::from).as_ref(),
                predicate.map(EncodedTerm::from).as_ref(),
                object.map(EncodedTerm::from).as_ref(),
                graph_name.map(EncodedTerm::from).as_ref(),
            ),
            reader,
        }
    }

    /// Returns all quads in the store (all-wildcard pattern).
    pub fn iter(&self) -> QuadIter<'static> {
        self.quads_for_pattern(None, None, None, None)
    }

    /// Checks whether the store contains the given quad.
    pub fn contains<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
        let quad = EncodedQuad::from(quad.into());
        self.storage.snapshot().contains(&quad)
    }

    /// Returns the number of quads in the store.
    pub fn len(&self) -> Result<usize, StorageError> {
        self.storage.snapshot().len()
    }

    /// Checks whether the store contains no quad at all.
    pub fn is_empty(&self) -> Result<bool, StorageError> {
        self.storage.snapshot().is_empty()
    }

    /// Starts a read-write [`Transaction`]; nothing is persisted until
    /// [`Transaction::commit`] is called.
    pub fn start_transaction(&self) -> Result<Transaction<'_>, StorageError> {
        Ok(Transaction {
            inner: self.storage.start_readable_transaction()?,
        })
    }

    /// Executes a SPARQL update against the store with default options.
    #[expect(deprecated)]
    pub fn update(
        &self,
        update: impl TryInto<Update, Error = impl Into<UpdateEvaluationError>>,
    ) -> Result<(), UpdateEvaluationError> {
        self.update_opt(update, SparqlEvaluator::new())
    }

    /// Executes a SPARQL update with an explicit evaluator configuration.
    #[expect(deprecated)]
    pub fn update_opt(
        &self,
        update: impl TryInto<Update, Error = impl Into<UpdateEvaluationError>>,
        options: SparqlEvaluator,
    ) -> Result<(), UpdateEvaluationError> {
        options
            .for_update(update.try_into().map_err(Into::into)?)
            .on_store(self)
            .execute()
    }

    /// Parses RDF from `reader` and inserts all quads in a single transaction.
    /// Blank nodes are renamed to avoid clashes with existing ones.
    pub fn load_from_reader(
        &self,
        parser: impl Into<RdfParser>,
        reader: impl Read,
    ) -> Result<(), LoaderError> {
        let mut transaction = self.storage.start_transaction()?;
        for quad in parser.into().rename_blank_nodes().for_reader(reader) {
            transaction.insert(quad?.as_ref());
        }
        transaction.commit()?;
        Ok(())
    }

    /// Parses RDF from an in-memory `slice` and inserts all quads in a single
    /// transaction. Blank nodes are renamed to avoid clashes.
    pub fn load_from_slice(
        &self,
        parser: impl Into<RdfParser>,
        slice: &(impl AsRef<[u8]> + ?Sized),
    ) -> Result<(), LoaderError> {
        let mut transaction = self.storage.start_transaction()?;
        for quad in parser.into().rename_blank_nodes().for_slice(slice.as_ref()) {
            // Slice parsing yields syntax errors only; wrap for LoaderError.
            transaction.insert(quad.map_err(RdfParseError::Syntax)?.as_ref());
        }
        transaction.commit()?;
        Ok(())
    }

    /// Inserts a single quad (its own transaction).
    pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<(), StorageError> {
        let mut transaction = self.storage.start_transaction()?;
        transaction.insert(quad.into());
        transaction.commit()?;
        Ok(())
    }

    /// Inserts several quads atomically in one transaction.
    pub fn extend(
        &self,
        quads: impl IntoIterator<Item = impl Into<Quad>>,
    ) -> Result<(), StorageError> {
        let mut transaction = self.storage.start_transaction()?;
        for quad in quads {
            transaction.insert(quad.into().as_ref());
        }
        transaction.commit()?;
        Ok(())
    }

    /// Removes a single quad (its own transaction).
    pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<(), StorageError> {
        let mut transaction = self.storage.start_transaction()?;
        transaction.remove(quad.into());
        transaction.commit()?;
        Ok(())
    }

    /// Serializes the whole store to `writer`.
    ///
    /// Fails with [`SerializerError::DatasetFormatExpected`] if the chosen
    /// format cannot represent named graphs.
    pub fn dump_to_writer<W: Write>(
        &self,
        serializer: impl Into<RdfSerializer>,
        writer: W,
    ) -> Result<W, SerializerError> {
        let serializer = serializer.into();
        if !serializer.format().supports_datasets() {
            return Err(SerializerError::DatasetFormatExpected(serializer.format()));
        }
        let mut serializer = serializer.for_writer(writer);
        for quad in self {
            serializer.serialize_quad(&quad?)?;
        }
        Ok(serializer.finish()?)
    }

    /// Serializes a single graph to `writer` as triples.
    pub fn dump_graph_to_writer<'a, W: Write>(
        &self,
        from_graph_name: impl Into<GraphNameRef<'a>>,
        serializer: impl Into<RdfSerializer>,
        writer: W,
    ) -> Result<W, SerializerError> {
        let mut serializer = serializer.into().for_writer(writer);
        for quad in self.quads_for_pattern(None, None, None, Some(from_graph_name.into())) {
            serializer.serialize_triple(quad?.as_ref())?;
        }
        Ok(serializer.finish()?)
    }

    /// Returns all named graphs in the store (the default graph is excluded
    /// by construction: it has no name).
    pub fn named_graphs(&self) -> GraphNameIter<'static> {
        let reader = self.storage.snapshot();
        GraphNameIter {
            iter: reader.named_graphs(),
            reader,
        }
    }

    /// Checks whether the store contains the given named graph.
    pub fn contains_named_graph<'a>(
        &self,
        graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
    ) -> Result<bool, StorageError> {
        let graph_name = EncodedTerm::from(graph_name.into());
        self.storage.snapshot().contains_named_graph(&graph_name)
    }

    /// Inserts an (empty) named graph.
    pub fn insert_named_graph<'a>(
        &self,
        graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
    ) -> Result<(), StorageError> {
        let mut transaction = self.storage.start_transaction()?;
        transaction.insert_named_graph(graph_name.into());
        transaction.commit()?;
        Ok(())
    }

    /// Removes all quads of a graph, keeping the graph itself.
    pub fn clear_graph<'a>(
        &self,
        graph_name: impl Into<GraphNameRef<'a>>,
    ) -> Result<(), StorageError> {
        let graph_name = graph_name.into();
        if graph_name.is_default_graph() {
            // The default graph can be cleared through a plain write
            // transaction; named graphs go through a readable transaction.
            let mut transaction = self.storage.start_transaction()?;
            transaction.clear_default_graph();
            transaction.commit()
        } else {
            let mut transaction = self.storage.start_readable_transaction()?;
            transaction.clear_graph(graph_name)?;
            transaction.commit()
        }
    }

    /// Removes a named graph and all its quads.
    pub fn remove_named_graph<'a>(
        &self,
        graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
    ) -> Result<(), StorageError> {
        let mut transaction = self.storage.start_readable_transaction()?;
        transaction.remove_named_graph(graph_name.into())?;
        transaction.commit()?;
        Ok(())
    }

    /// Removes everything from the store.
    pub fn clear(&self) -> Result<(), StorageError> {
        let mut transaction = self.storage.start_transaction()?;
        transaction.clear();
        transaction.commit()
    }

    /// Flushes in-memory buffers of the RocksDB backend to disk.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    pub fn flush(&self) -> Result<(), StorageError> {
        self.storage.flush()
    }

    /// Triggers a compaction of the RocksDB backend.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    pub fn optimize(&self) -> Result<(), StorageError> {
        self.storage.compact()
    }

    /// Writes a backup of the store into `target_directory`.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    pub fn backup(&self, target_directory: impl AsRef<Path>) -> Result<(), StorageError> {
        self.storage.backup(target_directory.as_ref())
    }

    /// Creates a [`BulkLoader`] for fast, possibly parallel, bulk insertion.
    pub fn bulk_loader(&self) -> BulkLoader<'_> {
        BulkLoader {
            storage: self.storage.bulk_loader(),
            num_threads: None,
            max_memory_size: None,
            on_parse_error: None,
        }
    }

    /// Validates internal storage invariants (debugging helper).
    #[doc(hidden)]
    pub fn validate(&self) -> Result<(), StorageError> {
        self.storage.snapshot().validate()
    }

    /// Crate-internal access to the underlying storage.
    pub(super) fn storage(&self) -> &Storage {
        &self.storage
    }
}
impl fmt::Display for Store {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for t in self {
writeln!(f, "{} .", t.map_err(|_| fmt::Error)?)?;
}
Ok(())
}
}
impl IntoIterator for &Store {
type IntoIter = QuadIter<'static>;
type Item = Result<Quad, StorageError>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// A read-write transaction on a [`Store`].
///
/// Changes are only persisted when [`Transaction::commit`] is called;
/// dropping the transaction discards them.
#[must_use]
pub struct Transaction<'a> {
    // Underlying storage transaction that also supports reads.
    inner: StorageReadableTransaction<'a>,
}
impl<'a> Transaction<'a> {
    /// Evaluates a SPARQL query inside this transaction with default options.
    ///
    /// Deprecated: use the [`SparqlEvaluator`] interface directly.
    #[deprecated(note = "Use `SparqlEvaluator` interface instead", since = "0.5.0")]
    #[expect(deprecated)]
    pub fn query(
        &self,
        query: impl TryInto<Query, Error = impl Into<QueryEvaluationError>>,
    ) -> Result<QueryResults<'_>, QueryEvaluationError> {
        self.query_opt(query, SparqlEvaluator::new())
    }

    /// Evaluates a SPARQL query inside this transaction with an explicit
    /// evaluator configuration.
    ///
    /// Deprecated: use the [`SparqlEvaluator`] interface directly.
    #[deprecated(note = "Use `SparqlEvaluator` interface instead", since = "0.5.0")]
    #[expect(deprecated)]
    pub fn query_opt(
        &self,
        query: impl TryInto<Query, Error = impl Into<QueryEvaluationError>>,
        options: SparqlEvaluator,
    ) -> Result<QueryResults<'_>, QueryEvaluationError> {
        options
            .for_query(query.try_into().map_err(Into::into)?)
            .on_transaction(self)
            .execute()
    }

    /// Returns all quads matching the given pattern as seen by this
    /// transaction (including its own uncommitted writes); `None` components
    /// act as wildcards.
    pub fn quads_for_pattern(
        &self,
        subject: Option<NamedOrBlankNodeRef<'_>>,
        predicate: Option<NamedNodeRef<'_>>,
        object: Option<TermRef<'_>>,
        graph_name: Option<GraphNameRef<'_>>,
    ) -> QuadIter<'_> {
        let reader = self.inner.reader();
        QuadIter {
            iter: reader.quads_for_pattern(
                subject.map(EncodedTerm::from).as_ref(),
                predicate.map(EncodedTerm::from).as_ref(),
                object.map(EncodedTerm::from).as_ref(),
                graph_name.map(EncodedTerm::from).as_ref(),
            ),
            reader,
        }
    }

    /// Returns all quads visible to this transaction.
    pub fn iter(&self) -> QuadIter<'_> {
        self.quads_for_pattern(None, None, None, None)
    }

    /// Checks whether this transaction sees the given quad.
    pub fn contains<'b>(&self, quad: impl Into<QuadRef<'b>>) -> Result<bool, StorageError> {
        let quad = EncodedQuad::from(quad.into());
        self.inner.reader().contains(&quad)
    }

    /// Returns the number of quads visible to this transaction.
    pub fn len(&self) -> Result<usize, StorageError> {
        self.inner.reader().len()
    }

    /// Checks whether this transaction sees no quad at all.
    pub fn is_empty(&self) -> Result<bool, StorageError> {
        self.inner.reader().is_empty()
    }

    /// Executes a SPARQL update inside this transaction with default options.
    #[expect(deprecated)]
    pub fn update(
        &mut self,
        update: impl TryInto<Update, Error = impl Into<UpdateEvaluationError>>,
    ) -> Result<(), UpdateEvaluationError> {
        self.update_opt(update, SparqlEvaluator::new())
    }

    /// Executes a SPARQL update inside this transaction with an explicit
    /// evaluator configuration.
    #[expect(deprecated)]
    pub fn update_opt(
        &mut self,
        update: impl TryInto<Update, Error = impl Into<UpdateEvaluationError>>,
        options: SparqlEvaluator,
    ) -> Result<(), UpdateEvaluationError> {
        options
            .for_update(update.try_into().map_err(Into::into)?)
            .on_transaction(self)
            .execute()
    }

    /// Parses RDF from `reader` and inserts all quads into this transaction.
    /// Blank nodes are renamed to avoid clashes.
    pub fn load_from_reader(
        &mut self,
        parser: impl Into<RdfParser>,
        reader: impl Read,
    ) -> Result<(), LoaderError> {
        for quad in parser.into().rename_blank_nodes().for_reader(reader) {
            self.insert(quad?.as_ref());
        }
        Ok(())
    }

    /// Parses RDF from an in-memory `slice` and inserts all quads into this
    /// transaction. Blank nodes are renamed to avoid clashes.
    pub fn load_from_slice(
        &mut self,
        parser: impl Into<RdfParser>,
        slice: &(impl AsRef<[u8]> + ?Sized),
    ) -> Result<(), LoaderError> {
        for quad in parser.into().rename_blank_nodes().for_slice(slice) {
            // Slice parsing yields syntax errors only; wrap for LoaderError.
            self.insert(quad.map_err(RdfParseError::Syntax)?.as_ref());
        }
        Ok(())
    }

    /// Inserts a quad into this transaction.
    pub fn insert<'b>(&mut self, quad: impl Into<QuadRef<'b>>) {
        self.inner.insert(quad.into())
    }

    /// Inserts several quads into this transaction.
    pub fn extend<'b>(&mut self, quads: impl IntoIterator<Item = impl Into<QuadRef<'b>>>) {
        for quad in quads {
            self.inner.insert(quad.into());
        }
    }

    /// Removes a quad from this transaction.
    pub fn remove<'b>(&mut self, quad: impl Into<QuadRef<'b>>) {
        self.inner.remove(quad.into())
    }

    /// Returns all named graphs visible to this transaction.
    pub fn named_graphs(&self) -> GraphNameIter<'_> {
        let reader = self.inner.reader();
        GraphNameIter {
            iter: reader.named_graphs(),
            reader,
        }
    }

    /// Checks whether this transaction sees the given named graph.
    pub fn contains_named_graph<'b>(
        &self,
        graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
    ) -> Result<bool, StorageError> {
        self.inner
            .reader()
            .contains_named_graph(&EncodedTerm::from(graph_name.into()))
    }

    /// Inserts an (empty) named graph into this transaction.
    pub fn insert_named_graph<'b>(&mut self, graph_name: impl Into<NamedOrBlankNodeRef<'b>>) {
        self.inner.insert_named_graph(graph_name.into())
    }

    /// Removes all quads of a graph, keeping the graph itself.
    pub fn clear_graph<'b>(
        &mut self,
        graph_name: impl Into<GraphNameRef<'b>>,
    ) -> Result<(), StorageError> {
        self.inner.clear_graph(graph_name.into())
    }

    /// Removes a named graph and all its quads.
    pub fn remove_named_graph<'b>(
        &mut self,
        graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
    ) -> Result<(), StorageError> {
        self.inner.remove_named_graph(graph_name.into())
    }

    /// Removes everything visible to this transaction.
    pub fn clear(&mut self) -> Result<(), StorageError> {
        self.inner.clear()
    }

    /// Commits all pending changes, consuming the transaction.
    pub fn commit(self) -> Result<(), StorageError> {
        self.inner.commit()
    }

    /// Crate-internal access to the underlying storage transaction.
    pub(super) fn inner(&self) -> &StorageReadableTransaction<'a> {
        &self.inner
    }

    /// Crate-internal mutable access to the underlying storage transaction.
    pub(super) fn inner_mut(&mut self) -> &mut StorageReadableTransaction<'a> {
        &mut self.inner
    }
}
impl<'a> IntoIterator for &'a Transaction<'_> {
type Item = Result<Quad, StorageError>;
type IntoIter = QuadIter<'a>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// Iterator over the [`Quad`]s of a [`Store`] or [`Transaction`].
#[must_use]
pub struct QuadIter<'a> {
    // Stream of encoded quads from storage.
    iter: DecodingQuadIterator<'a>,
    // Reader used to decode each encoded quad back into model terms;
    // kept here so it lives as long as the iterator.
    reader: StorageReader<'a>,
}
impl Iterator for QuadIter<'_> {
    type Item = Result<Quad, StorageError>;

    /// Yields the next quad, decoding it from its storage encoding.
    /// Errors from the underlying iterator or from decoding are yielded
    /// as `Err` items instead of terminating the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        Some(match self.iter.next()? {
            Ok(quad) => self.reader.decode_quad(&quad),
            Err(error) => Err(error),
        })
    }

    /// Forwards the inner iterator's hint: each encoded quad maps to exactly
    /// one decoded item. Added for consistency with `GraphNameIter`, which
    /// already delegates `size_hint`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
/// Iterator over the named graphs of a [`Store`] or [`Transaction`].
#[must_use]
pub struct GraphNameIter<'a> {
    // Stream of encoded graph names from storage.
    iter: DecodingGraphIterator<'a>,
    // Reader used to decode each encoded graph name; kept here so it
    // lives as long as the iterator.
    reader: StorageReader<'a>,
}
impl Iterator for GraphNameIter<'_> {
    type Item = Result<NamedOrBlankNode, StorageError>;

    /// Yields the next graph name, decoding it from its storage encoding;
    /// storage and decoding errors are yielded as `Err` items.
    fn next(&mut self) -> Option<Self::Item> {
        let encoded = self.iter.next()?;
        Some(encoded.and_then(|graph_name| self.reader.decode_named_or_blank_node(&graph_name)))
    }

    /// Each encoded name maps to exactly one decoded item, so the inner
    /// iterator's hint is forwarded unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
/// A fast bulk loader for a [`Store`], created with [`Store::bulk_loader`].
///
/// Buffers quads into batches and hands them to the storage layer, optionally
/// in parallel; see the `load_*` and `parallel_load_*` methods.
#[must_use]
pub struct BulkLoader<'a> {
    // Storage-level loader doing the actual batch writes.
    storage: StorageBulkLoader<'a>,
    // Requested thread count; None = derive from available parallelism.
    num_threads: Option<usize>,
    // Memory budget in megabytes used to size batches; None = default batch size.
    max_memory_size: Option<usize>,
    // Optional callback deciding whether a parse error is ignored (Ok) or
    // aborts the load (Err). Arc so it can be shared with parser threads.
    on_parse_error: Option<Arc<dyn Fn(RdfParseError) -> Result<(), RdfParseError> + Send + Sync>>,
}
impl BulkLoader<'_> {
    /// Sets how many threads the loader may use.
    pub fn with_num_threads(mut self, num_threads: usize) -> Self {
        self.num_threads = Some(num_threads);
        self
    }

    /// Sets a memory budget (in megabytes) used to size the in-memory batches.
    pub fn with_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self {
        self.max_memory_size = Some(max_memory_size);
        self
    }

    /// Effective thread count: the explicit setting, else the machine's
    /// available parallelism, clamped to at least 1.
    #[cfg(not(target_family = "wasm"))]
    fn target_num_threads(&self) -> usize {
        max(
            1,
            self.num_threads
                .unwrap_or_else(|| available_parallelism().map_or(1, NonZero::get)),
        )
    }

    /// WASM has no threads: always 1.
    #[cfg(target_family = "wasm")]
    #[expect(clippy::unused_self)]
    fn target_num_threads(&self) -> usize {
        1
    }

    /// Number of quads per batch: derived from the memory budget split across
    /// threads (the `* 1000` factor converts megabytes to a quad count —
    /// presumably ~1 KB per quad; TODO confirm), else the compiled-in default.
    fn target_batch_size(&self) -> usize {
        if let Some(max_memory_size) = self.max_memory_size {
            max_memory_size * 1000 / self.target_num_threads()
        } else {
            DEFAULT_BULK_LOAD_BATCH_SIZE
        }
    }

    /// Disables atomicity on the underlying storage loader (faster, but a
    /// failed load may leave partial data).
    pub fn without_atomicity(mut self) -> Self {
        self.storage = self.storage.without_atomicity();
        self
    }

    /// Registers a progress callback forwarded to the storage loader.
    pub fn on_progress(mut self, callback: impl Fn(u64) + Send + Sync + 'static) -> Self {
        self.storage = self.storage.on_progress(callback);
        self
    }

    /// Registers a parse-error callback: returning `Ok(())` skips the
    /// offending quad, returning `Err` aborts the load with that error.
    pub fn on_parse_error(
        mut self,
        callback: impl Fn(RdfParseError) -> Result<(), RdfParseError> + Send + Sync + 'static,
    ) -> Self {
        self.on_parse_error = Some(Arc::new(callback));
        self
    }

    /// Parses RDF from `reader` and bulk-loads all quads, routing parse
    /// errors through the `on_parse_error` callback if one is set.
    pub fn load_from_reader(
        &mut self,
        parser: impl Into<RdfParser>,
        reader: impl Read,
    ) -> Result<(), LoaderError> {
        let on_parse_error = self.on_parse_error.as_ref().map(Arc::clone);
        self.load_ok_quads(
            parser
                .into()
                .rename_blank_nodes()
                .for_reader(reader)
                .filter_map(|r| match r {
                    Ok(q) => Some(Ok(q)),
                    Err(e) => {
                        // Callback decides: Ok = drop the error, Err = propagate.
                        if let Some(callback) = &on_parse_error {
                            if let Err(e) = callback(e) {
                                Some(Err(e))
                            } else {
                                None
                            }
                        } else {
                            Some(Err(e))
                        }
                    }
                }),
        )
    }

    /// Parses RDF from an in-memory `slice` and bulk-loads all quads,
    /// routing parse errors through the `on_parse_error` callback if set.
    pub fn load_from_slice(
        &mut self,
        parser: impl Into<RdfParser>,
        slice: &(impl AsRef<[u8]> + ?Sized),
    ) -> Result<(), LoaderError> {
        let on_parse_error = self.on_parse_error.as_ref().map(Arc::clone);
        self.load_ok_quads(
            parser
                .into()
                .rename_blank_nodes()
                .for_slice(slice)
                .filter_map(|r| match r {
                    Ok(q) => Some(Ok(q)),
                    Err(e) => {
                        // Slice parsing yields syntax errors; `.into()` lifts
                        // them to RdfParseError for the callback / caller.
                        if let Some(callback) = &on_parse_error {
                            if let Err(e) = callback(e.into()) {
                                Some(Err(e))
                            } else {
                                None
                            }
                        } else {
                            Some(Err(e.into()))
                        }
                    }
                }),
        )
    }

    /// Parses a file in parallel and bulk-loads its quads: the file is split
    /// into independent parsers, each running on its own thread and sending
    /// full batches over a bounded channel to the loading side.
    #[cfg(not(target_family = "wasm"))]
    pub fn parallel_load_from_file(
        &mut self,
        parser: impl Into<RdfParser>,
        path: impl AsRef<Path>,
    ) -> Result<(), LoaderError> {
        // Half the threads parse; the remainder is left to the storage loader
        // (presumably — TODO confirm against StorageBulkLoader::load_batch).
        let target_num_threads = self.target_num_threads() / 2;
        if target_num_threads < 2 {
            // Not enough parallelism to be worth it: fall back to sequential.
            return self.load_from_reader(parser, File::open(path).map_err(RdfParseError::from)?);
        }
        let target_batch_size = self.target_batch_size();
        let on_parse_error = self.on_parse_error.as_ref().map(Arc::clone);
        let parsers = parser
            .into()
            .rename_blank_nodes()
            .split_file_for_parallel_parsing(path, target_num_threads)
            .map_err(RdfParseError::Io)?;
        thread::scope(|scope| {
            // Bounded channel (capacity 1) applies back-pressure on parsers.
            let (sender, receiver) = mpsc::sync_channel(1);
            let threads = parsers
                .into_iter()
                .map(|parser| {
                    let sender = sender.clone();
                    let on_parse_error = on_parse_error.clone();
                    scope.spawn(move || {
                        let mut batch = Vec::with_capacity(target_batch_size);
                        for result in parser {
                            match result {
                                Ok(quad) => {
                                    batch.push(quad);
                                    if batch.len() >= target_batch_size {
                                        // Swap out the full batch and keep parsing
                                        // into a fresh buffer.
                                        let mut batch_to_save =
                                            Vec::with_capacity(target_batch_size);
                                        swap(&mut batch, &mut batch_to_save);
                                        if sender.send(batch_to_save).is_err() {
                                            // Receiver gone (load failed): stop quietly.
                                            return Ok(());
                                        }
                                    }
                                }
                                Err(e) => {
                                    if let Some(callback) = &on_parse_error {
                                        callback(e)?;
                                    } else {
                                        return Err(LoaderError::from(e));
                                    }
                                }
                            }
                        }
                        // Flush the final partial batch.
                        if !batch.is_empty() {
                            let _we_are_returning = sender.send(batch);
                        }
                        Ok(())
                    })
                })
                .collect::<Vec<_>>();
            // Drop our sender so recv() ends once all parser threads finish.
            drop(sender);
            while let Ok(batch) = receiver.recv() {
                self.storage.load_batch(batch, target_num_threads)?;
            }
            // Surface panics and parse errors from the parser threads.
            for thread in threads {
                map_thread_result(thread.join()).map_err(StorageError::from)??;
            }
            Ok(())
        })
    }

    /// Parses an in-memory slice in parallel and bulk-loads its quads;
    /// same scheme as [`Self::parallel_load_from_file`].
    #[cfg(not(target_family = "wasm"))]
    pub fn parallel_load_from_slice(
        &mut self,
        parser: impl Into<RdfParser>,
        slice: &(impl AsRef<[u8]> + ?Sized),
    ) -> Result<(), LoaderError> {
        // Half the threads parse; the remainder is left to the storage loader.
        let target_num_threads = self.target_num_threads() / 2;
        if target_num_threads < 2 {
            // Not enough parallelism to be worth it: fall back to sequential.
            return self.load_from_slice(parser, slice);
        }
        let target_batch_size = self.target_batch_size();
        let on_parse_error = self.on_parse_error.as_ref().map(Arc::clone);
        let parsers = parser
            .into()
            .rename_blank_nodes()
            .split_slice_for_parallel_parsing(slice, target_num_threads);
        thread::scope(|scope| {
            // Bounded channel (capacity 1) applies back-pressure on parsers.
            let (sender, receiver) = mpsc::sync_channel(1);
            let threads = parsers
                .into_iter()
                .map(|parser| {
                    let sender = sender.clone();
                    let on_parse_error = on_parse_error.clone();
                    scope.spawn(move || {
                        let mut batch = Vec::with_capacity(target_batch_size);
                        for result in parser {
                            match result {
                                Ok(quad) => {
                                    batch.push(quad);
                                    if batch.len() >= target_batch_size {
                                        // Swap out the full batch and keep parsing
                                        // into a fresh buffer.
                                        let mut batch_to_save =
                                            Vec::with_capacity(target_batch_size);
                                        swap(&mut batch, &mut batch_to_save);
                                        if sender.send(batch_to_save).is_err() {
                                            // Receiver gone (load failed): stop quietly.
                                            return Ok(());
                                        }
                                    }
                                }
                                Err(e) => {
                                    if let Some(callback) = &on_parse_error {
                                        callback(e.into())?;
                                    } else {
                                        return Err(LoaderError::from(RdfParseError::from(e)));
                                    }
                                }
                            }
                        }
                        // Flush the final partial batch.
                        if !batch.is_empty() {
                            let _we_are_returning = sender.send(batch);
                        }
                        Ok(())
                    })
                })
                .collect::<Vec<_>>();
            // Drop our sender so recv() ends once all parser threads finish.
            drop(sender);
            while let Ok(batch) = receiver.recv() {
                self.storage.load_batch(batch, target_num_threads)?;
            }
            // Surface panics and parse errors from the parser threads.
            for thread in threads {
                map_thread_result(thread.join()).map_err(StorageError::from)??;
            }
            Ok(())
        })
    }

    /// Bulk-loads an iterator of quads.
    pub fn load_quads(
        &mut self,
        quads: impl IntoIterator<Item = impl Into<Quad>>,
    ) -> Result<(), StorageError> {
        self.load_ok_quads(quads.into_iter().map(Ok::<_, StorageError>))
    }

    /// Bulk-loads a fallible iterator of quads, batching them and stopping at
    /// the first `Err` item (error types are unified into `EO`).
    pub fn load_ok_quads<EI, EO: From<StorageError> + From<EI>>(
        &mut self,
        quads: impl IntoIterator<Item = Result<impl Into<Quad>, EI>>,
    ) -> Result<(), EO> {
        let target_num_threads = self.target_num_threads();
        let target_batch_size = self.target_batch_size();
        let mut batch = Vec::with_capacity(target_batch_size);
        for quad in quads {
            batch.push(quad?.into());
            if batch.len() >= target_batch_size {
                // Swap out the full batch so we keep filling a fresh buffer.
                let mut batch_to_save = Vec::with_capacity(target_batch_size);
                swap(&mut batch, &mut batch_to_save);
                self.storage.load_batch(batch_to_save, target_num_threads)?;
            }
        }
        // Flush the final partial batch.
        if !batch.is_empty() {
            self.storage.load_batch(batch, target_num_threads)?;
        }
        Ok(())
    }

    /// Commits everything loaded so far, consuming the loader.
    pub fn commit(self) -> Result<(), StorageError> {
        self.storage.commit()
    }
}
// Unit tests for the Store API: trait bounds and pattern-matching behavior.
#[cfg(test)]
#[expect(clippy::panic_in_result_fn)]
mod tests {
    use super::*;

    /// `Store` must be shareable across threads.
    #[test]
    fn test_send_sync() {
        fn is_send_sync<T: Send + Sync>() {}
        is_send_sync::<Store>();
    }

    /// End-to-end check of insert/remove/contains/len and every
    /// `quads_for_pattern` wildcard combination.
    #[test]
    fn store() -> Result<(), StorageError> {
        use crate::model::*;
        let main_s = NamedOrBlankNode::from(BlankNode::default());
        let main_p = NamedNode::new_unchecked("http://example.com");
        let main_o = Term::from(Literal::from(1));
        let main_g = GraphName::from(BlankNode::default());
        let default_quad = Quad::new(
            main_s.clone(),
            main_p.clone(),
            main_o.clone(),
            GraphName::DefaultGraph,
        );
        let named_quad = Quad::new(
            main_s.clone(),
            main_p.clone(),
            main_o.clone(),
            main_g.clone(),
        );
        // Quads living in the default graph, in one expected iteration order.
        let mut default_quads = vec![
            Quad::new(
                main_s.clone(),
                main_p.clone(),
                Literal::from(0),
                GraphName::DefaultGraph,
            ),
            default_quad.clone(),
            Quad::new(
                main_s.clone(),
                main_p.clone(),
                Literal::from(200_000_000),
                GraphName::DefaultGraph,
            ),
        ];
        // Every quad in the store, in the order full iteration yields them.
        let all_quads = vec![
            named_quad.clone(),
            Quad::new(
                main_s.clone(),
                main_p.clone(),
                Literal::from(200_000_000),
                GraphName::DefaultGraph,
            ),
            default_quad.clone(),
            Quad::new(
                main_s.clone(),
                main_p.clone(),
                Literal::from(0),
                GraphName::DefaultGraph,
            ),
        ];
        let store = Store::new()?;
        for t in &default_quads {
            store.insert(t)?;
            assert!(store.contains(t)?);
        }
        // Re-insert then remove: the store must behave as a set.
        store.insert(&default_quad)?;
        store.remove(&default_quad)?;
        assert!(!store.contains(&default_quad)?);
        // Removing an absent quad must be a no-op.
        store.remove(&default_quad)?;
        store.insert(&named_quad)?;
        assert!(store.contains(&named_quad)?);
        store.insert(&named_quad)?;
        store.insert(&default_quad)?;
        store.insert(&default_quad)?;
        store.validate()?;
        assert_eq!(store.len()?, 4);
        assert_eq!(store.iter().collect::<Result<Vec<_>, _>>()?, all_quads);
        // Exhaustive wildcard combinations of quads_for_pattern below.
        assert_eq!(
            store
                .quads_for_pattern(Some(main_s.as_ref()), None, None, None)
                .collect::<Result<Vec<_>, _>>()?,
            all_quads
        );
        assert_eq!(
            store
                .quads_for_pattern(Some(main_s.as_ref()), Some(main_p.as_ref()), None, None)
                .collect::<Result<Vec<_>, _>>()?,
            all_quads
        );
        assert_eq!(
            store
                .quads_for_pattern(
                    Some(main_s.as_ref()),
                    Some(main_p.as_ref()),
                    Some(main_o.as_ref()),
                    None
                )
                .collect::<Result<Vec<_>, _>>()?,
            vec![named_quad.clone(), default_quad.clone()]
        );
        assert_eq!(
            store
                .quads_for_pattern(
                    Some(main_s.as_ref()),
                    Some(main_p.as_ref()),
                    Some(main_o.as_ref()),
                    Some(GraphNameRef::DefaultGraph)
                )
                .collect::<Result<Vec<_>, _>>()?,
            vec![default_quad.clone()]
        );
        assert_eq!(
            store
                .quads_for_pattern(
                    Some(main_s.as_ref()),
                    Some(main_p.as_ref()),
                    Some(main_o.as_ref()),
                    Some(main_g.as_ref())
                )
                .collect::<Result<Vec<_>, _>>()?,
            vec![named_quad.clone()]
        );
        // Graph-restricted scans yield the opposite order of the full scan.
        default_quads.reverse();
        assert_eq!(
            store
                .quads_for_pattern(
                    Some(main_s.as_ref()),
                    Some(main_p.as_ref()),
                    None,
                    Some(GraphNameRef::DefaultGraph)
                )
                .collect::<Result<Vec<_>, _>>()?,
            default_quads
        );
        assert_eq!(
            store
                .quads_for_pattern(Some(main_s.as_ref()), None, Some(main_o.as_ref()), None)
                .collect::<Result<Vec<_>, _>>()?,
            vec![named_quad.clone(), default_quad.clone()]
        );
        assert_eq!(
            store
                .quads_for_pattern(
                    Some(main_s.as_ref()),
                    None,
                    Some(main_o.as_ref()),
                    Some(GraphNameRef::DefaultGraph)
                )
                .collect::<Result<Vec<_>, _>>()?,
            vec![default_quad.clone()]
        );
        assert_eq!(
            store
                .quads_for_pattern(
                    Some(main_s.as_ref()),
                    None,
                    Some(main_o.as_ref()),
                    Some(main_g.as_ref())
                )
                .collect::<Result<Vec<_>, _>>()?,
            vec![named_quad.clone()]
        );
        assert_eq!(
            store
                .quads_for_pattern(
                    Some(main_s.as_ref()),
                    None,
                    None,
                    Some(GraphNameRef::DefaultGraph)
                )
                .collect::<Result<Vec<_>, _>>()?,
            default_quads
        );
        assert_eq!(
            store
                .quads_for_pattern(None, Some(main_p.as_ref()), None, None)
                .collect::<Result<Vec<_>, _>>()?,
            all_quads
        );
        assert_eq!(
            store
                .quads_for_pattern(None, Some(main_p.as_ref()), Some(main_o.as_ref()), None)
                .collect::<Result<Vec<_>, _>>()?,
            vec![named_quad.clone(), default_quad.clone()]
        );
        assert_eq!(
            store
                .quads_for_pattern(None, None, Some(main_o.as_ref()), None)
                .collect::<Result<Vec<_>, _>>()?,
            vec![named_quad.clone(), default_quad.clone()]
        );
        assert_eq!(
            store
                .quads_for_pattern(None, None, None, Some(GraphNameRef::DefaultGraph))
                .collect::<Result<Vec<_>, _>>()?,
            default_quads
        );
        assert_eq!(
            store
                .quads_for_pattern(
                    None,
                    Some(main_p.as_ref()),
                    Some(main_o.as_ref()),
                    Some(GraphNameRef::DefaultGraph)
                )
                .collect::<Result<Vec<_>, _>>()?,
            vec![default_quad]
        );
        assert_eq!(
            store
                .quads_for_pattern(
                    None,
                    Some(main_p.as_ref()),
                    Some(main_o.as_ref()),
                    Some(main_g.as_ref())
                )
                .collect::<Result<Vec<_>, _>>()?,
            vec![named_quad]
        );
        Ok(())
    }
}