1use std::{
2 collections::HashMap,
3 time::{Duration, Instant},
4};
5
6use bytes::Bytes;
7use futures_util::{future::IntoStream, FutureExt};
8use governor::{
9 clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState, Quota, RateLimiter,
10};
11use http::{header::HeaderValue, Uri};
12use http_body_util::{BodyExt, Full};
13use hyper::{body::Incoming, header::USER_AGENT, HeaderMap, Request, Response, StatusCode};
14use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
15use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
16use hyper_util::{
17 client::legacy::{connect::HttpConnector, Client, ResponseFuture},
18 rt::TokioExecutor,
19};
20use nonzero_ext::nonzero;
21use once_cell::sync::OnceCell;
22use parking_lot::Mutex;
23use thiserror::Error;
24use url::Url;
25
26use crate::{
27 config::{os_version, OS},
28 date::Date,
29 version::{spotify_version, FALLBACK_USER_AGENT, VERSION_STRING},
30 Error,
31};
32
33pub const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(30);
37pub const RATE_LIMIT_MAX_WAIT: Duration = Duration::from_secs(10);
38pub const RATE_LIMIT_CALLS_PER_INTERVAL: u32 = 300;
39
40#[derive(Debug, Error)]
41pub enum HttpClientError {
42 #[error("Response status code: {0}")]
43 StatusCode(hyper::StatusCode),
44}
45
46impl From<HttpClientError> for Error {
47 fn from(err: HttpClientError) -> Self {
48 match err {
49 HttpClientError::StatusCode(code) => {
50 match code {
52 StatusCode::GATEWAY_TIMEOUT | StatusCode::REQUEST_TIMEOUT => {
53 Error::deadline_exceeded(err)
54 }
55 StatusCode::GONE
56 | StatusCode::NOT_FOUND
57 | StatusCode::MOVED_PERMANENTLY
58 | StatusCode::PERMANENT_REDIRECT
59 | StatusCode::TEMPORARY_REDIRECT => Error::not_found(err),
60 StatusCode::FORBIDDEN | StatusCode::PAYMENT_REQUIRED => {
61 Error::permission_denied(err)
62 }
63 StatusCode::NETWORK_AUTHENTICATION_REQUIRED
64 | StatusCode::PROXY_AUTHENTICATION_REQUIRED
65 | StatusCode::UNAUTHORIZED => Error::unauthenticated(err),
66 StatusCode::EXPECTATION_FAILED
67 | StatusCode::PRECONDITION_FAILED
68 | StatusCode::PRECONDITION_REQUIRED => Error::failed_precondition(err),
69 StatusCode::RANGE_NOT_SATISFIABLE => Error::out_of_range(err),
70 StatusCode::INTERNAL_SERVER_ERROR
71 | StatusCode::MISDIRECTED_REQUEST
72 | StatusCode::SERVICE_UNAVAILABLE
73 | StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS => Error::unavailable(err),
74 StatusCode::BAD_REQUEST
75 | StatusCode::HTTP_VERSION_NOT_SUPPORTED
76 | StatusCode::LENGTH_REQUIRED
77 | StatusCode::METHOD_NOT_ALLOWED
78 | StatusCode::NOT_ACCEPTABLE
79 | StatusCode::PAYLOAD_TOO_LARGE
80 | StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE
81 | StatusCode::UNSUPPORTED_MEDIA_TYPE
82 | StatusCode::URI_TOO_LONG => Error::invalid_argument(err),
83 StatusCode::TOO_MANY_REQUESTS => Error::resource_exhausted(err),
84 StatusCode::NOT_IMPLEMENTED => Error::unimplemented(err),
85 _ => Error::unknown(err),
86 }
87 }
88 }
89 }
90}
91
92type HyperClient = Client<ProxyConnector<HttpsConnector<HttpConnector>>, Full<bytes::Bytes>>;
93
94pub struct HttpClient {
95 user_agent: HeaderValue,
96 proxy_url: Option<Url>,
97 hyper_client: OnceCell<HyperClient>,
98
99 rate_limiter:
102 RateLimiter<String, Mutex<HashMap<String, InMemoryState>>, MonotonicClock, NoOpMiddleware>,
103}
104
105impl HttpClient {
106 pub fn new(proxy_url: Option<&Url>) -> Self {
107 let zero_str = String::from("0");
108 let os_version = os_version();
109
110 let (spotify_platform, os_version) = match OS {
111 "android" => ("Android", os_version),
112 "ios" => ("iOS", os_version),
113 "macos" => ("OSX", zero_str),
114 "windows" => ("Win32", zero_str),
115 _ => ("Linux", zero_str),
116 };
117
118 let user_agent_str = &format!(
119 "Spotify/{} {}/{} ({})",
120 spotify_version(),
121 spotify_platform,
122 os_version,
123 VERSION_STRING
124 );
125
126 let user_agent = HeaderValue::from_str(user_agent_str).unwrap_or_else(|err| {
127 error!("Invalid user agent <{}>: {}", user_agent_str, err);
128 HeaderValue::from_static(FALLBACK_USER_AGENT)
129 });
130
131 let replenish_interval_ns =
132 RATE_LIMIT_INTERVAL.as_nanos() / RATE_LIMIT_CALLS_PER_INTERVAL as u128;
133 let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
134 .expect("replenish interval should be valid")
135 .allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
136 let rate_limiter = RateLimiter::keyed(quota);
137
138 Self {
139 user_agent,
140 proxy_url: proxy_url.cloned(),
141 hyper_client: OnceCell::new(),
142 rate_limiter,
143 }
144 }
145
146 fn try_create_hyper_client(proxy_url: Option<&Url>) -> Result<HyperClient, Error> {
147 let https_connector = HttpsConnectorBuilder::new()
149 .with_native_roots()?
150 .https_or_http()
151 .enable_http1()
152 .enable_http2()
153 .build();
154
155 let proxy = match &proxy_url {
158 Some(proxy_url) => Proxy::new(Intercept::All, proxy_url.to_string().parse()?),
159 None => Proxy::new(Intercept::None, Uri::from_static("0.0.0.0")),
160 };
161 let proxy_connector = ProxyConnector::from_proxy(https_connector, proxy)?;
162
163 let client = Client::builder(TokioExecutor::new())
164 .http2_adaptive_window(true)
165 .build(proxy_connector);
166 Ok(client)
167 }
168
169 fn hyper_client(&self) -> Result<&HyperClient, Error> {
170 self.hyper_client
171 .get_or_try_init(|| Self::try_create_hyper_client(self.proxy_url.as_ref()))
172 }
173
174 pub async fn request(&self, req: Request<Bytes>) -> Result<Response<Incoming>, Error> {
175 debug!("Requesting {}", req.uri().to_string());
176
177 let (parts, body_as_bytes) = req.into_parts();
181
182 loop {
183 let mut req = Request::builder()
184 .method(parts.method.clone())
185 .uri(parts.uri.clone())
186 .version(parts.version)
187 .body(body_as_bytes.clone())?;
188 *req.headers_mut() = parts.headers.clone();
189
190 let request = self.request_fut(req)?;
191 let response = request.await;
192
193 if let Ok(response) = &response {
194 let code = response.status();
195
196 if code == StatusCode::TOO_MANY_REQUESTS {
197 if let Some(duration) = Self::get_retry_after(response.headers()) {
198 warn!(
199 "Rate limited by service, retrying in {} seconds...",
200 duration.as_secs()
201 );
202 tokio::time::sleep(duration).await;
203 continue;
204 }
205 }
206
207 if code != StatusCode::OK {
208 return Err(HttpClientError::StatusCode(code).into());
209 }
210 }
211
212 let response = response?;
213 return Ok(response);
214 }
215 }
216
217 pub async fn request_body(&self, req: Request<Bytes>) -> Result<Bytes, Error> {
218 let response = self.request(req).await?;
219 Ok(response.into_body().collect().await?.to_bytes())
220 }
221
222 pub fn request_stream(&self, req: Request<Bytes>) -> Result<IntoStream<ResponseFuture>, Error> {
223 Ok(self.request_fut(req)?.into_stream())
224 }
225
226 pub fn request_fut(&self, mut req: Request<Bytes>) -> Result<ResponseFuture, Error> {
227 let headers_mut = req.headers_mut();
228 headers_mut.insert(USER_AGENT, self.user_agent.clone());
229
230 let domain = match req.uri().host() {
234 Some(host) => {
235 let mut parts = host
237 .split('.')
238 .map(|s| s.to_string())
239 .collect::<Vec<String>>();
240 let n = parts.len().saturating_sub(2);
241 parts.drain(n..).collect()
242 }
243 None => String::from(""),
244 };
245 self.rate_limiter.check_key(&domain).map_err(|e| {
246 Error::resource_exhausted(format!(
247 "rate limited for at least another {} seconds",
248 e.wait_time_from(Instant::now()).as_secs()
249 ))
250 })?;
251
252 Ok(self.hyper_client()?.request(req.map(Full::new)))
253 }
254
255 pub fn get_retry_after(headers: &HeaderMap<HeaderValue>) -> Option<Duration> {
256 let now = Date::now_utc().as_timestamp_ms();
257
258 let mut retry_after_ms = None;
259 if let Some(header_val) = headers.get("X-RateLimit-Next") {
260 if let Ok(date_str) = header_val.to_str() {
262 if let Ok(target) = Date::from_iso8601(date_str) {
263 retry_after_ms = Some(target.as_timestamp_ms().saturating_sub(now))
264 }
265 }
266 } else if let Some(header_val) = headers.get("Fastly-RateLimit-Reset") {
267 if let Ok(timestamp) = header_val.to_str() {
269 if let Ok(target) = timestamp.parse::<i64>() {
270 retry_after_ms = Some(target.saturating_sub(now))
271 }
272 }
273 } else if let Some(header_val) = headers.get("Retry-After") {
274 if let Ok(retry_after) = header_val.to_str() {
276 if let Ok(duration) = retry_after.parse::<i64>() {
277 retry_after_ms = Some(duration * 1000)
278 }
279 }
280 }
281
282 if let Some(retry_after) = retry_after_ms {
283 let duration = Duration::from_millis(retry_after as u64);
284 if duration <= RATE_LIMIT_MAX_WAIT {
285 return Some(duration);
286 } else {
287 debug!(
288 "Waiting {} seconds would exceed {} second limit",
289 duration.as_secs(),
290 RATE_LIMIT_MAX_WAIT.as_secs()
291 );
292 }
293 }
294
295 None
296 }
297}