1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Release-mode-only — the gate measures real wall-clock and uses
// `posix_fadvise` via `unsafe libc::*` on Linux. Both signals are
// useless under debug builds, and clippy / cargo test default to
// debug. Gating the whole binary to `cfg(not(debug_assertions))`
// keeps CI debug runs clean while release builds still exercise
// the gate.
#![cfg(not(debug_assertions))]
//! v6.7.6 ship gate #2 — `4_worker_pool_speedup_at_least_1_3x`.
//!
//! Measures the wall-clock improvement of the boot-time prefetch
//! pool's parallel-read strategy over a single-threaded baseline.
//! Writes a fleet of cold-segment files to a temp directory, then
//! reads them back through `parallel_read_local` — an inline copy of
//! the shape of `prefetch::parallel_read_segments`, which is
//! `pub(crate)` and so not reachable from an integration test — with
//! `workers=1` and `workers=4`. Gate: `t1 / t4 >= 1.3` on hosts with
//! >= 4 logical cores; soft 1.05 fallback on smaller hosts (e.g.
//! 2-core CI).
//!
//! Marked `#[ignore]` so the cargo test sweep doesn't fight the
//! gate over CPU; run explicitly with `--ignored` (same pattern
//! as `tests/perf_parallel_freezer`).
#![allow(
clippy::uninlined_format_args,
clippy::used_underscore_binding,
unsafe_code
)]
use std::path::PathBuf;
use std::time::Instant;
/// Number of cold-segment files written to the scratch directory and
/// read back on every measured pass.
const SEGMENT_COUNT: usize = 32;
/// Size of every segment file: 8 MiB. A large per-file payload keeps
/// each measured pass dominated by file I/O rather than by the
/// sub-millisecond cost of spawning the worker threads.
const SEGMENT_BYTES: usize = 8 << 20;
/// Timed passes per worker count; `measure` keeps the fastest one.
const REPS: usize = 3;
/// Creates a fresh scratch directory under the system temp dir, named
/// `spg-test-perf-prefetch-<nanoseconds since the UNIX epoch>`, and
/// returns its path.
///
/// Panics if the system clock reads earlier than the epoch or if the
/// directory cannot be created.
fn unique_tmpdir() -> PathBuf {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap();
    let nanos = since_epoch.as_nanos();
    let dir = std::env::temp_dir().join(format!("spg-test-perf-prefetch-{nanos}"));
    std::fs::create_dir_all(&dir).unwrap();
    dir
}
/// Writes `SEGMENT_COUNT` files named `seg_<id>.spg` into `dir`. Every
/// file holds the same `SEGMENT_BYTES`-byte payload. Returns the
/// `(id, path)` pairs in ascending id order.
///
/// Panics if any file cannot be written.
fn write_segments(dir: &std::path::Path) -> Vec<(u32, PathBuf)> {
    // Cheap deterministic filler, so no PRNG is needed. The byte values
    // are irrelevant; only the file sizes and paths matter for the I/O.
    let payload: Vec<u8> = (0..SEGMENT_BYTES)
        .map(|i| (i as u8).wrapping_mul(31))
        .collect();
    (0..SEGMENT_COUNT as u32)
        .map(|id| {
            let path = dir.join(format!("seg_{id}.spg"));
            std::fs::write(&path, &payload).unwrap();
            (id, path)
        })
        .collect()
}
/// Reads every file in `paths` and returns the contents in the same
/// order as `paths`, spreading the work across up to `workers` scoped
/// threads.
///
/// The spg-server prefetch module is `pub(crate)`. Rather than calling
/// it, this integration test vendors a tiny inline copy of its
/// parallel-read shape, which keeps the test free of cross-crate
/// visibility hacks. The shape is:
/// - `paths` is split into contiguous chunks of
///   `ceil(paths.len() / workers)` entries;
/// - each chunk is read on its own thread;
/// - the per-chunk results are concatenated in chunk order.
///
/// `workers <= 1` (including 0) reads sequentially on the calling
/// thread. An empty `paths` yields an empty `Vec` for any `workers`.
///
/// # Panics
///
/// Panics via `expect("read")` if any file cannot be read. A panic on a
/// worker thread is re-raised on the caller with its original payload.
fn parallel_read_local(paths: &[(u32, PathBuf)], workers: usize) -> Vec<Vec<u8>> {
    /// Sequentially reads one contiguous run of paths.
    fn read_all(chunk: &[(u32, PathBuf)]) -> Vec<Vec<u8>> {
        chunk
            .iter()
            .map(|(_, p)| std::fs::read(p).expect("read"))
            .collect()
    }

    // An empty input would give `chunk_size == 0`, and `slice::chunks(0)`
    // panics. Keep that case (and the single-worker case) on the serial
    // path.
    if workers <= 1 || paths.is_empty() {
        return read_all(paths);
    }
    let chunk_size = paths.len().div_ceil(workers);
    std::thread::scope(|scope| {
        let handles: Vec<_> = paths
            .chunks(chunk_size)
            .map(|chunk| scope.spawn(move || read_all(chunk)))
            .collect();
        let mut out = Vec::with_capacity(paths.len());
        for h in handles {
            // Propagate the worker's original panic payload instead of
            // wrapping it in a second, generic `unwrap` panic.
            out.extend(h.join().unwrap_or_else(|e| std::panic::resume_unwind(e)));
        }
        out
    })
}
/// Best-effort: evicts these files' pages from the OS page cache so the
/// next read has to go back to storage.
///
/// On Linux, each file is first flushed with `File::sync_data`
/// (`fdatasync`) and then advised with
/// `posix_fadvise(POSIX_FADV_DONTNEED)`. The flush matters because
/// DONTNEED does not free dirty pages. The segments were only just
/// written by `write_segments`, so without the flush their still-dirty
/// pages could stay cached and a timed pass would read warm data. All
/// errors are ignored because the eviction is purely advisory.
///
/// On macOS and other platforms this is a no-op and the cache stays
/// warm; the gate's soft threshold accommodates that.
fn drop_page_cache(paths: &[(u32, PathBuf)]) {
    #[cfg(not(target_os = "linux"))]
    let _ = paths;
    #[cfg(target_os = "linux")]
    {
        use std::os::unix::io::AsRawFd;
        for (_, p) in paths {
            let Ok(f) = std::fs::File::open(p) else {
                continue;
            };
            // Write back dirty pages so DONTNEED is able to drop them.
            let _ = f.sync_data();
            // SAFETY: libc::posix_fadvise FFI. It only reads its scalar
            // arguments. The fd is a valid open descriptor owned by `f`,
            // which outlives the call. offset 0 with len 0 means
            // "through end of file". DONTNEED is advisory and touches no
            // Rust-owned memory.
            unsafe {
                libc::posix_fadvise(f.as_raw_fd(), 0, 0, libc::POSIX_FADV_DONTNEED);
            }
        }
    }
}
/// Times `parallel_read_local(paths, workers)` over `REPS` passes and
/// returns the fastest pass. Each pass starts after a best-effort
/// page-cache drop.
///
/// The returned buffers are freed before the clock is read, so
/// deallocation counts toward each pass. If `REPS` is 0, returns
/// `Duration::from_secs(u64::MAX)`.
fn measure(paths: &[(u32, PathBuf)], workers: usize) -> std::time::Duration {
    (0..REPS)
        .map(|_| {
            drop_page_cache(paths);
            let start = Instant::now();
            drop(parallel_read_local(paths, workers));
            start.elapsed()
        })
        .min()
        .unwrap_or(std::time::Duration::from_secs(u64::MAX))
}
/// Ship gate: reading the segment fleet with 4 workers must be at least
/// 1.3× faster than with 1 worker on hosts with >= 4 logical cores, and
/// at least 1.05× faster on smaller hosts. Each timing is the best of
/// `REPS` passes.
///
/// The scratch directory is owned by a drop guard. The segment files
/// (`SEGMENT_COUNT` × `SEGMENT_BYTES` bytes) are therefore removed even
/// when a read, a measurement, or the speedup assertion panics.
#[test]
#[ignore]
fn four_worker_pool_speedup_at_least_1_3x() {
    /// Best-effort deletes the wrapped directory tree when dropped.
    struct RemoveDirOnDrop(PathBuf);
    impl Drop for RemoveDirOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let dir = RemoveDirOnDrop(unique_tmpdir());
    let paths = write_segments(&dir.0);
    let t1 = measure(&paths, 1);
    let t4 = measure(&paths, 4);
    // Clamp the denominator so a zero-length 4-worker timing cannot
    // divide by zero.
    let speedup = t1.as_secs_f64() / t4.as_secs_f64().max(1e-9);
    println!(
        "perf_prefetch: t_single={t1:?}, t_quad={t4:?}, speedup={speedup:.2}× (segments={SEGMENT_COUNT}, each={SEGMENT_BYTES}B)"
    );
    // If the core count cannot be queried, assume a small host and use
    // the soft threshold.
    let cores = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(2);
    let threshold = if cores >= 4 { 1.3 } else { 1.05 };
    assert!(
        speedup >= threshold,
        "speedup {speedup:.2}× < required {threshold}× on a {cores}-core host \
         (t_single={t1:?}, t_quad={t4:?})"
    );
}