// fast_down/utils/puller.rs

1use crate::{
2    FileId, ProgressEntry, PullResult, PullStream,
3    http::{HttpError, HttpPuller},
4};
5use fast_pull::Puller;
6use parking_lot::Mutex;
7use reqwest::{Client, ClientBuilder, Proxy, Response, header::HeaderMap};
8use std::{
9    net::IpAddr,
10    sync::{
11        Arc,
12        atomic::{AtomicUsize, Ordering},
13    },
14};
15use url::Url;
16
17/// proxy 为 None 意为系统代理
18/// 为 Some("") 意为不使用代理
19/// 为 Some("proxy") 意为使用 proxy 作为全部代理
20pub fn build_client(
21    headers: &HeaderMap,
22    proxy: Option<&str>,
23    accept_invalid_certs: bool,
24    accept_invalid_hostnames: bool,
25    local_addr: Option<IpAddr>,
26) -> Result<reqwest::Client, reqwest::Error> {
27    let mut client = ClientBuilder::new()
28        .http1_only()
29        .default_headers(headers.clone())
30        .danger_accept_invalid_certs(accept_invalid_certs)
31        .danger_accept_invalid_hostnames(accept_invalid_hostnames)
32        .local_address(local_addr);
33    client = match proxy {
34        None => client,
35        Some("") => client.no_proxy(),
36        Some(p) => client.proxy(Proxy::all(p)?),
37    };
38    client.build()
39}
40
41#[derive(Debug)]
42pub struct FastDownPuller {
43    inner: HttpPuller<Client>,
44    pullers: Arc<[HttpPuller<Client>]>,
45    turn: Arc<AtomicUsize>,
46}
47
48pub struct FastDownPullerOptions<'a, 'b> {
49    pub url: Url,
50    pub headers: Arc<HeaderMap>,
51    pub proxy: Option<&'a str>,
52    pub accept_invalid_certs: bool,
53    pub accept_invalid_hostnames: bool,
54    pub file_id: FileId,
55    pub resp: Option<Arc<Mutex<Option<Response>>>>,
56    pub available_ips: &'b [IpAddr],
57}
58
59impl FastDownPuller {
60    pub fn new(option: FastDownPullerOptions) -> Result<Self, reqwest::Error> {
61        let turn = Arc::new(AtomicUsize::new(1));
62        let pullers: Arc<[HttpPuller<Client>]> = option
63            .available_ips
64            .iter()
65            .flat_map(|&ip| {
66                build_client(
67                    &option.headers,
68                    option.proxy,
69                    option.accept_invalid_certs,
70                    option.accept_invalid_hostnames,
71                    Some(ip),
72                )
73            })
74            .map(|client| {
75                HttpPuller::new(
76                    option.url.clone(),
77                    client,
78                    option.resp.clone(),
79                    option.file_id.clone(),
80                )
81            })
82            .collect();
83        let puller = if let Some(puller) = pullers.first().cloned() {
84            puller
85        } else {
86            HttpPuller::new(
87                option.url.clone(),
88                build_client(
89                    &option.headers,
90                    option.proxy,
91                    option.accept_invalid_certs,
92                    option.accept_invalid_hostnames,
93                    None,
94                )?,
95                option.resp.clone(),
96                option.file_id.clone(),
97            )
98        };
99        Ok(Self {
100            inner: puller,
101            pullers,
102            turn,
103        })
104    }
105}
106
107impl Clone for FastDownPuller {
108    fn clone(&self) -> Self {
109        let pullers = self.pullers.clone();
110        let turn = self.turn.clone();
111        Self {
112            inner: if let Some(puller) = {
113                if pullers.is_empty() {
114                    None
115                } else {
116                    pullers
117                        .get(turn.fetch_add(1, Ordering::AcqRel) % pullers.len())
118                        .cloned()
119                }
120            } {
121                puller
122            } else {
123                self.inner.clone()
124            },
125            turn,
126            pullers,
127        }
128    }
129}
130
131impl Puller for FastDownPuller {
132    type Error = HttpError<Client>;
133    async fn pull(
134        &mut self,
135        range: Option<&ProgressEntry>,
136    ) -> PullResult<impl PullStream<Self::Error>, Self::Error> {
137        Puller::pull(&mut self.inner, range).await
138    }
139}