use arcstr::ArcStr;
use grafeo_common::types::{EdgeId, NodeId, PropertyKey, Value};
use grafeo_common::utils::hash::FxHashMap;
use super::column::ColumnCodec;
use super::csr::CsrAdjacency;
use super::id::{encode_edge_id, encode_node_id};
use super::schema::EdgeSchema;
#[derive(Debug)]
pub struct RelTable {
schema: EdgeSchema,
fwd: CsrAdjacency,
bwd: Option<CsrAdjacency>,
properties: FxHashMap<PropertyKey, ColumnCodec>,
src_table_id: u16,
dst_table_id: u16,
}
impl RelTable {
#[must_use]
pub fn new(
schema: EdgeSchema,
fwd: CsrAdjacency,
bwd: Option<CsrAdjacency>,
properties: FxHashMap<PropertyKey, ColumnCodec>,
src_table_id: u16,
dst_table_id: u16,
) -> Self {
if let Some(ref b) = bwd {
assert!(
b.has_edge_data() || b.num_edges() == 0,
"backward CSR must have edge_data populated"
);
}
Self {
schema,
fwd,
bwd,
properties,
src_table_id,
dst_table_id,
}
}
#[must_use]
pub fn edge_type(&self) -> &ArcStr {
&self.schema.edge_type
}
#[must_use]
pub fn rel_table_id(&self) -> u16 {
self.schema.rel_table_id
}
#[must_use]
pub fn src_table_id(&self) -> u16 {
self.src_table_id
}
#[must_use]
pub fn dst_table_id(&self) -> u16 {
self.dst_table_id
}
#[must_use]
pub fn num_edges(&self) -> usize {
self.fwd.num_edges()
}
#[must_use]
pub fn has_backward(&self) -> bool {
self.bwd.is_some()
}
#[must_use]
pub fn edges_from_source(&self, src_offset: u32) -> Vec<(NodeId, EdgeId)> {
let neighbors = self.fwd.neighbors(src_offset);
let start_pos = u64::from(self.fwd.offset_of(src_offset));
let rel_id = self.schema.rel_table_id;
neighbors
.iter()
.enumerate()
.map(|(i, &target_offset)| {
let node_id = encode_node_id(self.dst_table_id, u64::from(target_offset));
let edge_id = encode_edge_id(rel_id, start_pos + i as u64);
(node_id, edge_id)
})
.collect()
}
#[must_use]
pub fn edges_to_target(&self, dst_offset: u32) -> Option<Vec<(NodeId, EdgeId)>> {
let bwd = self.bwd.as_ref()?;
let bwd_start = bwd.offset_of(dst_offset) as usize;
let source_offsets = bwd.neighbors(dst_offset);
let rel_id = self.schema.rel_table_id;
let results = source_offsets
.iter()
.enumerate()
.filter_map(|(i, &src_offset)| {
let fwd_pos = bwd.edge_data_at(bwd_start + i)?;
let node_id = encode_node_id(self.src_table_id, u64::from(src_offset));
let edge_id = encode_edge_id(rel_id, u64::from(fwd_pos));
Some((node_id, edge_id))
})
.collect();
Some(results)
}
#[must_use]
pub fn get_edge_property(&self, csr_position: usize, key: &PropertyKey) -> Option<Value> {
self.properties.get(key)?.get(csr_position)
}
#[must_use]
pub fn get_all_edge_properties(&self, csr_position: usize) -> FxHashMap<PropertyKey, Value> {
let mut props = FxHashMap::default();
for (key, col) in &self.properties {
if let Some(value) = col.get(csr_position) {
props.insert(key.clone(), value);
}
}
props
}
#[must_use]
pub fn property_keys(&self) -> Vec<PropertyKey> {
self.properties.keys().cloned().collect()
}
#[must_use]
pub fn source_node_id(&self, csr_position: u32) -> Option<NodeId> {
let src_offset = self.fwd.source_for_position(csr_position)?;
Some(encode_node_id(self.src_table_id, u64::from(src_offset)))
}
#[must_use]
pub fn dest_node_id(&self, csr_position: u32) -> Option<NodeId> {
let src = self.fwd.source_for_position(csr_position)?;
let start = self.fwd.offset_of(src);
let local_idx = (csr_position - start) as usize;
let target_offset = *self.fwd.neighbors(src).get(local_idx)?;
Some(encode_node_id(self.dst_table_id, u64::from(target_offset)))
}
#[must_use]
pub fn out_degree(&self, src_offset: u32) -> usize {
self.fwd.degree(src_offset)
}
#[must_use]
pub fn in_degree(&self, dst_offset: u32) -> Option<usize> {
self.bwd.as_ref().map(|b| b.degree(dst_offset))
}
#[must_use]
pub fn memory_bytes(&self) -> usize {
let fwd_bytes = self.fwd.memory_bytes();
let bwd_bytes = self.bwd.as_ref().map_or(0, |b| b.memory_bytes());
let prop_bytes: usize = self.properties.values().map(|c| c.heap_bytes()).sum();
fwd_bytes + bwd_bytes + prop_bytes
}
}
#[cfg(test)]
mod tests {
use super::super::id::decode_node_id;
use super::*;
fn make_test_rel_table() -> RelTable {
let fwd_edges = vec![(0u32, 0u32), (0, 1), (1, 0), (1, 1), (2, 0)];
let fwd = CsrAdjacency::from_sorted_edges(3, &fwd_edges);
let bwd_edges = vec![(0u32, 0u32), (0, 1), (0, 2), (1, 0), (1, 1)];
let mut bwd = CsrAdjacency::from_sorted_edges(2, &bwd_edges);
let mut mapping = Vec::with_capacity(bwd_edges.len());
for &(dst, src) in &bwd_edges {
let fwd_start = fwd.offset_of(src);
let local_idx = fwd.neighbors(src).iter().position(|&t| t == dst).unwrap();
mapping.push(fwd_start + local_idx as u32);
}
bwd.set_edge_data(mapping);
let schema = EdgeSchema::new("LIKES", 5, "Person", "Movie", vec![]);
RelTable::new(
schema,
fwd,
Some(bwd),
FxHashMap::default(),
0, 1, )
}
#[test]
fn test_forward_traversal() {
let rt = make_test_rel_table();
assert_eq!(rt.edge_type().as_str(), "LIKES");
assert_eq!(rt.rel_table_id(), 5);
assert_eq!(rt.num_edges(), 5);
assert!(rt.has_backward());
let edges_0 = rt.edges_from_source(0);
assert_eq!(edges_0.len(), 2);
let (node_id_0, _edge_id_0) = edges_0[0];
let (table, offset) = decode_node_id(node_id_0);
assert_eq!(table, 1); assert_eq!(offset, 0);
let (node_id_1, _edge_id_1) = edges_0[1];
let (table, offset) = decode_node_id(node_id_1);
assert_eq!(table, 1);
assert_eq!(offset, 1);
let edges_1 = rt.edges_from_source(1);
assert_eq!(edges_1.len(), 2);
let edges_2 = rt.edges_from_source(2);
assert_eq!(edges_2.len(), 1);
let (nid, _) = edges_2[0];
let (table, offset) = decode_node_id(nid);
assert_eq!(table, 1);
assert_eq!(offset, 0);
}
#[test]
fn test_backward_traversal() {
let rt = make_test_rel_table();
let edges_to_0 = rt.edges_to_target(0).expect("backward CSR is present");
assert_eq!(edges_to_0.len(), 3);
let source_offsets: Vec<u64> = edges_to_0
.iter()
.map(|(nid, _)| decode_node_id(*nid).1)
.collect();
assert_eq!(source_offsets, vec![0, 1, 2]);
let edges_to_1 = rt.edges_to_target(1).expect("backward CSR is present");
assert_eq!(edges_to_1.len(), 2);
let source_offsets: Vec<u64> = edges_to_1
.iter()
.map(|(nid, _)| decode_node_id(*nid).1)
.collect();
assert_eq!(source_offsets, vec![0, 1]);
}
#[test]
fn test_degree() {
let rt = make_test_rel_table();
assert_eq!(rt.out_degree(0), 2);
assert_eq!(rt.out_degree(1), 2);
assert_eq!(rt.out_degree(2), 1);
assert_eq!(rt.in_degree(0), Some(3));
assert_eq!(rt.in_degree(1), Some(2));
}
#[test]
fn test_no_backward_csr() {
let fwd_edges = vec![(0u32, 1u32)];
let fwd = CsrAdjacency::from_sorted_edges(2, &fwd_edges);
let schema = EdgeSchema::new("FOLLOWS", 10, "User", "User", vec![]);
let rt = RelTable::new(schema, fwd, None, FxHashMap::default(), 0, 0);
assert!(!rt.has_backward());
assert_eq!(rt.edges_to_target(0), None);
assert_eq!(rt.in_degree(0), None);
}
}