use std::default::Default;
use std::fmt::Debug;
use std::sync::{mpsc, Arc, Mutex, MutexGuard};
use std::{thread, time};
use crate::blocking::aws::{AWS2Client, AWS4Client};
use crate::blocking::{AuthType, S3Client};
use crate::error::Error;
use log::{debug, error, info};
/// A byte range of the remote object handled by one download job.
///
/// The range is half-open: the first field is the offset of the first byte
/// and the second field is one past the last byte. The default value
/// `(0, 0)` is reserved as the "shut down" message for worker threads.
#[derive(Clone, Debug, Default)]
pub struct MultiDownloadParameters(
    /// Offset of the first byte of the range (inclusive).
    pub usize,
    /// Offset one past the last byte of the range (exclusive).
    pub usize,
);
/// A pool of worker threads that download byte ranges of one object and
/// assemble them into a single in-memory buffer.
pub struct DownloadRequestPool {
/// Sending half of the job queue; `run` pushes ranges through it. It is an
/// `Option` so that `wait` can take it out and drop it.
ch_data: Option<mpsc::Sender<Box<MultiDownloadParameters>>>,
/// Receiving half of the result queue: each message is either a downloaded
/// range with its bytes, or the error reported for a job.
ch_result: mpsc::Receiver<Result<(MultiDownloadParameters, Vec<u8>), Error>>,
/// Number of worker threads spawned by `new`; `close` sends this many
/// shutdown messages.
total_worker: usize,
/// Number of jobs submitted through `run`; `wait` stops after receiving this
/// many results.
total_jobs: usize,
/// Destination buffer, created zero-filled with the total size given to
/// `new`; `wait` copies every downloaded range into it.
data: Vec<u8>,
}
/// Locks `s` and returns its guard, even when the mutex has been poisoned.
///
/// Calling `lock()` again does not clear a poisoned state, so sleeping and
/// retrying (as this helper used to do) would spin forever once any thread
/// panicked while holding the lock. Instead the guard is recovered from the
/// poison error. That is sound for the values this file protects (channel
/// endpoints), which carry no invariant that a panic elsewhere could break.
///
/// The `Debug` bound is not needed by the body; it is retained only so the
/// signature stays unchanged for existing callers.
fn acquire<T>(s: &Arc<Mutex<T>>) -> MutexGuard<'_, T>
where
    T: Debug,
{
    s.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
}
#[allow(clippy::too_many_arguments)]
impl DownloadRequestPool {
pub fn new(
auth_type: AuthType,
secure: bool,
access_key: String,
secret_key: String,
host: String,
uri: String,
region: String,
totoal_size: usize,
total_worker: usize,
) -> Self {
let (ch_s, ch_r) = mpsc::channel();
let a_ch_r = Arc::new(Mutex::new(ch_r));
let (ch_result_s, ch_result_r) = mpsc::channel();
let a_ch_result_s = Arc::new(Mutex::new(ch_result_s));
let data = vec![0; totoal_size];
for _ in 0..total_worker {
let a_ch_r2 = a_ch_r.clone();
let a_ch_result_s2 = a_ch_result_s.clone();
let akey = access_key.clone();
let skey = secret_key.clone();
let h = host.clone();
let u = uri.clone();
let r = region.clone();
std::thread::spawn(move || loop {
let s3_client: Box<dyn S3Client> = match auth_type {
AuthType::AWS2 => Box::new(AWS2Client {
tls: secure,
access_key: &akey,
secret_key: &skey,
}),
AuthType::AWS4 => Box::new(AWS4Client {
tls: secure,
access_key: &akey,
secret_key: &skey,
host: &h,
region: r.to_string(),
}),
};
let recv_end = a_ch_r2.lock().expect("worker recv end is expected");
let result_send_back_ch = acquire(&a_ch_result_s2);
loop {
let p: Box<MultiDownloadParameters> = match recv_end.recv() {
Ok(p) => p,
Err(e) => {
let r = acquire(&a_ch_result_s2);
r.send(Err(Error::RequestPoolError(format!("{:?}", e))))
.ok();
drop(r);
return;
}
};
if p.0 == 0 && p.1 == 0 {
drop(recv_end);
drop(result_send_back_ch);
return;
}
info!("Range ({}, {}) downloading...", p.0, p.1);
match s3_client.request(
"GET",
&h,
&u,
&mut Vec::new(),
&mut vec![("range", &format!("bytes={}-{}", p.0, p.1 - 1))],
&Vec::new(),
) {
Ok(result) => {
if result.1.len() == p.1 - p.0 {
let mut send_result =
result_send_back_ch.send(Ok(((*p).clone(), result.1.clone())));
while send_result.is_err() {
info!("send back result error: {:?}", send_result);
thread::sleep(time::Duration::from_millis(1000));
send_result = result_send_back_ch
.send(Ok(((*p).clone(), result.1.clone())));
}
} else {
error!(
"Range ({}, {}) download size not correct {}",
p.0,
p.1,
result.1.len()
);
}
info!("Range ({}, {}) download executed", p.0, p.1);
}
Err(err) => {
info!("Error on downloading Range ({}, {}): {}", p.0, p.1, err);
let rs = acquire(&a_ch_result_s2);
rs.send(Err(err))
.expect("channel is full to handle messages");
drop(rs);
}
};
}
});
}
DownloadRequestPool {
ch_data: Some(ch_s),
total_worker,
ch_result: ch_result_r,
total_jobs: 0,
data,
}
}
pub fn run(&mut self, p: MultiDownloadParameters) {
if let Some(ref ch_s) = self.ch_data {
info!("sending range ({}, {}) request to worker", p.0, p.1);
ch_s.send(Box::new(p))
.expect("channel is full to handle messages");
self.total_jobs += 1;
}
}
pub fn close(&self) {
let mut close_sent = 0;
while let Some(ref ch_s) = self.ch_data {
ch_s.send(Box::new(MultiDownloadParameters {
..Default::default()
}))
.expect("channel is full to handle messages");
close_sent += 1;
if close_sent == self.total_worker {
thread::sleep(time::Duration::from_millis(1000));
info!("request pool closed");
return;
}
}
}
pub fn wait(mut self) -> Result<Vec<u8>, Error> {
let mut results = 0;
self.ch_data.take();
loop {
thread::sleep(time::Duration::from_millis(1000));
let result = self
.ch_result
.recv()
.expect("channel is full to handle messages");
match result {
Ok((para, data)) => {
self.data[para.0..para.1].copy_from_slice(&data);
debug!("{:?}", para);
}
Err(e) => {
error!("{}", e);
}
}
results += 1;
info!("{} job excuted ", results);
if results == self.total_jobs {
self.close();
return Ok(self.data);
}
}
}
}