downloader_rs/
download_service.rs

1use std::collections::{VecDeque};
2use std::sync::{Arc};
3use std::thread;
4use std::thread::JoinHandle;
5use std::time::Duration;
6use reqwest::{Client, ClientBuilder};
7use parking_lot::RwLock;
8use tokio::runtime;
9use tokio::time::sleep;
10use tokio_util::sync::CancellationToken;
11use crate::download_configuration::DownloadConfiguration;
12use crate::download_operation::DownloadOperation;
13use crate::download_tracker;
14use crate::downloader::{Downloader};
15
16
17type DownloaderQueue = VecDeque<Arc<Downloader>>;
18
19pub struct DownloadService {
20    multi_thread: bool,
21    worker_thread_count: usize,
22    cancel_token: CancellationToken,
23    parallel_count: Arc<RwLock<usize>>,
24    download_queue: Arc<RwLock<DownloaderQueue>>,
25    thread_handle: Option<JoinHandle<()>>,
26    client: Arc<Client>,
27}
28
29impl DownloadService {
30    pub fn new() -> Self {
31        let client = ClientBuilder::new()
32            .use_rustls_tls()
33            .build()
34            .unwrap();
35        Self {
36            multi_thread: false,
37            worker_thread_count: 4,
38            download_queue: Arc::new(RwLock::new(DownloaderQueue::new())),
39            parallel_count: Arc::new(RwLock::new(32)),
40            thread_handle: None,
41            cancel_token: CancellationToken::new(),
42            client: Arc::new(client),
43        }
44    }
45
46    pub fn start_service(&mut self) {
47        let cancel_token = self.cancel_token.clone();
48        let queue = self.download_queue.clone();
49        let parallel_count = self.parallel_count.clone();
50        let worker_thread_count = self.worker_thread_count;
51        let multi_thread = self.multi_thread;
52        let handle = thread::spawn(move || {
53            let rt = match multi_thread {
54                true => {
55                    runtime::Builder::new_multi_thread()
56                        .worker_threads(worker_thread_count)
57                        .enable_all()
58                        .build()
59                        .expect("runtime build failed")
60                }
61                false => {
62                    runtime::Builder::new_current_thread()
63                        .enable_all()
64                        .build()
65                        .expect("runtime build failed")
66                }
67            };
68
69            rt.block_on(async {
70                let mut downloading_count = 0;
71                let mut downloadings = Vec::new();
72                while !cancel_token.is_cancelled() {
73                    while downloading_count < *parallel_count.read() && queue.read().len() > 0 {
74                        if let Some(downloader) = queue.write().pop_front() {
75                            let downloader_clone = downloader.clone();
76                            if !downloader.is_pending_async().await {
77                                continue;
78                            }
79                            let _ = &mut downloadings.push(downloader_clone);
80                            downloading_count += 1;
81                            downloader.start_download();
82                        }
83                    }
84                    for i in (0..downloadings.len()).rev() {
85                        let downloader = downloadings.get(i).unwrap();
86                        if downloader.is_done() {
87                            downloadings.remove(i);
88                            downloading_count -= 1;
89                        }
90                    }
91                    if downloadings.len() > *parallel_count.read() {
92                        let mut remove_count = downloadings.len() - *parallel_count.read();
93                        while remove_count > 0 {
94                            let index = downloadings.len() - 1;
95                            let downloader = downloadings.get(index).unwrap();
96                            downloader.stop_async().await;
97                            downloader.pending_async().await;
98                            queue.write().push_back(downloader.clone());
99                            downloadings.remove(downloadings.len() - 1);
100                            remove_count -= 1;
101                            downloading_count -= 1;
102                        }
103                    }
104                    sleep(Duration::from_millis(300)).await;
105                }
106            })
107        });
108
109        self.thread_handle = Some(handle);
110    }
111
112    pub fn set_multi_thread(mut self, multi_thread: bool) -> DownloadService {
113        self.multi_thread = multi_thread;
114        self
115    }
116
117    pub fn set_worker_thread_count(mut self, worker_thread_count: usize) -> DownloadService {
118        self.worker_thread_count = worker_thread_count;
119        self
120    }
121
122    pub fn set_parallel_count(&mut self, parallel_count: usize) {
123        *self.parallel_count.write() = parallel_count;
124    }
125
126    pub fn add_downloader(&mut self, config: DownloadConfiguration) -> DownloadOperation {
127        let (tx, rx) = download_tracker::new(config.download_in_memory);
128        let mut downloader = Downloader::new(config, self.client.clone(), Arc::new(tx));
129        downloader.pending();
130        let downloader = Arc::new(downloader);
131        self.download_queue.write().push_back(downloader.clone());
132        let operation = DownloadOperation::new(downloader.clone(), rx);
133        return operation;
134    }
135
136    pub fn is_finished(&self) -> bool {
137        if let Some(handle) = &self.thread_handle {
138            return handle.is_finished();
139        }
140        return false;
141    }
142
143    pub fn stop(&self) {
144        self.cancel_token.cancel();
145    }
146}
147
#[cfg(test)]
mod test {
    use std::thread::sleep;
    use std::time::Duration;
    use crate::download_configuration::DownloadConfiguration;
    use crate::download_service::DownloadService;

    /// Smoke test: starts the service, downloads one URL into memory and prints
    /// the outcome. It performs a real HTTP request, so it needs network access.
    #[test]
    pub fn test_download_service() {
        let mut service = DownloadService::new();
        service.start_service();
        let url = "https://lan.sausage.xd.com/servers.txt".to_string();
        let config = DownloadConfiguration::new()
            .set_url(&url)
            .set_download_in_memory(true)
            .set_retry_times_on_failure(2)
            .set_timeout(5)
            .build();
        let operation = service.add_downloader(config);

        while !operation.is_done() {
            println!("{}", operation.downloaded_size());
            // Yield between polls instead of spinning a core and flooding stdout.
            sleep(Duration::from_millis(50));
        }

        if operation.is_error() {
            println!("{}", operation.error());
        }

        let bytes = operation.bytes();
        println!("{}", bytes.len());

        service.stop();
    }
}