Skip to main content

openentropy_core/
pool.rs

1//! Multi-source entropy pool with health monitoring.
2//!
3//! Architecture:
4//! 1. Auto-discover available sources on this machine
5//! 2. Collect raw entropy from each source in parallel
6//! 3. Concatenate source bytes into a shared buffer
7//! 4. Apply conditioning (Raw / VonNeumann / SHA-256) on output
8//! 5. Continuous health monitoring per source
9//! 6. Graceful degradation when sources fail
10//! 7. Thread-safe for concurrent access
11
12use std::collections::{HashMap, HashSet};
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15
16use sha2::{Digest, Sha256};
17
18use crate::conditioning::{quick_autocorrelation_lag1, quick_min_entropy, quick_shannon};
19use crate::source::{EntropySource, SourceState};
20
21const SOURCE_TIMEOUT_BACKOFF_SECS: u64 = 30;
22
23/// Thread-safe multi-source entropy pool.
24pub struct EntropyPool {
25    sources: Vec<Arc<Mutex<SourceState>>>,
26    buffer: Mutex<Vec<u8>>,
27    state: Mutex<[u8; 32]>,
28    counter: Mutex<u64>,
29    total_output: Mutex<u64>,
30    // Per-source collection coordination for timeout-safe parallel collection.
31    in_flight: Arc<Mutex<HashSet<usize>>>,
32    backoff_until: Arc<Mutex<HashMap<usize, Instant>>>,
33}
34
35impl EntropyPool {
36    /// Create an empty pool.
37    pub fn new(seed: Option<&[u8]>) -> Self {
38        let initial_state = {
39            let mut h = Sha256::new();
40            if let Some(s) = seed {
41                h.update(s);
42            } else {
43                // Use OS entropy for initial state
44                let mut os_random = [0u8; 32];
45                getrandom(&mut os_random);
46                h.update(os_random);
47            }
48            let digest: [u8; 32] = h.finalize().into();
49            digest
50        };
51
52        Self {
53            sources: Vec::new(),
54            buffer: Mutex::new(Vec::new()),
55            state: Mutex::new(initial_state),
56            counter: Mutex::new(0),
57            total_output: Mutex::new(0),
58            in_flight: Arc::new(Mutex::new(HashSet::new())),
59            backoff_until: Arc::new(Mutex::new(HashMap::new())),
60        }
61    }
62
63    /// Create a pool with all available sources on this machine.
64    pub fn auto() -> Self {
65        let mut pool = Self::new(None);
66        for source in crate::platform::detect_available_sources() {
67            pool.add_source(source);
68        }
69        pool
70    }
71
72    /// Register an entropy source.
73    pub fn add_source(&mut self, source: Box<dyn EntropySource>) {
74        self.sources
75            .push(Arc::new(Mutex::new(SourceState::new(source))));
76    }
77
78    /// Number of registered sources.
79    pub fn source_count(&self) -> usize {
80        self.sources.len()
81    }
82
83    /// Collect entropy from every registered source in parallel.
84    ///
85    /// Uses a 10s collection timeout per cycle. Slow sources are skipped and
86    /// temporarily backed off to keep callers responsive.
87    pub fn collect_all(&self) -> usize {
88        self.collect_all_parallel_n(10.0, 1000)
89    }
90
91    /// Collect entropy from all sources in parallel using detached worker threads.
92    ///
93    /// Slow or hung sources are skipped after `timeout_secs`. Timed-out sources
94    /// enter a backoff window to avoid thread buildup on repeated calls.
95    pub fn collect_all_parallel(&self, timeout_secs: f64) -> usize {
96        self.collect_all_parallel_n(timeout_secs, 1000)
97    }
98
99    /// Collect entropy from all sources in parallel using detached worker threads.
100    ///
101    /// - `timeout_secs`: max wall-clock time to wait for a collection cycle.
102    /// - `n_samples`: samples requested from each source in this cycle.
103    ///
104    /// Slow or hung sources are skipped after `timeout_secs`. Timed-out sources
105    /// enter a backoff window to avoid thread buildup on repeated calls.
106    pub fn collect_all_parallel_n(&self, timeout_secs: f64, n_samples: usize) -> usize {
107        let timeout = Duration::from_secs_f64(timeout_secs.max(0.0));
108        if timeout.is_zero() || n_samples == 0 {
109            return 0;
110        }
111
112        let now = Instant::now();
113        let mut scheduled: Vec<usize> = Vec::new();
114        let mut to_launch: Vec<(usize, Arc<Mutex<SourceState>>)> = Vec::new();
115
116        for (idx, ss_mutex) in self.sources.iter().enumerate() {
117            // Skip sources still in backoff.
118            let in_backoff = {
119                let backoff = self.backoff_until.lock().unwrap();
120                backoff.get(&idx).is_some_and(|until| now < *until)
121            };
122            if in_backoff {
123                continue;
124            }
125
126            // Skip sources with an in-flight worker from a prior timeout.
127            {
128                let mut in_flight = self.in_flight.lock().unwrap();
129                if in_flight.contains(&idx) {
130                    continue;
131                }
132                in_flight.insert(idx);
133            }
134
135            scheduled.push(idx);
136            to_launch.push((idx, Arc::clone(ss_mutex)));
137        }
138
139        if scheduled.is_empty() {
140            return 0;
141        }
142
143        // Limit concurrent collection threads to avoid resource exhaustion.
144        // Many sources use mmap, JIT pages, socket pairs, large allocations —
145        // running all 50+ simultaneously can cause SIGSEGV from memory pressure.
146        let max_concurrent = num_cpus().min(16);
147        let (tx, rx) = std::sync::mpsc::channel::<(usize, Vec<u8>)>();
148        let mut results = Vec::new();
149        let mut received = HashSet::new();
150
151        for chunk in to_launch.chunks(max_concurrent) {
152            let batch_start = Instant::now();
153            let chunk_indices: HashSet<usize> = chunk.iter().map(|&(idx, _)| idx).collect();
154
155            for &(idx, ref src) in chunk {
156                let tx = tx.clone();
157                let src = Arc::clone(src);
158                let in_flight = Arc::clone(&self.in_flight);
159
160                std::thread::spawn(move || {
161                    let data = Self::collect_one_n(&src, n_samples);
162                    {
163                        let mut in_flight = in_flight.lock().unwrap();
164                        in_flight.remove(&idx);
165                    }
166                    let _ = tx.send((idx, data));
167                });
168            }
169
170            // Wait for this batch to finish. Each batch gets its own full timeout
171            // window so that slow sources in early batches don't starve later ones.
172            // Only count messages belonging to this chunk's indices — late messages
173            // from prior chunks are still collected but don't advance batch_done.
174            let mut batch_done = 0;
175            while batch_done < chunk.len() {
176                let remaining = timeout.saturating_sub(batch_start.elapsed());
177                if remaining.is_zero() {
178                    break;
179                }
180                match rx.recv_timeout(remaining) {
181                    Ok((idx, data)) => {
182                        received.insert(idx);
183                        if !data.is_empty() {
184                            results.extend_from_slice(&data);
185                        }
186                        if chunk_indices.contains(&idx) {
187                            batch_done += 1;
188                        }
189                    }
190                    Err(_) => break,
191                }
192            }
193        }
194        drop(tx);
195
196        // Drain any remaining results from threads that finished after batch loops.
197        // Use a brief recv_timeout to catch threads completing just after the
198        // batch loops ended, rather than breaking on the first empty try_recv.
199        let drain_deadline = Instant::now() + Duration::from_millis(50);
200        while received.len() < scheduled.len() {
201            let remaining = drain_deadline.saturating_duration_since(Instant::now());
202            if remaining.is_zero() {
203                break;
204            }
205            match rx.recv_timeout(remaining) {
206                Ok((idx, data)) => {
207                    received.insert(idx);
208                    if !data.is_empty() {
209                        results.extend_from_slice(&data);
210                    }
211                }
212                Err(_) => break,
213            }
214        }
215
216        // Back off any sources that did not respond in time.
217        let backoff_for = Duration::from_secs(SOURCE_TIMEOUT_BACKOFF_SECS);
218        let timeout_mark = Instant::now() + backoff_for;
219        for idx in scheduled {
220            if received.contains(&idx) {
221                continue;
222            }
223
224            {
225                let mut bo = self.backoff_until.lock().unwrap();
226                bo.insert(idx, timeout_mark);
227            }
228
229            if let Ok(mut ss) = self.sources[idx].try_lock() {
230                ss.failures += 1;
231                ss.healthy = false;
232            }
233        }
234
235        let n = results.len();
236        self.buffer.lock().unwrap().extend_from_slice(&results);
237        n
238    }
239
240    /// Collect entropy only from sources whose names are in the given list.
241    /// Uses parallel threads. Collects 1000 samples per source.
242    pub fn collect_enabled(&self, enabled_names: &[String]) -> usize {
243        self.collect_enabled_n(enabled_names, 1000)
244    }
245
246    /// Collect `n_samples` of entropy from sources whose names are in the list.
247    /// Smaller `n_samples` values are faster — use this for interactive/TUI contexts.
248    ///
249    /// Uses detached threads with a 10-second timeout to avoid blocking
250    /// indefinitely on slow or hung sources.
251    pub fn collect_enabled_n(&self, enabled_names: &[String], n_samples: usize) -> usize {
252        let timeout = Duration::from_secs(10);
253        let (tx, rx) = std::sync::mpsc::channel::<(usize, Vec<u8>)>();
254
255        let mut to_launch: Vec<(usize, Arc<Mutex<SourceState>>)> = Vec::new();
256        for (idx, ss_mutex) in self.sources.iter().enumerate() {
257            let matches = {
258                let ss = ss_mutex.lock().unwrap();
259                enabled_names.iter().any(|n| n == ss.source.info().name)
260            };
261            if matches {
262                to_launch.push((idx, Arc::clone(ss_mutex)));
263            }
264        }
265
266        if to_launch.is_empty() {
267            return 0;
268        }
269
270        let total = to_launch.len();
271        let max_concurrent = num_cpus().min(16);
272        let mut results = Vec::new();
273        let mut received = HashSet::new();
274
275        for chunk in to_launch.chunks(max_concurrent) {
276            let batch_start = Instant::now();
277            let chunk_indices: HashSet<usize> = chunk.iter().map(|&(idx, _)| idx).collect();
278
279            for &(idx, ref ss_mutex) in chunk {
280                let tx = tx.clone();
281                let ss_mutex = Arc::clone(ss_mutex);
282                std::thread::spawn(move || {
283                    let data = Self::collect_one_n(&ss_mutex, n_samples);
284                    let _ = tx.send((idx, data));
285                });
286            }
287
288            let mut batch_done = 0;
289            while batch_done < chunk.len() {
290                let remaining = timeout.saturating_sub(batch_start.elapsed());
291                if remaining.is_zero() {
292                    break;
293                }
294                match rx.recv_timeout(remaining) {
295                    Ok((idx, data)) => {
296                        received.insert(idx);
297                        if !data.is_empty() {
298                            results.extend_from_slice(&data);
299                        }
300                        if chunk_indices.contains(&idx) {
301                            batch_done += 1;
302                        }
303                    }
304                    Err(_) => break,
305                }
306            }
307        }
308        drop(tx);
309
310        // Brief drain for threads finishing just after batch loops.
311        let drain_deadline = Instant::now() + Duration::from_millis(50);
312        while received.len() < total {
313            let remaining = drain_deadline.saturating_duration_since(Instant::now());
314            if remaining.is_zero() {
315                break;
316            }
317            match rx.recv_timeout(remaining) {
318                Ok((idx, data)) => {
319                    received.insert(idx);
320                    if !data.is_empty() {
321                        results.extend_from_slice(&data);
322                    }
323                }
324                Err(_) => break,
325            }
326        }
327
328        let n = results.len();
329        self.buffer.lock().unwrap().extend_from_slice(&results);
330        n
331    }
332
333    /// Collect raw bytes from selected sources while preserving source boundaries.
334    ///
335    /// A single shared timeout budget is applied across the whole sweep, so
336    /// callers can bound wall-clock latency even when many slow sources are enabled.
337    #[doc(hidden)]
338    pub fn collect_enabled_raw_n(
339        &self,
340        enabled_names: &[String],
341        timeout_secs: f64,
342        n_samples: usize,
343    ) -> HashMap<String, Vec<u8>> {
344        let timeout = Duration::from_secs_f64(timeout_secs.max(0.0));
345        if timeout.is_zero() || n_samples == 0 {
346            return HashMap::new();
347        }
348
349        let now = Instant::now();
350        let mut to_launch: Vec<(usize, Arc<Mutex<SourceState>>)> = Vec::new();
351        let mut source_names = HashMap::new();
352
353        for (idx, ss_mutex) in self.sources.iter().enumerate() {
354            let source_name = {
355                let ss = ss_mutex.lock().unwrap();
356                let name = ss.source.info().name;
357                if !enabled_names.iter().any(|enabled| enabled == name) {
358                    continue;
359                }
360                name.to_string()
361            };
362
363            let in_backoff = {
364                let backoff = self.backoff_until.lock().unwrap();
365                backoff.get(&idx).is_some_and(|until| now < *until)
366            };
367            if in_backoff {
368                continue;
369            }
370
371            {
372                let mut in_flight = self.in_flight.lock().unwrap();
373                if in_flight.contains(&idx) {
374                    continue;
375                }
376                in_flight.insert(idx);
377            }
378
379            source_names.insert(idx, source_name);
380            to_launch.push((idx, Arc::clone(ss_mutex)));
381        }
382
383        if to_launch.is_empty() {
384            return HashMap::new();
385        }
386
387        let max_concurrent = num_cpus().min(16);
388        let (tx, rx) = std::sync::mpsc::channel::<(usize, Vec<u8>)>();
389        let mut results = HashMap::new();
390        let mut received = HashSet::new();
391        let mut scheduled = Vec::new();
392        let sweep_deadline = Instant::now()
393            .checked_add(timeout)
394            .unwrap_or_else(Instant::now);
395
396        'batches: for chunk in to_launch.chunks(max_concurrent) {
397            if sweep_deadline
398                .saturating_duration_since(Instant::now())
399                .is_zero()
400            {
401                break;
402            }
403
404            let chunk_indices: HashSet<usize> = chunk.iter().map(|&(idx, _)| idx).collect();
405            for &(idx, ref src) in chunk {
406                scheduled.push(idx);
407
408                let tx = tx.clone();
409                let src = Arc::clone(src);
410                let in_flight = Arc::clone(&self.in_flight);
411
412                std::thread::spawn(move || {
413                    let data = Self::collect_one_n(&src, n_samples);
414                    {
415                        let mut in_flight = in_flight.lock().unwrap();
416                        in_flight.remove(&idx);
417                    }
418                    let _ = tx.send((idx, data));
419                });
420            }
421
422            let mut batch_done = 0;
423            while batch_done < chunk.len() {
424                let remaining = sweep_deadline.saturating_duration_since(Instant::now());
425                if remaining.is_zero() {
426                    break 'batches;
427                }
428                match rx.recv_timeout(remaining) {
429                    Ok((idx, data)) => {
430                        received.insert(idx);
431                        results.insert(idx, data);
432                        if chunk_indices.contains(&idx) {
433                            batch_done += 1;
434                        }
435                    }
436                    Err(_) => break 'batches,
437                }
438            }
439        }
440        drop(tx);
441
442        while received.len() < scheduled.len() {
443            let remaining = sweep_deadline.saturating_duration_since(Instant::now());
444            if remaining.is_zero() {
445                break;
446            }
447            match rx.recv_timeout(remaining.min(Duration::from_millis(50))) {
448                Ok((idx, data)) => {
449                    received.insert(idx);
450                    results.insert(idx, data);
451                }
452                Err(_) => break,
453            }
454        }
455
456        let backoff_for = Duration::from_secs(SOURCE_TIMEOUT_BACKOFF_SECS);
457        let timeout_mark = Instant::now() + backoff_for;
458        for idx in scheduled {
459            if received.contains(&idx) {
460                continue;
461            }
462
463            {
464                let mut bo = self.backoff_until.lock().unwrap();
465                bo.insert(idx, timeout_mark);
466            }
467
468            if let Ok(mut ss) = self.sources[idx].try_lock() {
469                ss.failures += 1;
470                ss.healthy = false;
471            }
472        }
473
474        let mut raw_by_source = HashMap::new();
475        for (idx, data) in results {
476            if data.is_empty() {
477                continue;
478            }
479            if let Some(source_name) = source_names.remove(&idx) {
480                raw_by_source.insert(source_name, data);
481            }
482        }
483        raw_by_source
484    }
485
486    fn collect_one_n(ss_mutex: &Arc<Mutex<SourceState>>, n_samples: usize) -> Vec<u8> {
487        // Clone the Arc<dyn EntropySource> so we can release the mutex during
488        // the (potentially slow) collect() call. This allows health_report()
489        // and TUI reads to proceed without blocking on source collection.
490        let source = {
491            let ss = ss_mutex.lock().unwrap();
492            Arc::clone(&ss.source)
493        };
494
495        let t0 = Instant::now();
496        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| source.collect(n_samples))) {
497            Ok(data) if !data.is_empty() => {
498                let mut ss = ss_mutex.lock().unwrap();
499                ss.last_collect_time = t0.elapsed();
500                ss.total_bytes += data.len() as u64;
501                ss.last_entropy = quick_shannon(&data);
502                ss.last_min_entropy = quick_min_entropy(&data);
503                ss.last_autocorrelation = quick_autocorrelation_lag1(&data);
504                ss.healthy = ss.last_entropy > 1.0;
505                data
506            }
507            Ok(_) => {
508                let mut ss = ss_mutex.lock().unwrap();
509                ss.last_collect_time = t0.elapsed();
510                ss.failures += 1;
511                ss.healthy = false;
512                Vec::new()
513            }
514            Err(_) => {
515                let mut ss = ss_mutex.lock().unwrap();
516                ss.last_collect_time = t0.elapsed();
517                ss.failures += 1;
518                ss.healthy = false;
519                Vec::new()
520            }
521        }
522    }
523
524    /// Return up to `n_bytes` of raw, unconditioned entropy (XOR-combined only).
525    ///
526    /// No SHA-256, no DRBG, no whitening. Preserves the raw hardware noise
527    /// signal for researchers studying actual device entropy characteristics.
528    ///
529    /// If sources cannot provide enough bytes after several collection rounds,
530    /// this returns the available bytes rather than blocking indefinitely.
531    pub fn get_raw_bytes(&self, n_bytes: usize) -> Vec<u8> {
532        const MAX_COLLECTION_ROUNDS: usize = 8;
533
534        let mut rounds = 0usize;
535        loop {
536            let ready = { self.buffer.lock().unwrap().len() >= n_bytes };
537            if ready || rounds >= MAX_COLLECTION_ROUNDS {
538                break;
539            }
540
541            let n = self.collect_all();
542            rounds += 1;
543            if n == 0 {
544                std::thread::sleep(Duration::from_millis(1));
545            }
546        }
547
548        let mut buf = self.buffer.lock().unwrap();
549        let take = n_bytes.min(buf.len());
550        if take == 0 {
551            return Vec::new();
552        }
553        let output: Vec<u8> = buf.drain(..take).collect();
554        drop(buf);
555        *self.total_output.lock().unwrap() += take as u64;
556        output
557    }
558
559    /// Return `n_bytes` of conditioned random output.
560    pub fn get_random_bytes(&self, n_bytes: usize) -> Vec<u8> {
561        // Auto-collect if buffer is low
562        {
563            let buf = self.buffer.lock().unwrap();
564            if buf.len() < n_bytes * 2 {
565                drop(buf);
566                self.collect_all();
567            }
568        }
569
570        let mut output = Vec::with_capacity(n_bytes);
571        while output.len() < n_bytes {
572            let mut counter = self.counter.lock().unwrap();
573            *counter += 1;
574            let cnt = *counter;
575            drop(counter);
576
577            // Take up to 256 bytes from buffer
578            let sample = {
579                let mut buf = self.buffer.lock().unwrap();
580                let take = buf.len().min(256);
581                let sample: Vec<u8> = buf.drain(..take).collect();
582                sample
583            };
584
585            // SHA-256 conditioning
586            let mut h = Sha256::new();
587            let state = self.state.lock().unwrap();
588            h.update(*state);
589            drop(state);
590            h.update(&sample);
591            h.update(cnt.to_le_bytes());
592
593            let ts = std::time::SystemTime::now()
594                .duration_since(std::time::UNIX_EPOCH)
595                .unwrap_or_default();
596            h.update(ts.as_nanos().to_le_bytes());
597
598            // Mix in OS entropy as safety net
599            let mut os_random = [0u8; 8];
600            getrandom(&mut os_random);
601            h.update(os_random);
602
603            let digest: [u8; 32] = h.finalize().into();
604            output.extend_from_slice(&digest);
605
606            // Derive state separately from output for forward secrecy.
607            let mut sh = Sha256::new();
608            sh.update(digest);
609            sh.update(b"openentropy_state");
610            let new_state: [u8; 32] = sh.finalize().into();
611            *self.state.lock().unwrap() = new_state;
612        }
613
614        *self.total_output.lock().unwrap() += n_bytes as u64;
615        output.truncate(n_bytes);
616        output
617    }
618
619    /// Return `n_bytes` of entropy with the specified conditioning mode.
620    ///
621    /// - `Raw`: XOR-combined source bytes, no whitening
622    /// - `VonNeumann`: debiased but structure-preserving
623    /// - `Sha256`: full cryptographic conditioning (default)
624    pub fn get_bytes(
625        &self,
626        n_bytes: usize,
627        mode: crate::conditioning::ConditioningMode,
628    ) -> Vec<u8> {
629        use crate::conditioning::ConditioningMode;
630        match mode {
631            ConditioningMode::Raw => self.get_raw_bytes(n_bytes),
632            ConditioningMode::VonNeumann => {
633                // VN debiasing yields ~25% of input, so collect 6x
634                let raw = self.get_raw_bytes(n_bytes * 6);
635                crate::conditioning::condition(&raw, n_bytes, ConditioningMode::VonNeumann)
636            }
637            ConditioningMode::Sha256 => self.get_random_bytes(n_bytes),
638        }
639    }
640
641    /// Health report as structured data.
642    pub fn health_report(&self) -> HealthReport {
643        let mut sources = Vec::new();
644        let mut healthy_count = 0;
645        let mut total_raw = 0u64;
646
647        for ss_mutex in &self.sources {
648            let ss = ss_mutex.lock().unwrap();
649            if ss.healthy {
650                healthy_count += 1;
651            }
652            total_raw += ss.total_bytes;
653            sources.push(SourceHealth {
654                name: ss.source.name().to_string(),
655                healthy: ss.healthy,
656                bytes: ss.total_bytes,
657                entropy: ss.last_entropy,
658                min_entropy: ss.last_min_entropy,
659                autocorrelation: ss.last_autocorrelation,
660                time: ss.last_collect_time.as_secs_f64(),
661                failures: ss.failures,
662            });
663        }
664
665        HealthReport {
666            healthy: healthy_count,
667            total: self.sources.len(),
668            raw_bytes: total_raw,
669            output_bytes: *self.total_output.lock().unwrap(),
670            buffer_size: self.buffer.lock().unwrap().len(),
671            sources,
672        }
673    }
674
675    /// Pretty-print health report.
676    pub fn print_health(&self) {
677        let r = self.health_report();
678        println!("\n{}", "=".repeat(60));
679        println!("ENTROPY POOL HEALTH REPORT");
680        println!("{}", "=".repeat(60));
681        println!("Sources: {}/{} healthy", r.healthy, r.total);
682        println!("Raw collected: {} bytes", r.raw_bytes);
683        println!(
684            "Output: {} bytes | Buffer: {} bytes",
685            r.output_bytes, r.buffer_size
686        );
687        println!(
688            "\n{:<25} {:>4} {:>10} {:>6} {:>6} {:>7} {:>5}",
689            "Source", "OK", "Bytes", "H", "H∞", "Time", "Fail"
690        );
691        println!("{}", "-".repeat(68));
692        for s in &r.sources {
693            let ok = if s.healthy { "✓" } else { "✗" };
694            println!(
695                "{:<25} {:>4} {:>10} {:>5.2} {:>5.2} {:>6.3}s {:>5}",
696                s.name, ok, s.bytes, s.entropy, s.min_entropy, s.time, s.failures
697            );
698        }
699    }
700
701    /// Collect entropy from a single named source and return conditioned bytes.
702    ///
703    /// Returns `None` if the source name doesn't match any registered source.
704    pub fn get_source_bytes(
705        &self,
706        source_name: &str,
707        n_bytes: usize,
708        mode: crate::conditioning::ConditioningMode,
709    ) -> Option<Vec<u8>> {
710        if n_bytes == 0 {
711            return Some(Vec::new());
712        }
713
714        let ss_mutex = self
715            .sources
716            .iter()
717            .find(|ss_mutex| {
718                let ss = ss_mutex.lock().unwrap();
719                ss.source.info().name == source_name
720            })
721            .cloned()?;
722
723        let n_samples = match mode {
724            crate::conditioning::ConditioningMode::Raw => n_bytes,
725            crate::conditioning::ConditioningMode::VonNeumann => n_bytes * 6,
726            crate::conditioning::ConditioningMode::Sha256 => n_bytes * 4 + 64,
727        };
728        let raw = Self::collect_one_n(&ss_mutex, n_samples);
729        if raw.is_empty() {
730            return None; // Source failed to produce output
731        }
732        let output = crate::conditioning::condition(&raw, n_bytes, mode);
733        Some(output)
734    }
735
736    /// Collect raw bytes from a single named source.
737    ///
738    /// Returns `None` if no source matches the name.
739    pub fn get_source_raw_bytes(&self, source_name: &str, n_samples: usize) -> Option<Vec<u8>> {
740        let ss_mutex = self.sources.iter().find(|ss_mutex| {
741            let ss = ss_mutex.lock().unwrap();
742            ss.source.info().name == source_name
743        })?;
744
745        let raw = Self::collect_one_n(ss_mutex, n_samples);
746        if raw.is_empty() {
747            return None; // Source failed to produce output
748        }
749        Some(raw)
750    }
751
752    /// List all registered source names.
753    pub fn source_names(&self) -> Vec<String> {
754        self.sources
755            .iter()
756            .map(|ss_mutex| {
757                let ss = ss_mutex.lock().unwrap();
758                ss.source.info().name.to_string()
759            })
760            .collect()
761    }
762
763    /// Get source info for each registered source.
764    pub fn source_infos(&self) -> Vec<SourceInfoSnapshot> {
765        self.sources
766            .iter()
767            .map(|ss_mutex| {
768                let ss = ss_mutex.lock().unwrap();
769                let info = ss.source.info();
770                SourceInfoSnapshot {
771                    name: info.name.to_string(),
772                    description: info.description.to_string(),
773                    physics: info.physics.to_string(),
774                    category: info.category.to_string(),
775                    platform: info.platform.to_string(),
776                    requirements: info.requirements.iter().map(|r| r.to_string()).collect(),
777                    entropy_rate_estimate: info.entropy_rate_estimate,
778                    composite: info.composite,
779                    config: ss.source.config_options(),
780                }
781            })
782            .collect()
783    }
784
785    /// Call a function on a named source, returning `None` if no match.
786    pub fn with_source<F, R>(&self, name: &str, f: F) -> Option<R>
787    where
788        F: FnOnce(&dyn EntropySource) -> R,
789    {
790        self.sources
791            .iter()
792            .find(|ss| ss.lock().unwrap().source.info().name == name)
793            .map(|ss| f(&*ss.lock().unwrap().source))
794    }
795}
796
797/// Fill buffer with OS random bytes via the `getrandom` crate.
798/// Works cross-platform (Unix, Windows, WASM, etc.) without manual file I/O.
799///
800/// # Panics
801/// Panics if the OS CSPRNG fails — this indicates a fatal platform issue.
802fn getrandom(buf: &mut [u8]) {
803    getrandom::fill(buf).expect("OS CSPRNG failed");
804}
805
806/// Number of logical CPUs (for concurrency limits).
807fn num_cpus() -> usize {
808    std::thread::available_parallelism()
809        .map(|n| n.get())
810        .unwrap_or(4)
811}
812
813/// Overall health report for the entropy pool.
814#[derive(Debug, Clone)]
815pub struct HealthReport {
816    /// Number of healthy sources.
817    pub healthy: usize,
818    /// Total number of registered sources.
819    pub total: usize,
820    /// Total raw bytes collected across all sources.
821    pub raw_bytes: u64,
822    /// Total conditioned output bytes produced.
823    pub output_bytes: u64,
824    /// Current internal buffer size in bytes.
825    pub buffer_size: usize,
826    /// Per-source health details.
827    pub sources: Vec<SourceHealth>,
828}
829
830/// Health status of a single entropy source.
831#[derive(Debug, Clone)]
832pub struct SourceHealth {
833    /// Source name.
834    pub name: String,
835    /// Whether the source is currently healthy (entropy > 1.0 bits/byte).
836    pub healthy: bool,
837    /// Total bytes collected from this source.
838    pub bytes: u64,
839    /// Shannon entropy of the last collection (bits per byte, max 8.0).
840    pub entropy: f64,
841    /// Min-entropy of the last collection (bits per byte, max 8.0). More conservative than Shannon.
842    pub min_entropy: f64,
843    /// Lag-1 autocorrelation of the last collection ([-1, 1], 0 = ideal).
844    pub autocorrelation: f64,
845    /// Time taken for the last collection in seconds.
846    pub time: f64,
847    /// Number of collection failures.
848    pub failures: u64,
849}
850
851/// Snapshot of source metadata for external consumption.
852#[derive(Debug, Clone)]
853pub struct SourceInfoSnapshot {
854    /// Source name.
855    pub name: String,
856    /// Human-readable description.
857    pub description: String,
858    /// Physics explanation.
859    pub physics: String,
860    /// Source category.
861    pub category: String,
862    /// Target platform.
863    pub platform: String,
864    /// Hardware/software requirements.
865    pub requirements: Vec<String>,
866    /// Estimated entropy rate.
867    pub entropy_rate_estimate: f64,
868    /// Whether this is a composite source.
869    pub composite: bool,
870    /// Runtime configuration keys and current values.
871    pub config: Vec<(&'static str, String)>,
872}
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877    use crate::source::{Platform, SourceCategory, SourceInfo};
878
879    // -----------------------------------------------------------------------
880    // Mock entropy source for testing
881    // -----------------------------------------------------------------------
882
883    /// A deterministic mock entropy source that returns predictable data.
884    struct MockSource {
885        info: SourceInfo,
886        data: Vec<u8>,
887    }
888
889    impl MockSource {
890        fn new(name: &'static str, data: Vec<u8>) -> Self {
891            Self {
892                info: SourceInfo {
893                    name,
894                    description: "mock source",
895                    physics: "deterministic test data",
896                    category: SourceCategory::System,
897                    platform: Platform::Any,
898                    requirements: &[],
899                    entropy_rate_estimate: 1.0,
900                    composite: false,
901                    is_fast: true,
902                },
903                data,
904            }
905        }
906    }
907
908    impl EntropySource for MockSource {
909        fn info(&self) -> &SourceInfo {
910            &self.info
911        }
912        fn is_available(&self) -> bool {
913            true
914        }
915        fn collect(&self, n_samples: usize) -> Vec<u8> {
916            self.data.iter().copied().cycle().take(n_samples).collect()
917        }
918    }
919
920    /// A mock source that always fails (returns empty).
921    struct FailingSource {
922        info: SourceInfo,
923    }
924
925    impl FailingSource {
926        fn new(name: &'static str) -> Self {
927            Self {
928                info: SourceInfo {
929                    name,
930                    description: "failing mock",
931                    physics: "always fails",
932                    category: SourceCategory::System,
933                    platform: Platform::Any,
934                    requirements: &[],
935                    entropy_rate_estimate: 0.0,
936                    composite: false,
937                    is_fast: true,
938                },
939            }
940        }
941    }
942
943    impl EntropySource for FailingSource {
944        fn info(&self) -> &SourceInfo {
945            &self.info
946        }
947        fn is_available(&self) -> bool {
948            true
949        }
950        fn collect(&self, _n_samples: usize) -> Vec<u8> {
951            Vec::new()
952        }
953    }
954
955    struct SlowSource {
956        info: SourceInfo,
957        delay: Duration,
958        value: u8,
959    }
960
961    impl SlowSource {
962        fn new(name: &'static str, delay: Duration, value: u8) -> Self {
963            Self {
964                info: SourceInfo {
965                    name,
966                    description: "slow mock",
967                    physics: "delayed deterministic test data",
968                    category: SourceCategory::System,
969                    platform: Platform::Any,
970                    requirements: &[],
971                    entropy_rate_estimate: 1.0,
972                    composite: false,
973                    is_fast: false,
974                },
975                delay,
976                value,
977            }
978        }
979    }
980
981    impl EntropySource for SlowSource {
982        fn info(&self) -> &SourceInfo {
983            &self.info
984        }
985        fn is_available(&self) -> bool {
986            true
987        }
988        fn collect(&self, n_samples: usize) -> Vec<u8> {
989            std::thread::sleep(self.delay);
990            vec![self.value; n_samples]
991        }
992    }
993
994    // -----------------------------------------------------------------------
995    // Pool creation tests
996    // -----------------------------------------------------------------------
997
998    #[test]
999    fn test_pool_new_empty() {
1000        let pool = EntropyPool::new(None);
1001        assert_eq!(pool.source_count(), 0);
1002    }
1003
1004    #[test]
1005    fn test_pool_new_with_seed() {
1006        let pool = EntropyPool::new(Some(b"test seed"));
1007        assert_eq!(pool.source_count(), 0);
1008    }
1009
1010    #[test]
1011    fn test_pool_add_source() {
1012        let mut pool = EntropyPool::new(Some(b"test"));
1013        pool.add_source(Box::new(MockSource::new("mock1", vec![42])));
1014        assert_eq!(pool.source_count(), 1);
1015    }
1016
1017    #[test]
1018    fn test_pool_add_multiple_sources() {
1019        let mut pool = EntropyPool::new(Some(b"test"));
1020        pool.add_source(Box::new(MockSource::new("mock1", vec![1])));
1021        pool.add_source(Box::new(MockSource::new("mock2", vec![2])));
1022        pool.add_source(Box::new(MockSource::new("mock3", vec![3])));
1023        assert_eq!(pool.source_count(), 3);
1024    }
1025
1026    // -----------------------------------------------------------------------
1027    // Collection tests
1028    // -----------------------------------------------------------------------
1029
1030    #[test]
1031    fn test_collect_all_returns_bytes() {
1032        let mut pool = EntropyPool::new(Some(b"test"));
1033        pool.add_source(Box::new(MockSource::new("mock1", vec![0xAA, 0xBB, 0xCC])));
1034        let n = pool.collect_all();
1035        assert!(n > 0, "Should have collected some bytes");
1036    }
1037
1038    #[test]
1039    fn test_collect_all_parallel_with_timeout() {
1040        let mut pool = EntropyPool::new(Some(b"test"));
1041        pool.add_source(Box::new(MockSource::new("mock1", vec![1, 2])));
1042        pool.add_source(Box::new(MockSource::new("mock2", vec![3, 4])));
1043        let n = pool.collect_all_parallel(5.0);
1044        assert!(n > 0);
1045    }
1046
1047    #[test]
1048    fn test_collect_enabled_filters_sources() {
1049        let mut pool = EntropyPool::new(Some(b"test"));
1050        pool.add_source(Box::new(MockSource::new("alpha", vec![1])));
1051        pool.add_source(Box::new(MockSource::new("beta", vec![2])));
1052
1053        let enabled = vec!["alpha".to_string()];
1054        let n = pool.collect_enabled(&enabled);
1055        assert!(n > 0, "Should collect from enabled source");
1056    }
1057
1058    #[test]
1059    fn test_collect_enabled_no_match() {
1060        let mut pool = EntropyPool::new(Some(b"test"));
1061        pool.add_source(Box::new(MockSource::new("alpha", vec![1])));
1062
1063        let enabled = vec!["nonexistent".to_string()];
1064        let n = pool.collect_enabled(&enabled);
1065        assert_eq!(n, 0, "No sources should match");
1066    }
1067
1068    #[test]
1069    fn test_collect_enabled_raw_n_preserves_source_boundaries() {
1070        let mut pool = EntropyPool::new(Some(b"test"));
1071        pool.add_source(Box::new(MockSource::new("alpha", vec![1, 2, 3])));
1072        pool.add_source(Box::new(MockSource::new("beta", vec![9, 8, 7])));
1073
1074        let enabled = vec!["alpha".to_string(), "beta".to_string()];
1075        let results = pool.collect_enabled_raw_n(&enabled, 1.0, 4);
1076
1077        assert_eq!(results.len(), 2);
1078        assert_eq!(results.get("alpha").unwrap(), &vec![1, 2, 3, 1]);
1079        assert_eq!(results.get("beta").unwrap(), &vec![9, 8, 7, 9]);
1080    }
1081
1082    #[test]
1083    fn test_collect_enabled_raw_n_respects_shared_timeout_budget() {
1084        let mut pool = EntropyPool::new(Some(b"test"));
1085        pool.add_source(Box::new(MockSource::new("fast", vec![1, 2, 3, 4])));
1086        pool.add_source(Box::new(SlowSource::new(
1087            "slow",
1088            Duration::from_millis(500),
1089            7,
1090        )));
1091
1092        let enabled = vec!["fast".to_string(), "slow".to_string()];
1093        let start = Instant::now();
1094        let results = pool.collect_enabled_raw_n(&enabled, 0.01, 4);
1095        let elapsed = start.elapsed();
1096
1097        assert!(
1098            elapsed < Duration::from_millis(250),
1099            "shared timeout budget should return well before the slow source finishes: {elapsed:?}"
1100        );
1101        assert_eq!(results.get("fast").unwrap(), &vec![1, 2, 3, 4]);
1102        assert!(!results.contains_key("slow"));
1103    }
1104
1105    #[test]
1106    fn test_collect_all_parallel_keeps_backoff_after_worker_finishes() {
1107        let mut pool = EntropyPool::new(Some(b"test"));
1108        pool.add_source(Box::new(SlowSource::new(
1109            "slow",
1110            Duration::from_millis(500),
1111            7,
1112        )));
1113
1114        assert_eq!(pool.collect_all_parallel_n(0.01, 4), 0);
1115        std::thread::sleep(Duration::from_millis(600));
1116
1117        let backoff_until = pool.backoff_until.lock().unwrap().get(&0).copied();
1118        assert!(backoff_until.is_some());
1119        assert!(backoff_until.unwrap() > Instant::now());
1120
1121        let retry_started = Instant::now();
1122        assert_eq!(pool.collect_all_parallel_n(0.01, 4), 0);
1123        assert!(retry_started.elapsed() < Duration::from_millis(50));
1124    }
1125
1126    #[test]
1127    fn test_collect_enabled_raw_n_keeps_backoff_after_worker_finishes() {
1128        let mut pool = EntropyPool::new(Some(b"test"));
1129        pool.add_source(Box::new(SlowSource::new(
1130            "slow",
1131            Duration::from_millis(500),
1132            7,
1133        )));
1134
1135        let enabled = vec!["slow".to_string()];
1136        assert!(pool.collect_enabled_raw_n(&enabled, 0.01, 4).is_empty());
1137        std::thread::sleep(Duration::from_millis(600));
1138
1139        let backoff_until = pool.backoff_until.lock().unwrap().get(&0).copied();
1140        assert!(backoff_until.is_some());
1141        assert!(backoff_until.unwrap() > Instant::now());
1142
1143        let retry_started = Instant::now();
1144        assert!(pool.collect_enabled_raw_n(&enabled, 0.01, 4).is_empty());
1145        assert!(retry_started.elapsed() < Duration::from_millis(50));
1146    }
1147
1148    // -----------------------------------------------------------------------
1149    // Byte output tests
1150    // -----------------------------------------------------------------------
1151
1152    #[test]
1153    fn test_get_raw_bytes_length() {
1154        let mut pool = EntropyPool::new(Some(b"test"));
1155        pool.add_source(Box::new(MockSource::new("mock", (0..=255).collect())));
1156        let bytes = pool.get_raw_bytes(64);
1157        assert_eq!(bytes.len(), 64);
1158    }
1159
1160    #[test]
1161    fn test_get_random_bytes_length() {
1162        let mut pool = EntropyPool::new(Some(b"test"));
1163        pool.add_source(Box::new(MockSource::new("mock", (0..=255).collect())));
1164        let bytes = pool.get_random_bytes(64);
1165        assert_eq!(bytes.len(), 64);
1166    }
1167
1168    #[test]
1169    fn test_get_random_bytes_various_sizes() {
1170        let mut pool = EntropyPool::new(Some(b"test"));
1171        pool.add_source(Box::new(MockSource::new("mock", (0..=255).collect())));
1172        for size in [1, 16, 32, 64, 100, 256] {
1173            let bytes = pool.get_random_bytes(size);
1174            assert_eq!(bytes.len(), size, "Expected {size} bytes");
1175        }
1176    }
1177
1178    #[test]
1179    fn test_get_bytes_raw_mode() {
1180        let mut pool = EntropyPool::new(Some(b"test"));
1181        pool.add_source(Box::new(MockSource::new("mock", (0..=255).collect())));
1182        let bytes = pool.get_bytes(32, crate::conditioning::ConditioningMode::Raw);
1183        assert_eq!(bytes.len(), 32);
1184    }
1185
1186    #[test]
1187    fn test_get_bytes_sha256_mode() {
1188        let mut pool = EntropyPool::new(Some(b"test"));
1189        pool.add_source(Box::new(MockSource::new("mock", (0..=255).collect())));
1190        let bytes = pool.get_bytes(32, crate::conditioning::ConditioningMode::Sha256);
1191        assert_eq!(bytes.len(), 32);
1192    }
1193
1194    #[test]
1195    fn test_get_bytes_von_neumann_mode() {
1196        let mut pool = EntropyPool::new(Some(b"test"));
1197        pool.add_source(Box::new(MockSource::new("mock", (0..=255).collect())));
1198        let bytes = pool.get_bytes(16, crate::conditioning::ConditioningMode::VonNeumann);
1199        // VonNeumann may produce fewer bytes due to debiasing yield
1200        assert!(bytes.len() <= 16);
1201    }
1202
1203    // -----------------------------------------------------------------------
1204    // Health report tests
1205    // -----------------------------------------------------------------------
1206
1207    #[test]
1208    fn test_health_report_empty_pool() {
1209        let pool = EntropyPool::new(Some(b"test"));
1210        let report = pool.health_report();
1211        assert_eq!(report.total, 0);
1212        assert_eq!(report.healthy, 0);
1213        assert_eq!(report.raw_bytes, 0);
1214        assert_eq!(report.output_bytes, 0);
1215        assert_eq!(report.buffer_size, 0);
1216        assert!(report.sources.is_empty());
1217    }
1218
1219    #[test]
1220    fn test_health_report_after_collection() {
1221        let mut pool = EntropyPool::new(Some(b"test"));
1222        pool.add_source(Box::new(MockSource::new(
1223            "good_source",
1224            (0..=255).collect(),
1225        )));
1226        pool.collect_all();
1227        let report = pool.health_report();
1228        assert_eq!(report.total, 1);
1229        assert!(report.raw_bytes > 0);
1230        assert_eq!(report.sources.len(), 1);
1231        assert_eq!(report.sources[0].name, "good_source");
1232        assert!(report.sources[0].bytes > 0);
1233    }
1234
1235    #[test]
1236    fn test_health_report_failing_source() {
1237        let mut pool = EntropyPool::new(Some(b"test"));
1238        pool.add_source(Box::new(FailingSource::new("bad_source")));
1239        pool.collect_all();
1240        let report = pool.health_report();
1241        assert_eq!(report.total, 1);
1242        assert_eq!(report.healthy, 0);
1243        assert!(!report.sources[0].healthy);
1244        assert_eq!(report.sources[0].failures, 1);
1245    }
1246
1247    #[test]
1248    fn test_health_report_mixed_sources() {
1249        let mut pool = EntropyPool::new(Some(b"test"));
1250        pool.add_source(Box::new(MockSource::new("good", (0..=255).collect())));
1251        pool.add_source(Box::new(FailingSource::new("bad")));
1252        pool.collect_all();
1253        let report = pool.health_report();
1254        assert_eq!(report.total, 2);
1255        // The good source should be healthy if its entropy > 1.0
1256        assert!(report.healthy >= 1);
1257        assert_eq!(report.sources.len(), 2);
1258    }
1259
1260    #[test]
1261    fn test_health_report_tracks_output_bytes() {
1262        let mut pool = EntropyPool::new(Some(b"test"));
1263        pool.add_source(Box::new(MockSource::new("mock", (0..=255).collect())));
1264        let _ = pool.get_random_bytes(64);
1265        let report = pool.health_report();
1266        assert!(report.output_bytes >= 64);
1267    }
1268
1269    // -----------------------------------------------------------------------
1270    // Source info snapshot tests
1271    // -----------------------------------------------------------------------
1272
1273    #[test]
1274    fn test_source_infos_empty() {
1275        let pool = EntropyPool::new(Some(b"test"));
1276        let infos = pool.source_infos();
1277        assert!(infos.is_empty());
1278    }
1279
1280    #[test]
1281    fn test_source_infos_populated() {
1282        let mut pool = EntropyPool::new(Some(b"test"));
1283        pool.add_source(Box::new(MockSource::new("test_src", vec![1])));
1284        let infos = pool.source_infos();
1285        assert_eq!(infos.len(), 1);
1286        assert_eq!(infos[0].name, "test_src");
1287        assert_eq!(infos[0].description, "mock source");
1288        assert_eq!(infos[0].category, "system");
1289        assert!((infos[0].entropy_rate_estimate - 1.0).abs() < f64::EPSILON);
1290    }
1291
1292    // -----------------------------------------------------------------------
1293    // Determinism / seed tests
1294    // -----------------------------------------------------------------------
1295
1296    #[test]
1297    fn test_different_seeds_differ() {
1298        let mut pool1 = EntropyPool::new(Some(b"seed_a"));
1299        pool1.add_source(Box::new(MockSource::new("m", vec![42; 100])));
1300        let mut pool2 = EntropyPool::new(Some(b"seed_b"));
1301        pool2.add_source(Box::new(MockSource::new("m", vec![42; 100])));
1302
1303        let bytes1 = pool1.get_random_bytes(32);
1304        let bytes2 = pool2.get_random_bytes(32);
1305        assert_ne!(
1306            bytes1, bytes2,
1307            "Different seeds should produce different output"
1308        );
1309    }
1310
1311    // -----------------------------------------------------------------------
1312    // Edge case tests
1313    // -----------------------------------------------------------------------
1314
1315    #[test]
1316    fn test_collect_from_empty_pool() {
1317        let pool = EntropyPool::new(Some(b"test"));
1318        let n = pool.collect_all();
1319        assert_eq!(n, 0, "Empty pool should collect 0 bytes");
1320    }
1321
1322    #[test]
1323    fn test_collect_enabled_empty_list() {
1324        let mut pool = EntropyPool::new(Some(b"test"));
1325        pool.add_source(Box::new(MockSource::new("mock", vec![1])));
1326        let n = pool.collect_enabled(&[]);
1327        assert_eq!(n, 0);
1328    }
1329}