1use crate::bufcache::{BufCache, BufCacheCons};
13use crate::generator::{
14 GeneratorChaCha12, GeneratorChaCha20, GeneratorChaCha8, GeneratorCrc, NextRandom,
15};
16use crate::kdf::kdf;
17use anyhow as ah;
18use std::cell::RefCell;
19use std::rc::Rc;
20use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
21use std::sync::mpsc::{channel, Receiver, Sender};
22use std::sync::{Arc, Condvar, Mutex};
23use std::thread;
24
25#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
27pub enum DtStreamType {
28 ChaCha8,
30 ChaCha12,
32 #[default]
34 ChaCha20,
35 Crc,
37}
38
39pub struct DtStreamChunk {
41 pub data: Option<Vec<u8>>,
42 #[cfg(test)]
43 pub index: u8,
44}
45
46fn try_lower_thread_priority() {
48 #[cfg(any(target_os = "linux", target_os = "android"))]
50 unsafe {
51 libc::nice(7);
52 }
53}
54
55#[allow(clippy::too_many_arguments)]
57fn thread_worker(
58 stype: DtStreamType,
59 chunk_factor: usize,
60 seed: Vec<u8>,
61 thread_id: u32,
62 round_id: u64,
63 mut cache_cons: BufCacheCons,
64 byte_offset: u64,
65 invert_pattern: bool,
66 abort: Arc<AtomicBool>,
67 error: Arc<AtomicBool>,
68 level: Arc<AtomicIsize>,
69 sleep: Arc<(Mutex<bool>, Condvar)>,
70 tx: Sender<DtStreamChunk>,
71) {
72 let thread_seed = kdf(&seed, thread_id, round_id);
74 drop(seed);
75
76 let mut generator: Box<dyn NextRandom> = match stype {
78 DtStreamType::ChaCha8 => Box::new(GeneratorChaCha8::new(&thread_seed)),
79 DtStreamType::ChaCha12 => Box::new(GeneratorChaCha12::new(&thread_seed)),
80 DtStreamType::ChaCha20 => Box::new(GeneratorChaCha20::new(&thread_seed)),
81 DtStreamType::Crc => Box::new(GeneratorCrc::new(&thread_seed)),
82 };
83
84 if let Err(e) = generator.seek(byte_offset) {
86 eprintln!("ERROR in generator thread {}: {}", thread_id, e);
87 error.store(true, Ordering::Relaxed);
88 return;
89 }
90
91 let chunk_size = generator.get_base_size() * chunk_factor;
92
93 try_lower_thread_priority();
96
97 #[cfg(test)]
99 let mut index = 0;
100 let mut cur_level = level.load(Ordering::Relaxed);
101 while !abort.load(Ordering::Acquire) {
102 if cur_level < DtStream::MAX_THRES {
103 let mut data = cache_cons.pull(chunk_size);
105 generator.next(&mut data, chunk_factor);
106 debug_assert_eq!(data.len(), chunk_size);
107
108 if invert_pattern {
110 for x in &mut data {
111 *x ^= 0xFFu8;
112 }
113 }
114
115 let chunk = DtStreamChunk {
116 data: Some(data),
117 #[cfg(test)]
118 index,
119 };
120 #[cfg(test)]
121 {
122 index = index.wrapping_add(1);
123 }
124
125 tx.send(chunk).expect("Worker thread: Send failed.");
127 cur_level = level.fetch_add(1, Ordering::Relaxed) + 1;
128 } else {
129 let mut sleeping = sleep.0.lock().expect("Thread Condvar lock poison");
131 *sleeping = true;
132 while *sleeping {
133 sleeping = sleep.1.wait(sleeping).expect("Thread Condvar wait poison");
134 }
135 cur_level = level.load(Ordering::Relaxed);
136 }
137 }
138}
139
140pub struct DtStream {
142 stype: DtStreamType,
143 seed: Vec<u8>,
144 invert_pattern: bool,
145 thread_id: u32,
146 round_id: u64,
147 rx: Option<Receiver<DtStreamChunk>>,
148 cache: Rc<RefCell<BufCache>>,
149 is_active: bool,
150 thread_join: Option<thread::JoinHandle<()>>,
151 abort: Arc<AtomicBool>,
152 error: Arc<AtomicBool>,
153 level: Arc<AtomicIsize>,
154 sleep: Arc<(Mutex<bool>, Condvar)>,
155}
156
157impl DtStream {
158 const MAX_THRES: isize = 10;
160 const LO_THRES: isize = 6;
162
163 pub fn new(
164 stype: DtStreamType,
165 seed: Vec<u8>,
166 invert_pattern: bool,
167 thread_id: u32,
168 round_id: u64,
169 cache: Rc<RefCell<BufCache>>,
170 ) -> DtStream {
171 let abort = Arc::new(AtomicBool::new(false));
172 let error = Arc::new(AtomicBool::new(false));
173 let level = Arc::new(AtomicIsize::new(0));
174 let sleep = Arc::new((Mutex::new(false), Condvar::new()));
175 DtStream {
176 stype,
177 seed,
178 invert_pattern,
179 thread_id,
180 round_id,
181 rx: None,
182 cache,
183 is_active: false,
184 thread_join: None,
185 abort,
186 error,
187 level,
188 sleep,
189 }
190 }
191
192 fn wake_thread(&self) {
194 let mut sleeping = self.sleep.0.lock().expect("Wake Condvar lock poison");
195 if *sleeping {
196 *sleeping = false;
197 self.sleep.1.notify_one();
198 }
199 }
200
201 fn stop(&mut self) {
204 self.is_active = false;
205 self.abort.store(true, Ordering::Release);
206 self.wake_thread();
207 if let Some(thread_join) = self.thread_join.take() {
208 thread_join.join().expect("Thread join failed");
209 }
210 self.abort.store(false, Ordering::Release);
211 }
212
213 fn start(&mut self, byte_offset: u64, chunk_factor: usize) {
216 assert!(!self.is_active);
217 assert!(self.thread_join.is_none());
218
219 self.abort.store(false, Ordering::Release);
221 self.error.store(false, Ordering::Release);
222 self.level.store(0, Ordering::Release);
223 let (tx, rx) = channel();
224 self.rx = Some(rx);
225
226 let thread_stype = self.stype;
228 let thread_chunk_factor = chunk_factor;
229 let thread_seed = self.seed.to_vec();
230 let thread_id = self.thread_id;
231 let thread_round_id = self.round_id;
232 let thread_cache_cons = self.cache.borrow_mut().new_consumer(self.thread_id);
233 let thread_byte_offset = byte_offset;
234 let thread_invert_pattern = self.invert_pattern;
235 let thread_abort = Arc::clone(&self.abort);
236 let thread_error = Arc::clone(&self.error);
237 let thread_level = Arc::clone(&self.level);
238 let thread_sleep = Arc::clone(&self.sleep);
239 self.thread_join = Some(thread::spawn(move || {
240 thread_worker(
241 thread_stype,
242 thread_chunk_factor,
243 thread_seed,
244 thread_id,
245 thread_round_id,
246 thread_cache_cons,
247 thread_byte_offset,
248 thread_invert_pattern,
249 thread_abort,
250 thread_error,
251 thread_level,
252 thread_sleep,
253 tx,
254 );
255 }));
256 self.is_active = true;
257 }
258
259 #[inline]
261 fn is_thread_error(&self) -> bool {
262 self.error.load(Ordering::Relaxed)
263 }
264
265 pub fn activate(&mut self, byte_offset: u64, chunk_factor: usize) -> ah::Result<()> {
267 self.stop();
268 self.start(byte_offset, chunk_factor);
269
270 Ok(())
271 }
272
273 #[inline]
275 pub fn is_active(&self) -> bool {
276 self.is_active
277 }
278
279 pub fn get_chunk_size(&self) -> usize {
281 match self.stype {
282 DtStreamType::ChaCha8 => GeneratorChaCha8::BASE_SIZE,
283 DtStreamType::ChaCha12 => GeneratorChaCha12::BASE_SIZE,
284 DtStreamType::ChaCha20 => GeneratorChaCha20::BASE_SIZE,
285 DtStreamType::Crc => GeneratorCrc::BASE_SIZE,
286 }
287 }
288
289 pub fn get_default_chunk_factor(&self) -> usize {
291 match self.stype {
292 DtStreamType::ChaCha8 => GeneratorChaCha8::DEFAULT_CHUNK_FACTOR,
293 DtStreamType::ChaCha12 => GeneratorChaCha12::DEFAULT_CHUNK_FACTOR,
294 DtStreamType::ChaCha20 => GeneratorChaCha20::DEFAULT_CHUNK_FACTOR,
295 DtStreamType::Crc => GeneratorCrc::DEFAULT_CHUNK_FACTOR,
296 }
297 }
298
299 #[inline]
302 pub fn get_chunk(&mut self) -> ah::Result<Option<DtStreamChunk>> {
303 if !self.is_active() {
304 return Err(ah::format_err!("Generator stream is not active."));
305 }
306 if self.is_thread_error() {
307 return Err(ah::format_err!(
308 "Generator stream thread aborted with an error."
309 ));
310 }
311 let Some(rx) = &self.rx else {
312 return Err(ah::format_err!("Generator stream RX channel not present."));
313 };
314 let Ok(chunk) = rx.try_recv() else {
315 self.wake_thread();
317 return Ok(None);
318 };
319 if self.level.fetch_sub(1, Ordering::Relaxed) - 1 <= DtStream::LO_THRES {
320 self.wake_thread();
322 }
323 Ok(Some(chunk))
325 }
326}
327
328impl Drop for DtStream {
329 fn drop(&mut self) {
330 self.stop();
331 }
332}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337 use crate::disktest::DisktestQuiet;
338 use std::time::Duration;
339
340 impl DtStream {
341 pub fn wait_chunk(&mut self) -> DtStreamChunk {
342 loop {
343 if let Some(chunk) = self.get_chunk().unwrap() {
344 break chunk;
345 }
346 thread::sleep(Duration::from_millis(1));
347 }
348 }
349 }
350
351 fn run_base_test(algorithm: DtStreamType) {
352 println!("stream base test");
353 let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
354 let mut s = DtStream::new(algorithm, vec![1, 2, 3], false, 0, 0, cache);
355 s.activate(0, s.get_default_chunk_factor()).unwrap();
356 assert!(s.is_active());
357
358 assert!(s.get_chunk_size() > 0);
359 assert!(s.get_default_chunk_factor() > 0);
360
361 let mut results_first = vec![];
362 for count in 0..5 {
363 let chunk = s.wait_chunk();
364 println!(
365 "{}: index={} data[0]={} (current level = {})",
366 count,
367 chunk.index,
368 chunk.data.as_ref().unwrap()[0],
369 s.level.load(Ordering::Relaxed)
370 );
371 results_first.push(chunk.data.as_ref().unwrap()[0]);
372 assert_eq!(chunk.index, count);
373 }
374 match algorithm {
375 DtStreamType::ChaCha8 => {
376 assert_eq!(results_first, vec![66, 209, 254, 224, 203]);
377 }
378 DtStreamType::ChaCha12 => {
379 assert_eq!(results_first, vec![200, 202, 12, 60, 234]);
380 }
381 DtStreamType::ChaCha20 => {
382 assert_eq!(results_first, vec![206, 236, 87, 55, 170]);
383 }
384 DtStreamType::Crc => {
385 assert_eq!(results_first, vec![108, 99, 114, 196, 213]);
386 }
387 }
388 }
389
390 fn run_offset_test(algorithm: DtStreamType) {
391 println!("stream offset test");
392 let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
394 let mut a = DtStream::new(algorithm, vec![1, 2, 3], false, 0, 0, cache);
395 a.activate(0, a.get_default_chunk_factor()).unwrap();
396
397 let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
399 let mut b = DtStream::new(algorithm, vec![1, 2, 3], false, 0, 0, cache);
400 b.activate(
401 a.get_chunk_size() as u64 * a.get_default_chunk_factor() as u64,
402 a.get_default_chunk_factor(),
403 )
404 .unwrap();
405
406 let achunk = a.wait_chunk();
407 let bchunk = b.wait_chunk();
408 assert!(achunk.data.as_ref().unwrap() != bchunk.data.as_ref().unwrap());
409 let achunk = a.wait_chunk();
410 assert!(achunk.data.as_ref().unwrap() == bchunk.data.as_ref().unwrap());
411 }
412
413 fn run_invert_test(algorithm: DtStreamType) {
414 println!("stream invert test");
415 let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
416 let mut a = DtStream::new(algorithm, vec![1, 2, 3], false, 0, 0, cache);
417 a.activate(0, a.get_default_chunk_factor()).unwrap();
418 let cache = Rc::new(RefCell::new(BufCache::new(DisktestQuiet::Normal)));
419 let mut b = DtStream::new(algorithm, vec![1, 2, 3], true, 0, 0, cache);
420 b.activate(0, a.get_default_chunk_factor()).unwrap();
421
422 let achunk = a.wait_chunk();
423 let bchunk = b.wait_chunk();
424 let inv_bchunk: Vec<u8> = bchunk
425 .data
426 .as_ref()
427 .unwrap()
428 .iter()
429 .map(|x| x ^ 0xFF)
430 .collect();
431 assert!(achunk.data.as_ref().unwrap() != bchunk.data.as_ref().unwrap());
432 assert!(achunk.data.as_ref().unwrap() == &inv_bchunk);
433 }
434
435 #[test]
436 fn test_chacha8() {
437 let alg = DtStreamType::ChaCha8;
438 run_base_test(alg);
439 run_offset_test(alg);
440 run_invert_test(alg);
441 }
442
443 #[test]
444 fn test_chacha12() {
445 let alg = DtStreamType::ChaCha12;
446 run_base_test(alg);
447 run_offset_test(alg);
448 run_invert_test(alg);
449 }
450
451 #[test]
452 fn test_chacha20() {
453 let alg = DtStreamType::ChaCha20;
454 run_base_test(alg);
455 run_offset_test(alg);
456 run_invert_test(alg);
457 }
458
459 #[test]
460 fn test_crc() {
461 let alg = DtStreamType::Crc;
462 run_base_test(alg);
463 run_offset_test(alg);
464 run_invert_test(alg);
465 }
466}
467
468