use alloc::string::FromUtf8Error;
use chrono::Utc;
use oxigraph::io::{RdfFormat, RdfParseError, RdfParser};
use oxigraph::model::{
GraphNameRef, IriParseError, LiteralRef, NamedNodeRef, Quad, QuadRef, TermRef,
};
use oxigraph::sparql::results::QueryResultsFormat;
use oxigraph::sparql::{EvaluationError, QueryResults, QuerySolutionIter};
use oxigraph::store::{LoaderError, SerializerError, StorageError, Store};
use oxjsonld::{JsonLdProfile, JsonLdProfileSet};
use oxttl::TriGParser;
use serde;
use serde_json::Error as SerdeError;
use std::collections::HashMap;
use std::io::Cursor;
use std::path::PathBuf;
use thiserror;
use tracing::{debug, error, info};
macro_rules! PREDICATE {
($e:expr) => {
concat!("ant://colonylib/", "v1/", $e)
};
}
macro_rules! OBJECT {
($e:expr) => {
concat!("ant://colonylib/", "v1/", $e)
};
}
pub const HAS_ADDR_TYPE: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type";
pub const HAS_NAME: &str = "http://schema.org/name";
pub const HAS_DEPTH: &str = PREDICATE!("depth");
pub const HAS_INDEX: &str = PREDICATE!("index");
pub const KEY_COUNT: &str = PREDICATE!("count");
pub const HAS_CREATION_DATE: &str = PREDICATE!("creation");
pub const HAS_MODIFIED_DATE: &str = PREDICATE!("modified");
pub const POD: &str = OBJECT!("pod"); pub const POD_REF: &str = OBJECT!("ref"); pub const DATA: &str = OBJECT!("data"); pub const FREED_POD: &str = OBJECT!("free_pod"); pub const FREED_DATA: &str = OBJECT!("free_data"); pub const ABANDONED: &str = OBJECT!("abandoned");
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Graph(#[from] StorageError),
#[error(transparent)]
Iri(#[from] IriParseError),
#[error(transparent)]
Serializer(#[from] SerializerError),
#[error(transparent)]
Evaluation(#[from] EvaluationError),
#[error(transparent)]
Serde(#[from] SerdeError),
#[error(transparent)]
FromUtf8(#[from] FromUtf8Error),
#[error(transparent)]
Loader(#[from] LoaderError),
#[error(transparent)]
RdfParse(#[from] RdfParseError),
}
#[derive(serde::Serialize)]
#[serde(tag = "kind", content = "message")]
#[serde(rename_all = "camelCase")]
pub enum ErrorKind {
Graph(String),
Iri(String),
Serializer(String),
Evaluation(String),
Serde(String),
FromUtf8(String),
Loader(String),
RdfParse(String),
}
impl serde::Serialize for Error {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let error_message = self.to_string();
let error_kind = match self {
Self::Graph(_) => ErrorKind::Graph(error_message),
Self::Iri(_) => ErrorKind::Iri(error_message),
Self::Serializer(_) => ErrorKind::Serializer(error_message),
Self::Evaluation(_) => ErrorKind::Evaluation(error_message),
Self::Serde(_) => ErrorKind::Serde(error_message),
Self::FromUtf8(_) => ErrorKind::FromUtf8(error_message),
Self::Loader(_) => ErrorKind::Loader(error_message),
Self::RdfParse(_) => ErrorKind::RdfParse(error_message),
};
error_kind.serialize(serializer)
}
}
#[derive(Clone)]
pub struct Graph {
store: Store,
}
impl Graph {
pub fn open(db: &PathBuf) -> Result<Self, Error> {
let store = Store::open(db)?;
info!("Opened graph store at {:?}", db);
Ok(Graph { store })
}
pub fn put_quad(
&self,
subject: &str,
predicate: &str,
object: &str,
graph_name: Option<&str>,
) -> Result<Quad, Error> {
let subject_node = NamedNodeRef::new(subject)?;
let predicate_node = NamedNodeRef::new(predicate)?;
let object_node = match object {
_ if object.starts_with("http://")
|| object.starts_with("https://")
|| object.starts_with("ant://") =>
{
TermRef::NamedNode(NamedNodeRef::new(object)?)
}
_ => TermRef::Literal(LiteralRef::new_simple_literal(object)),
};
let graph_name_ref = match graph_name {
Some(name) => GraphNameRef::NamedNode(NamedNodeRef::new(name)?),
None => GraphNameRef::DefaultGraph,
};
let quad = QuadRef::new(subject_node, predicate_node, object_node, graph_name_ref);
debug!("Creating quad: {:?}", quad);
self.store.remove(quad)?;
self.store.insert(quad)?;
Ok(quad.into_owned())
}
pub fn add_pod_entry(
&mut self,
pod_name: &str,
pod_address: &str,
scratchpad_address: &str,
configuration_address: &str,
configuration_scratchpad_address: &str,
num_keys: u64,
) -> Result<(Vec<u8>, Vec<u8>), Error> {
let pod_iri = format!("ant://{pod_address}");
let pod_iri = pod_iri.as_str();
let pod = NamedNodeRef::new(pod_iri)?;
self.store.insert_named_graph(pod)?;
let configuration_iri = format!("ant://{configuration_address}");
let configuration_iri = configuration_iri.as_str();
let config = NamedNodeRef::new(configuration_iri)?;
self.store.insert_named_graph(config)?;
self.update_key_count(configuration_address, num_keys)?;
let scratchpad_iri = format!("ant://{scratchpad_address}");
let scratchpad_iri = scratchpad_iri.as_str();
let configuration_scratchpad_iri = format!("ant://{configuration_scratchpad_address}");
let configuration_scratchpad_iri = configuration_scratchpad_iri.as_str();
let date = Utc::now().to_rfc3339();
let date = date.as_str();
let _quad = self.put_quad(pod_iri, HAS_ADDR_TYPE, POD, Some(configuration_iri))?;
let _quad = self.put_quad(pod_iri, HAS_DEPTH, "0", Some(configuration_iri))?;
let _quad = self.put_quad(pod_iri, HAS_NAME, pod_name, Some(pod_iri))?;
let _quad = self.put_quad(pod_iri, HAS_CREATION_DATE, date, Some(pod_iri))?;
let _quad = self.put_quad(pod_iri, HAS_MODIFIED_DATE, date, Some(pod_iri))?;
let _quad = self.put_quad(scratchpad_iri, HAS_ADDR_TYPE, DATA, Some(configuration_iri))?;
let _quad = self.put_quad(scratchpad_iri, HAS_INDEX, "0", Some(pod_iri))?;
let _quad = self.put_quad(scratchpad_iri, HAS_MODIFIED_DATE, date, Some(pod_iri))?;
let _quad = self.put_quad(
configuration_scratchpad_iri,
HAS_INDEX,
"0",
Some(configuration_iri),
)?;
let _quad = self.put_quad(
configuration_scratchpad_iri,
HAS_ADDR_TYPE,
DATA,
Some(configuration_iri),
)?;
let _quad = self.put_quad(
configuration_scratchpad_iri,
HAS_MODIFIED_DATE,
date,
Some(configuration_iri),
)?;
let _quad = self.put_quad(
configuration_iri,
HAS_ADDR_TYPE,
POD,
Some(configuration_iri),
)?;
let _quad = self.put_quad(
configuration_iri,
HAS_NAME,
"User Configuration",
Some(configuration_iri),
)?;
debug!("Pod entries added");
let mut buffer = Vec::new();
self.store
.dump_graph_to_writer(pod, RdfFormat::TriG, &mut buffer)?;
let mut configuration = Vec::new();
self.store
.dump_graph_to_writer(config, RdfFormat::TriG, &mut configuration)?;
Ok((buffer, configuration))
}
pub fn check_pod_exists(&self, pod_address: &str) -> Result<String, Error> {
let query = format!(
"SELECT ?pod WHERE {{ GRAPH ?graph {{ ?pod <{HAS_NAME}> \"{pod_address}\" . }} }}"
);
debug!("Pod exists query: {}", query);
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(pod_node)) = solution.get("pod") {
let pod_iri = pod_node.as_str();
if let Some(address) = pod_iri.strip_prefix("ant://") {
debug!(
"Found address for pod alias \"{}\": {}",
pod_address, address
);
return Ok(address.to_string());
}
}
}
}
let pod_iri = format!("ant://{pod_address}");
let pod_iri = pod_iri.as_str();
let pod = NamedNodeRef::new(pod_iri)?;
if self.store.contains_named_graph(pod)? {
debug!("Pod address exists: {}", pod_address);
Ok(pod_address.to_string())
} else {
Err(Error::Graph(StorageError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Pod not found",
))))
}
}
pub fn remove_pod_entry(
&mut self,
pod_address: &str,
pod_scratchpads: Vec<String>,
configuration_address: &str,
) -> Result<Vec<u8>, Error> {
let pod_iri = format!("ant://{pod_address}");
let pod_iri = pod_iri.as_str();
let pod = NamedNodeRef::new(pod_iri)?;
let configuration_iri = format!("ant://{configuration_address}");
let configuration_iri = configuration_iri.as_str();
let config = NamedNodeRef::new(configuration_iri)?;
self.store.clear_graph(pod)?;
let update =
format!("DELETE WHERE {{ GRAPH <{configuration_iri}> {{ <{pod_iri}> ?p ?o . }} }}");
self.store.update(update.as_str())?;
for scratchpad in pod_scratchpads.clone() {
let scratchpad_iri = format!("ant://{scratchpad}");
let scratchpad_iri = scratchpad_iri.as_str();
let update = format!(
"DELETE WHERE {{ GRAPH <{configuration_iri}> {{ <{scratchpad_iri}> ?p ?o . }} }}"
);
self.store.update(update.as_str())?;
}
let _quad = self.put_quad(pod_iri, HAS_ADDR_TYPE, FREED_POD, Some(configuration_iri))?;
for scratchpad in pod_scratchpads {
let scratchpad_iri = format!("ant://{scratchpad}");
let scratchpad_iri = scratchpad_iri.as_str();
let _quad = self.put_quad(
scratchpad_iri,
HAS_ADDR_TYPE,
FREED_DATA,
Some(configuration_iri),
)?;
}
let mut configuration = Vec::new();
self.store
.dump_graph_to_writer(config, RdfFormat::TriG, &mut configuration)?;
Ok(configuration)
}
pub fn rename_pod_entry(
&mut self,
pod_address: &str,
new_name: &str,
) -> Result<Vec<u8>, Error> {
let pod_iri = format!("ant://{pod_address}");
let pod_iri = pod_iri.as_str();
let pod = NamedNodeRef::new(pod_iri)?;
let update =
format!("DELETE WHERE {{ GRAPH <{pod_iri}> {{ <{pod_iri}> <{HAS_NAME}> ?o . }} }}");
debug!("Delete existing pod name string: {}", update);
self.store.update(update.as_str())?;
let _quad = self.put_quad(pod_iri, HAS_NAME, new_name, Some(pod_iri))?;
debug!("Pod name updated to {}", new_name);
let mut buffer = Vec::new();
self.store
.dump_graph_to_writer(pod, RdfFormat::TriG, &mut buffer)?;
Ok(buffer)
}
pub fn remove_scratchpad_entry(
&mut self,
pod_address: &str,
scratchpad_address: &str,
) -> Result<(), Error> {
let pod_iri = format!("ant://{pod_address}");
let pod_iri = pod_iri.as_str();
let scratchpad_iri = format!("ant://{scratchpad_address}");
let scratchpad_iri = scratchpad_iri.as_str();
let update =
format!("DELETE WHERE {{ GRAPH <{pod_iri}> {{ <{scratchpad_iri}> ?p ?o . }} }}");
debug!("Delete unused scratchpad from pod string: {}", update);
self.store.update(update.as_str())?;
Ok(())
}
pub fn pod_ref_entry(
&mut self,
pod_address: &str,
pod_ref_address: &str,
configuration_address: &str,
add: bool,
is_local: bool,
) -> Result<(Vec<u8>, Vec<u8>), Error> {
let pod_ref_iri = format!("ant://{pod_ref_address}");
let pod_ref_iri = pod_ref_iri.as_str();
let pod_iri = format!("ant://{pod_address}");
let pod_iri = pod_iri.as_str();
let configuration_iri = format!("ant://{configuration_address}");
let configuration_iri = configuration_iri.as_str();
let update =
format!("DELETE WHERE {{ GRAPH <{configuration_iri}> {{ <{pod_ref_iri}> ?p ?o . }} }}");
debug!("Delete pod_ref from configuration graph string: {}", update);
self.store.update(update.as_str())?;
let update = format!("DELETE WHERE {{ GRAPH <{pod_iri}> {{ <{pod_ref_iri}> ?p ?o . }} }}");
debug!("Delete pod_ref from pod string: {}", update);
self.store.update(update.as_str())?;
if add {
let depth = if is_local { "0" } else { "1" };
let _quad = self.put_quad(pod_ref_iri, HAS_DEPTH, depth, Some(configuration_iri))?;
let _quad = self.put_quad(pod_ref_iri, HAS_ADDR_TYPE, POD_REF, Some(pod_iri))?;
debug!("Pod ref {} added to pod {}", pod_ref_address, pod_address);
} else {
debug!(
"Pod ref {} removed from pod {}",
pod_ref_address, pod_address
);
if is_local {
let _quad = self.put_quad(pod_ref_iri, HAS_DEPTH, "0", Some(configuration_iri))?;
} else {
let query = format!(
r#"
SELECT ?g WHERE {{
GRAPH ?g {{ <{pod_ref_iri}> ?p ?o . }}
GRAPH <{configuration_iri}> {{ ?g <{HAS_DEPTH}> "0" . }}
}}
"#
);
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(graph)) = solution.get("g") {
debug!("Pod ref {} found in graph {}", pod_ref_address, graph);
let _quad = self.put_quad(
pod_ref_iri,
HAS_DEPTH,
"1",
Some(configuration_iri),
)?;
break;
}
}
}
}
}
if is_local {
let _quad = self.put_quad(pod_ref_iri, HAS_ADDR_TYPE, POD, Some(configuration_iri))?;
debug!("Pod ref {} set as local pod", pod_ref_address);
}
let delete_query = format!(
"DELETE WHERE {{ GRAPH <{configuration_iri}> {{ ?subject <{HAS_MODIFIED_DATE}> ?date . }} }}"
);
debug!("Delete existing modified date query: {}", delete_query);
self.store.update(delete_query.as_str())?;
let date = Utc::now().to_rfc3339();
let date = date.as_str();
let _quad = self.put_quad(
configuration_iri,
HAS_MODIFIED_DATE,
date,
Some(configuration_iri),
)?;
let scratchpads = self.get_scratchpads(configuration_address)?;
for scratchpad in scratchpads {
let scratchpad_iri = format!("ant://{scratchpad}");
let scratchpad_iri = scratchpad_iri.as_str();
let _quad = self.put_quad(
scratchpad_iri,
HAS_MODIFIED_DATE,
date,
Some(configuration_iri),
)?;
}
let delete_query = format!(
"DELETE WHERE {{ GRAPH <{pod_iri}> {{ ?subject <{HAS_MODIFIED_DATE}> ?date . }} }}"
);
debug!("Delete existing modified date query: {}", delete_query);
self.store.update(delete_query.as_str())?;
let _quad = self.put_quad(pod_iri, HAS_MODIFIED_DATE, date, Some(pod_iri))?;
let scratchpads = self.get_pod_scratchpads(pod_address)?;
for scratchpad in scratchpads {
let scratchpad_iri = format!("ant://{scratchpad}");
let scratchpad_iri = scratchpad_iri.as_str();
let _quad = self.put_quad(scratchpad_iri, HAS_MODIFIED_DATE, date, Some(pod_iri))?;
}
let pod = oxigraph::model::NamedNodeRef::new(pod_iri)?;
let mut buffer = Vec::new();
self.store
.dump_graph_to_writer(pod, RdfFormat::TriG, &mut buffer)?;
let pod = oxigraph::model::NamedNodeRef::new(configuration_iri)?;
let mut configuration = Vec::new();
self.store
.dump_graph_to_writer(pod, RdfFormat::TriG, &mut configuration)?;
Ok((buffer, configuration))
}
pub fn update_key_count(
&mut self,
configuration_address: &str,
num_keys: u64,
) -> Result<(), Error> {
let configuration_iri = format!("ant://{configuration_address}");
let configuration_iri = configuration_iri.as_str();
let update = format!(
"DELETE WHERE {{ GRAPH <{configuration_iri}> {{ <{configuration_iri}> <{KEY_COUNT}> ?o . }} }}"
);
self.store.update(update.as_str())?;
let _quad = self.put_quad(
configuration_iri,
KEY_COUNT,
&num_keys.to_string(),
Some(configuration_iri),
)?;
Ok(())
}
pub fn put_subject_data(
&mut self,
pod_address: &str,
subject_address: &str,
configuration_address: &str,
data: &str,
) -> Result<(Vec<u8>, Vec<u8>), Error> {
let pod_iri = format!("ant://{pod_address}");
let pod_iri = pod_iri.as_str();
let pod = NamedNodeRef::new(pod_iri)?;
let subject_iri = format!("ant://{subject_address}");
let subject_iri = subject_iri.as_str();
let configuration_iri = format!("ant://{configuration_address}");
let configuration_iri = configuration_iri.as_str();
let config = NamedNodeRef::new(configuration_iri)?;
let update = format!("DELETE WHERE {{ GRAPH <{pod_iri}> {{ <{subject_iri}> ?p ?o . }} }}");
debug!("Delete string: {}", update);
self.store.update(update.as_str())?;
let data_reader = Cursor::new(data);
let mut profile = JsonLdProfileSet::empty();
profile |= JsonLdProfile::Compacted;
profile |= JsonLdProfile::Context;
self.store.load_from_reader(
RdfParser::from_format(RdfFormat::JsonLd { profile })
.without_named_graphs() .with_default_graph(pod), data_reader,
)?;
let delete_query = format!(
"DELETE WHERE {{ GRAPH <{pod_iri}> {{ ?subject <{HAS_MODIFIED_DATE}> ?date . }} }}"
);
debug!("Delete existing modified date query: {}", delete_query);
self.store.update(delete_query.as_str())?;
let date = Utc::now().to_rfc3339();
let date = date.as_str();
let _quad = self.put_quad(pod_iri, HAS_MODIFIED_DATE, date, Some(pod_iri))?;
let scratchpads = self.get_pod_scratchpads(pod_address)?;
for scratchpad in scratchpads {
let scratchpad_iri = format!("ant://{scratchpad}");
let scratchpad_iri = scratchpad_iri.as_str();
let _quad = self.put_quad(scratchpad_iri, HAS_MODIFIED_DATE, date, Some(pod_iri))?;
}
let mut buffer = Vec::new();
self.store
.dump_graph_to_writer(pod, RdfFormat::TriG, &mut buffer)?;
let delete_query = format!(
"DELETE WHERE {{ GRAPH <{configuration_iri}> {{ ?subject <{HAS_MODIFIED_DATE}> ?date . }} }}"
);
debug!("Delete existing modified date query: {}", delete_query);
self.store.update(delete_query.as_str())?;
let _quad = self.put_quad(
configuration_iri,
HAS_MODIFIED_DATE,
date,
Some(configuration_iri),
)?;
let scratchpads = self.get_scratchpads(configuration_address)?;
for scratchpad in scratchpads {
let scratchpad_iri = format!("ant://{scratchpad}");
let scratchpad_iri = scratchpad_iri.as_str();
let _quad = self.put_quad(
scratchpad_iri,
HAS_MODIFIED_DATE,
date,
Some(configuration_iri),
)?;
}
let mut configuration = Vec::new();
self.store
.dump_graph_to_writer(config, RdfFormat::TriG, &mut configuration)?;
Ok((buffer, configuration))
}
pub fn get_subject_data(&self, subject_address: &str) -> Result<String, Error> {
let subject_iri = format!("ant://{subject_address}");
let query = format!(
"SELECT ?graph ?predicate ?object WHERE {{ GRAPH ?graph {{ <{}> ?predicate ?object . }} }}",
subject_iri.as_str()
);
debug!("Query string: {}", query);
let results = self.store.query(query.as_str())?;
let buffer = results.write(Vec::new(), QueryResultsFormat::Json)?;
let json_str = String::from_utf8(buffer)?;
debug!("Query results: {}", json_str);
Ok(json_str)
}
pub fn get_pod_depth(&self, pod_address: &str) -> Result<u64, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
"SELECT ?depth WHERE {{ GRAPH ?graph {{ <{pod_iri}> <{HAS_DEPTH}> ?depth . }} }}"
);
debug!("Depth query: {}", query);
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::Literal(literal)) = solution.get("depth")
&& let Ok(depth_value) = literal.value().parse::<u64>()
{
debug!("Found depth {} for pod {}", depth_value, pod_address);
return Ok(depth_value);
}
}
}
debug!("No depth found for pod {}, returning default", pod_address);
Ok(u64::MAX)
}
pub fn update_pod_depth(
&mut self,
pod_address: &str,
configuration_address: &str,
new_depth: u64,
) -> Result<(), Error> {
self.update_pod_depth_internal(pod_address, configuration_address, new_depth, false)
}
pub fn force_set_pod_depth(
&mut self,
pod_address: &str,
configuration_address: &str,
new_depth: u64,
) -> Result<(), Error> {
self.update_pod_depth_internal(pod_address, configuration_address, new_depth, true)
}
fn update_pod_depth_internal(
&mut self,
pod_address: &str,
configuration_address: &str,
new_depth: u64,
force: bool,
) -> Result<(), Error> {
let pod_iri = format!("ant://{pod_address}");
let configuration_iri = format!("ant://{configuration_address}");
let current_depth = self.get_pod_depth(pod_address)?;
if force || new_depth < current_depth {
info!(
"Updating depth for pod {} from {} to {}",
pod_address, current_depth, new_depth
);
let delete_query =
format!("DELETE WHERE {{ GRAPH ?graph {{ <{pod_iri}> <{HAS_DEPTH}> ?depth . }} }}");
debug!("Delete depth query: {}", delete_query);
self.store.update(delete_query.as_str())?;
let _quad = self.put_quad(
&pod_iri,
HAS_DEPTH,
&new_depth.to_string(),
Some(&configuration_iri),
)?;
info!("Set depth {} for pod {}", new_depth, pod_address);
} else {
debug!(
"Not updating depth for pod {} (current: {}, new: {})",
pod_address, current_depth, new_depth
);
}
Ok(())
}
pub fn get_max_pod_depth(&self) -> Result<u64, Error> {
let query = format!(
"SELECT (MAX(?depth) AS ?max_depth) WHERE {{ GRAPH ?graph {{ ?pod <{HAS_DEPTH}> ?depth . }} }}"
);
debug!("Max depth query: {}", query);
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::Literal(literal)) = solution.get("max_depth")
&& let Ok(max_depth_value) = literal.value().parse::<u64>()
{
debug!("Max pod depth found: {}", max_depth_value);
return Ok(max_depth_value);
}
}
}
debug!("No pods found, returning depth 0");
Ok(0)
}
pub fn get_pods_at_depth(&self, depth: u64) -> Result<Vec<String>, Error> {
let query =
format!("SELECT ?pod WHERE {{ GRAPH ?graph {{ ?pod <{HAS_DEPTH}> \"{depth}\" . }} }}");
debug!("Pods at depth query: {}", query);
let mut pods = Vec::new();
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(pod_node)) = solution.get("pod") {
let pod_iri = pod_node.as_str();
if let Some(address) = pod_iri.strip_prefix("ant://") {
pods.push(address.to_string());
}
}
}
}
debug!("Found {} pods at depth {}", pods.len(), depth);
Ok(pods)
}
pub fn get_pod_references(&self, pod_address: &str) -> Result<Vec<String>, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
"SELECT DISTINCT ?pod_ref WHERE {{ GRAPH <{pod_iri}> {{ ?pod_ref <{HAS_ADDR_TYPE}> <{POD_REF}> . }} }}"
);
debug!("Pod references query: {}", query);
let mut references = Vec::new();
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(ref_node)) = solution.get("pod_ref") {
let ref_iri = ref_node.as_str();
if let Some(address) = ref_iri.strip_prefix("ant://") {
references.push(address.to_string());
}
}
}
}
debug!(
"Found {} references in pod {}",
references.len(),
pod_address
);
Ok(references)
}
pub fn get_free_pointers(&self, pod_address: &str) -> Result<Vec<String>, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
"SELECT DISTINCT ?pod_ref WHERE {{ GRAPH <{pod_iri}> {{ ?pod_ref <{HAS_ADDR_TYPE}> <{FREED_POD}> . }} }}"
);
debug!("Free pointers query: {}", query);
let mut pointers = Vec::new();
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(ref_node)) = solution.get("pod_ref") {
let ref_iri = ref_node.as_str();
if let Some(address) = ref_iri.strip_prefix("ant://") {
pointers.push(address.to_string());
}
}
}
}
debug!(
"Found {} free pointers in pod {}",
pointers.len(),
pod_address
);
Ok(pointers)
}
pub fn get_free_scratchpads(&self, pod_address: &str) -> Result<Vec<String>, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
"SELECT DISTINCT ?pod_ref WHERE {{ GRAPH <{pod_iri}> {{ ?pod_ref <{HAS_ADDR_TYPE}> <{FREED_DATA}> . }} }}"
);
debug!("Free scratchpads query: {}", query);
let mut scratchpads = Vec::new();
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(ref_node)) = solution.get("pod_ref") {
let ref_iri = ref_node.as_str();
if let Some(address) = ref_iri.strip_prefix("ant://") {
scratchpads.push(address.to_string());
}
}
}
}
debug!(
"Found {} free scratchpads in pod {}",
scratchpads.len(),
pod_address
);
Ok(scratchpads)
}
pub fn get_pointers(&self, pod_address: &str) -> Result<Vec<String>, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
"SELECT DISTINCT ?pod_ref WHERE {{ GRAPH <{pod_iri}> {{ ?pod_ref <{HAS_ADDR_TYPE}> <{POD}> . }} }}"
);
debug!("Pointers query: {}", query);
let mut pointers = Vec::new();
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(ref_node)) = solution.get("pod_ref") {
let ref_iri = ref_node.as_str();
if let Some(address) = ref_iri.strip_prefix("ant://") {
pointers.push(address.to_string());
}
}
}
}
debug!("Found {} pointers in pod {}", pointers.len(), pod_address);
Ok(pointers)
}
pub fn get_scratchpads(&self, pod_address: &str) -> Result<Vec<String>, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
"SELECT DISTINCT ?pod_ref WHERE {{ GRAPH <{pod_iri}> {{ ?pod_ref <{HAS_ADDR_TYPE}> <{DATA}> . }} }}"
);
debug!("Scratchpads query: {}", query);
let mut scratchpads = Vec::new();
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(ref_node)) = solution.get("pod_ref") {
let ref_iri = ref_node.as_str();
if let Some(address) = ref_iri.strip_prefix("ant://") {
scratchpads.push(address.to_string());
}
}
}
}
debug!(
"Found {} scratchpads in pod {}",
scratchpads.len(),
pod_address
);
Ok(scratchpads)
}
pub fn get_key_count(&self, pod_address: &str) -> Result<u64, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
"SELECT ?count WHERE {{ GRAPH <{pod_iri}> {{ <{pod_iri}> <{KEY_COUNT}> ?count . }} }}"
);
debug!("Key count query: {}", query);
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::Literal(literal)) = solution.get("count")
&& let Ok(count_value) = literal.value().parse::<u64>()
{
debug!("Found key count {} for pod {}", count_value, pod_address);
return Ok(count_value);
}
}
}
Ok(0)
}
pub fn get_pod_subjects(&self, pod_address: &str) -> Result<Vec<String>, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
r#"
SELECT DISTINCT ?subject WHERE {{
GRAPH <{pod_iri}> {{
?subject ?p ?o .
FILTER(STRSTARTS(STR(?subject), "ant://"))
}}
}}
"#
);
debug!("Pod subjects query: {}", query);
let mut subjects = Vec::new();
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(ref_node)) = solution.get("subject") {
let ref_iri = ref_node.as_str();
if let Some(address) = ref_iri.strip_prefix("ant://") {
subjects.push(address.to_string());
}
}
}
}
debug!("Found {} subjects in pod {}", subjects.len(), pod_address);
Ok(subjects)
}
pub fn get_my_pods(&self, configuration_address: &str) -> Result<String, Error> {
let configuration_iri = format!("ant://{configuration_address}");
let configuration_iri = configuration_iri.as_str();
let query = format!(
r#"
SELECT DISTINCT ?subject ?predicate ?object ?graph WHERE {{
{{
SELECT DISTINCT ?subject WHERE {{
GRAPH <{configuration_iri}> {{
?subject <{HAS_ADDR_TYPE}> <{POD}> .
}}
}}
}}
GRAPH ?graph {{
?subject ?predicate ?object .
}}
}}
ORDER BY ?subject ?predicate
"#
);
debug!("My pods query: {}", query);
let results = self.store.query(query.as_str()).unwrap_or_else(|e| {
error!("Error executing advanced search query: {}", e);
QueryResults::Solutions(QuerySolutionIter::new(
std::sync::Arc::new([]),
std::iter::empty(),
))
});
let buffer = results.write(Vec::new(), QueryResultsFormat::Json)?;
let json_str = String::from_utf8(buffer)?;
debug!("My pods results: {}", json_str);
Ok(json_str)
}
pub fn load_pod_into_graph(&mut self, pod_address: &str, trig_data: &str) -> Result<(), Error> {
if !trig_data.trim().is_empty() {
let pod_iri = format!("ant://{pod_address}");
let pod_iri = pod_iri.as_str();
let pod = NamedNodeRef::new(pod_iri)?;
self.store.insert_named_graph(pod)?;
self.store.clear_graph(pod)?;
let data_reader = Cursor::new(trig_data);
self.store.load_from_reader(
RdfParser::from_format(RdfFormat::TriG)
.without_named_graphs() .with_default_graph(pod), data_reader,
)?;
debug!("Successfully loaded TriG data into graph database");
}
Ok(())
}
pub fn browse(&self, limit: Option<u64>) -> Result<String, Error> {
let limit_clause = if let Some(l) = limit {
format!("LIMIT {l}")
} else {
String::new()
};
let query = format!(
r#"
SELECT DISTINCT ?subject ?name ?type ?description ?size ?graph ?depth WHERE {{
GRAPH ?graph {{
?subject <{HAS_NAME}> ?name .
OPTIONAL {{ ?subject <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?type . }}
OPTIONAL {{ ?subject <http://schema.org/description> ?description . }}
OPTIONAL {{ ?subject <http://schema.org/contentSize> ?size . }}
}}
OPTIONAL {{
# Look for depth in any graph (typically configuration graphs)
GRAPH ?config_graph {{
?graph <{HAS_DEPTH}> ?depth .
}}
}}
}}
ORDER BY ASC(COALESCE(?depth, 999999)) ?graph ?subject
{limit_clause}
"#
);
debug!("Browse query: {}", query);
let results = self.store.query(query.as_str())?;
let buffer = results.write(Vec::new(), QueryResultsFormat::Json)?;
let json_str = String::from_utf8(buffer)?;
debug!("Browse results: {}", json_str);
Ok(json_str)
}
pub fn search_content(&self, search_text: &str, limit: Option<u64>) -> Result<String, Error> {
let limit_clause = if let Some(l) = limit {
format!("LIMIT {l}")
} else {
String::new()
};
let search_terms = Self::parse_search_terms(search_text);
if search_terms.is_empty() {
return Ok("[]".to_string()); }
let mut subquery_term_filters = Vec::new();
for term in &search_terms {
let escaped_term = term.replace("\"", "\\\"");
subquery_term_filters.push(format!(
"CONTAINS(LCASE(STR(?filter_object)), LCASE(\"{escaped_term}\"))"
));
}
let subquery_combined_filter = subquery_term_filters.join(" || ");
let mut match_expressions = Vec::new();
for term in &search_terms {
let escaped_term = term.replace("\"", "\\\"");
match_expressions.push(format!(
"IF(CONTAINS(LCASE(STR(?object)), LCASE(\"{escaped_term}\")), 1, 0)"
));
}
let match_count_expr = match_expressions.join(" + ");
let query = format!(
r#"
SELECT ?subject ?predicate ?object ?graph ?depth
(({match_count_expr}) AS ?match_count) WHERE {{
{{
SELECT DISTINCT ?subject WHERE {{
GRAPH ?filter_graph {{
?subject ?filter_predicate ?filter_object .
FILTER(isLiteral(?filter_object) && ({subquery_combined_filter}))
}}
}}
}}
GRAPH ?graph {{
?subject ?predicate ?object .
}}
OPTIONAL {{
# Look for depth in any graph (typically configuration graphs)
GRAPH ?config_graph {{
?graph <{HAS_DEPTH}> ?depth .
}}
}}
}}
ORDER BY DESC(?match_count) ASC(COALESCE(?depth, 999999)) ?graph ?subject
{limit_clause}
"#
);
debug!("Enhanced search query: {}", query);
let results = self.store.query(query.as_str()).unwrap_or_else(|e| {
error!("Error executing enhanced search query: {}", e);
QueryResults::Solutions(QuerySolutionIter::new(
std::sync::Arc::new([]),
std::iter::empty(),
))
});
let buffer = results.write(Vec::new(), QueryResultsFormat::Json)?;
let json_str = String::from_utf8(buffer)?;
debug!("Enhanced search results: {}", json_str);
Ok(json_str)
}
fn parse_search_terms(search_text: &str) -> Vec<String> {
let mut terms = Vec::new();
let chars = search_text.chars();
let mut current_term = String::new();
let mut in_quotes = false;
for ch in chars {
match ch {
'"' => {
if in_quotes {
if !current_term.is_empty() {
terms.push(current_term.trim().to_string());
current_term.clear();
}
in_quotes = false;
} else {
if !current_term.trim().is_empty() {
for word in current_term.split_whitespace() {
if !word.is_empty() {
terms.push(word.to_string());
}
}
current_term.clear();
}
in_quotes = true;
}
}
' ' | '\t' | '\n' | '\r' => {
if in_quotes {
current_term.push(ch);
} else {
if !current_term.trim().is_empty() {
terms.push(current_term.trim().to_string());
current_term.clear();
}
}
}
_ => {
current_term.push(ch);
}
}
}
if !current_term.trim().is_empty() {
if in_quotes {
terms.push(current_term.trim().to_string());
} else {
for word in current_term.split_whitespace() {
if !word.is_empty() {
terms.push(word.to_string());
}
}
}
}
terms
}
pub fn search_by_type(&self, type_uri: &str, limit: Option<u64>) -> Result<String, Error> {
let limit_clause = if let Some(l) = limit {
format!("LIMIT {l}")
} else {
String::new()
};
let query = format!(
r#"
SELECT DISTINCT ?subject ?graph WHERE {{
GRAPH ?graph {{
?subject <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <{type_uri}> .
}}
}}
ORDER BY ?graph ?subject
{limit_clause}
"#
);
debug!("Type search query: {}", query);
let results = self.store.query(query.as_str()).unwrap_or_else(|e| {
error!("Error executing advanced search query: {}", e);
QueryResults::Solutions(QuerySolutionIter::new(
std::sync::Arc::new([]),
std::iter::empty(),
))
});
let buffer = results.write(Vec::new(), QueryResultsFormat::Json)?;
let json_str = String::from_utf8(buffer)?;
debug!("Type search results: {}", json_str);
Ok(json_str)
}
pub fn search_by_predicate(
&self,
predicate_uri: &str,
limit: Option<u64>,
) -> Result<String, Error> {
let limit_clause = if let Some(l) = limit {
format!("LIMIT {l}")
} else {
String::new()
};
let query = format!(
r#"
SELECT DISTINCT ?subject ?object ?graph WHERE {{
GRAPH ?graph {{
?subject <{predicate_uri}> ?object .
}}
}}
ORDER BY ?graph ?subject
{limit_clause}
"#
);
debug!("Predicate search query: {}", query);
let results = self.store.query(query.as_str()).unwrap_or_else(|e| {
error!("Error executing advanced search query: {}", e);
QueryResults::Solutions(QuerySolutionIter::new(
std::sync::Arc::new([]),
std::iter::empty(),
))
});
let buffer = results.write(Vec::new(), QueryResultsFormat::Json)?;
let json_str = String::from_utf8(buffer)?;
debug!("Predicate search results: {}", json_str);
Ok(json_str)
}
pub fn advanced_search(&self, query: &str) -> Result<String, Error> {
debug!("Advanced search query: {}", query);
let results = self.store.query(query).unwrap_or_else(|e| {
error!("Error executing advanced search query: {}", e);
QueryResults::Solutions(QuerySolutionIter::new(
std::sync::Arc::new([]),
std::iter::empty(),
))
});
let buffer = results.write(Vec::new(), QueryResultsFormat::Json)?;
let json_str = String::from_utf8(buffer)?;
debug!("Advanced search results: {}", json_str);
Ok(json_str)
}
pub fn query_builder(&self, criteria: &serde_json::Value) -> Result<String, Error> {
let mut where_clauses = Vec::new();
let mut filters = Vec::new();
if let Some(text) = criteria.get("text").and_then(|v| v.as_str())
&& !text.is_empty()
{
where_clauses.push("?subject ?predicate ?object .".to_string());
filters.push(format!(
"FILTER(isLiteral(?object) && CONTAINS(LCASE(STR(?object)), LCASE(\"{}\")))",
text.replace("\"", "\\\"")
));
}
if let Some(type_uri) = criteria.get("type").and_then(|v| v.as_str())
&& !type_uri.is_empty()
{
where_clauses.push(format!(
"?subject <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <{type_uri}> ."
));
}
if let Some(predicate) = criteria.get("predicate").and_then(|v| v.as_str())
&& !predicate.is_empty()
{
where_clauses.push(format!("?subject <{predicate}> ?object ."));
}
if let Some(pod_address) = criteria.get("pod").and_then(|v| v.as_str())
&& !pod_address.is_empty()
{
let _pod_iri = if pod_address.starts_with("ant://") {
pod_address.to_string()
} else {
format!("ant://{pod_address}")
};
}
if where_clauses.is_empty() {
where_clauses.push("?subject ?predicate ?object .".to_string());
}
let limit = criteria
.get("limit")
.and_then(|v| v.as_u64())
.unwrap_or(100);
let where_clause = where_clauses.join(" ");
let filter_clause = if filters.is_empty() {
"".to_string()
} else {
filters.join(" ")
};
let query = format!(
r#"
SELECT DISTINCT ?subject ?predicate ?object ?graph WHERE {{
GRAPH ?graph {{
{where_clause}
{filter_clause}
}}
}}
ORDER BY ?graph ?subject
LIMIT {limit}
"#
);
debug!("Advanced search query: {}", query);
let results = self.store.query(query.as_str())?;
let buffer = results.write(Vec::new(), QueryResultsFormat::Json)?;
let json_str = String::from_utf8(buffer)?;
debug!("Advanced search results: {}", json_str);
Ok(json_str)
}
pub fn get_pod_scratchpads(&self, pod_address: &str) -> Result<Vec<String>, Error> {
let pod_iri = format!("ant://{pod_address}");
let query = format!(
"SELECT DISTINCT ?scratchpad ?index WHERE {{ GRAPH <{pod_iri}> {{ ?scratchpad <{HAS_INDEX}> ?index . }} }}"
);
debug!("Pod scratchpads query: {}", query);
let mut triples = HashMap::new();
let results = self.store.query(query.as_str())?;
if let QueryResults::Solutions(solutions) = results {
for solution in solutions.flatten() {
if let Some(oxigraph::model::Term::NamedNode(scratchpad_node)) =
solution.get("scratchpad")
{
let scratchpad_iri = scratchpad_node.as_str();
if let Some(address) = scratchpad_iri.strip_prefix("ant://")
&& let Some(oxigraph::model::Term::Literal(literal)) = solution.get("index")
&& let Ok(index) = literal.value().parse::<u64>()
{
triples.insert(index, address.to_string());
}
}
}
}
let mut scratchpads = Vec::new();
for i in 0..triples.len() {
if let Some(scratchpad) = triples.get(&(i as u64)) {
scratchpads.push(scratchpad.clone());
} else {
error!("Missing scratchpad at index {}", i);
}
}
debug!(
"Found {} scratchpads for pod {}",
scratchpads.len(),
pod_address
);
Ok(scratchpads)
}
pub fn get_pod_scratchpads_from_string(&self, data: &str) -> Result<Vec<String>, Error> {
let mut triples = HashMap::new();
for triple in TriGParser::new().for_reader(data.as_bytes()) {
let triple = triple.unwrap_or_else(|_e| {
Quad::new(
NamedNodeRef::new("http://example.org/subject").unwrap(),
NamedNodeRef::new("http://example.org/predicate").unwrap(),
NamedNodeRef::new("http://example.org/object").unwrap(),
GraphNameRef::DefaultGraph,
)
});
if triple.predicate == HAS_INDEX {
if let oxigraph::model::Term::Literal(literal) = triple.object
&& let Ok(index) = literal.value().parse::<u64>()
&& let oxigraph::model::Subject::NamedNode(scratchpad) = triple.subject
{
triples.insert(index, scratchpad.into_string());
}
}
}
let mut scratchpads = Vec::new();
for i in 0..triples.len() {
if let Some(scratchpad) = triples.get(&(i as u64)) {
let address = scratchpad
.as_str()
.strip_prefix("ant://")
.unwrap_or_default();
scratchpads.push(address.to_string());
} else {
error!("Missing scratchpad at index {}", i);
}
}
Ok(scratchpads)
}
pub fn clear_pod_graph(&mut self, pod_address: &str) -> Result<(), Error> {
let pod_iri = format!("ant://{pod_address}");
let pod_node = NamedNodeRef::new(&pod_iri)?;
self.store.clear_graph(pod_node)?;
debug!("Cleared graph for pod: {}", pod_address);
Ok(())
}
pub fn use_free_pointer(
&mut self,
address: &str,
configuration_address: &str,
) -> Result<(), Error> {
let address_iri = format!("ant://{address}");
let address_iri = address_iri.as_str();
let configuration_iri = format!("ant://{configuration_address}");
let configuration_iri = configuration_iri.as_str();
let update = format!(
"DELETE WHERE {{ GRAPH <{configuration_iri}> {{ <{address_iri}> <{HAS_ADDR_TYPE}> <{FREED_POD}> . }} }}"
);
debug!("Delete pod from configuration graph string: {}", update);
self.store.update(update.as_str())?;
Ok(())
}
pub fn use_free_scratchpad(
&mut self,
address: &str,
configuration_address: &str,
) -> Result<(), Error> {
let address_iri = format!("ant://{address}");
let address_iri = address_iri.as_str();
let configuration_iri = format!("ant://{configuration_address}");
let configuration_iri = configuration_iri.as_str();
let update = format!(
"DELETE WHERE {{ GRAPH <{configuration_iri}> {{ <{address_iri}> <{HAS_ADDR_TYPE}> <{FREED_DATA}> . }} }}"
);
debug!("Delete pod from configuration graph string: {}", update);
self.store.update(update.as_str())?;
Ok(())
}
}