use crate::algo::IdMap;
use anyhow::{Result, anyhow};
use uni_common::core::id::{Eid, Vid};
use uni_store::runtime::L0Manager;
use uni_store::runtime::property_manager::PropertyManager;
use uni_store::storage::direction::Direction as CacheDir;
use uni_store::storage::manager::StorageManager;
/// Edge list of `(source slot, target slot, weight)` triples, the form
/// produced by `ProjectionBuilder::collect_edges` and consumed by `build_csr`.
type WeightedEdgeList = Vec<(u32, u32, f64)>;
/// Options controlling which part of the stored graph is projected.
///
/// The derived `Default` yields empty label/type lists, no weight property
/// and `include_reverse == false`.
#[derive(Debug, Clone, Default)]
pub struct ProjectionConfig {
/// Names of the vertex labels to include. When empty, the builder falls
/// back to every label present in the schema.
pub node_labels: Vec<String>,
/// Names of the edge types to include. When empty, the builder falls back
/// to every edge type present in the schema.
pub edge_types: Vec<String>,
/// Name of the edge property read as the edge weight. Edges for which no
/// `f64` value is found get the weight `1.0`; so do all edges when this is
/// `None`.
pub weight_property: Option<String>,
/// Whether incoming adjacency is warmed, collected and stored alongside
/// the outgoing adjacency.
pub include_reverse: bool,
}
/// In-memory adjacency of a projected graph, addressed by dense `u32` slots.
///
/// Adjacency is stored as offset/neighbor array pairs: the neighbors of slot
/// `s` are `neighbors[offsets[s]..offsets[s + 1]]`.
#[derive(Debug)]
pub struct GraphProjection {
/// Number of vertices (slots) in the projection.
pub(crate) vertex_count: usize,
/// Outgoing adjacency: `out_offsets` delimits, per slot, a range of
/// `out_neighbors` (target slots).
pub(crate) out_offsets: Vec<u32>, pub(crate) out_neighbors: Vec<u32>,
/// Incoming adjacency in the same layout; `in_neighbors` is empty when the
/// projection was built without reverse edges.
pub(crate) in_offsets: Vec<u32>, pub(crate) in_neighbors: Vec<u32>,
/// Edge weights indexed in parallel with `out_neighbors`, when present.
pub(crate) out_weights: Option<Vec<f64>>,
/// Mapping between slots and the vertex ids they stand for.
pub(crate) id_map: IdMap,
/// Label names from the configuration the projection was built with.
pub(crate) _node_labels: Vec<String>,
/// Edge type names from the configuration the projection was built with.
pub(crate) _edge_types: Vec<String>,
}
impl GraphProjection {
    /// Number of vertices (slots) held by this projection.
    #[inline]
    pub fn vertex_count(&self) -> usize {
        self.vertex_count
    }

    /// Number of outgoing edges, i.e. the length of the outgoing neighbor array.
    #[inline]
    pub fn edge_count(&self) -> usize {
        self.out_neighbors.len()
    }

    /// Target slots of the outgoing edges of `slot`.
    ///
    /// # Panics
    ///
    /// Panics if `slot` (or `slot + 1`) is outside the outgoing offset array.
    #[inline]
    pub fn out_neighbors(&self, slot: u32) -> &[u32] {
        let idx = slot as usize;
        let (lo, hi) = (
            self.out_offsets[idx] as usize,
            self.out_offsets[idx + 1] as usize,
        );
        &self.out_neighbors[lo..hi]
    }

    /// Number of outgoing edges of `slot`.
    ///
    /// # Panics
    ///
    /// Panics if `slot + 1` is outside the outgoing offset array.
    #[inline]
    pub fn out_degree(&self, slot: u32) -> u32 {
        let idx = slot as usize;
        let upper = self.out_offsets[idx + 1];
        let lower = self.out_offsets[idx];
        upper - lower
    }

    /// Source slots of the incoming edges of `slot`.
    ///
    /// # Panics
    ///
    /// Panics if `slot` (or `slot + 1`) is outside the incoming offset array.
    #[inline]
    pub fn in_neighbors(&self, slot: u32) -> &[u32] {
        let idx = slot as usize;
        let (lo, hi) = (
            self.in_offsets[idx] as usize,
            self.in_offsets[idx + 1] as usize,
        );
        &self.in_neighbors[lo..hi]
    }

    /// Number of incoming edges of `slot`.
    ///
    /// # Panics
    ///
    /// Panics if `slot + 1` is outside the incoming offset array.
    #[inline]
    pub fn in_degree(&self, slot: u32) -> u32 {
        let idx = slot as usize;
        let upper = self.in_offsets[idx + 1];
        let lower = self.in_offsets[idx];
        upper - lower
    }

    /// Weight of the `edge_idx`-th outgoing edge of `slot`.
    ///
    /// `edge_idx` is relative to the start of the slot's range and is not
    /// checked against the slot's degree, so an index past the degree reads
    /// the weight of a later slot's edge.
    ///
    /// # Panics
    ///
    /// Panics with `"no weights"` if the projection carries no weights, or if
    /// the slot or the resulting weight index is out of bounds.
    #[inline]
    pub fn out_weight(&self, slot: u32, edge_idx: usize) -> f64 {
        let base = self.out_offsets[slot as usize] as usize;
        let weights = self.out_weights.as_ref().expect("no weights");
        weights[base + edge_idx]
    }

    /// Whether a weight array is stored for the outgoing edges.
    #[inline]
    pub fn has_weights(&self) -> bool {
        self.out_weights.is_some()
    }

    /// Whether any incoming adjacency is stored.
    ///
    /// This is derived from the incoming neighbor array being non-empty, so
    /// it is `false` for a projection that has no incoming edges at all.
    #[inline]
    pub fn has_reverse(&self) -> bool {
        !self.in_neighbors.is_empty()
    }

    /// Vertex id stored for `slot`, looked up via `IdMap::to_vid_unchecked`.
    ///
    /// NOTE(review): the `_unchecked` name suggests `slot` is not validated;
    /// confirm against `IdMap` before passing untrusted slots.
    #[inline]
    pub fn to_vid(&self, slot: u32) -> Vid {
        self.id_map.to_vid_unchecked(slot)
    }

    /// Slot assigned to `vid`, or `None` if the id map has no entry for it.
    #[inline]
    pub fn to_slot(&self, vid: Vid) -> Option<u32> {
        self.id_map.to_slot(vid)
    }

    /// Iterates over every `(slot, vertex id)` pair of the id map.
    pub fn vertices(&self) -> impl Iterator<Item = (u32, Vid)> + '_ {
        self.id_map.iter()
    }

    /// Byte estimate of the adjacency arrays, the weights and the id map.
    ///
    /// Counts 4 bytes per `u32` entry and 8 bytes per weight, plus whatever
    /// `IdMap::memory_size` reports. The stored label and edge type names are
    /// not included.
    pub fn memory_size(&self) -> usize {
        const U32_BYTES: usize = std::mem::size_of::<u32>();
        const F64_BYTES: usize = std::mem::size_of::<f64>();

        let u32_entries = self.out_offsets.len()
            + self.out_neighbors.len()
            + self.in_offsets.len()
            + self.in_neighbors.len();
        let weight_bytes = match &self.out_weights {
            Some(weights) => weights.len() * F64_BYTES,
            None => 0,
        };

        u32_entries * U32_BYTES + weight_bytes + self.id_map.memory_size()
    }
}
use std::sync::Arc;
/// Builder that reads vertices and edges from storage and assembles a
/// `GraphProjection` according to a `ProjectionConfig`.
pub struct ProjectionBuilder {
/// Storage the vertex tables and adjacency are read from.
storage: Arc<StorageManager>,
/// Optional L0 manager; when set, vertices from its pending-flush and
/// current L0 buffers are added to those read from storage.
l0_manager: Option<Arc<L0Manager>>,
/// Accumulated projection options.
config: ProjectionConfig,
}
impl ProjectionBuilder {
    /// Creates a builder over `storage` with the default configuration and no
    /// L0 manager.
    pub fn new(storage: Arc<StorageManager>) -> Self {
        Self {
            storage,
            l0_manager: None,
            config: ProjectionConfig::default(),
        }
    }

    /// Sets (or clears, with `None`) the L0 manager whose buffered vertices
    /// are merged into the projection.
    pub fn l0_manager(mut self, l0_manager: Option<Arc<L0Manager>>) -> Self {
        self.l0_manager = l0_manager;
        self
    }

    /// Restricts the projection to the given vertex labels. An empty slice
    /// means "all labels in the schema".
    pub fn node_labels(mut self, labels: &[&str]) -> Self {
        self.config.node_labels = labels.iter().map(|s| s.to_string()).collect();
        self
    }

    /// Restricts the projection to the given edge types. An empty slice means
    /// "all edge types in the schema".
    pub fn edge_types(mut self, types: &[&str]) -> Self {
        self.config.edge_types = types.iter().map(|s| s.to_string()).collect();
        self
    }

    /// Selects the edge property used as edge weight.
    pub fn weight_property(mut self, prop: &str) -> Self {
        self.config.weight_property = Some(prop.to_string());
        self
    }

    /// Enables or disables building the incoming adjacency.
    pub fn include_reverse(mut self, enabled: bool) -> Self {
        self.config.include_reverse = enabled;
        self
    }

    /// Consumes the builder and assembles the projection.
    ///
    /// Resolves the configured names to ids, warms the adjacency caches,
    /// collects the vertices into an id map, collects the edges between
    /// mapped vertices and lays both directions out as offset/neighbor arrays.
    ///
    /// # Errors
    ///
    /// Fails if a configured label or edge type is unknown to the schema, or
    /// if any storage, query or property lookup fails.
    pub async fn build(self) -> Result<GraphProjection> {
        let schema = self.storage.schema_manager().schema();
        let (label_ids, edge_type_ids) = self.resolve_ids(&schema)?;
        self.warm_caches(&label_ids, &edge_type_ids).await?;

        let all_vids = self.collect_vertices(&schema, &label_ids).await?;
        let mut id_map = IdMap::with_capacity(all_vids.len());
        for vid in all_vids {
            id_map.insert(vid);
        }
        let vertex_count = id_map.len();

        let (out_edges, in_edges) = self.collect_edges(&id_map, &edge_type_ids).await?;
        // NOTE(review): presumably trims build-time bookkeeping of the id map
        // once all lookups are done; confirm against `IdMap::compact`.
        id_map.compact();

        // Weights are always materialised for the outgoing direction; edges
        // without a weight value carry 1.0.
        let (out_offsets, out_neighbors, out_weights) = build_csr(vertex_count, &out_edges, true);
        let (in_offsets, in_neighbors, _) = if self.config.include_reverse {
            build_csr(vertex_count, &in_edges, false)
        } else {
            // Keep the offset array well-formed so in-degree queries return 0.
            (vec![0; vertex_count + 1], Vec::new(), None)
        };

        Ok(GraphProjection {
            vertex_count,
            out_offsets,
            out_neighbors,
            in_offsets,
            in_neighbors,
            out_weights,
            id_map,
            _node_labels: self.config.node_labels,
            _edge_types: self.config.edge_types,
        })
    }

    /// Translates the configured label and edge type names into schema ids.
    ///
    /// An empty name list expands to every id of that kind in the schema.
    ///
    /// # Errors
    ///
    /// Fails on the first configured name that the schema does not contain.
    fn resolve_ids(
        &self,
        schema: &uni_common::core::schema::Schema,
    ) -> Result<(Vec<u16>, Vec<u32>)> {
        let mut label_ids = self
            .config
            .node_labels
            .iter()
            .map(|label_name| {
                schema
                    .labels
                    .get(label_name)
                    .map(|meta| meta.id)
                    .ok_or_else(|| anyhow!("Label {} not found", label_name))
            })
            .collect::<Result<Vec<u16>>>()?;

        let mut edge_type_ids = self
            .config
            .edge_types
            .iter()
            .map(|type_name| {
                schema
                    .edge_types
                    .get(type_name)
                    .map(|meta| meta.id)
                    .ok_or_else(|| anyhow!("Edge type {} not found", type_name))
            })
            .collect::<Result<Vec<u32>>>()?;

        if label_ids.is_empty() {
            label_ids = schema.labels.values().map(|m| m.id).collect();
        }
        if edge_type_ids.is_empty() {
            edge_type_ids = schema.edge_types.values().map(|m| m.id).collect();
        }
        Ok((label_ids, edge_type_ids))
    }

    /// Warms the adjacency cache for every selected edge type: always for the
    /// outgoing direction, and for the incoming direction when reverse edges
    /// are requested. `_label_ids` is currently unused.
    async fn warm_caches(&self, _label_ids: &[u16], edge_type_ids: &[u32]) -> Result<()> {
        for &type_id in edge_type_ids {
            let edge_ver = self.storage.get_edge_version_by_id(type_id);
            self.storage
                .warm_adjacency(type_id, CacheDir::Outgoing, edge_ver)
                .await?;
            if self.config.include_reverse {
                self.storage
                    .warm_adjacency(type_id, CacheDir::Incoming, edge_ver)
                    .await?;
            }
        }
        Ok(())
    }

    /// Gathers the ids of all vertices carrying one of `label_ids`.
    ///
    /// Reads the `_vid` column of each label's vertex table and, when an L0
    /// manager is configured, adds the vertices reported by its pending-flush
    /// and current buffers. The result is sorted and free of duplicates.
    ///
    /// # Errors
    ///
    /// Fails if a label id has no name in the schema, if a table query fails,
    /// or if a table lacks a `UInt64` `_vid` column.
    async fn collect_vertices(
        &self,
        schema: &uni_common::core::schema::Schema,
        label_ids: &[u16],
    ) -> Result<Vec<Vid>> {
        use arrow_array::UInt64Array;
        use futures::TryStreamExt;
        use lancedb::query::{ExecutableQuery, QueryBase, Select};

        let mut all_vids = Vec::new();
        let lancedb_store = self.storage.lancedb_store();

        for &lid in label_ids {
            let label_name = schema
                .label_name_by_id(lid)
                .ok_or_else(|| anyhow!("Label id {} not found in schema", lid))?;
            let ds = self.storage.vertex_dataset(label_name)?;

            // NOTE(review): a table that cannot be opened is skipped without
            // an error — presumably because a label may not have a persisted
            // table yet; confirm this best-effort behaviour is intended.
            if let Ok(table) = ds.open_lancedb(lancedb_store).await {
                let batches: Vec<arrow_array::RecordBatch> = table
                    .query()
                    .select(Select::Columns(vec!["_vid".to_string()]))
                    .execute()
                    .await
                    .map_err(|e| anyhow!("Failed to query table: {}", e))?
                    .try_collect()
                    .await
                    .map_err(|e| anyhow!("Failed to collect batches: {}", e))?;

                for batch in batches {
                    let vid_col = batch
                        .column_by_name("_vid")
                        .ok_or_else(|| {
                            anyhow!("Vertex table for label {} has no _vid column", label_name)
                        })?
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| {
                            anyhow!("_vid column for label {} is not UInt64", label_name)
                        })?;
                    all_vids
                        .extend((0..batch.num_rows()).map(|row| Vid::from(vid_col.value(row))));
                }
            }
        }

        if let Some(ref l0_mgr) = self.l0_manager {
            let label_names: Vec<&str> = label_ids
                .iter()
                .filter_map(|id| schema.label_name_by_id(*id))
                .collect();
            for pending_l0_arc in l0_mgr.get_pending_flush() {
                all_vids.extend(pending_l0_arc.read().vids_for_labels(&label_names));
            }
            let current_l0 = l0_mgr.get_current();
            all_vids.extend(current_l0.read().vids_for_labels(&label_names));
        }

        // The same vertex can be reported by storage and by L0 buffers.
        all_vids.sort_unstable();
        all_vids.dedup();
        Ok(all_vids)
    }

    /// Collects the edges between mapped vertices as slot-based edge lists.
    ///
    /// Returns `(out_edges, in_edges)`. Each entry is
    /// `(slot of the vertex the adjacency belongs to, slot of its neighbor,
    /// weight)`. `in_edges` is empty unless reverse edges are requested.
    /// Edges whose neighbor is not part of `id_map` are dropped. The weight is
    /// the configured property's `f64` value, or `1.0` when no property is
    /// configured or no value is found for the edge.
    ///
    /// # Errors
    ///
    /// Fails if the batched edge property lookup fails.
    async fn collect_edges(
        &self,
        id_map: &IdMap,
        edge_type_ids: &[u32],
    ) -> Result<(WeightedEdgeList, WeightedEdgeList)> {
        // Raw edges keep the neighbor's vertex id and the edge id until the
        // weights have been fetched.
        let mut raw_out_edges = Vec::new();
        let mut raw_in_edges = Vec::new();
        for (src_slot, src_vid) in id_map.iter() {
            for &type_id in edge_type_ids {
                let neighbors = self.storage.adjacency_manager().get_neighbors(
                    src_vid,
                    type_id,
                    CacheDir::Outgoing,
                );
                for (dst_vid, eid) in neighbors {
                    raw_out_edges.push((src_slot, dst_vid, eid));
                }
                if self.config.include_reverse {
                    let in_neighbors = self.storage.adjacency_manager().get_neighbors(
                        src_vid,
                        type_id,
                        CacheDir::Incoming,
                    );
                    for (dst_vid, eid) in in_neighbors {
                        raw_in_edges.push((src_slot, dst_vid, eid));
                    }
                }
            }
        }

        let mut weights_cache: std::collections::HashMap<Eid, f64> =
            std::collections::HashMap::new();
        if let Some(prop) = self.config.weight_property.as_deref() {
            // NOTE(review): 1000 is presumably a cache capacity; confirm
            // against `PropertyManager::new`.
            let pm = PropertyManager::new(
                self.storage.clone(),
                self.storage.schema_manager_arc(),
                1000,
            );

            // `raw_in_edges` is only populated when reverse edges are
            // requested, so chaining it unconditionally is sufficient.
            let mut all_eids: Vec<Eid> = raw_out_edges
                .iter()
                .chain(raw_in_edges.iter())
                .map(|&(_, _, eid)| eid)
                .collect();
            all_eids.sort_unstable();
            all_eids.dedup();

            let batch_props = pm.get_batch_edge_props(&all_eids, &[prop], None).await?;
            for eid in all_eids {
                // NOTE(review): the batch result is looked up with a `Vid`
                // built from the raw edge id; confirm that this is the key
                // type `get_batch_edge_props` is meant to return.
                let vid_key = Vid::from(eid.as_u64());
                if let Some(weight) = batch_props
                    .get(&vid_key)
                    .and_then(|props| props.get(prop))
                    .and_then(|val| val.as_f64())
                {
                    weights_cache.insert(eid, weight);
                }
            }
        }

        // Maps neighbor vertex ids to slots, dropping edges that leave the
        // projection, and attaches the weight (default 1.0).
        let resolve = |raw: Vec<(u32, Vid, Eid)>| -> WeightedEdgeList {
            raw.into_iter()
                .filter_map(|(src_slot, dst_vid, eid)| {
                    let dst_slot = id_map.to_slot(dst_vid)?;
                    let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
                    Some((src_slot, dst_slot, weight))
                })
                .collect()
        };
        let out_edges = resolve(raw_out_edges);
        let in_edges = resolve(raw_in_edges);

        Ok((out_edges, in_edges))
    }
}
/// Lays an edge list out as offset/neighbor arrays grouped by source slot.
///
/// Returns `(offsets, neighbors, weights)`:
/// * `offsets` has `vertex_count + 1` entries; the edges of slot `s` occupy
///   `neighbors[offsets[s]..offsets[s + 1]]`.
/// * `neighbors` holds the target slot of every edge; edges of one source
///   keep their relative order from `edges`.
/// * `weights` is `Some` and parallel to `neighbors` exactly when
///   `include_weights` is set, otherwise `None`.
///
/// With `vertex_count == 0` the result is empty and any supplied edges are
/// ignored; the weight vector is still `Some` (empty) when requested, so the
/// presence of weights does not depend on the graph being non-empty.
///
/// # Panics
///
/// Panics if `vertex_count > 0` and an edge's source is `>= vertex_count`.
fn build_csr(
    vertex_count: usize,
    edges: &[(u32, u32, f64)],
    include_weights: bool,
) -> (Vec<u32>, Vec<u32>, Option<Vec<f64>>) {
    if vertex_count == 0 {
        return (vec![0], Vec::new(), include_weights.then(Vec::new));
    }

    // Count each source's degree one position to the right, then turn the
    // counts into start offsets with a running sum.
    let mut offsets = vec![0u32; vertex_count + 1];
    for &(src, _, _) in edges {
        offsets[src as usize + 1] += 1;
    }
    for i in 0..vertex_count {
        offsets[i + 1] += offsets[i];
    }

    let mut neighbors = vec![0u32; edges.len()];
    let mut weights = include_weights.then(|| vec![0.0f64; edges.len()]);

    // Next free position inside each source's range.
    let mut cursor = offsets[..vertex_count].to_vec();
    for &(src, dst, w) in edges {
        let next = &mut cursor[src as usize];
        let idx = *next as usize;
        neighbors[idx] = dst;
        if let Some(ws) = weights.as_mut() {
            ws[idx] = w;
        }
        *next += 1;
    }

    (offsets, neighbors, weights)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Edges are grouped by source slot, keep their input order within a
    /// source, and the weight array stays parallel to the neighbor array.
    #[test]
    fn test_build_csr() {
        let edges = vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0), (0, 2, 0.5)];
        let (offsets, neighbors, weights) = build_csr(3, &edges, true);

        assert_eq!(offsets, vec![0, 2, 3, 4]);
        assert_eq!(neighbors, vec![1, 2, 2, 0]);

        // Fail loudly if the weights are missing instead of silently skipping
        // the weight assertions.
        let weights = weights.expect("weights were requested");
        assert_eq!(weights, vec![1.0, 0.5, 1.0, 1.0]);
    }

    /// Without `include_weights` the layout is unchanged and no weight array
    /// is returned.
    #[test]
    fn test_build_csr_without_weights() {
        let edges = vec![(0, 1, 2.0), (1, 0, 3.0)];
        let (offsets, neighbors, weights) = build_csr(2, &edges, false);

        assert_eq!(offsets, vec![0, 1, 2]);
        assert_eq!(neighbors, vec![1, 0]);
        assert!(weights.is_none());
    }
}