Skip to main content

openentropy_core/sources/frontier/
pipe_buffer.rs

1//! Pipe buffer timing — entropy from multi-pipe kernel zone allocator contention.
2
3use crate::source::{EntropySource, SourceCategory, SourceInfo};
4use crate::sources::helpers::{extract_timing_entropy, mach_time};
5
/// Tunable parameters for pipe buffer entropy collection.
///
/// Values are not validated up front. During collection `num_pipes` and
/// `min_write_size` are raised to at least 1, and `max_write_size` is raised
/// to at least the (clamped) `min_write_size`.
///
/// # Example
/// ```
/// # use openentropy_core::sources::frontier::PipeBufferConfig;
/// let config = PipeBufferConfig {
///     num_pipes: 8,              // more pipes = more zone contention
///     min_write_size: 64,        // skip tiny writes
///     max_write_size: 2048,      // cap at 2KB
///     non_blocking: true,        // capture EAGAIN timing (recommended)
/// };
/// ```
#[derive(Debug, Clone)]
pub struct PipeBufferConfig {
    /// How many pipes are kept open at once in the pool.
    ///
    /// Every pipe comes out of the kernel's pipe zone, so several pipes
    /// competing for kernel buffer space produce zone allocator contention,
    /// and cross-CPU magazine transfers add further nondeterminism.
    ///
    /// **Range:** 1+ (clamped to >=1). **Default:** `4`
    pub num_pipes: usize,

    /// Smallest write, in bytes, issued against a pipe.
    ///
    /// Small writes are served from inline pipe buffer storage while larger
    /// ones chain mbufs; crossing between the two paths contributes entropy.
    ///
    /// **Range:** 1+ (clamped to >=1). **Default:** `1`
    pub min_write_size: usize,

    /// Largest write, in bytes, issued against a pipe.
    ///
    /// Bigger writes take different mbuf allocation paths and are more likely
    /// to provoke cross-CPU magazine transfers in the zone allocator.
    ///
    /// **Range:** >= `min_write_size` (clamped). **Default:** `4096`
    pub max_write_size: usize,

    /// Whether pipe writes are issued in non-blocking mode.
    ///
    /// When set, `O_NONBLOCK` is applied to the write end of each pooled pipe.
    /// A non-blocking write that hits `EAGAIN` (pipe buffer full) follows a
    /// different kernel path than a blocking write, and the timing of that
    /// failure check is itself a source of entropy.
    ///
    /// **Default:** `true`
    pub non_blocking: bool,
}
54
55impl Default for PipeBufferConfig {
56    fn default() -> Self {
57        Self {
58            num_pipes: 4,
59            min_write_size: 1,
60            max_write_size: 4096,
61            non_blocking: true,
62        }
63    }
64}
65
66/// Harvests timing jitter from pipe I/O with multiple pipes competing for
67/// kernel buffer space.
68///
69/// # What it measures
70/// Nanosecond timing of `write()` + `read()` cycles on a pool of pipes,
71/// with variable write sizes and periodic pipe creation/destruction for
72/// zone allocator churn.
73///
74/// # Why it's entropic
75/// Multiple simultaneous pipes competing for kernel zone allocator resources
76/// amplifies nondeterminism:
77/// - **Zone allocator contention** — multiple pipes allocating from the pipe
78///   zone simultaneously creates cross-CPU magazine transfer contention
79/// - **Variable buffer sizes** — different write sizes exercise different mbuf
80///   allocation paths (small = inline storage, large = chained mbufs)
81/// - **Non-blocking I/O** — `EAGAIN` on full pipe buffers follows a different
82///   kernel path with its own latency characteristics
83/// - **Cross-pipe interference** — reading from one pipe while another has
84///   pending data creates wakeup scheduling interference
85///
86/// # What makes it unique
87/// Pipe buffers exercise the kernel's zone allocator (magazine layer) in a way
88/// that no other entropy source does. The zone allocator's per-CPU caching
89/// and cross-CPU transfers create timing that depends on every CPU's allocation
90/// history.
91///
92/// # Configuration
93/// See [`PipeBufferConfig`] for tunable parameters. Key options:
94/// - `non_blocking`: capture EAGAIN failure path timing (recommended: `true`)
95/// - `num_pipes`: controls zone allocator contention level
96/// - `min_write_size`/`max_write_size`: controls mbuf allocation path diversity
97#[derive(Default)]
98pub struct PipeBufferSource {
99    /// Source configuration. Use `Default::default()` for recommended settings.
100    pub config: PipeBufferConfig,
101}
102
/// Static metadata record for the pipe buffer source; `PipeBufferSource::info`
/// hands out a reference to it.
static PIPE_BUFFER_INFO: SourceInfo = SourceInfo {
    // Identifier of the source (the unit tests expect `name()` to report it).
    name: "pipe_buffer",
    description: "Multi-pipe kernel zone allocator competition and buffer timing jitter",
    physics: "Creates multiple pipes simultaneously, writes variable-size data, reads it back, \
              and closes — measuring contention in the kernel zone allocator. Multiple pipes \
              compete for pipe zone and mbuf allocations, creating cross-CPU magazine transfer \
              contention. Variable write sizes exercise different mbuf paths. Non-blocking mode \
              captures EAGAIN timing on different kernel failure paths. Zone allocator timing \
              depends on zone fragmentation, magazine layer state, and cross-CPU transfers.",
    category: SourceCategory::Frontier,
    // No platform requirements are listed for this source.
    platform_requirements: &[],
    // NOTE(review): the unit of this estimate is not visible in this file — confirm
    // against the `SourceInfo` definition.
    entropy_rate_estimate: 1500.0,
    composite: false,
};
117
118impl EntropySource for PipeBufferSource {
119    fn info(&self) -> &SourceInfo {
120        &PIPE_BUFFER_INFO
121    }
122
123    fn is_available(&self) -> bool {
124        cfg!(unix)
125    }
126
127    fn collect(&self, n_samples: usize) -> Vec<u8> {
128        let raw_count = n_samples * 4 + 64;
129        let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
130        let mut lcg: u64 = mach_time() | 1;
131        let num_pipes = self.config.num_pipes.max(1);
132        let min_size = self.config.min_write_size.max(1);
133        let max_size = self.config.max_write_size.max(min_size);
134
135        // Pre-allocate a persistent pool of pipes for contention.
136        let mut pipe_pool: Vec<[i32; 2]> = Vec::new();
137        for _ in 0..num_pipes {
138            let mut fds: [i32; 2] = [0; 2];
139            // SAFETY: fds is a 2-element array matching pipe()'s expected output.
140            let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
141            if ret == 0 {
142                if self.config.non_blocking {
143                    // SAFETY: fds[1] is a valid file descriptor from pipe().
144                    unsafe {
145                        let flags = libc::fcntl(fds[1], libc::F_GETFL);
146                        libc::fcntl(fds[1], libc::F_SETFL, flags | libc::O_NONBLOCK);
147                    }
148                }
149                pipe_pool.push(fds);
150            }
151        }
152
153        if pipe_pool.is_empty() {
154            return self.collect_single_pipe(n_samples);
155        }
156
157        for i in 0..raw_count {
158            // Vary write size to exercise different mbuf allocation paths.
159            lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
160            let write_size = if min_size == max_size {
161                min_size
162            } else {
163                min_size + (lcg >> 48) as usize % (max_size - min_size + 1)
164            };
165            let write_data = vec![0xBEu8; write_size];
166            let mut read_buf = vec![0u8; write_size];
167
168            let pipe_idx = i % pipe_pool.len();
169            let fds = pipe_pool[pipe_idx];
170
171            let t0 = mach_time();
172
173            // SAFETY: fds are valid file descriptors from pipe().
174            unsafe {
175                let written = libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
176
177                if written > 0 {
178                    libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, written as usize);
179                }
180            }
181
182            let t1 = mach_time();
183            std::hint::black_box(&read_buf);
184            timings.push(t1.wrapping_sub(t0));
185
186            // Periodically create/destroy an extra pipe for zone allocator churn.
187            if i % 8 == 0 {
188                let mut extra_fds: [i32; 2] = [0; 2];
189                let ret = unsafe { libc::pipe(extra_fds.as_mut_ptr()) };
190                if ret == 0 {
191                    unsafe {
192                        libc::close(extra_fds[0]);
193                        libc::close(extra_fds[1]);
194                    }
195                }
196            }
197        }
198
199        // Clean up pipe pool.
200        for fds in &pipe_pool {
201            unsafe {
202                libc::close(fds[0]);
203                libc::close(fds[1]);
204            }
205        }
206
207        extract_timing_entropy(&timings, n_samples)
208    }
209}
210
211impl PipeBufferSource {
212    /// Fallback single-pipe collection (matches original behavior).
213    pub(crate) fn collect_single_pipe(&self, n_samples: usize) -> Vec<u8> {
214        let raw_count = n_samples * 4 + 64;
215        let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
216        let mut lcg: u64 = mach_time() | 1;
217
218        for _ in 0..raw_count {
219            lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
220            let write_size = 1 + (lcg >> 48) as usize % 256;
221            let write_data = vec![0xBEu8; write_size];
222            let mut read_buf = vec![0u8; write_size];
223
224            let mut fds: [i32; 2] = [0; 2];
225            let t0 = mach_time();
226            let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
227            if ret != 0 {
228                continue;
229            }
230            unsafe {
231                libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
232                libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, write_size);
233                libc::close(fds[0]);
234                libc::close(fds[1]);
235            }
236            let t1 = mach_time();
237            std::hint::black_box(&read_buf);
238            timings.push(t1.wrapping_sub(t0));
239        }
240
241        extract_timing_entropy(&timings, n_samples)
242    }
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    /// The source reports the expected name, category and composite flag.
    #[test]
    fn info() {
        let source = PipeBufferSource::default();
        assert_eq!(source.name(), "pipe_buffer");
        let meta = source.info();
        assert_eq!(meta.category, SourceCategory::Frontier);
        assert!(!meta.composite);
    }

    /// `PipeBufferConfig::default()` yields the documented defaults.
    #[test]
    fn default_config() {
        let PipeBufferConfig {
            num_pipes,
            min_write_size,
            max_write_size,
            non_blocking,
        } = PipeBufferConfig::default();
        assert_eq!(num_pipes, 4);
        assert_eq!(min_write_size, 1);
        assert_eq!(max_write_size, 4096);
        assert!(non_blocking);
    }

    /// A hand-built configuration is stored on the source unchanged.
    #[test]
    fn custom_config() {
        let config = PipeBufferConfig {
            num_pipes: 8,
            min_write_size: 64,
            max_write_size: 1024,
            non_blocking: false,
        };
        let source = PipeBufferSource { config };
        assert_eq!(source.config.num_pipes, 8);
    }

    /// Pooled collection returns a non-empty result of at most the requested size.
    #[test]
    #[ignore] // Uses pipe syscall
    fn collects_bytes() {
        let source = PipeBufferSource::default();
        if !source.is_available() {
            return;
        }
        let bytes = source.collect(64);
        assert!(!bytes.is_empty());
        assert!(bytes.len() <= 64);
    }

    /// The single-pipe fallback path produces output when called directly.
    #[test]
    #[ignore] // Uses pipe syscall
    fn single_pipe_mode() {
        let config = PipeBufferConfig {
            num_pipes: 0,
            ..PipeBufferConfig::default()
        };
        let source = PipeBufferSource { config };
        if !source.is_available() {
            return;
        }
        assert!(!source.collect_single_pipe(64).is_empty());
    }
}