use crate::source::{EntropySource, Platform, SourceCategory, SourceInfo};
use crate::sources::helpers::{extract_timing_entropy, mach_time};
/// Tunable parameters for [`PipeBufferSource`].
///
/// The values are read by `PipeBufferSource::collect`, which clamps them
/// before use (see the per-field notes).
#[derive(Debug, Clone)]
pub struct PipeBufferConfig {
/// Number of pipes `collect` tries to open up front and then cycles through.
/// Values below 1 are treated as 1.
pub num_pipes: usize,
/// Smallest write size in bytes. Values below 1 are treated as 1.
pub min_write_size: usize,
/// Largest write size in bytes. Raised to the effective minimum when smaller.
pub max_write_size: usize,
/// When `true`, `collect` sets `O_NONBLOCK` on the write end of each pooled pipe.
pub non_blocking: bool,
}
impl Default for PipeBufferConfig {
fn default() -> Self {
Self {
num_pipes: 4,
min_write_size: 1,
max_write_size: 4096,
non_blocking: true,
}
}
}
/// Entropy source that times write/read round trips through kernel pipes.
///
/// The `Default` value wraps `PipeBufferConfig::default()`.
#[derive(Default)]
pub struct PipeBufferSource {
/// Collection parameters consulted by `collect`.
pub config: PipeBufferConfig,
}
/// Static descriptor for the pipe-buffer source; a reference to it is returned
/// by `PipeBufferSource::info`.
static PIPE_BUFFER_INFO: SourceInfo = SourceInfo {
name: "pipe_buffer",
description: "Multi-pipe kernel zone allocator competition and buffer timing jitter",
// Free-form rationale text. The trailing backslashes join the following
// lines into a single string without embedded newlines.
physics: "Creates multiple pipes simultaneously, writes variable-size data, reads it back, \
and closes — measuring contention in the kernel zone allocator. Multiple pipes \
compete for pipe zone and mbuf allocations, creating cross-CPU magazine transfer \
contention. Variable write sizes exercise different mbuf paths. Non-blocking mode \
captures EAGAIN timing on different kernel failure paths. Zone allocator timing \
depends on zone fragmentation, magazine layer state, and cross-CPU transfers.",
category: SourceCategory::IPC,
platform: Platform::MacOS,
// No entries: this source declares no extra requirements.
requirements: &[],
// NOTE(review): the unit of this estimate is defined by `SourceInfo`
// (presumably bits per sample) — confirm against its documentation.
entropy_rate_estimate: 1.5,
composite: false,
is_fast: true,
};
impl EntropySource for PipeBufferSource {
    /// Returns the static descriptor for this source.
    fn info(&self) -> &SourceInfo {
        &PIPE_BUFFER_INFO
    }

    /// Reports whether this build targets macOS.
    fn is_available(&self) -> bool {
        cfg!(target_os = "macos")
    }

    /// Times write/read round trips over a pool of pipes and turns the timing
    /// deltas into output bytes via `extract_timing_entropy`.
    ///
    /// * Tries to open `config.num_pipes` pipes (at least one attempt); when
    ///   `config.non_blocking` is set, the write end of each gets `O_NONBLOCK`.
    /// * If no pipe could be opened, defers to `collect_single_pipe`.
    /// * Runs `n_samples * 4 + 64` rounds, cycling through the pool. Each round
    ///   writes a pseudo-randomly sized payload (within the clamped
    ///   min/max write sizes) and reads back however many bytes were written.
    /// * Every eighth round additionally opens and immediately closes a
    ///   throwaway pipe.
    ///
    /// All pooled descriptors are closed before returning.
    fn collect(&self, n_samples: usize) -> Vec<u8> {
        let total_rounds = n_samples * 4 + 64;
        let mut deltas: Vec<u64> = Vec::with_capacity(total_rounds);
        // Odd seed for the LCG that selects write sizes.
        let mut rng_state: u64 = mach_time() | 1;

        let pool_target = self.config.num_pipes.max(1);
        let lo = self.config.min_write_size.max(1);
        let hi = self.config.max_write_size.max(lo);
        // Width of the size range minus one; zero means a fixed write size.
        let span = hi - lo;

        let mut pool: Vec<[i32; 2]> = Vec::new();
        for _ in 0..pool_target {
            let mut pair = [0i32; 2];
            // SAFETY: `pair` is a writable array of two c_ints, as `pipe` requires.
            if unsafe { libc::pipe(pair.as_mut_ptr()) } != 0 {
                continue;
            }
            if self.config.non_blocking {
                // SAFETY: `pair[1]` is a descriptor just returned by `pipe`.
                unsafe {
                    let current = libc::fcntl(pair[1], libc::F_GETFL);
                    if current >= 0 {
                        libc::fcntl(pair[1], libc::F_SETFL, current | libc::O_NONBLOCK);
                    }
                }
            }
            pool.push(pair);
        }

        if pool.is_empty() {
            return self.collect_single_pipe(n_samples);
        }

        for round in 0..total_rounds {
            rng_state = rng_state
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1);
            let len = if span == 0 {
                lo
            } else {
                lo + (rng_state >> 48) as usize % (span + 1)
            };

            let payload = vec![0xBEu8; len];
            let mut sink = vec![0u8; len];
            let [rd, wr] = pool[round % pool.len()];

            let start = mach_time();
            // SAFETY: `rd`/`wr` are open pipe descriptors owned by `pool`; both
            // buffers are `len` bytes long and the read length never exceeds
            // the number of bytes `write` reported.
            unsafe {
                let sent = libc::write(wr, payload.as_ptr() as *const _, len);
                if sent > 0 {
                    libc::read(rd, sink.as_mut_ptr() as *mut _, sent as usize);
                }
            }
            let stop = mach_time();

            // Keep the read buffer observable so the round trip is not elided.
            std::hint::black_box(&sink);
            deltas.push(stop.wrapping_sub(start));

            // Periodically create and destroy an extra pipe.
            if round % 8 == 0 {
                let mut churn = [0i32; 2];
                // SAFETY: `churn` is a writable array of two c_ints.
                if unsafe { libc::pipe(churn.as_mut_ptr()) } == 0 {
                    // SAFETY: both descriptors were just returned by `pipe`.
                    unsafe {
                        libc::close(churn[0]);
                        libc::close(churn[1]);
                    }
                }
            }
        }

        for &[rd, wr] in &pool {
            // SAFETY: each pair was opened above and is closed exactly once here.
            unsafe {
                libc::close(rd);
                libc::close(wr);
            }
        }

        extract_timing_entropy(&deltas, n_samples)
    }
}
impl PipeBufferSource {
    /// Fallback collector that creates, exercises and destroys one pipe per round.
    ///
    /// `collect` calls this when it could not open any pipe for its pool. Each
    /// round times a full `pipe` → `write` → `read` → `close` cycle with
    /// `mach_time`; the deltas are handed to `extract_timing_entropy` together
    /// with `n_samples`.
    ///
    /// `n_samples * 4 + 64` rounds are attempted. A round whose `pipe` call
    /// fails records no timing. This path does not consult `self.config`:
    /// write sizes are always in `1..=256` bytes and the pipe stays blocking.
    pub(crate) fn collect_single_pipe(&self, n_samples: usize) -> Vec<u8> {
        let raw_count = n_samples * 4 + 64;
        let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
        // Odd seed for the LCG that selects write sizes.
        let mut lcg: u64 = mach_time() | 1;

        for _ in 0..raw_count {
            lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
            let write_size = 1 + (lcg >> 48) as usize % 256;
            let write_data = vec![0xBEu8; write_size];
            let mut read_buf = vec![0u8; write_size];
            let mut fds: [i32; 2] = [0; 2];

            let t0 = mach_time();
            // SAFETY: `fds` is a writable array of two c_ints, as `pipe` requires.
            let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
            if ret != 0 {
                continue;
            }
            // SAFETY: both descriptors were just returned by a successful
            // `pipe`; both buffers are `write_size` bytes long and the read
            // length never exceeds the number of bytes `write` reported.
            unsafe {
                let written = libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
                // Read back only what actually reached the pipe. The pipe is
                // blocking and this thread still holds the write end, so a
                // read after a failed write — or a read for more bytes than a
                // short write delivered — would wait forever.
                if written > 0 {
                    libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, written as usize);
                }
                libc::close(fds[0]);
                libc::close(fds[1]);
            }
            let t1 = mach_time();

            // Keep the read buffer observable so the round trip is not elided.
            std::hint::black_box(&read_buf);
            timings.push(t1.wrapping_sub(t0));
        }

        extract_timing_entropy(&timings, n_samples)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The descriptor exposes the expected name, category and composite flag.
    #[test]
    fn info() {
        let source = PipeBufferSource::default();
        assert_eq!(source.name(), "pipe_buffer");
        let meta = source.info();
        assert_eq!(meta.category, SourceCategory::IPC);
        assert!(!meta.composite);
    }

    /// `PipeBufferConfig::default()` yields the documented stock values.
    #[test]
    fn default_config() {
        let PipeBufferConfig {
            num_pipes,
            min_write_size,
            max_write_size,
            non_blocking,
        } = PipeBufferConfig::default();
        assert_eq!(num_pipes, 4);
        assert_eq!(min_write_size, 1);
        assert_eq!(max_write_size, 4096);
        assert!(non_blocking);
    }

    /// A hand-built configuration is stored on the source unchanged.
    #[test]
    fn custom_config() {
        let config = PipeBufferConfig {
            num_pipes: 8,
            min_write_size: 64,
            max_write_size: 1024,
            non_blocking: false,
        };
        let source = PipeBufferSource { config };
        assert_eq!(source.config.num_pipes, 8);
    }

    /// Pool-based collection returns between 1 and 64 bytes when available.
    #[test]
    #[ignore]
    fn collects_bytes() {
        let source = PipeBufferSource::default();
        if !source.is_available() {
            return;
        }
        let bytes = source.collect(64);
        assert!(!bytes.is_empty());
        assert!(bytes.len() <= 64);
    }

    /// The single-pipe fallback produces output when called directly.
    #[test]
    #[ignore]
    fn single_pipe_mode() {
        let config = PipeBufferConfig {
            num_pipes: 0,
            ..PipeBufferConfig::default()
        };
        let source = PipeBufferSource { config };
        if !source.is_available() {
            return;
        }
        let bytes = source.collect_single_pipe(64);
        assert!(!bytes.is_empty());
    }
}