vyre-self-substrate 0.6.3

Vyre self-substrate: vyre using its own primitives on its own scheduler problems. The recursion-thesis layer between vyre-primitives and vyre-driver.
Documentation
use super::super::*;
use super::support::RecordingBatchDispatcher;
use crate::csr_frontier_queue_resident::upload_resident_csr_queue_graph;
use crate::graph::csr_frontier_queue_scratch::STRIDED_FORWARD_MIN_ROW_DEGREE;

#[test]
fn budgeted_batch_memory_plan_uses_effective_queue_capacity() {
    let dispatcher = RecordingBatchDispatcher::default();
    let node_count = 4096u32;
    let edge_offsets = vec![0u32; node_count as usize + 1];
    let graph = upload_resident_csr_queue_graph(&dispatcher, node_count, &edge_offsets, &[], &[])
        .expect("Fix: zero-edge resident CSR graph is valid");
    let words = vyre_primitives::bitset::bitset_words(node_count) as usize;
    let mut one = vec![0u32; words];
    one[0] = 1;
    let frontiers: [&[u32]; 4] = [&one, &one, &one, &one];
    let mut scratch = ResidentCsrQueueBatchScratch::default();
    let mut outputs = Vec::new();

    let plan = run_resident_csr_queue_batch_budgeted_into(
        &dispatcher,
        &graph,
        &mut scratch,
        &frontiers,
        node_count,
        u32::MAX,
        2 * (words * std::mem::size_of::<u32>() * 2 + 2 * std::mem::size_of::<u32>()),
        &mut outputs,
    )
    .expect("Fix: sparse frontiers should fit a budget that graph-sized queues would exceed");

    assert_eq!(plan.query_count, frontiers.len());
    assert_eq!(
        plan.bytes_per_query,
        words * std::mem::size_of::<u32>() * 2 + 2 * std::mem::size_of::<u32>()
    );
    assert_eq!(plan.max_queries_per_dispatch, 2);
    assert_eq!(plan.dispatch_batches, 2);
    assert_eq!(
        outputs,
        vec![vec![0; words * std::mem::size_of::<u32>()]; 4]
    );
}

#[test]
fn budgeted_batch_memory_plan_accounts_for_split_high_queue_scratch() {
    let dispatcher = RecordingBatchDispatcher::default();
    let node_count = 16u32;
    let mut edge_offsets = vec![0u32; node_count as usize + 1];
    for offset in edge_offsets.iter_mut().skip(1) {
        *offset = STRIDED_FORWARD_MIN_ROW_DEGREE;
    }
    let edge_targets = vec![1u32; STRIDED_FORWARD_MIN_ROW_DEGREE as usize];
    let edge_kind_mask = vec![1u32; STRIDED_FORWARD_MIN_ROW_DEGREE as usize];
    let graph = upload_resident_csr_queue_graph(
        &dispatcher,
        node_count,
        &edge_offsets,
        &edge_targets,
        &edge_kind_mask,
    )
    .expect("Fix: skewed high-degree resident CSR graph is valid");
    let first = [0x1ffu32];
    let second = [0x1ffu32];
    let frontiers: [&[u32]; 2] = [&first, &second];
    let mut scratch = ResidentCsrQueueBatchScratch::default();
    let mut outputs = Vec::new();

    let plan = run_resident_csr_queue_batch_budgeted_into(
        &dispatcher,
        &graph,
        &mut scratch,
        &frontiers,
        1024,
        u32::MAX,
        152,
        &mut outputs,
    )
    .expect("Fix: split high-row scratch should fit one query per dispatch under this budget");

    assert_eq!(
        plan.bytes_per_query, 84,
        "budgeted split batches must count frontier, active queue, queue_len, frontier_out, high_queue, and high_len"
    );
    assert_eq!(plan.max_queries_per_dispatch, 1);
    assert_eq!(plan.dispatch_batches, 2);
    assert_eq!(plan.peak_batch_scratch_bytes, 84);
}

#[test]
fn budgeted_batch_packs_sparse_runs_around_dense_outlier() {
    let dispatcher = RecordingBatchDispatcher::default();
    let node_count = 4096u32;
    let edge_offsets = vec![0u32; node_count as usize + 1];
    let graph = upload_resident_csr_queue_graph(&dispatcher, node_count, &edge_offsets, &[], &[])
        .expect("Fix: zero-edge resident CSR graph is valid");
    let words = vyre_primitives::bitset::bitset_words(node_count) as usize;
    let mut sparse = vec![0u32; words];
    sparse[0] = 1;
    let dense = vec![u32::MAX; words];
    let frontiers: [&[u32]; 7] = [&sparse, &sparse, &sparse, &dense, &sparse, &sparse, &sparse];
    let mut scratch = ResidentCsrQueueBatchScratch::default();
    let mut outputs = Vec::new();
    let dense_bytes_per_query =
        words * std::mem::size_of::<u32>() * 2 + node_count as usize * 4 + 4;

    let plan = run_resident_csr_queue_batch_budgeted_into(
        &dispatcher,
        &graph,
        &mut scratch,
        &frontiers,
        node_count,
        u32::MAX,
        dense_bytes_per_query,
        &mut outputs,
    )
    .expect("Fix: sparse runs should pack into large chunks around one dense outlier");

    assert_eq!(plan.query_count, frontiers.len());
    assert_eq!(
        plan.max_queries_per_dispatch, 3,
        "sparse runs should not inherit the dense outlier's graph-sized queue capacity"
    );
    assert_eq!(plan.dispatch_batches, 3);
    assert_eq!(plan.bytes_per_query, dense_bytes_per_query);
    assert_eq!(plan.peak_batch_scratch_bytes, dense_bytes_per_query);
    assert_eq!(
        dispatcher
            .upload_handles
            .borrow()
            .iter()
            .map(Vec::len)
            .collect::<Vec<_>>(),
        vec![3, 1, 3],
        "budgeted dispatch should preserve order while packing sparse chunks on both sides of the dense frontier"
    );
    assert_eq!(
        outputs,
        vec![vec![0; words * std::mem::size_of::<u32>()]; frontiers.len()]
    );
}