Skip to main content

uni_algo/algo/
projection.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Graph Projection - Dense CSR representation for algorithm execution.
5//!
6//! A `GraphProjection` is a materialized, algorithm-optimized view of a subgraph.
7//! It provides:
8//! - Dense vertex indexing (0..V) for efficient array-based state
9//! - CSR format for cache-friendly neighbor iteration
10//! - Optional reverse edges for algorithms like PageRank
11//! - Optional edge weights for weighted algorithms
12
13use crate::algo::IdMap;
14use anyhow::{Result, anyhow};
15use uni_common::core::id::{Eid, Vid};
16use uni_store::runtime::L0Manager;
17use uni_store::runtime::property_manager::PropertyManager;
18use uni_store::storage::direction::Direction as CacheDir;
19use uni_store::storage::manager::StorageManager;
20
/// Flat edge list fed into CSR construction.
///
/// Each entry is `(source_slot, destination_slot, weight)`, where both slots
/// are dense projection indices.
type WeightedEdgeList = Vec<(u32, u32, f64)>;
23
/// Settings that control which part of the graph a projection materializes.
///
/// The `Default` value selects every label and edge type, reads no weight
/// property and skips reverse edges.
#[derive(Clone, Debug, Default)]
pub struct ProjectionConfig {
    /// Vertex label filter; an empty list means "every label".
    pub node_labels: Vec<String>,
    /// Edge type filter; an empty list means "every edge type".
    pub edge_types: Vec<String>,
    /// Name of the edge property to read as the edge weight, if any.
    pub weight_property: Option<String>,
    /// When `true`, inbound adjacency (`in_neighbors`) is built as well.
    pub include_reverse: bool,
}
36
37/// Dense CSR representation optimized for algorithm execution.
38#[derive(Debug)]
39pub struct GraphProjection {
40    /// Number of vertices in the projection
41    pub(crate) vertex_count: usize,
42
43    /// Outbound edges: CSR format
44    pub(crate) out_offsets: Vec<u32>, // [V+1] vertex slot -> edge start
45    pub(crate) out_neighbors: Vec<u32>, // [E] neighbor slots
46
47    /// Inbound edges: CSR format (optional, for PageRank/SCC)
48    pub(crate) in_offsets: Vec<u32>, // [V+1]
49    pub(crate) in_neighbors: Vec<u32>, // [E]
50
51    /// Optional edge weights
52    pub(crate) out_weights: Option<Vec<f64>>,
53
54    /// Identity mapping
55    pub(crate) id_map: IdMap,
56
57    /// Metadata
58    pub(crate) _node_labels: Vec<String>,
59    pub(crate) _edge_types: Vec<String>,
60}
61
62impl GraphProjection {
63    /// Number of vertices in the projection.
64    #[inline]
65    pub fn vertex_count(&self) -> usize {
66        self.vertex_count
67    }
68
69    /// Number of edges in the projection.
70    #[inline]
71    pub fn edge_count(&self) -> usize {
72        self.out_neighbors.len()
73    }
74
75    /// Outbound neighbors of a vertex (by slot).
76    #[inline]
77    pub fn out_neighbors(&self, slot: u32) -> &[u32] {
78        let start = self.out_offsets[slot as usize] as usize;
79        let end = self.out_offsets[slot as usize + 1] as usize;
80        &self.out_neighbors[start..end]
81    }
82
83    /// Outbound degree of a vertex.
84    #[inline]
85    pub fn out_degree(&self, slot: u32) -> u32 {
86        self.out_offsets[slot as usize + 1] - self.out_offsets[slot as usize]
87    }
88
89    /// Inbound neighbors of a vertex (by slot).
90    ///
91    /// Panics if projection was built without `include_reverse`.
92    #[inline]
93    pub fn in_neighbors(&self, slot: u32) -> &[u32] {
94        let start = self.in_offsets[slot as usize] as usize;
95        let end = self.in_offsets[slot as usize + 1] as usize;
96        &self.in_neighbors[start..end]
97    }
98
99    /// Inbound degree of a vertex.
100    #[inline]
101    pub fn in_degree(&self, slot: u32) -> u32 {
102        self.in_offsets[slot as usize + 1] - self.in_offsets[slot as usize]
103    }
104
105    /// Get edge weight for outbound edge.
106    ///
107    /// Panics if projection was built without weights.
108    #[inline]
109    pub fn out_weight(&self, slot: u32, edge_idx: usize) -> f64 {
110        let start = self.out_offsets[slot as usize] as usize;
111        self.out_weights.as_ref().expect("no weights")[start + edge_idx]
112    }
113
114    /// Check if weights are available.
115    #[inline]
116    pub fn has_weights(&self) -> bool {
117        self.out_weights.is_some()
118    }
119
120    /// Check if reverse edges are available.
121    #[inline]
122    pub fn has_reverse(&self) -> bool {
123        !self.in_neighbors.is_empty()
124    }
125
126    /// Map slot back to VID.
127    #[inline]
128    pub fn to_vid(&self, slot: u32) -> Vid {
129        self.id_map.to_vid_unchecked(slot)
130    }
131
132    /// Map VID to slot.
133    #[inline]
134    pub fn to_slot(&self, vid: Vid) -> Option<u32> {
135        self.id_map.to_slot(vid)
136    }
137
138    /// Iterate over all vertices as (slot, vid).
139    pub fn vertices(&self) -> impl Iterator<Item = (u32, Vid)> + '_ {
140        self.id_map.iter()
141    }
142
143    /// Memory usage in bytes.
144    pub fn memory_size(&self) -> usize {
145        self.out_offsets.len() * 4
146            + self.out_neighbors.len() * 4
147            + self.in_offsets.len() * 4
148            + self.in_neighbors.len() * 4
149            + self.out_weights.as_ref().map_or(0, |w| w.len() * 8)
150            + self.id_map.memory_size()
151    }
152}
153
154use std::sync::Arc;
155
156/// Builder for constructing a `GraphProjection` from storage.
157pub struct ProjectionBuilder {
158    storage: Arc<StorageManager>,
159    /// L0 manager for scanning in-memory vertices not yet flushed.
160    l0_manager: Option<Arc<L0Manager>>,
161    config: ProjectionConfig,
162}
163
164impl ProjectionBuilder {
165    /// Create a new projection builder.
166    pub fn new(storage: Arc<StorageManager>) -> Self {
167        Self {
168            storage,
169            l0_manager: None,
170            config: ProjectionConfig::default(),
171        }
172    }
173
174    /// Set the L0 manager for scanning in-memory vertices.
175    pub fn l0_manager(mut self, l0_manager: Option<Arc<L0Manager>>) -> Self {
176        self.l0_manager = l0_manager;
177        self
178    }
179
180    /// Set node labels to include.
181    pub fn node_labels(mut self, labels: &[&str]) -> Self {
182        self.config.node_labels = labels.iter().map(|s| s.to_string()).collect();
183        self
184    }
185
186    /// Set edge types to include.
187    pub fn edge_types(mut self, types: &[&str]) -> Self {
188        self.config.edge_types = types.iter().map(|s| s.to_string()).collect();
189        self
190    }
191
192    /// Set weight property.
193    pub fn weight_property(mut self, prop: &str) -> Self {
194        self.config.weight_property = Some(prop.to_string());
195        self
196    }
197
198    /// Include reverse edges for in_neighbors access.
199    pub fn include_reverse(mut self, enabled: bool) -> Self {
200        self.config.include_reverse = enabled;
201        self
202    }
203
204    /// Build the projection.
205    pub async fn build(self) -> Result<GraphProjection> {
206        let schema = self.storage.schema_manager().schema();
207
208        // 1. Resolve label and edge type IDs
209        let (label_ids, edge_type_ids) = self.resolve_ids(&schema)?;
210
211        // 2. Warm cache for all requested edge types
212        self.warm_caches(&label_ids, &edge_type_ids).await?;
213
214        // 3. Collect VIDs from storage and L0
215        let all_vids = self.collect_vertices(&schema, &label_ids).await?;
216
217        let mut id_map = IdMap::with_capacity(all_vids.len());
218        for vid in all_vids {
219            id_map.insert(vid);
220        }
221        let vertex_count = id_map.len();
222
223        // 4. Collect edges from cache
224        let (out_edges, in_edges) = self.collect_edges(&id_map, &edge_type_ids).await?;
225
226        // Compact IdMap (drops hash map, enables binary search)
227        id_map.compact();
228
229        let (out_offsets, out_neighbors, out_weights) = build_csr(vertex_count, &out_edges, true);
230        let (in_offsets, in_neighbors, _) = if self.config.include_reverse {
231            build_csr(vertex_count, &in_edges, false)
232        } else {
233            (vec![0; vertex_count + 1], Vec::new(), None)
234        };
235
236        Ok(GraphProjection {
237            vertex_count,
238            out_offsets,
239            out_neighbors,
240            in_offsets,
241            in_neighbors,
242            out_weights,
243            id_map,
244            _node_labels: self.config.node_labels,
245            _edge_types: self.config.edge_types,
246        })
247    }
248
249    /// Resolve label and edge type IDs from configuration.
250    fn resolve_ids(
251        &self,
252        schema: &uni_common::core::schema::Schema,
253    ) -> Result<(Vec<u16>, Vec<u32>)> {
254        let mut label_ids = Vec::new();
255        for label_name in &self.config.node_labels {
256            let meta = schema
257                .labels
258                .get(label_name)
259                .ok_or_else(|| anyhow!("Label {} not found", label_name))?;
260            label_ids.push(meta.id);
261        }
262
263        let mut edge_type_ids = Vec::new();
264        for type_name in &self.config.edge_types {
265            let meta = schema
266                .edge_types
267                .get(type_name)
268                .ok_or_else(|| anyhow!("Edge type {} not found", type_name))?;
269            edge_type_ids.push(meta.id);
270        }
271
272        // If empty, include all from schema
273        if label_ids.is_empty() {
274            label_ids = schema.labels.values().map(|m| m.id).collect();
275        }
276        if edge_type_ids.is_empty() {
277            edge_type_ids = schema.edge_types.values().map(|m| m.id).collect();
278        }
279
280        Ok((label_ids, edge_type_ids))
281    }
282
283    /// Warm adjacency manager for all requested edge types.
284    async fn warm_caches(&self, _label_ids: &[u16], edge_type_ids: &[u32]) -> Result<()> {
285        for &type_id in edge_type_ids {
286            let edge_ver = self.storage.get_edge_version_by_id(type_id);
287            self.storage
288                .warm_adjacency(type_id, CacheDir::Outgoing, edge_ver)
289                .await?;
290            if self.config.include_reverse {
291                self.storage
292                    .warm_adjacency(type_id, CacheDir::Incoming, edge_ver)
293                    .await?;
294            }
295        }
296        Ok(())
297    }
298
299    /// Collect VIDs from storage and L0 buffers.
300    async fn collect_vertices(
301        &self,
302        schema: &uni_common::core::schema::Schema,
303        label_ids: &[u16],
304    ) -> Result<Vec<Vid>> {
305        use arrow_array::UInt64Array;
306        use futures::TryStreamExt;
307        use lancedb::query::{ExecutableQuery, QueryBase, Select};
308
309        let mut all_vids = Vec::new();
310        let lancedb_store = self.storage.lancedb_store();
311
312        // Scan storage for each label via LanceDB
313        for &lid in label_ids {
314            let label_name = schema.label_name_by_id(lid).unwrap();
315
316            let ds = self.storage.vertex_dataset(label_name)?;
317            if let Ok(table) = ds.open_lancedb(lancedb_store).await {
318                let batches: Vec<arrow_array::RecordBatch> = table
319                    .query()
320                    .select(Select::Columns(vec!["_vid".to_string()]))
321                    .execute()
322                    .await
323                    .map_err(|e| anyhow!("Failed to query table: {}", e))?
324                    .try_collect()
325                    .await
326                    .map_err(|e| anyhow!("Failed to collect batches: {}", e))?;
327
328                for batch in batches {
329                    let vid_col = batch
330                        .column_by_name("_vid")
331                        .unwrap()
332                        .as_any()
333                        .downcast_ref::<UInt64Array>()
334                        .unwrap();
335                    for i in 0..batch.num_rows() {
336                        all_vids.push(Vid::from(vid_col.value(i)));
337                    }
338                }
339            }
340        }
341
342        // Overlay L0 vertices (not yet flushed to Lance)
343        if let Some(ref l0_mgr) = self.l0_manager {
344            let label_names: Vec<&str> = label_ids
345                .iter()
346                .filter_map(|id| schema.label_name_by_id(*id))
347                .collect();
348
349            // Pending flush L0 buffers (oldest first)
350            for pending_l0_arc in l0_mgr.get_pending_flush() {
351                all_vids.extend(pending_l0_arc.read().vids_for_labels(&label_names));
352            }
353
354            // Current L0 buffer
355            let current_l0 = l0_mgr.get_current();
356            all_vids.extend(current_l0.read().vids_for_labels(&label_names));
357        }
358
359        // Sort and dedup to ensure IdMap is sorted for compaction
360        all_vids.sort_unstable();
361        all_vids.dedup();
362
363        Ok(all_vids)
364    }
365
366    /// Collect edges from adjacency manager.
367    async fn collect_edges(
368        &self,
369        id_map: &IdMap,
370        edge_type_ids: &[u32],
371    ) -> Result<(WeightedEdgeList, WeightedEdgeList)> {
372        // Phase 1: Collect topology from AdjacencyManager
373        let mut raw_out_edges = Vec::new(); // (src_slot, dst_vid, eid)
374        let mut raw_in_edges = Vec::new();
375
376        for (src_slot, src_vid) in id_map.iter() {
377            for &type_id in edge_type_ids {
378                // Outbound
379                let neighbors = self.storage.adjacency_manager().get_neighbors(
380                    src_vid,
381                    type_id,
382                    CacheDir::Outgoing,
383                );
384                for (dst_vid, eid) in neighbors {
385                    raw_out_edges.push((src_slot, dst_vid, eid));
386                }
387
388                // Inbound
389                if self.config.include_reverse {
390                    let in_neighbors = self.storage.adjacency_manager().get_neighbors(
391                        src_vid,
392                        type_id,
393                        CacheDir::Incoming,
394                    );
395                    for (dst_vid, eid) in in_neighbors {
396                        raw_in_edges.push((src_slot, dst_vid, eid));
397                    }
398                }
399            }
400        }
401
402        // Phase 2: Fetch weights and map destination slots (Async, No Lock)
403        let pm = if self.config.weight_property.is_some() {
404            Some(PropertyManager::new(
405                self.storage.clone(),
406                self.storage.schema_manager_arc(),
407                1000,
408            ))
409        } else {
410            None
411        };
412        let weight_prop = self.config.weight_property.as_deref();
413
414        // Batch fetch weights if weight property is configured
415        let mut weights_cache: std::collections::HashMap<Eid, f64> =
416            std::collections::HashMap::new();
417
418        if let (Some(pm), Some(prop)) = (&pm, weight_prop) {
419            // Collect and deduplicate EIDs from both edge lists
420            let mut all_eids: Vec<Eid> = raw_out_edges
421                .iter()
422                .map(|(_, _, eid)| *eid)
423                .chain(
424                    self.config
425                        .include_reverse
426                        .then(|| raw_in_edges.iter().map(|(_, _, eid)| *eid))
427                        .into_iter()
428                        .flatten(),
429                )
430                .collect();
431            all_eids.sort_unstable();
432            all_eids.dedup();
433
434            // Batch fetch edge properties
435            let batch_props = pm.get_batch_edge_props(&all_eids, &[prop], None).await?;
436
437            // Build weight cache from fetched properties
438            for eid in all_eids {
439                let vid_key = Vid::from(eid.as_u64());
440                if let Some(weight) = batch_props
441                    .get(&vid_key)
442                    .and_then(|props| props.get(prop))
443                    .and_then(|val| val.as_f64())
444                {
445                    weights_cache.insert(eid, weight);
446                }
447            }
448        }
449
450        // Convert raw edges to weighted edges, filtering to vertices in the projection
451        let out_edges: WeightedEdgeList = raw_out_edges
452            .into_iter()
453            .filter_map(|(src_slot, dst_vid, eid)| {
454                id_map.to_slot(dst_vid).map(|dst_slot| {
455                    let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
456                    (src_slot, dst_slot, weight)
457                })
458            })
459            .collect();
460
461        let in_edges: WeightedEdgeList = raw_in_edges
462            .into_iter()
463            .filter_map(|(src_slot, dst_vid, eid)| {
464                id_map.to_slot(dst_vid).map(|dst_slot| {
465                    let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
466                    (src_slot, dst_slot, weight)
467                })
468            })
469            .collect();
470
471        Ok((out_edges, in_edges))
472    }
473}
474
/// Build a CSR adjacency structure from an edge list.
///
/// `edges` holds `(src_slot, dst_slot, weight)` triples with `src_slot <
/// vertex_count`. Returns `(offsets, neighbors, weights)` where `offsets` has
/// length `vertex_count + 1` and the edges of slot `v` occupy
/// `offsets[v]..offsets[v + 1]` in `neighbors` (and `weights`). Edges keep
/// their input order within each source slot. `weights` is `Some` exactly when
/// `include_weights` is `true`, including for an empty graph.
///
/// # Panics
/// Panics if `edges.len()` does not fit in `u32` (offsets would wrap) or if an
/// edge's `src_slot` is not below `vertex_count`.
fn build_csr(
    vertex_count: usize,
    edges: &[(u32, u32, f64)],
    include_weights: bool,
) -> (Vec<u32>, Vec<u32>, Option<Vec<f64>>) {
    assert!(
        u32::try_from(edges.len()).is_ok(),
        "edge count {} exceeds u32 CSR offset range",
        edges.len()
    );

    if vertex_count == 0 {
        return (vec![0], Vec::new(), include_weights.then(Vec::new));
    }

    // Count each source's degree into the slot after it, then prefix-sum so
    // offsets[v] becomes the start of v's edge run.
    let mut offsets = vec![0u32; vertex_count + 1];
    for &(src, _, _) in edges {
        offsets[src as usize + 1] += 1;
    }
    let mut running = 0u32;
    for slot in offsets.iter_mut() {
        running += *slot;
        *slot = running;
    }

    // Scatter edges into place using a per-vertex write cursor.
    let mut cursor = offsets[..vertex_count].to_vec();
    let mut neighbors = vec![0u32; edges.len()];
    let mut weights = include_weights.then(|| vec![0.0; edges.len()]);

    for &(src, dst, w) in edges {
        let pos = &mut cursor[src as usize];
        let idx = *pos as usize;
        neighbors[idx] = dst;
        if let Some(ws) = weights.as_mut() {
            ws[idx] = w;
        }
        *pos += 1;
    }

    (offsets, neighbors, weights)
}
517
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_csr() {
        // Cycle 0 -> 1 -> 2 -> 0 plus a weighted chord 0 -> 2.
        let edges = vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0), (0, 2, 0.5)];
        let (offsets, neighbors, weights) = build_csr(3, &edges, true);

        assert_eq!(offsets, vec![0, 2, 3, 4]);
        // Slot 0 has edges to 1 and 2, in input order.
        assert_eq!(&neighbors[0..2], &[1, 2]);
        // Slot 1 has an edge to 2.
        assert_eq!(&neighbors[2..3], &[2]);
        // Slot 2 has an edge to 0.
        assert_eq!(&neighbors[3..4], &[0]);
        // Weights were requested, so they must be present and aligned with `neighbors`.
        let weights = weights.expect("weights were requested");
        assert_eq!(weights, vec![1.0, 0.5, 1.0, 1.0]);
    }

    #[test]
    fn test_build_csr_without_weights() {
        let edges = vec![(0, 1, 2.0), (1, 0, 3.0)];
        let (offsets, neighbors, weights) = build_csr(2, &edges, false);
        assert_eq!(offsets, vec![0, 1, 2]);
        assert_eq!(neighbors, vec![1, 0]);
        assert!(weights.is_none());
    }

    #[test]
    fn test_build_csr_isolated_vertices() {
        // Slots 1 and 2 have no outbound edges.
        let edges = vec![(3, 0, 1.0), (0, 3, 1.0)];
        let (offsets, neighbors, _) = build_csr(4, &edges, false);
        assert_eq!(offsets, vec![0, 1, 1, 1, 2]);
        assert_eq!(neighbors, vec![3, 0]);
    }
}