// Source: uni_algo/algo/projection.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Graph Projection - Dense CSR representation for algorithm execution.
//!
//! A `GraphProjection` is a materialized, algorithm-optimized view of a subgraph.
//! It provides:
//! - Dense vertex indexing (`0..V`) for efficient array-based state
//! - CSR format for cache-friendly neighbor iteration
//! - Optional reverse (inbound) edges for algorithms like PageRank
//! - Optional edge weights for weighted algorithms
12
13use crate::algo::IdMap;
14use anyhow::{Result, anyhow};
15use uni_common::core::id::{Eid, Vid};
16use uni_store::runtime::L0Manager;
17use uni_store::runtime::property_manager::PropertyManager;
18use uni_store::storage::direction::Direction as CacheDir;
19use uni_store::storage::manager::StorageManager;
20
/// Edge list for CSR construction: `(row_slot, neighbor_slot, weight)` triples.
///
/// `build_csr` groups entries by `row_slot` (the first element). For outbound
/// lists that is the edge's source vertex; for inbound lists it is the vertex
/// whose incoming adjacency was queried, i.e. the edge's target.
type WeightedEdgeList = Vec<(u32, u32, f64)>;
23
/// Configuration for building a graph projection.
///
/// Normally filled in through the setter methods of `ProjectionBuilder`.
#[derive(Debug, Clone, Default)]
pub struct ProjectionConfig {
    /// Node labels to include; an empty list selects every label in the schema.
    pub node_labels: Vec<String>,
    /// Edge types to include; an empty list selects every edge type in the schema.
    pub edge_types: Vec<String>,
    /// Edge property used as the edge weight. Edges for which the property is
    /// missing (or whose value yields nothing from `as_f64`) get weight `1.0`.
    pub weight_property: Option<String>,
    /// Whether to build reverse (inbound) adjacency for `in_neighbors`.
    pub include_reverse: bool,
}
36
37/// Dense CSR representation optimized for algorithm execution.
38#[derive(Debug)]
39pub struct GraphProjection {
40    /// Number of vertices in the projection
41    pub(crate) vertex_count: usize,
42
43    /// Outbound edges: CSR format
44    pub(crate) out_offsets: Vec<u32>, // [V+1] vertex slot -> edge start
45    pub(crate) out_neighbors: Vec<u32>, // [E] neighbor slots
46
47    /// Inbound edges: CSR format (optional, for PageRank/SCC)
48    pub(crate) in_offsets: Vec<u32>, // [V+1]
49    pub(crate) in_neighbors: Vec<u32>, // [E]
50
51    /// Optional edge weights
52    pub(crate) out_weights: Option<Vec<f64>>,
53
54    /// Identity mapping
55    pub(crate) id_map: IdMap,
56
57    /// Metadata
58    pub(crate) _node_labels: Vec<String>,
59    pub(crate) _edge_types: Vec<String>,
60}
61
62impl GraphProjection {
63    /// Number of vertices in the projection.
64    #[inline]
65    pub fn vertex_count(&self) -> usize {
66        self.vertex_count
67    }
68
69    /// Number of edges in the projection.
70    #[inline]
71    pub fn edge_count(&self) -> usize {
72        self.out_neighbors.len()
73    }
74
75    /// Outbound neighbors of a vertex (by slot).
76    #[inline]
77    pub fn out_neighbors(&self, slot: u32) -> &[u32] {
78        let start = self.out_offsets[slot as usize] as usize;
79        let end = self.out_offsets[slot as usize + 1] as usize;
80        &self.out_neighbors[start..end]
81    }
82
83    /// Outbound degree of a vertex.
84    #[inline]
85    pub fn out_degree(&self, slot: u32) -> u32 {
86        self.out_offsets[slot as usize + 1] - self.out_offsets[slot as usize]
87    }
88
89    /// Inbound neighbors of a vertex (by slot).
90    ///
91    /// Panics if projection was built without `include_reverse`.
92    #[inline]
93    pub fn in_neighbors(&self, slot: u32) -> &[u32] {
94        let start = self.in_offsets[slot as usize] as usize;
95        let end = self.in_offsets[slot as usize + 1] as usize;
96        &self.in_neighbors[start..end]
97    }
98
99    /// Inbound degree of a vertex.
100    #[inline]
101    pub fn in_degree(&self, slot: u32) -> u32 {
102        self.in_offsets[slot as usize + 1] - self.in_offsets[slot as usize]
103    }
104
105    /// Get edge weight for outbound edge.
106    ///
107    /// Panics if projection was built without weights.
108    #[inline]
109    pub fn out_weight(&self, slot: u32, edge_idx: usize) -> f64 {
110        let start = self.out_offsets[slot as usize] as usize;
111        self.out_weights.as_ref().expect("no weights")[start + edge_idx]
112    }
113
114    /// Check if weights are available.
115    #[inline]
116    pub fn has_weights(&self) -> bool {
117        self.out_weights.is_some()
118    }
119
120    /// Check if reverse edges are available.
121    #[inline]
122    pub fn has_reverse(&self) -> bool {
123        !self.in_neighbors.is_empty()
124    }
125
126    /// Map slot back to VID.
127    #[inline]
128    pub fn to_vid(&self, slot: u32) -> Vid {
129        self.id_map.to_vid_unchecked(slot)
130    }
131
132    /// Map VID to slot.
133    #[inline]
134    pub fn to_slot(&self, vid: Vid) -> Option<u32> {
135        self.id_map.to_slot(vid)
136    }
137
138    /// Iterate over all vertices as (slot, vid).
139    pub fn vertices(&self) -> impl Iterator<Item = (u32, Vid)> + '_ {
140        self.id_map.iter()
141    }
142
143    /// Memory usage in bytes.
144    pub fn memory_size(&self) -> usize {
145        self.out_offsets.len() * 4
146            + self.out_neighbors.len() * 4
147            + self.in_offsets.len() * 4
148            + self.in_neighbors.len() * 4
149            + self.out_weights.as_ref().map_or(0, |w| w.len() * 8)
150            + self.id_map.memory_size()
151    }
152}
153
154use std::sync::Arc;
155
156/// Builder for constructing a `GraphProjection` from storage.
157pub struct ProjectionBuilder {
158    storage: Arc<StorageManager>,
159    /// L0 manager for scanning in-memory vertices not yet flushed.
160    l0_manager: Option<Arc<L0Manager>>,
161    config: ProjectionConfig,
162}
163
164impl ProjectionBuilder {
165    /// Create a new projection builder.
166    pub fn new(storage: Arc<StorageManager>) -> Self {
167        Self {
168            storage,
169            l0_manager: None,
170            config: ProjectionConfig::default(),
171        }
172    }
173
174    /// Set the L0 manager for scanning in-memory vertices.
175    pub fn l0_manager(mut self, l0_manager: Option<Arc<L0Manager>>) -> Self {
176        self.l0_manager = l0_manager;
177        self
178    }
179
180    /// Set node labels to include.
181    pub fn node_labels(mut self, labels: &[&str]) -> Self {
182        self.config.node_labels = labels.iter().map(|s| s.to_string()).collect();
183        self
184    }
185
186    /// Set edge types to include.
187    pub fn edge_types(mut self, types: &[&str]) -> Self {
188        self.config.edge_types = types.iter().map(|s| s.to_string()).collect();
189        self
190    }
191
192    /// Set weight property.
193    pub fn weight_property(mut self, prop: &str) -> Self {
194        self.config.weight_property = Some(prop.to_string());
195        self
196    }
197
198    /// Include reverse edges for in_neighbors access.
199    pub fn include_reverse(mut self, enabled: bool) -> Self {
200        self.config.include_reverse = enabled;
201        self
202    }
203
204    /// Build the projection.
205    pub async fn build(self) -> Result<GraphProjection> {
206        let schema = self.storage.schema_manager().schema();
207
208        // 1. Resolve label and edge type IDs
209        let (label_ids, edge_type_ids) = self.resolve_ids(&schema)?;
210
211        // 2. Warm cache for all requested edge types
212        self.warm_caches(&label_ids, &edge_type_ids).await?;
213
214        // 3. Collect VIDs from storage and L0
215        let all_vids = self.collect_vertices(&schema, &label_ids).await?;
216
217        let mut id_map = IdMap::with_capacity(all_vids.len());
218        for vid in all_vids {
219            id_map.insert(vid);
220        }
221        let vertex_count = id_map.len();
222
223        // 4. Collect edges from cache
224        let (out_edges, in_edges) = self.collect_edges(&id_map, &edge_type_ids).await?;
225
226        // Compact IdMap (drops hash map, enables binary search)
227        id_map.compact();
228
229        let (out_offsets, out_neighbors, out_weights) = build_csr(vertex_count, &out_edges, true);
230        let (in_offsets, in_neighbors, _) = if self.config.include_reverse {
231            build_csr(vertex_count, &in_edges, false)
232        } else {
233            (vec![0; vertex_count + 1], Vec::new(), None)
234        };
235
236        Ok(GraphProjection {
237            vertex_count,
238            out_offsets,
239            out_neighbors,
240            in_offsets,
241            in_neighbors,
242            out_weights,
243            id_map,
244            _node_labels: self.config.node_labels,
245            _edge_types: self.config.edge_types,
246        })
247    }
248
249    /// Resolve label and edge type IDs from configuration.
250    fn resolve_ids(
251        &self,
252        schema: &uni_common::core::schema::Schema,
253    ) -> Result<(Vec<u16>, Vec<u32>)> {
254        let mut label_ids = Vec::new();
255        for label_name in &self.config.node_labels {
256            let meta = schema
257                .labels
258                .get(label_name)
259                .ok_or_else(|| anyhow!("Label {} not found", label_name))?;
260            label_ids.push(meta.id);
261        }
262
263        let mut edge_type_ids = Vec::new();
264        for type_name in &self.config.edge_types {
265            let meta = schema
266                .edge_types
267                .get(type_name)
268                .ok_or_else(|| anyhow!("Edge type {} not found", type_name))?;
269            edge_type_ids.push(meta.id);
270        }
271
272        // If empty, include all from schema
273        if label_ids.is_empty() {
274            label_ids = schema.labels.values().map(|m| m.id).collect();
275        }
276        if edge_type_ids.is_empty() {
277            edge_type_ids = schema.edge_types.values().map(|m| m.id).collect();
278        }
279
280        Ok((label_ids, edge_type_ids))
281    }
282
283    /// Warm adjacency manager for all requested edge types.
284    async fn warm_caches(&self, _label_ids: &[u16], edge_type_ids: &[u32]) -> Result<()> {
285        for &type_id in edge_type_ids {
286            let edge_ver = self.storage.get_edge_version_by_id(type_id);
287            self.storage
288                .warm_adjacency(type_id, CacheDir::Outgoing, edge_ver)
289                .await?;
290            if self.config.include_reverse {
291                self.storage
292                    .warm_adjacency(type_id, CacheDir::Incoming, edge_ver)
293                    .await?;
294            }
295        }
296        Ok(())
297    }
298
299    /// Collect VIDs from storage and L0 buffers.
300    async fn collect_vertices(
301        &self,
302        schema: &uni_common::core::schema::Schema,
303        label_ids: &[u16],
304    ) -> Result<Vec<Vid>> {
305        use arrow_array::UInt64Array;
306
307        let mut all_vids = Vec::new();
308
309        for &lid in label_ids {
310            let label_name = schema.label_name_by_id(lid).unwrap();
311            if let Ok(Some(batch)) = self
312                .storage
313                .scan_vertex_table(label_name, &["_vid"], None)
314                .await
315            {
316                let vid_col = batch
317                    .column_by_name("_vid")
318                    .unwrap()
319                    .as_any()
320                    .downcast_ref::<UInt64Array>()
321                    .unwrap();
322                for i in 0..batch.num_rows() {
323                    all_vids.push(Vid::from(vid_col.value(i)));
324                }
325            }
326        }
327
328        // Overlay L0 vertices (not yet flushed to Lance)
329        if let Some(ref l0_mgr) = self.l0_manager {
330            let label_names: Vec<&str> = label_ids
331                .iter()
332                .filter_map(|id| schema.label_name_by_id(*id))
333                .collect();
334
335            // Pending flush L0 buffers (oldest first)
336            for pending_l0_arc in l0_mgr.get_pending_flush() {
337                all_vids.extend(pending_l0_arc.read().vids_for_labels(&label_names));
338            }
339
340            // Current L0 buffer
341            let current_l0 = l0_mgr.get_current();
342            all_vids.extend(current_l0.read().vids_for_labels(&label_names));
343        }
344
345        // Sort and dedup to ensure IdMap is sorted for compaction
346        all_vids.sort_unstable();
347        all_vids.dedup();
348
349        Ok(all_vids)
350    }
351
352    /// Collect edges from adjacency manager.
353    async fn collect_edges(
354        &self,
355        id_map: &IdMap,
356        edge_type_ids: &[u32],
357    ) -> Result<(WeightedEdgeList, WeightedEdgeList)> {
358        // Phase 1: Collect topology from AdjacencyManager
359        let mut raw_out_edges = Vec::new(); // (src_slot, dst_vid, eid)
360        let mut raw_in_edges = Vec::new();
361
362        for (src_slot, src_vid) in id_map.iter() {
363            for &type_id in edge_type_ids {
364                // Outbound
365                let neighbors = self.storage.adjacency_manager().get_neighbors(
366                    src_vid,
367                    type_id,
368                    CacheDir::Outgoing,
369                );
370                for (dst_vid, eid) in neighbors {
371                    raw_out_edges.push((src_slot, dst_vid, eid));
372                }
373
374                // Inbound
375                if self.config.include_reverse {
376                    let in_neighbors = self.storage.adjacency_manager().get_neighbors(
377                        src_vid,
378                        type_id,
379                        CacheDir::Incoming,
380                    );
381                    for (dst_vid, eid) in in_neighbors {
382                        raw_in_edges.push((src_slot, dst_vid, eid));
383                    }
384                }
385            }
386        }
387
388        // Phase 2: Fetch weights and map destination slots (Async, No Lock)
389        let pm = if self.config.weight_property.is_some() {
390            Some(PropertyManager::new(
391                self.storage.clone(),
392                self.storage.schema_manager_arc(),
393                1000,
394            ))
395        } else {
396            None
397        };
398        let weight_prop = self.config.weight_property.as_deref();
399
400        // Batch fetch weights if weight property is configured
401        let mut weights_cache: std::collections::HashMap<Eid, f64> =
402            std::collections::HashMap::new();
403
404        if let (Some(pm), Some(prop)) = (&pm, weight_prop) {
405            // Collect and deduplicate EIDs from both edge lists
406            let mut all_eids: Vec<Eid> = raw_out_edges
407                .iter()
408                .map(|(_, _, eid)| *eid)
409                .chain(
410                    self.config
411                        .include_reverse
412                        .then(|| raw_in_edges.iter().map(|(_, _, eid)| *eid))
413                        .into_iter()
414                        .flatten(),
415                )
416                .collect();
417            all_eids.sort_unstable();
418            all_eids.dedup();
419
420            // Batch fetch edge properties
421            let batch_props = pm.get_batch_edge_props(&all_eids, &[prop], None).await?;
422
423            // Build weight cache from fetched properties
424            for eid in all_eids {
425                let vid_key = Vid::from(eid.as_u64());
426                if let Some(weight) = batch_props
427                    .get(&vid_key)
428                    .and_then(|props| props.get(prop))
429                    .and_then(|val| val.as_f64())
430                {
431                    weights_cache.insert(eid, weight);
432                }
433            }
434        }
435
436        // Convert raw edges to weighted edges, filtering to vertices in the projection
437        let out_edges: WeightedEdgeList = raw_out_edges
438            .into_iter()
439            .filter_map(|(src_slot, dst_vid, eid)| {
440                id_map.to_slot(dst_vid).map(|dst_slot| {
441                    let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
442                    (src_slot, dst_slot, weight)
443                })
444            })
445            .collect();
446
447        let in_edges: WeightedEdgeList = raw_in_edges
448            .into_iter()
449            .filter_map(|(src_slot, dst_vid, eid)| {
450                id_map.to_slot(dst_vid).map(|dst_slot| {
451                    let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
452                    (src_slot, dst_slot, weight)
453                })
454            })
455            .collect();
456
457        Ok((out_edges, in_edges))
458    }
459}
460
/// Build a CSR (compressed sparse row) adjacency structure from an edge list.
///
/// Each entry of `edges` is `(row_slot, neighbor_slot, weight)`. Entries are
/// grouped by `row_slot`; within a row, neighbors keep their order in `edges`.
///
/// Returns `(offsets, neighbors, weights)`:
/// - `offsets` has `vertex_count + 1` entries; row `v` occupies
///   `neighbors[offsets[v]..offsets[v + 1]]`.
/// - `neighbors` holds one neighbor slot per edge.
/// - `weights` is `Some` (parallel to `neighbors`) exactly when
///   `include_weights` is `true`, including for an empty graph.
///
/// # Panics
/// Panics if any `row_slot` is `>= vertex_count`, or if `edges` has more than
/// `u32::MAX` entries (offsets are stored as `u32`).
fn build_csr(
    vertex_count: usize,
    edges: &[(u32, u32, f64)],
    include_weights: bool,
) -> (Vec<u32>, Vec<u32>, Option<Vec<f64>>) {
    // Offsets are u32: a larger edge count would make the prefix sums wrap
    // silently in release builds and corrupt the structure.
    assert!(
        edges.len() <= u32::MAX as usize,
        "too many edges for u32 CSR offsets: {}",
        edges.len()
    );

    // Count each row's degree into the slot *after* it, so a running sum turns
    // the counts into row start offsets in place.
    let mut offsets = vec![0u32; vertex_count + 1];
    for &(row, _, _) in edges {
        offsets[row as usize + 1] += 1;
    }
    let mut running = 0u32;
    for offset in offsets.iter_mut() {
        running += *offset;
        *offset = running;
    }

    // Scatter edges into their rows; `cursor[v]` is the next free index of row `v`.
    let mut neighbors = vec![0u32; edges.len()];
    let mut weights = include_weights.then(|| vec![0.0; edges.len()]);
    let mut cursor = offsets.clone();
    for &(row, neighbor, weight) in edges {
        let idx = cursor[row as usize] as usize;
        neighbors[idx] = neighbor;
        if let Some(ws) = weights.as_mut() {
            ws[idx] = weight;
        }
        cursor[row as usize] += 1;
    }

    (offsets, neighbors, weights)
}
503
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_csr() {
        // Triangle 0 -> 1 -> 2 -> 0 plus an extra chord 0 -> 2 with weight 0.5.
        let edges = vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0), (0, 2, 0.5)];
        let (offsets, neighbors, weights) = build_csr(3, &edges, true);

        assert_eq!(offsets, vec![0, 2, 3, 4]);
        // Node 0 has edges to 1 and 2, in input order.
        assert_eq!(&neighbors[0..2], &[1, 2]);
        // Weights were requested, so they must be present (not silently skipped).
        let w = weights.expect("weights were requested");
        assert_eq!(&w[0..2], &[1.0, 0.5]);
        // Node 1 has an edge to 2.
        assert_eq!(&neighbors[2..3], &[2]);
        // Node 2 has an edge to 0.
        assert_eq!(&neighbors[3..4], &[0]);
    }

    #[test]
    fn test_build_csr_without_weights() {
        // Only node 1 has an outgoing edge; nodes 0 and 2 get empty rows.
        let edges = vec![(1, 0, 2.0)];
        let (offsets, neighbors, weights) = build_csr(3, &edges, false);

        assert_eq!(offsets, vec![0, 0, 1, 1]);
        assert_eq!(neighbors, vec![0]);
        assert!(weights.is_none());
    }

    #[test]
    fn test_build_csr_empty() {
        let (offsets, neighbors, _) = build_csr(0, &[], true);

        assert_eq!(offsets, vec![0]);
        assert!(neighbors.is_empty());
    }
}