use futures::task::{Spawn, SpawnExt};
use futures_intrusive::channel::shared::*;
use num_cpus;
use crate::repository::{Chunk, ChunkID, Compression, Encryption, Key, HMAC};
#[derive(Debug)]
struct Message {
compression: Compression,
encryption: Encryption,
hmac: HMAC,
key: Key,
ret_chunk: OneshotSender<Vec<Chunk>>,
ret_id: Option<OneshotSender<Vec<ChunkID>>>,
}
#[derive(Clone)]
pub struct Pipeline {
input: Sender<(Vec<Vec<u8>>, Message)>,
input_id: Sender<(Vec<ChunkID>, Vec<Vec<u8>>, Message)>,
}
impl Pipeline {
#[allow(clippy::too_many_lines)]
pub fn new(pool: impl Spawn) -> Pipeline {
let base_threads = match num_cpus::get() / 2 {
0 => 1,
x => x,
};
let heavy_count = base_threads;
let light_count = base_threads;
let (input, id_rx) = channel(50);
let (id_tx, compress_rx) = channel(50);
let input_id = id_tx.clone();
let (compress_tx, enc_rx) = channel(50);
let (enc_tx, mac_rx) = channel(50);
for _ in 0..light_count {
let id_rx = id_rx.clone();
let id_tx = id_tx.clone();
let compress_tx = compress_tx.clone();
let enc_tx = enc_tx.clone();
pool.spawn(async move {
while let Some(input) = id_rx.receive().await {
let (data, mut message): (Vec<Vec<u8>>, Message) = input;
let mut cids = Vec::new();
for chunk in &data {
let id = message.hmac.id(&chunk[..], &message.key);
cids.push(ChunkID::new(&id[..]));
}
message.ret_id.take().unwrap().send(cids.clone()).unwrap();
let compression = message.compression;
let encryption = message.encryption;
let next = (cids, data, message);
if compression == Compression::NoCompression {
if encryption == Encryption::NoEncryption {
enc_tx.send(next).await.unwrap();
} else {
compress_tx.send(next).await.unwrap();
}
} else {
id_tx.send(next).await.unwrap();
}
}
})
.expect("Spawing a ChunkID task failed!");
}
for _ in 0..heavy_count {
let compress_rx = compress_rx.clone();
let compress_tx = compress_tx.clone();
let enc_tx = enc_tx.clone();
pool.spawn(async move {
while let Some(input) = compress_rx.receive().await {
let (cids, data, message) = input;
let mut cdatas = Vec::new();
for chunk in data {
let cdata = message.compression.compress(chunk);
cdatas.push(cdata);
}
let encryption = message.encryption;
let next = (cids, cdatas, message);
if encryption == Encryption::NoEncryption {
enc_tx.send(next).await.unwrap();
} else {
compress_tx.send(next).await.unwrap();
}
}
})
.expect("Spawing a compression task failed!");
}
for _ in 0..heavy_count {
let enc_rx = enc_rx.clone();
let enc_tx = enc_tx.clone();
pool.spawn(async move {
while let Some(input) = enc_rx.receive().await {
let (cids, data, message) = input;
let mut edatas = Vec::new();
for chunk in data {
let edata = message.encryption.encrypt(&chunk[..], &message.key);
edatas.push(edata);
}
enc_tx.send((cids, edatas, message)).await.unwrap();
}
})
.expect("Spawining an encryption task failed!");
}
for _ in 0..light_count {
let mac_rx = mac_rx.clone();
pool.spawn(async move {
while let Some(input) = mac_rx.receive().await {
let (cids, data, message) = input;
let mut chunks = Vec::new();
for (index, chunk) in data.into_iter().enumerate() {
let mac = message.hmac.mac(&chunk, &message.key);
let chunk = Chunk::from_parts(
chunk,
message.compression,
message.encryption,
message.hmac,
mac,
cids[index],
);
chunks.push(chunk)
}
message.ret_chunk.send(chunks).unwrap();
}
})
.expect("Spawning a MAC task failed!");
}
Pipeline { input, input_id }
}
pub async fn process(
&self,
data: Vec<u8>,
compression: Compression,
encryption: Encryption,
hmac: HMAC,
key: Key,
) -> (ChunkID, Chunk) {
let (c_tx, c_rx) = oneshot_channel();
let (id_tx, id_rx) = oneshot_channel();
let message = Message {
compression,
encryption,
hmac,
key,
ret_chunk: c_tx,
ret_id: Some(id_tx),
};
let input = self.input.clone();
input.send((vec![data], message)).await.unwrap();
(
id_rx.receive().await.unwrap()[0],
c_rx.receive().await.unwrap().into_iter().next().unwrap(),
)
}
pub async fn process_with_id(
&self,
data: Vec<u8>,
id: ChunkID,
compression: Compression,
encryption: Encryption,
hmac: HMAC,
key: Key,
) -> Chunk {
let (c_tx, c_rx) = oneshot_channel();
let message = Message {
compression,
encryption,
hmac,
key,
ret_chunk: c_tx,
ret_id: None,
};
let input = self.input_id.clone();
input.send((vec![id], vec![data], message)).await.unwrap();
c_rx.receive().await.unwrap().into_iter().next().unwrap()
}
pub async fn process_multiple(
&self,
data: Vec<Vec<u8>>,
compression: Compression,
encryption: Encryption,
hmac: HMAC,
key: Key,
) -> Vec<Chunk> {
let (c_tx, c_rx) = oneshot_channel();
let (id_tx, id_rx) = oneshot_channel();
let message = Message {
compression,
encryption,
hmac,
key,
ret_chunk: c_tx,
ret_id: Some(id_tx),
};
let input = self.input.clone();
input.send((data, message)).await.unwrap();
id_rx.receive().await;
c_rx.receive().await.unwrap()
}
}