1use super::super::edge::{DeltaEdge, EdgeKind, EdgeStore};
35use super::super::node::NodeId;
36use super::super::storage::{CsrError, CsrGraph};
37use super::errors::{BuildFailureReason, CompactionError, Direction};
38use super::merge::{MergeStats, MergedEdge, merge_delta_edges};
39
40#[derive(Debug, Clone)]
45pub struct CompactionSnapshot {
46 pub csr_edges: Vec<MergedEdge>,
48 pub delta_edges: Vec<DeltaEdge>,
50 pub node_count: usize,
52 pub csr_version: u64,
54}
55
56impl CompactionSnapshot {
57 #[must_use]
59 pub fn empty(node_count: usize) -> Self {
60 Self {
61 csr_edges: Vec::new(),
62 delta_edges: Vec::new(),
63 node_count,
64 csr_version: 0,
65 }
66 }
67
68 #[must_use]
70 pub fn total_edges(&self) -> usize {
71 self.csr_edges.len() + self.delta_edges.len()
72 }
73}
74
75#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
77pub struct BuildStats {
78 pub csr_input_edges: usize,
80 pub delta_input_edges: usize,
82 pub merge_stats: MergeStats,
84 pub output_edges: usize,
86 pub output_nodes: usize,
88}
89
90#[must_use]
108pub fn snapshot_edges(store: &EdgeStore, node_count: usize) -> CompactionSnapshot {
109 let mut csr_edges = Vec::new();
110
111 if let Some(csr) = store.csr() {
113 for node_idx in 0..csr.node_count() {
114 let node_idx_u32 = u32::try_from(node_idx).expect("CSR node index exceeds u32::MAX");
115 for edge_ref in csr.edges_of(node_idx_u32) {
116 if store.is_edge_tombstoned(edge_ref.index) {
118 continue;
119 }
120
121 csr_edges.push(MergedEdge {
122 source: NodeId::new(node_idx_u32, 0), target: edge_ref.target,
124 kind: edge_ref.kind,
125 seq: edge_ref.seq,
126 file: super::super::file::FileId::new(0), spans: edge_ref.spans, });
129 }
130 }
131 }
132
133 let delta_edges: Vec<DeltaEdge> = store.delta().iter().cloned().collect();
135
136 CompactionSnapshot {
137 csr_edges,
138 delta_edges,
139 node_count,
140 csr_version: store.csr_version(),
141 }
142}
143
144pub fn build_compacted_csr(
168 snapshot: &CompactionSnapshot,
169 direction: Direction,
170) -> Result<(CsrGraph, BuildStats), CompactionError> {
171 let csr_input_edges = snapshot.csr_edges.len();
172 let delta_input_edges = snapshot.delta_edges.len();
173 let node_count = snapshot.node_count;
174
175 if node_count == 0 {
177 return Ok((
178 CsrGraph::empty(0),
179 BuildStats {
180 csr_input_edges,
181 delta_input_edges,
182 merge_stats: MergeStats::default(),
183 output_edges: 0,
184 output_nodes: 0,
185 },
186 ));
187 }
188
189 let mut all_delta_edges: Vec<DeltaEdge> = snapshot
193 .csr_edges
194 .iter()
195 .map(|e| {
196 DeltaEdge::with_spans(
197 e.source,
198 e.target,
199 e.kind.clone(),
200 e.seq,
201 super::super::edge::DeltaOp::Add,
202 e.file,
203 e.spans.clone(),
204 )
205 })
206 .collect();
207
208 all_delta_edges.extend(snapshot.delta_edges.iter().cloned());
210
211 let (merged_edges, merge_stats) = merge_delta_edges(all_delta_edges);
213
214 let csr = build_csr_from_edges(&merged_edges, node_count).map_err(|e| {
216 CompactionError::BuildFailed {
217 direction,
218 reason: BuildFailureReason::BuilderError {
219 message: e.to_string(),
220 },
221 }
222 })?;
223
224 let output_edges = csr.edge_count();
225 let output_nodes = csr.node_count();
226
227 Ok((
228 csr,
229 BuildStats {
230 csr_input_edges,
231 delta_input_edges,
232 merge_stats,
233 output_edges,
234 output_nodes,
235 },
236 ))
237}
238
239pub fn build_csr_from_edges(edges: &[MergedEdge], node_count: usize) -> Result<CsrGraph, CsrError> {
263 if node_count == 0 {
264 return Ok(CsrGraph::empty(0));
265 }
266
267 let mut sorted_edges: Vec<&MergedEdge> = edges.iter().collect();
269 sorted_edges.sort_by_key(|e| e.source.index());
270
271 let mut edge_counts: Vec<usize> = vec![0; node_count];
273 for edge in &sorted_edges {
274 let src_idx = edge.source.index() as usize;
275 if src_idx < node_count {
276 edge_counts[src_idx] += 1;
277 }
278 }
279
280 let mut row_ptr: Vec<u32> = Vec::with_capacity(node_count + 1);
282 row_ptr.push(0);
283 let mut cumulative = 0u32;
284 for count in &edge_counts {
285 let count_u32 = u32::try_from(*count).expect("CSR edge count exceeds u32::MAX");
286 cumulative = cumulative
287 .checked_add(count_u32)
288 .expect("CSR row_ptr overflow");
289 row_ptr.push(cumulative);
290 }
291
292 let edge_count = sorted_edges.len();
294 let mut col_idx: Vec<NodeId> = Vec::with_capacity(edge_count);
295 let mut edge_kind: Vec<EdgeKind> = Vec::with_capacity(edge_count);
296 let mut edge_seq: Vec<u64> = Vec::with_capacity(edge_count);
297 let mut edge_spans: Vec<Vec<crate::graph::node::Span>> = Vec::with_capacity(edge_count);
298
299 for edge in sorted_edges {
300 col_idx.push(edge.target);
301 edge_kind.push(edge.kind.clone());
302 edge_seq.push(edge.seq);
303 edge_spans.push(edge.spans.clone());
304 }
305
306 let csr = CsrGraph::from_raw(
308 node_count, row_ptr, col_idx, edge_kind, edge_seq, edge_spans,
309 );
310 csr.validate()?;
311
312 Ok(csr)
313}
314
#[cfg(test)]
mod tests {
    use super::super::super::edge::{DeltaOp, EdgeKind};
    use super::super::super::file::FileId;
    use super::super::super::node::NodeId;
    use super::*;

    /// Edge kind shared by the fixtures: a non-async call with no arguments.
    fn plain_call() -> EdgeKind {
        EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        }
    }

    /// Merged `Calls` edge between generation-0 nodes, attributed to file 1.
    fn make_merged_edge(source: u32, target: u32, seq: u64) -> MergedEdge {
        let from = NodeId::new(source, 0);
        let to = NodeId::new(target, 0);
        MergedEdge::new(from, to, plain_call(), seq, FileId::new(1))
    }

    /// Delta `Calls` edge between generation-0 nodes, attributed to file 1.
    fn make_delta_edge(source: u32, target: u32, seq: u64, op: DeltaOp) -> DeltaEdge {
        let from = NodeId::new(source, 0);
        let to = NodeId::new(target, 0);
        DeltaEdge::new(from, to, plain_call(), seq, op, FileId::new(1))
    }

    /// Snapshot over three nodes with the given edge sets and CSR version.
    fn three_node_snapshot(
        csr_edges: Vec<MergedEdge>,
        delta_edges: Vec<DeltaEdge>,
        csr_version: u64,
    ) -> CompactionSnapshot {
        CompactionSnapshot {
            csr_edges,
            delta_edges,
            node_count: 3,
            csr_version,
        }
    }

    #[test]
    fn test_compaction_snapshot_empty() {
        let snapshot = CompactionSnapshot::empty(10);

        assert_eq!(snapshot.node_count, 10);
        assert_eq!(snapshot.total_edges(), 0);
        assert_eq!(snapshot.csr_version, 0);
    }

    #[test]
    fn test_build_csr_from_edges_empty() {
        let no_edges: Vec<MergedEdge> = Vec::new();
        let csr = build_csr_from_edges(&no_edges, 5).unwrap();

        assert_eq!(csr.node_count(), 5);
        assert_eq!(csr.edge_count(), 0);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_csr_from_edges_simple() {
        let edges = vec![
            make_merged_edge(0, 1, 1),
            make_merged_edge(0, 2, 2),
            make_merged_edge(1, 2, 3),
        ];

        let csr = build_csr_from_edges(&edges, 3).unwrap();

        assert_eq!(csr.node_count(), 3);
        assert_eq!(csr.edge_count(), 3);
        assert_eq!(csr.out_degree(0), 2);
        assert_eq!(csr.out_degree(1), 1);
        assert_eq!(csr.out_degree(2), 0);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_csr_from_edges_unsorted() {
        // Sources arrive out of order; the builder has to sort them itself.
        let edges = vec![
            make_merged_edge(2, 0, 3),
            make_merged_edge(0, 1, 1),
            make_merged_edge(1, 2, 2),
        ];

        let csr = build_csr_from_edges(&edges, 3).unwrap();

        assert_eq!(csr.edge_count(), 3);
        assert_eq!(csr.out_degree(0), 1);
        assert_eq!(csr.out_degree(1), 1);
        assert_eq!(csr.out_degree(2), 1);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_csr_from_edges_zero_nodes() {
        let no_edges: Vec<MergedEdge> = Vec::new();
        let csr = build_csr_from_edges(&no_edges, 0).unwrap();

        assert_eq!(csr.node_count(), 0);
        assert_eq!(csr.edge_count(), 0);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_compacted_csr_empty_snapshot() {
        let snapshot = CompactionSnapshot::empty(5);
        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(csr.node_count(), 5);
        assert_eq!(csr.edge_count(), 0);
        assert_eq!(stats.csr_input_edges, 0);
        assert_eq!(stats.delta_input_edges, 0);
        assert_eq!(stats.output_edges, 0);
    }

    #[test]
    fn test_build_compacted_csr_with_delta_edges() {
        let deltas = vec![
            make_delta_edge(0, 1, 1, DeltaOp::Add),
            make_delta_edge(1, 2, 2, DeltaOp::Add),
        ];
        let snapshot = three_node_snapshot(Vec::new(), deltas, 0);

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(csr.edge_count(), 2);
        assert_eq!(stats.delta_input_edges, 2);
        assert_eq!(stats.output_edges, 2);
    }

    #[test]
    fn test_build_compacted_csr_merge_removes_duplicates() {
        // The same edge exists in the CSR (seq 1) and as a later delta add (seq 5).
        let snapshot = three_node_snapshot(
            vec![make_merged_edge(0, 1, 1)],
            vec![make_delta_edge(0, 1, 5, DeltaOp::Add)],
            1,
        );

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(csr.edge_count(), 1);
        assert_eq!(stats.csr_input_edges, 1);
        assert_eq!(stats.delta_input_edges, 1);
        assert_eq!(stats.output_edges, 1);

        // The surviving edge must carry the later sequence number.
        let survivor = csr.edge_at(0).unwrap();
        assert_eq!(survivor.seq, 5);
    }

    #[test]
    fn test_build_compacted_csr_remove_wins() {
        // A delta remove with a higher seq cancels the edge stored in the CSR.
        let snapshot = three_node_snapshot(
            vec![make_merged_edge(0, 1, 1)],
            vec![make_delta_edge(0, 1, 5, DeltaOp::Remove)],
            1,
        );

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(csr.edge_count(), 0);
        assert_eq!(stats.csr_input_edges, 1);
        assert_eq!(stats.delta_input_edges, 1);
        assert_eq!(stats.output_edges, 0);
    }

    #[test]
    fn test_build_compacted_csr_add_after_remove() {
        // Add, remove, then add again: only the final add should survive.
        let deltas = vec![
            make_delta_edge(0, 1, 1, DeltaOp::Add),
            make_delta_edge(0, 1, 2, DeltaOp::Remove),
            make_delta_edge(0, 1, 3, DeltaOp::Add),
        ];
        let snapshot = three_node_snapshot(Vec::new(), deltas, 0);

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(csr.edge_count(), 1);
        assert_eq!(stats.delta_input_edges, 3);
        assert_eq!(stats.output_edges, 1);
        assert_eq!(stats.merge_stats.deduplicated_count, 2);
    }

    #[test]
    fn test_build_stats() {
        // The delta re-adds edge 0 -> 1, so four inputs collapse to three outputs.
        let snapshot = three_node_snapshot(
            vec![make_merged_edge(0, 1, 1), make_merged_edge(1, 2, 2)],
            vec![
                make_delta_edge(2, 0, 3, DeltaOp::Add),
                make_delta_edge(0, 1, 4, DeltaOp::Add),
            ],
            1,
        );

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(stats.csr_input_edges, 2);
        assert_eq!(stats.delta_input_edges, 2);
        assert_eq!(stats.output_edges, 3);
        assert_eq!(stats.output_nodes, 3);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_csr_preserves_edge_data() {
        let only_edge = MergedEdge::new(
            NodeId::new(0, 0),
            NodeId::new(1, 0),
            EdgeKind::References,
            42,
            FileId::new(99),
        );
        let edges = vec![only_edge];

        let csr = build_csr_from_edges(&edges, 2).unwrap();
        let edge_ref = csr.edge_at(0).unwrap();

        assert_eq!(edge_ref.target, NodeId::new(1, 0));
        assert_eq!(edge_ref.kind, EdgeKind::References);
        assert_eq!(edge_ref.seq, 42);
    }
}