use crate::algo::IdMap;
use anyhow::{Result, anyhow};
use uni_common::core::id::{Eid, Vid};
use uni_store::runtime::L0Manager;
use uni_store::runtime::property_manager::PropertyManager;
use uni_store::storage::direction::Direction as CacheDir;
use uni_store::storage::manager::StorageManager;
/// Flat edge list of `(u32, u32, f64)` triples. The builders in this file fill
/// it with `(owning slot, neighbor slot, weight)` entries and hand it to
/// `build_csr`, which groups the entries by the first component.
type WeightedEdgeList = Vec<(u32, u32, f64)>;
/// Options describing which part of the stored graph a projection should
/// contain. `Default` yields empty selections, no weight property and no
/// reverse adjacency.
#[derive(Debug, Clone, Default)]
pub struct ProjectionConfig {
/// Vertex label names to include. When left empty,
/// `ProjectionBuilder::resolve_ids` falls back to every label in the schema.
pub node_labels: Vec<String>,
/// Edge type names to include. When left empty,
/// `ProjectionBuilder::resolve_ids` falls back to every edge type in the schema.
pub edge_types: Vec<String>,
/// Name of the edge property to read as the edge weight. Edges for which no
/// numeric value is found receive a weight of `1.0`.
pub weight_property: Option<String>,
/// Whether incoming adjacency is collected in addition to outgoing adjacency.
pub include_reverse: bool,
}
/// Immutable adjacency snapshot of a graph in compressed-sparse-row form,
/// addressed by dense `u32` slots rather than `Vid`s.
#[derive(Debug, Clone)]
pub struct GraphProjection {
/// Number of vertices (slots) in the projection.
pub(crate) vertex_count: usize,
/// Outgoing adjacency: `out_offsets[s]..out_offsets[s + 1]` delimits the
/// neighbor slots of `s` inside `out_neighbors`.
pub(crate) out_offsets: Vec<u32>, pub(crate) out_neighbors: Vec<u32>,
/// Incoming adjacency in the same layout. When reverse edges were not
/// requested, `in_offsets` is all zeros and `in_neighbors` is empty.
pub(crate) in_offsets: Vec<u32>, pub(crate) in_neighbors: Vec<u32>,
/// Edge weights stored at the same indices as `out_neighbors`, when weights
/// were materialized.
pub(crate) out_weights: Option<Vec<f64>>,
/// Mapping between dense slots and `Vid`s.
pub(crate) id_map: IdMap,
}
impl GraphProjection {
    /// Returns the number of vertices recorded for this projection.
    #[inline]
    pub fn vertex_count(&self) -> usize {
        self.vertex_count
    }

    /// Returns the number of entries in the outgoing neighbor array.
    #[inline]
    pub fn edge_count(&self) -> usize {
        self.out_neighbors.len()
    }

    /// Returns the outgoing neighbor slots of `slot`.
    ///
    /// # Panics
    /// Panics if `slot + 1` is not a valid index into the outgoing offsets.
    #[inline]
    pub fn out_neighbors(&self, slot: u32) -> &[u32] {
        let idx = slot as usize;
        let span = (self.out_offsets[idx] as usize)..(self.out_offsets[idx + 1] as usize);
        &self.out_neighbors[span]
    }

    /// Returns how many outgoing neighbors `slot` has.
    ///
    /// # Panics
    /// Panics if `slot + 1` is not a valid index into the outgoing offsets.
    #[inline]
    pub fn out_degree(&self, slot: u32) -> u32 {
        let idx = slot as usize;
        let upper = self.out_offsets[idx + 1];
        let lower = self.out_offsets[idx];
        upper - lower
    }

    /// Returns the incoming neighbor slots of `slot`; empty for every slot
    /// when reverse adjacency was not built.
    ///
    /// # Panics
    /// Panics if `slot + 1` is not a valid index into the incoming offsets.
    #[inline]
    pub fn in_neighbors(&self, slot: u32) -> &[u32] {
        let idx = slot as usize;
        let span = (self.in_offsets[idx] as usize)..(self.in_offsets[idx + 1] as usize);
        &self.in_neighbors[span]
    }

    /// Returns how many incoming neighbors `slot` has.
    ///
    /// # Panics
    /// Panics if `slot + 1` is not a valid index into the incoming offsets.
    #[inline]
    pub fn in_degree(&self, slot: u32) -> u32 {
        let idx = slot as usize;
        let upper = self.in_offsets[idx + 1];
        let lower = self.in_offsets[idx];
        upper - lower
    }

    /// Returns the weight of the `edge_idx`-th outgoing edge of `slot`.
    ///
    /// # Panics
    /// Panics with `"no weights"` when the projection carries no weights, and
    /// on out-of-range `slot` / `edge_idx`.
    #[inline]
    pub fn out_weight(&self, slot: u32, edge_idx: usize) -> f64 {
        let base = self.out_offsets[slot as usize] as usize;
        let weights = self.out_weights.as_deref().expect("no weights");
        weights[base + edge_idx]
    }

    /// Reports whether a weight array is present.
    #[inline]
    pub fn has_weights(&self) -> bool {
        self.out_weights.is_some()
    }

    /// Reports whether any incoming neighbor is stored. Note that this is
    /// also `false` when reverse adjacency was built but contains no edges.
    #[inline]
    pub fn has_reverse(&self) -> bool {
        !self.in_neighbors.is_empty()
    }

    /// Translates a slot to its `Vid` via `IdMap::to_vid_unchecked`; what
    /// happens for an unknown slot is decided by that method.
    #[inline]
    pub fn to_vid(&self, slot: u32) -> Vid {
        self.id_map.to_vid_unchecked(slot)
    }

    /// Translates a `Vid` to its slot, or `None` when the id map has no slot
    /// for it.
    #[inline]
    pub fn to_slot(&self, vid: Vid) -> Option<u32> {
        self.id_map.to_slot(vid)
    }

    /// Iterates over every `(slot, Vid)` pair of the id map.
    pub fn vertices(&self) -> impl Iterator<Item = (u32, Vid)> + '_ {
        self.id_map.iter()
    }

    /// Estimates the bytes held by the adjacency arrays, the weight array and
    /// the id map (element counts times element size; spare capacity is not
    /// included).
    pub fn memory_size(&self) -> usize {
        let u32_elements = self.out_offsets.len()
            + self.out_neighbors.len()
            + self.in_offsets.len()
            + self.in_neighbors.len();
        let weight_bytes = match &self.out_weights {
            Some(weights) => weights.len() * std::mem::size_of::<f64>(),
            None => 0,
        };
        u32_elements * std::mem::size_of::<u32>() + weight_bytes + self.id_map.memory_size()
    }
}
use std::sync::Arc;
/// Builder that assembles a [`GraphProjection`] from storage according to a
/// [`ProjectionConfig`].
pub struct ProjectionBuilder {
/// Storage handle used for schema lookup, vertex table scans and adjacency reads.
storage: Arc<StorageManager>,
/// Optional L0 manager; when present, vertices reported by its pending-flush
/// and current buffers are merged into the vertex set.
l0_manager: Option<Arc<L0Manager>>,
/// Options accumulated through the builder methods.
config: ProjectionConfig,
}
impl ProjectionBuilder {
/// Starts a builder over `storage` with a default [`ProjectionConfig`] and no
/// L0 manager attached.
pub fn new(storage: Arc<StorageManager>) -> Self {
    let config = ProjectionConfig::default();
    Self {
        storage,
        l0_manager: None,
        config,
    }
}
pub fn l0_manager(mut self, l0_manager: Option<Arc<L0Manager>>) -> Self {
self.l0_manager = l0_manager;
self
}
/// Replaces the set of vertex label names to project.
pub fn node_labels(mut self, labels: &[&str]) -> Self {
    self.config.node_labels = labels.iter().copied().map(String::from).collect();
    self
}
/// Replaces the set of edge type names to project.
pub fn edge_types(mut self, types: &[&str]) -> Self {
    self.config.edge_types = types.iter().map(|&name| name.to_owned()).collect();
    self
}
/// Selects the edge property that is read as the edge weight.
pub fn weight_property(mut self, prop: &str) -> Self {
    self.config.weight_property = Some(prop.to_owned());
    self
}
/// Chooses whether incoming adjacency is built alongside outgoing adjacency.
pub fn include_reverse(mut self, enabled: bool) -> Self {
    let config = &mut self.config;
    config.include_reverse = enabled;
    self
}
/// Consumes the builder and produces the projection.
///
/// Steps, in order: resolve configured names to schema ids, warm the
/// adjacency caches, collect the vertex set and assign slots, collect the
/// edges, compact the id map, and lay the edges out as CSR arrays.
///
/// # Errors
/// Propagates any error from id resolution, cache warming, vertex collection
/// or edge collection.
pub async fn build(self) -> Result<GraphProjection> {
    let schema = self.storage.schema_manager().schema();
    let (label_ids, edge_type_ids) = self.resolve_ids(&schema)?;
    self.warm_caches(&label_ids, &edge_type_ids).await?;

    // Assign one slot per collected vertex id.
    let vids = self.collect_vertices(&schema, &label_ids).await?;
    let mut id_map = IdMap::with_capacity(vids.len());
    vids.into_iter().for_each(|vid| {
        id_map.insert(vid);
    });
    let vertex_count = id_map.len();

    // Edge collection runs before the id map is compacted, as in the original
    // ordering of these calls.
    let (out_edges, in_edges) = self.collect_edges(&id_map, &edge_type_ids).await?;
    id_map.compact();

    // Weights are requested unconditionally for the outgoing side, whether or
    // not a weight property was configured.
    let (out_offsets, out_neighbors, out_weights) = build_csr(vertex_count, &out_edges, true);
    let (in_offsets, in_neighbors) = if self.config.include_reverse {
        let (offsets, neighbors, _) = build_csr(vertex_count, &in_edges, false);
        (offsets, neighbors)
    } else {
        (vec![0; vertex_count + 1], Vec::new())
    };

    Ok(GraphProjection {
        vertex_count,
        out_offsets,
        out_neighbors,
        in_offsets,
        in_neighbors,
        out_weights,
        id_map,
    })
}
/// Resolves the configured label and edge type names to their schema ids.
///
/// An empty selection on either side is expanded to every id of that kind
/// present in `schema`.
///
/// # Errors
/// Fails on the first configured label name, and then the first configured
/// edge type name, that is absent from `schema`.
fn resolve_ids(
    &self,
    schema: &uni_common::core::schema::Schema,
) -> Result<(Vec<u16>, Vec<u32>)> {
    let mut label_ids = self
        .config
        .node_labels
        .iter()
        .map(|name| {
            schema
                .labels
                .get(name)
                .map(|meta| meta.id)
                .ok_or_else(|| anyhow!("Label {} not found", name))
        })
        .collect::<Result<Vec<_>>>()?;

    let mut edge_type_ids = self
        .config
        .edge_types
        .iter()
        .map(|name| {
            schema
                .edge_types
                .get(name)
                .map(|meta| meta.id)
                .ok_or_else(|| anyhow!("Edge type {} not found", name))
        })
        .collect::<Result<Vec<_>>>()?;

    // Nothing selected means "everything in the schema".
    if label_ids.is_empty() {
        label_ids.extend(schema.labels.values().map(|meta| meta.id));
    }
    if edge_type_ids.is_empty() {
        edge_type_ids.extend(schema.edge_types.values().map(|meta| meta.id));
    }

    Ok((label_ids, edge_type_ids))
}
/// Warms the adjacency cache for every edge type: always for the outgoing
/// direction, and for the incoming direction too when reverse adjacency is
/// enabled. `_label_ids` is accepted but not used.
///
/// # Errors
/// Stops at and returns the first error reported by `warm_adjacency`.
async fn warm_caches(&self, _label_ids: &[u16], edge_type_ids: &[u32]) -> Result<()> {
    let reverse = self.config.include_reverse;
    for &type_id in edge_type_ids {
        let edge_ver = self.storage.get_edge_version_by_id(type_id);
        // Outgoing first, then (optionally) incoming.
        let directions =
            std::iter::once(CacheDir::Outgoing).chain(reverse.then_some(CacheDir::Incoming));
        for direction in directions {
            self.storage
                .warm_adjacency(type_id, direction, edge_ver)
                .await?;
        }
    }
    Ok(())
}
async fn collect_vertices(
&self,
schema: &uni_common::core::schema::Schema,
label_ids: &[u16],
) -> Result<Vec<Vid>> {
use arrow_array::UInt64Array;
let mut all_vids = Vec::new();
for &lid in label_ids {
let label_name = schema.label_name_by_id(lid).unwrap();
if let Ok(Some(batch)) = self
.storage
.scan_vertex_table(label_name, &["_vid"], None)
.await
{
let vid_col = batch
.column_by_name("_vid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
for i in 0..batch.num_rows() {
all_vids.push(Vid::from(vid_col.value(i)));
}
}
}
if let Some(ref l0_mgr) = self.l0_manager {
let label_names: Vec<&str> = label_ids
.iter()
.filter_map(|id| schema.label_name_by_id(*id))
.collect();
for pending_l0_arc in l0_mgr.get_pending_flush() {
all_vids.extend(pending_l0_arc.read().vids_for_labels(&label_names));
}
let current_l0 = l0_mgr.get_current();
all_vids.extend(current_l0.read().vids_for_labels(&label_names));
}
all_vids.sort_unstable();
all_vids.dedup();
Ok(all_vids)
}
async fn collect_edges(
&self,
id_map: &IdMap,
edge_type_ids: &[u32],
) -> Result<(WeightedEdgeList, WeightedEdgeList)> {
let mut raw_out_edges = Vec::new(); let mut raw_in_edges = Vec::new();
for (src_slot, src_vid) in id_map.iter() {
for &type_id in edge_type_ids {
let neighbors = self.storage.adjacency_manager().get_neighbors(
src_vid,
type_id,
CacheDir::Outgoing,
);
for (dst_vid, eid) in neighbors {
raw_out_edges.push((src_slot, dst_vid, eid));
}
if self.config.include_reverse {
let in_neighbors = self.storage.adjacency_manager().get_neighbors(
src_vid,
type_id,
CacheDir::Incoming,
);
for (dst_vid, eid) in in_neighbors {
raw_in_edges.push((src_slot, dst_vid, eid));
}
}
}
}
let pm = if self.config.weight_property.is_some() {
Some(PropertyManager::new(
self.storage.clone(),
self.storage.schema_manager_arc(),
1000,
))
} else {
None
};
let weight_prop = self.config.weight_property.as_deref();
let mut weights_cache: std::collections::HashMap<Eid, f64> =
std::collections::HashMap::new();
if let (Some(pm), Some(prop)) = (&pm, weight_prop) {
let mut all_eids: Vec<Eid> = raw_out_edges
.iter()
.map(|(_, _, eid)| *eid)
.chain(
self.config
.include_reverse
.then(|| raw_in_edges.iter().map(|(_, _, eid)| *eid))
.into_iter()
.flatten(),
)
.collect();
all_eids.sort_unstable();
all_eids.dedup();
let batch_props = pm.get_batch_edge_props(&all_eids, &[prop], None).await?;
for eid in all_eids {
let vid_key = Vid::from(eid.as_u64());
if let Some(weight) = batch_props
.get(&vid_key)
.and_then(|props| props.get(prop))
.and_then(|val| val.as_f64())
{
weights_cache.insert(eid, weight);
}
}
}
let out_edges: WeightedEdgeList = raw_out_edges
.into_iter()
.filter_map(|(src_slot, dst_vid, eid)| {
id_map.to_slot(dst_vid).map(|dst_slot| {
let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
(src_slot, dst_slot, weight)
})
})
.collect();
let in_edges: WeightedEdgeList = raw_in_edges
.into_iter()
.filter_map(|(src_slot, dst_vid, eid)| {
id_map.to_slot(dst_vid).map(|dst_slot| {
let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
(src_slot, dst_slot, weight)
})
})
.collect();
Ok((out_edges, in_edges))
}
}
/// One input row for [`GraphProjection::from_rows`]: column name mapped to
/// value. Node rows are read through their `id` column, edge rows through
/// `source`, `target` and an optional weight column.
pub type ProjectionRow = std::collections::HashMap<String, uni_common::Value>;
impl GraphProjection {
    /// Builds a projection directly from row maps instead of storage.
    ///
    /// * `node_rows` — each row must carry a non-negative numeric `id`.
    /// * `edge_rows` — each row must carry non-negative numeric `source` and
    ///   `target`; an edge whose endpoint is not among the node ids is dropped
    ///   with a debug log entry.
    /// * `weight_column` — when given, the column read as the edge weight
    ///   (rows without a numeric value there get `1.0`); when `None`, no
    ///   weight array is requested from the CSR builder.
    /// * `include_reverse` — also build incoming adjacency.
    ///
    /// # Errors
    /// Fails on the first node row without a usable `id`, or the first edge
    /// row without a usable `source` or `target`.
    pub fn from_rows(
        node_rows: &[ProjectionRow],
        edge_rows: &[ProjectionRow],
        weight_column: Option<&str>,
        include_reverse: bool,
    ) -> Result<Self> {
        let mut id_map = IdMap::with_capacity(node_rows.len());
        for (i, row) in node_rows.iter().enumerate() {
            let Some(vid_u) = row.get("id").and_then(value_as_u64) else {
                return Err(anyhow!("node row {i} missing `id` (Int) column"));
            };
            id_map.insert(Vid::new(vid_u));
        }
        let vertex_count = id_map.len();

        // The reverse list only needs room when it is going to be filled.
        let reverse_capacity = if include_reverse { edge_rows.len() } else { 0 };
        let mut out_edges: WeightedEdgeList = Vec::with_capacity(edge_rows.len());
        let mut in_edges: WeightedEdgeList = Vec::with_capacity(reverse_capacity);

        for (i, row) in edge_rows.iter().enumerate() {
            let Some(src_u) = row.get("source").and_then(value_as_u64) else {
                return Err(anyhow!("edge row {i} missing `source` (Int) column"));
            };
            let Some(dst_u) = row.get("target").and_then(value_as_u64) else {
                return Err(anyhow!("edge row {i} missing `target` (Int) column"));
            };
            let weight = weight_column
                .and_then(|name| row.get(name))
                .and_then(value_as_f64)
                .unwrap_or(1.0);

            let endpoints = (
                id_map.to_slot(Vid::new(src_u)),
                id_map.to_slot(Vid::new(dst_u)),
            );
            match endpoints {
                (Some(src_slot), Some(dst_slot)) => {
                    out_edges.push((src_slot, dst_slot, weight));
                    if include_reverse {
                        in_edges.push((dst_slot, src_slot, weight));
                    }
                }
                _ => log::debug!(
                    "from_rows: edge endpoint (src={src_u}, dst={dst_u}) not in node id map; dropping"
                ),
            }
        }

        let (out_offsets, out_neighbors, out_weights) =
            build_csr(vertex_count, &out_edges, weight_column.is_some());
        let (in_offsets, in_neighbors) = if include_reverse {
            let (offsets, neighbors, _) = build_csr(vertex_count, &in_edges, false);
            (offsets, neighbors)
        } else {
            (vec![0; vertex_count + 1], Vec::new())
        };

        Ok(GraphProjection {
            vertex_count,
            out_offsets,
            out_neighbors,
            in_offsets,
            in_neighbors,
            out_weights,
            id_map,
        })
    }
}
/// Interprets `v` as an unsigned 64-bit id.
///
/// Accepts a non-negative `Int`, or a `Float` that is finite, non-negative,
/// integral and below 2^64. Every other value yields `None`.
///
/// Fractional or oversized floats are rejected rather than converted: an `as`
/// cast would truncate `1.9` to `1` and saturate huge values to `u64::MAX`,
/// silently aliasing a different id.
fn value_as_u64(v: &uni_common::Value) -> Option<u64> {
    use uni_common::Value;
    match v {
        Value::Int(i) if *i >= 0 => Some(*i as u64),
        Value::Float(f)
            if f.is_finite()
                && *f >= 0.0
                && f.fract() == 0.0
                // `u64::MAX as f64` rounds up to exactly 2^64, so `<` keeps
                // the value inside the representable range.
                && *f < u64::MAX as f64 =>
        {
            Some(*f as u64)
        }
        _ => None,
    }
}
/// Interprets `v` as a floating-point number: a `Float` is returned as is, an
/// `Int` is converted with an `as` cast, anything else yields `None`.
fn value_as_f64(v: &uni_common::Value) -> Option<f64> {
    use uni_common::Value;
    if let Value::Float(f) = v {
        return Some(*f);
    }
    if let Value::Int(i) = v {
        return Some(*i as f64);
    }
    None
}
/// Lays out `edges` in compressed-sparse-row form.
///
/// Returns `(offsets, neighbors, weights)`:
/// * `offsets` has `vertex_count + 1` entries; the neighbors of vertex `v`
///   occupy `neighbors[offsets[v]..offsets[v + 1]]`.
/// * `neighbors` holds the second component of each edge, grouped by the
///   first component; edges of the same vertex keep their input order.
/// * `weights` is `Some` exactly when `include_weights` is set and is stored
///   at the same indices as `neighbors`. This also holds for an empty graph.
///
/// With `vertex_count == 0` the result is empty regardless of `edges`.
///
/// # Panics
/// Panics if an edge's first component is `>= vertex_count` (for a non-empty
/// graph), or if `edges.len()` does not fit in `u32`, since the offsets could
/// not represent it.
fn build_csr(
    vertex_count: usize,
    edges: &[(u32, u32, f64)],
    include_weights: bool,
) -> (Vec<u32>, Vec<u32>, Option<Vec<f64>>) {
    if vertex_count == 0 {
        // Keep the weight flag meaningful even when there is nothing to store.
        return (vec![0], Vec::new(), include_weights.then(Vec::new));
    }
    // Offsets are u32; refuse inputs that would silently wrap in release builds.
    assert!(
        u32::try_from(edges.len()).is_ok(),
        "build_csr: {} edges exceed the u32 offset range",
        edges.len()
    );

    // Count each vertex's degree one position to the right, then turn the
    // counts into running totals in place.
    let mut offsets = vec![0u32; vertex_count + 1];
    for &(src, _, _) in edges {
        offsets[src as usize + 1] += 1;
    }
    for v in 0..vertex_count {
        offsets[v + 1] += offsets[v];
    }

    let mut neighbors = vec![0u32; edges.len()];
    let mut weights = include_weights.then(|| vec![0.0; edges.len()]);

    // `cursor[v]` is the next free index inside vertex `v`'s range.
    let mut cursor = offsets[..vertex_count].to_vec();
    for &(src, dst, w) in edges {
        let next = &mut cursor[src as usize];
        let idx = *next as usize;
        *next += 1;
        neighbors[idx] = dst;
        if let Some(ws) = weights.as_mut() {
            ws[idx] = w;
        }
    }
    (offsets, neighbors, weights)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks offsets, neighbor grouping and weight placement for a small
    /// three-vertex graph in which vertex 0 has two outgoing edges.
    #[test]
    fn test_build_csr() {
        let edges = vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0), (0, 2, 0.5)];
        let (offsets, neighbors, weights) = build_csr(3, &edges, true);

        assert_eq!(offsets, vec![0, 2, 3, 4]);
        // Vertex 0 -> [1, 2], vertex 1 -> [2], vertex 2 -> [0].
        assert_eq!(neighbors, vec![1, 2, 2, 0]);

        // Weights were requested, so their absence must fail the test instead
        // of skipping the check.
        let weights = weights.expect("weights were requested");
        assert_eq!(weights, vec![1.0, 0.5, 1.0, 1.0]);
    }
}