vyre-self-substrate 0.6.3

Vyre self-substrate: vyre using its own primitives on its own scheduler problems. The recursion-thesis layer between vyre-primitives and vyre-driver.
Documentation
use super::super::*;
use super::support::RecordingResidentDispatcher;
use crate::graph::csr_frontier_queue_scratch::STRIDED_FORWARD_MIN_ROW_DEGREE;

#[test]
fn sparse_queue_step_accepts_csr_only_resident_graph() {
    let dispatcher = RecordingResidentDispatcher::default();
    let node_count = 4u32;
    let edge_offsets = [0, 1, 1, 1, 1];
    let edge_targets = [2];
    let edge_kind_mask = [1];
    let graph = upload_resident_adaptive_sparse_queue_graph(
        &dispatcher,
        node_count,
        &edge_offsets,
        &edge_targets,
        &edge_kind_mask,
    )
    .expect("Fix: CSR-only adaptive sparse-queue graph upload should accept canonical CSR");
    let graph_handles = graph.handles();
    let mut scratch = AdaptiveTraversalResidentScratch::default();
    let frontier_in = [1u32];
    let mut frontier_out = Vec::new();

    adaptive_traverse_resident_sparse_queue_step_with_scratch_into(
        &dispatcher,
        &graph,
        &frontier_in,
        1,
        &mut scratch,
        &mut frontier_out,
    )
    .expect("Fix: CSR-only adaptive sparse-queue resident step should dispatch");

    let scratch_handles = scratch
        .handles
        .expect("Fix: sparse queue step should allocate frontier scratch");
    let queue_handle = scratch
        .queue_handle
        .expect("Fix: sparse queue step should allocate active queue");
    let steps = dispatcher.last_step_handles();
    assert_eq!(steps.len(), 3);
    assert_eq!(
        steps[2],
        vec![
            queue_handle,
            scratch_handles[2],
            graph_handles[0],
            graph_handles[1],
            graph_handles[2],
            scratch_handles[1],
        ],
        "CSR-only sparse queue traversal must bind only CSR graph handles"
    );
    assert_eq!(frontier_out, vec![0]);
}

#[test]
fn sparse_queue_step_sizes_active_queue_from_frontier_popcount() {
    let dispatcher = RecordingResidentDispatcher::default();
    let node_count = 8_000u32;
    let words = vyre_primitives::bitset::bitset_words(node_count) as usize;
    let graph = ResidentAdaptiveTraversalGraph {
        node_count,
        edge_count: 0,
        max_row_degree: 0,
        high_degree_source_count: 0,
        words,
        layout_hash: 7,
        handles: [101, 102, 103, 104],
    };
    let mut scratch = AdaptiveTraversalResidentScratch::default();
    let mut frontier_in = vec![0u32; words];
    frontier_in[0] = 1;
    let mut frontier_out = Vec::new();

    adaptive_traverse_resident_graph_sparse_queue_step_with_scratch_into(
        &dispatcher,
        &graph,
        &frontier_in,
        u32::MAX,
        &mut scratch,
        &mut frontier_out,
    )
    .expect("Fix: recording dispatcher should complete sparse queue traversal");

    assert_eq!(
        scratch.queue_bytes,
        std::mem::size_of::<u32>(),
        "single-source frontier must not allocate a graph-sized active queue"
    );
    assert_eq!(
        dispatcher.resident_alloc_lengths().last().copied(),
        Some(std::mem::size_of::<u32>()),
        "active queue allocation should be sized from frontier popcount"
    );
    assert_eq!(frontier_out, vec![0; words]);
}

#[test]
fn sparse_queue_step_reuses_larger_queue_scratch_for_smaller_frontier() {
    let dispatcher = RecordingResidentDispatcher::default();
    let node_count = 4096u32;
    let words = vyre_primitives::bitset::bitset_words(node_count) as usize;
    let graph = ResidentAdaptiveTraversalGraph {
        node_count,
        edge_count: 0,
        max_row_degree: 0,
        high_degree_source_count: 0,
        words,
        layout_hash: 11,
        handles: [101, 102, 103, 104],
    };
    let mut scratch = AdaptiveTraversalResidentScratch::default();
    let mut larger_frontier = vec![0u32; words];
    for node in 0..300u32 {
        larger_frontier[(node / 32) as usize] |= 1 << (node % 32);
    }
    let mut frontier_out = Vec::new();

    adaptive_traverse_resident_graph_sparse_queue_step_with_scratch_into(
        &dispatcher,
        &graph,
        &larger_frontier,
        u32::MAX,
        &mut scratch,
        &mut frontier_out,
    )
    .expect("Fix: recording dispatcher should complete larger sparse queue traversal");

    let queue_handle = scratch
        .queue_handle
        .expect("Fix: sparse queue step must allocate active queue");
    assert_eq!(scratch.queue_bytes, 512 * std::mem::size_of::<u32>());
    let allocs_after_large = dispatcher.alloc_count.get();
    let mut single_frontier = vec![0u32; words];
    single_frontier[0] = 1;

    adaptive_traverse_resident_graph_sparse_queue_step_with_scratch_into(
        &dispatcher,
        &graph,
        &single_frontier,
        u32::MAX,
        &mut scratch,
        &mut frontier_out,
    )
    .expect("Fix: recording dispatcher should complete smaller sparse queue traversal");

    assert_eq!(scratch.queue_handle, Some(queue_handle));
    assert_eq!(
        scratch.queue_bytes,
        512 * std::mem::size_of::<u32>(),
        "scratch should keep the larger reusable queue buffer instead of shrinking"
    );
    assert_eq!(
        dispatcher.alloc_count.get(),
        allocs_after_large,
        "smaller frontiers should reuse the existing resident queue allocation"
    );
    assert!(
        dispatcher.freed.borrow().is_empty(),
        "smaller frontiers should not free and reallocate resident queue scratch"
    );
}

#[test]
fn skewed_high_degree_sparse_queue_step_uses_bounded_split_queue() {
    let dispatcher = RecordingResidentDispatcher::default();
    let node_count = 2048u32;
    let words = vyre_primitives::bitset::bitset_words(node_count) as usize;
    let graph = ResidentAdaptiveTraversalGraph {
        node_count,
        edge_count: STRIDED_FORWARD_MIN_ROW_DEGREE,
        max_row_degree: STRIDED_FORWARD_MIN_ROW_DEGREE,
        high_degree_source_count: 1,
        words,
        layout_hash: 13,
        handles: [101, 102, 103, 104],
    };
    let mut scratch = AdaptiveTraversalResidentScratch::default();
    let mut frontier_in = vec![0u32; words];
    for node in 0..9u32 {
        frontier_in[(node / 32) as usize] |= 1 << (node % 32);
    }
    let mut frontier_out = Vec::new();

    adaptive_traverse_resident_graph_sparse_queue_step_with_scratch_into(
        &dispatcher,
        &graph,
        &frontier_in,
        u32::MAX,
        &mut scratch,
        &mut frontier_out,
    )
    .expect("Fix: recording dispatcher should complete high-degree sparse queue traversal");

    assert_eq!(scratch.queue_bytes, 16 * std::mem::size_of::<u32>());
    assert_eq!(
        scratch.high_queue_bytes,
        std::mem::size_of::<u32>(),
        "a graph with one possible hub must not launch a strided team for every active source"
    );
    let high_queue = scratch
        .high_queue_handle
        .expect("Fix: mixed split traversal should allocate a high-degree queue");
    let high_len = scratch
        .high_len_handle
        .expect("Fix: mixed split traversal should allocate a high-degree queue length");
    let scratch_handles = scratch
        .handles
        .expect("Fix: mixed split traversal should allocate frontier scratch");
    let active_queue = scratch
        .queue_handle
        .expect("Fix: mixed split traversal should allocate active queue scratch");
    let steps = dispatcher.last_step_handles();
    assert_eq!(
        steps.len(),
        5,
        "skewed sparse queue traversal should materialize active sources, clear high_len, split low/high rows, then traverse only bounded high rows"
    );
    assert_eq!(
        steps[3],
        vec![
            active_queue,
            scratch_handles[2],
            graph.handles[0],
            graph.handles[1],
            graph.handles[2],
            scratch_handles[1],
            high_queue,
            high_len,
        ],
        "split-low pass must read the active queue and write only bounded high-row scratch"
    );
    assert_eq!(
        steps[4],
        vec![
            high_queue,
            high_len,
            graph.handles[0],
            graph.handles[1],
            graph.handles[2],
            scratch_handles[1],
        ],
        "strided follow-up must consume the bounded high-row queue, not the whole active queue"
    );
    assert_eq!(
        dispatcher.last_step_grids()[4],
        Some(vyre_primitives::graph::csr_queue_strided::csr_queue_strided_forward_dispatch_grid(1)),
        "skewed high-degree sparse queue traversal must launch row-strided teams only for the graph-wide high-row bound"
    );
}

#[test]
fn single_superhub_csr_only_sparse_queue_sizes_split_queue_from_high_row_count() {
    let dispatcher = RecordingResidentDispatcher::default();
    let node_count = 2048u32;
    let words = vyre_primitives::bitset::bitset_words(node_count) as usize;
    let hub_degree = STRIDED_FORWARD_MIN_ROW_DEGREE * 9;
    let mut edge_offsets = vec![hub_degree; node_count as usize + 1];
    edge_offsets[0] = 0;
    let edge_targets = vec![1u32; hub_degree as usize];
    let edge_kind_mask = vec![1u32; hub_degree as usize];
    let graph = upload_resident_adaptive_sparse_queue_graph(
        &dispatcher,
        node_count,
        &edge_offsets,
        &edge_targets,
        &edge_kind_mask,
    )
    .expect("Fix: CSR-only adaptive sparse queue graph should accept a one-superhub graph");
    let mut scratch = AdaptiveTraversalResidentScratch::default();
    let mut frontier_in = vec![0u32; words];
    for node in 0..9u32 {
        frontier_in[(node / 32) as usize] |= 1 << (node % 32);
    }
    let mut frontier_out = Vec::new();

    adaptive_traverse_resident_sparse_queue_step_with_scratch_into(
        &dispatcher,
        &graph,
        &frontier_in,
        u32::MAX,
        &mut scratch,
        &mut frontier_out,
    )
    .expect("Fix: CSR-only adaptive sparse queue step should complete one-superhub traversal");

    assert_eq!(graph.high_degree_source_count(), 1);
    assert_eq!(scratch.queue_bytes, 16 * std::mem::size_of::<u32>());
    assert_eq!(
        scratch.high_queue_bytes,
        std::mem::size_of::<u32>(),
        "one enormous row should allocate one adaptive high-row slot, not edge_count / threshold slots"
    );
    assert_eq!(
        dispatcher.last_step_grids()[4],
        Some(vyre_primitives::graph::csr_queue_strided::csr_queue_strided_forward_dispatch_grid(1)),
        "one-superhub adaptive traversal must launch only one row-strided team"
    );
}

#[test]
fn uniformly_high_degree_sparse_queue_step_keeps_global_strided_consumer() {
    let dispatcher = RecordingResidentDispatcher::default();
    let node_count = 2048u32;
    let words = vyre_primitives::bitset::bitset_words(node_count) as usize;
    let queue_slots = 16u32;
    let graph = ResidentAdaptiveTraversalGraph {
        node_count,
        edge_count: STRIDED_FORWARD_MIN_ROW_DEGREE * queue_slots,
        max_row_degree: STRIDED_FORWARD_MIN_ROW_DEGREE,
        high_degree_source_count: queue_slots,
        words,
        layout_hash: 17,
        handles: [101, 102, 103, 104],
    };
    let mut scratch = AdaptiveTraversalResidentScratch::default();
    let mut frontier_in = vec![0u32; words];
    for node in 0..9u32 {
        frontier_in[(node / 32) as usize] |= 1 << (node % 32);
    }
    let mut frontier_out = Vec::new();

    adaptive_traverse_resident_graph_sparse_queue_step_with_scratch_into(
        &dispatcher,
        &graph,
        &frontier_in,
        u32::MAX,
        &mut scratch,
        &mut frontier_out,
    )
    .expect("Fix: recording dispatcher should complete uniformly high-degree traversal");

    assert!(scratch.high_queue_handle.is_none());
    assert!(scratch.high_len_handle.is_none());
    assert_eq!(
        scratch.queue_bytes,
        queue_slots as usize * std::mem::size_of::<u32>()
    );
    assert_eq!(dispatcher.last_step_handles().len(), 3);
    assert_eq!(
        dispatcher.last_step_grids()[2],
        Some(vyre_primitives::graph::csr_queue_strided::csr_queue_strided_forward_dispatch_grid(
            queue_slots
        )),
        "uniformly high-degree sparse queue traversal should keep the single row-strided consumer"
    );
}