1use crate::algo::IdMap;
14use anyhow::{Result, anyhow};
15use uni_common::core::id::{Eid, Vid};
16use uni_store::runtime::L0Manager;
17use uni_store::runtime::property_manager::PropertyManager;
18use uni_store::storage::direction::Direction as CacheDir;
19use uni_store::storage::manager::StorageManager;
20
/// Edge list of `(source slot, target slot, weight)` triples, the input format
/// consumed by `build_csr`.
type WeightedEdgeList = Vec<(u32, u32, f64)>;
23
/// Settings that select which part of the stored graph a projection covers.
///
/// The `Default` value has empty label/type lists, no weight property and no
/// reverse adjacency.
#[derive(Debug, Clone, Default)]
pub struct ProjectionConfig {
    /// Names of the vertex labels to include. `ProjectionBuilder` falls back to
    /// every label in the schema when this list is empty.
    pub node_labels: Vec<String>,
    /// Names of the edge types to include. `ProjectionBuilder` falls back to
    /// every edge type in the schema when this list is empty.
    pub edge_types: Vec<String>,
    /// Name of the edge property to read as the edge weight. Edges for which no
    /// numeric value is found get a weight of `1.0`.
    pub weight_property: Option<String>,
    /// Whether the incoming adjacency (in-neighbors) is built as well.
    pub include_reverse: bool,
}
36
/// Compact, slot-indexed adjacency view of a subgraph in CSR layout.
///
/// Vertices are addressed by dense `u32` slots; `id_map` translates between
/// slots and `Vid`s.
#[derive(Debug)]
pub struct GraphProjection {
    /// Number of vertices (slots) in the projection.
    pub(crate) vertex_count: usize,

    /// Outgoing CSR offsets: the out-neighbors of slot `s` live at
    /// `out_neighbors[out_offsets[s]..out_offsets[s + 1]]`.
    pub(crate) out_offsets: Vec<u32>,
    /// Target slots of all outgoing edges, grouped by source slot.
    pub(crate) out_neighbors: Vec<u32>,
    /// Incoming CSR offsets, laid out like `out_offsets`. All zeros when the
    /// reverse adjacency was not requested.
    pub(crate) in_offsets: Vec<u32>,
    /// Source slots of all incoming edges, grouped by target slot. Empty when
    /// the reverse adjacency was not requested.
    pub(crate) in_neighbors: Vec<u32>,
    /// Weights parallel to `out_neighbors`, if weights were materialized.
    pub(crate) out_weights: Option<Vec<f64>>,

    /// Mapping between `Vid`s and dense slots.
    pub(crate) id_map: IdMap,

    /// Label names from the configuration this projection was built with.
    pub(crate) _node_labels: Vec<String>,
    /// Edge type names from the configuration this projection was built with.
    pub(crate) _edge_types: Vec<String>,
}
61
62impl GraphProjection {
63 #[inline]
65 pub fn vertex_count(&self) -> usize {
66 self.vertex_count
67 }
68
69 #[inline]
71 pub fn edge_count(&self) -> usize {
72 self.out_neighbors.len()
73 }
74
75 #[inline]
77 pub fn out_neighbors(&self, slot: u32) -> &[u32] {
78 let start = self.out_offsets[slot as usize] as usize;
79 let end = self.out_offsets[slot as usize + 1] as usize;
80 &self.out_neighbors[start..end]
81 }
82
83 #[inline]
85 pub fn out_degree(&self, slot: u32) -> u32 {
86 self.out_offsets[slot as usize + 1] - self.out_offsets[slot as usize]
87 }
88
89 #[inline]
93 pub fn in_neighbors(&self, slot: u32) -> &[u32] {
94 let start = self.in_offsets[slot as usize] as usize;
95 let end = self.in_offsets[slot as usize + 1] as usize;
96 &self.in_neighbors[start..end]
97 }
98
99 #[inline]
101 pub fn in_degree(&self, slot: u32) -> u32 {
102 self.in_offsets[slot as usize + 1] - self.in_offsets[slot as usize]
103 }
104
105 #[inline]
109 pub fn out_weight(&self, slot: u32, edge_idx: usize) -> f64 {
110 let start = self.out_offsets[slot as usize] as usize;
111 self.out_weights.as_ref().expect("no weights")[start + edge_idx]
112 }
113
114 #[inline]
116 pub fn has_weights(&self) -> bool {
117 self.out_weights.is_some()
118 }
119
120 #[inline]
122 pub fn has_reverse(&self) -> bool {
123 !self.in_neighbors.is_empty()
124 }
125
126 #[inline]
128 pub fn to_vid(&self, slot: u32) -> Vid {
129 self.id_map.to_vid_unchecked(slot)
130 }
131
132 #[inline]
134 pub fn to_slot(&self, vid: Vid) -> Option<u32> {
135 self.id_map.to_slot(vid)
136 }
137
138 pub fn vertices(&self) -> impl Iterator<Item = (u32, Vid)> + '_ {
140 self.id_map.iter()
141 }
142
143 pub fn memory_size(&self) -> usize {
145 self.out_offsets.len() * 4
146 + self.out_neighbors.len() * 4
147 + self.in_offsets.len() * 4
148 + self.in_neighbors.len() * 4
149 + self.out_weights.as_ref().map_or(0, |w| w.len() * 8)
150 + self.id_map.memory_size()
151 }
152}
153
154use std::sync::Arc;
155
/// Builder that reads vertices and edges from storage and assembles a
/// [`GraphProjection`].
pub struct ProjectionBuilder {
    /// Storage handle used for schema lookup, vertex scans and adjacency reads.
    storage: Arc<StorageManager>,
    /// Optional L0 manager; when set, vertices from its pending-flush and
    /// current buffers are added to the projection.
    l0_manager: Option<Arc<L0Manager>>,
    /// Selection of labels, edge types, weight property and reverse adjacency.
    config: ProjectionConfig,
}
163
164impl ProjectionBuilder {
165 pub fn new(storage: Arc<StorageManager>) -> Self {
167 Self {
168 storage,
169 l0_manager: None,
170 config: ProjectionConfig::default(),
171 }
172 }
173
174 pub fn l0_manager(mut self, l0_manager: Option<Arc<L0Manager>>) -> Self {
176 self.l0_manager = l0_manager;
177 self
178 }
179
180 pub fn node_labels(mut self, labels: &[&str]) -> Self {
182 self.config.node_labels = labels.iter().map(|s| s.to_string()).collect();
183 self
184 }
185
186 pub fn edge_types(mut self, types: &[&str]) -> Self {
188 self.config.edge_types = types.iter().map(|s| s.to_string()).collect();
189 self
190 }
191
192 pub fn weight_property(mut self, prop: &str) -> Self {
194 self.config.weight_property = Some(prop.to_string());
195 self
196 }
197
198 pub fn include_reverse(mut self, enabled: bool) -> Self {
200 self.config.include_reverse = enabled;
201 self
202 }
203
204 pub async fn build(self) -> Result<GraphProjection> {
206 let schema = self.storage.schema_manager().schema();
207
208 let (label_ids, edge_type_ids) = self.resolve_ids(&schema)?;
210
211 self.warm_caches(&label_ids, &edge_type_ids).await?;
213
214 let all_vids = self.collect_vertices(&schema, &label_ids).await?;
216
217 let mut id_map = IdMap::with_capacity(all_vids.len());
218 for vid in all_vids {
219 id_map.insert(vid);
220 }
221 let vertex_count = id_map.len();
222
223 let (out_edges, in_edges) = self.collect_edges(&id_map, &edge_type_ids).await?;
225
226 id_map.compact();
228
229 let (out_offsets, out_neighbors, out_weights) = build_csr(vertex_count, &out_edges, true);
230 let (in_offsets, in_neighbors, _) = if self.config.include_reverse {
231 build_csr(vertex_count, &in_edges, false)
232 } else {
233 (vec![0; vertex_count + 1], Vec::new(), None)
234 };
235
236 Ok(GraphProjection {
237 vertex_count,
238 out_offsets,
239 out_neighbors,
240 in_offsets,
241 in_neighbors,
242 out_weights,
243 id_map,
244 _node_labels: self.config.node_labels,
245 _edge_types: self.config.edge_types,
246 })
247 }
248
249 fn resolve_ids(
251 &self,
252 schema: &uni_common::core::schema::Schema,
253 ) -> Result<(Vec<u16>, Vec<u32>)> {
254 let mut label_ids = Vec::new();
255 for label_name in &self.config.node_labels {
256 let meta = schema
257 .labels
258 .get(label_name)
259 .ok_or_else(|| anyhow!("Label {} not found", label_name))?;
260 label_ids.push(meta.id);
261 }
262
263 let mut edge_type_ids = Vec::new();
264 for type_name in &self.config.edge_types {
265 let meta = schema
266 .edge_types
267 .get(type_name)
268 .ok_or_else(|| anyhow!("Edge type {} not found", type_name))?;
269 edge_type_ids.push(meta.id);
270 }
271
272 if label_ids.is_empty() {
274 label_ids = schema.labels.values().map(|m| m.id).collect();
275 }
276 if edge_type_ids.is_empty() {
277 edge_type_ids = schema.edge_types.values().map(|m| m.id).collect();
278 }
279
280 Ok((label_ids, edge_type_ids))
281 }
282
283 async fn warm_caches(&self, _label_ids: &[u16], edge_type_ids: &[u32]) -> Result<()> {
285 for &type_id in edge_type_ids {
286 let edge_ver = self.storage.get_edge_version_by_id(type_id);
287 self.storage
288 .warm_adjacency(type_id, CacheDir::Outgoing, edge_ver)
289 .await?;
290 if self.config.include_reverse {
291 self.storage
292 .warm_adjacency(type_id, CacheDir::Incoming, edge_ver)
293 .await?;
294 }
295 }
296 Ok(())
297 }
298
299 async fn collect_vertices(
301 &self,
302 schema: &uni_common::core::schema::Schema,
303 label_ids: &[u16],
304 ) -> Result<Vec<Vid>> {
305 use arrow_array::UInt64Array;
306
307 let mut all_vids = Vec::new();
308
309 for &lid in label_ids {
310 let label_name = schema.label_name_by_id(lid).unwrap();
311 if let Ok(Some(batch)) = self
312 .storage
313 .scan_vertex_table(label_name, &["_vid"], None)
314 .await
315 {
316 let vid_col = batch
317 .column_by_name("_vid")
318 .unwrap()
319 .as_any()
320 .downcast_ref::<UInt64Array>()
321 .unwrap();
322 for i in 0..batch.num_rows() {
323 all_vids.push(Vid::from(vid_col.value(i)));
324 }
325 }
326 }
327
328 if let Some(ref l0_mgr) = self.l0_manager {
330 let label_names: Vec<&str> = label_ids
331 .iter()
332 .filter_map(|id| schema.label_name_by_id(*id))
333 .collect();
334
335 for pending_l0_arc in l0_mgr.get_pending_flush() {
337 all_vids.extend(pending_l0_arc.read().vids_for_labels(&label_names));
338 }
339
340 let current_l0 = l0_mgr.get_current();
342 all_vids.extend(current_l0.read().vids_for_labels(&label_names));
343 }
344
345 all_vids.sort_unstable();
347 all_vids.dedup();
348
349 Ok(all_vids)
350 }
351
352 async fn collect_edges(
354 &self,
355 id_map: &IdMap,
356 edge_type_ids: &[u32],
357 ) -> Result<(WeightedEdgeList, WeightedEdgeList)> {
358 let mut raw_out_edges = Vec::new(); let mut raw_in_edges = Vec::new();
361
362 for (src_slot, src_vid) in id_map.iter() {
363 for &type_id in edge_type_ids {
364 let neighbors = self.storage.adjacency_manager().get_neighbors(
366 src_vid,
367 type_id,
368 CacheDir::Outgoing,
369 );
370 for (dst_vid, eid) in neighbors {
371 raw_out_edges.push((src_slot, dst_vid, eid));
372 }
373
374 if self.config.include_reverse {
376 let in_neighbors = self.storage.adjacency_manager().get_neighbors(
377 src_vid,
378 type_id,
379 CacheDir::Incoming,
380 );
381 for (dst_vid, eid) in in_neighbors {
382 raw_in_edges.push((src_slot, dst_vid, eid));
383 }
384 }
385 }
386 }
387
388 let pm = if self.config.weight_property.is_some() {
390 Some(PropertyManager::new(
391 self.storage.clone(),
392 self.storage.schema_manager_arc(),
393 1000,
394 ))
395 } else {
396 None
397 };
398 let weight_prop = self.config.weight_property.as_deref();
399
400 let mut weights_cache: std::collections::HashMap<Eid, f64> =
402 std::collections::HashMap::new();
403
404 if let (Some(pm), Some(prop)) = (&pm, weight_prop) {
405 let mut all_eids: Vec<Eid> = raw_out_edges
407 .iter()
408 .map(|(_, _, eid)| *eid)
409 .chain(
410 self.config
411 .include_reverse
412 .then(|| raw_in_edges.iter().map(|(_, _, eid)| *eid))
413 .into_iter()
414 .flatten(),
415 )
416 .collect();
417 all_eids.sort_unstable();
418 all_eids.dedup();
419
420 let batch_props = pm.get_batch_edge_props(&all_eids, &[prop], None).await?;
422
423 for eid in all_eids {
425 let vid_key = Vid::from(eid.as_u64());
426 if let Some(weight) = batch_props
427 .get(&vid_key)
428 .and_then(|props| props.get(prop))
429 .and_then(|val| val.as_f64())
430 {
431 weights_cache.insert(eid, weight);
432 }
433 }
434 }
435
436 let out_edges: WeightedEdgeList = raw_out_edges
438 .into_iter()
439 .filter_map(|(src_slot, dst_vid, eid)| {
440 id_map.to_slot(dst_vid).map(|dst_slot| {
441 let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
442 (src_slot, dst_slot, weight)
443 })
444 })
445 .collect();
446
447 let in_edges: WeightedEdgeList = raw_in_edges
448 .into_iter()
449 .filter_map(|(src_slot, dst_vid, eid)| {
450 id_map.to_slot(dst_vid).map(|dst_slot| {
451 let weight = weights_cache.get(&eid).copied().unwrap_or(1.0);
452 (src_slot, dst_slot, weight)
453 })
454 })
455 .collect();
456
457 Ok((out_edges, in_edges))
458 }
459}
460
/// Builds a CSR (compressed sparse row) adjacency from an edge list.
///
/// `edges` holds `(source slot, target slot, weight)` triples. The result is
/// `(offsets, neighbors, weights)` where the targets of source slot `s` are
/// `neighbors[offsets[s]..offsets[s + 1]]`, in the order they appear in
/// `edges`. `weights` is parallel to `neighbors` and is `Some` exactly when
/// `include_weights` is true.
///
/// With `vertex_count == 0` the result is the empty structure (`offsets` is
/// `[0]`) and `edges` is ignored.
///
/// # Panics
///
/// Panics if a source slot is `>= vertex_count` or if `edges.len()` does not
/// fit in the `u32` offsets.
fn build_csr(
    vertex_count: usize,
    edges: &[(u32, u32, f64)],
    include_weights: bool,
) -> (Vec<u32>, Vec<u32>, Option<Vec<f64>>) {
    if vertex_count == 0 {
        // No slot can own an edge. Still honour `include_weights` so callers
        // see the same `Some`/`None` shape as for non-empty graphs.
        return (vec![0], Vec::new(), include_weights.then(Vec::new));
    }

    // Offsets are u32; without this check they would wrap silently in release builds.
    assert!(
        u32::try_from(edges.len()).is_ok(),
        "edge count {} does not fit in u32 CSR offsets",
        edges.len()
    );

    // Pass 1: count the degree of slot `s` in `offsets[s + 1]`.
    let mut offsets = vec![0u32; vertex_count + 1];
    for &(src, _, _) in edges {
        offsets[src as usize + 1] += 1;
    }

    // Pass 2: an inclusive prefix sum turns the degrees into end offsets.
    let mut running = 0u32;
    for entry in offsets.iter_mut().skip(1) {
        running += *entry;
        *entry = running;
    }

    // Pass 3: scatter each edge into the next free position of its source row.
    let mut cursor = offsets[..vertex_count].to_vec();
    let mut neighbors = vec![0u32; edges.len()];
    let mut weights = include_weights.then(|| vec![0.0f64; edges.len()]);

    for &(src, dst, w) in edges {
        let next = &mut cursor[src as usize];
        let idx = *next as usize;
        *next += 1;

        neighbors[idx] = dst;
        if let Some(ws) = weights.as_mut() {
            ws[idx] = w;
        }
    }

    (offsets, neighbors, weights)
}
503
#[cfg(test)]
mod tests {
    use super::*;

    /// Edges are grouped by source slot, keep their input order within a
    /// group, and carry their weights along.
    #[test]
    fn test_build_csr() {
        let edges = vec![(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0), (0, 2, 0.5)];
        let (offsets, neighbors, weights) = build_csr(3, &edges, true);

        assert_eq!(offsets, vec![0, 2, 3, 4]);

        // Slot 0 -> {1, 2}
        assert_eq!(&neighbors[0..2], &[1, 2]);
        // Slot 1 -> {2}
        assert_eq!(&neighbors[2..3], &[2]);
        // Slot 2 -> {0}
        assert_eq!(&neighbors[3..4], &[0]);

        // Fail loudly if the weights are missing instead of skipping the check.
        let w = weights.expect("weights were requested");
        assert_eq!(w, vec![1.0, 0.5, 1.0, 1.0]);
    }

    /// No weight array is produced when weights are not requested.
    #[test]
    fn test_build_csr_without_weights() {
        let edges = vec![(1, 0, 3.0), (0, 1, 2.0)];
        let (offsets, neighbors, weights) = build_csr(2, &edges, false);

        assert_eq!(offsets, vec![0, 1, 2]);
        assert_eq!(neighbors, vec![1, 0]);
        assert!(weights.is_none());
    }

    /// Vertices without outgoing edges get empty ranges.
    #[test]
    fn test_build_csr_isolated_vertices() {
        let edges = vec![(3, 0, 2.0)];
        let (offsets, neighbors, weights) = build_csr(4, &edges, true);

        assert_eq!(offsets, vec![0, 0, 0, 0, 1]);
        assert_eq!(neighbors, vec![0]);
        assert_eq!(weights, Some(vec![2.0]));
    }
}
525}