use crate::{
model::*,
parser::RdfFormat,
rdf_store::{OxirsQueryResults, RdfStore},
transaction::{IsolationLevel, TransactionManager},
OxirsError, Result, Store as OxirsStoreTrait,
};
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
pub struct Store {
inner: Arc<RwLock<RdfStore>>,
tx_manager: Arc<RwLock<Option<TransactionManager>>>,
wal_dir: Option<PathBuf>,
}
impl Store {
pub fn new() -> Result<Self> {
Ok(Store {
inner: Arc::new(RwLock::new(RdfStore::new()?)),
tx_manager: Arc::new(RwLock::new(None)),
wal_dir: None,
})
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path_buf = path.as_ref().to_path_buf();
let wal_dir = path_buf.join("wal");
Ok(Store {
inner: Arc::new(RwLock::new(RdfStore::open(&path_buf)?)),
tx_manager: Arc::new(RwLock::new(None)),
wal_dir: Some(wal_dir),
})
}
pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool> {
let quad_ref = quad.into();
let quad = Quad::new(
quad_ref.subject().to_owned(),
quad_ref.predicate().to_owned(),
quad_ref.object().to_owned(),
quad_ref.graph_name().to_owned(),
);
let store = self
.inner
.write()
.map_err(|e| OxirsError::Store(format!("Failed to acquire write lock: {e}")))?;
store.insert_quad(quad)
}
pub fn extend<'a>(
&self,
quads: impl IntoIterator<Item = impl Into<QuadRef<'a>>>,
) -> Result<()> {
let store = self
.inner
.write()
.map_err(|e| OxirsError::Store(format!("Failed to acquire write lock: {e}")))?;
for quad in quads {
let quad_ref = quad.into();
let quad = Quad::new(
quad_ref.subject().to_owned(),
quad_ref.predicate().to_owned(),
quad_ref.object().to_owned(),
quad_ref.graph_name().to_owned(),
);
store.insert_quad(quad)?;
}
Ok(())
}
pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool> {
let quad_ref = quad.into();
let quad = Quad::new(
quad_ref.subject().to_owned(),
quad_ref.predicate().to_owned(),
quad_ref.object().to_owned(),
quad_ref.graph_name().to_owned(),
);
let store = self
.inner
.write()
.map_err(|e| OxirsError::Store(format!("Failed to acquire write lock: {e}")))?;
store.remove_quad(&quad)
}
pub fn load_from_reader<R: BufRead>(
&self,
reader: R,
format: RdfFormat,
base_iri: Option<&str>,
graph: Option<impl Into<GraphName>>,
) -> Result<()> {
use crate::parser::Parser;
let mut data = String::new();
let mut reader = reader;
reader
.read_to_string(&mut data)
.map_err(|e| OxirsError::Parse(format!("Failed to read input: {e}")))?;
let mut parser = Parser::new(format);
if let Some(base) = base_iri {
parser = parser.with_base_iri(base);
}
let quads = parser.parse_str_to_quads(&data)?;
let store = self
.inner
.write()
.map_err(|e| OxirsError::Store(format!("Failed to acquire write lock: {e}")))?;
let target_graph = graph.map(|g| g.into());
for quad in quads {
let final_quad = if let Some(ref g) = target_graph {
Quad::new(
quad.subject().clone(),
quad.predicate().clone(),
quad.object().clone(),
g.clone(),
)
} else {
quad
};
store.insert_quad(final_quad)?;
}
Ok(())
}
pub fn dump_to_writer<'a, W: Write>(
&self,
mut writer: W,
format: RdfFormat,
graph: Option<impl Into<GraphNameRef<'a>>>,
) -> Result<()> {
use crate::model::{dataset::Dataset, graph::Graph};
use crate::serializer::Serializer;
let store = self
.inner
.read()
.map_err(|e| OxirsError::Store(format!("Failed to acquire read lock: {e}")))?;
let serializer = Serializer::new(format);
let quads = if let Some(g) = graph {
let graph_ref = g.into();
let graph_name = graph_ref.to_owned();
store.query_quads(None, None, None, Some(&graph_name))?
} else {
store.iter_quads()?
};
let output = match format {
RdfFormat::Turtle | RdfFormat::NTriples | RdfFormat::RdfXml => {
let triples: Vec<_> = quads
.into_iter()
.filter(|q| q.is_default_graph())
.map(|q| q.to_triple())
.collect();
let graph = Graph::from_iter(triples);
serializer.serialize_graph(&graph)?
}
RdfFormat::TriG | RdfFormat::NQuads | RdfFormat::JsonLd => {
let dataset = Dataset::from_iter(quads);
serializer.serialize_dataset(&dataset)?
}
};
writer
.write_all(output.as_bytes())
.map_err(|e| OxirsError::Serialize(format!("Failed to write output: {e}")))?;
Ok(())
}
pub fn contains<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool> {
let quad_ref = quad.into();
let quad = Quad::new(
quad_ref.subject().to_owned(),
quad_ref.predicate().to_owned(),
quad_ref.object().to_owned(),
quad_ref.graph_name().to_owned(),
);
let store = self
.inner
.read()
.map_err(|e| OxirsError::Store(format!("Failed to acquire read lock: {e}")))?;
store.contains_quad(&quad)
}
pub fn len(&self) -> Result<usize> {
let store = self
.inner
.read()
.map_err(|e| OxirsError::Store(format!("Failed to acquire read lock: {e}")))?;
store.len()
}
pub fn is_empty(&self) -> Result<bool> {
let store = self
.inner
.read()
.map_err(|e| OxirsError::Store(format!("Failed to acquire read lock: {e}")))?;
store.is_empty()
}
pub fn quads_for_pattern<'a>(
&self,
subject: Option<impl Into<SubjectRef<'a>>>,
predicate: Option<impl Into<PredicateRef<'a>>>,
object: Option<impl Into<ObjectRef<'a>>>,
graph_name: Option<impl Into<GraphNameRef<'a>>>,
) -> QuadIter {
let subject = subject.map(|s| {
let s_ref = s.into();
match s_ref {
SubjectRef::NamedNode(n) => Subject::NamedNode(n.to_owned()),
SubjectRef::BlankNode(b) => Subject::BlankNode(b.to_owned()),
SubjectRef::Variable(v) => Subject::Variable(v.to_owned()),
}
});
let predicate = predicate.map(|p| {
let p_ref = p.into();
match p_ref {
PredicateRef::NamedNode(n) => Predicate::NamedNode(n.to_owned()),
PredicateRef::Variable(v) => Predicate::Variable(v.to_owned()),
}
});
let object = object.map(|o| {
let o_ref = o.into();
match o_ref {
ObjectRef::NamedNode(n) => Object::NamedNode(n.to_owned()),
ObjectRef::BlankNode(b) => Object::BlankNode(b.to_owned()),
ObjectRef::Literal(l) => Object::Literal(l.to_owned()),
ObjectRef::Variable(v) => Object::Variable(v.to_owned()),
}
});
let graph_name = graph_name.map(|g| {
let g_ref = g.into();
g_ref.to_owned()
});
let quads = match self.inner.read() {
Ok(store) => store
.query_quads(
subject.as_ref(),
predicate.as_ref(),
object.as_ref(),
graph_name.as_ref(),
)
.unwrap_or_default(),
_ => Vec::new(),
};
QuadIter { quads, index: 0 }
}
pub fn iter(&self) -> QuadIter {
self.quads_for_pattern(
None::<SubjectRef>,
None::<PredicateRef>,
None::<ObjectRef>,
None::<GraphNameRef>,
)
}
pub fn named_graphs(&self) -> GraphNameIter {
let mut graph_names = std::collections::HashSet::new();
if let Ok(store) = self.inner.read() {
if let Ok(quads) = store.iter_quads() {
for quad in quads {
if let GraphName::NamedNode(n) = quad.graph_name() {
graph_names.insert(n.clone());
}
}
}
}
GraphNameIter {
graphs: graph_names.into_iter().collect(),
index: 0,
}
}
pub fn contains_named_graph<'a>(
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> Result<bool> {
let graph_ref = graph_name.into();
let graph = match graph_ref {
NamedOrBlankNodeRef::NamedNode(n) => GraphName::NamedNode(n.to_owned()),
NamedOrBlankNodeRef::BlankNode(b) => GraphName::BlankNode(b.to_owned()),
};
let store = self
.inner
.read()
.map_err(|e| OxirsError::Store(format!("Failed to acquire read lock: {e}")))?;
let quads = store.query_quads(None, None, None, Some(&graph))?;
Ok(!quads.is_empty())
}
pub fn clear(&self) -> Result<()> {
let mut store = self
.inner
.write()
.map_err(|e| OxirsError::Store(format!("Failed to acquire write lock: {e}")))?;
store.clear()
}
pub fn clear_graph<'a>(&self, graph_name: impl Into<GraphNameRef<'a>>) -> Result<()> {
let graph_ref = graph_name.into();
let graph = graph_ref.to_owned();
let store = self
.inner
.write()
.map_err(|e| OxirsError::Store(format!("Failed to acquire write lock: {e}")))?;
let quads_to_remove = store.query_quads(None, None, None, Some(&graph))?;
for quad in quads_to_remove {
store.remove_quad(&quad)?;
}
Ok(())
}
pub fn query(&self, query: &str) -> Result<QueryResults> {
let store = self
.inner
.read()
.map_err(|e| OxirsError::Store(format!("Failed to acquire read lock: {e}")))?;
let results = store.query(query)?;
Ok(QueryResults { inner: results })
}
pub fn update(&self, update_str: &str) -> Result<()> {
use crate::query::{UpdateExecutor, UpdateParser};
let parser = UpdateParser::new();
let update = parser.parse(update_str)?;
let store = self
.inner
.write()
.map_err(|e| OxirsError::Store(format!("Failed to acquire write lock: {e}")))?;
let executor = UpdateExecutor::new(&*store);
executor.execute(&update)?;
Ok(())
}
pub fn transaction<T, E>(
&self,
f: impl FnOnce(&mut crate::AcidTransaction) -> std::result::Result<T, E>,
) -> std::result::Result<T, E>
where
E: From<OxirsError>,
{
self.ensure_tx_manager()?;
let mut tx_mgr_guard = self
.tx_manager
.write()
.map_err(|e| E::from(OxirsError::Store(format!("Failed to acquire lock: {e}"))))?;
let tx_mgr = tx_mgr_guard.as_mut().ok_or_else(|| {
E::from(OxirsError::Store(
"Transaction manager not initialized".to_string(),
))
})?;
let mut transaction = tx_mgr.begin(IsolationLevel::Snapshot).map_err(E::from)?;
let result = f(&mut transaction);
match result {
Ok(value) => {
transaction.commit().map_err(E::from)?;
Ok(value)
}
Err(error) => {
let _ = transaction.abort();
Err(error)
}
}
}
fn ensure_tx_manager(&self) -> Result<()> {
let mut tx_mgr_guard = self
.tx_manager
.write()
.map_err(|e| OxirsError::Store(format!("Failed to acquire lock: {e}")))?;
if tx_mgr_guard.is_none() {
let wal_dir = if let Some(ref wal_path) = self.wal_dir {
wal_path.clone()
} else {
std::env::temp_dir().join("oxirs_wal")
};
let tx_mgr = TransactionManager::new(&wal_dir)?;
*tx_mgr_guard = Some(tx_mgr);
}
Ok(())
}
pub fn validate(&self) -> Result<()> {
Ok(())
}
pub fn optimize(&self) -> Result<()> {
let store = self
.inner
.read()
.map_err(|e| OxirsError::Store(format!("Failed to acquire read lock: {e}")))?;
store.clear_arena();
Ok(())
}
pub fn backup<P: AsRef<Path>>(&self, path: P) -> Result<()> {
use crate::parser::RdfFormat;
use crate::serializer::Serializer;
use std::fs::File;
use std::io::Write;
use std::time::{SystemTime, UNIX_EPOCH};
let backup_path = path.as_ref();
if let Some(parent) = backup_path.parent() {
std::fs::create_dir_all(parent).map_err(|e| {
OxirsError::Store(format!("Failed to create backup directory: {e}"))
})?;
}
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let backup_file_path = if backup_path.is_dir() {
backup_path.join(format!("oxirs_backup_{timestamp}.nq"))
} else {
backup_path.to_path_buf()
};
let store = self
.inner
.read()
.map_err(|e| OxirsError::Store(format!("Failed to acquire read lock: {e}")))?;
let quads = store
.iter_quads()
.map_err(|e| OxirsError::Store(format!("Failed to iterate quads: {e}")))?;
let dataset = crate::model::dataset::Dataset::from_iter(quads.clone());
let serializer = Serializer::new(RdfFormat::NQuads);
let serialized_data = serializer
.serialize_dataset(&dataset)
.map_err(|e| OxirsError::Store(format!("Failed to serialize dataset: {e}")))?;
let mut backup_file = File::create(&backup_file_path)
.map_err(|e| OxirsError::Store(format!("Failed to create backup file: {e}")))?;
backup_file
.write_all(serialized_data.as_bytes())
.map_err(|e| OxirsError::Store(format!("Failed to write backup data: {e}")))?;
backup_file
.sync_all()
.map_err(|e| OxirsError::Store(format!("Failed to sync backup file: {e}")))?;
let backup_size = serialized_data.len();
let quad_count = quads.len();
tracing::info!(
"Store backup completed successfully. File: {}, Quads: {}, Size: {} bytes",
backup_file_path.display(),
quad_count,
backup_size
);
Ok(())
}
pub fn flush(&self) -> Result<()> {
Ok(())
}
}
impl Default for Store {
fn default() -> Self {
Store::new().expect("Store::new() should not fail")
}
}
pub struct QuadIter {
quads: Vec<Quad>,
index: usize,
}
impl Iterator for QuadIter {
type Item = Quad;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.quads.len() {
let quad = self.quads[self.index].clone();
self.index += 1;
Some(quad)
} else {
None
}
}
}
pub struct GraphNameIter {
graphs: Vec<NamedNode>,
index: usize,
}
impl Iterator for GraphNameIter {
type Item = NamedNode;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.graphs.len() {
let graph = self.graphs[self.index].clone();
self.index += 1;
Some(graph)
} else {
None
}
}
}
pub struct QueryResults {
#[allow(dead_code)]
inner: OxirsQueryResults,
}
impl QueryResults {
pub fn is_boolean(&self) -> bool {
matches!(
self.inner.results(),
crate::rdf_store::types::QueryResults::Boolean(_)
)
}
pub fn boolean(&self) -> Option<bool> {
match self.inner.results() {
crate::rdf_store::types::QueryResults::Boolean(b) => Some(*b),
_ => None,
}
}
pub fn is_solutions(&self) -> bool {
matches!(
self.inner.results(),
crate::rdf_store::types::QueryResults::Bindings(_)
)
}
pub fn is_graph(&self) -> bool {
matches!(
self.inner.results(),
crate::rdf_store::types::QueryResults::Graph(_)
)
}
}
pub struct Transaction {
operations: Vec<TransactionOp>,
}
enum TransactionOp {
#[allow(dead_code)]
Insert(Quad),
#[allow(dead_code)]
Remove(Quad),
}
impl Transaction {
#[allow(dead_code)]
fn new() -> Self {
Transaction {
operations: Vec::new(),
}
}
pub fn insert<'b>(&mut self, quad: impl Into<QuadRef<'b>>) -> Result<bool> {
let quad_ref = quad.into();
let quad = Quad::new(
quad_ref.subject().to_owned(),
quad_ref.predicate().to_owned(),
quad_ref.object().to_owned(),
quad_ref.graph_name().to_owned(),
);
self.operations.push(TransactionOp::Insert(quad));
Ok(true) }
pub fn remove<'b>(&mut self, quad: impl Into<QuadRef<'b>>) -> Result<bool> {
let quad_ref = quad.into();
let quad = Quad::new(
quad_ref.subject().to_owned(),
quad_ref.predicate().to_owned(),
quad_ref.object().to_owned(),
quad_ref.graph_name().to_owned(),
);
self.operations.push(TransactionOp::Remove(quad));
Ok(true) }
}
#[derive(Debug, thiserror::Error)]
pub enum OxigraphCompatError {
#[error("Store error: {0}")]
Store(String),
#[error("Parse error: {0}")]
Parse(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}
impl From<OxirsError> for OxigraphCompatError {
fn from(err: OxirsError) -> Self {
match err {
OxirsError::Store(msg) => OxigraphCompatError::Store(msg),
OxirsError::Parse(msg) => OxigraphCompatError::Parse(msg),
_ => OxigraphCompatError::Store(err.to_string()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::{Literal, NamedNode};
use crate::parser::RdfFormat;
use std::io::Cursor;
#[test]
fn test_oxigraph_compat_store_creation() {
let store = Store::new().expect("construction should succeed");
assert!(store.is_empty().expect("store operation should succeed"));
assert_eq!(store.len().expect("store operation should succeed"), 0);
}
#[test]
fn test_oxigraph_compat_insert_and_query() {
let store = Store::new().expect("construction should succeed");
let subject = NamedNode::new("http://example.org/subject").expect("valid IRI");
let predicate = NamedNode::new("http://example.org/predicate").expect("valid IRI");
let object = Literal::new("test object");
let graph = NamedNode::new("http://example.org/graph").expect("valid IRI");
let quad = Quad::new(
subject.clone(),
predicate.clone(),
object.clone(),
graph.clone(),
);
assert!(store
.insert(QuadRef::from(&quad))
.expect("store insert should succeed"));
assert_eq!(store.len().expect("store operation should succeed"), 1);
assert!(!store.is_empty().expect("store operation should succeed"));
assert!(store
.contains(QuadRef::from(&quad))
.expect("store contains should succeed"));
let quads: Vec<_> = store
.quads_for_pattern(
Some(SubjectRef::NamedNode(&subject)),
None::<PredicateRef>,
None::<ObjectRef>,
None::<GraphNameRef>,
)
.collect();
assert_eq!(quads.len(), 1);
assert_eq!(quads[0], quad);
assert!(store
.remove(QuadRef::from(&quad))
.expect("store remove should succeed"));
assert!(store.is_empty().expect("store operation should succeed"));
}
#[test]
fn test_oxigraph_compat_extend() {
let store = Store::new().expect("construction should succeed");
let quads = [
Quad::new(
NamedNode::new("http://example.org/s1").expect("valid IRI"),
NamedNode::new("http://example.org/p1").expect("valid IRI"),
Literal::new("o1"),
GraphName::DefaultGraph,
),
Quad::new(
NamedNode::new("http://example.org/s2").expect("valid IRI"),
NamedNode::new("http://example.org/p2").expect("valid IRI"),
Literal::new("o2"),
NamedNode::new("http://example.org/g1").expect("valid IRI"),
),
];
store
.extend(quads.iter().map(QuadRef::from))
.expect("extend should succeed");
assert_eq!(store.len().expect("store operation should succeed"), 2);
}
#[test]
fn test_oxigraph_compat_named_graphs() {
let store = Store::new().expect("construction should succeed");
let s1 = NamedNode::new("http://example.org/s1").expect("valid IRI");
let s2 = NamedNode::new("http://example.org/s2").expect("valid IRI");
let p1 = NamedNode::new("http://example.org/p1").expect("valid IRI");
let p2 = NamedNode::new("http://example.org/p2").expect("valid IRI");
let o1 = Literal::new("o1");
let o2 = Literal::new("o2");
let g1 = NamedNode::new("http://example.org/g1").expect("valid IRI");
let g2 = NamedNode::new("http://example.org/g2").expect("valid IRI");
store
.insert(QuadRef::new(
SubjectRef::NamedNode(&s1),
PredicateRef::NamedNode(&p1),
ObjectRef::Literal(&o1),
GraphNameRef::NamedNode(&g1),
))
.expect("operation should succeed");
store
.insert(QuadRef::new(
SubjectRef::NamedNode(&s2),
PredicateRef::NamedNode(&p2),
ObjectRef::Literal(&o2),
GraphNameRef::NamedNode(&g2),
))
.expect("operation should succeed");
let graphs: Vec<_> = store.named_graphs().collect();
assert_eq!(graphs.len(), 2);
assert!(graphs.contains(&g1));
assert!(graphs.contains(&g2));
assert!(store
.contains_named_graph(NamedOrBlankNodeRef::NamedNode(&g1))
.expect("operation should succeed"));
assert!(store
.contains_named_graph(NamedOrBlankNodeRef::NamedNode(&g2))
.expect("operation should succeed"));
}
#[test]
fn test_oxigraph_compat_clear_graph() {
let store = Store::new().expect("construction should succeed");
let s1 = NamedNode::new("http://example.org/s1").expect("valid IRI");
let s2 = NamedNode::new("http://example.org/s2").expect("valid IRI");
let p1 = NamedNode::new("http://example.org/p1").expect("valid IRI");
let p2 = NamedNode::new("http://example.org/p2").expect("valid IRI");
let o1 = Literal::new("o1");
let o2 = Literal::new("o2");
let graph = NamedNode::new("http://example.org/graph").expect("valid IRI");
store
.insert(QuadRef::new(
SubjectRef::NamedNode(&s1),
PredicateRef::NamedNode(&p1),
ObjectRef::Literal(&o1),
GraphNameRef::NamedNode(&graph),
))
.expect("operation should succeed");
store
.insert(QuadRef::new(
SubjectRef::NamedNode(&s2),
PredicateRef::NamedNode(&p2),
ObjectRef::Literal(&o2),
GraphNameRef::DefaultGraph,
))
.expect("operation should succeed");
assert_eq!(store.len().expect("store operation should succeed"), 2);
store
.clear_graph(GraphNameRef::NamedNode(&graph))
.expect("clear_graph should succeed");
assert_eq!(store.len().expect("store operation should succeed"), 1);
store.clear().expect("store operation should succeed");
assert!(store.is_empty().expect("store operation should succeed"));
}
#[test]
fn test_oxigraph_compat_load_from_reader() {
let store = Store::new().expect("construction should succeed");
let turtle_data = r#"
@prefix ex: <http://example.org/> .
ex:subject ex:predicate "object" .
"#;
let reader = Cursor::new(turtle_data.as_bytes());
store
.load_from_reader(
reader,
RdfFormat::Turtle,
Some("http://example.org/"),
None::<GraphName>,
)
.expect("operation should succeed");
assert_eq!(store.len().expect("store operation should succeed"), 1);
let quads: Vec<_> = store.iter().collect();
assert_eq!(quads.len(), 1);
assert_eq!(
quads[0].subject().to_string(),
"<http://example.org/subject>"
);
assert_eq!(
quads[0].predicate().to_string(),
"<http://example.org/predicate>"
);
}
#[test]
fn test_oxigraph_compat_dump_to_writer() {
let store = Store::new().expect("construction should succeed");
let subject = NamedNode::new("http://example.org/subject").expect("valid IRI");
let predicate = NamedNode::new("http://example.org/predicate").expect("valid IRI");
let object = Literal::new("object");
store
.insert(QuadRef::new(
SubjectRef::NamedNode(&subject),
PredicateRef::NamedNode(&predicate),
ObjectRef::Literal(&object),
GraphNameRef::DefaultGraph,
))
.expect("operation should succeed");
let mut output = Vec::new();
store
.dump_to_writer(&mut output, RdfFormat::NTriples, None::<GraphNameRef>)
.expect("operation should succeed");
let result = String::from_utf8(output).expect("bytes should be valid UTF-8");
assert!(result.contains("<http://example.org/subject>"));
assert!(result.contains("<http://example.org/predicate>"));
assert!(result.contains("\"object\""));
}
}