use crate::io::read::ParseError;
use crate::io::{
DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer,
};
use crate::model::*;
use crate::sparql::{
evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update,
UpdateOptions,
};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::StorageBulkLoader;
use crate::storage::{
ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter,
};
pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError};
use std::error::Error;
use std::io::{BufRead, Write};
#[cfg(not(target_arch = "wasm32"))]
use std::path::Path;
use std::{fmt, str};
/// Handle on a quad store; every operation is delegated to the wrapped [`Storage`].
///
/// `Clone` is derived, so cloning a `Store` clones the inner [`Storage`] value.
/// NOTE(review): clones presumably share the same underlying data — confirm
/// against `Storage`'s own `Clone` implementation.
#[derive(Clone)]
pub struct Store {
// Storage backend used for snapshots (reads) and transactions (writes).
storage: Storage,
}
impl Store {
    /// Creates a store on top of [`Storage::new`].
    ///
    /// # Errors
    ///
    /// Propagates the [`StorageError`] returned by the storage layer.
    pub fn new() -> Result<Self, StorageError> {
        let storage = Storage::new()?;
        Ok(Self { storage })
    }

    /// Creates a store on top of [`Storage::open`] for the given `path`.
    ///
    /// # Errors
    ///
    /// Propagates the [`StorageError`] returned by the storage layer.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StorageError> {
        let storage = Storage::open(path.as_ref())?;
        Ok(Self { storage })
    }

    /// Evaluates `query` with [`QueryOptions::default`].
    ///
    /// Shorthand for [`Store::query_opt`].
    pub fn query(
        &self,
        query: impl TryInto<Query, Error = impl Into<EvaluationError>>,
    ) -> Result<QueryResults, EvaluationError> {
        let options = QueryOptions::default();
        self.query_opt(query, options)
    }

    /// Evaluates `query` with the given `options` against a fresh storage snapshot.
    pub fn query_opt(
        &self,
        query: impl TryInto<Query, Error = impl Into<EvaluationError>>,
        options: QueryOptions,
    ) -> Result<QueryResults, EvaluationError> {
        let reader = self.storage.snapshot();
        evaluate_query(reader, query, options)
    }

    /// Iterates over the quads matching a pattern, read from a fresh storage snapshot.
    ///
    /// Each component is either `Some(term)` to constrain that position or `None`
    /// to leave it unconstrained.
    pub fn quads_for_pattern(
        &self,
        subject: Option<SubjectRef<'_>>,
        predicate: Option<NamedNodeRef<'_>>,
        object: Option<TermRef<'_>>,
        graph_name: Option<GraphNameRef<'_>>,
    ) -> QuadIter {
        let reader = self.storage.snapshot();
        // The storage layer works on encoded terms, so encode each bound component.
        let subject = subject.map(EncodedTerm::from);
        let predicate = predicate.map(EncodedTerm::from);
        let object = object.map(EncodedTerm::from);
        let graph_name = graph_name.map(EncodedTerm::from);
        let iter = reader.quads_for_pattern(
            subject.as_ref(),
            predicate.as_ref(),
            object.as_ref(),
            graph_name.as_ref(),
        );
        // The reader travels with the iterator because it is needed to decode each quad.
        QuadIter { iter, reader }
    }

    /// Iterates over every quad: the fully unconstrained pattern.
    pub fn iter(&self) -> QuadIter {
        self.quads_for_pattern(None, None, None, None)
    }

    /// Tells whether `quad` is present in a fresh storage snapshot.
    pub fn contains<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
        let encoded = EncodedQuad::from(quad.into());
        let reader = self.storage.snapshot();
        reader.contains(&encoded)
    }

    /// Returns the length reported by a fresh storage snapshot (its number of quads).
    pub fn len(&self) -> Result<usize, StorageError> {
        let reader = self.storage.snapshot();
        reader.len()
    }

    /// Returns the emptiness flag reported by a fresh storage snapshot.
    pub fn is_empty(&self) -> Result<bool, StorageError> {
        let reader = self.storage.snapshot();
        reader.is_empty()
    }

    /// Runs `f` inside a storage-level transaction.
    ///
    /// `f` receives a [`Transaction`] wrapping the storage writer and its result is
    /// returned by this method.
    /// NOTE(review): `f` is bound by `Fn` rather than `FnOnce`, presumably so the
    /// storage layer may invoke it more than once — confirm against
    /// `Storage::transaction`.
    pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
        &'b self,
        f: impl Fn(Transaction<'a>) -> Result<T, E>,
    ) -> Result<T, E> {
        self.storage.transaction(|writer| {
            let transaction = Transaction { writer };
            f(transaction)
        })
    }

    /// Evaluates `update` with [`UpdateOptions::default`].
    ///
    /// Shorthand for [`Store::update_opt`].
    pub fn update(
        &self,
        update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
    ) -> Result<(), EvaluationError> {
        let options = UpdateOptions::default();
        self.update_opt(update, options)
    }

    /// Evaluates `update` with the given `options` inside a storage-level transaction.
    ///
    /// The update is converted (and so validated) before the transaction is opened.
    pub fn update_opt(
        &self,
        update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
        options: impl Into<UpdateOptions>,
    ) -> Result<(), EvaluationError> {
        let update: Update = update.try_into().map_err(Into::into)?;
        let options: UpdateOptions = options.into();
        self.storage
            .transaction(|mut writer| evaluate_update(&mut writer, &update, &options))
    }

    /// Parses a graph file from `reader` and inserts its triples into `to_graph_name`.
    ///
    /// The whole input is parsed into memory first; the transaction is only opened
    /// once parsing succeeded, so a parse error leaves nothing inserted by this call.
    ///
    /// # Errors
    ///
    /// Fails if `base_iri` is invalid, if parsing fails, or if the storage layer
    /// reports an error.
    pub fn load_graph<'a>(
        &self,
        reader: impl BufRead,
        format: GraphFormat,
        to_graph_name: impl Into<GraphNameRef<'a>>,
        base_iri: Option<&str>,
    ) -> Result<(), LoaderError> {
        let mut parser = GraphParser::from_format(format);
        if let Some(iri) = base_iri {
            parser = parser
                .with_base_iri(iri)
                .map_err(|error| ParseError::invalid_base_iri(iri, error))?;
        }
        let triples = parser
            .read_triples(reader)?
            .collect::<Result<Vec<_>, _>>()?;
        let to_graph_name: GraphNameRef<'a> = to_graph_name.into();
        self.storage.transaction(move |mut writer| {
            for triple in &triples {
                writer.insert(triple.as_ref().in_graph(to_graph_name))?;
            }
            Ok(())
        })
    }

    /// Parses a dataset file from `reader` and inserts its quads.
    ///
    /// The whole input is parsed into memory first; the transaction is only opened
    /// once parsing succeeded, so a parse error leaves nothing inserted by this call.
    ///
    /// # Errors
    ///
    /// Fails if `base_iri` is invalid, if parsing fails, or if the storage layer
    /// reports an error.
    pub fn load_dataset(
        &self,
        reader: impl BufRead,
        format: DatasetFormat,
        base_iri: Option<&str>,
    ) -> Result<(), LoaderError> {
        let mut parser = DatasetParser::from_format(format);
        if let Some(iri) = base_iri {
            parser = parser
                .with_base_iri(iri)
                .map_err(|error| ParseError::invalid_base_iri(iri, error))?;
        }
        let parsed = parser.read_quads(reader)?.collect::<Result<Vec<_>, _>>()?;
        self.storage.transaction(move |mut writer| {
            for quad in &parsed {
                writer.insert(quad.into())?;
            }
            Ok(())
        })
    }

    /// Inserts `quad` inside its own transaction.
    ///
    /// Returns the flag produced by [`Transaction::insert`]; the `store` test in this
    /// file expects `true` for a new quad and `false` for one already present.
    pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
        let quad: QuadRef<'a> = quad.into();
        self.transaction(|mut transaction| transaction.insert(quad))
    }

    /// Inserts all `quads` inside a single transaction.
    ///
    /// The input is collected into a `Vec<Quad>` before the transaction starts.
    pub fn extend(
        &self,
        quads: impl IntoIterator<Item = impl Into<Quad>>,
    ) -> Result<(), StorageError> {
        let quads: Vec<Quad> = quads.into_iter().map(Into::into).collect();
        self.transaction(move |mut transaction| transaction.extend(&quads))
    }

    /// Removes `quad` inside its own transaction.
    ///
    /// Returns the flag produced by [`Transaction::remove`]; the `store` test in this
    /// file expects `true` when the quad was present and `false` otherwise.
    pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
        let quad: QuadRef<'a> = quad.into();
        self.transaction(move |mut transaction| transaction.remove(quad))
    }

    /// Serializes the content of `from_graph_name` to `writer` in the given graph `format`.
    ///
    /// # Errors
    ///
    /// Fails if the serializer cannot write or if reading a quad from storage fails.
    pub fn dump_graph<'a>(
        &self,
        writer: impl Write,
        format: GraphFormat,
        from_graph_name: impl Into<GraphNameRef<'a>>,
    ) -> Result<(), SerializerError> {
        let mut serializer = GraphSerializer::from_format(format).triple_writer(writer)?;
        let graph_name: GraphNameRef<'a> = from_graph_name.into();
        for quad in self.quads_for_pattern(None, None, None, Some(graph_name)) {
            let quad = quad?;
            serializer.write(quad.as_ref())?;
        }
        serializer.finish()?;
        Ok(())
    }

    /// Serializes every quad of the store to `writer` in the given dataset `format`.
    ///
    /// # Errors
    ///
    /// Fails if the serializer cannot write or if reading a quad from storage fails.
    pub fn dump_dataset(
        &self,
        writer: impl Write,
        format: DatasetFormat,
    ) -> Result<(), SerializerError> {
        let mut serializer = DatasetSerializer::from_format(format).quad_writer(writer)?;
        for quad in self.iter() {
            let quad = quad?;
            serializer.write(&quad)?;
        }
        serializer.finish()?;
        Ok(())
    }

    /// Iterates over the named graphs listed by a fresh storage snapshot.
    pub fn named_graphs(&self) -> GraphNameIter {
        let reader = self.storage.snapshot();
        let iter = reader.named_graphs();
        // The reader travels with the iterator because it is needed to decode each name.
        GraphNameIter { iter, reader }
    }

    /// Tells whether `graph_name` is a named graph in a fresh storage snapshot.
    pub fn contains_named_graph<'a>(
        &self,
        graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
    ) -> Result<bool, StorageError> {
        let encoded = EncodedTerm::from(graph_name.into());
        let reader = self.storage.snapshot();
        reader.contains_named_graph(&encoded)
    }

    /// Inserts the named graph `graph_name` inside its own transaction.
    ///
    /// Returns the flag produced by [`Transaction::insert_named_graph`].
    pub fn insert_named_graph<'a>(
        &self,
        graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
    ) -> Result<bool, StorageError> {
        let graph_name: NamedOrBlankNodeRef<'a> = graph_name.into();
        self.transaction(|mut transaction| transaction.insert_named_graph(graph_name))
    }

    /// Clears the graph `graph_name` inside its own transaction.
    pub fn clear_graph<'a>(
        &self,
        graph_name: impl Into<GraphNameRef<'a>>,
    ) -> Result<(), StorageError> {
        let graph_name: GraphNameRef<'a> = graph_name.into();
        self.transaction(|mut transaction| transaction.clear_graph(graph_name))
    }

    /// Removes the named graph `graph_name` inside its own transaction.
    ///
    /// Returns the flag produced by [`Transaction::remove_named_graph`].
    pub fn remove_named_graph<'a>(
        &self,
        graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
    ) -> Result<bool, StorageError> {
        let graph_name: NamedOrBlankNodeRef<'a> = graph_name.into();
        self.transaction(|mut transaction| transaction.remove_named_graph(graph_name))
    }

    /// Clears the store inside its own transaction.
    pub fn clear(&self) -> Result<(), StorageError> {
        self.transaction(|mut transaction| transaction.clear())
    }

    /// Forwards to the storage layer's `flush`.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn flush(&self) -> Result<(), StorageError> {
        self.storage.flush()
    }

    /// Asks the storage layer to compact itself.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn optimize(&self) -> Result<(), StorageError> {
        self.storage.compact()
    }

    /// Forwards to the storage layer's `backup`, targeting `target_directory`.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn backup(&self, target_directory: impl AsRef<Path>) -> Result<(), StorageError> {
        self.storage.backup(target_directory.as_ref())
    }

    /// Builds a [`BulkLoader`] over a clone of this store's storage, with no
    /// parse-error callback installed.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn bulk_loader(&self) -> BulkLoader {
        let storage = StorageBulkLoader::new(self.storage.clone());
        BulkLoader {
            storage,
            on_parse_error: None,
        }
    }

    /// Runs the reader's `validate` on a fresh storage snapshot.
    #[doc(hidden)]
    #[cfg(not(target_arch = "wasm32"))]
    pub fn validate(&self) -> Result<(), StorageError> {
        let reader = self.storage.snapshot();
        reader.validate()
    }
}
/// Writes every quad of the store, one per line, each followed by `" ."`.
impl fmt::Display for Store {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.iter().try_for_each(|quad| {
            // `fmt::Error` carries no payload, so the storage error detail is dropped here.
            let quad = quad.map_err(|_| fmt::Error)?;
            writeln!(f, "{} .", quad)
        })
    }
}
/// Read/write handle passed to the closure given to [`Store::transaction`].
///
/// It wraps the [`StorageWriter`] supplied by the storage layer; reads go through
/// the writer's own reader.
pub struct Transaction<'a> {
// Storage writer that every operation of the transaction is delegated to.
writer: StorageWriter<'a>,
}
impl<'a> Transaction<'a> {
    /// Evaluates `query` with [`QueryOptions::default`].
    ///
    /// Shorthand for [`Transaction::query_opt`].
    pub fn query(
        &self,
        query: impl TryInto<Query, Error = impl Into<EvaluationError>>,
    ) -> Result<QueryResults, EvaluationError> {
        let options = QueryOptions::default();
        self.query_opt(query, options)
    }

    /// Evaluates `query` with the given `options` against the writer's reader.
    pub fn query_opt(
        &self,
        query: impl TryInto<Query, Error = impl Into<EvaluationError>>,
        options: QueryOptions,
    ) -> Result<QueryResults, EvaluationError> {
        let reader = self.writer.reader();
        evaluate_query(reader, query, options)
    }

    /// Iterates over the quads matching a pattern, read through the writer's reader.
    ///
    /// Each component is either `Some(term)` to constrain that position or `None`
    /// to leave it unconstrained.
    pub fn quads_for_pattern(
        &self,
        subject: Option<SubjectRef<'_>>,
        predicate: Option<NamedNodeRef<'_>>,
        object: Option<TermRef<'_>>,
        graph_name: Option<GraphNameRef<'_>>,
    ) -> QuadIter {
        let reader = self.writer.reader();
        // The storage layer works on encoded terms, so encode each bound component.
        let subject = subject.map(EncodedTerm::from);
        let predicate = predicate.map(EncodedTerm::from);
        let object = object.map(EncodedTerm::from);
        let graph_name = graph_name.map(EncodedTerm::from);
        let iter = reader.quads_for_pattern(
            subject.as_ref(),
            predicate.as_ref(),
            object.as_ref(),
            graph_name.as_ref(),
        );
        // The reader travels with the iterator because it is needed to decode each quad.
        QuadIter { iter, reader }
    }

    /// Iterates over every quad: the fully unconstrained pattern.
    pub fn iter(&self) -> QuadIter {
        self.quads_for_pattern(None, None, None, None)
    }

    /// Tells whether `quad` is present, as seen through the writer's reader.
    pub fn contains<'b>(&self, quad: impl Into<QuadRef<'b>>) -> Result<bool, StorageError> {
        let encoded = EncodedQuad::from(quad.into());
        let reader = self.writer.reader();
        reader.contains(&encoded)
    }

    /// Returns the length reported by the writer's reader (its number of quads).
    pub fn len(&self) -> Result<usize, StorageError> {
        let reader = self.writer.reader();
        reader.len()
    }

    /// Returns the emptiness flag reported by the writer's reader.
    pub fn is_empty(&self) -> Result<bool, StorageError> {
        let reader = self.writer.reader();
        reader.is_empty()
    }

    /// Evaluates `update` with [`UpdateOptions::default`].
    ///
    /// Shorthand for [`Transaction::update_opt`].
    pub fn update(
        &mut self,
        update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
    ) -> Result<(), EvaluationError> {
        let options = UpdateOptions::default();
        self.update_opt(update, options)
    }

    /// Evaluates `update` with the given `options` through this transaction's writer.
    pub fn update_opt(
        &mut self,
        update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
        options: impl Into<UpdateOptions>,
    ) -> Result<(), EvaluationError> {
        let update: Update = update.try_into().map_err(Into::into)?;
        let options: UpdateOptions = options.into();
        evaluate_update(&mut self.writer, &update, &options)
    }

    /// Parses a graph file from `reader` and inserts its triples into `to_graph_name`.
    ///
    /// Triples are handed to the writer as they are parsed, so when an error stops
    /// the loop the triples read before it have already been passed to the writer.
    ///
    /// # Errors
    ///
    /// Fails if `base_iri` is invalid, if parsing fails, or if the writer reports
    /// an error.
    pub fn load_graph<'b>(
        &mut self,
        reader: impl BufRead,
        format: GraphFormat,
        to_graph_name: impl Into<GraphNameRef<'b>>,
        base_iri: Option<&str>,
    ) -> Result<(), LoaderError> {
        let mut parser = GraphParser::from_format(format);
        if let Some(iri) = base_iri {
            parser = parser
                .with_base_iri(iri)
                .map_err(|error| ParseError::invalid_base_iri(iri, error))?;
        }
        let to_graph_name: GraphNameRef<'b> = to_graph_name.into();
        for triple in parser.read_triples(reader)? {
            let triple = triple?;
            self.writer
                .insert(triple.as_ref().in_graph(to_graph_name))?;
        }
        Ok(())
    }

    /// Parses a dataset file from `reader` and inserts its quads.
    ///
    /// Quads are handed to the writer as they are parsed, so when an error stops
    /// the loop the quads read before it have already been passed to the writer.
    ///
    /// # Errors
    ///
    /// Fails if `base_iri` is invalid, if parsing fails, or if the writer reports
    /// an error.
    pub fn load_dataset(
        &mut self,
        reader: impl BufRead,
        format: DatasetFormat,
        base_iri: Option<&str>,
    ) -> Result<(), LoaderError> {
        let mut parser = DatasetParser::from_format(format);
        if let Some(iri) = base_iri {
            parser = parser
                .with_base_iri(iri)
                .map_err(|error| ParseError::invalid_base_iri(iri, error))?;
        }
        for quad in parser.read_quads(reader)? {
            let quad = quad?;
            self.writer.insert(quad.as_ref())?;
        }
        Ok(())
    }

    /// Inserts `quad` and returns the flag reported by the writer.
    pub fn insert<'b>(&mut self, quad: impl Into<QuadRef<'b>>) -> Result<bool, StorageError> {
        let quad: QuadRef<'b> = quad.into();
        self.writer.insert(quad)
    }

    /// Inserts every quad of `quads`, stopping at the first storage error.
    pub fn extend<'b>(
        &mut self,
        quads: impl IntoIterator<Item = impl Into<QuadRef<'b>>>,
    ) -> Result<(), StorageError> {
        quads
            .into_iter()
            // The per-quad flag returned by `insert` is not reported by `extend`.
            .try_for_each(|quad| self.writer.insert(quad.into()).map(|_inserted| ()))
    }

    /// Removes `quad` and returns the flag reported by the writer.
    pub fn remove<'b>(&mut self, quad: impl Into<QuadRef<'b>>) -> Result<bool, StorageError> {
        let quad: QuadRef<'b> = quad.into();
        self.writer.remove(quad)
    }

    /// Iterates over the named graphs listed by the writer's reader.
    pub fn named_graphs(&self) -> GraphNameIter {
        let reader = self.writer.reader();
        let iter = reader.named_graphs();
        // The reader travels with the iterator because it is needed to decode each name.
        GraphNameIter { iter, reader }
    }

    /// Tells whether `graph_name` is a named graph, as seen through the writer's reader.
    pub fn contains_named_graph<'b>(
        &self,
        graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
    ) -> Result<bool, StorageError> {
        let reader = self.writer.reader();
        let encoded = EncodedTerm::from(graph_name.into());
        reader.contains_named_graph(&encoded)
    }

    /// Inserts the named graph `graph_name` and returns the flag reported by the writer.
    pub fn insert_named_graph<'b>(
        &mut self,
        graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
    ) -> Result<bool, StorageError> {
        let graph_name: NamedOrBlankNodeRef<'b> = graph_name.into();
        self.writer.insert_named_graph(graph_name)
    }

    /// Clears the graph `graph_name` through the writer.
    pub fn clear_graph<'b>(
        &mut self,
        graph_name: impl Into<GraphNameRef<'b>>,
    ) -> Result<(), StorageError> {
        let graph_name: GraphNameRef<'b> = graph_name.into();
        self.writer.clear_graph(graph_name)
    }

    /// Removes the named graph `graph_name` and returns the flag reported by the writer.
    pub fn remove_named_graph<'b>(
        &mut self,
        graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
    ) -> Result<bool, StorageError> {
        let graph_name: NamedOrBlankNodeRef<'b> = graph_name.into();
        self.writer.remove_named_graph(graph_name)
    }

    /// Clears everything through the writer.
    pub fn clear(&mut self) -> Result<(), StorageError> {
        self.writer.clear()
    }
}
/// Iterator over quads, as returned by [`Store::quads_for_pattern`] and
/// [`Transaction::quads_for_pattern`].
pub struct QuadIter {
// Source of encoded quads coming from the storage layer.
iter: ChainedDecodingQuadIterator,
// Reader used to decode each encoded quad into a `Quad`.
reader: StorageReader,
}
impl Iterator for QuadIter {
    type Item = Result<Quad, StorageError>;

    /// Pulls the next encoded quad and decodes it with the reader.
    ///
    /// An error from the underlying iterator is yielded unchanged.
    fn next(&mut self) -> Option<Result<Quad, StorageError>> {
        let encoded = self.iter.next()?;
        Some(encoded.and_then(|quad| self.reader.decode_quad(&quad)))
    }
}
/// Iterator over named graph names, as returned by [`Store::named_graphs`] and
/// [`Transaction::named_graphs`].
pub struct GraphNameIter {
// Source of encoded graph names coming from the storage layer.
iter: DecodingGraphIterator,
// Reader used to decode each encoded name into a `NamedOrBlankNode`.
reader: StorageReader,
}
impl Iterator for GraphNameIter {
    type Item = Result<NamedOrBlankNode, StorageError>;

    /// Pulls the next encoded graph name and decodes it with the reader.
    ///
    /// An error from the underlying iterator is yielded unchanged.
    fn next(&mut self) -> Option<Result<NamedOrBlankNode, StorageError>> {
        match self.iter.next()? {
            Ok(graph_name) => Some(self.reader.decode_named_or_blank_node(&graph_name)),
            Err(error) => Some(Err(error)),
        }
    }

    /// Same bounds as the underlying iterator, since `next` maps items one to one.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
/// Loader created by [`Store::bulk_loader`]; not compiled for `wasm32` targets.
///
/// It wraps a [`StorageBulkLoader`] plus an optional callback consulted whenever
/// the parser yields an error.
#[cfg(not(target_arch = "wasm32"))]
pub struct BulkLoader {
// Storage-level loader that receives the parsed quads.
storage: StorageBulkLoader,
// Parse-error callback: `Ok(())` skips the faulty item, `Err(_)` aborts the load
// with that error. `None` means every parse error aborts the load.
on_parse_error: Option<Box<dyn Fn(ParseError) -> Result<(), ParseError>>>,
}
#[cfg(not(target_arch = "wasm32"))]
impl BulkLoader {
    /// Forwards `num_threads` to the underlying storage loader's `set_num_threads`.
    pub fn set_num_threads(mut self, num_threads: usize) -> Self {
        self.storage = self.storage.set_num_threads(num_threads);
        self
    }

    /// Forwards `max_memory_size` to the underlying storage loader's
    /// `set_max_memory_size_in_megabytes`.
    pub fn set_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self {
        self.storage = self
            .storage
            .set_max_memory_size_in_megabytes(max_memory_size);
        self
    }

    /// Registers `callback` on the underlying storage loader's `on_progress` hook.
    ///
    /// NOTE(review): the `u64` argument is presumably the number of quads loaded
    /// so far — confirm against `StorageBulkLoader::on_progress`.
    pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self {
        self.storage = self.storage.on_progress(callback);
        self
    }

    /// Installs the callback consulted each time the parser yields an error.
    ///
    /// If `callback` returns `Ok(())` the faulty item is skipped and loading goes
    /// on; if it returns `Err(e)`, `e` is forwarded to the storage loader in place
    /// of the item. A later call replaces any callback installed before.
    pub fn on_parse_error(
        mut self,
        callback: impl Fn(ParseError) -> Result<(), ParseError> + 'static,
    ) -> Self {
        self.on_parse_error = Some(Box::new(callback));
        self
    }

    /// Parses a dataset file from `reader` and feeds its quads to the storage loader.
    ///
    /// Parse errors go through the [`BulkLoader::on_parse_error`] policy.
    ///
    /// # Errors
    ///
    /// Fails if `base_iri` is invalid, if the parser cannot be started, or if the
    /// storage loader reports an error (including a forwarded parse error).
    pub fn load_dataset(
        &self,
        reader: impl BufRead,
        format: DatasetFormat,
        base_iri: Option<&str>,
    ) -> Result<(), LoaderError> {
        let mut parser = DatasetParser::from_format(format);
        if let Some(base_iri) = base_iri {
            parser = parser
                .with_base_iri(base_iri)
                .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
        }
        self.storage.load(
            parser
                .read_quads(reader)?
                .filter_map(|result| self.apply_parse_error_policy(result)),
        )
    }

    /// Parses a graph file from `reader` and feeds its triples, placed in
    /// `to_graph_name`, to the storage loader.
    ///
    /// Parse errors go through the [`BulkLoader::on_parse_error`] policy.
    ///
    /// # Errors
    ///
    /// Fails if `base_iri` is invalid, if the parser cannot be started, or if the
    /// storage loader reports an error (including a forwarded parse error).
    pub fn load_graph<'a>(
        &self,
        reader: impl BufRead,
        format: GraphFormat,
        to_graph_name: impl Into<GraphNameRef<'a>>,
        base_iri: Option<&str>,
    ) -> Result<(), LoaderError> {
        let mut parser = GraphParser::from_format(format);
        if let Some(base_iri) = base_iri {
            parser = parser
                .with_base_iri(base_iri)
                .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
        }
        let to_graph_name: GraphNameRef<'a> = to_graph_name.into();
        self.storage
            .load(parser.read_triples(reader)?.filter_map(|result| {
                // Each quad owns its graph name, hence one owned copy per parsed triple.
                let quad = result.map(|triple| triple.in_graph(to_graph_name.into_owned()));
                self.apply_parse_error_policy(quad)
            }))
    }

    /// Feeds already-built `quads` to the storage loader.
    pub fn load_quads(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
        self.storage
            .load::<StorageError, _, _>(quads.into_iter().map(Ok))
    }

    /// Applies the parse-error policy to one parser item, for use with `filter_map`.
    ///
    /// `Ok` items are always kept. An `Err` is kept when no callback is installed,
    /// dropped (`None`) when the callback returns `Ok(())`, and replaced by the
    /// callback's own error otherwise.
    fn apply_parse_error_policy<T>(
        &self,
        result: Result<T, ParseError>,
    ) -> Option<Result<T, ParseError>> {
        match result {
            Ok(item) => Some(Ok(item)),
            Err(error) => match &self.on_parse_error {
                Some(callback) => callback(error).err().map(Err),
                None => Some(Err(error)),
            },
        }
    }
}
/// Exercises insert/remove return flags, `len`, `iter` and `quads_for_pattern`
/// over one subject/predicate with quads in the default graph and in a named graph.
#[test]
fn store() -> Result<(), StorageError> {
    use crate::model::*;

    /// Collects every quad of `store` matching the given pattern.
    fn matching(
        store: &Store,
        subject: Option<SubjectRef<'_>>,
        predicate: Option<NamedNodeRef<'_>>,
        object: Option<TermRef<'_>>,
        graph_name: Option<GraphNameRef<'_>>,
    ) -> Result<Vec<Quad>, StorageError> {
        store
            .quads_for_pattern(subject, predicate, object, graph_name)
            .collect()
    }

    let main_s = Subject::from(BlankNode::default());
    let main_p = NamedNode::new("http://example.com").unwrap();
    let main_o = Term::from(Literal::from(1));
    let main_g = GraphName::from(BlankNode::default());

    // Builds a default-graph quad sharing the main subject and predicate.
    let in_default_graph = |object: Term| {
        Quad::new(
            main_s.clone(),
            main_p.clone(),
            object,
            GraphName::DefaultGraph,
        )
    };

    let default_quad = in_default_graph(main_o.clone());
    let named_quad = Quad::new(
        main_s.clone(),
        main_p.clone(),
        main_o.clone(),
        main_g.clone(),
    );
    let default_quads = vec![
        in_default_graph(Literal::from(0).into()),
        default_quad.clone(),
        in_default_graph(Literal::from(200_000_000).into()),
    ];
    // Expected full content: the default-graph quads followed by the named one.
    let mut all_quads = default_quads.clone();
    all_quads.push(named_quad.clone());
    // Expected results for patterns that fix the object to `main_o`.
    let main_o_quads = vec![default_quad.clone(), named_quad.clone()];
    let only_default = vec![default_quad.clone()];
    let only_named = vec![named_quad.clone()];

    let store = Store::new()?;
    for quad in &default_quads {
        assert!(store.insert(quad)?);
    }
    assert!(!store.insert(&default_quad)?);

    assert!(store.remove(&default_quad)?);
    assert!(!store.remove(&default_quad)?);
    assert!(store.insert(&named_quad)?);
    assert!(!store.insert(&named_quad)?);
    assert!(store.insert(&default_quad)?);
    assert!(!store.insert(&default_quad)?);

    assert_eq!(store.len()?, 4);
    assert_eq!(store.iter().collect::<Result<Vec<_>, _>>()?, all_quads);

    // Bound pattern components, reused across the checks below.
    let s = Some(main_s.as_ref());
    let p = Some(main_p.as_ref());
    let o = Some(main_o.as_ref());
    let g = Some(main_g.as_ref());
    let default_graph = Some(GraphNameRef::DefaultGraph);

    // Subject bound.
    assert_eq!(matching(&store, s, None, None, None)?, all_quads);
    assert_eq!(matching(&store, s, p, None, None)?, all_quads);
    assert_eq!(matching(&store, s, p, o, None)?, main_o_quads);
    assert_eq!(matching(&store, s, p, o, default_graph)?, only_default);
    assert_eq!(matching(&store, s, p, o, g)?, only_named);
    assert_eq!(matching(&store, s, p, None, default_graph)?, default_quads);
    assert_eq!(matching(&store, s, None, o, None)?, main_o_quads);
    assert_eq!(matching(&store, s, None, o, default_graph)?, only_default);
    assert_eq!(matching(&store, s, None, o, g)?, only_named);
    assert_eq!(
        matching(&store, s, None, None, default_graph)?,
        default_quads
    );

    // Subject unbound.
    assert_eq!(matching(&store, None, p, None, None)?, all_quads);
    assert_eq!(matching(&store, None, p, o, None)?, main_o_quads);
    assert_eq!(matching(&store, None, None, o, None)?, main_o_quads);
    assert_eq!(
        matching(&store, None, None, None, default_graph)?,
        default_quads
    );
    assert_eq!(matching(&store, None, p, o, default_graph)?, only_default);
    assert_eq!(matching(&store, None, p, o, g)?, only_named);

    Ok(())
}