Skip to main content

openentropy_core/sources/frontier/
pipe_buffer.rs

1//! Pipe buffer timing — entropy from multi-pipe kernel zone allocator contention.
2
3use crate::source::{EntropySource, Platform, SourceCategory, SourceInfo};
4use crate::sources::helpers::{extract_timing_entropy, mach_time};
5
6/// Configuration for pipe buffer entropy collection.
7///
8/// # Example
9/// ```
10/// # use openentropy_core::sources::frontier::PipeBufferConfig;
11/// let config = PipeBufferConfig {
12///     num_pipes: 8,              // more pipes = more zone contention
13///     min_write_size: 64,        // skip tiny writes
14///     max_write_size: 2048,      // cap at 2KB
15///     non_blocking: true,        // capture EAGAIN timing (recommended)
16/// };
17/// ```
18#[derive(Debug, Clone)]
19pub struct PipeBufferConfig {
20    /// Number of pipes to use simultaneously.
21    ///
22    /// Multiple pipes competing for kernel buffer space creates zone allocator
23    /// contention. Each pipe is allocated from the kernel's pipe zone, and
24    /// cross-CPU magazine transfers add nondeterminism.
25    ///
26    /// **Range:** 1+ (clamped to >=1). **Default:** `4`
27    pub num_pipes: usize,
28
29    /// Minimum write size in bytes.
30    ///
31    /// Small writes use inline pipe buffer storage; larger writes chain mbufs.
32    /// The transition between these paths adds entropy.
33    ///
34    /// **Range:** 1+. **Default:** `1`
35    pub min_write_size: usize,
36
37    /// Maximum write size in bytes.
38    ///
39    /// Larger writes exercise different mbuf allocation paths and are more
40    /// likely to trigger cross-CPU magazine transfers in the zone allocator.
41    ///
42    /// **Range:** >= `min_write_size`. **Default:** `4096`
43    pub max_write_size: usize,
44
45    /// Use non-blocking mode for pipe writes.
46    ///
47    /// Non-blocking writes that hit `EAGAIN` (pipe buffer full) follow a
48    /// different kernel path than blocking writes. The timing of the failure
49    /// check is itself a source of entropy.
50    ///
51    /// **Default:** `true`
52    pub non_blocking: bool,
53}
54
55impl Default for PipeBufferConfig {
56    fn default() -> Self {
57        Self {
58            num_pipes: 4,
59            min_write_size: 1,
60            max_write_size: 4096,
61            non_blocking: true,
62        }
63    }
64}
65
66/// Harvests timing jitter from pipe I/O with multiple pipes competing for
67/// kernel buffer space.
68///
69/// # What it measures
70/// Nanosecond timing of `write()` + `read()` cycles on a pool of pipes,
71/// with variable write sizes and periodic pipe creation/destruction for
72/// zone allocator churn.
73///
74/// # Why it's entropic
75/// Multiple simultaneous pipes competing for kernel zone allocator resources
76/// amplifies nondeterminism:
77/// - **Zone allocator contention** — multiple pipes allocating from the pipe
78///   zone simultaneously creates cross-CPU magazine transfer contention
79/// - **Variable buffer sizes** — different write sizes exercise different mbuf
80///   allocation paths (small = inline storage, large = chained mbufs)
81/// - **Non-blocking I/O** — `EAGAIN` on full pipe buffers follows a different
82///   kernel path with its own latency characteristics
83/// - **Cross-pipe interference** — reading from one pipe while another has
84///   pending data creates wakeup scheduling interference
85///
86/// # What makes it unique
87/// Pipe buffers exercise the kernel's zone allocator (magazine layer) in a way
88/// that no other entropy source does. The zone allocator's per-CPU caching
89/// and cross-CPU transfers create timing that depends on every CPU's allocation
90/// history.
91///
92/// # Configuration
93/// See [`PipeBufferConfig`] for tunable parameters. Key options:
94/// - `non_blocking`: capture EAGAIN failure path timing (recommended: `true`)
95/// - `num_pipes`: controls zone allocator contention level
96/// - `min_write_size`/`max_write_size`: controls mbuf allocation path diversity
97#[derive(Default)]
98pub struct PipeBufferSource {
99    /// Source configuration. Use `Default::default()` for recommended settings.
100    pub config: PipeBufferConfig,
101}
102
103static PIPE_BUFFER_INFO: SourceInfo = SourceInfo {
104    name: "pipe_buffer",
105    description: "Multi-pipe kernel zone allocator competition and buffer timing jitter",
106    physics: "Creates multiple pipes simultaneously, writes variable-size data, reads it back, \
107              and closes — measuring contention in the kernel zone allocator. Multiple pipes \
108              compete for pipe zone and mbuf allocations, creating cross-CPU magazine transfer \
109              contention. Variable write sizes exercise different mbuf paths. Non-blocking mode \
110              captures EAGAIN timing on different kernel failure paths. Zone allocator timing \
111              depends on zone fragmentation, magazine layer state, and cross-CPU transfers.",
112    category: SourceCategory::IPC,
113    platform: Platform::Any,
114    requirements: &[],
115    entropy_rate_estimate: 1500.0,
116    composite: false,
117};
118
119impl EntropySource for PipeBufferSource {
120    fn info(&self) -> &SourceInfo {
121        &PIPE_BUFFER_INFO
122    }
123
124    fn is_available(&self) -> bool {
125        cfg!(unix)
126    }
127
128    fn collect(&self, n_samples: usize) -> Vec<u8> {
129        let raw_count = n_samples * 4 + 64;
130        let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
131        let mut lcg: u64 = mach_time() | 1;
132        let num_pipes = self.config.num_pipes.max(1);
133        let min_size = self.config.min_write_size.max(1);
134        let max_size = self.config.max_write_size.max(min_size);
135
136        // Pre-allocate a persistent pool of pipes for contention.
137        let mut pipe_pool: Vec<[i32; 2]> = Vec::new();
138        for _ in 0..num_pipes {
139            let mut fds: [i32; 2] = [0; 2];
140            // SAFETY: fds is a 2-element array matching pipe()'s expected output.
141            let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
142            if ret == 0 {
143                if self.config.non_blocking {
144                    // SAFETY: fds[1] is a valid file descriptor from pipe().
145                    unsafe {
146                        let flags = libc::fcntl(fds[1], libc::F_GETFL);
147                        libc::fcntl(fds[1], libc::F_SETFL, flags | libc::O_NONBLOCK);
148                    }
149                }
150                pipe_pool.push(fds);
151            }
152        }
153
154        if pipe_pool.is_empty() {
155            return self.collect_single_pipe(n_samples);
156        }
157
158        for i in 0..raw_count {
159            // Vary write size to exercise different mbuf allocation paths.
160            lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
161            let write_size = if min_size == max_size {
162                min_size
163            } else {
164                min_size + (lcg >> 48) as usize % (max_size - min_size + 1)
165            };
166            let write_data = vec![0xBEu8; write_size];
167            let mut read_buf = vec![0u8; write_size];
168
169            let pipe_idx = i % pipe_pool.len();
170            let fds = pipe_pool[pipe_idx];
171
172            let t0 = mach_time();
173
174            // SAFETY: fds are valid file descriptors from pipe().
175            unsafe {
176                let written = libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
177
178                if written > 0 {
179                    libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, written as usize);
180                }
181            }
182
183            let t1 = mach_time();
184            std::hint::black_box(&read_buf);
185            timings.push(t1.wrapping_sub(t0));
186
187            // Periodically create/destroy an extra pipe for zone allocator churn.
188            if i % 8 == 0 {
189                let mut extra_fds: [i32; 2] = [0; 2];
190                let ret = unsafe { libc::pipe(extra_fds.as_mut_ptr()) };
191                if ret == 0 {
192                    unsafe {
193                        libc::close(extra_fds[0]);
194                        libc::close(extra_fds[1]);
195                    }
196                }
197            }
198        }
199
200        // Clean up pipe pool.
201        for fds in &pipe_pool {
202            unsafe {
203                libc::close(fds[0]);
204                libc::close(fds[1]);
205            }
206        }
207
208        extract_timing_entropy(&timings, n_samples)
209    }
210}
211
212impl PipeBufferSource {
213    /// Fallback single-pipe collection (matches original behavior).
214    pub(crate) fn collect_single_pipe(&self, n_samples: usize) -> Vec<u8> {
215        let raw_count = n_samples * 4 + 64;
216        let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
217        let mut lcg: u64 = mach_time() | 1;
218
219        for _ in 0..raw_count {
220            lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
221            let write_size = 1 + (lcg >> 48) as usize % 256;
222            let write_data = vec![0xBEu8; write_size];
223            let mut read_buf = vec![0u8; write_size];
224
225            let mut fds: [i32; 2] = [0; 2];
226            let t0 = mach_time();
227            let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
228            if ret != 0 {
229                continue;
230            }
231            unsafe {
232                libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
233                libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, write_size);
234                libc::close(fds[0]);
235                libc::close(fds[1]);
236            }
237            let t1 = mach_time();
238            std::hint::black_box(&read_buf);
239            timings.push(t1.wrapping_sub(t0));
240        }
241
242        extract_timing_entropy(&timings, n_samples)
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249
250    #[test]
251    fn info() {
252        let src = PipeBufferSource::default();
253        assert_eq!(src.name(), "pipe_buffer");
254        assert_eq!(src.info().category, SourceCategory::IPC);
255        assert!(!src.info().composite);
256    }
257
258    #[test]
259    fn default_config() {
260        let config = PipeBufferConfig::default();
261        assert_eq!(config.num_pipes, 4);
262        assert_eq!(config.min_write_size, 1);
263        assert_eq!(config.max_write_size, 4096);
264        assert!(config.non_blocking);
265    }
266
267    #[test]
268    fn custom_config() {
269        let src = PipeBufferSource {
270            config: PipeBufferConfig {
271                num_pipes: 8,
272                min_write_size: 64,
273                max_write_size: 1024,
274                non_blocking: false,
275            },
276        };
277        assert_eq!(src.config.num_pipes, 8);
278    }
279
280    #[test]
281    #[ignore] // Uses pipe syscall
282    fn collects_bytes() {
283        let src = PipeBufferSource::default();
284        if src.is_available() {
285            let data = src.collect(64);
286            assert!(!data.is_empty());
287            assert!(data.len() <= 64);
288        }
289    }
290
291    #[test]
292    #[ignore] // Uses pipe syscall
293    fn single_pipe_mode() {
294        let src = PipeBufferSource {
295            config: PipeBufferConfig {
296                num_pipes: 0,
297                ..PipeBufferConfig::default()
298            },
299        };
300        if src.is_available() {
301            assert!(!src.collect_single_pipe(64).is_empty());
302        }
303    }
304}