use super::condensation::{CondensationDag, Interval};
use anyhow::Result;
use rayon::prelude::*;
/// Flattened 2-hop label arrays in CSR layout:
/// `(label_out_offsets, label_out_intervals, label_in_offsets, label_in_intervals)`.
/// Each offsets vector has `scc_count + 1` entries delimiting per-SCC slices.
type LabelData = (Vec<u32>, Vec<Interval>, Vec<u32>, Vec<Interval>);
/// Minimum number of SCCs in a wavefront level before the rayon parallel path
/// is considered worthwhile.
const PARALLEL_LEVEL_THRESHOLD: usize = 512;
/// Builds interval-based 2-hop reachability labels for the condensation DAG.
///
/// Produces flattened `(out_offsets, out_intervals, in_offsets, in_intervals)`
/// arrays in CSR layout. A single `budget` caps the combined interval count
/// across both label directions.
///
/// # Errors
/// Fails when the combined number of intervals would exceed `budget`.
#[allow(clippy::cast_possible_truncation)]
pub fn compute_2hop_labels(dag: &CondensationDag, budget: usize) -> Result<LabelData> {
    let scc_count = dag.scc_count as usize;
    let base_intervals = compute_base_intervals(dag, scc_count);
    let predecessors = build_predecessors(dag, scc_count);
    // Out-labels first; the total they consume is carried into the in-label
    // pass so both directions share the one budget.
    let (out_labels, out_total) =
        compute_label_out_wavefront(dag, scc_count, &base_intervals, budget)?;
    let in_labels = compute_label_in_wavefront(
        dag,
        scc_count,
        &base_intervals,
        &predecessors,
        budget,
        out_total,
    )?;
    let (out_offsets, out_flat) = flatten_labels(&out_labels, scc_count);
    let (in_offsets, in_flat) = flatten_labels(&in_labels, scc_count);
    // Both passes bail before exceeding the budget, so this should always hold.
    debug_assert!(out_flat.len() + in_flat.len() <= budget);
    Ok((out_offsets, out_flat, in_offsets, in_flat))
}
/// Assigns each SCC its base label: the half-open unit interval at its
/// topological position, so interval containment later encodes reachability.
#[allow(clippy::cast_possible_truncation)]
fn compute_base_intervals(dag: &CondensationDag, scc_count: usize) -> Vec<Interval> {
    let mut intervals = vec![Interval::new(0, 0); scc_count];
    for (position, &scc_id) in dag.topo_order.iter().enumerate() {
        let start = position as u32;
        intervals[scc_id as usize] = Interval::new(start, start + 1);
    }
    intervals
}
/// Inverts the DAG's successor adjacency: `predecessors[t]` lists every SCC
/// that has an edge into `t`.
#[allow(clippy::cast_possible_truncation)]
fn build_predecessors(dag: &CondensationDag, scc_count: usize) -> Vec<Vec<u32>> {
    let mut predecessors: Vec<Vec<u32>> = vec![Vec::new(); scc_count];
    for source in 0..scc_count {
        let source_id = source as u32;
        for &target in dag.successors(source_id) {
            predecessors[target as usize].push(source_id);
        }
    }
    predecessors
}
/// Computes distance-to-sink levels: sinks sit at level 0, every other SCC one
/// above its deepest successor. Walking the topological order in reverse
/// guarantees each successor's level is final before it is read.
fn compute_sink_levels(dag: &CondensationDag, scc_count: usize) -> (Vec<usize>, usize) {
    let mut levels = vec![0usize; scc_count];
    for &scc_id in dag.topo_order.iter().rev() {
        let level = dag
            .successors(scc_id)
            .iter()
            .map(|&s| levels[s as usize])
            .max()
            .map_or(0, |deepest| deepest + 1);
        levels[scc_id as usize] = level;
    }
    let max_level = levels.iter().copied().max().unwrap_or(0);
    (levels, max_level)
}
/// Computes distance-from-source levels: SCCs with no predecessors sit at
/// level 0, every other SCC one past its deepest predecessor. Forward
/// topological order guarantees predecessor levels are final when read.
fn compute_source_levels(
    scc_count: usize,
    predecessors: &[Vec<u32>],
    topo_order: &[u32],
) -> (Vec<usize>, usize) {
    let mut levels = vec![0usize; scc_count];
    for &scc_id in topo_order {
        let level = predecessors[scc_id as usize]
            .iter()
            .map(|&p| levels[p as usize])
            .max()
            .map_or(0, |deepest| deepest + 1);
        levels[scc_id as usize] = level;
    }
    let max_level = levels.iter().copied().max().unwrap_or(0);
    (levels, max_level)
}
/// Buckets SCC ids by their wavefront level.
///
/// Returns `None` when the DAG is too deep (`max_level > scc_count / 4`) for
/// level-synchronous processing to pay off, signalling the caller to use the
/// plain sequential fallback.
fn group_by_level(scc_count: usize, levels: &[usize], max_level: usize) -> Option<Vec<Vec<u32>>> {
    if max_level > scc_count / 4 {
        return None;
    }
    let mut buckets: Vec<Vec<u32>> = vec![Vec::new(); max_level + 1];
    for scc in 0..scc_count {
        buckets[levels[scc]].push(u32::try_from(scc).ok()?);
    }
    Some(buckets)
}
/// Coalesces overlapping or touching half-open intervals into a minimal
/// sorted list.
fn merge_intervals(mut intervals: Vec<Interval>) -> Vec<Interval> {
    if intervals.len() <= 1 {
        return intervals;
    }
    intervals.sort_unstable_by_key(|interval| interval.start);
    let mut merged: Vec<Interval> = Vec::with_capacity(intervals.len());
    for interval in intervals {
        match merged.last_mut() {
            // `start <= end` merges touching intervals too (half-open ranges).
            Some(last) if interval.start <= last.end => {
                last.end = last.end.max(interval.end);
            }
            _ => merged.push(interval),
        }
    }
    merged
}
fn process_sequential_out(
sccs: &[u32],
dag: &CondensationDag,
base_intervals: &[Interval],
label_out_data: &mut [Vec<Interval>],
bitset: &mut FastBitSet,
total_intervals: &mut usize,
budget: usize,
) -> Result<()> {
for &scc_id in sccs {
let scc = scc_id as usize;
bitset.clear();
bitset.set_range(base_intervals[scc].start, base_intervals[scc].end);
for &successor in dag.successors(scc_id) {
for interval in &label_out_data[successor as usize] {
bitset.set_range(interval.start, interval.end);
}
}
label_out_data[scc] = bitset.extract_intervals();
*total_intervals += label_out_data[scc].len();
if *total_intervals > budget {
anyhow::bail!(
"2-hop label budget exceeded during label_out computation: \
{} intervals > {budget} budget",
*total_intervals
);
}
}
Ok(())
}
/// Upper bound on the intervals this level can add before merging: each SCC
/// contributes its base interval plus everything its successors already hold.
fn estimate_level_intervals_out(
    sccs: &[u32],
    dag: &CondensationDag,
    label_out_data: &[Vec<Interval>],
) -> usize {
    let mut estimate = 0usize;
    for &scc_id in sccs {
        estimate += 1;
        for &successor in dag.successors(scc_id) {
            estimate += label_out_data[successor as usize].len();
        }
    }
    estimate
}
/// Computes `label_out` intervals for every SCC.
///
/// Groups SCCs by distance-to-sink level so wide levels can be processed
/// together (and, when large enough, in parallel); falls back to a plain
/// reverse-topological sequential pass when the DAG is too deep for level
/// grouping (`group_by_level` returns `None`).
///
/// Returns the per-SCC labels plus the total interval count consumed, which
/// the caller threads into the `label_in` pass so both share one budget.
///
/// # Errors
/// Propagates the budget-exceeded error from the per-level processors.
fn compute_label_out_wavefront(
    dag: &CondensationDag,
    scc_count: usize,
    base_intervals: &[Interval],
    budget: usize,
) -> Result<(Vec<Vec<Interval>>, usize)> {
    let mut label_out_data: Vec<Vec<Interval>> = vec![Vec::new(); scc_count];
    let mut total_intervals = 0usize;
    // One bitset reused across all SCCs on the sequential paths.
    let mut bitset = FastBitSet::new(scc_count);
    let (levels, max_level) = compute_sink_levels(dag, scc_count);
    let level_groups = group_by_level(scc_count, &levels, max_level);
    // Deep-DAG fallback: process one SCC at a time in reverse topo order.
    let Some(level_groups) = level_groups else {
        process_out_topo_order_sequential(
            dag,
            base_intervals,
            &mut label_out_data,
            &mut bitset,
            &mut total_intervals,
            budget,
        )?;
        return Ok((label_out_data, total_intervals));
    };
    // Level 0 holds the sinks; a level-k SCC's successors live in lower
    // levels, so their labels are complete before level k runs.
    for sccs in &level_groups {
        process_label_out_level(
            sccs,
            dag,
            base_intervals,
            &mut label_out_data,
            &mut bitset,
            &mut total_intervals,
            budget,
        )?;
    }
    Ok((label_out_data, total_intervals))
}
/// Deep-DAG fallback for `label_out`: visits SCCs sink-first (reverse
/// topological order), one at a time, so every successor's label is complete
/// before it is consumed.
fn process_out_topo_order_sequential(
    dag: &CondensationDag,
    base_intervals: &[Interval],
    label_out_data: &mut [Vec<Interval>],
    bitset: &mut FastBitSet,
    total_intervals: &mut usize,
    budget: usize,
) -> Result<()> {
    dag.topo_order.iter().rev().try_for_each(|&scc_id| {
        process_sequential_out(
            &[scc_id],
            dag,
            base_intervals,
            label_out_data,
            bitset,
            total_intervals,
            budget,
        )
    })
}
/// Processes one sink-level of SCCs for `label_out`: wide levels with budget
/// headroom fan out through rayon, everything else runs on the shared bitset.
fn process_label_out_level(
    sccs: &[u32],
    dag: &CondensationDag,
    base_intervals: &[Interval],
    label_out_data: &mut [Vec<Interval>],
    bitset: &mut FastBitSet,
    total_intervals: &mut usize,
    budget: usize,
) -> Result<()> {
    if !should_process_level_out_in_parallel(sccs, dag, label_out_data, *total_intervals, budget) {
        return process_sequential_out(
            sccs,
            dag,
            base_intervals,
            label_out_data,
            bitset,
            total_intervals,
            budget,
        );
    }
    let results = compute_parallel_out_level_results(sccs, dag, base_intervals, label_out_data);
    apply_label_level_results(results, label_out_data, total_intervals);
    // The parallel path can only check the budget after applying results.
    ensure_label_budget(*total_intervals, budget, "label_out")
}
/// Decides whether an out-label level should run in parallel: only wide levels
/// are worth the rayon overhead, and only when the pessimistic interval
/// estimate fits in the remaining budget (the parallel path checks the budget
/// only after applying results).
fn should_process_level_out_in_parallel(
    sccs: &[u32],
    dag: &CondensationDag,
    label_out_data: &[Vec<Interval>],
    total_intervals: usize,
    budget: usize,
) -> bool {
    if sccs.len() < PARALLEL_LEVEL_THRESHOLD {
        return false;
    }
    let headroom = budget.saturating_sub(total_intervals);
    estimate_level_intervals_out(sccs, dag, label_out_data) <= headroom
}
/// Builds out-labels for one level in parallel. SCCs within a level are
/// independent — their successors live in lower levels whose labels are
/// already final — so each is computed from immutable snapshots.
fn compute_parallel_out_level_results(
    sccs: &[u32],
    dag: &CondensationDag,
    base_intervals: &[Interval],
    label_out_data: &[Vec<Interval>],
) -> Vec<(usize, Vec<Interval>)> {
    sccs.par_iter()
        .map(|&scc_id| {
            let scc = scc_id as usize;
            let successors = dag.successors(scc_id);
            if successors.is_empty() {
                // Sink: the label is exactly the base interval.
                return (scc, vec![base_intervals[scc]]);
            }
            let mut gathered = vec![base_intervals[scc]];
            gathered.reserve(
                successors
                    .iter()
                    .map(|&s| label_out_data[s as usize].len())
                    .sum(),
            );
            for &successor in successors {
                gathered.extend_from_slice(&label_out_data[successor as usize]);
            }
            (scc, merge_intervals(gathered))
        })
        .collect()
}
fn process_sequential_in(
sccs: &[u32],
base_intervals: &[Interval],
predecessors: &[Vec<u32>],
label_in_data: &mut [Vec<Interval>],
bitset: &mut FastBitSet,
total_intervals: &mut usize,
budget: usize,
) -> Result<()> {
for &scc_id in sccs {
let scc = scc_id as usize;
bitset.clear();
bitset.set_range(base_intervals[scc].start, base_intervals[scc].end);
for &predecessor in &predecessors[scc] {
for interval in &label_in_data[predecessor as usize] {
bitset.set_range(interval.start, interval.end);
}
}
label_in_data[scc] = bitset.extract_intervals();
*total_intervals += label_in_data[scc].len();
if *total_intervals > budget {
anyhow::bail!(
"2-hop label budget exceeded during label_in computation: \
{} intervals > {budget} budget",
*total_intervals
);
}
}
Ok(())
}
/// Upper bound on the intervals this level can add before merging: each SCC
/// contributes its base interval plus everything its predecessors already hold.
fn estimate_level_intervals_in(
    sccs: &[u32],
    predecessors: &[Vec<u32>],
    label_in_data: &[Vec<Interval>],
) -> usize {
    let mut estimate = 0usize;
    for &scc_id in sccs {
        estimate += 1;
        for &predecessor in &predecessors[scc_id as usize] {
            estimate += label_in_data[predecessor as usize].len();
        }
    }
    estimate
}
/// Computes `label_in` intervals for every SCC.
///
/// Mirrors `compute_label_out_wavefront` but walks source-levels (distance
/// from sources) in forward topological direction, unioning predecessor
/// labels. `initial_total` carries the interval count already consumed by the
/// out-labels so both directions share a single `budget`.
///
/// # Errors
/// Propagates the budget-exceeded error from the per-level processors.
fn compute_label_in_wavefront(
    dag: &CondensationDag,
    scc_count: usize,
    base_intervals: &[Interval],
    predecessors: &[Vec<u32>],
    budget: usize,
    initial_total: usize,
) -> Result<Vec<Vec<Interval>>> {
    let mut label_in_data: Vec<Vec<Interval>> = vec![Vec::new(); scc_count];
    // Start from the out-label total: the budget spans both directions.
    let mut total_intervals = initial_total;
    let mut bitset = FastBitSet::new(scc_count);
    let (levels, max_level) = compute_source_levels(scc_count, predecessors, &dag.topo_order);
    let level_groups = group_by_level(scc_count, &levels, max_level);
    // Deep-DAG fallback: process one SCC at a time in forward topo order.
    let Some(level_groups) = level_groups else {
        process_in_topo_order_sequential(
            dag,
            base_intervals,
            predecessors,
            &mut label_in_data,
            &mut bitset,
            &mut total_intervals,
            budget,
        )?;
        return Ok(label_in_data);
    };
    // Level 0 holds the sources; a level-k SCC's predecessors live in lower
    // levels, so their labels are complete before level k runs.
    for sccs in &level_groups {
        process_label_in_level(
            sccs,
            base_intervals,
            predecessors,
            &mut label_in_data,
            &mut bitset,
            &mut total_intervals,
            budget,
        )?;
    }
    Ok(label_in_data)
}
/// Deep-DAG fallback for `label_in`: visits SCCs source-first (forward
/// topological order), one at a time, so every predecessor's label is complete
/// before it is consumed.
fn process_in_topo_order_sequential(
    dag: &CondensationDag,
    base_intervals: &[Interval],
    predecessors: &[Vec<u32>],
    label_in_data: &mut [Vec<Interval>],
    bitset: &mut FastBitSet,
    total_intervals: &mut usize,
    budget: usize,
) -> Result<()> {
    dag.topo_order.iter().try_for_each(|&scc_id| {
        process_sequential_in(
            &[scc_id],
            base_intervals,
            predecessors,
            label_in_data,
            bitset,
            total_intervals,
            budget,
        )
    })
}
/// Processes one source-level of SCCs for `label_in`: wide levels with budget
/// headroom fan out through rayon, everything else runs on the shared bitset.
fn process_label_in_level(
    sccs: &[u32],
    base_intervals: &[Interval],
    predecessors: &[Vec<u32>],
    label_in_data: &mut [Vec<Interval>],
    bitset: &mut FastBitSet,
    total_intervals: &mut usize,
    budget: usize,
) -> Result<()> {
    if !should_process_level_in_parallel(sccs, predecessors, label_in_data, *total_intervals, budget)
    {
        return process_sequential_in(
            sccs,
            base_intervals,
            predecessors,
            label_in_data,
            bitset,
            total_intervals,
            budget,
        );
    }
    let results =
        compute_parallel_in_level_results(sccs, base_intervals, predecessors, label_in_data);
    apply_label_level_results(results, label_in_data, total_intervals);
    // The parallel path can only check the budget after applying results.
    ensure_label_budget(*total_intervals, budget, "label_in")
}
/// Decides whether an in-label level should run in parallel: only wide levels
/// are worth the rayon overhead, and only when the pessimistic interval
/// estimate fits in the remaining budget (the parallel path checks the budget
/// only after applying results).
fn should_process_level_in_parallel(
    sccs: &[u32],
    predecessors: &[Vec<u32>],
    label_in_data: &[Vec<Interval>],
    total_intervals: usize,
    budget: usize,
) -> bool {
    if sccs.len() < PARALLEL_LEVEL_THRESHOLD {
        return false;
    }
    let headroom = budget.saturating_sub(total_intervals);
    estimate_level_intervals_in(sccs, predecessors, label_in_data) <= headroom
}
/// Builds in-labels for one level in parallel. SCCs within a level are
/// independent — their predecessors live in lower levels whose labels are
/// already final — so each is computed from immutable snapshots.
fn compute_parallel_in_level_results(
    sccs: &[u32],
    base_intervals: &[Interval],
    predecessors: &[Vec<u32>],
    label_in_data: &[Vec<Interval>],
) -> Vec<(usize, Vec<Interval>)> {
    sccs.par_iter()
        .map(|&scc_id| {
            let scc = scc_id as usize;
            let preds = &predecessors[scc];
            if preds.is_empty() {
                // Source: the label is exactly the base interval.
                return (scc, vec![base_intervals[scc]]);
            }
            let mut gathered = vec![base_intervals[scc]];
            gathered.reserve(
                preds
                    .iter()
                    .map(|&p| label_in_data[p as usize].len())
                    .sum(),
            );
            for &predecessor in preds {
                gathered.extend_from_slice(&label_in_data[predecessor as usize]);
            }
            (scc, merge_intervals(gathered))
        })
        .collect()
}
/// Commits per-SCC interval lists produced by a parallel level pass and adds
/// their sizes to the running total.
fn apply_label_level_results(
    results: Vec<(usize, Vec<Interval>)>,
    label_data: &mut [Vec<Interval>],
    total_intervals: &mut usize,
) {
    *total_intervals += results
        .iter()
        .map(|(_, intervals)| intervals.len())
        .sum::<usize>();
    for (scc, intervals) in results {
        label_data[scc] = intervals;
    }
}
/// Shared budget check used by both the parallel and sequential label paths.
///
/// # Errors
/// Fails when `total_intervals` exceeds `budget`, naming the label direction
/// (`label_kind`) in the message.
fn ensure_label_budget(total_intervals: usize, budget: usize, label_kind: &str) -> Result<()> {
    if total_intervals <= budget {
        return Ok(());
    }
    anyhow::bail!(
        "2-hop label budget exceeded during {label_kind} computation: \
         {total_intervals} intervals > {budget} budget"
    )
}
/// Flattens per-SCC interval lists into CSR form: `offsets[i]..offsets[i + 1]`
/// delimits SCC `i`'s slice of `flat`.
#[allow(clippy::cast_possible_truncation)]
fn flatten_labels(label_data: &[Vec<Interval>], scc_count: usize) -> (Vec<u32>, Vec<Interval>) {
    // Preallocate the flat buffer at its exact final size to avoid repeated
    // grow-and-copy while extending.
    let total: usize = label_data.iter().take(scc_count).map(Vec::len).sum();
    let mut offsets = Vec::with_capacity(scc_count + 1);
    let mut flat = Vec::with_capacity(total);
    offsets.push(0);
    for labels in label_data.iter().take(scc_count) {
        flat.extend_from_slice(labels);
        offsets.push(flat.len() as u32);
    }
    (offsets, flat)
}
/// Reusable fixed-capacity bitset tuned for repeated clear/fill cycles.
///
/// `min_word`/`max_word` track the dirty word range touched since the last
/// `clear`, so clearing and interval extraction only visit words that were
/// actually written. An empty dirty range is encoded as `min_word > max_word`.
struct FastBitSet {
    words: Vec<u64>,
    /// Lowest word index written since the last clear (`words.len()` when clean).
    min_word: usize,
    /// Highest word index written since the last clear (`0` when clean).
    max_word: usize,
}
impl FastBitSet {
    /// Creates a bitset able to hold `size` bits, all initially unset.
    fn new(size: usize) -> Self {
        let num_words = size.div_ceil(64);
        Self {
            words: vec![0; num_words],
            // min > max encodes "nothing touched yet".
            min_word: num_words,
            max_word: 0,
        }
    }
    /// Clears only the words touched since the last clear — O(dirty range)
    /// rather than O(capacity) — then resets the dirty-range markers.
    fn clear(&mut self) {
        if self.min_word <= self.max_word {
            for i in self.min_word..=self.max_word {
                self.words[i] = 0;
            }
        }
        self.min_word = self.words.len();
        self.max_word = 0;
    }
    /// Sets all bits in the half-open range `[start, end)` and widens the
    /// dirty word range accordingly. A degenerate range (`start >= end`) is a
    /// no-op.
    fn set_range(&mut self, start: u32, end: u32) {
        if start >= end {
            return;
        }
        let start = start as usize;
        let end = end as usize;
        let start_word = start / 64;
        // `end` is exclusive, so the last set bit is `end - 1`.
        let end_word = (end - 1) / 64;
        self.min_word = self.min_word.min(start_word);
        self.max_word = self.max_word.max(end_word);
        if start_word == end_word {
            // Single word: ones from bit `start % 64` upward, intersected with
            // ones up to and including bit `(end - 1) % 64`.
            let mask = ((!0u64) << (start % 64)) & ((!0u64) >> (63 - ((end - 1) % 64)));
            self.words[start_word] |= mask;
        } else {
            // Partial first word, full middle words, partial last word.
            let start_mask = (!0u64) << (start % 64);
            self.words[start_word] |= start_mask;
            for i in (start_word + 1)..end_word {
                self.words[i] = !0u64;
            }
            let end_mask = (!0u64) >> (63 - ((end - 1) % 64));
            self.words[end_word] |= end_mask;
        }
    }
    /// Converts the set bits into a sorted list of maximal half-open
    /// `[start, end)` intervals, scanning only the dirty word range.
    fn extract_intervals(&self) -> Vec<Interval> {
        let mut intervals = Vec::new();
        if self.min_word > self.max_word {
            // Nothing was set since the last clear.
            return intervals;
        }
        let mut in_interval = false;
        let mut current_start = 0u32;
        for i in self.min_word..=self.max_word {
            let word = self.words[i];
            // Absolute bit index of this word's bit 0.
            let word_start = u32::try_from(i)
                .unwrap_or_else(|_| unreachable!("word index exceeds u32 invariant"))
                * 64;
            if word == 0 {
                // All-zero word closes any open interval at the word boundary.
                if in_interval {
                    intervals.push(Interval::new(current_start, word_start));
                    in_interval = false;
                }
                continue;
            }
            if word == !0u64 {
                // All-ones word extends the open interval or starts a new one.
                if !in_interval {
                    current_start = word_start;
                    in_interval = true;
                }
                continue;
            }
            // Mixed word: walk its 0/1 runs explicitly.
            scan_mixed_word(
                word,
                word_start,
                &mut in_interval,
                &mut current_start,
                &mut intervals,
            );
        }
        if in_interval {
            intervals.push(self.flush_trailing_interval(current_start));
        }
        intervals
    }
    /// Closes an interval still open after the last dirty word: its end is one
    /// past the highest set bit of that word.
    fn flush_trailing_interval(&self, current_start: u32) -> Interval {
        let last_word = self.words[self.max_word];
        let leading_zeros = last_word.leading_zeros();
        let end = (u32::try_from(self.max_word)
            .unwrap_or_else(|_| unreachable!("word index exceeds u32 invariant"))
            + 1)
            * 64
            - leading_zeros;
        Interval::new(current_start, end)
    }
}
/// Scans one 64-bit word containing both set and unset bits, pushing completed
/// intervals and updating the open-interval state shared with
/// `extract_intervals`.
///
/// `word_start` is the absolute bit index of the word's bit 0; `in_interval` /
/// `current_start` carry the open-interval state across words.
fn scan_mixed_word(
    word: u64,
    word_start: u32,
    in_interval: &mut bool,
    current_start: &mut u32,
    intervals: &mut Vec<Interval>,
) {
    let mut bit_offset = 0;
    while bit_offset < 64 {
        if *in_interval {
            // Length of the run of ones starting at `bit_offset`: the interval
            // ends at the first zero, or stays open past this word if the run
            // reaches bit 63.
            let ones = (!(word >> bit_offset)).trailing_zeros();
            bit_offset += ones;
            if bit_offset < 64 {
                intervals.push(Interval::new(*current_start, word_start + bit_offset));
                *in_interval = false;
            }
            continue;
        }
        // Length of the run of zeros: the next set bit (if any) opens a new
        // interval.
        let zeros = (word >> bit_offset).trailing_zeros();
        bit_offset += zeros;
        if bit_offset < 64 {
            *current_start = word_start + bit_offset;
            *in_interval = true;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---- FastBitSet unit tests ----

    #[test]
    fn test_fast_bitset_basic() {
        let mut bitset = FastBitSet::new(128);
        bitset.set_range(1, 4);
        bitset.set_range(10, 12);
        // Spans the word 0 / word 1 boundary.
        bitset.set_range(63, 67);
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals.len(), 3);
        assert_eq!(intervals[0], Interval::new(1, 4));
        assert_eq!(intervals[1], Interval::new(10, 12));
        assert_eq!(intervals[2], Interval::new(63, 67));
    }

    #[test]
    fn test_fast_bitset_overlap() {
        // Overlapping ranges must read back as one coalesced interval.
        let mut bitset = FastBitSet::new(128);
        bitset.set_range(1, 10);
        bitset.set_range(5, 15);
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals[0], Interval::new(1, 15));
    }

    #[test]
    fn test_fast_bitset_full_word() {
        // Two adjacent all-ones words merge into a single interval.
        let mut bitset = FastBitSet::new(128);
        bitset.set_range(0, 64);
        bitset.set_range(64, 128);
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals[0], Interval::new(0, 128));
    }

    #[test]
    fn test_fast_bitset_mixed_word() {
        // Alternating single bits exercise the mixed-word scan path.
        let mut bitset = FastBitSet::new(64);
        for i in (1..64).step_by(2) {
            bitset.set_range(i, i + 1);
        }
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals.len(), 32);
        for (i, interval) in intervals.iter().enumerate().take(32) {
            #[allow(clippy::cast_possible_truncation)]
            let expected = Interval::new((i as u32 * 2) + 1, (i as u32 * 2) + 2);
            assert_eq!(*interval, expected);
        }
    }

    #[test]
    fn test_fast_bitset_edge_of_capacity() {
        // Highest bit alone.
        let mut bitset = FastBitSet::new(128);
        bitset.set_range(127, 128);
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals[0], Interval::new(127, 128));
        // Range running to the end of capacity across a word boundary.
        let mut bitset = FastBitSet::new(128);
        bitset.set_range(60, 128);
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals[0], Interval::new(60, 128));
        // Entire capacity set.
        let mut bitset = FastBitSet::new(128);
        bitset.set_range(0, 128);
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals[0], Interval::new(0, 128));
    }

    #[test]
    fn test_fast_bitset_clear_reuse() {
        // clear() must fully reset the dirty range so the bitset is reusable.
        let mut bitset = FastBitSet::new(128);
        bitset.set_range(0, 64);
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals[0], Interval::new(0, 64));
        bitset.clear();
        bitset.set_range(100, 110);
        let intervals = bitset.extract_intervals();
        assert_eq!(intervals.len(), 1);
        assert_eq!(intervals[0], Interval::new(100, 110));
    }

    // ---- merge_intervals unit tests ----

    #[test]
    fn test_merge_intervals_basic() {
        let intervals = vec![
            Interval::new(1, 3),
            Interval::new(5, 8),
            Interval::new(2, 6),
        ];
        let merged = merge_intervals(intervals);
        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0], Interval::new(1, 8));
    }

    #[test]
    fn test_merge_intervals_adjacent() {
        // Touching half-open intervals merge into one.
        let intervals = vec![Interval::new(1, 3), Interval::new(3, 5)];
        let merged = merge_intervals(intervals);
        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0], Interval::new(1, 5));
    }

    #[test]
    fn test_merge_intervals_disjoint() {
        let intervals = vec![Interval::new(1, 3), Interval::new(5, 7)];
        let merged = merge_intervals(intervals);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0], Interval::new(1, 3));
        assert_eq!(merged[1], Interval::new(5, 7));
    }

    #[test]
    fn test_merge_intervals_single() {
        let intervals = vec![Interval::new(1, 5)];
        let merged = merge_intervals(intervals);
        assert_eq!(merged.len(), 1);
        assert_eq!(merged[0], Interval::new(1, 5));
    }

    #[test]
    fn test_merge_intervals_empty() {
        let intervals: Vec<Interval> = Vec::new();
        let merged = merge_intervals(intervals);
        assert!(merged.is_empty());
    }

    // ---- End-to-end tests against the condensation DAG builder ----

    use super::super::condensation::CondensationDag;
    use super::super::csr::CsrAdjacency;
    use super::super::scc::SccData;
    use crate::graph::unified::compaction::{CompactionSnapshot, MergedEdge};
    use crate::graph::unified::edge::EdgeKind;
    use crate::graph::unified::file::FileId;
    use crate::graph::unified::node::NodeId;

    /// Star-shaped snapshot: node 0 has a `Calls` edge to each of `1..=n`.
    /// With n >= PARALLEL_LEVEL_THRESHOLD this produces a wide, shallow DAG
    /// that exercises the parallel wavefront path.
    fn wide_dag_snapshot(n: usize) -> CompactionSnapshot {
        let file = FileId::new(0);
        let kind = EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        };
        let edges: Vec<MergedEdge> = (1..=n)
            .enumerate()
            .map(|(i, target)| {
                MergedEdge::new(
                    NodeId::new(0, 0),
                    #[allow(clippy::cast_possible_truncation)]
                    NodeId::new(target as u32, 0),
                    kind.clone(),
                    (i + 1) as u64,
                    file,
                )
            })
            .collect();
        CompactionSnapshot {
            csr_edges: edges,
            delta_edges: Vec::new(),
            node_count: n + 1,
            csr_version: 0,
        }
    }

    #[test]
    fn test_wavefront_parallel_path_wide_dag() {
        // 600 children > PARALLEL_LEVEL_THRESHOLD (512): parallel path taken.
        let snapshot = wide_dag_snapshot(600);
        let csr = CsrAdjacency::build_from_snapshot(&snapshot).unwrap();
        let kind = EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        };
        let scc = SccData::compute_tarjan(&csr, &kind).unwrap();
        let dag = CondensationDag::build(&scc, &csr).unwrap();
        assert_eq!(
            dag.strategy,
            super::super::condensation::ReachabilityStrategy::IntervalLabels
        );
        assert!(!dag.label_out_data.is_empty());
        let scc_0 = scc.scc_of(NodeId::new(0, 0)).unwrap();
        for i in 1..=600u32 {
            let scc_i = scc.scc_of(NodeId::new(i, 0)).unwrap();
            assert!(dag.can_reach(scc_0, scc_i), "0 should reach {i}");
            assert!(!dag.can_reach(scc_i, scc_0), "{i} should not reach 0");
        }
    }

    #[test]
    fn test_wavefront_equivalence_with_sequential() {
        // Interval labels and the DAG-BFS fallback must answer identically.
        let snapshot = wide_dag_snapshot(600);
        let csr = CsrAdjacency::build_from_snapshot(&snapshot).unwrap();
        let kind = EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        };
        let scc = SccData::compute_tarjan(&csr, &kind).unwrap();
        let dag_labels = CondensationDag::build(&scc, &csr).unwrap();
        assert_eq!(
            dag_labels.strategy,
            super::super::condensation::ReachabilityStrategy::IntervalLabels
        );
        // A budget of 1 forces degradation to BFS.
        let config_bfs = super::super::condensation::LabelBudgetConfig {
            budget_per_kind: 1,
            on_exceeded: super::super::condensation::BudgetExceededPolicy::Degrade,
            ..Default::default()
        };
        let dag_bfs = CondensationDag::build_with_budget(&scc, &csr, &config_bfs).unwrap();
        assert_eq!(
            dag_bfs.strategy,
            super::super::condensation::ReachabilityStrategy::DagBfs
        );
        // Exhaustive pairwise comparison over all SCCs.
        for from in 0..dag_labels.scc_count {
            for to in 0..dag_labels.scc_count {
                assert_eq!(
                    dag_labels.can_reach(from, to),
                    dag_bfs.can_reach(from, to),
                    "Mismatch for can_reach({from}, {to})"
                );
            }
        }
    }

    #[test]
    fn test_wavefront_budget_exceeded_parallel_path() {
        let snapshot = wide_dag_snapshot(600);
        let csr = CsrAdjacency::build_from_snapshot(&snapshot).unwrap();
        let kind = EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        };
        let scc = SccData::compute_tarjan(&csr, &kind).unwrap();
        // Fail policy: build must surface the budget error.
        let config_fail = super::super::condensation::LabelBudgetConfig {
            budget_per_kind: 100,
            on_exceeded: super::super::condensation::BudgetExceededPolicy::Fail,
            density_gate_threshold: 0,
            skip_labels: false,
        };
        let result = CondensationDag::build_with_budget(&scc, &csr, &config_fail);
        assert!(result.is_err(), "Should fail with tight budget");
        // Degrade policy: build must fall back to BFS and stay correct.
        let config_degrade = super::super::condensation::LabelBudgetConfig {
            budget_per_kind: 100,
            on_exceeded: super::super::condensation::BudgetExceededPolicy::Degrade,
            density_gate_threshold: 0,
            skip_labels: false,
        };
        let dag = CondensationDag::build_with_budget(&scc, &csr, &config_degrade).unwrap();
        assert_eq!(
            dag.strategy,
            super::super::condensation::ReachabilityStrategy::DagBfs
        );
        let scc_0 = scc.scc_of(NodeId::new(0, 0)).unwrap();
        let scc_1 = scc.scc_of(NodeId::new(1, 0)).unwrap();
        assert!(dag.can_reach(scc_0, scc_1));
    }

    #[test]
    fn test_skip_labels_produces_bfs_strategy() {
        let snapshot = wide_dag_snapshot(10);
        let csr = CsrAdjacency::build_from_snapshot(&snapshot).unwrap();
        let kind = EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        };
        let scc = SccData::compute_tarjan(&csr, &kind).unwrap();
        let config = super::super::condensation::LabelBudgetConfig {
            skip_labels: true,
            ..Default::default()
        };
        let dag = CondensationDag::build_with_budget(&scc, &csr, &config).unwrap();
        assert_eq!(
            dag.strategy,
            super::super::condensation::ReachabilityStrategy::DagBfs
        );
        // skip_labels must leave both label stores empty.
        assert!(dag.label_out_data.is_empty());
        assert!(dag.label_in_data.is_empty());
        let scc_0 = scc.scc_of(NodeId::new(0, 0)).unwrap();
        let scc_5 = scc.scc_of(NodeId::new(5, 0)).unwrap();
        assert!(dag.can_reach(scc_0, scc_5));
        assert!(!dag.can_reach(scc_5, scc_0));
    }

    /// Linear chain 0 -> 1 -> ... -> n: a maximally deep DAG, forcing the
    /// level-grouping gate to reject parallelism.
    fn deep_chain_snapshot(n: usize) -> CompactionSnapshot {
        let file = FileId::new(0);
        let kind = EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        };
        let edges: Vec<MergedEdge> = (0..n)
            .map(|i| {
                MergedEdge::new(
                    #[allow(clippy::cast_possible_truncation)]
                    NodeId::new(i as u32, 0),
                    #[allow(clippy::cast_possible_truncation)]
                    NodeId::new((i + 1) as u32, 0),
                    kind.clone(),
                    (i + 1) as u64,
                    file,
                )
            })
            .collect();
        CompactionSnapshot {
            csr_edges: edges,
            delta_edges: Vec::new(),
            node_count: n + 1,
            csr_version: 0,
        }
    }

    #[test]
    #[allow(clippy::similar_names)] fn test_deep_dag_falls_back_to_sequential() {
        let snapshot = deep_chain_snapshot(99);
        let csr = CsrAdjacency::build_from_snapshot(&snapshot).unwrap();
        let kind = EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
        };
        let scc = SccData::compute_tarjan(&csr, &kind).unwrap();
        let dag = CondensationDag::build(&scc, &csr).unwrap();
        // The chain is deeper than scc_count / 4, so grouping is rejected and
        // the sequential topo-order fallback runs.
        let (levels, max_level) = compute_sink_levels(&dag, dag.scc_count as usize);
        assert!(max_level > dag.scc_count as usize / 4);
        assert!(group_by_level(dag.scc_count as usize, &levels, max_level).is_none());
        assert_eq!(
            dag.strategy,
            super::super::condensation::ReachabilityStrategy::IntervalLabels
        );
        assert!(!dag.label_out_data.is_empty());
        let scc_0 = scc.scc_of(NodeId::new(0, 0)).unwrap();
        let scc_50 = scc.scc_of(NodeId::new(50, 0)).unwrap();
        let scc_99 = scc.scc_of(NodeId::new(99, 0)).unwrap();
        assert!(dag.can_reach(scc_0, scc_50));
        assert!(dag.can_reach(scc_0, scc_99));
        assert!(dag.can_reach(scc_50, scc_99));
        assert!(!dag.can_reach(scc_99, scc_0));
        assert!(!dag.can_reach(scc_50, scc_0));
    }
}