1use crate::bufcache::{BufCache, BufCacheCons};
13use crate::generator::{
14 GeneratorChaCha12, GeneratorChaCha20, GeneratorChaCha8, GeneratorCrc, NextRandom,
15};
16use crate::kdf::kdf;
17use anyhow as ah;
18use std::cell::RefCell;
19use std::rc::Rc;
20use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
21use std::sync::mpsc::{channel, Receiver, Sender};
22use std::sync::{Arc, Condvar, Mutex};
23use std::thread;
24
/// Selects which generator algorithm a [`DtStream`] worker thread uses.
///
/// Each variant maps to the generator type of the same name
/// (`GeneratorChaCha8`, `GeneratorChaCha12`, `GeneratorChaCha20`, `GeneratorCrc`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum DtStreamType {
    /// Stream produced by `GeneratorChaCha8`.
    ChaCha8,
    /// Stream produced by `GeneratorChaCha12`.
    ChaCha12,
    /// Stream produced by `GeneratorChaCha20`. This is the default variant.
    #[default]
    ChaCha20,
    /// Stream produced by `GeneratorCrc`.
    Crc,
}
38
/// One unit of generated data handed from the worker thread to the consumer.
pub struct DtStreamChunk {
    /// The generated bytes. The worker thread always sends `Some(..)`.
    pub data: Option<Vec<u8>>,
    /// Running chunk counter assigned by the worker thread.
    /// It starts at 0 and wraps around on overflow.
    #[allow(dead_code)]
    pub index: u8,
}
45
46#[allow(clippy::too_many_arguments)]
48fn thread_worker(
49 stype: DtStreamType,
50 chunk_factor: usize,
51 seed: Vec<u8>,
52 thread_id: u32,
53 round_id: u64,
54 mut cache_cons: BufCacheCons,
55 byte_offset: u64,
56 invert_pattern: bool,
57 abort: Arc<AtomicBool>,
58 error: Arc<AtomicBool>,
59 level: Arc<AtomicIsize>,
60 sleep: Arc<(Mutex<bool>, Condvar)>,
61 tx: Sender<DtStreamChunk>,
62) {
63 let thread_seed = kdf(&seed, thread_id, round_id);
65 drop(seed);
66
67 let mut generator: Box<dyn NextRandom> = match stype {
69 DtStreamType::ChaCha8 => Box::new(GeneratorChaCha8::new(&thread_seed)),
70 DtStreamType::ChaCha12 => Box::new(GeneratorChaCha12::new(&thread_seed)),
71 DtStreamType::ChaCha20 => Box::new(GeneratorChaCha20::new(&thread_seed)),
72 DtStreamType::Crc => Box::new(GeneratorCrc::new(&thread_seed)),
73 };
74
75 if let Err(e) = generator.seek(byte_offset) {
77 eprintln!("ERROR in generator thread {}: {}", thread_id, e);
78 error.store(true, Ordering::Relaxed);
79 return;
80 }
81
82 let mut index = 0;
84 let mut cur_level = level.load(Ordering::Relaxed);
85 while !abort.load(Ordering::SeqCst) {
86 if cur_level < DtStream::MAX_THRES {
87 let size = generator.get_base_size() * chunk_factor;
89 let mut data = cache_cons.pull(size);
90 generator.next(&mut data, chunk_factor);
91 debug_assert_eq!(data.len(), size);
92
93 if invert_pattern {
95 for x in &mut data {
96 *x ^= 0xFFu8;
97 }
98 }
99
100 let chunk = DtStreamChunk {
101 data: Some(data),
102 index,
103 };
104 index = index.wrapping_add(1);
105
106 tx.send(chunk).expect("Worker thread: Send failed.");
108 cur_level = level.fetch_add(1, Ordering::Relaxed) + 1;
109 } else {
110 let mut sleeping = sleep.0.lock().expect("Thread Condvar lock poison");
112 *sleeping = true;
113 while *sleeping {
114 sleeping = sleep.1.wait(sleeping).expect("Thread Condvar wait poison");
115 }
116 cur_level = level.load(Ordering::Relaxed);
117 }
118 }
119}
120
121pub struct DtStream {
123 stype: DtStreamType,
124 seed: Vec<u8>,
125 invert_pattern: bool,
126 thread_id: u32,
127 round_id: u64,
128 rx: Option<Receiver<DtStreamChunk>>,
129 cache: Rc<RefCell<BufCache>>,
130 is_active: bool,
131 thread_join: Option<thread::JoinHandle<()>>,
132 abort: Arc<AtomicBool>,
133 error: Arc<AtomicBool>,
134 level: Arc<AtomicIsize>,
135 sleep: Arc<(Mutex<bool>, Condvar)>,
136}
137
138impl DtStream {
139 const MAX_THRES: isize = 10;
141 const LO_THRES: isize = 6;
143
144 pub fn new(
145 stype: DtStreamType,
146 seed: Vec<u8>,
147 invert_pattern: bool,
148 thread_id: u32,
149 round_id: u64,
150 cache: Rc<RefCell<BufCache>>,
151 ) -> DtStream {
152 let abort = Arc::new(AtomicBool::new(false));
153 let error = Arc::new(AtomicBool::new(false));
154 let level = Arc::new(AtomicIsize::new(0));
155 let sleep = Arc::new((Mutex::new(false), Condvar::new()));
156 DtStream {
157 stype,
158 seed,
159 invert_pattern,
160 thread_id,
161 round_id,
162 rx: None,
163 cache,
164 is_active: false,
165 thread_join: None,
166 abort,
167 error,
168 level,
169 sleep,
170 }
171 }
172
173 fn wake_thread(&self) {
175 let mut sleeping = self.sleep.0.lock().expect("Wake Condvar lock poison");
176 if *sleeping {
177 *sleeping = false;
178 self.sleep.1.notify_one();
179 }
180 }
181
182 fn stop(&mut self) {
185 self.is_active = false;
186 self.abort.store(true, Ordering::SeqCst);
187 self.wake_thread();
188 if let Some(thread_join) = self.thread_join.take() {
189 thread_join.join().expect("Thread join failed");
190 }
191 self.abort.store(false, Ordering::SeqCst);
192 }
193
194 fn start(&mut self, byte_offset: u64, chunk_factor: usize) {
197 assert!(!self.is_active);
198 assert!(self.thread_join.is_none());
199
200 self.abort.store(false, Ordering::SeqCst);
202 self.error.store(false, Ordering::SeqCst);
203 self.level.store(0, Ordering::SeqCst);
204 let (tx, rx) = channel();
205 self.rx = Some(rx);
206
207 let thread_stype = self.stype;
209 let thread_chunk_factor = chunk_factor;
210 let thread_seed = self.seed.to_vec();
211 let thread_id = self.thread_id;
212 let thread_round_id = self.round_id;
213 let thread_cache_cons = self.cache.borrow_mut().new_consumer(self.thread_id);
214 let thread_byte_offset = byte_offset;
215 let thread_invert_pattern = self.invert_pattern;
216 let thread_abort = Arc::clone(&self.abort);
217 let thread_error = Arc::clone(&self.error);
218 let thread_level = Arc::clone(&self.level);
219 let thread_sleep = Arc::clone(&self.sleep);
220 self.thread_join = Some(thread::spawn(move || {
221 thread_worker(
222 thread_stype,
223 thread_chunk_factor,
224 thread_seed,
225 thread_id,
226 thread_round_id,
227 thread_cache_cons,
228 thread_byte_offset,
229 thread_invert_pattern,
230 thread_abort,
231 thread_error,
232 thread_level,
233 thread_sleep,
234 tx,
235 );
236 }));
237 self.is_active = true;
238 }
239
240 #[inline]
242 fn is_thread_error(&self) -> bool {
243 self.error.load(Ordering::Relaxed)
244 }
245
246 pub fn activate(&mut self, byte_offset: u64, chunk_factor: usize) -> ah::Result<()> {
248 self.stop();
249 self.start(byte_offset, chunk_factor);
250
251 Ok(())
252 }
253
254 #[inline]
256 pub fn is_active(&self) -> bool {
257 self.is_active
258 }
259
260 pub fn get_chunk_size(&self) -> usize {
262 match self.stype {
263 DtStreamType::ChaCha8 => GeneratorChaCha8::BASE_SIZE,
264 DtStreamType::ChaCha12 => GeneratorChaCha12::BASE_SIZE,
265 DtStreamType::ChaCha20 => GeneratorChaCha20::BASE_SIZE,
266 DtStreamType::Crc => GeneratorCrc::BASE_SIZE,
267 }
268 }
269
270 pub fn get_default_chunk_factor(&self) -> usize {
272 match self.stype {
273 DtStreamType::ChaCha8 => GeneratorChaCha8::DEFAULT_CHUNK_FACTOR,
274 DtStreamType::ChaCha12 => GeneratorChaCha12::DEFAULT_CHUNK_FACTOR,
275 DtStreamType::ChaCha20 => GeneratorChaCha20::DEFAULT_CHUNK_FACTOR,
276 DtStreamType::Crc => GeneratorCrc::DEFAULT_CHUNK_FACTOR,
277 }
278 }
279
280 #[inline]
283 pub fn get_chunk(&mut self) -> ah::Result<Option<DtStreamChunk>> {
284 if !self.is_active() {
285 return Err(ah::format_err!("Generator stream is not active."));
286 }
287 if self.is_thread_error() {
288 return Err(ah::format_err!(
289 "Generator stream thread aborted with an error."
290 ));
291 }
292 let Some(rx) = &self.rx else {
293 return Err(ah::format_err!("Generator stream RX channel not present."));
294 };
295 let Ok(chunk) = rx.try_recv() else {
296 self.wake_thread();
298 return Ok(None);
299 };
300 if self.level.fetch_sub(1, Ordering::Relaxed) - 1 <= DtStream::LO_THRES {
301 self.wake_thread();
303 }
304 Ok(Some(chunk))
306 }
307}
308
309impl Drop for DtStream {
310 fn drop(&mut self) {
311 self.stop();
312 }
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use crate::disktest::DisktestQuiet;
    use std::time::Duration;

    impl DtStream {
        /// Test helper: poll `get_chunk` once per millisecond until a chunk
        /// arrives. Panics if `get_chunk` reports an error.
        pub fn wait_chunk(&mut self) -> DtStreamChunk {
            loop {
                match self.get_chunk().unwrap() {
                    Some(chunk) => return chunk,
                    None => thread::sleep(Duration::from_millis(1)),
                }
            }
        }
    }

    /// Build a stream with the fixed test seed, thread 0, round 0 and its
    /// own private buffer cache.
    fn new_stream(algorithm: DtStreamType, invert_pattern: bool) -> DtStream {
        let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
        DtStream::new(algorithm, vec![1, 2, 3], invert_pattern, 0, 0, cache)
    }

    /// Check the first byte and the index of the first five chunks.
    fn run_base_test(algorithm: DtStreamType) {
        println!("stream base test");
        let mut s = new_stream(algorithm, false);
        let factor = s.get_default_chunk_factor();
        s.activate(0, factor).unwrap();
        assert!(s.is_active());

        assert!(s.get_chunk_size() > 0);
        assert!(s.get_default_chunk_factor() > 0);

        let mut results_first = vec![];
        for count in 0..5 {
            let chunk = s.wait_chunk();
            let first_byte = chunk.data.as_ref().unwrap()[0];
            println!(
                "{}: index={} data[0]={} (current level = {})",
                count,
                chunk.index,
                first_byte,
                s.level.load(Ordering::Relaxed)
            );
            results_first.push(first_byte);
            assert_eq!(chunk.index, count);
        }

        let expected: Vec<u8> = match algorithm {
            DtStreamType::ChaCha8 => vec![66, 209, 254, 224, 203],
            DtStreamType::ChaCha12 => vec![200, 202, 12, 60, 234],
            DtStreamType::ChaCha20 => vec![206, 236, 87, 55, 170],
            DtStreamType::Crc => vec![108, 99, 114, 196, 213],
        };
        assert_eq!(results_first, expected);
    }

    /// A stream started one chunk further in must yield the second chunk
    /// of a stream started at offset zero.
    fn run_offset_test(algorithm: DtStreamType) {
        println!("stream offset test");
        // Stream a: starts at byte offset 0.
        let mut a = new_stream(algorithm, false);
        let factor = a.get_default_chunk_factor();
        a.activate(0, factor).unwrap();

        // Stream b: starts exactly one chunk after stream a.
        let mut b = new_stream(algorithm, false);
        let one_chunk = a.get_chunk_size() as u64 * a.get_default_chunk_factor() as u64;
        b.activate(one_chunk, a.get_default_chunk_factor()).unwrap();

        let a_first = a.wait_chunk();
        let b_first = b.wait_chunk();
        let b_data = b_first.data.as_ref().unwrap();
        assert!(a_first.data.as_ref().unwrap() != b_data);
        let a_second = a.wait_chunk();
        assert!(a_second.data.as_ref().unwrap() == b_data);
    }

    /// An inverted stream must be the bitwise complement of the plain one.
    fn run_invert_test(algorithm: DtStreamType) {
        println!("stream invert test");
        let mut a = new_stream(algorithm, false);
        let factor = a.get_default_chunk_factor();
        a.activate(0, factor).unwrap();
        let mut b = new_stream(algorithm, true);
        b.activate(0, a.get_default_chunk_factor()).unwrap();

        let achunk = a.wait_chunk();
        let bchunk = b.wait_chunk();
        let a_data = achunk.data.as_ref().unwrap();
        let b_data = bchunk.data.as_ref().unwrap();
        let inv_bchunk: Vec<u8> = b_data.iter().map(|&byte| !byte).collect();
        assert!(a_data != b_data);
        assert!(a_data == &inv_bchunk);
    }

    /// Run the base, offset and invert tests for one algorithm, in that order.
    fn run_all(alg: DtStreamType) {
        run_base_test(alg);
        run_offset_test(alg);
        run_invert_test(alg);
    }

    #[test]
    fn test_chacha8() {
        run_all(DtStreamType::ChaCha8);
    }

    #[test]
    fn test_chacha12() {
        run_all(DtStreamType::ChaCha12);
    }

    #[test]
    fn test_chacha20() {
        run_all(DtStreamType::ChaCha20);
    }

    #[test]
    fn test_crc() {
        run_all(DtStreamType::Crc);
    }
}
448
449