use std::path::{Path, PathBuf};
use crate::{
Catalog, CommitSeq, DbError, ElementId, ElementRecord, GraphProjection, HypergraphProjection,
IncidenceId, IncidenceRecord, IndexId, LabelId, PreparedQuery, ProjectionDefinition,
ProjectionId, PropertyKeyId, PropertySubject, PropertyType, PropertyValue, QueryLanguage,
QueryResult, RelationId, RelationRecord, RelationTypeId, RoleId, TransactionId,
catalog::{IndexDefinition, PropertyFamily},
projection::{self},
state::DatabaseState,
storage::{self, StoredDatabase},
traversal::{self, TraversalOptions, TraversalResult},
};
/// Selects how an index is queried by `ReadTransaction::lookup_index`.
///
/// Each kind of index definition accepts exactly one of these variants; any
/// other pairing is rejected by `lookup_index` with an "unsupported" error.
#[derive(Clone, Copy, Debug)]
pub enum IndexLookup<'value> {
/// Request every subject covered by the index. Accepted by label, relation
/// type and projection indexes.
All,
/// Request subjects whose property equals the given value. Accepted by
/// property equality indexes.
Equal(&'value PropertyValue),
/// Request subjects whose property lies between `min` and `max`. Accepted by
/// property range indexes.
///
/// NOTE(review): bound inclusivity is decided by the state's range lookup,
/// which is not visible here — confirm before relying on it.
Range {
/// Lower bound handed to the range lookup.
min: &'value PropertyValue,
/// Upper bound handed to the range lookup.
max: &'value PropertyValue,
},
/// Request subjects matching one value per key of a composite equality
/// index; the slice is passed alongside the index's key list.
CompositeEqual(&'value [PropertyValue]),
}
/// Handle to a database: the in-memory state plus the path of its store.
pub struct Database {
// Path handed to the `storage` functions when reading, writing or validating.
path: PathBuf,
// Current in-memory state; cloned for every read and write transaction.
state: DatabaseState,
// Commit sequence loaded from the store and replaced by each successful commit.
visible_commit_seq: CommitSeq,
// Most recently allocated transaction id; advanced by `begin_write`.
last_transaction_id: TransactionId,
}
impl Database {
pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
let path = path.as_ref().to_path_buf();
if storage::store_path(&path).exists() {
return Err(DbError::AlreadyExists);
}
let stored = StoredDatabase::empty();
storage::write_store(&path, &stored)?;
Ok(Self::from_stored(path, stored))
}
pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
let path = path.as_ref().to_path_buf();
let stored = storage::read_store(&path)?;
Ok(Self::from_stored(path, stored))
}
pub fn validate_path(path: impl AsRef<Path>) -> Result<(), DbError> {
storage::validate_store(path.as_ref())
}
pub fn compact(&mut self) -> Result<(), DbError> {
self.state.validate()?;
storage::write_store(&self.path, &self.to_stored())
}
pub fn validate(&self) -> Result<(), DbError> {
self.state.validate()?;
storage::validate_store(&self.path)
}
#[must_use]
pub fn status(&self) -> DatabaseStatus {
DatabaseStatus {
visible_commit_seq: self.visible_commit_seq,
last_transaction_id: self.last_transaction_id,
element_count: self.state.element_count(),
relation_count: self.state.relation_count(),
incidence_count: self.state.incidence_count(),
catalog: self.catalog_summary(),
}
}
#[must_use]
pub fn catalog_summary(&self) -> CatalogSummary {
CatalogSummary::from_catalog(self.state.catalog())
}
#[must_use]
pub fn begin_read(&self) -> ReadTransaction {
ReadTransaction {
pin: ReadPin {
visible_commit_seq: self.visible_commit_seq,
last_transaction_id: self.last_transaction_id,
},
state: self.state.clone(),
}
}
pub fn begin_write(&mut self) -> Result<WriteTransaction<'_>, DbError> {
let transaction_id = self
.last_transaction_id
.checked_next()
.ok_or(DbError::TransactionIdOverflow)?;
let state = self.state.clone();
self.last_transaction_id = transaction_id;
Ok(WriteTransaction {
database: self,
state,
transaction_id,
dirty: false,
})
}
pub fn prepare(&self, language: QueryLanguage, query: &str) -> Result<PreparedQuery, DbError> {
PreparedQuery::prepare(language, query, &self.state)
}
fn from_stored(path: PathBuf, stored: StoredDatabase) -> Self {
Self {
path,
state: stored.state,
visible_commit_seq: stored.commit_seq,
last_transaction_id: stored.transaction_id,
}
}
fn to_stored(&self) -> StoredDatabase {
StoredDatabase {
commit_seq: self.visible_commit_seq,
transaction_id: self.last_transaction_id,
state: self.state.clone(),
}
}
fn next_commit_seq(&self) -> Result<CommitSeq, DbError> {
self.visible_commit_seq
.checked_next()
.ok_or(DbError::CommitSeqOverflow)
}
}
/// Point-in-time summary of a database, as produced by `Database::status`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct DatabaseStatus {
/// Commit sequence visible on the database when the status was taken.
pub visible_commit_seq: CommitSeq,
/// Most recently allocated transaction id when the status was taken.
pub last_transaction_id: TransactionId,
/// Element count reported by the state.
pub element_count: usize,
/// Relation count reported by the state.
pub relation_count: usize,
/// Incidence count reported by the state.
pub incidence_count: usize,
/// Entry counts of the state's catalog.
pub catalog: CatalogSummary,
}
/// Number of entries of each kind held by a catalog.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
/// Number of items yielded by the catalog's `roles()`.
pub role_count: usize,
/// Number of items yielded by the catalog's `labels()`.
pub label_count: usize,
/// Number of items yielded by the catalog's `relation_types()`.
pub relation_type_count: usize,
/// Number of items yielded by the catalog's `property_keys()`.
pub property_key_count: usize,
/// Number of items yielded by the catalog's `projections()`.
pub projection_count: usize,
/// Number of items yielded by the catalog's `indexes()`.
pub index_count: usize,
}
impl CatalogSummary {
    /// Counts the entries of each kind that `catalog` exposes.
    #[must_use]
    pub fn from_catalog(catalog: &Catalog) -> Self {
        let role_count = catalog.roles().count();
        let label_count = catalog.labels().count();
        let relation_type_count = catalog.relation_types().count();
        let property_key_count = catalog.property_keys().count();
        let projection_count = catalog.projections().count();
        let index_count = catalog.indexes().count();
        Self {
            role_count,
            label_count,
            relation_type_count,
            property_key_count,
            projection_count,
            index_count,
        }
    }
}
/// Database position captured when a read transaction is started.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ReadPin {
/// Commit sequence that was visible when the read transaction began.
pub visible_commit_seq: CommitSeq,
/// Most recently allocated transaction id when the read transaction began.
pub last_transaction_id: TransactionId,
}
/// Read-only view over a clone of the database state taken by
/// `Database::begin_read`.
pub struct ReadTransaction {
// Commit sequence and transaction id captured when the transaction started.
pin: ReadPin,
// Private clone of the database state; all reads are served from it.
state: DatabaseState,
}
impl ReadTransaction {
    /// Returns the position this transaction was pinned to when it started.
    #[must_use]
    pub const fn pin(&self) -> ReadPin {
        self.pin
    }

    /// Returns the catalog of the pinned state.
    #[must_use]
    pub const fn catalog(&self) -> &Catalog {
        self.state.catalog()
    }

    /// Returns the element count reported by the pinned state.
    #[must_use]
    pub fn element_count(&self) -> usize {
        self.state.element_count()
    }

    /// Returns the relation count reported by the pinned state.
    #[must_use]
    pub fn relation_count(&self) -> usize {
        self.state.relation_count()
    }

    /// Returns the incidence count reported by the pinned state.
    #[must_use]
    pub fn incidence_count(&self) -> usize {
        self.state.incidence_count()
    }

    /// Asks the pinned state whether it contains element `id`.
    #[must_use]
    pub fn contains_element(&self, id: ElementId) -> bool {
        self.state.contains_element(id)
    }

    /// Asks the pinned state whether it contains relation `id`.
    #[must_use]
    pub fn contains_relation(&self, id: RelationId) -> bool {
        self.state.contains_relation(id)
    }

    /// Asks the pinned state whether it contains incidence `id`.
    #[must_use]
    pub fn contains_incidence(&self, id: IncidenceId) -> bool {
        self.state.contains_incidence(id)
    }

    /// Looks up the record of element `id` in the pinned state.
    #[must_use]
    pub fn element(&self, id: ElementId) -> Option<&ElementRecord> {
        self.state.element(id)
    }

    /// Looks up the record of relation `id` in the pinned state.
    #[must_use]
    pub fn relation(&self, id: RelationId) -> Option<&RelationRecord> {
        self.state.relation(id)
    }

    /// Looks up the record of incidence `id` in the pinned state.
    #[must_use]
    pub fn incidence(&self, id: IncidenceId) -> Option<&IncidenceRecord> {
        self.state.incidence(id)
    }

    /// Iterates over the incidence records the pinned state yields for element `id`.
    pub fn element_incidences(&self, id: ElementId) -> impl Iterator<Item = &IncidenceRecord> {
        self.state.element_incidences(id)
    }

    /// Looks up the value of property `key` on `subject` in the pinned state.
    #[must_use]
    pub fn property(&self, subject: PropertySubject, key: PropertyKeyId) -> Option<&PropertyValue> {
        self.state.property(subject, key)
    }

    /// Runs the state's typed equality lookup for `key` and `value`.
    ///
    /// # Errors
    ///
    /// Returns whatever the state's lookup reports.
    pub fn lookup_property_equal(
        &self,
        key: PropertyKeyId,
        value: &PropertyValue,
    ) -> Result<Vec<PropertySubject>, DbError> {
        self.state.typed_property_equal(key, value)
    }

    /// Runs the state's typed range lookup for `key` between `min` and `max`.
    ///
    /// # Errors
    ///
    /// Returns whatever the state's lookup reports.
    pub fn lookup_property_range(
        &self,
        key: PropertyKeyId,
        min: &PropertyValue,
        max: &PropertyValue,
    ) -> Result<Vec<PropertySubject>, DbError> {
        self.state.typed_property_range(key, min, max)
    }

    /// Resolves `index` in the catalog and evaluates `lookup` against it.
    ///
    /// Each index definition accepts a single lookup kind: label, relation type
    /// and projection indexes take `IndexLookup::All`; property equality,
    /// property range and composite equality indexes take the matching
    /// `Equal`, `Range` and `CompositeEqual` lookups.
    ///
    /// # Errors
    ///
    /// Returns `DbError::UnknownIndex` when the catalog has no such index, an
    /// "unsupported" error when `lookup` does not fit the index definition, and
    /// otherwise propagates errors from the underlying lookup.
    pub fn lookup_index(
        &self,
        index: IndexId,
        lookup: IndexLookup<'_>,
    ) -> Result<Vec<PropertySubject>, DbError> {
        let catalog = self.state.catalog();
        let entry = catalog
            .index(index)
            .ok_or(DbError::UnknownIndex { id: index })?;
        // Dispatch on the index definition first, then check that the lookup
        // kind is the one this definition supports.
        match &entry.definition {
            IndexDefinition::Label { label } => match lookup {
                IndexLookup::All => {
                    let elements = self.state.elements_with_label(*label);
                    Ok(elements
                        .into_iter()
                        .map(PropertySubject::Element)
                        .collect())
                }
                _ => Err(DbError::unsupported("label index expects all lookup")),
            },
            IndexDefinition::RelationType { relation_type } => match lookup {
                IndexLookup::All => {
                    let relations = self.state.relations_with_type(*relation_type);
                    Ok(relations
                        .into_iter()
                        .map(PropertySubject::Relation)
                        .collect())
                }
                _ => Err(DbError::unsupported(
                    "relation type index expects all lookup",
                )),
            },
            IndexDefinition::PropertyEquality { key } => match lookup {
                IndexLookup::Equal(value) => self.state.typed_property_equal(*key, value),
                _ => Err(DbError::unsupported(
                    "property equality index expects equality lookup",
                )),
            },
            IndexDefinition::PropertyRange { key } => match lookup {
                IndexLookup::Range { min, max } => self.state.typed_property_range(*key, min, max),
                _ => Err(DbError::unsupported(
                    "property range index expects range lookup",
                )),
            },
            IndexDefinition::CompositeEquality { keys } => match lookup {
                IndexLookup::CompositeEqual(values) => {
                    self.state.typed_property_composite_equal(keys, values)
                }
                _ => Err(DbError::unsupported(
                    "composite equality index expects composite equality lookup",
                )),
            },
            IndexDefinition::Projection { projection } => match lookup {
                IndexLookup::All => self.projection_index_subjects(*projection),
                _ => Err(DbError::unsupported("projection index expects all lookup")),
            },
        }
    }

    /// Builds the graph projection registered under `id` from the pinned state.
    ///
    /// # Errors
    ///
    /// Returns `DbError::UnknownProjection` when the catalog has no such
    /// projection, an "invalid projection" error when it is a hypergraph
    /// projection, and otherwise propagates errors from building the projection.
    pub fn graph_projection(&self, id: ProjectionId) -> Result<GraphProjection, DbError> {
        let catalog = self.state.catalog();
        let entry = catalog
            .projection(id)
            .ok_or(DbError::UnknownProjection { id })?;
        match &entry.definition {
            ProjectionDefinition::Hypergraph(_) => {
                Err(DbError::invalid_projection("projection is not a graph"))
            }
            ProjectionDefinition::Graph(definition) => {
                let definition = definition.clone();
                projection::GraphProjection::from_state(&self.state, definition)
            }
        }
    }

    /// Resolves `name` to a projection id and builds its graph projection.
    ///
    /// # Errors
    ///
    /// Returns an "unsupported" error when the catalog knows no projection
    /// called `name`; otherwise behaves like `graph_projection`.
    pub fn graph_projection_by_name(&self, name: &str) -> Result<GraphProjection, DbError> {
        match self.state.catalog().projection_id(name) {
            Some(id) => self.graph_projection(id),
            None => Err(DbError::unsupported(format!("unknown projection {name}"))),
        }
    }

    /// Traverses the graph projection `projection` starting from `seeds`.
    ///
    /// When `seeds` is empty or `options.limit` is zero, an empty result is
    /// returned immediately, without resolving the projection.
    ///
    /// # Errors
    ///
    /// Propagates errors from `graph_projection` and from the traversal itself.
    pub fn traverse_graph(
        &self,
        projection: ProjectionId,
        seeds: &[ElementId],
        options: TraversalOptions,
    ) -> Result<TraversalResult, DbError> {
        let nothing_to_visit = seeds.is_empty() || options.limit == 0;
        if nothing_to_visit {
            return Ok(TraversalResult::new(Vec::new()));
        }
        self.graph_projection(projection)
            .and_then(|graph| traversal::traverse_graph_projection(&graph, seeds, options))
    }

    /// Builds the hypergraph projection registered under `id` from the pinned state.
    ///
    /// # Errors
    ///
    /// Returns `DbError::UnknownProjection` when the catalog has no such
    /// projection, an "invalid projection" error when it is a graph projection,
    /// and otherwise propagates errors from building the projection.
    pub fn hypergraph_projection(&self, id: ProjectionId) -> Result<HypergraphProjection, DbError> {
        let catalog = self.state.catalog();
        let entry = catalog
            .projection(id)
            .ok_or(DbError::UnknownProjection { id })?;
        match &entry.definition {
            ProjectionDefinition::Graph(_) => Err(DbError::invalid_projection(
                "projection is not a hypergraph",
            )),
            ProjectionDefinition::Hypergraph(definition) => {
                let definition = definition.clone();
                projection::HypergraphProjection::from_state(&self.state, definition)
            }
        }
    }

    /// Executes a prepared query against the pinned state.
    ///
    /// # Errors
    ///
    /// Returns whatever the query's execution reports.
    pub fn execute(&self, query: &PreparedQuery) -> Result<QueryResult, DbError> {
        query.execute(&self.state)
    }

    /// Returns the explanation text produced by the prepared query itself.
    #[must_use]
    pub fn explain(&self, query: &PreparedQuery) -> String {
        query.explain()
    }

    /// Builds the projection registered under `projection`, whichever kind it
    /// is, and returns its subjects.
    ///
    /// # Errors
    ///
    /// Returns `DbError::UnknownProjection` when the catalog has no such
    /// projection and propagates errors from building the projection.
    fn projection_index_subjects(
        &self,
        projection: ProjectionId,
    ) -> Result<Vec<PropertySubject>, DbError> {
        let catalog = self.state.catalog();
        let entry = catalog
            .projection(projection)
            .ok_or(DbError::UnknownProjection { id: projection })?;
        match &entry.definition {
            ProjectionDefinition::Graph(definition) => {
                let graph =
                    projection::GraphProjection::from_state(&self.state, definition.clone())?;
                Ok(graph.subjects())
            }
            ProjectionDefinition::Hypergraph(definition) => {
                let hypergraph =
                    projection::HypergraphProjection::from_state(&self.state, definition.clone())?;
                Ok(hypergraph.subjects())
            }
        }
    }
}
/// Write transaction that stages changes on a private copy of the state and
/// publishes them to the database on `commit`.
pub struct WriteTransaction<'db> {
// Database that receives the staged state on commit; held exclusively for
// the lifetime of the transaction.
database: &'db mut Database,
// Working copy of the database state; every mutation is applied here.
state: DatabaseState,
// Id allocated for this transaction by `Database::begin_write`.
transaction_id: TransactionId,
// Set once any mutating call has succeeded; decides whether commit
// allocates a new commit sequence.
dirty: bool,
}
impl WriteTransaction<'_> {
pub fn register_role(&mut self, name: impl Into<String>) -> Result<RoleId, DbError> {
let id = self.state.register_role(name.into())?;
self.dirty = true;
Ok(id)
}
pub fn register_label(&mut self, name: impl Into<String>) -> Result<LabelId, DbError> {
let id = self.state.register_label(name.into())?;
self.dirty = true;
Ok(id)
}
pub fn register_relation_type(
&mut self,
name: impl Into<String>,
) -> Result<RelationTypeId, DbError> {
let id = self.state.register_relation_type(name.into())?;
self.dirty = true;
Ok(id)
}
pub fn register_property_key(
&mut self,
name: impl Into<String>,
family: PropertyFamily,
value_type: PropertyType,
) -> Result<PropertyKeyId, DbError> {
let id = self
.state
.register_property_key(name.into(), family, value_type)?;
self.dirty = true;
Ok(id)
}
pub fn define_projection(
&mut self,
definition: ProjectionDefinition,
) -> Result<ProjectionId, DbError> {
let id = self.state.define_projection(definition)?;
self.dirty = true;
Ok(id)
}
pub fn define_index(
&mut self,
name: impl Into<String>,
definition: IndexDefinition,
) -> Result<IndexId, DbError> {
let id = self.state.define_index(name.into(), definition)?;
self.dirty = true;
Ok(id)
}
pub fn create_element(&mut self) -> Result<ElementId, DbError> {
let id = self.state.create_element()?;
self.dirty = true;
Ok(id)
}
pub fn create_relation(&mut self) -> Result<RelationId, DbError> {
let id = self.state.create_relation()?;
self.dirty = true;
Ok(id)
}
pub fn create_incidence(
&mut self,
relation: RelationId,
element: ElementId,
role: RoleId,
) -> Result<IncidenceId, DbError> {
let id = self.state.create_incidence(relation, element, role)?;
self.dirty = true;
Ok(id)
}
pub fn tombstone_element(&mut self, id: ElementId) -> Result<(), DbError> {
self.state.tombstone_element(id)?;
self.dirty = true;
Ok(())
}
pub fn tombstone_relation(&mut self, id: RelationId) -> Result<(), DbError> {
self.state.tombstone_relation(id)?;
self.dirty = true;
Ok(())
}
pub fn tombstone_incidence(&mut self, id: IncidenceId) -> Result<(), DbError> {
self.state.tombstone_incidence(id)?;
self.dirty = true;
Ok(())
}
pub fn add_element_label(&mut self, element: ElementId, label: LabelId) -> Result<(), DbError> {
self.state.add_element_label(element, label)?;
self.dirty = true;
Ok(())
}
pub fn add_relation_label(
&mut self,
relation: RelationId,
label: LabelId,
) -> Result<(), DbError> {
self.state.add_relation_label(relation, label)?;
self.dirty = true;
Ok(())
}
pub fn set_relation_type(
&mut self,
relation: RelationId,
relation_type: RelationTypeId,
) -> Result<(), DbError> {
self.state.set_relation_type(relation, relation_type)?;
self.dirty = true;
Ok(())
}
pub fn set_property(
&mut self,
subject: PropertySubject,
key: PropertyKeyId,
value: PropertyValue,
) -> Result<(), DbError> {
self.state.set_property(subject, key, value)?;
self.dirty = true;
Ok(())
}
pub fn remove_property(
&mut self,
subject: PropertySubject,
key: PropertyKeyId,
) -> Result<(), DbError> {
self.state.remove_property(subject, key)?;
self.dirty = true;
Ok(())
}
pub fn commit(self) -> Result<CommitSeq, DbError> {
let commit_seq = if self.dirty {
self.database.next_commit_seq()?
} else {
self.database.visible_commit_seq
};
let stored = StoredDatabase {
commit_seq,
transaction_id: self.transaction_id,
state: self.state.clone(),
};
storage::write_store(&self.database.path, &stored)?;
self.database.state = self.state;
self.database.visible_commit_seq = commit_seq;
self.database.last_transaction_id = self.transaction_id;
Ok(commit_seq)
}
pub fn rollback(self) {}
}