1use std::{
2 collections::HashMap,
3 sync::OnceLock,
4 time::{Duration, Instant},
5};
6
7use bytes::Bytes;
8use futures_util::{FutureExt, future::IntoStream};
9use governor::{
10 Quota, RateLimiter, clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState,
11};
12use http::{Uri, header::HeaderValue};
13use http_body_util::{BodyExt, Full};
14use hyper::{HeaderMap, Request, Response, StatusCode, body::Incoming, header::USER_AGENT};
15use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
16use hyper_util::{
17 client::legacy::{Client, ResponseFuture, connect::HttpConnector},
18 rt::TokioExecutor,
19};
20use nonzero_ext::nonzero;
21use parking_lot::Mutex;
22use thiserror::Error;
23use url::Url;
24
25#[cfg(all(feature = "__rustls", not(feature = "native-tls")))]
26use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
27#[cfg(all(feature = "native-tls", not(feature = "__rustls")))]
28use hyper_tls::HttpsConnector;
29
30use crate::{
31 Error,
32 config::{OS, os_version},
33 date::Date,
34 version::{FALLBACK_USER_AGENT, VERSION_STRING, spotify_version},
35};
36
/// Window over which the client-side rate limiter budgets outgoing requests.
pub const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(30);

/// Longest server-requested back-off that is honoured; a longer wait makes
/// `HttpClient::get_retry_after` return `None` instead of a duration.
pub const RATE_LIMIT_MAX_WAIT: Duration = Duration::from_secs(10);

/// Number of requests allowed per [`RATE_LIMIT_INTERVAL`]; also used as the
/// burst size of the limiter's quota.
pub const RATE_LIMIT_CALLS_PER_INTERVAL: u32 = 300;
43
44#[derive(Debug, Error)]
45pub enum HttpClientError {
46 #[error("Response status code: {0}")]
47 StatusCode(hyper::StatusCode),
48}
49
50impl From<HttpClientError> for Error {
51 fn from(err: HttpClientError) -> Self {
52 match err {
53 HttpClientError::StatusCode(code) => {
54 match code {
56 StatusCode::GATEWAY_TIMEOUT | StatusCode::REQUEST_TIMEOUT => {
57 Error::deadline_exceeded(err)
58 }
59 StatusCode::GONE
60 | StatusCode::NOT_FOUND
61 | StatusCode::MOVED_PERMANENTLY
62 | StatusCode::PERMANENT_REDIRECT
63 | StatusCode::TEMPORARY_REDIRECT => Error::not_found(err),
64 StatusCode::FORBIDDEN | StatusCode::PAYMENT_REQUIRED => {
65 Error::permission_denied(err)
66 }
67 StatusCode::NETWORK_AUTHENTICATION_REQUIRED
68 | StatusCode::PROXY_AUTHENTICATION_REQUIRED
69 | StatusCode::UNAUTHORIZED => Error::unauthenticated(err),
70 StatusCode::EXPECTATION_FAILED
71 | StatusCode::PRECONDITION_FAILED
72 | StatusCode::PRECONDITION_REQUIRED => Error::failed_precondition(err),
73 StatusCode::RANGE_NOT_SATISFIABLE => Error::out_of_range(err),
74 StatusCode::INTERNAL_SERVER_ERROR
75 | StatusCode::MISDIRECTED_REQUEST
76 | StatusCode::SERVICE_UNAVAILABLE
77 | StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS => Error::unavailable(err),
78 StatusCode::BAD_REQUEST
79 | StatusCode::HTTP_VERSION_NOT_SUPPORTED
80 | StatusCode::LENGTH_REQUIRED
81 | StatusCode::METHOD_NOT_ALLOWED
82 | StatusCode::NOT_ACCEPTABLE
83 | StatusCode::PAYLOAD_TOO_LARGE
84 | StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE
85 | StatusCode::UNSUPPORTED_MEDIA_TYPE
86 | StatusCode::URI_TOO_LONG => Error::invalid_argument(err),
87 StatusCode::TOO_MANY_REQUESTS => Error::resource_exhausted(err),
88 StatusCode::NOT_IMPLEMENTED => Error::unimplemented(err),
89 _ => Error::unknown(err),
90 }
91 }
92 }
93 }
94}
95
96type HyperClient = Client<ProxyConnector<HttpsConnector<HttpConnector>>, Full<bytes::Bytes>>;
97
98pub struct HttpClient {
99 user_agent: HeaderValue,
100 proxy_url: Option<Url>,
101 hyper_client: OnceLock<HyperClient>,
102
103 rate_limiter:
106 RateLimiter<String, Mutex<HashMap<String, InMemoryState>>, MonotonicClock, NoOpMiddleware>,
107}
108
109impl HttpClient {
110 pub fn new(proxy_url: Option<&Url>) -> Self {
111 let zero_str = String::from("0");
112 let os_version = os_version();
113
114 let (spotify_platform, os_version) = match OS {
115 "android" => ("Android", os_version),
116 "ios" => ("iOS", os_version),
117 "macos" => ("OSX", zero_str),
118 "windows" => ("Win32", zero_str),
119 _ => ("Linux", zero_str),
120 };
121
122 let user_agent_str = &format!(
123 "Spotify/{} {}/{} ({})",
124 spotify_version(),
125 spotify_platform,
126 os_version,
127 VERSION_STRING
128 );
129
130 let user_agent = HeaderValue::from_str(user_agent_str).unwrap_or_else(|err| {
131 error!("Invalid user agent <{user_agent_str}>: {err}");
132 HeaderValue::from_static(FALLBACK_USER_AGENT)
133 });
134
135 let replenish_interval_ns =
136 RATE_LIMIT_INTERVAL.as_nanos() / RATE_LIMIT_CALLS_PER_INTERVAL as u128;
137 let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
138 .expect("replenish interval should be valid")
139 .allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
140 let rate_limiter = RateLimiter::keyed(quota);
141
142 Self {
143 user_agent,
144 proxy_url: proxy_url.cloned(),
145 hyper_client: OnceLock::new(),
146 rate_limiter,
147 }
148 }
149
150 fn try_create_hyper_client(proxy_url: Option<&Url>) -> Result<HyperClient, Error> {
151 #[cfg(all(feature = "__rustls", not(feature = "native-tls")))]
154 let https_connector = {
155 #[cfg(feature = "rustls-tls-native-roots")]
156 let tls = HttpsConnectorBuilder::new().with_native_roots()?;
157 #[cfg(feature = "rustls-tls-webpki-roots")]
158 let tls = HttpsConnectorBuilder::new().with_webpki_roots();
159 tls.https_or_http().enable_http1().enable_http2().build()
160 };
161
162 #[cfg(all(feature = "native-tls", not(feature = "__rustls")))]
163 let https_connector = HttpsConnector::new();
164
165 let proxy = match &proxy_url {
168 Some(proxy_url) => Proxy::new(Intercept::All, proxy_url.to_string().parse()?),
169 None => Proxy::new(Intercept::None, Uri::from_static("0.0.0.0")),
170 };
171 let proxy_connector = ProxyConnector::from_proxy(https_connector, proxy)?;
172
173 let client = Client::builder(TokioExecutor::new())
174 .http2_adaptive_window(true)
175 .build(proxy_connector);
176 Ok(client)
177 }
178
179 fn hyper_client(&self) -> &HyperClient {
180 self.hyper_client
181 .get_or_init(|| Self::try_create_hyper_client(self.proxy_url.as_ref()).unwrap())
182 }
183
184 pub async fn request(&self, req: Request<Bytes>) -> Result<Response<Incoming>, Error> {
185 debug!("Requesting {}", req.uri());
186
187 let (parts, body_as_bytes) = req.into_parts();
191
192 loop {
193 let mut req = Request::builder()
194 .method(parts.method.clone())
195 .uri(parts.uri.clone())
196 .version(parts.version)
197 .body(body_as_bytes.clone())?;
198 *req.headers_mut() = parts.headers.clone();
199
200 let request = self.request_fut(req)?;
201 let response = request.await;
202
203 if let Ok(response) = &response {
204 let code = response.status();
205
206 if code == StatusCode::TOO_MANY_REQUESTS {
207 if let Some(duration) = Self::get_retry_after(response.headers()) {
208 warn!(
209 "Rate limited by service, retrying in {} seconds...",
210 duration.as_secs()
211 );
212 tokio::time::sleep(duration).await;
213 continue;
214 }
215 }
216
217 if !code.is_success() {
218 return Err(HttpClientError::StatusCode(code).into());
219 }
220 }
221
222 let response = response?;
223 return Ok(response);
224 }
225 }
226
227 pub async fn request_body(&self, req: Request<Bytes>) -> Result<Bytes, Error> {
228 let response = self.request(req).await?;
229 Ok(response.into_body().collect().await?.to_bytes())
230 }
231
232 pub fn request_stream(&self, req: Request<Bytes>) -> Result<IntoStream<ResponseFuture>, Error> {
233 Ok(self.request_fut(req)?.into_stream())
234 }
235
236 pub fn request_fut(&self, mut req: Request<Bytes>) -> Result<ResponseFuture, Error> {
237 let headers_mut = req.headers_mut();
238 headers_mut.insert(USER_AGENT, self.user_agent.clone());
239
240 let domain = match req.uri().host() {
244 Some(host) => {
245 let mut parts = host
247 .split('.')
248 .map(|s| s.to_string())
249 .collect::<Vec<String>>();
250 let n = parts.len().saturating_sub(2);
251 parts.drain(n..).collect()
252 }
253 None => String::from(""),
254 };
255 self.rate_limiter.check_key(&domain).map_err(|e| {
256 Error::resource_exhausted(format!(
257 "rate limited for at least another {} seconds",
258 e.wait_time_from(Instant::now()).as_secs()
259 ))
260 })?;
261
262 Ok(self.hyper_client().request(req.map(Full::new)))
263 }
264
265 pub fn get_retry_after(headers: &HeaderMap<HeaderValue>) -> Option<Duration> {
266 let now = Date::now_utc().as_timestamp_ms();
267
268 let mut retry_after_ms = None;
269 if let Some(header_val) = headers.get("X-RateLimit-Next") {
270 if let Ok(date_str) = header_val.to_str() {
272 if let Ok(target) = Date::from_iso8601(date_str) {
273 retry_after_ms = Some(target.as_timestamp_ms().saturating_sub(now))
274 }
275 }
276 } else if let Some(header_val) = headers.get("Fastly-RateLimit-Reset") {
277 if let Ok(timestamp) = header_val.to_str() {
279 if let Ok(target) = timestamp.parse::<i64>() {
280 retry_after_ms = Some(target.saturating_sub(now))
281 }
282 }
283 } else if let Some(header_val) = headers.get("Retry-After") {
284 if let Ok(retry_after) = header_val.to_str() {
286 if let Ok(duration) = retry_after.parse::<i64>() {
287 retry_after_ms = Some(duration * 1000)
288 }
289 }
290 }
291
292 if let Some(retry_after) = retry_after_ms {
293 let duration = Duration::from_millis(retry_after as u64);
294 if duration <= RATE_LIMIT_MAX_WAIT {
295 return Some(duration);
296 } else {
297 debug!(
298 "Waiting {} seconds would exceed {} second limit",
299 duration.as_secs(),
300 RATE_LIMIT_MAX_WAIT.as_secs()
301 );
302 }
303 }
304
305 None
306 }
307}