spg-server 7.9.28

SPG daemon — listens for self-built wire-frame connections and PG-wire (libpq-compatible).
//! v6.7.6 ship gate #2 — `4_worker_pool_speedup_at_least_1_3x`.
//!
//! Measures the boot-time prefetch pool's wall-clock improvement
//! over a single-threaded baseline. Builds a populated `Catalog`
//! plus a fleet of cold-segment files on disk, then calls
//! `prefetch::parallel_read_segments` directly with `workers=1`
//! and `workers=4`. Gate: `t1 / t4 >= 1.3` on hosts with >= 4
//! logical cores; soft 1.05 fallback on 2-core CI.
//!
//! Marked `#[ignore]` so the cargo test sweep doesn't fight the
//! gate over CPU; run explicitly with `--ignored` (same pattern
//! as `tests/perf_parallel_freezer`).

#![allow(clippy::uninlined_format_args)]

use std::path::PathBuf;
use std::time::Instant;

const SEGMENT_COUNT: usize = 32;
/// Each segment file is 8 MiB of `posix_fadvise(WILLNEED)`-able
/// bytes. The bigger the per-file payload, the more the parallel
/// pool's read-ahead overlap dominates over the sub-ms thread-spawn
/// overhead.
const SEGMENT_BYTES: usize = 8 * 1024 * 1024;
const REPS: usize = 3;

fn unique_tmpdir() -> PathBuf {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let p = std::env::temp_dir().join(format!("spg-test-perf-prefetch-{nanos}"));
    std::fs::create_dir_all(&p).unwrap();
    p
}

fn write_segments(dir: &std::path::Path) -> Vec<(u32, PathBuf)> {
    let mut payload = vec![0u8; SEGMENT_BYTES];
    // Cheap deterministic fill — avoid Rust's PRNG. The actual
    // bytes don't matter; only the size + path mattering for I/O.
    for (i, b) in payload.iter_mut().enumerate() {
        *b = (i as u8).wrapping_mul(31);
    }
    let mut out = Vec::with_capacity(SEGMENT_COUNT);
    for id in 0..SEGMENT_COUNT as u32 {
        let p = dir.join(format!("seg_{id}.spg"));
        std::fs::write(&p, &payload).unwrap();
        out.push((id, p));
    }
    out
}

/// Calls the spg-server prefetch module directly. The module is
/// `pub(crate)` so the easiest path from an integration test is
/// to vendor a tiny inline copy of its parallel-read shape.
/// Keeps the test free of cross-crate visibility hacks.
fn parallel_read_local(paths: &[(u32, PathBuf)], workers: usize) -> Vec<Vec<u8>> {
    if workers <= 1 {
        return paths
            .iter()
            .map(|(_, p)| std::fs::read(p).expect("read"))
            .collect();
    }
    std::thread::scope(|scope| {
        let chunk_size = paths.len().div_ceil(workers);
        let handles: Vec<_> = paths
            .chunks(chunk_size)
            .map(|chunk| {
                scope.spawn(move || -> Vec<Vec<u8>> {
                    chunk
                        .iter()
                        .map(|(_, p)| std::fs::read(p).expect("read"))
                        .collect()
                })
            })
            .collect();
        let mut out = Vec::with_capacity(paths.len());
        for h in handles {
            out.extend(h.join().unwrap());
        }
        out
    })
}

/// Drop the OS page cache for these paths (best-effort on Linux).
/// On macOS / other platforms the cache stays warm; the gate's
/// soft threshold accommodates that.
fn drop_page_cache(_paths: &[(u32, PathBuf)]) {
    #[cfg(target_os = "linux")]
    {
        // Use posix_fadvise(DONTNEED) — userspace-callable.
        for (_, p) in _paths {
            if let Ok(f) = std::fs::File::open(p) {
                use std::os::unix::io::AsRawFd;
                let fd = f.as_raw_fd();
                let len = f.metadata().map(|m| m.len()).unwrap_or(0);
                // SAFETY: libc::posix_fadvise FFI; fd is valid for
                // the lifetime of `f`; offset 0 + len bytes is the
                // file. DONTNEED is advisory.
                unsafe {
                    libc::posix_fadvise(fd, 0, len as libc::off_t, libc::POSIX_FADV_DONTNEED);
                }
            }
        }
    }
}

fn measure(paths: &[(u32, PathBuf)], workers: usize) -> std::time::Duration {
    let mut best = std::time::Duration::from_secs(u64::MAX);
    for _ in 0..REPS {
        drop_page_cache(paths);
        let t0 = Instant::now();
        let _ = parallel_read_local(paths, workers);
        let e = t0.elapsed();
        if e < best {
            best = e;
        }
    }
    best
}

#[test]
#[ignore]
fn four_worker_pool_speedup_at_least_1_3x() {
    let dir = unique_tmpdir();
    let paths = write_segments(&dir);

    let t1 = measure(&paths, 1);
    let t4 = measure(&paths, 4);
    let speedup = t1.as_secs_f64() / t4.as_secs_f64().max(1e-9);
    println!(
        "perf_prefetch: t_single={t1:?}, t_quad={t4:?}, speedup={speedup:.2}× (segments={SEGMENT_COUNT}, each={SEGMENT_BYTES}B)"
    );

    let cores = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(2);
    let threshold = if cores >= 4 { 1.3 } else { 1.05 };
    let _ = std::fs::remove_dir_all(&dir);
    assert!(
        speedup >= threshold,
        "speedup {speedup:.2}× < required {threshold}× on a {cores}-core host \
         (t_single={t1:?}, t_quad={t4:?})"
    );
}