openentropy_core/sources/frontier/
pipe_buffer.rs1use crate::source::{EntropySource, SourceCategory, SourceInfo};
4use crate::sources::helpers::{extract_timing_entropy, mach_time};
5
6#[derive(Debug, Clone)]
19pub struct PipeBufferConfig {
20 pub num_pipes: usize,
28
29 pub min_write_size: usize,
36
37 pub max_write_size: usize,
44
45 pub non_blocking: bool,
53}
54
55impl Default for PipeBufferConfig {
56 fn default() -> Self {
57 Self {
58 num_pipes: 4,
59 min_write_size: 1,
60 max_write_size: 4096,
61 non_blocking: true,
62 }
63 }
64}
65
66#[derive(Default)]
98pub struct PipeBufferSource {
99 pub config: PipeBufferConfig,
101}
102
103static PIPE_BUFFER_INFO: SourceInfo = SourceInfo {
104 name: "pipe_buffer",
105 description: "Multi-pipe kernel zone allocator competition and buffer timing jitter",
106 physics: "Creates multiple pipes simultaneously, writes variable-size data, reads it back, \
107 and closes — measuring contention in the kernel zone allocator. Multiple pipes \
108 compete for pipe zone and mbuf allocations, creating cross-CPU magazine transfer \
109 contention. Variable write sizes exercise different mbuf paths. Non-blocking mode \
110 captures EAGAIN timing on different kernel failure paths. Zone allocator timing \
111 depends on zone fragmentation, magazine layer state, and cross-CPU transfers.",
112 category: SourceCategory::Frontier,
113 platform_requirements: &[],
114 entropy_rate_estimate: 1500.0,
115 composite: false,
116};
117
118impl EntropySource for PipeBufferSource {
119 fn info(&self) -> &SourceInfo {
120 &PIPE_BUFFER_INFO
121 }
122
123 fn is_available(&self) -> bool {
124 cfg!(unix)
125 }
126
127 fn collect(&self, n_samples: usize) -> Vec<u8> {
128 let raw_count = n_samples * 4 + 64;
129 let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
130 let mut lcg: u64 = mach_time() | 1;
131 let num_pipes = self.config.num_pipes.max(1);
132 let min_size = self.config.min_write_size.max(1);
133 let max_size = self.config.max_write_size.max(min_size);
134
135 let mut pipe_pool: Vec<[i32; 2]> = Vec::new();
137 for _ in 0..num_pipes {
138 let mut fds: [i32; 2] = [0; 2];
139 let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
141 if ret == 0 {
142 if self.config.non_blocking {
143 unsafe {
145 let flags = libc::fcntl(fds[1], libc::F_GETFL);
146 libc::fcntl(fds[1], libc::F_SETFL, flags | libc::O_NONBLOCK);
147 }
148 }
149 pipe_pool.push(fds);
150 }
151 }
152
153 if pipe_pool.is_empty() {
154 return self.collect_single_pipe(n_samples);
155 }
156
157 for i in 0..raw_count {
158 lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
160 let write_size = if min_size == max_size {
161 min_size
162 } else {
163 min_size + (lcg >> 48) as usize % (max_size - min_size + 1)
164 };
165 let write_data = vec![0xBEu8; write_size];
166 let mut read_buf = vec![0u8; write_size];
167
168 let pipe_idx = i % pipe_pool.len();
169 let fds = pipe_pool[pipe_idx];
170
171 let t0 = mach_time();
172
173 unsafe {
175 let written = libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
176
177 if written > 0 {
178 libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, written as usize);
179 }
180 }
181
182 let t1 = mach_time();
183 std::hint::black_box(&read_buf);
184 timings.push(t1.wrapping_sub(t0));
185
186 if i % 8 == 0 {
188 let mut extra_fds: [i32; 2] = [0; 2];
189 let ret = unsafe { libc::pipe(extra_fds.as_mut_ptr()) };
190 if ret == 0 {
191 unsafe {
192 libc::close(extra_fds[0]);
193 libc::close(extra_fds[1]);
194 }
195 }
196 }
197 }
198
199 for fds in &pipe_pool {
201 unsafe {
202 libc::close(fds[0]);
203 libc::close(fds[1]);
204 }
205 }
206
207 extract_timing_entropy(&timings, n_samples)
208 }
209}
210
211impl PipeBufferSource {
212 pub(crate) fn collect_single_pipe(&self, n_samples: usize) -> Vec<u8> {
214 let raw_count = n_samples * 4 + 64;
215 let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
216 let mut lcg: u64 = mach_time() | 1;
217
218 for _ in 0..raw_count {
219 lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
220 let write_size = 1 + (lcg >> 48) as usize % 256;
221 let write_data = vec![0xBEu8; write_size];
222 let mut read_buf = vec![0u8; write_size];
223
224 let mut fds: [i32; 2] = [0; 2];
225 let t0 = mach_time();
226 let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
227 if ret != 0 {
228 continue;
229 }
230 unsafe {
231 libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
232 libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, write_size);
233 libc::close(fds[0]);
234 libc::close(fds[1]);
235 }
236 let t1 = mach_time();
237 std::hint::black_box(&read_buf);
238 timings.push(t1.wrapping_sub(t0));
239 }
240
241 extract_timing_entropy(&timings, n_samples)
242 }
243}
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248
249 #[test]
250 fn info() {
251 let src = PipeBufferSource::default();
252 assert_eq!(src.name(), "pipe_buffer");
253 assert_eq!(src.info().category, SourceCategory::Frontier);
254 assert!(!src.info().composite);
255 }
256
257 #[test]
258 fn default_config() {
259 let config = PipeBufferConfig::default();
260 assert_eq!(config.num_pipes, 4);
261 assert_eq!(config.min_write_size, 1);
262 assert_eq!(config.max_write_size, 4096);
263 assert!(config.non_blocking);
264 }
265
266 #[test]
267 fn custom_config() {
268 let src = PipeBufferSource {
269 config: PipeBufferConfig {
270 num_pipes: 8,
271 min_write_size: 64,
272 max_write_size: 1024,
273 non_blocking: false,
274 },
275 };
276 assert_eq!(src.config.num_pipes, 8);
277 }
278
279 #[test]
280 #[ignore] fn collects_bytes() {
282 let src = PipeBufferSource::default();
283 if src.is_available() {
284 let data = src.collect(64);
285 assert!(!data.is_empty());
286 assert!(data.len() <= 64);
287 }
288 }
289
290 #[test]
291 #[ignore] fn single_pipe_mode() {
293 let src = PipeBufferSource {
294 config: PipeBufferConfig {
295 num_pipes: 0,
296 ..PipeBufferConfig::default()
297 },
298 };
299 if src.is_available() {
300 assert!(!src.collect_single_pipe(64).is_empty());
301 }
302 }
303}