downloader_rs/
download_service.rs1use std::collections::{VecDeque};
2use std::sync::{Arc};
3use std::thread;
4use std::thread::JoinHandle;
5use std::time::Duration;
6use reqwest::{Client, ClientBuilder};
7use parking_lot::RwLock;
8use tokio::runtime;
9use tokio::time::sleep;
10use tokio_util::sync::CancellationToken;
11use crate::download_configuration::DownloadConfiguration;
12use crate::download_operation::DownloadOperation;
13use crate::download_tracker;
14use crate::downloader::{Downloader};
15
16
/// FIFO queue of shared downloader handles; the service pushes new entries at
/// the back and the scheduler loop pops from the front.
type DownloaderQueue = VecDeque<Arc<Downloader>>;
18
/// Owns a queue of downloaders and a background thread that starts them,
/// keeping the number of active downloads within `parallel_count`.
pub struct DownloadService {
    /// When `true`, `start_service` builds a multi-threaded tokio runtime;
    /// otherwise it builds a current-thread runtime.
    multi_thread: bool,
    /// Worker thread count passed to the multi-threaded runtime builder.
    /// Not read when `multi_thread` is `false`.
    worker_thread_count: usize,
    /// Cancelled by `stop`; the scheduler loop checks it once per iteration.
    cancel_token: CancellationToken,
    /// Upper bound on simultaneously active downloaders. Shared with the
    /// scheduler thread and updated through `set_parallel_count`.
    parallel_count: Arc<RwLock<usize>>,
    /// Downloaders waiting to be started, shared with the scheduler thread.
    download_queue: Arc<RwLock<DownloaderQueue>>,
    /// Join handle of the scheduler thread; `None` until `start_service` runs.
    thread_handle: Option<JoinHandle<()>>,
    /// HTTP client handed to every `Downloader` created by `add_downloader`.
    client: Arc<Client>,
}
28
29impl DownloadService {
30 pub fn new() -> Self {
31 let client = ClientBuilder::new()
32 .use_rustls_tls()
33 .build()
34 .unwrap();
35 Self {
36 multi_thread: false,
37 worker_thread_count: 4,
38 download_queue: Arc::new(RwLock::new(DownloaderQueue::new())),
39 parallel_count: Arc::new(RwLock::new(32)),
40 thread_handle: None,
41 cancel_token: CancellationToken::new(),
42 client: Arc::new(client),
43 }
44 }
45
46 pub fn start_service(&mut self) {
47 let cancel_token = self.cancel_token.clone();
48 let queue = self.download_queue.clone();
49 let parallel_count = self.parallel_count.clone();
50 let worker_thread_count = self.worker_thread_count;
51 let multi_thread = self.multi_thread;
52 let handle = thread::spawn(move || {
53 let rt = match multi_thread {
54 true => {
55 runtime::Builder::new_multi_thread()
56 .worker_threads(worker_thread_count)
57 .enable_all()
58 .build()
59 .expect("runtime build failed")
60 }
61 false => {
62 runtime::Builder::new_current_thread()
63 .enable_all()
64 .build()
65 .expect("runtime build failed")
66 }
67 };
68
69 rt.block_on(async {
70 let mut downloading_count = 0;
71 let mut downloadings = Vec::new();
72 while !cancel_token.is_cancelled() {
73 while downloading_count < *parallel_count.read() && queue.read().len() > 0 {
74 if let Some(downloader) = queue.write().pop_front() {
75 let downloader_clone = downloader.clone();
76 if !downloader.is_pending_async().await {
77 continue;
78 }
79 let _ = &mut downloadings.push(downloader_clone);
80 downloading_count += 1;
81 downloader.start_download();
82 }
83 }
84 for i in (0..downloadings.len()).rev() {
85 let downloader = downloadings.get(i).unwrap();
86 if downloader.is_done() {
87 downloadings.remove(i);
88 downloading_count -= 1;
89 }
90 }
91 if downloadings.len() > *parallel_count.read() {
92 let mut remove_count = downloadings.len() - *parallel_count.read();
93 while remove_count > 0 {
94 let index = downloadings.len() - 1;
95 let downloader = downloadings.get(index).unwrap();
96 downloader.stop_async().await;
97 downloader.pending_async().await;
98 queue.write().push_back(downloader.clone());
99 downloadings.remove(downloadings.len() - 1);
100 remove_count -= 1;
101 downloading_count -= 1;
102 }
103 }
104 sleep(Duration::from_millis(300)).await;
105 }
106 })
107 });
108
109 self.thread_handle = Some(handle);
110 }
111
112 pub fn set_multi_thread(mut self, multi_thread: bool) -> DownloadService {
113 self.multi_thread = multi_thread;
114 self
115 }
116
117 pub fn set_worker_thread_count(mut self, worker_thread_count: usize) -> DownloadService {
118 self.worker_thread_count = worker_thread_count;
119 self
120 }
121
122 pub fn set_parallel_count(&mut self, parallel_count: usize) {
123 *self.parallel_count.write() = parallel_count;
124 }
125
126 pub fn add_downloader(&mut self, config: DownloadConfiguration) -> DownloadOperation {
127 let (tx, rx) = download_tracker::new(config.download_in_memory);
128 let mut downloader = Downloader::new(config, self.client.clone(), Arc::new(tx));
129 downloader.pending();
130 let downloader = Arc::new(downloader);
131 self.download_queue.write().push_back(downloader.clone());
132 let operation = DownloadOperation::new(downloader.clone(), rx);
133 return operation;
134 }
135
136 pub fn is_finished(&self) -> bool {
137 if let Some(handle) = &self.thread_handle {
138 return handle.is_finished();
139 }
140 return false;
141 }
142
143 pub fn stop(&self) {
144 self.cancel_token.cancel();
145 }
146}
147
#[cfg(test)]
mod test {
    use std::thread::sleep;
    use std::time::Duration;
    use crate::download_configuration::DownloadConfiguration;
    use crate::download_service::DownloadService;

    /// Smoke test: queues one in-memory download, polls until it reports
    /// done, prints the outcome and stops the service.
    ///
    /// Requires network access to the URL below. A failed download is only
    /// printed, not asserted on.
    #[test]
    pub fn test_download_service() {
        let mut service = DownloadService::new();
        service.start_service();
        let url = "https://lan.sausage.xd.com/servers.txt".to_string();
        let config = DownloadConfiguration::new()
            .set_url(&url)
            .set_download_in_memory(true)
            .set_retry_times_on_failure(2)
            .set_timeout(5)
            .build();
        let operation = service.add_downloader(config);

        while !operation.is_done() {
            println!("{}", operation.downloaded_size());
            // Throttle polling so the test neither spins a core nor floods stdout.
            sleep(Duration::from_millis(50));
        }

        if operation.is_error() {
            println!("{}", operation.error());
        }

        let bytes = operation.bytes();
        println!("{}", bytes.len());

        service.stop();
    }
}
185}