use crate::types::*;
use super::db::GraphDB;
pub struct NodeIter<'a> {
db: &'a GraphDB,
snapshot_phys: u64,
delta_iter: Option<std::collections::hash_map::Keys<'a, NodeId, NodeDelta>>,
yielded_from_snapshot: std::collections::HashSet<NodeId>,
phase: NodeIterPhase,
}
enum NodeIterPhase {
Snapshot,
DeltaCreated,
Done,
}
impl<'a> NodeIter<'a> {
pub fn new(db: &'a GraphDB) -> Self {
Self {
db,
snapshot_phys: 0,
delta_iter: None,
yielded_from_snapshot: std::collections::HashSet::new(),
phase: NodeIterPhase::Snapshot,
}
}
}
impl<'a> Iterator for NodeIter<'a> {
type Item = NodeId;
fn next(&mut self) -> Option<Self::Item> {
let delta = self.db.delta.read();
loop {
match self.phase {
NodeIterPhase::Snapshot => {
if let Some(ref snapshot) = self.db.snapshot {
while self.snapshot_phys < snapshot.header.num_nodes {
let phys = self.snapshot_phys as u32;
self.snapshot_phys += 1;
if let Some(node_id) = snapshot.get_node_id(phys) {
if delta.deleted_nodes.contains(&node_id) {
continue;
}
self.yielded_from_snapshot.insert(node_id);
return Some(node_id);
}
}
}
self.phase = NodeIterPhase::DeltaCreated;
}
NodeIterPhase::DeltaCreated => {
if self.delta_iter.is_none() {
drop(delta);
let delta = self.db.delta.read();
for &node_id in delta.created_nodes.keys() {
if !self.yielded_from_snapshot.contains(&node_id)
&& !delta.deleted_nodes.contains(&node_id)
{
self.yielded_from_snapshot.insert(node_id);
return Some(node_id);
}
}
self.phase = NodeIterPhase::Done;
return None;
}
self.phase = NodeIterPhase::Done;
return None;
}
NodeIterPhase::Done => {
return None;
}
}
}
}
}
pub struct OutEdgeIter<'a> {
db: &'a GraphDB,
src: NodeId,
snapshot_idx: usize,
snapshot_count: usize,
src_phys: Option<u32>,
delta_add_iter: Option<std::collections::btree_set::Iter<'a, EdgePatch>>,
phase: OutEdgeIterPhase,
}
#[derive(Debug, Clone, Copy)]
enum OutEdgeIterPhase {
Snapshot,
DeltaAdded,
Done,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Edge {
pub etype: ETypeId,
pub dst: NodeId,
}
impl<'a> OutEdgeIter<'a> {
pub fn new(db: &'a GraphDB, src: NodeId) -> Self {
let src_phys = db.snapshot.as_ref().and_then(|s| s.get_phys_node(src));
let snapshot_count = src_phys
.and_then(|p| db.snapshot.as_ref()?.get_out_degree(p))
.unwrap_or(0);
Self {
db,
src,
snapshot_idx: 0,
snapshot_count,
src_phys,
delta_add_iter: None,
phase: OutEdgeIterPhase::Snapshot,
}
}
}
impl<'a> Iterator for OutEdgeIter<'a> {
type Item = Edge;
fn next(&mut self) -> Option<Self::Item> {
let delta = self.db.delta.read();
loop {
match self.phase {
OutEdgeIterPhase::Snapshot => {
if let (Some(snapshot), Some(phys)) = (&self.db.snapshot, self.src_phys) {
while self.snapshot_idx < self.snapshot_count {
let idx = self.snapshot_idx;
self.snapshot_idx += 1;
let mut iter = snapshot.iter_out_edges(phys);
for _ in 0..idx {
iter.next();
}
if let Some((etype, dst_phys)) = iter.next() {
if let Some(dst) = snapshot.get_node_id(dst_phys) {
if delta.is_edge_deleted(self.src, etype, dst) {
continue;
}
return Some(Edge { etype, dst });
}
}
}
}
self.phase = OutEdgeIterPhase::DeltaAdded;
}
OutEdgeIterPhase::DeltaAdded => {
if let Some(add_set) = delta.out_add.get(&self.src) {
if let Some(patch) = add_set.iter().next() {
return Some(Edge {
etype: patch.etype,
dst: patch.other,
});
}
}
self.phase = OutEdgeIterPhase::Done;
return None;
}
OutEdgeIterPhase::Done => {
return None;
}
}
}
}
}
pub fn list_nodes(db: &GraphDB) -> Vec<NodeId> {
let delta = db.delta.read();
let mut nodes = Vec::new();
let mut seen = std::collections::HashSet::new();
if let Some(ref snapshot) = db.snapshot {
for phys in 0..snapshot.header.num_nodes {
if let Some(node_id) = snapshot.get_node_id(phys as u32) {
if !delta.deleted_nodes.contains(&node_id) {
nodes.push(node_id);
seen.insert(node_id);
}
}
}
}
for &node_id in delta.created_nodes.keys() {
if !seen.contains(&node_id) && !delta.deleted_nodes.contains(&node_id) {
nodes.push(node_id);
}
}
nodes
}
pub fn list_out_edges(db: &GraphDB, src: NodeId) -> Vec<Edge> {
let delta = db.delta.read();
let mut edges = Vec::new();
if let Some(ref snapshot) = db.snapshot {
if let Some(phys) = snapshot.get_phys_node(src) {
for (dst_phys, etype) in snapshot.iter_out_edges(phys) {
if let Some(dst) = snapshot.get_node_id(dst_phys) {
if !delta.is_edge_deleted(src, etype, dst) {
edges.push(Edge { etype, dst });
}
}
}
}
}
if let Some(add_set) = delta.out_add.get(&src) {
for patch in add_set {
edges.push(Edge {
etype: patch.etype,
dst: patch.other,
});
}
}
edges
}
pub fn count_nodes(db: &GraphDB) -> u64 {
let delta = db.delta.read();
let mut count = db
.snapshot
.as_ref()
.map(|s| s.header.num_nodes)
.unwrap_or(0);
for &node_id in &delta.deleted_nodes {
if let Some(ref snapshot) = db.snapshot {
if snapshot.has_node(node_id) {
count = count.saturating_sub(1);
}
}
}
for &node_id in delta.created_nodes.keys() {
if !delta.deleted_nodes.contains(&node_id) {
count += 1;
}
}
count
}
pub fn count_edges(db: &GraphDB, etype_filter: Option<ETypeId>) -> u64 {
let delta = db.delta.read();
let mut count = db
.snapshot
.as_ref()
.map(|s| s.header.num_edges)
.unwrap_or(0);
for del_set in delta.out_del.values() {
for patch in del_set {
if etype_filter.is_none() || etype_filter == Some(patch.etype) {
count = count.saturating_sub(1);
}
}
}
for add_set in delta.out_add.values() {
for patch in add_set {
if etype_filter.is_none() || etype_filter == Some(patch.etype) {
count += 1;
}
}
}
count
}
pub fn list_in_edges(db: &GraphDB, dst: NodeId) -> Vec<Edge> {
let delta = db.delta.read();
let mut edges = Vec::new();
if let Some(ref snapshot) = db.snapshot {
if let Some(phys) = snapshot.get_phys_node(dst) {
for (src_phys, etype, _out_index) in snapshot.iter_in_edges(phys) {
if let Some(src) = snapshot.get_node_id(src_phys) {
if !delta.is_edge_deleted(src, etype, dst) {
edges.push(Edge { etype, dst: src }); }
}
}
}
}
if let Some(add_set) = delta.in_add.get(&dst) {
for patch in add_set {
edges.push(Edge {
etype: patch.etype,
dst: patch.other, });
}
}
edges
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FullEdge {
pub src: NodeId,
pub etype: ETypeId,
pub dst: NodeId,
}
#[derive(Debug, Clone, Default)]
pub struct ListEdgesOptions {
pub etype: Option<ETypeId>,
}
pub fn list_edges(db: &GraphDB, options: ListEdgesOptions) -> Vec<FullEdge> {
let delta = db.delta.read();
let mut edges = Vec::new();
let mut seen_nodes = std::collections::HashSet::new();
if let Some(ref snapshot) = db.snapshot {
for phys in 0..snapshot.header.num_nodes as u32 {
if let Some(src) = snapshot.get_node_id(phys) {
if delta.deleted_nodes.contains(&src) {
continue;
}
seen_nodes.insert(src);
for (dst_phys, etype) in snapshot.iter_out_edges(phys) {
if let Some(dst) = snapshot.get_node_id(dst_phys) {
if let Some(filter_etype) = options.etype {
if etype != filter_etype {
continue;
}
}
if delta.is_edge_deleted(src, etype, dst) {
continue;
}
edges.push(FullEdge { src, etype, dst });
}
}
}
}
}
for &src in delta.created_nodes.keys() {
if delta.deleted_nodes.contains(&src) {
continue;
}
seen_nodes.insert(src);
}
for (&src, add_set) in &delta.out_add {
for patch in add_set {
if let Some(filter_etype) = options.etype {
if patch.etype != filter_etype {
continue;
}
}
edges.push(FullEdge {
src,
etype: patch.etype,
dst: patch.other,
});
}
}
edges
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::db::{close_graph_db, open_graph_db, OpenOptions};
use crate::graph::edges::add_edge;
use crate::graph::nodes::{create_node, NodeOpts};
use crate::graph::tx::{begin_tx, commit};
use tempfile::tempdir;
#[test]
fn test_list_nodes_empty() {
let temp_dir = tempdir().unwrap();
let db = open_graph_db(temp_dir.path(), OpenOptions::new()).unwrap();
let nodes = list_nodes(&db);
assert!(nodes.is_empty());
close_graph_db(db).unwrap();
}
#[test]
fn test_list_nodes_with_data() {
let temp_dir = tempdir().unwrap();
let db = open_graph_db(temp_dir.path(), OpenOptions::new()).unwrap();
let mut tx = begin_tx(&db).unwrap();
let n1 = create_node(&mut tx, NodeOpts::new()).unwrap();
let n2 = create_node(&mut tx, NodeOpts::new()).unwrap();
let n3 = create_node(&mut tx, NodeOpts::new()).unwrap();
commit(&mut tx).unwrap();
let nodes = list_nodes(&db);
assert_eq!(nodes.len(), 3);
assert!(nodes.contains(&n1));
assert!(nodes.contains(&n2));
assert!(nodes.contains(&n3));
close_graph_db(db).unwrap();
}
#[test]
fn test_list_out_edges() {
let temp_dir = tempdir().unwrap();
let db = open_graph_db(temp_dir.path(), OpenOptions::new()).unwrap();
let mut tx = begin_tx(&db).unwrap();
let alice = create_node(&mut tx, NodeOpts::new()).unwrap();
let bob = create_node(&mut tx, NodeOpts::new()).unwrap();
let charlie = create_node(&mut tx, NodeOpts::new()).unwrap();
add_edge(&mut tx, alice, 1, bob).unwrap();
add_edge(&mut tx, alice, 1, charlie).unwrap();
commit(&mut tx).unwrap();
let edges = list_out_edges(&db, alice);
assert_eq!(edges.len(), 2);
close_graph_db(db).unwrap();
}
}