fast_down/utils/
puller.rs1use crate::{
2 FileId, ProgressEntry, PullResult, PullStream,
3 http::{HttpError, HttpPuller},
4};
5use fast_pull::Puller;
6use parking_lot::Mutex;
7use reqwest::{Client, ClientBuilder, Proxy, Response, header::HeaderMap};
8use std::{
9 net::IpAddr,
10 sync::{
11 Arc,
12 atomic::{AtomicUsize, Ordering},
13 },
14};
15use url::Url;
16
17pub fn build_client(
21 headers: &HeaderMap,
22 proxy: Option<&str>,
23 accept_invalid_certs: bool,
24 accept_invalid_hostnames: bool,
25 local_addr: Option<IpAddr>,
26) -> Result<reqwest::Client, reqwest::Error> {
27 let mut client = ClientBuilder::new()
28 .http1_only()
29 .default_headers(headers.clone())
30 .danger_accept_invalid_certs(accept_invalid_certs)
31 .danger_accept_invalid_hostnames(accept_invalid_hostnames)
32 .local_address(local_addr);
33 client = match proxy {
34 None => client,
35 Some("") => client.no_proxy(),
36 Some(p) => client.proxy(Proxy::all(p)?),
37 };
38 client.build()
39}
40
41#[derive(Debug)]
42pub struct FastDownPuller {
43 inner: HttpPuller<Client>,
44 pullers: Arc<[HttpPuller<Client>]>,
45 turn: Arc<AtomicUsize>,
46}
47
48pub struct FastDownPullerOptions<'a, 'b> {
49 pub url: Url,
50 pub headers: Arc<HeaderMap>,
51 pub proxy: Option<&'a str>,
52 pub accept_invalid_certs: bool,
53 pub accept_invalid_hostnames: bool,
54 pub file_id: FileId,
55 pub resp: Option<Arc<Mutex<Option<Response>>>>,
56 pub available_ips: &'b [IpAddr],
57}
58
59impl FastDownPuller {
60 pub fn new(option: FastDownPullerOptions) -> Result<Self, reqwest::Error> {
61 let turn = Arc::new(AtomicUsize::new(1));
62 let pullers: Arc<[HttpPuller<Client>]> = option
63 .available_ips
64 .iter()
65 .flat_map(|&ip| {
66 build_client(
67 &option.headers,
68 option.proxy,
69 option.accept_invalid_certs,
70 option.accept_invalid_hostnames,
71 Some(ip),
72 )
73 })
74 .map(|client| {
75 HttpPuller::new(
76 option.url.clone(),
77 client,
78 option.resp.clone(),
79 option.file_id.clone(),
80 )
81 })
82 .collect();
83 let puller = if let Some(puller) = pullers.first().cloned() {
84 puller
85 } else {
86 HttpPuller::new(
87 option.url.clone(),
88 build_client(
89 &option.headers,
90 option.proxy,
91 option.accept_invalid_certs,
92 option.accept_invalid_hostnames,
93 None,
94 )?,
95 option.resp.clone(),
96 option.file_id.clone(),
97 )
98 };
99 Ok(Self {
100 inner: puller,
101 pullers,
102 turn,
103 })
104 }
105}
106
107impl Clone for FastDownPuller {
108 fn clone(&self) -> Self {
109 let pullers = self.pullers.clone();
110 let turn = self.turn.clone();
111 Self {
112 inner: if let Some(puller) = {
113 if pullers.is_empty() {
114 None
115 } else {
116 pullers
117 .get(turn.fetch_add(1, Ordering::AcqRel) % pullers.len())
118 .cloned()
119 }
120 } {
121 puller
122 } else {
123 self.inner.clone()
124 },
125 turn,
126 pullers,
127 }
128 }
129}
130
131impl Puller for FastDownPuller {
132 type Error = HttpError<Client>;
133 async fn pull(
134 &mut self,
135 range: Option<&ProgressEntry>,
136 ) -> PullResult<impl PullStream<Self::Error>, Self::Error> {
137 Puller::pull(&mut self.inner, range).await
138 }
139}