Skip to main content

sqry_core/graph/unified/compaction/
build.rs

//! CSR Build Phase: Offline construction of compacted CSR graphs.
//!
//! This module implements Phase 1 of the two-phase compaction process.
//! It builds new CSR graphs from merged edges without holding any locks.
//!
//! # Design
//!
//! - **Lock-free build**: CSR construction works on owned data, so no locks are needed
//! - **Snapshot-based**: Input is a snapshot of the (not yet merged) edges from both the CSR
//!   and the delta buffer; merging happens during the build
//! - **Deterministic**: The same input always produces the same CSR
//!
//! # Algorithm
//!
//! 1. Take a snapshot of the current state (CSR edges + delta edges)
//! 2. Merge using last-writer-wins semantics
//! 3. Sort merged edges by source node
//! 4. Build CSR arrays (`row_ptr`, `col_idx`, `edge_kind`, `edge_seq`, `edge_spans`)
//! 5. Validate and return the new CSR
//!
//! # Example
//!
//! ```rust,ignore
//! use sqry_core::graph::unified::compaction::build::{
//!     build_compacted_csr, CompactionSnapshot, snapshot_edges,
//! };
//!
//! // Take snapshot (no locks held after this returns)
//! let snapshot = snapshot_edges(&edge_store, node_count);
//!
//! // Build CSR offline (no locks needed)
//! let (new_csr, stats) = build_compacted_csr(&snapshot, Direction::Forward)?;
//! ```
33
34use super::super::edge::{DeltaEdge, EdgeKind, EdgeStore};
35use super::super::node::NodeId;
36use super::super::storage::{CsrError, CsrGraph};
37use super::errors::{BuildFailureReason, CompactionError, Direction};
38use super::merge::{MergeStats, MergedEdge, merge_delta_edges};
39
40/// Snapshot of edges for offline compaction.
41///
42/// This struct holds owned copies of edge data, allowing the compaction
43/// to proceed without holding any locks on the original data structures.
44#[derive(Debug, Clone)]
45pub struct CompactionSnapshot {
46    /// Edges from the current CSR (filtered by tombstones).
47    pub csr_edges: Vec<MergedEdge>,
48    /// Edges from the delta buffer.
49    pub delta_edges: Vec<DeltaEdge>,
50    /// Current node count (determines CSR size).
51    pub node_count: usize,
52    /// Current CSR version (for precondition checking during swap).
53    pub csr_version: u64,
54}
55
56impl CompactionSnapshot {
57    /// Creates an empty snapshot.
58    #[must_use]
59    pub fn empty(node_count: usize) -> Self {
60        Self {
61            csr_edges: Vec::new(),
62            delta_edges: Vec::new(),
63            node_count,
64            csr_version: 0,
65        }
66    }
67
68    /// Returns the total number of edges in the snapshot (before merge).
69    #[must_use]
70    pub fn total_edges(&self) -> usize {
71        self.csr_edges.len() + self.delta_edges.len()
72    }
73}
74
75/// Statistics from the CSR build phase.
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
77pub struct BuildStats {
78    /// Number of edges in the input CSR.
79    pub csr_input_edges: usize,
80    /// Number of edges in the delta buffer.
81    pub delta_input_edges: usize,
82    /// Statistics from the merge phase.
83    pub merge_stats: MergeStats,
84    /// Number of edges in the output CSR.
85    pub output_edges: usize,
86    /// Number of nodes in the output CSR.
87    pub output_nodes: usize,
88}
89
90/// Creates a snapshot of edges from an `EdgeStore` for offline compaction.
91///
92/// This function reads from the store and creates owned copies of all edges.
93/// After this function returns, no locks are needed for the compaction.
94///
95/// # Arguments
96///
97/// * `store` - The edge store to snapshot
98/// * `node_count` - The current node count (determines CSR size)
99///
100/// # Returns
101///
102/// A `CompactionSnapshot` containing all edges ready for merge and build.
103///
104/// # Panics
105///
106/// Panics if the CSR contains more than `u32::MAX` nodes.
107#[must_use]
108pub fn snapshot_edges(store: &EdgeStore, node_count: usize) -> CompactionSnapshot {
109    let mut csr_edges = Vec::new();
110
111    // Extract non-tombstoned CSR edges
112    if let Some(csr) = store.csr() {
113        for node_idx in 0..csr.node_count() {
114            let node_idx_u32 = u32::try_from(node_idx).expect("CSR node index exceeds u32::MAX");
115            for edge_ref in csr.edges_of(node_idx_u32) {
116                // Skip tombstoned edges - check against store's tombstone bitmap
117                if store.is_edge_tombstoned(edge_ref.index) {
118                    continue;
119                }
120
121                csr_edges.push(MergedEdge {
122                    source: NodeId::new(node_idx_u32, 0), // Generation not tracked in CSR
123                    target: edge_ref.target,
124                    kind: edge_ref.kind,
125                    seq: edge_ref.seq,
126                    file: super::super::file::FileId::new(0), // File not tracked in CSR
127                    spans: edge_ref.spans,                    // Preserve spans from CSR
128                });
129            }
130        }
131    }
132
133    // Extract all delta edges
134    let delta_edges: Vec<DeltaEdge> = store.delta().iter().cloned().collect();
135
136    CompactionSnapshot {
137        csr_edges,
138        delta_edges,
139        node_count,
140        csr_version: store.csr_version(),
141    }
142}
143
144/// Builds a compacted CSR from a snapshot.
145///
146/// This function performs the offline build phase of compaction:
147/// 1. Merges CSR and delta edges using last-writer-wins
148/// 2. Sorts edges by source node
149/// 3. Builds CSR arrays
150/// 4. Validates the result
151///
152/// # Arguments
153///
154/// * `snapshot` - The edge snapshot to compact
155/// * `direction` - The direction (Forward/Reverse) for error reporting
156///
157/// # Returns
158///
159/// A tuple of (`CsrGraph`, `BuildStats`) on success, or a `CompactionError` on failure.
160///
161/// # Errors
162///
163/// Returns `CompactionError::BuildFailed` if:
164/// - Edge sorting fails
165/// - CSR array construction fails
166/// - CSR validation fails
167pub fn build_compacted_csr(
168    snapshot: &CompactionSnapshot,
169    direction: Direction,
170) -> Result<(CsrGraph, BuildStats), CompactionError> {
171    let csr_input_edges = snapshot.csr_edges.len();
172    let delta_input_edges = snapshot.delta_edges.len();
173    let node_count = snapshot.node_count;
174
175    // Handle empty case
176    if node_count == 0 {
177        return Ok((
178            CsrGraph::empty(0),
179            BuildStats {
180                csr_input_edges,
181                delta_input_edges,
182                merge_stats: MergeStats::default(),
183                output_edges: 0,
184                output_nodes: 0,
185            },
186        ));
187    }
188
189    // Convert CSR edges to DeltaEdge format for merge
190    // Note: CSR edges are already "surviving" edges, so they're adds
191    // Important: Preserve span data from CSR edges using with_spans
192    let mut all_delta_edges: Vec<DeltaEdge> = snapshot
193        .csr_edges
194        .iter()
195        .map(|e| {
196            DeltaEdge::with_spans(
197                e.source,
198                e.target,
199                e.kind.clone(),
200                e.seq,
201                super::super::edge::DeltaOp::Add,
202                e.file,
203                e.spans.clone(),
204            )
205        })
206        .collect();
207
208    // Add delta buffer edges
209    all_delta_edges.extend(snapshot.delta_edges.iter().cloned());
210
211    // Merge using LWW semantics
212    let (merged_edges, merge_stats) = merge_delta_edges(all_delta_edges);
213
214    // Build CSR from merged edges
215    let csr = build_csr_from_edges(&merged_edges, node_count).map_err(|e| {
216        CompactionError::BuildFailed {
217            direction,
218            reason: BuildFailureReason::BuilderError {
219                message: e.to_string(),
220            },
221        }
222    })?;
223
224    let output_edges = csr.edge_count();
225    let output_nodes = csr.node_count();
226
227    Ok((
228        csr,
229        BuildStats {
230            csr_input_edges,
231            delta_input_edges,
232            merge_stats,
233            output_edges,
234            output_nodes,
235        },
236    ))
237}
238
239/// Builds a CSR graph directly from merged edges.
240///
241/// This is the core CSR construction algorithm:
242/// 1. Sort edges by source node
243/// 2. Count edges per source (for `row_ptr`)
244/// 3. Build arrays
245///
246/// # Arguments
247///
248/// * `edges` - Pre-merged edges (no duplicates, removes filtered out)
249/// * `node_count` - The number of nodes in the graph
250///
251/// # Returns
252///
253/// A validated `CsrGraph` on success, or `CsrError` on failure.
254///
255/// # Errors
256///
257/// Returns an error if the CSR structure fails validation.
258///
259/// # Panics
260///
261/// Panics if edge counts exceed `u32::MAX` during row pointer construction.
262pub fn build_csr_from_edges(edges: &[MergedEdge], node_count: usize) -> Result<CsrGraph, CsrError> {
263    if node_count == 0 {
264        return Ok(CsrGraph::empty(0));
265    }
266
267    // Sort edges by source index
268    let mut sorted_edges: Vec<&MergedEdge> = edges.iter().collect();
269    sorted_edges.sort_by_key(|e| e.source.index());
270
271    // Count edges per source node
272    let mut edge_counts: Vec<usize> = vec![0; node_count];
273    for edge in &sorted_edges {
274        let src_idx = edge.source.index() as usize;
275        if src_idx < node_count {
276            edge_counts[src_idx] += 1;
277        }
278    }
279
280    // Build row_ptr (prefix sum of edge counts)
281    let mut row_ptr: Vec<u32> = Vec::with_capacity(node_count + 1);
282    row_ptr.push(0);
283    let mut cumulative = 0u32;
284    for count in &edge_counts {
285        let count_u32 = u32::try_from(*count).expect("CSR edge count exceeds u32::MAX");
286        cumulative = cumulative
287            .checked_add(count_u32)
288            .expect("CSR row_ptr overflow");
289        row_ptr.push(cumulative);
290    }
291
292    // Build col_idx, edge_kind, edge_seq, edge_spans
293    let edge_count = sorted_edges.len();
294    let mut col_idx: Vec<NodeId> = Vec::with_capacity(edge_count);
295    let mut edge_kind: Vec<EdgeKind> = Vec::with_capacity(edge_count);
296    let mut edge_seq: Vec<u64> = Vec::with_capacity(edge_count);
297    let mut edge_spans: Vec<Vec<crate::graph::node::Span>> = Vec::with_capacity(edge_count);
298
299    for edge in sorted_edges {
300        col_idx.push(edge.target);
301        edge_kind.push(edge.kind.clone());
302        edge_seq.push(edge.seq);
303        edge_spans.push(edge.spans.clone());
304    }
305
306    // Construct and validate
307    let csr = CsrGraph::from_raw(
308        node_count, row_ptr, col_idx, edge_kind, edge_seq, edge_spans,
309    );
310    csr.validate()?;
311
312    Ok(csr)
313}
314
#[cfg(test)]
mod tests {
    use super::super::super::edge::{DeltaOp, EdgeKind};
    use super::super::super::file::FileId;
    use super::super::super::node::NodeId;
    use super::*;

    /// Builds a `Calls` merged edge between generation-0 nodes in file 1.
    fn make_merged_edge(source: u32, target: u32, seq: u64) -> MergedEdge {
        MergedEdge::new(
            NodeId::new(source, 0),
            NodeId::new(target, 0),
            EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            seq,
            FileId::new(1),
        )
    }

    /// Builds a `Calls` delta edge between generation-0 nodes in file 1.
    fn make_delta_edge(source: u32, target: u32, seq: u64, op: DeltaOp) -> DeltaEdge {
        DeltaEdge::new(
            NodeId::new(source, 0),
            NodeId::new(target, 0),
            EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            seq,
            op,
            FileId::new(1),
        )
    }

    #[test]
    fn test_compaction_snapshot_empty() {
        let snapshot = CompactionSnapshot::empty(10);
        assert_eq!(snapshot.node_count, 10);
        assert_eq!(snapshot.total_edges(), 0);
        assert_eq!(snapshot.csr_version, 0);
    }

    #[test]
    fn test_compaction_snapshot_total_edges() {
        // total_edges counts both inputs, including removes, before any merge.
        let snapshot = CompactionSnapshot {
            csr_edges: vec![make_merged_edge(0, 1, 1)],
            delta_edges: vec![
                make_delta_edge(1, 2, 2, DeltaOp::Add),
                make_delta_edge(0, 1, 3, DeltaOp::Remove),
            ],
            node_count: 3,
            csr_version: 7,
        };
        assert_eq!(snapshot.total_edges(), 3);
    }

    #[test]
    fn test_build_csr_from_edges_empty() {
        let edges: Vec<MergedEdge> = vec![];
        let csr = build_csr_from_edges(&edges, 5).unwrap();

        assert_eq!(csr.node_count(), 5);
        assert_eq!(csr.edge_count(), 0);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_csr_from_edges_simple() {
        let edges = vec![
            make_merged_edge(0, 1, 1),
            make_merged_edge(0, 2, 2),
            make_merged_edge(1, 2, 3),
        ];

        let csr = build_csr_from_edges(&edges, 3).unwrap();

        assert_eq!(csr.node_count(), 3);
        assert_eq!(csr.edge_count(), 3);
        assert_eq!(csr.out_degree(0), 2);
        assert_eq!(csr.out_degree(1), 1);
        assert_eq!(csr.out_degree(2), 0);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_csr_from_edges_unsorted() {
        // Edges not sorted by source - should still work
        let edges = vec![
            make_merged_edge(2, 0, 3),
            make_merged_edge(0, 1, 1),
            make_merged_edge(1, 2, 2),
        ];

        let csr = build_csr_from_edges(&edges, 3).unwrap();

        assert_eq!(csr.edge_count(), 3);
        assert_eq!(csr.out_degree(0), 1);
        assert_eq!(csr.out_degree(1), 1);
        assert_eq!(csr.out_degree(2), 1);
        assert!(csr.validate().is_ok());

        // Edges are laid out in source order regardless of input order.
        assert_eq!(csr.edge_at(0).unwrap().target, NodeId::new(1, 0));
        assert_eq!(csr.edge_at(1).unwrap().target, NodeId::new(2, 0));
        assert_eq!(csr.edge_at(2).unwrap().target, NodeId::new(0, 0));
    }

    #[test]
    fn test_build_csr_from_edges_zero_nodes() {
        let edges: Vec<MergedEdge> = vec![];
        let csr = build_csr_from_edges(&edges, 0).unwrap();

        assert_eq!(csr.node_count(), 0);
        assert_eq!(csr.edge_count(), 0);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_compacted_csr_empty_snapshot() {
        let snapshot = CompactionSnapshot::empty(5);
        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(csr.node_count(), 5);
        assert_eq!(csr.edge_count(), 0);
        assert_eq!(stats.csr_input_edges, 0);
        assert_eq!(stats.delta_input_edges, 0);
        assert_eq!(stats.output_edges, 0);
    }

    #[test]
    fn test_build_compacted_csr_zero_nodes() {
        // With no nodes nothing can be stored, but input counts are still reported.
        let snapshot = CompactionSnapshot {
            csr_edges: vec![make_merged_edge(0, 1, 1)],
            delta_edges: vec![make_delta_edge(0, 1, 2, DeltaOp::Add)],
            node_count: 0,
            csr_version: 3,
        };

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(csr.node_count(), 0);
        assert_eq!(csr.edge_count(), 0);
        assert_eq!(stats.csr_input_edges, 1);
        assert_eq!(stats.delta_input_edges, 1);
        assert_eq!(stats.output_edges, 0);
        assert_eq!(stats.output_nodes, 0);
    }

    #[test]
    fn test_build_compacted_csr_with_delta_edges() {
        let snapshot = CompactionSnapshot {
            csr_edges: vec![],
            delta_edges: vec![
                make_delta_edge(0, 1, 1, DeltaOp::Add),
                make_delta_edge(1, 2, 2, DeltaOp::Add),
            ],
            node_count: 3,
            csr_version: 0,
        };

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(csr.edge_count(), 2);
        assert_eq!(stats.delta_input_edges, 2);
        assert_eq!(stats.output_edges, 2);
    }

    #[test]
    fn test_build_compacted_csr_merge_removes_duplicates() {
        let snapshot = CompactionSnapshot {
            csr_edges: vec![make_merged_edge(0, 1, 1)], // Old edge seq=1
            delta_edges: vec![
                make_delta_edge(0, 1, 5, DeltaOp::Add), // Same edge, newer seq=5
            ],
            node_count: 3,
            csr_version: 1,
        };

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        // Should have 1 edge (deduplicated)
        assert_eq!(csr.edge_count(), 1);
        assert_eq!(stats.csr_input_edges, 1);
        assert_eq!(stats.delta_input_edges, 1);
        assert_eq!(stats.output_edges, 1);

        // The winning edge should have seq=5
        let edge = csr.edge_at(0).unwrap();
        assert_eq!(edge.seq, 5);
    }

    #[test]
    fn test_build_compacted_csr_remove_wins() {
        let snapshot = CompactionSnapshot {
            csr_edges: vec![make_merged_edge(0, 1, 1)], // Edge exists seq=1
            delta_edges: vec![
                make_delta_edge(0, 1, 5, DeltaOp::Remove), // Remove with seq=5
            ],
            node_count: 3,
            csr_version: 1,
        };

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        // Edge should be removed (Remove wins with higher seq)
        assert_eq!(csr.edge_count(), 0);
        assert_eq!(stats.csr_input_edges, 1);
        assert_eq!(stats.delta_input_edges, 1);
        assert_eq!(stats.output_edges, 0);
    }

    #[test]
    fn test_build_compacted_csr_add_after_remove() {
        let snapshot = CompactionSnapshot {
            csr_edges: vec![],
            delta_edges: vec![
                make_delta_edge(0, 1, 1, DeltaOp::Add),
                make_delta_edge(0, 1, 2, DeltaOp::Remove),
                make_delta_edge(0, 1, 3, DeltaOp::Add), // Add wins with highest seq
            ],
            node_count: 3,
            csr_version: 0,
        };

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        // Edge should exist (Add with seq=3 wins)
        assert_eq!(csr.edge_count(), 1);
        assert_eq!(stats.delta_input_edges, 3);
        assert_eq!(stats.output_edges, 1);
        assert_eq!(stats.merge_stats.deduplicated_count, 2);
    }

    #[test]
    fn test_build_stats() {
        let snapshot = CompactionSnapshot {
            csr_edges: vec![make_merged_edge(0, 1, 1), make_merged_edge(1, 2, 2)],
            delta_edges: vec![
                make_delta_edge(2, 0, 3, DeltaOp::Add),
                make_delta_edge(0, 1, 4, DeltaOp::Add), // Duplicate of CSR edge
            ],
            node_count: 3,
            csr_version: 1,
        };

        let (csr, stats) = build_compacted_csr(&snapshot, Direction::Forward).unwrap();

        assert_eq!(stats.csr_input_edges, 2);
        assert_eq!(stats.delta_input_edges, 2);
        assert_eq!(stats.output_edges, 3); // 2 from CSR (1 updated) + 1 new from delta
        assert_eq!(stats.output_nodes, 3);
        assert!(csr.validate().is_ok());
    }

    #[test]
    fn test_build_csr_preserves_edge_data() {
        let edges = vec![MergedEdge::new(
            NodeId::new(0, 0),
            NodeId::new(1, 0),
            EdgeKind::References,
            42,
            FileId::new(99),
        )];

        let csr = build_csr_from_edges(&edges, 2).unwrap();
        let edge_ref = csr.edge_at(0).unwrap();

        assert_eq!(edge_ref.target, NodeId::new(1, 0));
        assert_eq!(edge_ref.kind, EdgeKind::References);
        assert_eq!(edge_ref.seq, 42);
    }
}