1use crate::{
2 FileId, ProgressEntry, PullResult, PullStream,
3 http::{HttpError, HttpPuller},
4};
5use fast_pull::Puller;
6use parking_lot::Mutex;
7use reqwest::{Client, ClientBuilder, Proxy, Response, header::HeaderMap};
8use std::{
9 net::IpAddr,
10 sync::{
11 Arc,
12 atomic::{AtomicUsize, Ordering},
13 },
14};
15use url::Url;
16
17pub fn build_client(
21 headers: &HeaderMap,
22 proxy: Option<&str>,
23 accept_invalid_certs: bool,
24 accept_invalid_hostnames: bool,
25 local_addr: Option<IpAddr>,
26) -> Result<reqwest::Client, reqwest::Error> {
27 let mut client = ClientBuilder::new()
28 .default_headers(headers.clone())
29 .danger_accept_invalid_certs(accept_invalid_certs)
30 .danger_accept_invalid_hostnames(accept_invalid_hostnames)
31 .local_address(local_addr);
32 client = match proxy {
33 None => client,
34 Some("") => client.no_proxy(),
35 Some(p) => client.proxy(Proxy::all(p)?),
36 };
37 client.build()
38}
39
40#[derive(Debug)]
41pub struct FastDownPuller {
42 inner: HttpPuller<Client>,
43 headers: Arc<HeaderMap>,
44 proxy: Option<Arc<str>>,
45 url: Arc<Url>,
46 accept_invalid_certs: bool,
47 accept_invalid_hostnames: bool,
48 file_id: FileId,
49 resp: Option<Arc<Mutex<Option<Response>>>>,
50 available_ips: Arc<[IpAddr]>,
51 turn: Arc<AtomicUsize>,
52}
53
54pub struct FastDownPullerOptions<'a> {
55 pub url: Url,
56 pub headers: Arc<HeaderMap>,
57 pub proxy: Option<&'a str>,
58 pub accept_invalid_certs: bool,
59 pub accept_invalid_hostnames: bool,
60 pub file_id: FileId,
61 pub resp: Option<Arc<Mutex<Option<Response>>>>,
62 pub available_ips: Arc<[IpAddr]>,
63}
64
65impl FastDownPuller {
66 pub fn new(option: FastDownPullerOptions) -> Result<Self, reqwest::Error> {
67 let turn = Arc::new(AtomicUsize::new(1));
68 let available_ips = option.available_ips;
69 let client = build_client(
70 &option.headers,
71 option.proxy,
72 option.accept_invalid_certs,
73 option.accept_invalid_hostnames,
74 if available_ips.is_empty() {
75 None
76 } else {
77 available_ips
78 .get(turn.fetch_add(1, Ordering::AcqRel) % available_ips.len())
79 .cloned()
80 },
81 )?;
82 Ok(Self {
83 inner: HttpPuller::new(
84 option.url.clone(),
85 client,
86 option.resp.clone(),
87 option.file_id.clone(),
88 ),
89 resp: option.resp,
90 headers: option.headers,
91 proxy: option.proxy.map(Arc::from),
92 url: Arc::new(option.url),
93 accept_invalid_certs: option.accept_invalid_certs,
94 accept_invalid_hostnames: option.accept_invalid_hostnames,
95 file_id: option.file_id,
96 available_ips,
97 turn,
98 })
99 }
100}
101
102impl Clone for FastDownPuller {
103 fn clone(&self) -> Self {
104 let available_ips = self.available_ips.clone();
105 let turn = self.turn.clone();
106 Self {
107 inner: if let Ok(client) = build_client(
108 &self.headers,
109 self.proxy.as_deref(),
110 self.accept_invalid_certs,
111 self.accept_invalid_hostnames,
112 {
113 if available_ips.is_empty() {
114 None
115 } else {
116 available_ips
117 .get(turn.fetch_add(1, Ordering::AcqRel) % available_ips.len())
118 .cloned()
119 }
120 },
121 ) {
122 HttpPuller::new(
123 self.url.as_ref().clone(),
124 client,
125 self.resp.clone(),
126 self.file_id.clone(),
127 )
128 } else {
129 self.inner.clone()
130 },
131 resp: self.resp.clone(),
132 headers: self.headers.clone(),
133 proxy: self.proxy.clone(),
134 url: self.url.clone(),
135 accept_invalid_certs: self.accept_invalid_certs,
136 accept_invalid_hostnames: self.accept_invalid_hostnames,
137 file_id: self.file_id.clone(),
138 available_ips,
139 turn,
140 }
141 }
142}
143
144impl Puller for FastDownPuller {
145 type Error = HttpError<Client>;
146 async fn pull(
147 &mut self,
148 range: Option<&ProgressEntry>,
149 ) -> PullResult<impl PullStream<Self::Error>, Self::Error> {
150 Puller::pull(&mut self.inner, range).await
151 }
152}