1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::fmt::Debug;
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use reqwest::Response;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::select;
use tokio::sync;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

use crate::{ChunkManager, DownloadError, DownloadedLenChangeNotify, DownloadingEndCause};

/// State for downloading a response body as one sequential stream.
///
/// See [`SingleDownload::download`] for how each field is used.
#[derive(Debug)]
pub struct SingleDownload {
    /// When this token is cancelled, `download` stops and returns
    /// `DownloadingEndCause::Cancelled`.
    cancel_token: CancellationToken,
    /// Watch-channel sender holding a running byte count; `download` adds the
    /// length of every received chunk to it.
    downloaded_len_sender: Arc<sync::watch::Sender<u64>>,
    /// Value supplied to `new`; not read anywhere in this file.
    /// NOTE(review): presumably the total body size reported by the server,
    /// `None` when unknown — confirm against the caller.
    pub content_length: Option<u64>,
}

impl SingleDownload {
    /// Creates a `SingleDownload` from its cancellation token, its
    /// downloaded-length sender, and the optional content length.
    pub fn new(
        cancel_token: CancellationToken,
        downloaded_len_sender: Arc<sync::watch::Sender<u64>>,
        content_length: Option<u64>,
    ) -> Self {
        Self {
            cancel_token,
            downloaded_len_sender,
            content_length,
        }
    }

    /// Locks `file`, writes all of `buffer` to it, then flushes and calls
    /// `sync_all` before releasing the lock.
    async fn write_and_sync(file: &Mutex<File>, buffer: &[u8]) -> std::io::Result<()> {
        let mut file = file.lock().await;
        file.write_all(buffer).await?;
        file.flush().await?;
        file.sync_all().await
    }

    /// Streams the body of `response` into `file`, buffering in memory.
    ///
    /// Received chunks are accumulated in a buffer created with a capacity of
    /// `buffer_size` bytes. Whenever appending the next chunk would exceed the
    /// buffer's current capacity, the buffered bytes are written to `file`
    /// first. A single chunk larger than the capacity is still appended, which
    /// grows the buffer.
    ///
    /// For every chunk, its length is added to the downloaded-length sender
    /// and, if `downloaded_len_receiver` is present, passed to its
    /// `receive_len`; when that returns a future, it is awaited before the
    /// next chunk is read.
    ///
    /// `retry_count` is accepted but not read by this implementation.
    ///
    /// # Returns
    ///
    /// * `DownloadingEndCause::DownloadFinished` once the stream has ended and
    ///   the remaining buffered bytes have been written and synced.
    /// * `DownloadingEndCause::Cancelled` if the cancellation token fires
    ///   first; bytes still held in the in-memory buffer are not written.
    ///
    /// # Errors
    ///
    /// Returns `DownloadError::HttpRequestFailed` if reading the body stream
    /// fails, and the `DownloadError` converted from the I/O error if writing,
    /// flushing, or syncing the file fails.
    pub async fn download(
        &self,
        file: Arc<Mutex<File>>,
        response: Box<Response>,
        retry_count: u8,
        downloaded_len_receiver: Option<Arc<dyn DownloadedLenChangeNotify>>,
        buffer_size: usize,
    ) -> Result<DownloadingEndCause, DownloadError> {
        use futures_util::StreamExt;
        let mut chunk_bytes: Vec<u8> = Vec::with_capacity(buffer_size);
        let future = async {
            let mut stream = response.bytes_stream();
            while let Some(bytes) = stream.next().await {
                // Resuming from a breakpoint is not possible here, so a failed
                // read cannot be retried and is reported immediately.
                let bytes: Bytes = bytes.map_err(DownloadError::HttpRequestFailed)?;
                let len = bytes.len();

                // Write the buffer to disk before it would exceed its capacity.
                // An empty buffer is skipped: there is nothing to write, so
                // taking the lock and syncing would be wasted work.
                let would_overflow = chunk_bytes.len() + len > chunk_bytes.capacity();
                if would_overflow && !chunk_bytes.is_empty() {
                    Self::write_and_sync(&file, &chunk_bytes).await?;
                    chunk_bytes.clear();
                }

                // Slice copy instead of iterating the chunk byte by byte.
                chunk_bytes.extend_from_slice(&bytes);
                self.downloaded_len_sender.send_modify(|n| *n += len as u64);
                if let Some(notify) = downloaded_len_receiver.as_ref() {
                    if let Some(pending) = notify.receive_len(len) {
                        pending.await;
                    }
                }
            }
            Result::<(), DownloadError>::Ok(())
        };
        Ok(select! {
            r = future => {
                r?;
                // Persist whatever is still buffered; this final write and sync
                // runs even when the buffer is empty.
                Self::write_and_sync(&file, &chunk_bytes).await?;
                DownloadingEndCause::DownloadFinished
            }
            _ = self.cancel_token.cancelled() => {DownloadingEndCause::Cancelled}
        })
    }
}

/// The two download implementations a download can be carried out with.
pub enum DownloadWay {
    /// Download driven by a shared `ChunkManager`.
    /// NOTE(review): presumably splits the body into HTTP range requests —
    /// confirm against `ChunkManager`.
    Ranges(Arc<ChunkManager>),
    /// Download of the whole body as one sequential stream via
    /// `SingleDownload`.
    Single(SingleDownload),
}