pub mod backend;
pub mod column_store;
pub mod disk;
pub mod interner;
pub mod lookups;
pub mod mapped;
pub mod mapped_graph_impl;
pub mod memory;
pub mod type_build_meta;
use crate::datatypes::Value;
use crate::graph::core::iterators::GraphEdgeRef;
use crate::graph::schema::{EdgeData, InternedKey, NodeData};
use petgraph::graph::{EdgeIndex, NodeIndex};
use petgraph::stable_graph::StableDiGraph;
use petgraph::Direction;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::Instant;
pub trait GraphRead {
type NodeIndicesIter<'a>: Iterator<Item = NodeIndex>
where
Self: 'a;
type EdgeIndicesIter<'a>: Iterator<Item = EdgeIndex>
where
Self: 'a;
type EdgesIter<'a>: Iterator<Item = GraphEdgeRef<'a>>
where
Self: 'a;
type EdgeReferencesIter<'a>: Iterator<Item = GraphEdgeRef<'a>>
where
Self: 'a;
type EdgesConnectingIter<'a>: Iterator<Item = GraphEdgeRef<'a>>
where
Self: 'a;
type NeighborsIter<'a>: Iterator<Item = NodeIndex>
where
Self: 'a;
fn node_count(&self) -> usize;
fn edge_count(&self) -> usize;
fn node_bound(&self) -> usize;
#[allow(dead_code)]
fn is_memory(&self) -> bool;
fn is_mapped(&self) -> bool {
false
}
fn is_disk(&self) -> bool {
false
}
fn node_type_of(&self, idx: NodeIndex) -> Option<InternedKey>;
fn node_labels_of(&self, idx: NodeIndex) -> Vec<InternedKey> {
match self.node_type_of(idx) {
Some(key) => vec![key],
None => Vec::new(),
}
}
fn node_weight(&self, idx: NodeIndex) -> Option<&NodeData>;
fn get_node_property(&self, idx: NodeIndex, key: InternedKey) -> Option<Value>;
fn get_node_id(&self, idx: NodeIndex) -> Option<Value>;
fn get_node_title(&self, idx: NodeIndex) -> Option<Value>;
fn str_prop_eq(&self, idx: NodeIndex, key: InternedKey, target: &str) -> Option<bool>;
fn node_indices(&self) -> Self::NodeIndicesIter<'_>;
fn edge_indices(&self) -> Self::EdgeIndicesIter<'_>;
fn edge_references(&self) -> Self::EdgeReferencesIter<'_>;
fn edge_weights<'a>(&'a self) -> Box<dyn Iterator<Item = &'a EdgeData> + 'a>;
fn edges_directed(&self, idx: NodeIndex, dir: Direction) -> Self::EdgesIter<'_>;
fn edges(&self, idx: NodeIndex) -> Self::EdgesIter<'_>;
fn edges_directed_filtered(
&self,
idx: NodeIndex,
dir: Direction,
conn_type_filter: Option<InternedKey>,
) -> Self::EdgesIter<'_>;
fn edges_connecting(&self, a: NodeIndex, b: NodeIndex) -> Self::EdgesConnectingIter<'_>;
fn edge_weight(&self, idx: EdgeIndex) -> Option<&EdgeData>;
fn find_edge(&self, a: NodeIndex, b: NodeIndex) -> Option<EdgeIndex>;
fn edge_endpoints(&self, idx: EdgeIndex) -> Option<(NodeIndex, NodeIndex)>;
fn edge_endpoint_keys<'a>(
&'a self,
) -> Box<dyn Iterator<Item = (NodeIndex, NodeIndex, InternedKey)> + 'a>;
fn neighbors_directed(&self, idx: NodeIndex, dir: Direction) -> Self::NeighborsIter<'_>;
fn neighbors_undirected(&self, idx: NodeIndex) -> Self::NeighborsIter<'_>;
fn sources_for_conn_type_bounded(
&self,
_conn_type: InternedKey,
_max: Option<usize>,
) -> Option<Vec<u32>> {
None
}
fn lookup_peer_counts(&self, _conn_type: InternedKey) -> Option<HashMap<u32, i64>> {
None
}
fn lookup_by_property_eq(
&self,
_node_type: &str,
_property: &str,
_value: &str,
) -> Option<Vec<NodeIndex>> {
None
}
fn lookup_by_property_prefix(
&self,
_node_type: &str,
_property: &str,
_prefix: &str,
_limit: usize,
) -> Option<Vec<NodeIndex>> {
None
}
fn lookup_by_property_eq_any_type(
&self,
_property: &str,
_value: &str,
) -> Option<Vec<NodeIndex>> {
None
}
fn lookup_by_property_prefix_any_type(
&self,
_property: &str,
_prefix: &str,
_limit: usize,
) -> Option<Vec<NodeIndex>> {
None
}
fn count_edges_grouped_by_peer(
&self,
conn_type: InternedKey,
dir: Direction,
deadline: Option<Instant>,
) -> Result<HashMap<u32, i64>, String>;
fn count_edges_filtered(
&self,
node: NodeIndex,
dir: Direction,
conn_type: Option<InternedKey>,
other_node_type: Option<InternedKey>,
deadline: Option<Instant>,
) -> Result<usize, String>;
fn iter_peers_filtered<'a>(
&'a self,
node: NodeIndex,
dir: Direction,
conn_type: Option<u64>,
) -> Box<dyn Iterator<Item = (NodeIndex, EdgeIndex)> + 'a> {
let iter = self.edges_directed(node, dir).filter_map(move |er| {
if let Some(want) = conn_type {
if er.weight().connection_type.as_u64() != want {
return None;
}
}
let peer = match dir {
Direction::Outgoing => er.target(),
Direction::Incoming => er.source(),
};
Some((peer, er.id()))
});
Box::new(iter)
}
fn reset_arenas(&self) {}
}
/// Mutating counterpart of [`GraphRead`]: node/edge insertion, removal and in-place
/// payload access. Every method takes `&mut self`.
pub trait GraphWrite: GraphRead {
    /// Mutable payload of the node at `idx`, if available.
    fn node_weight_mut(&mut self, idx: NodeIndex) -> Option<&mut NodeData>;

    /// Variant of [`GraphWrite::node_weight_mut`]; the default simply forwards to it.
    ///
    /// NOTE(review): "silent" presumably means "without the bookkeeping a backend
    /// attaches to `node_weight_mut`" — confirm with the overriding implementors.
    fn node_weight_mut_silent(&mut self, idx: NodeIndex) -> Option<&mut NodeData> {
        Self::node_weight_mut(self, idx)
    }

    /// Mutable payload of the edge at `idx`, if available.
    fn edge_weight_mut(&mut self, idx: EdgeIndex) -> Option<&mut EdgeData>;

    /// Inserts a node carrying `data` and returns its index.
    fn add_node(&mut self, data: NodeData) -> NodeIndex;

    /// Removes the node at `idx`, returning its payload if one was removed.
    fn remove_node(&mut self, idx: NodeIndex) -> Option<NodeData>;

    /// Inserts an edge from `a` to `b` carrying `data` and returns its index.
    fn add_edge(&mut self, a: NodeIndex, b: NodeIndex, data: EdgeData) -> EdgeIndex;

    /// Removes the edge at `idx`, returning its payload if one was removed.
    fn remove_edge(&mut self, idx: EdgeIndex) -> Option<EdgeData>;

    /// Hook with an empty default body; the arguments are ignored unless overridden.
    fn update_row_id(&mut self, _node_idx: NodeIndex, _row_id: u32) {}

    /// Hook with an empty default body; backends that buffer writes may override it.
    fn flush_pending_writes(&mut self) {}
}
/// Newtype wrapper around petgraph's `StableDiGraph<NodeData, EdgeData>`.
///
/// The wrapped graph is crate-visible as field `.0`; this file also exposes it through
/// `Deref`, `inner()` and `inner_mut()`, and forwards serde (de)serialization to it.
#[derive(Clone, Debug, Default)]
pub struct MemoryGraph(pub(crate) StableDiGraph<NodeData, EdgeData>);
/// A petgraph `StableDiGraph` paired with three lock-guarded maps of shared indexes.
///
/// In this file, `Clone` and `Deserialize` both create the three maps empty, and
/// `Serialize` writes only `inner`; the maps are therefore never copied or persisted.
#[derive(Debug, Default)]
pub struct MappedGraph {
/// The underlying graph; also reachable through `Deref`.
pub(crate) inner: StableDiGraph<NodeData, EdgeData>,
/// `MappedTypeIndex` entries keyed by `u64`.
/// NOTE(review): the key is presumably a connection type's `InternedKey::as_u64()` —
/// confirm where the map is populated.
pub(crate) type_index: RwLock<HashMap<u64, Arc<MappedTypeIndex>>>,
/// `MappedPropertyIndex` entries keyed by a `(String, String)` pair.
/// NOTE(review): presumably `(node_type, property)`, mirroring the parameters of
/// `GraphRead::lookup_by_property_eq` — confirm.
pub(crate) property_index: RwLock<HashMap<(String, String), Arc<MappedPropertyIndex>>>,
/// `MappedPropertyIndex` entries keyed by a single `String`.
/// NOTE(review): presumably the property name used by the `*_any_type` lookups — confirm.
pub(crate) global_property_index: RwLock<HashMap<String, Arc<MappedPropertyIndex>>>,
}
/// Adjacency and count data stored per entry of `MappedGraph::type_index`.
///
/// NOTE(review): each `*_sources` / `*_offsets` / `*_edges` group has the same shape as
/// the `(sources, offsets, flat)` triple returned by `flatten_to_csr`, so presumably the
/// edges of `out_sources[i]` are `out_edges[out_offsets[i]..out_offsets[i + 1]]` (and
/// likewise for `in_*`). Confirm at the site that builds this struct.
#[derive(Debug, Default)]
pub struct MappedTypeIndex {
/// Nodes that have entries in the outgoing group.
pub out_sources: Vec<NodeIndex>,
/// Offsets into `out_edges` (see the note on the struct).
pub out_offsets: Vec<u32>,
/// Flattened edge indices of the outgoing group.
pub out_edges: Vec<EdgeIndex>,
/// Nodes that have entries in the incoming group.
pub in_sources: Vec<NodeIndex>,
/// Offsets into `in_edges` (see the note on the struct).
pub in_offsets: Vec<u32>,
/// Flattened edge indices of the incoming group.
pub in_edges: Vec<EdgeIndex>,
/// Per-node `i64` counts for the outgoing direction.
/// NOTE(review): presumably edge counts per peer — confirm with the builder.
pub out_peer_counts: HashMap<NodeIndex, i64>,
/// Per-node `i64` counts for the incoming direction (same caveat as above).
pub in_peer_counts: HashMap<NodeIndex, i64>,
}
/// Sorted string-key index: `keys[i]` is the value associated with `nodes[i]`.
///
/// The lookups binary-search `keys`, so they are only meaningful when `keys` is sorted
/// ascending in `str` (byte-wise) order and `nodes` is index-aligned with it.
/// Duplicate keys are allowed and must be adjacent, which sorting guarantees.
#[derive(Debug, Default)]
pub struct MappedPropertyIndex {
    /// Lookup keys, expected in ascending order.
    pub keys: Vec<String>,
    /// Node stored for each entry of `keys`, at the same position.
    pub nodes: Vec<NodeIndex>,
}

impl MappedPropertyIndex {
    /// Position of the first key that is `>= target`, or `keys.len()` if there is none.
    ///
    /// Requires `keys` to be sorted; the result is unspecified otherwise.
    fn lower_bound(&self, target: &str) -> usize {
        self.keys.partition_point(|key| key.as_str() < target)
    }

    /// Returns the nodes of every entry whose key equals `value`, in index order.
    ///
    /// # Panics
    ///
    /// Panics if a matching key sits at a position that `nodes` does not cover, i.e.
    /// when `nodes` is shorter than `keys`.
    pub fn lookup_eq(&self, value: &str) -> Vec<NodeIndex> {
        let start = self.lower_bound(value);
        // Equal keys are contiguous in a sorted list, so stop at the first mismatch.
        (start..self.keys.len())
            .take_while(|&i| self.keys[i] == value)
            .map(|i| self.nodes[i])
            .collect()
    }

    /// Returns up to `limit` nodes whose key starts with `prefix`, in index order.
    ///
    /// A `limit` of zero returns an empty vector without searching.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as [`MappedPropertyIndex::lookup_eq`].
    pub fn lookup_prefix(&self, prefix: &str, limit: usize) -> Vec<NodeIndex> {
        if limit == 0 {
            return Vec::new();
        }
        // Every key with this prefix is `>= prefix`, and such keys form one contiguous
        // run in byte-wise order, so the scan can start at the lower bound.
        let start = self.lower_bound(prefix);
        // Cap the up-front allocation: `limit` may be far larger than the match count.
        let mut out = Vec::with_capacity(limit.min(16));
        out.extend(
            (start..self.keys.len())
                .take_while(|&i| self.keys[i].starts_with(prefix))
                .take(limit)
                .map(|i| self.nodes[i]),
        );
        out
    }
}
impl Clone for MappedGraph {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
type_index: RwLock::new(HashMap::new()),
property_index: RwLock::new(HashMap::new()),
global_property_index: RwLock::new(HashMap::new()),
}
}
}
impl MemoryGraph {
#[inline]
pub fn new() -> Self {
Self(StableDiGraph::new())
}
#[inline]
pub fn inner(&self) -> &StableDiGraph<NodeData, EdgeData> {
&self.0
}
#[inline]
pub fn inner_mut(&mut self) -> &mut StableDiGraph<NodeData, EdgeData> {
&mut self.0
}
}
/// Flattens a per-node edge map into three parallel vectors `(sources, offsets, flat)`.
///
/// * `sources` — the map's keys, ascending by `NodeIndex::index()`.
/// * `offsets` — `sources.len() + 1` entries starting at `0`; the edges of `sources[i]`
///   are `flat[offsets[i]..offsets[i + 1]]`.
/// * `flat` — all edge lists concatenated in `sources` order, each list's internal
///   order preserved.
///
/// An empty map yields `(vec![], vec![0], vec![])`.
pub(super) fn flatten_to_csr(
    map: HashMap<NodeIndex, Vec<EdgeIndex>>,
) -> (Vec<NodeIndex>, Vec<u32>, Vec<EdgeIndex>) {
    let edge_total: usize = map.values().map(Vec::len).sum();

    // Map keys are unique, so an unstable sort gives the same order as a stable one.
    let mut buckets: Vec<(NodeIndex, Vec<EdgeIndex>)> = map.into_iter().collect();
    buckets.sort_unstable_by_key(|(node, _)| node.index());

    let mut sources = Vec::with_capacity(buckets.len());
    let mut offsets = Vec::with_capacity(buckets.len() + 1);
    let mut flat = Vec::with_capacity(edge_total);
    offsets.push(0u32);
    for (node, edges) in buckets {
        sources.push(node);
        flat.extend(edges);
        // Truncating cast. NOTE(review): assumes the running total fits in `u32`,
        // which presumably holds while petgraph's default `u32` edge index is in use
        // and no edge is listed twice — confirm.
        offsets.push(flat.len() as u32);
    }
    (sources, offsets, flat)
}
impl Deref for MemoryGraph {
type Target = StableDiGraph<NodeData, EdgeData>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Deref for MappedGraph {
type Target = StableDiGraph<NodeData, EdgeData>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.inner
}
}
/// Serializes exactly as the wrapped `StableDiGraph` does; the newtype adds no framing.
impl serde::Serialize for MemoryGraph {
    fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
        // Fully-qualified call: does not depend on `serde::Serialize` being imported.
        serde::Serialize::serialize(&self.0, ser)
    }
}
impl<'de> serde::Deserialize<'de> for MemoryGraph {
fn deserialize<D: serde::Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
StableDiGraph::deserialize(de).map(MemoryGraph)
}
}
/// Serializes only the `inner` graph; the three index maps are not written.
impl serde::Serialize for MappedGraph {
    fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
        // Fully-qualified call: does not depend on `serde::Serialize` being imported.
        serde::Serialize::serialize(&self.inner, ser)
    }
}
impl<'de> serde::Deserialize<'de> for MappedGraph {
fn deserialize<D: serde::Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
StableDiGraph::deserialize(de).map(|inner| MappedGraph {
inner,
type_index: RwLock::new(HashMap::new()),
property_index: RwLock::new(HashMap::new()),
global_property_index: RwLock::new(HashMap::new()),
})
}
}
pub mod impls;
pub mod recording;
#[allow(unused_imports)]
pub use recording::RecordingGraph;