disktest_lib/
stream.rs

1// -*- coding: utf-8 -*-
2//
3// disktest - Storage tester
4//
5// Copyright 2020-2024 Michael Büsch <m@bues.ch>
6//
7// Licensed under the Apache License version 2.0
8// or the MIT license, at your option.
9// SPDX-License-Identifier: Apache-2.0 OR MIT
10//
11
12use crate::bufcache::{BufCache, BufCacheCons};
13use crate::generator::{
14    GeneratorChaCha12, GeneratorChaCha20, GeneratorChaCha8, GeneratorCrc, NextRandom,
15};
16use crate::kdf::kdf;
17use anyhow as ah;
18use std::cell::RefCell;
19use std::rc::Rc;
20use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
21use std::sync::mpsc::{channel, Receiver, Sender};
22use std::sync::{Arc, Condvar, Mutex};
23use std::thread;
24
25/// Random data stream algorithm type.
26#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
27pub enum DtStreamType {
28    /// Very weak version of the ChaCha random number generator.
29    ChaCha8,
30    /// Weak version of the ChaCha random number generator.
31    ChaCha12,
32    /// Cryptographically secure version of the ChaCha random number generator.
33    #[default]
34    ChaCha20,
35    /// Very fast by cryptographically unsecure CRC based random number generator.
36    Crc,
37}
38
39/// Data chunk that contains the computed PRNG data.
40pub struct DtStreamChunk {
41    pub data: Option<Vec<u8>>,
42    #[allow(dead_code)] // used in test only.
43    pub index: u8,
44}
45
46/// Thread worker function, that computes the chunks.
47#[allow(clippy::too_many_arguments)]
48fn thread_worker(
49    stype: DtStreamType,
50    chunk_factor: usize,
51    seed: Vec<u8>,
52    thread_id: u32,
53    round_id: u64,
54    mut cache_cons: BufCacheCons,
55    byte_offset: u64,
56    invert_pattern: bool,
57    abort: Arc<AtomicBool>,
58    error: Arc<AtomicBool>,
59    level: Arc<AtomicIsize>,
60    sleep: Arc<(Mutex<bool>, Condvar)>,
61    tx: Sender<DtStreamChunk>,
62) {
63    // Calculate the per-thread-seed from the global seed.
64    let thread_seed = kdf(&seed, thread_id, round_id);
65    drop(seed);
66
67    // Construct the generator algorithm.
68    let mut generator: Box<dyn NextRandom> = match stype {
69        DtStreamType::ChaCha8 => Box::new(GeneratorChaCha8::new(&thread_seed)),
70        DtStreamType::ChaCha12 => Box::new(GeneratorChaCha12::new(&thread_seed)),
71        DtStreamType::ChaCha20 => Box::new(GeneratorChaCha20::new(&thread_seed)),
72        DtStreamType::Crc => Box::new(GeneratorCrc::new(&thread_seed)),
73    };
74
75    // Seek the generator to the specified byte offset.
76    if let Err(e) = generator.seek(byte_offset) {
77        eprintln!("ERROR in generator thread {}: {}", thread_id, e);
78        error.store(true, Ordering::Relaxed);
79        return;
80    }
81
82    // Run the generator work loop.
83    let mut index = 0;
84    let mut cur_level = level.load(Ordering::Relaxed);
85    while !abort.load(Ordering::SeqCst) {
86        if cur_level < DtStream::MAX_THRES {
87            // Get the next chunk from the generator.
88            let size = generator.get_base_size() * chunk_factor;
89            let mut data = cache_cons.pull(size);
90            generator.next(&mut data, chunk_factor);
91            debug_assert_eq!(data.len(), size);
92
93            // Invert the bit pattern, if requested.
94            if invert_pattern {
95                for x in &mut data {
96                    *x ^= 0xFFu8;
97                }
98            }
99
100            let chunk = DtStreamChunk {
101                data: Some(data),
102                index,
103            };
104            index = index.wrapping_add(1);
105
106            // Send the chunk to the main thread.
107            tx.send(chunk).expect("Worker thread: Send failed.");
108            cur_level = level.fetch_add(1, Ordering::Relaxed) + 1;
109        } else {
110            // The chunk buffer is full. Wait...
111            let mut sleeping = sleep.0.lock().expect("Thread Condvar lock poison");
112            *sleeping = true;
113            while *sleeping {
114                sleeping = sleep.1.wait(sleeping).expect("Thread Condvar wait poison");
115            }
116            cur_level = level.load(Ordering::Relaxed);
117        }
118    }
119}
120
121/// PRNG stream.
122pub struct DtStream {
123    stype: DtStreamType,
124    seed: Vec<u8>,
125    invert_pattern: bool,
126    thread_id: u32,
127    round_id: u64,
128    rx: Option<Receiver<DtStreamChunk>>,
129    cache: Rc<RefCell<BufCache>>,
130    is_active: bool,
131    thread_join: Option<thread::JoinHandle<()>>,
132    abort: Arc<AtomicBool>,
133    error: Arc<AtomicBool>,
134    level: Arc<AtomicIsize>,
135    sleep: Arc<(Mutex<bool>, Condvar)>,
136}
137
138impl DtStream {
139    /// Maximum number of chunks that the thread will compute in advance.
140    const MAX_THRES: isize = 10;
141    /// Low watermark for thread wakeup.
142    const LO_THRES: isize = 6;
143
144    pub fn new(
145        stype: DtStreamType,
146        seed: Vec<u8>,
147        invert_pattern: bool,
148        thread_id: u32,
149        round_id: u64,
150        cache: Rc<RefCell<BufCache>>,
151    ) -> DtStream {
152        let abort = Arc::new(AtomicBool::new(false));
153        let error = Arc::new(AtomicBool::new(false));
154        let level = Arc::new(AtomicIsize::new(0));
155        let sleep = Arc::new((Mutex::new(false), Condvar::new()));
156        DtStream {
157            stype,
158            seed,
159            invert_pattern,
160            thread_id,
161            round_id,
162            rx: None,
163            cache,
164            is_active: false,
165            thread_join: None,
166            abort,
167            error,
168            level,
169            sleep,
170        }
171    }
172
173    /// Wake up the worker thread, if it is currently sleeping.
174    fn wake_thread(&self) {
175        let mut sleeping = self.sleep.0.lock().expect("Wake Condvar lock poison");
176        if *sleeping {
177            *sleeping = false;
178            self.sleep.1.notify_one();
179        }
180    }
181
182    /// Stop the worker thread.
183    /// Does nothing, if the thread is not running.
184    fn stop(&mut self) {
185        self.is_active = false;
186        self.abort.store(true, Ordering::SeqCst);
187        self.wake_thread();
188        if let Some(thread_join) = self.thread_join.take() {
189            thread_join.join().expect("Thread join failed");
190        }
191        self.abort.store(false, Ordering::SeqCst);
192    }
193
194    /// Spawn the worker thread.
195    /// Panics, if the thread is already running.
196    fn start(&mut self, byte_offset: u64, chunk_factor: usize) {
197        assert!(!self.is_active);
198        assert!(self.thread_join.is_none());
199
200        // Initialize thread communication
201        self.abort.store(false, Ordering::SeqCst);
202        self.error.store(false, Ordering::SeqCst);
203        self.level.store(0, Ordering::SeqCst);
204        let (tx, rx) = channel();
205        self.rx = Some(rx);
206
207        // Spawn the worker thread.
208        let thread_stype = self.stype;
209        let thread_chunk_factor = chunk_factor;
210        let thread_seed = self.seed.to_vec();
211        let thread_id = self.thread_id;
212        let thread_round_id = self.round_id;
213        let thread_cache_cons = self.cache.borrow_mut().new_consumer(self.thread_id);
214        let thread_byte_offset = byte_offset;
215        let thread_invert_pattern = self.invert_pattern;
216        let thread_abort = Arc::clone(&self.abort);
217        let thread_error = Arc::clone(&self.error);
218        let thread_level = Arc::clone(&self.level);
219        let thread_sleep = Arc::clone(&self.sleep);
220        self.thread_join = Some(thread::spawn(move || {
221            thread_worker(
222                thread_stype,
223                thread_chunk_factor,
224                thread_seed,
225                thread_id,
226                thread_round_id,
227                thread_cache_cons,
228                thread_byte_offset,
229                thread_invert_pattern,
230                thread_abort,
231                thread_error,
232                thread_level,
233                thread_sleep,
234                tx,
235            );
236        }));
237        self.is_active = true;
238    }
239
240    /// Check if the thread exited due to an error.
241    #[inline]
242    fn is_thread_error(&self) -> bool {
243        self.error.load(Ordering::Relaxed)
244    }
245
246    /// Activate the worker thread.
247    pub fn activate(&mut self, byte_offset: u64, chunk_factor: usize) -> ah::Result<()> {
248        self.stop();
249        self.start(byte_offset, chunk_factor);
250
251        Ok(())
252    }
253
254    /// Check if the worker thread is currently running.
255    #[inline]
256    pub fn is_active(&self) -> bool {
257        self.is_active
258    }
259
260    /// Get the chunk base size.
261    pub fn get_chunk_size(&self) -> usize {
262        match self.stype {
263            DtStreamType::ChaCha8 => GeneratorChaCha8::BASE_SIZE,
264            DtStreamType::ChaCha12 => GeneratorChaCha12::BASE_SIZE,
265            DtStreamType::ChaCha20 => GeneratorChaCha20::BASE_SIZE,
266            DtStreamType::Crc => GeneratorCrc::BASE_SIZE,
267        }
268    }
269
270    /// Get the default chunk factor of the selected generator.
271    pub fn get_default_chunk_factor(&self) -> usize {
272        match self.stype {
273            DtStreamType::ChaCha8 => GeneratorChaCha8::DEFAULT_CHUNK_FACTOR,
274            DtStreamType::ChaCha12 => GeneratorChaCha12::DEFAULT_CHUNK_FACTOR,
275            DtStreamType::ChaCha20 => GeneratorChaCha20::DEFAULT_CHUNK_FACTOR,
276            DtStreamType::Crc => GeneratorCrc::DEFAULT_CHUNK_FACTOR,
277        }
278    }
279
280    /// Get the next chunk from the thread.
281    /// Returns None, if no chunk is available, yet.
282    #[inline]
283    pub fn get_chunk(&mut self) -> ah::Result<Option<DtStreamChunk>> {
284        if !self.is_active() {
285            return Err(ah::format_err!("Generator stream is not active."));
286        }
287        if self.is_thread_error() {
288            return Err(ah::format_err!(
289                "Generator stream thread aborted with an error."
290            ));
291        }
292        let Some(rx) = &self.rx else {
293            return Err(ah::format_err!("Generator stream RX channel not present."));
294        };
295        let Ok(chunk) = rx.try_recv() else {
296            // Queue is empty. Wake thread.
297            self.wake_thread();
298            return Ok(None);
299        };
300        if self.level.fetch_sub(1, Ordering::Relaxed) - 1 <= DtStream::LO_THRES {
301            // Queue fill level is low. Wake thread.
302            self.wake_thread();
303        }
304        // We got a chunk.
305        Ok(Some(chunk))
306    }
307}
308
309impl Drop for DtStream {
310    fn drop(&mut self) {
311        self.stop();
312    }
313}
314
315#[cfg(test)]
316mod tests {
317    use super::*;
318    use crate::disktest::DisktestQuiet;
319    use std::time::Duration;
320
321    impl DtStream {
322        pub fn wait_chunk(&mut self) -> DtStreamChunk {
323            loop {
324                if let Some(chunk) = self.get_chunk().unwrap() {
325                    break chunk;
326                }
327                thread::sleep(Duration::from_millis(1));
328            }
329        }
330    }
331
332    fn run_base_test(algorithm: DtStreamType) {
333        println!("stream base test");
334        let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
335        let mut s = DtStream::new(algorithm, vec![1, 2, 3], false, 0, 0, cache);
336        s.activate(0, s.get_default_chunk_factor()).unwrap();
337        assert!(s.is_active());
338
339        assert!(s.get_chunk_size() > 0);
340        assert!(s.get_default_chunk_factor() > 0);
341
342        let mut results_first = vec![];
343        for count in 0..5 {
344            let chunk = s.wait_chunk();
345            println!(
346                "{}: index={} data[0]={} (current level = {})",
347                count,
348                chunk.index,
349                chunk.data.as_ref().unwrap()[0],
350                s.level.load(Ordering::Relaxed)
351            );
352            results_first.push(chunk.data.as_ref().unwrap()[0]);
353            assert_eq!(chunk.index, count);
354        }
355        match algorithm {
356            DtStreamType::ChaCha8 => {
357                assert_eq!(results_first, vec![66, 209, 254, 224, 203]);
358            }
359            DtStreamType::ChaCha12 => {
360                assert_eq!(results_first, vec![200, 202, 12, 60, 234]);
361            }
362            DtStreamType::ChaCha20 => {
363                assert_eq!(results_first, vec![206, 236, 87, 55, 170]);
364            }
365            DtStreamType::Crc => {
366                assert_eq!(results_first, vec![108, 99, 114, 196, 213]);
367            }
368        }
369    }
370
371    fn run_offset_test(algorithm: DtStreamType) {
372        println!("stream offset test");
373        // a: start at chunk offset 0
374        let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
375        let mut a = DtStream::new(algorithm, vec![1, 2, 3], false, 0, 0, cache);
376        a.activate(0, a.get_default_chunk_factor()).unwrap();
377
378        // b: start at chunk offset 1
379        let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
380        let mut b = DtStream::new(algorithm, vec![1, 2, 3], false, 0, 0, cache);
381        b.activate(
382            a.get_chunk_size() as u64 * a.get_default_chunk_factor() as u64,
383            a.get_default_chunk_factor(),
384        )
385        .unwrap();
386
387        let achunk = a.wait_chunk();
388        let bchunk = b.wait_chunk();
389        assert!(achunk.data.as_ref().unwrap() != bchunk.data.as_ref().unwrap());
390        let achunk = a.wait_chunk();
391        assert!(achunk.data.as_ref().unwrap() == bchunk.data.as_ref().unwrap());
392    }
393
394    fn run_invert_test(algorithm: DtStreamType) {
395        println!("stream invert test");
396        let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
397        let mut a = DtStream::new(algorithm, vec![1, 2, 3], false, 0, 0, cache);
398        a.activate(0, a.get_default_chunk_factor()).unwrap();
399        let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
400        let mut b = DtStream::new(algorithm, vec![1, 2, 3], true, 0, 0, cache);
401        b.activate(0, a.get_default_chunk_factor()).unwrap();
402
403        let achunk = a.wait_chunk();
404        let bchunk = b.wait_chunk();
405        let inv_bchunk: Vec<u8> = bchunk
406            .data
407            .as_ref()
408            .unwrap()
409            .iter()
410            .map(|x| x ^ 0xFF)
411            .collect();
412        assert!(achunk.data.as_ref().unwrap() != bchunk.data.as_ref().unwrap());
413        assert!(achunk.data.as_ref().unwrap() == &inv_bchunk);
414    }
415
416    #[test]
417    fn test_chacha8() {
418        let alg = DtStreamType::ChaCha8;
419        run_base_test(alg);
420        run_offset_test(alg);
421        run_invert_test(alg);
422    }
423
424    #[test]
425    fn test_chacha12() {
426        let alg = DtStreamType::ChaCha12;
427        run_base_test(alg);
428        run_offset_test(alg);
429        run_invert_test(alg);
430    }
431
432    #[test]
433    fn test_chacha20() {
434        let alg = DtStreamType::ChaCha20;
435        run_base_test(alg);
436        run_offset_test(alg);
437        run_invert_test(alg);
438    }
439
440    #[test]
441    fn test_crc() {
442        let alg = DtStreamType::Crc;
443        run_base_test(alg);
444        run_offset_test(alg);
445        run_invert_test(alg);
446    }
447}
448
449// vim: ts=4 sw=4 expandtab