use crate::graph::Direction;
use crate::graph::lpg::CompareOp;
use crate::graph::lpg::{Edge, Node};
use crate::statistics::Statistics;
use arcstr::ArcStr;
use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TransactionId, Value};
use grafeo_common::utils::hash::FxHashMap;
use std::sync::Arc;
/// Read-only access to a graph store.
///
/// Implementors must be `Send + Sync`. Methods declared without a body are
/// required; the remaining ones are defaults that either forward to a required
/// method or return a fixed empty / `false` answer, and may be overridden.
pub trait GraphStore: Send + Sync {
    // ---- Point lookups ----------------------------------------------------

    /// Looks up a node by id.
    fn get_node(&self, id: NodeId) -> Option<Node>;

    /// Looks up an edge by id.
    fn get_edge(&self, id: EdgeId) -> Option<Edge>;

    /// Looks up a node for a given epoch and transaction.
    ///
    /// NOTE(review): the exact visibility rules are left to implementors.
    fn get_node_versioned(
        &self,
        id: NodeId,
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> Option<Node>;

    /// Looks up an edge for a given epoch and transaction.
    ///
    /// NOTE(review): the exact visibility rules are left to implementors.
    fn get_edge_versioned(
        &self,
        id: EdgeId,
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> Option<Edge>;

    /// Looks up a node for a given epoch, without a transaction context.
    fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node>;

    /// Looks up an edge for a given epoch, without a transaction context.
    fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge>;

    // ---- Property access --------------------------------------------------

    /// Reads one property of a node.
    fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value>;

    /// Reads one property of an edge.
    fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value>;

    /// Reads the same property for several nodes.
    ///
    /// NOTE(review): presumably one result per entry of `ids`, in order —
    /// confirm with implementors.
    fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>>;

    /// Reads the property maps of several nodes.
    fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>>;

    /// Reads property maps of several nodes for the given `keys`.
    fn get_nodes_properties_selective_batch(
        &self,
        ids: &[NodeId],
        keys: &[PropertyKey],
    ) -> Vec<FxHashMap<PropertyKey, Value>>;

    /// Reads property maps of several edges for the given `keys`.
    fn get_edges_properties_selective_batch(
        &self,
        ids: &[EdgeId],
        keys: &[PropertyKey],
    ) -> Vec<FxHashMap<PropertyKey, Value>>;

    // ---- Traversal --------------------------------------------------------

    /// Lists the neighbours of `node` in the given direction.
    fn neighbors(&self, node: NodeId, direction: Direction) -> Vec<NodeId>;

    /// Lists `(NodeId, EdgeId)` pairs for the edges of `node` in the given
    /// direction.
    ///
    /// NOTE(review): the `NodeId` is presumably the node at the other end of
    /// the edge — confirm with implementors.
    fn edges_from(&self, node: NodeId, direction: Direction) -> Vec<(NodeId, EdgeId)>;

    /// Out-degree of `node`.
    fn out_degree(&self, node: NodeId) -> usize;

    /// In-degree of `node`.
    fn in_degree(&self, node: NodeId) -> usize;

    /// Reports whether the store has backward adjacency.
    fn has_backward_adjacency(&self) -> bool;

    // ---- Scans and counts -------------------------------------------------

    /// Lists node ids.
    fn node_ids(&self) -> Vec<NodeId>;

    /// Lists all node ids.
    ///
    /// The default forwards to [`node_ids`](Self::node_ids).
    fn all_node_ids(&self) -> Vec<NodeId> {
        let ids = self.node_ids();
        ids
    }

    /// Lists the ids of nodes for the given `label`.
    fn nodes_by_label(&self, label: &str) -> Vec<NodeId>;

    /// Number of nodes.
    fn node_count(&self) -> usize;

    /// Number of edges.
    fn edge_count(&self) -> usize;

    // ---- Edge types and indexes -------------------------------------------

    /// Returns the type of an edge.
    fn edge_type(&self, id: EdgeId) -> Option<ArcStr>;

    /// Returns the type of an edge for a given epoch and transaction.
    ///
    /// The default ignores `epoch` and `transaction_id` and forwards to
    /// [`edge_type`](Self::edge_type).
    fn edge_type_versioned(
        &self,
        id: EdgeId,
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> Option<ArcStr> {
        // Version information is deliberately unused by the default.
        let _ = epoch;
        let _ = transaction_id;
        self.edge_type(id)
    }

    /// Reports whether a property index exists for `_property`.
    ///
    /// The default answers `false`.
    fn has_property_index(&self, _property: &str) -> bool {
        false
    }

    // ---- Search -----------------------------------------------------------

    /// Finds nodes by a single `property` / `value` pair.
    fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId>;

    /// Finds nodes by several `(property, value)` conditions.
    ///
    /// NOTE(review): presumably all conditions must hold — confirm with
    /// implementors.
    fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId>;

    /// Finds nodes whose `property` lies in a range.
    ///
    /// `min` / `max` are optional bounds and `min_inclusive` / `max_inclusive`
    /// say whether the respective bound is itself included.
    ///
    /// NOTE(review): a `None` bound presumably means "unbounded" — confirm.
    fn find_nodes_in_range(
        &self,
        property: &str,
        min: Option<&Value>,
        max: Option<&Value>,
        min_inclusive: bool,
        max_inclusive: bool,
    ) -> Vec<NodeId>;

    /// Reports whether a node property might satisfy `property op value`.
    ///
    /// NOTE(review): presumably a conservative pre-filter where `false` rules
    /// out any match — confirm with implementors.
    fn node_property_might_match(
        &self,
        property: &PropertyKey,
        op: CompareOp,
        value: &Value,
    ) -> bool;

    /// Reports whether an edge property might satisfy `property op value`.
    ///
    /// NOTE(review): presumably a conservative pre-filter where `false` rules
    /// out any match — confirm with implementors.
    fn edge_property_might_match(
        &self,
        property: &PropertyKey,
        op: CompareOp,
        value: &Value,
    ) -> bool;

    // ---- Statistics -------------------------------------------------------

    /// Returns a shared handle to the store's statistics.
    fn statistics(&self) -> Arc<Statistics>;

    /// Estimated cardinality for `label`.
    fn estimate_label_cardinality(&self, label: &str) -> f64;

    /// Estimated average degree for `edge_type`; `outgoing` selects the
    /// direction.
    fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64;

    /// Returns the current epoch.
    fn current_epoch(&self) -> EpochId;

    // ---- Schema introspection (empty by default) ---------------------------

    /// Lists all labels. The default returns an empty list.
    fn all_labels(&self) -> Vec<String> {
        vec![]
    }

    /// Lists all edge types. The default returns an empty list.
    fn all_edge_types(&self) -> Vec<String> {
        vec![]
    }

    /// Lists all property keys. The default returns an empty list.
    fn all_property_keys(&self) -> Vec<String> {
        vec![]
    }

    // ---- Visibility checks ------------------------------------------------

    /// Reports whether [`get_node_at_epoch`](Self::get_node_at_epoch) finds the
    /// node.
    fn is_node_visible_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
        let found = self.get_node_at_epoch(id, epoch);
        found.is_some()
    }

    /// Reports whether [`get_node_versioned`](Self::get_node_versioned) finds
    /// the node.
    fn is_node_visible_versioned(
        &self,
        id: NodeId,
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> bool {
        let found = self.get_node_versioned(id, epoch, transaction_id);
        found.is_some()
    }

    /// Reports whether [`get_edge_at_epoch`](Self::get_edge_at_epoch) finds the
    /// edge.
    fn is_edge_visible_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
        let found = self.get_edge_at_epoch(id, epoch);
        found.is_some()
    }

    /// Reports whether [`get_edge_versioned`](Self::get_edge_versioned) finds
    /// the edge.
    fn is_edge_visible_versioned(
        &self,
        id: EdgeId,
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> bool {
        let found = self.get_edge_versioned(id, epoch, transaction_id);
        found.is_some()
    }

    /// Keeps, in input order, the ids for which
    /// [`is_node_visible_at_epoch`](Self::is_node_visible_at_epoch) is `true`.
    fn filter_visible_node_ids(&self, ids: &[NodeId], epoch: EpochId) -> Vec<NodeId> {
        let mut visible = Vec::new();
        for &id in ids {
            if self.is_node_visible_at_epoch(id, epoch) {
                visible.push(id);
            }
        }
        visible
    }

    /// Keeps, in input order, the ids for which
    /// [`is_node_visible_versioned`](Self::is_node_visible_versioned) is `true`.
    fn filter_visible_node_ids_versioned(
        &self,
        ids: &[NodeId],
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> Vec<NodeId> {
        let mut visible = Vec::new();
        for &id in ids {
            if self.is_node_visible_versioned(id, epoch, transaction_id) {
                visible.push(id);
            }
        }
        visible
    }

    // ---- History (empty by default) ----------------------------------------

    /// Returns history entries for a node as `(EpochId, Option<EpochId>, Node)`.
    ///
    /// The default returns an empty list.
    ///
    /// NOTE(review): the two epochs presumably delimit the version's lifetime —
    /// confirm with implementors.
    fn get_node_history(&self, _id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
        vec![]
    }

    /// Returns history entries for an edge as `(EpochId, Option<EpochId>, Edge)`.
    ///
    /// The default returns an empty list.
    ///
    /// NOTE(review): the two epochs presumably delimit the version's lifetime —
    /// confirm with implementors.
    fn get_edge_history(&self, _id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
        vec![]
    }
}
/// Write operations layered on top of [`GraphStore`].
///
/// Every method takes `&self`, so any mutation has to go through interior
/// mutability in the implementor. Methods declared without a body are required;
/// the `*_versioned` property/label methods and the `*_with_props` constructors
/// have defaults expressed in terms of the required methods.
pub trait GraphStoreMut: GraphStore {
    // ---- Creation ---------------------------------------------------------

    /// Creates a node with the given labels and returns its id.
    fn create_node(&self, labels: &[&str]) -> NodeId;

    /// Creates a node with the given labels for an epoch and transaction.
    fn create_node_versioned(
        &self,
        labels: &[&str],
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> NodeId;

    /// Creates an edge of type `edge_type` from `src` to `dst` and returns its
    /// id.
    fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId;

    /// Creates an edge from `src` to `dst` for an epoch and transaction.
    fn create_edge_versioned(
        &self,
        src: NodeId,
        dst: NodeId,
        edge_type: &str,
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> EdgeId;

    /// Creates several edges given as `(NodeId, NodeId, &str)` triples.
    ///
    /// NOTE(review): the triples are presumably `(src, dst, edge_type)` with one
    /// returned id per triple, in order — confirm with implementors.
    fn batch_create_edges(&self, edges: &[(NodeId, NodeId, &str)]) -> Vec<EdgeId>;

    // ---- Deletion ---------------------------------------------------------

    /// Deletes a node.
    ///
    /// NOTE(review): the `bool` presumably reports whether something was
    /// deleted — confirm with implementors.
    fn delete_node(&self, id: NodeId) -> bool;

    /// Deletes a node for an epoch and transaction.
    fn delete_node_versioned(
        &self,
        id: NodeId,
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> bool;

    /// Deletes the edges of `node_id`.
    fn delete_node_edges(&self, node_id: NodeId);

    /// Deletes an edge.
    ///
    /// NOTE(review): the `bool` presumably reports whether something was
    /// deleted — confirm with implementors.
    fn delete_edge(&self, id: EdgeId) -> bool;

    /// Deletes an edge for an epoch and transaction.
    fn delete_edge_versioned(
        &self,
        id: EdgeId,
        epoch: EpochId,
        transaction_id: TransactionId,
    ) -> bool;

    // ---- Property mutation ------------------------------------------------

    /// Sets property `key` of a node to `value`.
    fn set_node_property(&self, id: NodeId, key: &str, value: Value);

    /// Sets property `key` of an edge to `value`.
    fn set_edge_property(&self, id: EdgeId, key: &str, value: Value);

    /// Transaction-aware variant of [`set_node_property`](Self::set_node_property).
    ///
    /// The default ignores `_transaction_id` and forwards to the unversioned
    /// method.
    fn set_node_property_versioned(
        &self,
        id: NodeId,
        key: &str,
        value: Value,
        _transaction_id: TransactionId,
    ) {
        Self::set_node_property(self, id, key, value)
    }

    /// Transaction-aware variant of [`set_edge_property`](Self::set_edge_property).
    ///
    /// The default ignores `_transaction_id` and forwards to the unversioned
    /// method.
    fn set_edge_property_versioned(
        &self,
        id: EdgeId,
        key: &str,
        value: Value,
        _transaction_id: TransactionId,
    ) {
        Self::set_edge_property(self, id, key, value)
    }

    /// Removes property `key` from a node.
    ///
    /// NOTE(review): the returned value is presumably the removed one — confirm
    /// with implementors.
    fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value>;

    /// Removes property `key` from an edge.
    ///
    /// NOTE(review): the returned value is presumably the removed one — confirm
    /// with implementors.
    fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value>;

    /// Transaction-aware variant of
    /// [`remove_node_property`](Self::remove_node_property).
    ///
    /// The default ignores `_transaction_id` and forwards to the unversioned
    /// method.
    fn remove_node_property_versioned(
        &self,
        id: NodeId,
        key: &str,
        _transaction_id: TransactionId,
    ) -> Option<Value> {
        let removed = self.remove_node_property(id, key);
        removed
    }

    /// Transaction-aware variant of
    /// [`remove_edge_property`](Self::remove_edge_property).
    ///
    /// The default ignores `_transaction_id` and forwards to the unversioned
    /// method.
    fn remove_edge_property_versioned(
        &self,
        id: EdgeId,
        key: &str,
        _transaction_id: TransactionId,
    ) -> Option<Value> {
        let removed = self.remove_edge_property(id, key);
        removed
    }

    // ---- Label mutation ---------------------------------------------------

    /// Adds `label` to a node.
    ///
    /// NOTE(review): the `bool` presumably reports whether the node changed —
    /// confirm with implementors.
    fn add_label(&self, node_id: NodeId, label: &str) -> bool;

    /// Removes `label` from a node.
    ///
    /// NOTE(review): the `bool` presumably reports whether the node changed —
    /// confirm with implementors.
    fn remove_label(&self, node_id: NodeId, label: &str) -> bool;

    /// Transaction-aware variant of [`add_label`](Self::add_label).
    ///
    /// The default ignores `_transaction_id` and forwards to the unversioned
    /// method.
    fn add_label_versioned(
        &self,
        node_id: NodeId,
        label: &str,
        _transaction_id: TransactionId,
    ) -> bool {
        let outcome = self.add_label(node_id, label);
        outcome
    }

    /// Transaction-aware variant of [`remove_label`](Self::remove_label).
    ///
    /// The default ignores `_transaction_id` and forwards to the unversioned
    /// method.
    fn remove_label_versioned(
        &self,
        node_id: NodeId,
        label: &str,
        _transaction_id: TransactionId,
    ) -> bool {
        let outcome = self.remove_label(node_id, label);
        outcome
    }

    // ---- Convenience constructors -----------------------------------------

    /// Creates a node and then sets each of `properties` on it.
    ///
    /// The default calls [`create_node`](Self::create_node) once, followed by
    /// one [`set_node_property`](Self::set_node_property) call per entry, in
    /// slice order, cloning each value.
    fn create_node_with_props(
        &self,
        labels: &[&str],
        properties: &[(PropertyKey, Value)],
    ) -> NodeId {
        let node = self.create_node(labels);
        properties
            .iter()
            .for_each(|(name, val)| self.set_node_property(node, name.as_str(), val.clone()));
        node
    }

    /// Creates an edge and then sets each of `properties` on it.
    ///
    /// The default calls [`create_edge`](Self::create_edge) once, followed by
    /// one [`set_edge_property`](Self::set_edge_property) call per entry, in
    /// slice order, cloning each value.
    fn create_edge_with_props(
        &self,
        src: NodeId,
        dst: NodeId,
        edge_type: &str,
        properties: &[(PropertyKey, Value)],
    ) -> EdgeId {
        let edge = self.create_edge(src, dst, edge_type);
        properties
            .iter()
            .for_each(|(name, val)| self.set_edge_property(edge, name.as_str(), val.clone()));
        edge
    }
}
/// A [`GraphStore`] that holds no data.
///
/// Its `GraphStore` implementation in this file answers every lookup with
/// `None`, every listing with an empty collection, every count and estimate
/// with zero, and every predicate with `false`.
///
/// The type is a zero-sized unit struct, so it derives the cheap common traits
/// (`Debug`, `Clone`, `Copy`, `Default`) to be usable in debug output, generic
/// `T: Default` contexts and by-value passing.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullGraphStore;
impl GraphStore for NullGraphStore {
fn get_node(&self, _: NodeId) -> Option<Node> {
None
}
fn get_edge(&self, _: EdgeId) -> Option<Edge> {
None
}
fn get_node_versioned(&self, _: NodeId, _: EpochId, _: TransactionId) -> Option<Node> {
None
}
fn get_edge_versioned(&self, _: EdgeId, _: EpochId, _: TransactionId) -> Option<Edge> {
None
}
fn get_node_at_epoch(&self, _: NodeId, _: EpochId) -> Option<Node> {
None
}
fn get_edge_at_epoch(&self, _: EdgeId, _: EpochId) -> Option<Edge> {
None
}
fn get_node_property(&self, _: NodeId, _: &PropertyKey) -> Option<Value> {
None
}
fn get_edge_property(&self, _: EdgeId, _: &PropertyKey) -> Option<Value> {
None
}
fn get_node_property_batch(&self, ids: &[NodeId], _: &PropertyKey) -> Vec<Option<Value>> {
vec![None; ids.len()]
}
fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
vec![FxHashMap::default(); ids.len()]
}
fn get_nodes_properties_selective_batch(
&self,
ids: &[NodeId],
_: &[PropertyKey],
) -> Vec<FxHashMap<PropertyKey, Value>> {
vec![FxHashMap::default(); ids.len()]
}
fn get_edges_properties_selective_batch(
&self,
ids: &[EdgeId],
_: &[PropertyKey],
) -> Vec<FxHashMap<PropertyKey, Value>> {
vec![FxHashMap::default(); ids.len()]
}
fn neighbors(&self, _: NodeId, _: Direction) -> Vec<NodeId> {
Vec::new()
}
fn edges_from(&self, _: NodeId, _: Direction) -> Vec<(NodeId, EdgeId)> {
Vec::new()
}
fn out_degree(&self, _: NodeId) -> usize {
0
}
fn in_degree(&self, _: NodeId) -> usize {
0
}
fn has_backward_adjacency(&self) -> bool {
false
}
fn node_ids(&self) -> Vec<NodeId> {
Vec::new()
}
fn nodes_by_label(&self, _: &str) -> Vec<NodeId> {
Vec::new()
}
fn node_count(&self) -> usize {
0
}
fn edge_count(&self) -> usize {
0
}
fn edge_type(&self, _: EdgeId) -> Option<ArcStr> {
None
}
fn find_nodes_by_property(&self, _: &str, _: &Value) -> Vec<NodeId> {
Vec::new()
}
fn find_nodes_by_properties(&self, _: &[(&str, Value)]) -> Vec<NodeId> {
Vec::new()
}
fn find_nodes_in_range(
&self,
_: &str,
_: Option<&Value>,
_: Option<&Value>,
_: bool,
_: bool,
) -> Vec<NodeId> {
Vec::new()
}
fn node_property_might_match(&self, _: &PropertyKey, _: CompareOp, _: &Value) -> bool {
false
}
fn edge_property_might_match(&self, _: &PropertyKey, _: CompareOp, _: &Value) -> bool {
false
}
fn statistics(&self) -> Arc<Statistics> {
Arc::new(Statistics::default())
}
fn estimate_label_cardinality(&self, _: &str) -> f64 {
0.0
}
fn estimate_avg_degree(&self, _: &str, _: bool) -> f64 {
0.0
}
fn current_epoch(&self) -> EpochId {
EpochId(0)
}
}
#[cfg(test)]
mod tests {
    //! Checks that `NullGraphStore` answers every `GraphStore` query with an
    //! empty / `None` / zero / `false` result, including the trait defaults.

    use super::*;

    /// Single-entity lookups (plain, versioned, per-epoch) find nothing.
    #[test]
    fn null_graph_store_point_lookups() {
        let null = NullGraphStore;
        let (node, edge) = (NodeId(1), EdgeId(1));
        let (at, tx) = (EpochId(0), TransactionId(1));

        assert!(null.get_node(node).is_none());
        assert!(null.get_edge(edge).is_none());
        assert!(null.get_node_versioned(node, at, tx).is_none());
        assert!(null.get_edge_versioned(edge, at, tx).is_none());
        assert!(null.get_node_at_epoch(node, at).is_none());
        assert!(null.get_edge_at_epoch(edge, at).is_none());
    }

    /// Property reads return `None`; batch reads return one empty slot per id.
    #[test]
    fn null_graph_store_property_access() {
        let null = NullGraphStore;
        let (node, edge) = (NodeId(1), EdgeId(1));
        let name = PropertyKey::from("name");

        assert!(null.get_node_property(node, &name).is_none());
        assert!(null.get_edge_property(edge, &name).is_none());

        let per_node = null.get_node_property_batch(&[node, NodeId(2)], &name);
        assert_eq!(per_node, vec![None, None]);

        let all_props = null.get_nodes_properties_batch(&[node]);
        assert_eq!(all_props.len(), 1);
        assert!(all_props[0].is_empty());

        let some_node_props =
            null.get_nodes_properties_selective_batch(&[node], std::slice::from_ref(&name));
        assert_eq!(some_node_props.len(), 1);
        assert!(some_node_props[0].is_empty());

        let some_edge_props = null.get_edges_properties_selective_batch(&[edge], &[name]);
        assert_eq!(some_edge_props.len(), 1);
        assert!(some_edge_props[0].is_empty());
    }

    /// Traversal yields no neighbours or edges and zero degrees.
    #[test]
    fn null_graph_store_traversal() {
        let null = NullGraphStore;
        let node = NodeId(1);

        assert!(null.neighbors(node, Direction::Outgoing).is_empty());
        assert!(null.edges_from(node, Direction::Incoming).is_empty());
        assert_eq!(null.out_degree(node), 0);
        assert_eq!(null.in_degree(node), 0);
        assert!(!null.has_backward_adjacency());
    }

    /// Scans are empty and counts are zero.
    #[test]
    fn null_graph_store_scans_and_counts() {
        let null = NullGraphStore;

        assert!(null.node_ids().is_empty());
        assert!(null.all_node_ids().is_empty());
        assert!(null.nodes_by_label("Person").is_empty());
        assert_eq!(null.node_count(), 0);
        assert_eq!(null.edge_count(), 0);
    }

    /// Edge types, property indexes and schema listings are all absent.
    #[test]
    fn null_graph_store_metadata_and_schema() {
        let null = NullGraphStore;
        let edge = EdgeId(1);
        let (at, tx) = (EpochId(0), TransactionId(1));

        assert!(null.edge_type(edge).is_none());
        assert!(null.edge_type_versioned(edge, at, tx).is_none());
        assert!(!null.has_property_index("name"));
        assert!(null.all_labels().is_empty());
        assert!(null.all_edge_types().is_empty());
        assert!(null.all_property_keys().is_empty());
    }

    /// Searches return no ids and the "might match" probes answer `false`.
    #[test]
    fn null_graph_store_search() {
        let null = NullGraphStore;
        let age = PropertyKey::from("age");
        let thirty = Value::Int64(30);

        assert!(null.find_nodes_by_property("age", &thirty).is_empty());

        let by_conditions = null.find_nodes_by_properties(&[("age", thirty.clone())]);
        assert!(by_conditions.is_empty());

        let in_range = null.find_nodes_in_range("age", Some(&thirty), None, true, false);
        assert!(in_range.is_empty());

        assert!(!null.node_property_might_match(&age, CompareOp::Eq, &thirty));
        assert!(!null.edge_property_might_match(&age, CompareOp::Eq, &thirty));
    }

    /// Statistics are obtainable, estimates are zero and the epoch is `0`.
    #[test]
    fn null_graph_store_statistics() {
        let null = NullGraphStore;

        // Only checks that the call succeeds; the contents are not inspected.
        let _ = null.statistics();

        assert_eq!(null.estimate_label_cardinality("Person"), 0.0);
        assert_eq!(null.estimate_avg_degree("KNOWS", true), 0.0);
        assert_eq!(null.current_epoch(), EpochId(0));
    }

    /// The default visibility helpers report nothing as visible.
    #[test]
    fn null_graph_store_visibility() {
        let null = NullGraphStore;
        let (node, edge) = (NodeId(1), EdgeId(1));
        let (at, tx) = (EpochId(0), TransactionId(1));

        assert!(!null.is_node_visible_at_epoch(node, at));
        assert!(!null.is_node_visible_versioned(node, at, tx));
        assert!(!null.is_edge_visible_at_epoch(edge, at));
        assert!(!null.is_edge_visible_versioned(edge, at, tx));

        let kept = null.filter_visible_node_ids(&[node, NodeId(2)], at);
        assert!(kept.is_empty());

        let kept_versioned = null.filter_visible_node_ids_versioned(&[node], at, tx);
        assert!(kept_versioned.is_empty());
    }

    /// The default history accessors return no entries.
    #[test]
    fn null_graph_store_history() {
        let null = NullGraphStore;

        assert!(null.get_node_history(NodeId(1)).is_empty());
        assert!(null.get_edge_history(EdgeId(1)).is_empty());
    }
}