Skip to main content

dec_cryptor/
parallel_handler.rs

1use aes::Aes256;
2use ctr::cipher::{KeyIvInit, StreamCipher, StreamCipherSeek};
3use ctr::Ctr128BE;
4use rayon::prelude::*;
5
6// AES-256-CTR 类型别名
7pub type Aes256Ctr = Ctr128BE<Aes256>;
8
9// 对于非常小的数据,并行处理的开销可能超过收益,直接单线程处理。
10// 这里的 16KB 是一个经验值,可以根据实际情况调整。
11const PARALLEL_THRESHOLD: usize = 16 * 1024;
12
13/// 通过将缓冲区拆分为多个片段,并利用 Rayon 的并行迭代器将 AES-CTR 密钥流应用到数据上。
14///
15/// 会就地修改 `data`。此方式与单流处理产生完全相同的结果,但利用了多核 CPU 加速。
16///
17/// - key: 32 字节
18/// - iv: 16 字节
19/// - data: 需要原地转换的字节切片
20/// - stream_offset: 在整体流中的绝对字节偏移(用于文件流式处理)
21pub fn ctr_apply_in_parts(
22    key: &[u8],
23    iv: &[u8],
24    data: &mut [u8],
25    stream_offset: usize
26) -> Result<(), String> {
27    let total_len = data.len();
28    if total_len == 0 {
29        return Ok(());
30    }
31
32    let num_parts = crate::crypto_utils::get_parts();
33
34    if num_parts <= 1 || total_len < PARALLEL_THRESHOLD {
35        // 回退到单线程处理
36        let mut cipher = Aes256Ctr::new(key.into(), iv.into());
37        cipher.seek(stream_offset as u128);
38        cipher.apply_keystream(data);
39        return Ok(());
40    }
41
42    // 计算每个数据块的大小,使用向上取整,确保覆盖所有数据
43    let chunk_size = (total_len + num_parts - 1) / num_parts;
44
45    // 将数据分成多个可变切片
46    let mut chunks: Vec<&mut [u8]> = data.chunks_mut(chunk_size).collect();
47
48    // 使用 Rayon 的 `par_iter` 进行并行处理
49    chunks.par_iter_mut()
50        .enumerate() // 获取块的索引,用于计算偏移量
51        .for_each(|(chunk_index, chunk)| {
52            // 为每个并行任务(线程)创建一个新的 cipher 实例。
53            // 这是必须的,因为 cipher 实例内部有状态,不能在线程间共享。
54            let mut cipher = Aes256Ctr::new(key.into(), iv.into());
55
56            // 计算当前块的偏移量
57            let offset = stream_offset + chunk_index * chunk_size;
58
59            // 将 cipher 定位到当前块的正确密钥流位置
60            cipher.seek(offset as u128);
61
62            // 对当前数据块应用密钥流
63            cipher.apply_keystream(chunk);
64        });
65
66    Ok(())
67}