disktest_lib/
stream.rs

1// -*- coding: utf-8 -*-
2//
3// disktest - Storage tester
4//
5// Copyright 2020-2024 Michael Büsch <m@bues.ch>
6//
7// Licensed under the Apache License version 2.0
8// or the MIT license, at your option.
9// SPDX-License-Identifier: Apache-2.0 OR MIT
10//
11
12use crate::bufcache::{BufCache, BufCacheCons};
13use crate::generator::{
14    GeneratorChaCha12, GeneratorChaCha20, GeneratorChaCha8, GeneratorCrc, NextRandom,
15};
16use crate::kdf::kdf;
17use anyhow as ah;
18use std::cell::RefCell;
19use std::rc::Rc;
20use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
21use std::sync::mpsc::{channel, Receiver, Sender};
22use std::sync::{Arc, Condvar, Mutex};
23use std::thread;
24
/// Random data stream algorithm type.
///
/// Selects which generator implementation the stream worker thread
/// instantiates. The default variant is `ChaCha20`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub enum DtStreamType {
    /// Very weak version of the ChaCha random number generator.
    ChaCha8,
    /// Weak version of the ChaCha random number generator.
    ChaCha12,
    /// Cryptographically secure version of the ChaCha random number generator.
    #[default]
    ChaCha20,
    /// Very fast but cryptographically insecure CRC based random number generator.
    Crc,
}
38
/// Data chunk that contains the computed PRNG data.
///
/// Chunks are created by the worker thread and sent to the owner
/// of the stream through a channel.
pub struct DtStreamChunk {
    /// The generated bytes.
    /// The worker thread always sends `Some(..)` here.
    // NOTE(review): presumably this is an `Option` so that a consumer can
    // take the buffer out of the chunk; confirm against the callers.
    pub data: Option<Vec<u8>>,
    /// Running chunk counter, starting at 0 and wrapping around on overflow.
    /// Only present in test builds.
    #[cfg(test)]
    pub index: u8,
}
45
46/// Try to lower the thread priority.
47fn try_lower_thread_priority() {
48    // SAFETY: nice (2) does not affect memory safety.
49    #[cfg(any(target_os = "linux", target_os = "android"))]
50    unsafe {
51        libc::nice(7);
52    }
53}
54
55/// Thread worker function, that computes the chunks.
56#[allow(clippy::too_many_arguments)]
57fn thread_worker(
58    stype: DtStreamType,
59    chunk_factor: usize,
60    seed: Vec<u8>,
61    thread_id: u32,
62    round_id: u64,
63    mut cache_cons: BufCacheCons,
64    byte_offset: u64,
65    invert_pattern: bool,
66    abort: Arc<AtomicBool>,
67    error: Arc<AtomicBool>,
68    level: Arc<AtomicIsize>,
69    sleep: Arc<(Mutex<bool>, Condvar)>,
70    tx: Sender<DtStreamChunk>,
71) {
72    // Calculate the per-thread-seed from the global seed.
73    let thread_seed = kdf(&seed, thread_id, round_id);
74    drop(seed);
75
76    // Construct the generator algorithm.
77    let mut generator: Box<dyn NextRandom> = match stype {
78        DtStreamType::ChaCha8 => Box::new(GeneratorChaCha8::new(&thread_seed)),
79        DtStreamType::ChaCha12 => Box::new(GeneratorChaCha12::new(&thread_seed)),
80        DtStreamType::ChaCha20 => Box::new(GeneratorChaCha20::new(&thread_seed)),
81        DtStreamType::Crc => Box::new(GeneratorCrc::new(&thread_seed)),
82    };
83
84    // Seek the generator to the specified byte offset.
85    if let Err(e) = generator.seek(byte_offset) {
86        eprintln!("ERROR in generator thread {}: {}", thread_id, e);
87        error.store(true, Ordering::Relaxed);
88        return;
89    }
90
91    let chunk_size = generator.get_base_size() * chunk_factor;
92
93    // Try to lower the thread priority
94    // to give the main thread a better chance to run.
95    try_lower_thread_priority();
96
97    // Run the generator work loop.
98    #[cfg(test)]
99    let mut index = 0;
100    let mut cur_level = level.load(Ordering::Relaxed);
101    while !abort.load(Ordering::Acquire) {
102        if cur_level < DtStream::MAX_THRES {
103            // Get the next chunk from the generator.
104            let mut data = cache_cons.pull(chunk_size);
105            generator.next(&mut data, chunk_factor);
106            debug_assert_eq!(data.len(), chunk_size);
107
108            // Invert the bit pattern, if requested.
109            if invert_pattern {
110                for x in &mut data {
111                    *x ^= 0xFFu8;
112                }
113            }
114
115            let chunk = DtStreamChunk {
116                data: Some(data),
117                #[cfg(test)]
118                index,
119            };
120            #[cfg(test)]
121            {
122                index = index.wrapping_add(1);
123            }
124
125            // Send the chunk to the main thread.
126            tx.send(chunk).expect("Worker thread: Send failed.");
127            cur_level = level.fetch_add(1, Ordering::Relaxed) + 1;
128        } else {
129            // The chunk buffer is full. Wait...
130            let mut sleeping = sleep.0.lock().expect("Thread Condvar lock poison");
131            *sleeping = true;
132            while *sleeping {
133                sleeping = sleep.1.wait(sleeping).expect("Thread Condvar wait poison");
134            }
135            cur_level = level.load(Ordering::Relaxed);
136        }
137    }
138}
139
/// PRNG stream.
///
/// Owns a worker thread that computes chunks of pseudo random data
/// in advance and hands them over through a channel.
pub struct DtStream {
    /// Generator algorithm used by the worker thread.
    stype: DtStreamType,
    /// Global seed. A copy is passed to each spawned worker thread.
    seed: Vec<u8>,
    /// If true, the worker inverts all bits of the generated data.
    invert_pattern: bool,
    /// Identifier passed to the worker; used for the key derivation
    /// and for creating the buffer cache consumer.
    thread_id: u32,
    /// Round identifier passed to the worker for the key derivation.
    round_id: u64,
    /// Receiving end of the chunk channel.
    /// `None` until the worker thread has been started for the first time.
    rx: Option<Receiver<DtStreamChunk>>,
    /// Buffer cache that the worker's buffer consumer is created from.
    cache: Rc<RefCell<BufCache>>,
    /// Set to true when the worker is started and to false when it is stopped.
    /// This is not cleared when the worker terminates with an error.
    is_active: bool,
    /// Join handle of the worker thread, if one has been spawned.
    thread_join: Option<thread::JoinHandle<()>>,
    /// Request flag: Tells the worker thread to terminate.
    abort: Arc<AtomicBool>,
    /// Status flag: Set by the worker thread, if it terminated with an error.
    error: Arc<AtomicBool>,
    /// Number of chunks in flight.
    /// Incremented by the worker after sending a chunk
    /// and decremented by the owner after receiving a chunk.
    level: Arc<AtomicIsize>,
    /// Sleep flag and condition variable.
    /// The worker sets the flag and waits, when the chunk queue is full.
    /// The owner clears the flag and notifies to wake the worker.
    sleep: Arc<(Mutex<bool>, Condvar)>,
}
156
157impl DtStream {
158    /// Maximum number of chunks that the thread will compute in advance.
159    const MAX_THRES: isize = 10;
160    /// Low watermark for thread wakeup.
161    const LO_THRES: isize = 6;
162
163    pub fn new(
164        stype: DtStreamType,
165        seed: Vec<u8>,
166        invert_pattern: bool,
167        thread_id: u32,
168        round_id: u64,
169        cache: Rc<RefCell<BufCache>>,
170    ) -> DtStream {
171        let abort = Arc::new(AtomicBool::new(false));
172        let error = Arc::new(AtomicBool::new(false));
173        let level = Arc::new(AtomicIsize::new(0));
174        let sleep = Arc::new((Mutex::new(false), Condvar::new()));
175        DtStream {
176            stype,
177            seed,
178            invert_pattern,
179            thread_id,
180            round_id,
181            rx: None,
182            cache,
183            is_active: false,
184            thread_join: None,
185            abort,
186            error,
187            level,
188            sleep,
189        }
190    }
191
192    /// Wake up the worker thread, if it is currently sleeping.
193    fn wake_thread(&self) {
194        let mut sleeping = self.sleep.0.lock().expect("Wake Condvar lock poison");
195        if *sleeping {
196            *sleeping = false;
197            self.sleep.1.notify_one();
198        }
199    }
200
201    /// Stop the worker thread.
202    /// Does nothing, if the thread is not running.
203    fn stop(&mut self) {
204        self.is_active = false;
205        self.abort.store(true, Ordering::Release);
206        self.wake_thread();
207        if let Some(thread_join) = self.thread_join.take() {
208            thread_join.join().expect("Thread join failed");
209        }
210        self.abort.store(false, Ordering::Release);
211    }
212
213    /// Spawn the worker thread.
214    /// Panics, if the thread is already running.
215    fn start(&mut self, byte_offset: u64, chunk_factor: usize) {
216        assert!(!self.is_active);
217        assert!(self.thread_join.is_none());
218
219        // Initialize thread communication
220        self.abort.store(false, Ordering::Release);
221        self.error.store(false, Ordering::Release);
222        self.level.store(0, Ordering::Release);
223        let (tx, rx) = channel();
224        self.rx = Some(rx);
225
226        // Spawn the worker thread.
227        let thread_stype = self.stype;
228        let thread_chunk_factor = chunk_factor;
229        let thread_seed = self.seed.to_vec();
230        let thread_id = self.thread_id;
231        let thread_round_id = self.round_id;
232        let thread_cache_cons = self.cache.borrow_mut().new_consumer(self.thread_id);
233        let thread_byte_offset = byte_offset;
234        let thread_invert_pattern = self.invert_pattern;
235        let thread_abort = Arc::clone(&self.abort);
236        let thread_error = Arc::clone(&self.error);
237        let thread_level = Arc::clone(&self.level);
238        let thread_sleep = Arc::clone(&self.sleep);
239        self.thread_join = Some(thread::spawn(move || {
240            thread_worker(
241                thread_stype,
242                thread_chunk_factor,
243                thread_seed,
244                thread_id,
245                thread_round_id,
246                thread_cache_cons,
247                thread_byte_offset,
248                thread_invert_pattern,
249                thread_abort,
250                thread_error,
251                thread_level,
252                thread_sleep,
253                tx,
254            );
255        }));
256        self.is_active = true;
257    }
258
259    /// Check if the thread exited due to an error.
260    #[inline]
261    fn is_thread_error(&self) -> bool {
262        self.error.load(Ordering::Relaxed)
263    }
264
265    /// Activate the worker thread.
266    pub fn activate(&mut self, byte_offset: u64, chunk_factor: usize) -> ah::Result<()> {
267        self.stop();
268        self.start(byte_offset, chunk_factor);
269
270        Ok(())
271    }
272
273    /// Check if the worker thread is currently running.
274    #[inline]
275    pub fn is_active(&self) -> bool {
276        self.is_active
277    }
278
279    /// Get the chunk base size.
280    pub fn get_chunk_size(&self) -> usize {
281        match self.stype {
282            DtStreamType::ChaCha8 => GeneratorChaCha8::BASE_SIZE,
283            DtStreamType::ChaCha12 => GeneratorChaCha12::BASE_SIZE,
284            DtStreamType::ChaCha20 => GeneratorChaCha20::BASE_SIZE,
285            DtStreamType::Crc => GeneratorCrc::BASE_SIZE,
286        }
287    }
288
289    /// Get the default chunk factor of the selected generator.
290    pub fn get_default_chunk_factor(&self) -> usize {
291        match self.stype {
292            DtStreamType::ChaCha8 => GeneratorChaCha8::DEFAULT_CHUNK_FACTOR,
293            DtStreamType::ChaCha12 => GeneratorChaCha12::DEFAULT_CHUNK_FACTOR,
294            DtStreamType::ChaCha20 => GeneratorChaCha20::DEFAULT_CHUNK_FACTOR,
295            DtStreamType::Crc => GeneratorCrc::DEFAULT_CHUNK_FACTOR,
296        }
297    }
298
299    /// Get the next chunk from the thread.
300    /// Returns None, if no chunk is available, yet.
301    #[inline]
302    pub fn get_chunk(&mut self) -> ah::Result<Option<DtStreamChunk>> {
303        if !self.is_active() {
304            return Err(ah::format_err!("Generator stream is not active."));
305        }
306        if self.is_thread_error() {
307            return Err(ah::format_err!(
308                "Generator stream thread aborted with an error."
309            ));
310        }
311        let Some(rx) = &self.rx else {
312            return Err(ah::format_err!("Generator stream RX channel not present."));
313        };
314        let Ok(chunk) = rx.try_recv() else {
315            // Queue is empty. Wake thread.
316            self.wake_thread();
317            return Ok(None);
318        };
319        if self.level.fetch_sub(1, Ordering::Relaxed) - 1 <= DtStream::LO_THRES {
320            // Queue fill level is low. Wake thread.
321            self.wake_thread();
322        }
323        // We got a chunk.
324        Ok(Some(chunk))
325    }
326}
327
/// Make sure the worker thread does not outlive the stream.
impl Drop for DtStream {
    /// Request the worker thread to terminate and join it.
    fn drop(&mut self) {
        // stop() only joins, if a worker thread has been spawned.
        self.stop();
    }
}
333
#[cfg(test)]
mod tests {
    use super::*;
    use crate::disktest::DisktestQuiet;
    use std::time::Duration;

    impl DtStream {
        /// Poll the stream until the worker thread has delivered a chunk.
        pub fn wait_chunk(&mut self) -> DtStreamChunk {
            loop {
                match self.get_chunk().unwrap() {
                    Some(chunk) => return chunk,
                    None => thread::sleep(Duration::from_millis(1)),
                }
            }
        }
    }

    /// Create an inactive stream with the fixed test seed,
    /// thread id 0, round id 0 and its own buffer cache.
    fn new_stream(algorithm: DtStreamType, invert_pattern: bool) -> DtStream {
        let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
        DtStream::new(algorithm, vec![1, 2, 3], invert_pattern, 0, 0, cache)
    }

    /// Borrow the payload bytes of a chunk.
    fn payload(chunk: &DtStreamChunk) -> &Vec<u8> {
        chunk.data.as_ref().unwrap()
    }

    /// The first five chunks must arrive in order
    /// and start with the known reference bytes.
    fn run_base_test(algorithm: DtStreamType) {
        println!("stream base test");
        let mut s = new_stream(algorithm, false);
        let factor = s.get_default_chunk_factor();
        s.activate(0, factor).unwrap();
        assert!(s.is_active());

        assert!(s.get_chunk_size() > 0);
        assert!(s.get_default_chunk_factor() > 0);

        let mut results_first = vec![];
        for count in 0..5 {
            let chunk = s.wait_chunk();
            let first_byte = payload(&chunk)[0];
            println!(
                "{}: index={} data[0]={} (current level = {})",
                count,
                chunk.index,
                first_byte,
                s.level.load(Ordering::Relaxed)
            );
            results_first.push(first_byte);
            assert_eq!(chunk.index, count);
        }

        let expected: Vec<u8> = match algorithm {
            DtStreamType::ChaCha8 => vec![66, 209, 254, 224, 203],
            DtStreamType::ChaCha12 => vec![200, 202, 12, 60, 234],
            DtStreamType::ChaCha20 => vec![206, 236, 87, 55, 170],
            DtStreamType::Crc => vec![108, 99, 114, 196, 213],
        };
        assert_eq!(results_first, expected);
    }

    /// A stream started one chunk into the data must produce
    /// the second chunk of a stream started at offset zero.
    fn run_offset_test(algorithm: DtStreamType) {
        println!("stream offset test");
        // a: start at chunk offset 0
        let mut a = new_stream(algorithm, false);
        let factor = a.get_default_chunk_factor();
        a.activate(0, factor).unwrap();

        // b: start at chunk offset 1
        let mut b = new_stream(algorithm, false);
        let one_chunk = a.get_chunk_size() as u64 * factor as u64;
        b.activate(one_chunk, factor).unwrap();

        let a_first = a.wait_chunk();
        let b_first = b.wait_chunk();
        assert!(payload(&a_first) != payload(&b_first));
        let a_second = a.wait_chunk();
        assert!(payload(&a_second) == payload(&b_first));
    }

    /// An inverting stream must produce the bitwise complement
    /// of a non-inverting stream with otherwise equal parameters.
    fn run_invert_test(algorithm: DtStreamType) {
        println!("stream invert test");
        let mut plain = new_stream(algorithm, false);
        let factor = plain.get_default_chunk_factor();
        plain.activate(0, factor).unwrap();
        let mut inverted = new_stream(algorithm, true);
        inverted.activate(0, factor).unwrap();

        let plain_chunk = plain.wait_chunk();
        let inverted_chunk = inverted.wait_chunk();
        let mut restored: Vec<u8> = Vec::new();
        for byte in payload(&inverted_chunk) {
            restored.push(byte ^ 0xFF);
        }
        assert!(payload(&plain_chunk) != payload(&inverted_chunk));
        assert!(payload(&plain_chunk) == &restored);
    }

    /// Run the base, offset and invert tests for one algorithm.
    fn run_all_tests(algorithm: DtStreamType) {
        run_base_test(algorithm);
        run_offset_test(algorithm);
        run_invert_test(algorithm);
    }

    #[test]
    fn test_chacha8() {
        run_all_tests(DtStreamType::ChaCha8);
    }

    #[test]
    fn test_chacha12() {
        run_all_tests(DtStreamType::ChaCha12);
    }

    #[test]
    fn test_chacha20() {
        run_all_tests(DtStreamType::ChaCha20);
    }

    #[test]
    fn test_crc() {
        run_all_tests(DtStreamType::Crc);
    }
}
467
468// vim: ts=4 sw=4 expandtab