Skip to main content

fast_down/utils/
puller.rs

1use crate::{
2    FileId, ProgressEntry, PullResult, PullStream,
3    http::{HttpError, HttpPuller},
4};
5use fast_pull::Puller;
6use parking_lot::Mutex;
7use reqwest::{Client, ClientBuilder, Proxy, Response, header::HeaderMap};
8use std::{
9    net::IpAddr,
10    sync::{
11        Arc,
12        atomic::{AtomicUsize, Ordering},
13    },
14};
15use url::Url;
16
17/// proxy 为 None 意为系统代理
18/// 为 Some("") 意为不使用代理
19/// 为 Some("proxy") 意为使用 proxy 作为全部代理
20pub fn build_client(
21    headers: &HeaderMap,
22    proxy: Option<&str>,
23    accept_invalid_certs: bool,
24    accept_invalid_hostnames: bool,
25    local_addr: Option<IpAddr>,
26) -> Result<reqwest::Client, reqwest::Error> {
27    let mut client = ClientBuilder::new()
28        .default_headers(headers.clone())
29        .danger_accept_invalid_certs(accept_invalid_certs)
30        .danger_accept_invalid_hostnames(accept_invalid_hostnames)
31        .local_address(local_addr);
32    client = match proxy {
33        None => client,
34        Some("") => client.no_proxy(),
35        Some(p) => client.proxy(Proxy::all(p)?),
36    };
37    client.build()
38}
39
40#[derive(Debug)]
41pub struct FastDownPuller {
42    inner: HttpPuller<Client>,
43    headers: Arc<HeaderMap>,
44    proxy: Option<Arc<str>>,
45    url: Arc<Url>,
46    accept_invalid_certs: bool,
47    accept_invalid_hostnames: bool,
48    file_id: FileId,
49    resp: Option<Arc<Mutex<Option<Response>>>>,
50    available_ips: Arc<[IpAddr]>,
51    turn: Arc<AtomicUsize>,
52}
53
54pub struct FastDownPullerOptions<'a> {
55    pub url: Url,
56    pub headers: Arc<HeaderMap>,
57    pub proxy: Option<&'a str>,
58    pub accept_invalid_certs: bool,
59    pub accept_invalid_hostnames: bool,
60    pub file_id: FileId,
61    pub resp: Option<Arc<Mutex<Option<Response>>>>,
62    pub available_ips: Arc<[IpAddr]>,
63}
64
65impl FastDownPuller {
66    pub fn new(option: FastDownPullerOptions) -> Result<Self, reqwest::Error> {
67        let turn = Arc::new(AtomicUsize::new(1));
68        let available_ips = option.available_ips;
69        let client = build_client(
70            &option.headers,
71            option.proxy,
72            option.accept_invalid_certs,
73            option.accept_invalid_hostnames,
74            if available_ips.is_empty() {
75                None
76            } else {
77                available_ips
78                    .get(turn.fetch_add(1, Ordering::AcqRel) % available_ips.len())
79                    .cloned()
80            },
81        )?;
82        Ok(Self {
83            inner: HttpPuller::new(
84                option.url.clone(),
85                client,
86                option.resp.clone(),
87                option.file_id.clone(),
88            ),
89            resp: option.resp,
90            headers: option.headers,
91            proxy: option.proxy.map(Arc::from),
92            url: Arc::new(option.url),
93            accept_invalid_certs: option.accept_invalid_certs,
94            accept_invalid_hostnames: option.accept_invalid_hostnames,
95            file_id: option.file_id,
96            available_ips,
97            turn,
98        })
99    }
100}
101
102impl Clone for FastDownPuller {
103    fn clone(&self) -> Self {
104        let available_ips = self.available_ips.clone();
105        let turn = self.turn.clone();
106        Self {
107            inner: if let Ok(client) = build_client(
108                &self.headers,
109                self.proxy.as_deref(),
110                self.accept_invalid_certs,
111                self.accept_invalid_hostnames,
112                {
113                    if available_ips.is_empty() {
114                        None
115                    } else {
116                        available_ips
117                            .get(turn.fetch_add(1, Ordering::AcqRel) % available_ips.len())
118                            .cloned()
119                    }
120                },
121            ) {
122                HttpPuller::new(
123                    self.url.as_ref().clone(),
124                    client,
125                    self.resp.clone(),
126                    self.file_id.clone(),
127                )
128            } else {
129                self.inner.clone()
130            },
131            resp: self.resp.clone(),
132            headers: self.headers.clone(),
133            proxy: self.proxy.clone(),
134            url: self.url.clone(),
135            accept_invalid_certs: self.accept_invalid_certs,
136            accept_invalid_hostnames: self.accept_invalid_hostnames,
137            file_id: self.file_id.clone(),
138            available_ips,
139            turn,
140        }
141    }
142}
143
144impl Puller for FastDownPuller {
145    type Error = HttpError<Client>;
146    async fn pull(
147        &mut self,
148        range: Option<&ProgressEntry>,
149    ) -> PullResult<impl PullStream<Self::Error>, Self::Error> {
150        Puller::pull(&mut self.inner, range).await
151    }
152}