// openentropy_core/sources/ipc/pipe_buffer.rs
//! Pipe buffer timing — entropy from multi-pipe kernel zone allocator contention.

use crate::source::{EntropySource, Platform, SourceCategory, SourceInfo};
use crate::sources::helpers::{extract_timing_entropy, mach_time};
5
/// Configuration for pipe buffer entropy collection.
///
/// All fields are clamped at collection time (`num_pipes >= 1`,
/// `min_write_size >= 1`, `max_write_size >= min_write_size`), so any
/// combination of values is safe to construct.
///
/// # Example
/// ```
/// # use openentropy_core::sources::ipc::PipeBufferConfig;
/// let config = PipeBufferConfig {
///     num_pipes: 8,              // more pipes = more zone contention
///     min_write_size: 64,        // skip tiny writes
///     max_write_size: 2048,      // cap at 2KB
///     non_blocking: true,        // capture EAGAIN timing (recommended)
/// };
/// ```
#[derive(Debug, Clone)]
pub struct PipeBufferConfig {
    /// Number of pipes to use simultaneously.
    ///
    /// Multiple pipes competing for kernel buffer space creates zone allocator
    /// contention. Each pipe is allocated from the kernel's pipe zone, and
    /// cross-CPU magazine transfers add nondeterminism.
    ///
    /// **Range:** 1+ (clamped to >=1). **Default:** `4`
    pub num_pipes: usize,

    /// Minimum write size in bytes.
    ///
    /// Small writes use inline pipe buffer storage; larger writes chain mbufs.
    /// The transition between these paths adds entropy.
    ///
    /// **Range:** 1+. **Default:** `1`
    pub min_write_size: usize,

    /// Maximum write size in bytes.
    ///
    /// Larger writes exercise different mbuf allocation paths and are more
    /// likely to trigger cross-CPU magazine transfers in the zone allocator.
    ///
    /// **Range:** >= `min_write_size`. **Default:** `4096`
    pub max_write_size: usize,

    /// Use non-blocking mode for pipe writes.
    ///
    /// Non-blocking writes that hit `EAGAIN` (pipe buffer full) follow a
    /// different kernel path than blocking writes. The timing of the failure
    /// check is itself a source of entropy.
    ///
    /// **Default:** `true`
    pub non_blocking: bool,
}
54
55impl Default for PipeBufferConfig {
56    fn default() -> Self {
57        Self {
58            num_pipes: 4,
59            min_write_size: 1,
60            max_write_size: 4096,
61            non_blocking: true,
62        }
63    }
64}
65
/// Harvests timing jitter from pipe I/O with multiple pipes competing for
/// kernel buffer space.
///
/// # What it measures
/// Nanosecond timing of `write()` + `read()` cycles on a pool of pipes,
/// with variable write sizes and periodic pipe creation/destruction for
/// zone allocator churn.
///
/// # Why it's entropic
/// Multiple simultaneous pipes competing for kernel zone allocator resources
/// amplifies nondeterminism:
/// - **Zone allocator contention** — multiple pipes allocating from the pipe
///   zone simultaneously creates cross-CPU magazine transfer contention
/// - **Variable buffer sizes** — different write sizes exercise different mbuf
///   allocation paths (small = inline storage, large = chained mbufs)
/// - **Non-blocking I/O** — `EAGAIN` on full pipe buffers follows a different
///   kernel path with its own latency characteristics
/// - **Cross-pipe interference** — reading from one pipe while another has
///   pending data creates wakeup scheduling interference
///
/// # What makes it unique
/// Pipe buffers exercise the kernel's zone allocator (magazine layer) in a way
/// that no other entropy source does. The zone allocator's per-CPU caching
/// and cross-CPU transfers create timing that depends on every CPU's allocation
/// history.
///
/// # Configuration
/// See [`PipeBufferConfig`] for tunable parameters. Key options:
/// - `non_blocking`: capture EAGAIN failure path timing (recommended: `true`)
/// - `num_pipes`: controls zone allocator contention level
/// - `min_write_size`/`max_write_size`: controls mbuf allocation path diversity
#[derive(Default)]
pub struct PipeBufferSource {
    /// Source configuration. Use `Default::default()` for recommended settings.
    pub config: PipeBufferConfig,
}
102
/// Static metadata for this source, returned by [`EntropySource::info`].
/// macOS-only (the collection path relies on mach timing and Darwin pipe
/// behavior); has no extra requirements and is considered fast.
static PIPE_BUFFER_INFO: SourceInfo = SourceInfo {
    name: "pipe_buffer",
    description: "Multi-pipe kernel zone allocator competition and buffer timing jitter",
    physics: "Creates multiple pipes simultaneously, writes variable-size data, reads it back, \
              and closes — measuring contention in the kernel zone allocator. Multiple pipes \
              compete for pipe zone and mbuf allocations, creating cross-CPU magazine transfer \
              contention. Variable write sizes exercise different mbuf paths. Non-blocking mode \
              captures EAGAIN timing on different kernel failure paths. Zone allocator timing \
              depends on zone fragmentation, magazine layer state, and cross-CPU transfers.",
    category: SourceCategory::IPC,
    platform: Platform::MacOS,
    requirements: &[],
    entropy_rate_estimate: 1.5,
    composite: false,
    is_fast: true,
};
119
120impl EntropySource for PipeBufferSource {
121    fn info(&self) -> &SourceInfo {
122        &PIPE_BUFFER_INFO
123    }
124
125    fn is_available(&self) -> bool {
126        cfg!(target_os = "macos")
127    }
128
129    fn collect(&self, n_samples: usize) -> Vec<u8> {
130        let raw_count = n_samples * 4 + 64;
131        let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
132        let mut lcg: u64 = mach_time() | 1;
133        let num_pipes = self.config.num_pipes.max(1);
134        let min_size = self.config.min_write_size.max(1);
135        let max_size = self.config.max_write_size.max(min_size);
136
137        // Pre-allocate a persistent pool of pipes for contention.
138        let mut pipe_pool: Vec<[i32; 2]> = Vec::new();
139        for _ in 0..num_pipes {
140            let mut fds: [i32; 2] = [0; 2];
141            // SAFETY: fds is a 2-element array matching pipe()'s expected output.
142            let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
143            if ret == 0 {
144                if self.config.non_blocking {
145                    // SAFETY: fds[1] is a valid file descriptor from pipe().
146                    unsafe {
147                        let flags = libc::fcntl(fds[1], libc::F_GETFL);
148                        if flags >= 0 {
149                            libc::fcntl(fds[1], libc::F_SETFL, flags | libc::O_NONBLOCK);
150                        }
151                    }
152                }
153                pipe_pool.push(fds);
154            }
155        }
156
157        if pipe_pool.is_empty() {
158            return self.collect_single_pipe(n_samples);
159        }
160
161        for i in 0..raw_count {
162            // Vary write size to exercise different mbuf allocation paths.
163            lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
164            let write_size = if min_size == max_size {
165                min_size
166            } else {
167                min_size + (lcg >> 48) as usize % (max_size - min_size + 1)
168            };
169            let write_data = vec![0xBEu8; write_size];
170            let mut read_buf = vec![0u8; write_size];
171
172            let pipe_idx = i % pipe_pool.len();
173            let fds = pipe_pool[pipe_idx];
174
175            let t0 = mach_time();
176
177            // SAFETY: fds are valid file descriptors from pipe().
178            unsafe {
179                let written = libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
180
181                if written > 0 {
182                    libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, written as usize);
183                }
184            }
185
186            let t1 = mach_time();
187            std::hint::black_box(&read_buf);
188            timings.push(t1.wrapping_sub(t0));
189
190            // Periodically create/destroy an extra pipe for zone allocator churn.
191            if i % 8 == 0 {
192                let mut extra_fds: [i32; 2] = [0; 2];
193                let ret = unsafe { libc::pipe(extra_fds.as_mut_ptr()) };
194                if ret == 0 {
195                    unsafe {
196                        libc::close(extra_fds[0]);
197                        libc::close(extra_fds[1]);
198                    }
199                }
200            }
201        }
202
203        // Clean up pipe pool.
204        for fds in &pipe_pool {
205            unsafe {
206                libc::close(fds[0]);
207                libc::close(fds[1]);
208            }
209        }
210
211        extract_timing_entropy(&timings, n_samples)
212    }
213}
214
215impl PipeBufferSource {
216    /// Fallback single-pipe collection (matches original behavior).
217    pub(crate) fn collect_single_pipe(&self, n_samples: usize) -> Vec<u8> {
218        let raw_count = n_samples * 4 + 64;
219        let mut timings: Vec<u64> = Vec::with_capacity(raw_count);
220        let mut lcg: u64 = mach_time() | 1;
221
222        for _ in 0..raw_count {
223            lcg = lcg.wrapping_mul(6364136223846793005).wrapping_add(1);
224            let write_size = 1 + (lcg >> 48) as usize % 256;
225            let write_data = vec![0xBEu8; write_size];
226            let mut read_buf = vec![0u8; write_size];
227
228            let mut fds: [i32; 2] = [0; 2];
229            let t0 = mach_time();
230            let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
231            if ret != 0 {
232                continue;
233            }
234            unsafe {
235                libc::write(fds[1], write_data.as_ptr() as *const _, write_size);
236                libc::read(fds[0], read_buf.as_mut_ptr() as *mut _, write_size);
237                libc::close(fds[0]);
238                libc::close(fds[1]);
239            }
240            let t1 = mach_time();
241            std::hint::black_box(&read_buf);
242            timings.push(t1.wrapping_sub(t0));
243        }
244
245        extract_timing_entropy(&timings, n_samples)
246    }
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn info() {
        let source = PipeBufferSource::default();
        let info = source.info();
        assert_eq!(source.name(), "pipe_buffer");
        assert_eq!(info.category, SourceCategory::IPC);
        assert!(!info.composite);
    }

    #[test]
    fn default_config() {
        let cfg = PipeBufferConfig::default();
        assert!(cfg.non_blocking);
        assert_eq!(cfg.num_pipes, 4);
        assert_eq!(cfg.min_write_size, 1);
        assert_eq!(cfg.max_write_size, 4096);
    }

    #[test]
    fn custom_config() {
        let config = PipeBufferConfig {
            num_pipes: 8,
            min_write_size: 64,
            max_write_size: 1024,
            non_blocking: false,
        };
        let source = PipeBufferSource { config };
        assert_eq!(source.config.num_pipes, 8);
    }

    #[test]
    #[ignore] // Uses pipe syscall
    fn collects_bytes() {
        let source = PipeBufferSource::default();
        if source.is_available() {
            let bytes = source.collect(64);
            assert!(!bytes.is_empty());
            assert!(bytes.len() <= 64);
        }
    }

    #[test]
    #[ignore] // Uses pipe syscall
    fn single_pipe_mode() {
        let config = PipeBufferConfig {
            num_pipes: 0,
            ..Default::default()
        };
        let source = PipeBufferSource { config };
        if source.is_available() {
            assert!(!source.collect_single_pipe(64).is_empty());
        }
    }
}