mod edge_ops;
mod graph_store_impl;
mod index;
mod memory;
mod node_ops;
mod property_ops;
mod schema;
mod search;
mod statistics;
mod traversal;
mod versioning;
#[cfg(test)]
mod tests;
use super::PropertyStorage;
#[cfg(not(feature = "tiered-storage"))]
use super::{EdgeRecord, NodeRecord};
use crate::index::adjacency::ChunkedAdjacency;
use crate::statistics::Statistics;
use arcstr::ArcStr;
use dashmap::DashMap;
#[cfg(not(feature = "tiered-storage"))]
use grafeo_common::mvcc::VersionChain;
use grafeo_common::types::{
EdgeId, EpochId, HashableValue, NodeId, PropertyKey, TransactionId, Value,
};
use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
use parking_lot::RwLock;
use std::cmp::Ordering as CmpOrdering;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
#[cfg(feature = "vector-index")]
use crate::index::vector::HnswIndex;
#[cfg(feature = "tiered-storage")]
use crate::storage::EpochStore;
use grafeo_common::memory::arena::AllocError;
#[cfg(feature = "tiered-storage")]
use grafeo_common::memory::arena::ArenaAllocator;
#[cfg(feature = "tiered-storage")]
use grafeo_common::mvcc::VersionIndex;
#[cfg(feature = "temporal")]
use grafeo_common::temporal::VersionLog;
/// One recorded change to a node or edge, carrying the data captured before
/// the change was applied.
///
/// Entries are stored per transaction in `LpgStore::property_undo_log`.
/// NOTE(review): presumably these are replayed to roll a transaction back;
/// the replay logic is not in this file — confirm in the sibling modules.
#[derive(Debug, Clone)]
pub enum PropertyUndoEntry {
/// A node property was changed.
NodeProperty {
/// Node whose property was changed.
node_id: NodeId,
/// Key of the affected property.
key: PropertyKey,
/// Value held before the change; presumably `None` means the key was
/// absent — TODO confirm.
old_value: Option<Value>,
},
/// An edge property was changed.
EdgeProperty {
/// Edge whose property was changed.
edge_id: EdgeId,
/// Key of the affected property.
key: PropertyKey,
/// Value held before the change; presumably `None` means the key was
/// absent — TODO confirm.
old_value: Option<Value>,
},
/// A label was added to a node.
LabelAdded {
/// Node that received the label.
node_id: NodeId,
/// The label that was added.
label: String,
},
/// A label was removed from a node.
LabelRemoved {
/// Node that lost the label.
node_id: NodeId,
/// The label that was removed.
label: String,
},
/// A node was deleted; carries the labels and properties it had.
NodeDeleted {
/// Id of the deleted node.
node_id: NodeId,
/// Labels captured for the deleted node.
labels: Vec<String>,
/// Property key/value pairs captured for the deleted node.
properties: Vec<(PropertyKey, Value)>,
},
/// An edge was deleted; carries its endpoints, type and properties.
EdgeDeleted {
/// Id of the deleted edge.
edge_id: EdgeId,
/// Source endpoint of the deleted edge.
src: NodeId,
/// Destination endpoint of the deleted edge.
dst: NodeId,
/// Type name of the deleted edge.
edge_type: String,
/// Property key/value pairs captured for the deleted edge.
properties: Vec<(PropertyKey, Value)>,
},
}
/// Orders two values for range predicates.
///
/// Returns `Some(ordering)` when both values have the same comparable variant
/// (`Int64`, `Float64`, `String`, `Bool`, `Timestamp`, `Date`, `Time`) or when
/// one is `Int64` and the other `Float64`. Every other pairing yields `None`.
/// Float comparisons use `partial_cmp`, so they may also yield `None`.
pub(super) fn compare_values_for_range(a: &Value, b: &Value) -> Option<CmpOrdering> {
    let total_order = match (a, b) {
        // Floating-point arms only have a partial order: return it directly.
        (Value::Float64(lhs), Value::Float64(rhs)) => return lhs.partial_cmp(rhs),
        // Mixed numeric arms: the integer side is widened with `as f64`
        // before the comparison.
        (Value::Int64(lhs), Value::Float64(rhs)) => return (*lhs as f64).partial_cmp(rhs),
        (Value::Float64(lhs), Value::Int64(rhs)) => return lhs.partial_cmp(&(*rhs as f64)),
        // Same-variant arms with a total order.
        (Value::Int64(lhs), Value::Int64(rhs)) => lhs.cmp(rhs),
        (Value::String(lhs), Value::String(rhs)) => lhs.cmp(rhs),
        (Value::Bool(lhs), Value::Bool(rhs)) => lhs.cmp(rhs),
        (Value::Timestamp(lhs), Value::Timestamp(rhs)) => lhs.cmp(rhs),
        (Value::Date(lhs), Value::Date(rhs)) => lhs.cmp(rhs),
        (Value::Time(lhs), Value::Time(rhs)) => lhs.cmp(rhs),
        // Any other combination is not comparable.
        _ => return None,
    };
    Some(total_order)
}
/// Tests whether `value` lies within the optional `min`/`max` bounds.
///
/// A missing bound is unbounded on that side. `min_inclusive` and
/// `max_inclusive` decide whether a value equal to the bound is accepted.
/// If `value` cannot be compared with a supplied bound
/// (`compare_values_for_range` returns `None`), the result is `false`.
pub(super) fn value_in_range(
    value: &Value,
    min: Option<&Value>,
    max: Option<&Value>,
    min_inclusive: bool,
    max_inclusive: bool,
) -> bool {
    let satisfies_lower = min.map_or(true, |lower| {
        match compare_values_for_range(value, lower) {
            Some(CmpOrdering::Greater) => true,
            Some(CmpOrdering::Equal) => min_inclusive,
            Some(CmpOrdering::Less) | None => false,
        }
    });
    // Skip the upper-bound comparison when the lower bound already failed.
    if !satisfies_lower {
        return false;
    }

    max.map_or(true, |upper| {
        match compare_values_for_range(value, upper) {
            Some(CmpOrdering::Less) => true,
            Some(CmpOrdering::Equal) => max_inclusive,
            Some(CmpOrdering::Greater) | None => false,
        }
    })
}
/// Construction-time options for [`LpgStore`].
#[derive(Debug, Clone)]
pub struct LpgStoreConfig {
    /// When `true`, `LpgStore::with_config` allocates a backward adjacency
    /// structure in addition to the forward one.
    pub backward_edges: bool,
    /// Initial capacity hint for nodes.
    pub initial_node_capacity: usize,
    /// Initial capacity hint for edges.
    pub initial_edge_capacity: usize,
}

impl Default for LpgStoreConfig {
    /// Backward edges enabled, with capacities of 1024 nodes and 4096 edges.
    fn default() -> Self {
        const DEFAULT_NODE_CAPACITY: usize = 1024;
        const DEFAULT_EDGE_CAPACITY: usize = 4096;

        LpgStoreConfig {
            backward_edges: true,
            initial_node_capacity: DEFAULT_NODE_CAPACITY,
            initial_edge_capacity: DEFAULT_EDGE_CAPACITY,
        }
    }
}
/// Labeled-property-graph store: node/edge records, properties, label and
/// edge-type registries, adjacency, secondary indexes, counters and named
/// sub-graphs.
///
/// Which record-storage fields exist depends on the `tiered-storage` feature.
/// The `temporal`, `vector-index` and `text-index` features gate further fields.
pub struct LpgStore {
/// Node records keyed by id, each held in a version chain
/// (only without `tiered-storage`).
#[cfg(not(feature = "tiered-storage"))]
pub(super) nodes: RwLock<FxHashMap<NodeId, VersionChain<NodeRecord>>>,
/// Edge records keyed by id, each held in a version chain
/// (only without `tiered-storage`).
#[cfg(not(feature = "tiered-storage"))]
pub(super) edges: RwLock<FxHashMap<EdgeId, VersionChain<EdgeRecord>>>,
/// Shared arena allocator (only with `tiered-storage`).
#[cfg(feature = "tiered-storage")]
pub(super) arena_allocator: Arc<ArenaAllocator>,
/// Per-node version index (only with `tiered-storage`).
#[cfg(feature = "tiered-storage")]
pub(super) node_versions: RwLock<FxHashMap<NodeId, VersionIndex>>,
/// Per-edge version index (only with `tiered-storage`).
#[cfg(feature = "tiered-storage")]
pub(super) edge_versions: RwLock<FxHashMap<EdgeId, VersionIndex>>,
/// Shared epoch store (only with `tiered-storage`).
#[cfg(feature = "tiered-storage")]
pub(super) epoch_store: Arc<EpochStore>,
/// Property storage keyed by node id.
pub(super) node_properties: PropertyStorage<NodeId>,
/// Property storage keyed by edge id.
pub(super) edge_properties: PropertyStorage<EdgeId>,
/// Label name -> numeric label id.
pub(super) label_to_id: RwLock<FxHashMap<ArcStr, u32>>,
/// Numeric label id -> label name; a label's id is its index in this vector.
pub(super) id_to_label: RwLock<Vec<ArcStr>>,
/// Edge type name -> numeric edge type id.
pub(super) edge_type_to_id: RwLock<FxHashMap<ArcStr, u32>>,
/// Numeric edge type id -> edge type name; an id is its index in this vector.
pub(super) id_to_edge_type: RwLock<Vec<ArcStr>>,
/// Forward adjacency structure; always present.
pub(super) forward_adj: ChunkedAdjacency,
/// Backward adjacency structure; `Some` only when the store was built with
/// `LpgStoreConfig::backward_edges` set.
pub(super) backward_adj: Option<ChunkedAdjacency>,
/// One node-id map per slot.
/// NOTE(review): presumably indexed by label id — confirm in the index module.
pub(super) label_index: RwLock<Vec<FxHashMap<NodeId, ()>>>,
/// Set of `u32` ids per node (presumably label ids — confirm); non-`temporal` form.
#[cfg(not(feature = "temporal"))]
pub(super) node_labels: RwLock<FxHashMap<NodeId, FxHashSet<u32>>>,
/// Same mapping as above, wrapped in a `VersionLog` (`temporal` builds).
#[cfg(feature = "temporal")]
pub(super) node_labels: RwLock<FxHashMap<NodeId, VersionLog<FxHashSet<u32>>>>,
/// Per-property-key index from hashable value to the set of node ids.
pub(super) property_indexes:
RwLock<FxHashMap<PropertyKey, DashMap<HashableValue, FxHashSet<NodeId>>>>,
/// HNSW vector indexes keyed by string (only with `vector-index`).
#[cfg(feature = "vector-index")]
pub(super) vector_indexes: RwLock<FxHashMap<String, Arc<HnswIndex>>>,
/// Inverted text indexes keyed by string (only with `text-index`).
#[cfg(feature = "text-index")]
pub(super) text_indexes:
RwLock<FxHashMap<String, Arc<RwLock<crate::index::text::InvertedIndex>>>>,
/// Node id counter; starts at 0 and is reset to 0 by `clear`.
pub(super) next_node_id: AtomicU64,
/// Edge id counter; starts at 0 and is reset to 0 by `clear`.
pub(super) next_edge_id: AtomicU64,
/// Current epoch number; read by `current_epoch`, advanced by `new_epoch`.
pub(super) current_epoch: AtomicU64,
/// Signed counter of live nodes.
pub(super) live_node_count: AtomicI64,
/// Signed counter of live edges.
pub(super) live_edge_count: AtomicI64,
/// Live edge count per edge type, indexed by edge type id.
pub(super) edge_type_live_counts: RwLock<Vec<i64>>,
/// Current statistics snapshot, replaced wholesale behind the lock.
pub(super) statistics: RwLock<Arc<Statistics>>,
/// Flag for pending statistics recomputation; initialised and reset to `false` here.
pub(super) needs_stats_recompute: AtomicBool,
/// Named sub-graphs, each a separate `LpgStore`.
named_graphs: RwLock<FxHashMap<String, Arc<LpgStore>>>,
/// Undo entries recorded per transaction id.
property_undo_log: RwLock<FxHashMap<TransactionId, Vec<PropertyUndoEntry>>>,
}
impl LpgStore {
/// Creates an empty store using [`LpgStoreConfig::default`].
///
/// # Errors
///
/// Propagates any `AllocError` returned by [`LpgStore::with_config`].
pub fn new() -> Result<Self, AllocError> {
    let defaults = LpgStoreConfig::default();
    Self::with_config(defaults)
}
/// Creates an empty store from `config`.
///
/// Only `config.backward_edges` is read here: it decides whether a backward
/// adjacency structure is allocated. The capacity fields are not read by
/// this constructor.
///
/// # Errors
///
/// With the `tiered-storage` feature, returns the `AllocError` produced by
/// `ArenaAllocator::new` if that fails.
pub fn with_config(config: LpgStoreConfig) -> Result<Self, AllocError> {
    // Allocate the reverse adjacency only when asked for.
    let backward_adj = config.backward_edges.then(ChunkedAdjacency::new);

    Ok(Self {
        // Record storage (flat or tiered, depending on features).
        #[cfg(not(feature = "tiered-storage"))]
        nodes: RwLock::new(FxHashMap::default()),
        #[cfg(not(feature = "tiered-storage"))]
        edges: RwLock::new(FxHashMap::default()),
        #[cfg(feature = "tiered-storage")]
        arena_allocator: Arc::new(ArenaAllocator::new()?),
        #[cfg(feature = "tiered-storage")]
        node_versions: RwLock::new(FxHashMap::default()),
        #[cfg(feature = "tiered-storage")]
        edge_versions: RwLock::new(FxHashMap::default()),
        #[cfg(feature = "tiered-storage")]
        epoch_store: Arc::new(EpochStore::new()),

        // Properties.
        node_properties: PropertyStorage::new(),
        edge_properties: PropertyStorage::new(),

        // Label and edge-type registries.
        label_to_id: RwLock::new(FxHashMap::default()),
        id_to_label: RwLock::new(Vec::new()),
        edge_type_to_id: RwLock::new(FxHashMap::default()),
        id_to_edge_type: RwLock::new(Vec::new()),

        // Adjacency.
        forward_adj: ChunkedAdjacency::new(),
        backward_adj,

        // Secondary indexes.
        label_index: RwLock::new(Vec::with_capacity(16)),
        node_labels: RwLock::new(FxHashMap::default()),
        property_indexes: RwLock::new(FxHashMap::default()),
        #[cfg(feature = "vector-index")]
        vector_indexes: RwLock::new(FxHashMap::default()),
        #[cfg(feature = "text-index")]
        text_indexes: RwLock::new(FxHashMap::default()),

        // Counters all start at zero.
        next_node_id: AtomicU64::new(0),
        next_edge_id: AtomicU64::new(0),
        current_epoch: AtomicU64::new(0),
        live_node_count: AtomicI64::new(0),
        live_edge_count: AtomicI64::new(0),
        edge_type_live_counts: RwLock::new(Vec::new()),

        // Statistics.
        statistics: RwLock::new(Arc::new(Statistics::new())),
        needs_stats_recompute: AtomicBool::new(false),

        // Named graphs and undo log.
        named_graphs: RwLock::new(FxHashMap::default()),
        property_undo_log: RwLock::new(FxHashMap::default()),
    })
}
/// Returns the store's current epoch (an `Acquire` load of the epoch counter).
#[must_use]
pub fn current_epoch(&self) -> EpochId {
    let raw = self.current_epoch.load(Ordering::Acquire);
    EpochId::new(raw)
}
/// Advances the epoch counter by one and returns the new epoch.
#[doc(hidden)]
pub fn new_epoch(&self) -> EpochId {
    // `fetch_add` yields the value held before the increment.
    let previous = self.current_epoch.fetch_add(1, Ordering::AcqRel);
    EpochId::new(previous + 1)
}
/// Raises the epoch counter to `epoch` if it is currently lower; never
/// moves it backwards.
#[doc(hidden)]
pub fn sync_epoch(&self, epoch: EpochId) {
    let target = epoch.as_u64();
    // The previous value is not needed.
    let _previous = self.current_epoch.fetch_max(target, Ordering::AcqRel);
}
/// Empties this store and resets it to its freshly constructed state.
///
/// Clears node/edge records, the label and edge-type registries, all indexes,
/// properties, adjacency and the undo log. Resets the id counters, epoch and
/// live counts to 0, and installs a fresh `Statistics`. The `named_graphs`
/// map is left untouched.
pub fn clear(&self) {
    #[cfg(not(feature = "tiered-storage"))]
    {
        self.nodes.write().clear();
        self.edges.write().clear();
    }
    #[cfg(feature = "tiered-storage")]
    {
        self.node_versions.write().clear();
        self.edge_versions.write().clear();
    }

    // Each registry is a name->id map plus an id->name vector that must stay
    // in step. Hold both locks while clearing, in the same order the
    // `get_or_create_*` helpers take them, so a concurrent registration
    // cannot land between the two clears and leave a dangling id.
    {
        let mut label_to_id = self.label_to_id.write();
        let mut id_to_label = self.id_to_label.write();
        label_to_id.clear();
        id_to_label.clear();
    }
    {
        let mut edge_type_to_id = self.edge_type_to_id.write();
        let mut id_to_edge_type = self.id_to_edge_type.write();
        edge_type_to_id.clear();
        id_to_edge_type.clear();
    }

    // Secondary indexes.
    self.label_index.write().clear();
    self.node_labels.write().clear();
    self.property_indexes.write().clear();
    #[cfg(feature = "vector-index")]
    self.vector_indexes.write().clear();
    #[cfg(feature = "text-index")]
    self.text_indexes.write().clear();

    // Properties and adjacency.
    self.node_properties.clear();
    self.edge_properties.clear();
    self.forward_adj.clear();
    if let Some(ref backward) = self.backward_adj {
        backward.clear();
    }

    // Counters.
    self.next_node_id.store(0, Ordering::Release);
    self.next_edge_id.store(0, Ordering::Release);
    self.current_epoch.store(0, Ordering::Release);
    self.live_node_count.store(0, Ordering::Release);
    self.live_edge_count.store(0, Ordering::Release);
    self.edge_type_live_counts.write().clear();

    // Statistics and undo log.
    *self.statistics.write() = Arc::new(Statistics::new());
    self.needs_stats_recompute.store(false, Ordering::Release);
    self.property_undo_log.write().clear();
}
/// Reports whether this store has a backward adjacency structure.
#[must_use]
pub fn has_backward_adjacency(&self) -> bool {
    let backward = self.backward_adj.as_ref();
    backward.is_some()
}
/// Looks up the named graph, returning a shared handle to it or `None` if
/// no graph is registered under `name`.
#[must_use]
pub fn graph(&self, name: &str) -> Option<Arc<LpgStore>> {
    let registry = self.named_graphs.read();
    registry.get(name).map(Arc::clone)
}
/// Returns the graph registered under `name`, creating and registering an
/// empty one first if none exists.
///
/// # Errors
///
/// Propagates the `AllocError` from [`LpgStore::new`] when a new graph has
/// to be created.
pub fn graph_or_create(&self, name: &str) -> Result<Arc<LpgStore>, AllocError> {
    // Fast path: read lock only.
    if let Some(existing) = self.graph(name) {
        return Ok(existing);
    }

    // Slow path: re-check under the write lock, since another caller may
    // have registered the graph in between.
    let mut registry = self.named_graphs.write();
    if let Some(existing) = registry.get(name).map(Arc::clone) {
        return Ok(existing);
    }

    let created = Arc::new(LpgStore::new()?);
    registry.insert(name.to_string(), Arc::clone(&created));
    Ok(created)
}
/// Registers a new empty graph under `name`.
///
/// Returns `Ok(true)` if the graph was created and `Ok(false)` if a graph
/// with that name already existed (it is left unchanged).
///
/// # Errors
///
/// Propagates the `AllocError` from [`LpgStore::new`].
pub fn create_graph(&self, name: &str) -> Result<bool, AllocError> {
    let mut registry = self.named_graphs.write();
    let already_present = registry.contains_key(name);
    if !already_present {
        let key = name.to_string();
        let fresh = Arc::new(LpgStore::new()?);
        registry.insert(key, fresh);
    }
    Ok(!already_present)
}
/// Unregisters the graph called `name`.
///
/// Returns `true` if a graph was removed and `false` if none was registered.
pub fn drop_graph(&self, name: &str) -> bool {
    let mut registry = self.named_graphs.write();
    let removed = registry.remove(name);
    removed.is_some()
}
/// Returns the names of all registered graphs, in the map's iteration order.
#[must_use]
pub fn graph_names(&self) -> Vec<String> {
    let registry = self.named_graphs.read();
    let mut names = Vec::with_capacity(registry.len());
    names.extend(registry.keys().cloned());
    names
}
/// Returns how many named graphs are registered.
#[must_use]
pub fn graph_count(&self) -> usize {
    let registry = self.named_graphs.read();
    registry.len()
}
/// Clears one graph.
///
/// With `Some(name)`, clears the named graph if it exists and does nothing
/// otherwise. With `None`, clears this store itself via [`LpgStore::clear`].
pub fn clear_graph(&self, name: Option<&str>) {
    if let Some(graph_name) = name {
        // The registry read lock stays held while the named graph is cleared.
        let registry = self.named_graphs.read();
        if let Some(target) = registry.get(graph_name) {
            target.clear();
        }
    } else {
        self.clear();
    }
}
/// Prepares a graph copy from `source` to `dest`.
///
/// NOTE: as written, no nodes or edges are copied. The source graph is only
/// looked up (and the result discarded), and the destination graph is
/// created if `dest` is `Some` and not yet registered.
///
/// # Errors
///
/// Propagates the `AllocError` from [`LpgStore::graph_or_create`].
pub fn copy_graph(&self, source: Option<&str>, dest: Option<&str>) -> Result<(), AllocError> {
    // `None` as source yields no handle here.
    let _src = source.and_then(|graph_name| self.graph(graph_name));

    let _dest_graph = match dest {
        Some(graph_name) => Some(self.graph_or_create(graph_name)?),
        None => None,
    };

    Ok(())
}
pub(super) fn get_or_create_label_id(&self, label: &str) -> u32 {
{
let label_to_id = self.label_to_id.read();
if let Some(&id) = label_to_id.get(label) {
return id;
}
}
let mut label_to_id = self.label_to_id.write();
let mut id_to_label = self.id_to_label.write();
if let Some(&id) = label_to_id.get(label) {
return id;
}
let id = id_to_label.len() as u32;
let label: ArcStr = label.into();
label_to_id.insert(label.clone(), id);
id_to_label.push(label);
id
}
pub(super) fn get_or_create_edge_type_id(&self, edge_type: &str) -> u32 {
{
let type_to_id = self.edge_type_to_id.read();
if let Some(&id) = type_to_id.get(edge_type) {
return id;
}
}
let mut type_to_id = self.edge_type_to_id.write();
let mut id_to_type = self.id_to_edge_type.write();
if let Some(&id) = type_to_id.get(edge_type) {
return id;
}
let id = id_to_type.len() as u32;
let edge_type: ArcStr = edge_type.into();
type_to_id.insert(edge_type.clone(), id);
id_to_type.push(edge_type);
let mut counts = self.edge_type_live_counts.write();
while counts.len() <= id as usize {
counts.push(0);
}
id
}
/// Adds one to the live edge count for `type_id`, growing the counter
/// vector with zeros first if it has no slot for that id yet.
pub(super) fn increment_edge_type_count(&self, type_id: u32) {
    let slot = type_id as usize;
    let mut live_counts = self.edge_type_live_counts.write();
    if slot >= live_counts.len() {
        live_counts.resize(slot + 1, 0);
    }
    live_counts[slot] += 1;
}
/// Subtracts one from the live edge count for `type_id`.
///
/// Does nothing when the counter vector has no slot for `type_id`.
pub(super) fn decrement_edge_type_count(&self, type_id: u32) {
    let mut counts = self.edge_type_live_counts.write();
    // Checked lookup in `usize` space: this avoids narrowing `len()` to
    // `u32` for the bounds test and removes the separate indexing step.
    if let Some(count) = counts.get_mut(type_id as usize) {
        *count -= 1;
    }
}
}