use std::{path::PathBuf, sync::Arc};
use crate::{
Bound, Catalog, CheckpointGeneration, CommitSeq, DbError, PreparedQuery, PropertyValue, Schema,
TransactionId,
lock::WriterLock,
overlay::{Snapshot, StateView, WriteOverlay},
};
mod maintenance;
mod open;
mod reader;
mod writer;
#[cfg(test)]
#[cfg(not(miri))]
mod tests;
pub use maintenance::CheckpointPolicy;
use open::{base_file, delta_file, file_len};
pub use reader::{ReadPin, Reader};
pub use writer::Writer;
#[derive(Clone, Copy, Debug)]
pub enum IndexProbe<'value> {
All,
Equal(&'value PropertyValue),
Range {
min: &'value PropertyValue,
max: &'value PropertyValue,
},
Composite(&'value [PropertyValue]),
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum CommitOutcome {
Empty,
Committed(CommitSeq),
}
pub struct Db {
pub(super) root: PathBuf,
pub(super) current: Arc<Snapshot>,
pub(super) base_generation: u64,
pub(super) last_transaction_id: TransactionId,
pub(super) checkpoint_policy: CheckpointPolicy,
}
impl Db {
#[must_use]
pub const fn live_generation(&self) -> CheckpointGeneration {
CheckpointGeneration::new(self.base_generation)
}
#[must_use]
pub fn stats(&self) -> Stats {
let view = self.current.view();
Stats {
visible_commit_seq: self.current.lsn(),
last_transaction_id: self.last_transaction_id,
live_generation: CheckpointGeneration::new(self.base_generation),
base_byte_size: file_len(&self.root.join(base_file(self.base_generation))),
log_byte_size: file_len(&self.root.join(delta_file(self.base_generation))),
element_count: view.element_count(),
relation_count: view.relation_count(),
incidence_count: view.incidence_count(),
catalog: self.catalog_summary(),
}
}
#[must_use]
pub fn catalog_summary(&self) -> CatalogSummary {
CatalogSummary::from_catalog(self.current.view().catalog())
}
#[must_use]
pub fn reader(&self) -> Reader {
Reader {
snapshot: Arc::clone(&self.current),
}
}
#[must_use]
pub fn pin(&self) -> ReadPin {
ReadPin {
visible_commit_seq: self.current.lsn(),
generation: self.current.generation(),
}
}
pub(crate) fn begin_write(&mut self) -> Result<Writer<'_>, DbError> {
let lock = WriterLock::acquire(&self.root)?;
let transaction_id = self
.last_transaction_id
.checked_next()
.ok_or(DbError::Txn(crate::error::TxnError::TransactionIdOverflow))?;
self.last_transaction_id = transaction_id;
let parent = Arc::clone(&self.current);
let delta = WriteOverlay::from_overlay(parent.overlay());
Ok(Writer {
database: self,
parent,
delta,
transaction_id,
lock,
})
}
pub fn read<R>(&self, f: impl FnOnce(&Reader) -> Result<R, DbError>) -> Result<R, DbError> {
f(&self.reader())
}
pub fn write<R>(
&mut self,
f: impl FnOnce(&mut Writer<'_>) -> Result<R, DbError>,
) -> Result<(R, CommitOutcome), DbError> {
let mut writer = self.begin_write()?;
let value = f(&mut writer)?;
let committed = !writer.delta.is_empty();
let lsn = writer.commit()?;
let outcome = if committed {
CommitOutcome::Committed(lsn)
} else {
CommitOutcome::Empty
};
Ok((value, outcome))
}
pub fn bind(&self, schema: &Schema) -> Result<Bound, DbError> {
let view = self.current.view();
let catalog = view.catalog();
let mut bound = Bound::default();
for name in &schema.roles {
let id = catalog.role_id(name).ok_or_else(|| {
DbError::Catalog(crate::error::CatalogError::UnknownName {
kind: "role",
name: name.clone(),
})
})?;
bound.roles.insert(name.clone(), id);
}
for name in &schema.labels {
let id = catalog.label_id(name).ok_or_else(|| {
DbError::Catalog(crate::error::CatalogError::UnknownName {
kind: "label",
name: name.clone(),
})
})?;
bound.labels.insert(name.clone(), id);
}
for name in &schema.relation_types {
let id = catalog.relation_type_id(name).ok_or_else(|| {
DbError::Catalog(crate::error::CatalogError::UnknownName {
kind: "relation type",
name: name.clone(),
})
})?;
bound.relation_types.insert(name.clone(), id);
}
for (name, _family, value_type) in &schema.keys {
let id = catalog.property_key_id(name).ok_or_else(|| {
DbError::Catalog(crate::error::CatalogError::UnknownName {
kind: "property key",
name: name.clone(),
})
})?;
bound.keys.insert(name.clone(), (id, *value_type));
}
for (name, key_name) in &schema.equality_indexes {
let (_key_id, value_type) = *bound.keys.get(key_name).ok_or_else(|| {
DbError::Catalog(crate::error::CatalogError::UnknownName {
kind: "property key",
name: key_name.clone(),
})
})?;
let id = catalog.index_id(name).ok_or_else(|| {
DbError::Catalog(crate::error::CatalogError::UnknownName {
kind: "index",
name: name.clone(),
})
})?;
bound
.equality_indexes
.insert(name.clone(), (id, value_type));
}
for spec in &schema.graph_projections {
let id = catalog.projection_id(&spec.name).ok_or_else(|| {
DbError::Catalog(crate::error::CatalogError::UnknownName {
kind: "projection",
name: spec.name.clone(),
})
})?;
bound.projections.insert(spec.name.clone(), id);
}
Ok(bound)
}
pub fn prepare(&self, query: &str) -> Result<PreparedQuery, DbError> {
PreparedQuery::prepare(query, &self.current.view())
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Stats {
pub visible_commit_seq: CommitSeq,
pub last_transaction_id: TransactionId,
pub live_generation: CheckpointGeneration,
pub base_byte_size: u64,
pub log_byte_size: u64,
pub element_count: usize,
pub relation_count: usize,
pub incidence_count: usize,
pub catalog: CatalogSummary,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CatalogSummary {
pub role_count: usize,
pub label_count: usize,
pub relation_type_count: usize,
pub property_key_count: usize,
pub projection_count: usize,
pub index_count: usize,
}
impl CatalogSummary {
#[must_use]
pub fn from_catalog(catalog: &Catalog) -> Self {
Self {
role_count: catalog.roles().count(),
label_count: catalog.labels().count(),
relation_type_count: catalog.relation_types().count(),
property_key_count: catalog.property_keys().count(),
projection_count: catalog.projections().count(),
index_count: catalog.indexes().count(),
}
}
}