1use crate::algo::IdMap;
14use anyhow::{Result, anyhow};
15use uni_common::core::id::{Eid, Vid};
16use uni_store::runtime::L0Manager;
17use uni_store::runtime::property_manager::PropertyManager;
18use uni_store::storage::direction::Direction as CacheDir;
19use uni_store::storage::manager::StorageManager;
20
/// Edge list of `(source slot, destination slot, weight)` triples, as produced by
/// `ProjectionBuilder::collect_edges` and packed into CSR form by `build_csr`.
type WeightedEdgeList = Vec<(u32, u32, f64)>;
23
/// Options selecting which part of the stored graph a projection covers.
#[derive(Debug, Clone, Default)]
pub struct ProjectionConfig {
    /// Vertex label names to include; an empty list selects every label in the schema.
    pub node_labels: Vec<String>,
    /// Edge type names to include; an empty list selects every edge type in the schema.
    pub edge_types: Vec<String>,
    /// Edge property used as the edge weight. Edges whose property is missing or not
    /// convertible via `as_f64` get weight `1.0`.
    pub weight_property: Option<String>,
    /// Whether to also build the incoming-edge (reverse) CSR.
    pub include_reverse: bool,
}
36
37#[derive(Debug)]
39pub struct GraphProjection {
40 pub(crate) vertex_count: usize,
42
43 pub(crate) out_offsets: Vec<u32>, pub(crate) out_neighbors: Vec<u32>, pub(crate) in_offsets: Vec<u32>, pub(crate) in_neighbors: Vec<u32>, pub(crate) out_weights: Option<Vec<f64>>,
53
54 pub(crate) id_map: IdMap,
56
57 pub(crate) _node_labels: Vec<String>,
59 pub(crate) _edge_types: Vec<String>,
60}
61
62impl GraphProjection {
63 #[inline]
65 pub fn vertex_count(&self) -> usize {
66 self.vertex_count
67 }
68
69 #[inline]
71 pub fn edge_count(&self) -> usize {
72 self.out_neighbors.len()
73 }
74
75 #[inline]
77 pub fn out_neighbors(&self, slot: u32) -> &[u32] {
78 let start = self.out_offsets[slot as usize] as usize;
79 let end = self.out_offsets[slot as usize + 1] as usize;
80 &self.out_neighbors[start..end]
81 }
82
83 #[inline]
85 pub fn out_degree(&self, slot: u32) -> u32 {
86 self.out_offsets[slot as usize + 1] - self.out_offsets[slot as usize]
87 }
88
89 #[inline]
93 pub fn in_neighbors(&self, slot: u32) -> &[u32] {
94 let start = self.in_offsets[slot as usize] as usize;
95 let end = self.in_offsets[slot as usize + 1] as usize;
96 &self.in_neighbors[start..end]
97 }
98
99 #[inline]
101 pub fn in_degree(&self, slot: u32) -> u32 {
102 self.in_offsets[slot as usize + 1] - self.in_offsets[slot as usize]
103 }
104
105 #[inline]
109 pub fn out_weight(&self, slot: u32, edge_idx: usize) -> f64 {
110 let start = self.out_offsets[slot as usize] as usize;
111 self.out_weights.as_ref().expect("no weights")[start + edge_idx]
112 }
113
114 #[inline]
116 pub fn has_weights(&self) -> bool {
117 self.out_weights.is_some()
118 }
119
120 #[inline]
122 pub fn has_reverse(&self) -> bool {
123 !self.in_neighbors.is_empty()
124 }
125
126 #[inline]
128 pub fn to_vid(&self, slot: u32) -> Vid {
129 self.id_map.to_vid_unchecked(slot)
130 }
131
132 #[inline]
134 pub fn to_slot(&self, vid: Vid) -> Option<u32> {
135 self.id_map.to_slot(vid)
136 }
137
138 pub fn vertices(&self) -> impl Iterator<Item = (u32, Vid)> + '_ {
140 self.id_map.iter()
141 }
142
143 pub fn memory_size(&self) -> usize {
145 self.out_offsets.len() * 4
146 + self.out_neighbors.len() * 4
147 + self.in_offsets.len() * 4
148 + self.in_neighbors.len() * 4
149 + self.out_weights.as_ref().map_or(0, |w| w.len() * 8)
150 + self.id_map.memory_size()
151 }
152}
153
154use std::sync::Arc;
155
/// Builder that configures and materializes a [`GraphProjection`] from storage.
pub struct ProjectionBuilder {
    /// Storage backend supplying the schema, vertex tables, and adjacency data.
    storage: Arc<StorageManager>,
    /// Optional L0 manager whose pending-flush and current buffers contribute vertices.
    l0_manager: Option<Arc<L0Manager>>,
    /// Options accumulated through the builder methods.
    config: ProjectionConfig,
}
163
164impl ProjectionBuilder {
165 pub fn new(storage: Arc<StorageManager>) -> Self {
167 Self {
168 storage,
169 l0_manager: None,
170 config: ProjectionConfig::default(),
171 }
172 }
173
174 pub fn l0_manager(mut self, l0_manager: Option<Arc<L0Manager>>) -> Self {
176 self.l0_manager = l0_manager;
177 self
178 }
179
180 pub fn node_labels(mut self, labels: &[&str]) -> Self {
182 self.config.node_labels = labels.iter().map(|s| s.to_string()).collect();
183 self
184 }
185
186 pub fn edge_types(mut self, types: &[&str]) -> Self {
188 self.config.edge_types = types.iter().map(|s| s.to_string()).collect();
189 self
190 }
191
192 pub fn weight_property(mut self, prop: &str) -> Self {
194 self.config.weight_property = Some(prop.to_string());
195 self
196 }
197
198 pub fn include_reverse(mut self, enabled: bool) -> Self {
200 self.config.include_reverse = enabled;
201 self
202 }
203
204 pub async fn build(self) -> Result<GraphProjection> {
206 let schema = self.storage.schema_manager().schema();
207
208 let (label_ids, edge_type_ids) = self.resolve_ids(&schema)?;
210
211 self.warm_caches(&label_ids, &edge_type_ids).await?;
213
214 let all_vids = self.collect_vertices(&schema, &label_ids).await?;
216
217 let mut id_map = IdMap::with_capacity(all_vids.len());
218 for vid in all_vids {
219 id_map.insert(vid);
220 }
221 let vertex_count = id_map.len();
222
223 let (out_edges, in_edges) = self.collect_edges(&id_map, &edge_type_ids).await?;
225
226 id_map.compact();
228
229 let (out_offsets, out_neighbors, out_weights) = build_csr(vertex_count, &out_edges, true);
230 let (in_offsets, in_neighbors, _) = if self.config.include_reverse {
231 build_csr(vertex_count, &in_edges, false)
232 } else {
233 (vec![0; vertex_count + 1], Vec::new(), None)
234 };
235
236 Ok(GraphProjection {
237 vertex_count,
238 out_offsets,
239 out_neighbors,
240 in_offsets,
241 in_neighbors,
242 out_weights,
243 id_map,
244 _node_labels: self.config.node_labels,
245 _edge_types: self.config.edge_types,
246 })
247 }
248
249 fn resolve_ids(
251 &self,
252 schema: &uni_common::core::schema::Schema,
253 ) -> Result<(Vec<u16>, Vec<u32>)> {
254 let mut label_ids = Vec::new();
255 for label_name in &self.config.node_labels {
256 let meta = schema
257 .labels
258 .get(label_name)
259 .ok_or_else(|| anyhow!("Label {} not found", label_name))?;
260 label_ids.push(meta.id);
261 }
262
263 let mut edge_type_ids = Vec::new();
264 for type_name in &self.config.edge_types {
265 let meta = schema
266 .edge_types
267 .get(type_name)
268 .ok_or_else(|| anyhow!("Edge type {} not found", type_name))?;
269 edge_type_ids.push(meta.id);
270 }
271
272 if label_ids.is_empty() {
274 label_ids = schema.labels.values().map(|m| m.id).collect();
275 }
276 if edge_type_ids.is_empty() {
277 edge_type_ids = schema.edge_types.values().map(|m| m.id).collect();
278 }
279
280 Ok((label_ids, edge_type_ids))
281 }
282
283 async fn warm_caches(&self, _label_ids: &[u16], edge_type_ids: &[u32]) -> Result<()> {
285 for &type_id in edge_type_ids {
286 let edge_ver = self.storage.get_edge_version_by_id(type_id);
287 self.storage
288 .warm_adjacency(type_id, CacheDir::Outgoing, edge_ver)
289 .await?;
290 if self.config.include_reverse {
291 self.storage
292 .warm_adjacency(type_id, CacheDir::Incoming, edge_ver)
293 .await?;
294 }
295 }
296 Ok(())
297 }
298
299 async fn collect_vertices(
301 &self,
302 schema: &uni_common::core::schema::Schema,
303 label_ids: &[u16],
304 ) -> Result<Vec<Vid>> {
305 use arrow_array::UInt64Array;
306 use futures::TryStreamExt;
307 use lancedb::query::{ExecutableQuery, QueryBase, Select};
308
309 let mut all_vids = Vec::new();
310 let lancedb_store = self.storage.lancedb_store();
311
312 for &lid in label_ids {
314 let label_name = schema.label_name_by_id(lid).unwrap();
315
316 let ds = self.storage.vertex_dataset(label_name)?;
317 if let Ok(table) = ds.open_lancedb(lancedb_store).await {
318 let batches: Vec<arrow_array::RecordBatch> = table
319 .query()
320 .select(Select::Columns(vec!["_vid".to_string()]))
321 .execute()
322 .await
323 .map_err(|e| anyhow!("Failed to query table: {}", e))?
324 .try_collect()
325 .await
326 .map_err(|e| anyhow!("Failed to collect batches: {}", e))?;
327
328 for batch in batches {
329 let vid_col = batch
330 .column_by_name("_vid")
331 .unwrap()
332 .as_any()
333 .downcast_ref::<UInt64Array>()
334 .unwrap();
335 for i in 0..batch.num_rows() {
336 all_vids.push(Vid::from(vid_col.value(i)));
337 }
338 }
339 }
340 }
341
342 if let Some(ref l0_mgr) = self.l0_manager {
344 let label_names: Vec<&str> = label_ids
345 .iter()
346 .filter_map(|id| schema.label_name_by_id(*id))
347 .collect();
348
349 for pending_l0_arc in l0_mgr.get_pending_flush() {
351 all_vids.extend(pending_l0_arc.read().vids_for_labels(&label_names));
352 }
353
354 let current_l0 = l0_mgr.get_current();
356 all_vids.extend(current_l0.read().vids_for_labels(&label_names));
357 }
358
359 all_vids.sort_unstable();
361 all_vids.dedup();
362
363 Ok(all_vids)
364 }
365
366 async fn collect_edges(
368 &self,
369 id_map: &IdMap,
370 edge_type_ids: &[u32],
371 ) -> Result<(WeightedEdgeList, WeightedEdgeList)> {
372 let mut raw_out_edges = Vec::new(); let mut raw_in_edges = Vec::new();
375
376 for (src_slot, src_vid) in id_map.iter() {
377 for &type_id in edge_type_ids {
378 let neighbors = self.storage.adjacency_manager().get_neighbors(
380 src_vid,
381 type_id,
382 CacheDir::Outgoing,
383 );
384 for (dst_vid, eid) in neighbors {
385 raw_out_edges.push((src_slot, dst_vid, eid));
386 }
387
388 if self.config.include_reverse {
390 let in_neighbors = self.storage.adjacency_manager().get_neighbors(
391 src_vid,
392 type_id,
393 CacheDir::Incoming,
394 );
395 for (dst_vid, eid) in in_neighbors {
396 raw_in_edges.push((src_slot, dst_vid, eid));
397 }
398 }
399 }
400 }
401
402 let pm = if self.config.weight_property.is_some() {
404 Some(PropertyManager::new(
405 self.storage.clone(),
406 self.storage.schema_manager_arc(),
407 1000,
408 ))
409 } else {
410 None
411 };
412 let weight_prop = self.config.weight_property.as_deref();
413
414 let mut weights_cache: std::collections::HashMap<Eid, f64> =
416 std::collections::HashMap::new();
417
418 if let (Some(pm), Some(prop)) = (&pm, weight_prop) {
419 let mut all_eids: Vec<Eid> = raw_out_edges
421 .iter()
422 .map(|(_, _, eid)| *eid)
423 .chain(
424 self.config
425 .include_reverse
426 .then(|| raw_in_edges.iter().map(|(_, _, eid)| *eid))
427 .into_iter()
428 .flatten(),
429 )
430 .collect();
431 all_eids.sort_unstable();
432 all_eids.dedup();
433
434 let batch_props = pm.get_batch_edge_props(&all_eids, &[prop], None).await?;
436
437 for eid in all_eids {
439 let vid_key = Vid::from(eid.as_u64());
440 if let Some(weight) = batch_props
441 .get(&vid_key)
442 .and_then(|props| props.get(prop))
443 .and_then(|val| val.as_f64())
444 {
445 weights_cache.insert(eid, weight);
446 }
447 }
448 }
449
450 let out_edges: WeightedEdgeList = raw_out_edges
452 .into_iter()
453 .filter_map(|(src_slot, dst_vid, eid)| {
454 id_map.to_slot(dst_vid).map(|dst_slot| {
455 let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
456 (src_slot, dst_slot, weight)
457 })
458 })
459 .collect();
460
461 let in_edges: WeightedEdgeList = raw_in_edges
462 .into_iter()
463 .filter_map(|(src_slot, dst_vid, eid)| {
464 id_map.to_slot(dst_vid).map(|dst_slot| {
465 let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
466 (src_slot, dst_slot, weight)
467 })
468 })
469 .collect();
470
471 Ok((out_edges, in_edges))
472 }
473}
474
/// Builds CSR arrays from an edge list that need not be sorted.
///
/// Edges are bucketed by source slot with a counting sort. The neighbors of each
/// source therefore keep the relative order in which they appear in `edges`.
///
/// Returns `(offsets, neighbors, weights)`:
/// - `offsets` has `vertex_count + 1` entries, or is just `[0]` for an empty graph.
/// - The neighbors of slot `s` are `neighbors[offsets[s]..offsets[s + 1]]`.
/// - `weights` runs parallel to `neighbors`. It is `Some` only when `include_weights`
///   is set and `vertex_count > 0`.
///
/// # Panics
///
/// Panics if `vertex_count > 0` and some source slot is `>= vertex_count`. Also panics
/// if `edges.len()` exceeds `u32::MAX`; offsets are `u32` and would otherwise
/// silently wrap in release builds.
fn build_csr(
    vertex_count: usize,
    edges: &[(u32, u32, f64)],
    include_weights: bool,
) -> (Vec<u32>, Vec<u32>, Option<Vec<f64>>) {
    if vertex_count == 0 {
        return (vec![0], Vec::new(), None);
    }

    assert!(
        u32::try_from(edges.len()).is_ok(),
        "CSR offsets are u32; {} edges exceed u32::MAX",
        edges.len()
    );

    // Count each source's degree into offsets[src + 1], then turn the counts into
    // running totals so offsets[s] is where slot `s`'s neighbors begin.
    let mut offsets = vec![0u32; vertex_count + 1];
    for &(src, _, _) in edges {
        offsets[src as usize + 1] += 1;
    }
    let mut running = 0u32;
    for slot in offsets.iter_mut() {
        running += *slot;
        *slot = running;
    }

    let mut neighbors = vec![0u32; edges.len()];
    let mut weights = include_weights.then(|| vec![0.0; edges.len()]);
    // Next free write position within each source's row.
    let mut cursor = offsets[..vertex_count].to_vec();

    for &(src, dst, w) in edges {
        let pos = &mut cursor[src as usize];
        let idx = *pos as usize;
        neighbors[idx] = dst;
        if let Some(ws) = weights.as_mut() {
            ws[idx] = w;
        }
        *pos += 1;
    }

    (offsets, neighbors, weights)
}
517
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_build_csr() {
        // Edges are not sorted by source, so this also exercises the scatter step.
        let edges = vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0), (0, 2, 0.5)];
        let (offsets, neighbors, weights) = build_csr(3, &edges, true);

        assert_eq!(offsets, vec![0, 2, 3, 4]);
        assert_eq!(&neighbors[0..2], &[1, 2]);
        assert_eq!(&neighbors[2..3], &[2]);
        assert_eq!(&neighbors[3..4], &[0]);
        // Weights were requested, so their absence must fail the test rather than
        // silently skip the weight checks.
        let w = weights.expect("weights were requested");
        assert_eq!(w, vec![1.0, 0.5, 1.0, 1.0]);
    }

    #[test]
    fn test_build_csr_without_weights() {
        let edges = vec![(1, 0, 2.0)];
        let (offsets, neighbors, weights) = build_csr(2, &edges, false);

        assert_eq!(offsets, vec![0, 0, 1]);
        assert_eq!(neighbors, vec![0]);
        assert!(weights.is_none());
    }

    #[test]
    fn test_build_csr_empty_graph() {
        let (offsets, neighbors, weights) = build_csr(0, &[], true);

        assert_eq!(offsets, vec![0]);
        assert!(neighbors.is_empty());
        assert!(weights.is_none());
    }

    #[test]
    fn test_build_csr_isolated_vertices() {
        let (offsets, neighbors, _) = build_csr(4, &[], false);

        assert_eq!(offsets, vec![0; 5]);
        assert!(neighbors.is_empty());
    }
}
539}