downloader_rs/
downloader.rs

1use std::ops::Deref;
2use std::sync::Arc;
3use std::time::Duration;
4use reqwest::Client;
5use parking_lot::RwLock;
6use tokio::{fs, spawn};
7use tokio::sync::watch::Receiver;
8use tokio::task::JoinHandle;
9use tokio::time::sleep;
10use tokio_util::sync::CancellationToken;
11use crate::download_status::{DownloadStatus};
12use crate::download_configuration::DownloadConfiguration;
13use crate::download_sender::DownloadSender;
14use crate::{chunk, chunk_hub, remote_file};
15use crate::error::DownloadError;
16use crate::verify::file_verify::FileVerify;
17use crate::verify::file_verify;
18
19pub struct Downloader {
20    config: Arc<DownloadConfiguration>,
21    client: Arc<Client>,
22    download_status: Arc<RwLock<DownloadStatus>>,
23    cancel_token: CancellationToken,
24    sender: Arc<DownloadSender>,
25    thread_handle: RwLock<Option<JoinHandle<()>>>,
26}
27
28impl Downloader {
29    pub fn new(config: DownloadConfiguration, client: Arc<Client>, sender: Arc<DownloadSender>) -> Downloader {
30        let config = Arc::new(config);
31        let downloader = Downloader {
32            config: config.clone(),
33            client,
34            download_status: Arc::new(RwLock::new(DownloadStatus::None)),
35            cancel_token: CancellationToken::new(),
36            sender,
37            thread_handle: RwLock::new(None),
38        };
39        downloader
40    }
41
42    pub fn start_download(&self) {
43        let config = self.config.clone();
44        let client = self.client.clone();
45        let cancel_token = self.cancel_token.clone();
46        let sender = self.sender.clone();
47        let download_status = self.download_status.clone();
48        let handle = spawn(async move {
49            if let Err(e) = start_download_file(config.clone(),
50                                                client.clone(),
51                                                cancel_token.clone(),
52                                                sender.clone(),
53                                                download_status.clone()).await {
54                sender.error_sender.send(e).unwrap();
55                *download_status.write() = DownloadStatus::Failed;
56                return;
57            }
58
59            if cancel_token.is_cancelled() {
60                return;
61            }
62
63            if !config.download_in_memory {
64                if config.file_verify != FileVerify::None {
65                    *download_status.write() = DownloadStatus::FileVerify;
66                    if let Err(e) = file_verify::file_validate(&config.file_verify, config.get_file_temp_path()).await {
67                        sender.error_sender.send(e).unwrap();
68                        *download_status.write() = DownloadStatus::Failed;
69                        return;
70                    }
71                }
72
73                if let Err(e) = fs::rename(config.get_file_temp_path(), config.get_file_path()).await {
74                    sender.error_sender.send(DownloadError::FileRename(format!("file rename failed {}", e))).unwrap();
75                    *download_status.write() = DownloadStatus::Failed;
76                    return;
77                }
78            }
79
80            *download_status.write() = DownloadStatus::Complete;
81        });
82        *self.thread_handle.write() = Some(handle);
83    }
84
85    pub fn is_done(&self) -> bool {
86        if let Some(handle) = self.thread_handle.read().as_ref() {
87            return handle.is_finished();
88        }
89        return false;
90    }
91
92    pub fn status(&self) -> DownloadStatus {
93        *self.download_status.read()
94    }
95
96    pub async fn is_pending_async(&self) -> bool {
97        return *self.download_status.read() == DownloadStatus::Pending;
98    }
99
100    pub fn pending(&mut self) {
101        *self.download_status.write() = DownloadStatus::Pending;
102    }
103
104    pub async fn pending_async(&self) {
105        *self.download_status.write() = DownloadStatus::Pending;
106    }
107
108    pub fn stop(&self) {
109        self.cancel_token.cancel();
110        *self.download_status.write() = DownloadStatus::Stop;
111    }
112
113    pub async fn stop_async(&self) {
114        self.cancel_token.cancel();
115        *self.download_status.write() = DownloadStatus::Stop;
116    }
117}
118
119async fn start_download_file(
120    config: Arc<DownloadConfiguration>,
121    client: Arc<Client>,
122    cancel_token: CancellationToken,
123    sender: Arc<DownloadSender>,
124    status: Arc<RwLock<DownloadStatus>>) -> crate::error::Result<()> {
125    if cancel_token.is_cancelled() {
126        return Ok(());
127    }
128
129    *status.write() = DownloadStatus::Head;
130
131    let remote_file = remote_file::head(&client, &config).await?;
132
133    if cancel_token.is_cancelled() {
134        return Ok(());
135    }
136
137    *status.write() = DownloadStatus::Download;
138
139    let _ = sender.download_total_size_sender.send(remote_file.total_length);
140
141    let (chunks, receivers) = chunk_hub::validate(&config, remote_file).await?;
142
143    let mut handles = Vec::with_capacity(chunks.len());
144    let chunk_length = chunks.len();
145    for chunk in chunks {
146        if chunk.valid {
147            continue;
148        }
149        let sender = sender.clone();
150        let handle = spawn(
151            chunk::start_download(
152                config.clone(),
153                client.clone(),
154                chunk,
155                sender,
156                cancel_token.clone())
157        );
158        handles.push(handle);
159    }
160
161    fn sync_downloaded_size(receivers: &Arc<Vec<Receiver<u64>>>, sender: &DownloadSender) {
162        let mut downloaded_size = 0u64;
163        for receiver in receivers.deref() {
164            downloaded_size += *receiver.borrow();
165        }
166        let _ = sender.downloaded_size_sender.send(downloaded_size);
167    }
168
169    let receivers = Arc::new(receivers);
170    let sync_downloaded_canceltoken = CancellationToken::new();
171    let downloaded_size_handle = {
172        sync_downloaded_size(&receivers, &sender);
173        let cancel_token = sync_downloaded_canceltoken.clone();
174        let sender = sender.clone();
175        let receivers = receivers.clone();
176        let handle = spawn(async move {
177            loop {
178                if cancel_token.is_cancelled() {
179                    break;
180                }
181                sync_downloaded_size(&receivers, &sender);
182                sleep(Duration::from_millis(100)).await;
183            }
184        });
185        handle
186    };
187
188
189    for handle in handles {
190        match handle.await {
191            Ok(result) => {
192                if let Err(e) = result {
193                    sync_downloaded_canceltoken.cancel();
194                    let _ = downloaded_size_handle.await;
195                    return Err(e);
196                }
197            }
198            Err(_) => {
199                sync_downloaded_canceltoken.cancel();
200                let _ = downloaded_size_handle.await;
201                return Err(DownloadError::ChunkDownloadHandle);
202            }
203        }
204    }
205
206    sync_downloaded_canceltoken.cancel();
207    let _ = downloaded_size_handle.await;
208
209    if cancel_token.is_cancelled() {
210        return Ok(());
211    }
212
213    sync_downloaded_size(&receivers, &sender);
214
215    *status.write() = DownloadStatus::DownloadPost;
216    chunk_hub::on_download_post(&config, chunk_length).await?;
217
218    Ok(())
219}