openentropy_core/sources/frontier/
pipe_buffer.rs1use crate::source::{EntropySource, Platform, SourceCategory, SourceInfo};
4use crate::sources::helpers::{extract_timing_entropy, mach_time};
5
/// Tuning parameters for [`PipeBufferSource`].
///
/// Values are not validated when the struct is built; `PipeBufferSource::collect`
/// sanitises them on use (`num_pipes` and `min_write_size` are raised to at least 1,
/// and `max_write_size` is raised to at least the effective minimum).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipeBufferConfig {
    /// Number of pipes opened up front and cycled through while sampling.
    pub num_pipes: usize,

    /// Smallest payload, in bytes, written to a pipe for one sample.
    pub min_write_size: usize,

    /// Largest payload, in bytes, written to a pipe for one sample.
    pub max_write_size: usize,

    /// When `true`, the write end of every pooled pipe is switched to `O_NONBLOCK`.
    pub non_blocking: bool,
}
54
55impl Default for PipeBufferConfig {
56 fn default() -> Self {
57 Self {
58 num_pipes: 4,
59 min_write_size: 1,
60 max_write_size: 4096,
61 non_blocking: true,
62 }
63 }
64}
65
66#[derive(Default)]
98pub struct PipeBufferSource {
99 pub config: PipeBufferConfig,
101}
102
/// Static descriptor for the pipe-buffer source; this is the value handed out by
/// `PipeBufferSource::info`.
static PIPE_BUFFER_INFO: SourceInfo = SourceInfo {
    name: "pipe_buffer",
    description: "Multi-pipe kernel zone allocator competition and buffer timing jitter",
    physics: "Creates multiple pipes simultaneously, writes variable-size data, reads it back, \
              and closes — measuring contention in the kernel zone allocator. Multiple pipes \
              compete for pipe zone and mbuf allocations, creating cross-CPU magazine transfer \
              contention. Variable write sizes exercise different mbuf paths. Non-blocking mode \
              captures EAGAIN timing on different kernel failure paths. Zone allocator timing \
              depends on zone fragmentation, magazine layer state, and cross-CPU transfers.",
    category: SourceCategory::IPC,
    platform: Platform::Any,
    requirements: &[],
    // NOTE(review): the unit of this estimate is defined by `SourceInfo`, which is not
    // visible here — confirm before relying on it.
    entropy_rate_estimate: 1500.0,
    composite: false,
};
118
119impl EntropySource for PipeBufferSource {
120 fn info(&self) -> &SourceInfo {
121 &PIPE_BUFFER_INFO
122 }
123
124 fn is_available(&self) -> bool {
125 cfg!(unix)
126 }
127
128 fn collect(&self, n_samples: usize) -> Vec<u8> {
129 let raw_count = n_samples * 4 + 64;
130 let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
131 let mut lcg: u64 = mach_time() | 1;
132 let num_pipes = self.config.num_pipes.max(1);
133 let min_size = self.config.min_write_size.max(1);
134 let max_size = self.config.max_write_size.max(min_size);
135
136 let mut pipe_pool: Vec<[i32; 2]> = Vec::new();
138 for _ in 0..num_pipes {
139 let mut fds: [i32; 2] = [0; 2];
140 let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
142 if ret == 0 {
143 if self.config.non_blocking {
144 unsafe {
146 let flags = libc::fcntl(fds[1], libc::F_GETFL);
147 libc::fcntl(fds[1], libc::F_SETFL, flags | libc::O_NONBLOCK);
148 }
149 }
150 pipe_pool.push(fds);
151 }
152 }
153
154 if pipe_pool.is_empty() {
155 return self.collect_single_pipe(n_samples);
156 }
157
158 for i in 0..raw_count {
159 lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
161 let write_size = if min_size == max_size {
162 min_size
163 } else {
164 min_size + (lcg >> 48) as usize % (max_size - min_size + 1)
165 };
166 let write_data = vec![0xBEu8; write_size];
167 let mut read_buf = vec![0u8; write_size];
168
169 let pipe_idx = i % pipe_pool.len();
170 let fds = pipe_pool[pipe_idx];
171
172 let t0 = mach_time();
173
174 unsafe {
176 let written = libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
177
178 if written > 0 {
179 libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, written as usize);
180 }
181 }
182
183 let t1 = mach_time();
184 std::hint::black_box(&read_buf);
185 timings.push(t1.wrapping_sub(t0));
186
187 if i % 8 == 0 {
189 let mut extra_fds: [i32; 2] = [0; 2];
190 let ret = unsafe { libc::pipe(extra_fds.as_mut_ptr()) };
191 if ret == 0 {
192 unsafe {
193 libc::close(extra_fds[0]);
194 libc::close(extra_fds[1]);
195 }
196 }
197 }
198 }
199
200 for fds in &pipe_pool {
202 unsafe {
203 libc::close(fds[0]);
204 libc::close(fds[1]);
205 }
206 }
207
208 extract_timing_entropy(&timings, n_samples)
209 }
210}
211
212impl PipeBufferSource {
213 pub(crate) fn collect_single_pipe(&self, n_samples: usize) -> Vec<u8> {
215 let raw_count = n_samples * 4 + 64;
216 let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
217 let mut lcg: u64 = mach_time() | 1;
218
219 for _ in 0..raw_count {
220 lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
221 let write_size = 1 + (lcg >> 48) as usize % 256;
222 let write_data = vec![0xBEu8; write_size];
223 let mut read_buf = vec![0u8; write_size];
224
225 let mut fds: [i32; 2] = [0; 2];
226 let t0 = mach_time();
227 let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
228 if ret != 0 {
229 continue;
230 }
231 unsafe {
232 libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
233 libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, write_size);
234 libc::close(fds[0]);
235 libc::close(fds[1]);
236 }
237 let t1 = mach_time();
238 std::hint::black_box(&read_buf);
239 timings.push(t1.wrapping_sub(t0));
240 }
241
242 extract_timing_entropy(&timings, n_samples)
243 }
244}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// The descriptor exposes the expected name, category and composite flag.
    #[test]
    fn info() {
        let source = PipeBufferSource::default();
        assert_eq!(source.name(), "pipe_buffer");
        assert_eq!(source.info().category, SourceCategory::IPC);
        assert!(!source.info().composite);
    }

    /// `PipeBufferConfig::default()` yields the documented stock values.
    #[test]
    fn default_config() {
        let PipeBufferConfig {
            num_pipes,
            min_write_size,
            max_write_size,
            non_blocking,
        } = PipeBufferConfig::default();
        assert_eq!(num_pipes, 4);
        assert_eq!(min_write_size, 1);
        assert_eq!(max_write_size, 4096);
        assert!(non_blocking);
    }

    /// A hand-built configuration is stored on the source unchanged.
    #[test]
    fn custom_config() {
        let config = PipeBufferConfig {
            num_pipes: 8,
            min_write_size: 64,
            max_write_size: 1024,
            non_blocking: false,
        };
        let source = PipeBufferSource { config };
        assert_eq!(source.config.num_pipes, 8);
    }

    /// Collection returns a non-empty result no longer than the request.
    #[test]
    #[ignore]
    fn collects_bytes() {
        let source = PipeBufferSource::default();
        if !source.is_available() {
            return;
        }
        let bytes = source.collect(64);
        assert!(!bytes.is_empty());
        assert!(bytes.len() <= 64);
    }

    /// The single-pipe fallback produces output when called directly.
    #[test]
    #[ignore]
    fn single_pipe_mode() {
        let config = PipeBufferConfig {
            num_pipes: 0,
            ..PipeBufferConfig::default()
        };
        let source = PipeBufferSource { config };
        if !source.is_available() {
            return;
        }
        assert!(!source.collect_single_pipe(64).is_empty());
    }
}