openentropy_core/sources/ipc/
pipe_buffer.rs1use crate::source::{EntropySource, Platform, SourceCategory, SourceInfo};
4use crate::sources::helpers::{extract_timing_entropy, mach_time};
5
/// Tunable parameters for [`PipeBufferSource`].
///
/// All values are sanitised when a collection run starts, so out-of-range
/// settings (zero pipes, zero-byte writes, `max < min`) are tolerated.
#[derive(Clone, Debug)]
pub struct PipeBufferConfig {
    /// Number of pipes the multi-pipe collector tries to open up front.
    /// A value of `0` is treated as `1`.
    pub num_pipes: usize,

    /// Smallest write size, in bytes, that a single iteration may use.
    /// A value of `0` is treated as `1`.
    pub min_write_size: usize,

    /// Largest write size, in bytes, that a single iteration may use
    /// (inclusive). Raised to the effective minimum if it is smaller.
    pub max_write_size: usize,

    /// When `true`, the write end of every pooled pipe is switched to
    /// `O_NONBLOCK` before the timing loop starts.
    pub non_blocking: bool,
}
54
55impl Default for PipeBufferConfig {
56 fn default() -> Self {
57 Self {
58 num_pipes: 4,
59 min_write_size: 1,
60 max_write_size: 4096,
61 non_blocking: true,
62 }
63 }
64}
65
66#[derive(Default)]
98pub struct PipeBufferSource {
99 pub config: PipeBufferConfig,
101}
102
103static PIPE_BUFFER_INFO: SourceInfo = SourceInfo {
104 name: "pipe_buffer",
105 description: "Multi-pipe kernel zone allocator competition and buffer timing jitter",
106 physics: "Creates multiple pipes simultaneously, writes variable-size data, reads it back, \
107 and closes — measuring contention in the kernel zone allocator. Multiple pipes \
108 compete for pipe zone and mbuf allocations, creating cross-CPU magazine transfer \
109 contention. Variable write sizes exercise different mbuf paths. Non-blocking mode \
110 captures EAGAIN timing on different kernel failure paths. Zone allocator timing \
111 depends on zone fragmentation, magazine layer state, and cross-CPU transfers.",
112 category: SourceCategory::IPC,
113 platform: Platform::MacOS,
114 requirements: &[],
115 entropy_rate_estimate: 1.5,
116 composite: false,
117 is_fast: true,
118};
119
120impl EntropySource for PipeBufferSource {
121 fn info(&self) -> &SourceInfo {
122 &PIPE_BUFFER_INFO
123 }
124
125 fn is_available(&self) -> bool {
126 cfg!(target_os = "macos")
127 }
128
129 fn collect(&self, n_samples: usize) -> Vec<u8> {
130 let raw_count = n_samples * 4 + 64;
131 let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
132 let mut lcg: u64 = mach_time() | 1;
133 let num_pipes = self.config.num_pipes.max(1);
134 let min_size = self.config.min_write_size.max(1);
135 let max_size = self.config.max_write_size.max(min_size);
136
137 let mut pipe_pool: Vec<[i32; 2]> = Vec::new();
139 for _ in 0..num_pipes {
140 let mut fds: [i32; 2] = [0; 2];
141 let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
143 if ret == 0 {
144 if self.config.non_blocking {
145 unsafe {
147 let flags = libc::fcntl(fds[1], libc::F_GETFL);
148 if flags >= 0 {
149 libc::fcntl(fds[1], libc::F_SETFL, flags | libc::O_NONBLOCK);
150 }
151 }
152 }
153 pipe_pool.push(fds);
154 }
155 }
156
157 if pipe_pool.is_empty() {
158 return self.collect_single_pipe(n_samples);
159 }
160
161 for i in 0..raw_count {
162 lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
164 let write_size = if min_size == max_size {
165 min_size
166 } else {
167 min_size + (lcg >> 48) as usize % (max_size - min_size + 1)
168 };
169 let write_data = vec![0xBEu8; write_size];
170 let mut read_buf = vec![0u8; write_size];
171
172 let pipe_idx = i % pipe_pool.len();
173 let fds = pipe_pool[pipe_idx];
174
175 let t0 = mach_time();
176
177 unsafe {
179 let written = libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
180
181 if written > 0 {
182 libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, written as usize);
183 }
184 }
185
186 let t1 = mach_time();
187 std::hint::black_box(&read_buf);
188 timings.push(t1.wrapping_sub(t0));
189
190 if i % 8 == 0 {
192 let mut extra_fds: [i32; 2] = [0; 2];
193 let ret = unsafe { libc::pipe(extra_fds.as_mut_ptr()) };
194 if ret == 0 {
195 unsafe {
196 libc::close(extra_fds[0]);
197 libc::close(extra_fds[1]);
198 }
199 }
200 }
201 }
202
203 for fds in &pipe_pool {
205 unsafe {
206 libc::close(fds[0]);
207 libc::close(fds[1]);
208 }
209 }
210
211 extract_timing_entropy(&timings, n_samples)
212 }
213}
214
215impl PipeBufferSource {
216 pub(crate) fn collect_single_pipe(&self, n_samples: usize) -> Vec<u8> {
218 let raw_count = n_samples * 4 + 64;
219 let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
220 let mut lcg: u64 = mach_time() | 1;
221
222 for _ in 0..raw_count {
223 lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
224 let write_size = 1 + (lcg >> 48) as usize % 256;
225 let write_data = vec![0xBEu8; write_size];
226 let mut read_buf = vec![0u8; write_size];
227
228 let mut fds: [i32; 2] = [0; 2];
229 let t0 = mach_time();
230 let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
231 if ret != 0 {
232 continue;
233 }
234 unsafe {
235 libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
236 libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, write_size);
237 libc::close(fds[0]);
238 libc::close(fds[1]);
239 }
240 let t1 = mach_time();
241 std::hint::black_box(&read_buf);
242 timings.push(t1.wrapping_sub(t0));
243 }
244
245 extract_timing_entropy(&timings, n_samples)
246 }
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// The source reports the expected name, category and composite flag.
    #[test]
    fn info() {
        let source = PipeBufferSource::default();
        assert_eq!(source.name(), "pipe_buffer");

        let meta = source.info();
        assert_eq!(meta.category, SourceCategory::IPC);
        assert!(!meta.composite);
    }

    /// `PipeBufferConfig::default()` yields the documented stock values.
    #[test]
    fn default_config() {
        let PipeBufferConfig {
            num_pipes,
            min_write_size,
            max_write_size,
            non_blocking,
        } = PipeBufferConfig::default();

        assert_eq!(num_pipes, 4);
        assert_eq!(min_write_size, 1);
        assert_eq!(max_write_size, 4096);
        assert!(non_blocking);
    }

    /// A hand-built configuration is stored on the source unchanged.
    #[test]
    fn custom_config() {
        let config = PipeBufferConfig {
            num_pipes: 8,
            min_write_size: 64,
            max_write_size: 1024,
            non_blocking: false,
        };
        let source = PipeBufferSource { config };
        assert_eq!(source.config.num_pipes, 8);
    }

    /// Multi-pipe collection returns between 1 and 64 bytes when asked for 64.
    /// Ignored by default; run with `cargo test -- --ignored`.
    #[test]
    #[ignore]
    fn collects_bytes() {
        let source = PipeBufferSource::default();
        if !source.is_available() {
            return;
        }

        let bytes = source.collect(64);
        assert!(!bytes.is_empty());
        assert!(bytes.len() <= 64);
    }

    /// The single-pipe fallback produces output when called directly.
    /// Ignored by default; run with `cargo test -- --ignored`.
    #[test]
    #[ignore]
    fn single_pipe_mode() {
        let config = PipeBufferConfig {
            num_pipes: 0,
            ..PipeBufferConfig::default()
        };
        let source = PipeBufferSource { config };
        if !source.is_available() {
            return;
        }

        assert!(!source.collect_single_pipe(64).is_empty());
    }
}