1use std::{
2 sync::OnceLock,
3 time::{Duration, Instant},
4};
5
6use bytes::Bytes;
7use futures_util::{FutureExt, future::IntoStream};
8use governor::{
9 Quota, RateLimiter, clock::MonotonicClock, middleware::NoOpMiddleware,
10 state::keyed::DefaultKeyedStateStore,
11};
12use http::{Uri, header::HeaderValue};
13use http_body_util::{BodyExt, Full};
14use hyper::{HeaderMap, Request, Response, StatusCode, body::Incoming, header::USER_AGENT};
15use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
16use hyper_util::{
17 client::legacy::{Client, ResponseFuture, connect::HttpConnector},
18 rt::TokioExecutor,
19};
20use nonzero_ext::nonzero;
21use thiserror::Error;
22use url::Url;
23
24#[cfg(all(feature = "__rustls", not(feature = "native-tls")))]
25use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
26#[cfg(all(feature = "native-tls", not(feature = "__rustls")))]
27use hyper_tls::HttpsConnector;
28
29use crate::{
30 Error,
31 config::{OS, os_version},
32 date::Date,
33 version::{FALLBACK_USER_AGENT, VERSION_STRING, spotify_version},
34};
35
/// Length of the window over which outgoing requests are budgeted.
///
/// Together with [`RATE_LIMIT_CALLS_PER_INTERVAL`] this determines how quickly
/// the client-side rate limiter replenishes.
pub const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(30);

/// Number of requests allowed per [`RATE_LIMIT_INTERVAL`]; also used as the
/// burst size of the client-side rate limiter.
pub const RATE_LIMIT_CALLS_PER_INTERVAL: u32 = 300;

/// Longest server-requested back-off that is still honored by waiting and
/// retrying; longer delays are not waited for.
pub const RATE_LIMIT_MAX_WAIT: Duration = Duration::from_secs(10);
42
43#[derive(Debug, Error)]
44pub enum HttpClientError {
45 #[error("Response status code: {0}")]
46 StatusCode(hyper::StatusCode),
47}
48
49impl From<HttpClientError> for Error {
50 fn from(err: HttpClientError) -> Self {
51 match err {
52 HttpClientError::StatusCode(code) => {
53 match code {
55 StatusCode::GATEWAY_TIMEOUT | StatusCode::REQUEST_TIMEOUT => {
56 Error::deadline_exceeded(err)
57 }
58 StatusCode::GONE
59 | StatusCode::NOT_FOUND
60 | StatusCode::MOVED_PERMANENTLY
61 | StatusCode::PERMANENT_REDIRECT
62 | StatusCode::TEMPORARY_REDIRECT => Error::not_found(err),
63 StatusCode::FORBIDDEN | StatusCode::PAYMENT_REQUIRED => {
64 Error::permission_denied(err)
65 }
66 StatusCode::NETWORK_AUTHENTICATION_REQUIRED
67 | StatusCode::PROXY_AUTHENTICATION_REQUIRED
68 | StatusCode::UNAUTHORIZED => Error::unauthenticated(err),
69 StatusCode::EXPECTATION_FAILED
70 | StatusCode::PRECONDITION_FAILED
71 | StatusCode::PRECONDITION_REQUIRED => Error::failed_precondition(err),
72 StatusCode::RANGE_NOT_SATISFIABLE => Error::out_of_range(err),
73 StatusCode::INTERNAL_SERVER_ERROR
74 | StatusCode::MISDIRECTED_REQUEST
75 | StatusCode::SERVICE_UNAVAILABLE
76 | StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS => Error::unavailable(err),
77 StatusCode::BAD_REQUEST
78 | StatusCode::HTTP_VERSION_NOT_SUPPORTED
79 | StatusCode::LENGTH_REQUIRED
80 | StatusCode::METHOD_NOT_ALLOWED
81 | StatusCode::NOT_ACCEPTABLE
82 | StatusCode::PAYLOAD_TOO_LARGE
83 | StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE
84 | StatusCode::UNSUPPORTED_MEDIA_TYPE
85 | StatusCode::URI_TOO_LONG => Error::invalid_argument(err),
86 StatusCode::TOO_MANY_REQUESTS => Error::resource_exhausted(err),
87 StatusCode::NOT_IMPLEMENTED => Error::unimplemented(err),
88 _ => Error::unknown(err),
89 }
90 }
91 }
92 }
93}
94
95type HyperClient = Client<ProxyConnector<HttpsConnector<HttpConnector>>, Full<bytes::Bytes>>;
96
97pub struct HttpClient {
98 user_agent: HeaderValue,
99 proxy_url: Option<Url>,
100 hyper_client: OnceLock<HyperClient>,
101
102 rate_limiter:
103 RateLimiter<String, DefaultKeyedStateStore<String>, MonotonicClock, NoOpMiddleware>,
104}
105
106impl HttpClient {
107 pub fn new(proxy_url: Option<&Url>) -> Self {
108 let zero_str = String::from("0");
109 let os_version = os_version();
110
111 let (spotify_platform, os_version) = match OS {
112 "android" => ("Android", os_version),
113 "ios" => ("iOS", os_version),
114 "macos" => ("OSX", zero_str),
115 "windows" => ("Win32", zero_str),
116 _ => ("Linux", zero_str),
117 };
118
119 let user_agent_str = &format!(
120 "Spotify/{} {}/{} ({})",
121 spotify_version(),
122 spotify_platform,
123 os_version,
124 VERSION_STRING
125 );
126
127 let user_agent = HeaderValue::from_str(user_agent_str).unwrap_or_else(|err| {
128 error!("Invalid user agent <{user_agent_str}>: {err}");
129 HeaderValue::from_static(FALLBACK_USER_AGENT)
130 });
131
132 let replenish_interval_ns =
133 RATE_LIMIT_INTERVAL.as_nanos() / RATE_LIMIT_CALLS_PER_INTERVAL as u128;
134 let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
135 .expect("replenish interval should be valid")
136 .allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
137 let rate_limiter = RateLimiter::keyed(quota);
138
139 Self {
140 user_agent,
141 proxy_url: proxy_url.cloned(),
142 hyper_client: OnceLock::new(),
143 rate_limiter,
144 }
145 }
146
147 fn try_create_hyper_client(proxy_url: Option<&Url>) -> Result<HyperClient, Error> {
148 #[cfg(all(feature = "__rustls", not(feature = "native-tls")))]
151 let https_connector = {
152 #[cfg(feature = "rustls-tls-native-roots")]
153 let tls = HttpsConnectorBuilder::new().with_native_roots()?;
154 #[cfg(feature = "rustls-tls-webpki-roots")]
155 let tls = HttpsConnectorBuilder::new().with_webpki_roots();
156 tls.https_or_http().enable_http1().enable_http2().build()
157 };
158
159 #[cfg(all(feature = "native-tls", not(feature = "__rustls")))]
160 let https_connector = HttpsConnector::new();
161
162 let proxy = match &proxy_url {
165 Some(proxy_url) => Proxy::new(Intercept::All, proxy_url.to_string().parse()?),
166 None => Proxy::new(Intercept::None, Uri::from_static("0.0.0.0")),
167 };
168 let proxy_connector = ProxyConnector::from_proxy(https_connector, proxy)?;
169
170 let client = Client::builder(TokioExecutor::new())
171 .http2_adaptive_window(true)
172 .build(proxy_connector);
173 Ok(client)
174 }
175
176 fn hyper_client(&self) -> &HyperClient {
177 self.hyper_client
178 .get_or_init(|| Self::try_create_hyper_client(self.proxy_url.as_ref()).unwrap())
179 }
180
181 pub async fn request(&self, req: Request<Bytes>) -> Result<Response<Incoming>, Error> {
182 debug!("Requesting {}", req.uri());
183
184 let (parts, body_as_bytes) = req.into_parts();
188
189 loop {
190 let mut req = Request::builder()
191 .method(parts.method.clone())
192 .uri(parts.uri.clone())
193 .version(parts.version)
194 .body(body_as_bytes.clone())?;
195 *req.headers_mut() = parts.headers.clone();
196
197 let request = self.request_fut(req)?;
198 let response = request.await;
199
200 if let Ok(response) = &response {
201 let code = response.status();
202
203 if code == StatusCode::TOO_MANY_REQUESTS {
204 if let Some(duration) = Self::get_retry_after(response.headers()) {
205 warn!(
206 "Rate limited by service, retrying in {} seconds...",
207 duration.as_secs()
208 );
209 tokio::time::sleep(duration).await;
210 continue;
211 }
212 }
213
214 if !code.is_success() {
215 return Err(HttpClientError::StatusCode(code).into());
216 }
217 }
218
219 let response = response?;
220 return Ok(response);
221 }
222 }
223
224 pub async fn request_body(&self, req: Request<Bytes>) -> Result<Bytes, Error> {
225 let response = self.request(req).await?;
226 Ok(response.into_body().collect().await?.to_bytes())
227 }
228
229 pub fn request_stream(&self, req: Request<Bytes>) -> Result<IntoStream<ResponseFuture>, Error> {
230 Ok(self.request_fut(req)?.into_stream())
231 }
232
233 pub fn request_fut(&self, mut req: Request<Bytes>) -> Result<ResponseFuture, Error> {
234 let headers_mut = req.headers_mut();
235 headers_mut.insert(USER_AGENT, self.user_agent.clone());
236
237 let domain = match req.uri().host() {
241 Some(host) => {
242 let mut parts = host
244 .split('.')
245 .map(|s| s.to_string())
246 .collect::<Vec<String>>();
247 let n = parts.len().saturating_sub(2);
248 parts.drain(n..).collect()
249 }
250 None => String::from(""),
251 };
252 self.rate_limiter.check_key(&domain).map_err(|e| {
253 Error::resource_exhausted(format!(
254 "rate limited for at least another {} seconds",
255 e.wait_time_from(Instant::now()).as_secs()
256 ))
257 })?;
258
259 Ok(self.hyper_client().request(req.map(Full::new)))
260 }
261
262 pub fn get_retry_after(headers: &HeaderMap<HeaderValue>) -> Option<Duration> {
263 let now = Date::now_utc().as_timestamp_ms();
264
265 let mut retry_after_ms = None;
266 if let Some(header_val) = headers.get("X-RateLimit-Next") {
267 if let Ok(date_str) = header_val.to_str() {
269 if let Ok(target) = Date::from_iso8601(date_str) {
270 retry_after_ms = Some(target.as_timestamp_ms().saturating_sub(now))
271 }
272 }
273 } else if let Some(header_val) = headers.get("Fastly-RateLimit-Reset") {
274 if let Ok(timestamp) = header_val.to_str() {
276 if let Ok(target) = timestamp.parse::<i64>() {
277 retry_after_ms = Some(target.saturating_sub(now))
278 }
279 }
280 } else if let Some(header_val) = headers.get("Retry-After") {
281 if let Ok(retry_after) = header_val.to_str() {
283 if let Ok(duration) = retry_after.parse::<i64>() {
284 retry_after_ms = Some(duration * 1000)
285 }
286 }
287 }
288
289 if let Some(retry_after) = retry_after_ms {
290 let duration = Duration::from_millis(retry_after as u64);
291 if duration <= RATE_LIMIT_MAX_WAIT {
292 return Some(duration);
293 } else {
294 debug!(
295 "Waiting {} seconds would exceed {} second limit",
296 duration.as_secs(),
297 RATE_LIMIT_MAX_WAIT.as_secs()
298 );
299 }
300 }
301
302 None
303 }
304}