librespot_core/
http_client.rs

1use std::{
2    collections::HashMap,
3    sync::OnceLock,
4    time::{Duration, Instant},
5};
6
7use bytes::Bytes;
8use futures_util::{FutureExt, future::IntoStream};
9use governor::{
10    Quota, RateLimiter, clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState,
11};
12use http::{Uri, header::HeaderValue};
13use http_body_util::{BodyExt, Full};
14use hyper::{HeaderMap, Request, Response, StatusCode, body::Incoming, header::USER_AGENT};
15use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
16use hyper_util::{
17    client::legacy::{Client, ResponseFuture, connect::HttpConnector},
18    rt::TokioExecutor,
19};
20use nonzero_ext::nonzero;
21use parking_lot::Mutex;
22use thiserror::Error;
23use url::Url;
24
25#[cfg(all(feature = "__rustls", not(feature = "native-tls")))]
26use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
27#[cfg(all(feature = "native-tls", not(feature = "__rustls")))]
28use hyper_tls::HttpsConnector;
29
30use crate::{
31    Error,
32    config::{OS, os_version},
33    date::Date,
34    version::{FALLBACK_USER_AGENT, VERSION_STRING, spotify_version},
35};
36
37// The 30 seconds interval is documented by Spotify, but the calls per interval
38// is a guesstimate and probably subject to licensing (purchasing extra calls)
39// and may change at any time.
40pub const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(30);
41pub const RATE_LIMIT_MAX_WAIT: Duration = Duration::from_secs(10);
42pub const RATE_LIMIT_CALLS_PER_INTERVAL: u32 = 300;
43
44#[derive(Debug, Error)]
45pub enum HttpClientError {
46    #[error("Response status code: {0}")]
47    StatusCode(hyper::StatusCode),
48}
49
50impl From<HttpClientError> for Error {
51    fn from(err: HttpClientError) -> Self {
52        match err {
53            HttpClientError::StatusCode(code) => {
54                // not exhaustive, but what reasonably could be expected
55                match code {
56                    StatusCode::GATEWAY_TIMEOUT | StatusCode::REQUEST_TIMEOUT => {
57                        Error::deadline_exceeded(err)
58                    }
59                    StatusCode::GONE
60                    | StatusCode::NOT_FOUND
61                    | StatusCode::MOVED_PERMANENTLY
62                    | StatusCode::PERMANENT_REDIRECT
63                    | StatusCode::TEMPORARY_REDIRECT => Error::not_found(err),
64                    StatusCode::FORBIDDEN | StatusCode::PAYMENT_REQUIRED => {
65                        Error::permission_denied(err)
66                    }
67                    StatusCode::NETWORK_AUTHENTICATION_REQUIRED
68                    | StatusCode::PROXY_AUTHENTICATION_REQUIRED
69                    | StatusCode::UNAUTHORIZED => Error::unauthenticated(err),
70                    StatusCode::EXPECTATION_FAILED
71                    | StatusCode::PRECONDITION_FAILED
72                    | StatusCode::PRECONDITION_REQUIRED => Error::failed_precondition(err),
73                    StatusCode::RANGE_NOT_SATISFIABLE => Error::out_of_range(err),
74                    StatusCode::INTERNAL_SERVER_ERROR
75                    | StatusCode::MISDIRECTED_REQUEST
76                    | StatusCode::SERVICE_UNAVAILABLE
77                    | StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS => Error::unavailable(err),
78                    StatusCode::BAD_REQUEST
79                    | StatusCode::HTTP_VERSION_NOT_SUPPORTED
80                    | StatusCode::LENGTH_REQUIRED
81                    | StatusCode::METHOD_NOT_ALLOWED
82                    | StatusCode::NOT_ACCEPTABLE
83                    | StatusCode::PAYLOAD_TOO_LARGE
84                    | StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE
85                    | StatusCode::UNSUPPORTED_MEDIA_TYPE
86                    | StatusCode::URI_TOO_LONG => Error::invalid_argument(err),
87                    StatusCode::TOO_MANY_REQUESTS => Error::resource_exhausted(err),
88                    StatusCode::NOT_IMPLEMENTED => Error::unimplemented(err),
89                    _ => Error::unknown(err),
90                }
91            }
92        }
93    }
94}
95
96type HyperClient = Client<ProxyConnector<HttpsConnector<HttpConnector>>, Full<bytes::Bytes>>;
97
98pub struct HttpClient {
99    user_agent: HeaderValue,
100    proxy_url: Option<Url>,
101    hyper_client: OnceLock<HyperClient>,
102
103    // while the DashMap variant is more performant, our level of concurrency
104    // is pretty low so we can save pulling in that extra dependency
105    rate_limiter:
106        RateLimiter<String, Mutex<HashMap<String, InMemoryState>>, MonotonicClock, NoOpMiddleware>,
107}
108
109impl HttpClient {
110    pub fn new(proxy_url: Option<&Url>) -> Self {
111        let zero_str = String::from("0");
112        let os_version = os_version();
113
114        let (spotify_platform, os_version) = match OS {
115            "android" => ("Android", os_version),
116            "ios" => ("iOS", os_version),
117            "macos" => ("OSX", zero_str),
118            "windows" => ("Win32", zero_str),
119            _ => ("Linux", zero_str),
120        };
121
122        let user_agent_str = &format!(
123            "Spotify/{} {}/{} ({})",
124            spotify_version(),
125            spotify_platform,
126            os_version,
127            VERSION_STRING
128        );
129
130        let user_agent = HeaderValue::from_str(user_agent_str).unwrap_or_else(|err| {
131            error!("Invalid user agent <{user_agent_str}>: {err}");
132            HeaderValue::from_static(FALLBACK_USER_AGENT)
133        });
134
135        let replenish_interval_ns =
136            RATE_LIMIT_INTERVAL.as_nanos() / RATE_LIMIT_CALLS_PER_INTERVAL as u128;
137        let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
138            .expect("replenish interval should be valid")
139            .allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
140        let rate_limiter = RateLimiter::keyed(quota);
141
142        Self {
143            user_agent,
144            proxy_url: proxy_url.cloned(),
145            hyper_client: OnceLock::new(),
146            rate_limiter,
147        }
148    }
149
150    fn try_create_hyper_client(proxy_url: Option<&Url>) -> Result<HyperClient, Error> {
151        // configuring TLS is expensive and should be done once per process
152
153        #[cfg(all(feature = "__rustls", not(feature = "native-tls")))]
154        let https_connector = {
155            #[cfg(feature = "rustls-tls-native-roots")]
156            let tls = HttpsConnectorBuilder::new().with_native_roots()?;
157            #[cfg(feature = "rustls-tls-webpki-roots")]
158            let tls = HttpsConnectorBuilder::new().with_webpki_roots();
159            tls.https_or_http().enable_http1().enable_http2().build()
160        };
161
162        #[cfg(all(feature = "native-tls", not(feature = "__rustls")))]
163        let https_connector = HttpsConnector::new();
164
165        // When not using a proxy a dummy proxy is configured that will not intercept any traffic.
166        // This prevents needing to carry the Client Connector generics through the whole project
167        let proxy = match &proxy_url {
168            Some(proxy_url) => Proxy::new(Intercept::All, proxy_url.to_string().parse()?),
169            None => Proxy::new(Intercept::None, Uri::from_static("0.0.0.0")),
170        };
171        let proxy_connector = ProxyConnector::from_proxy(https_connector, proxy)?;
172
173        let client = Client::builder(TokioExecutor::new())
174            .http2_adaptive_window(true)
175            .build(proxy_connector);
176        Ok(client)
177    }
178
179    fn hyper_client(&self) -> &HyperClient {
180        self.hyper_client
181            .get_or_init(|| Self::try_create_hyper_client(self.proxy_url.as_ref()).unwrap())
182    }
183
184    pub async fn request(&self, req: Request<Bytes>) -> Result<Response<Incoming>, Error> {
185        debug!("Requesting {}", req.uri());
186
187        // `Request` does not implement `Clone` because its `Body` may be a single-shot stream.
188        // As correct as that may be technically, we now need all this boilerplate to clone it
189        // ourselves, as any `Request` is moved in the loop.
190        let (parts, body_as_bytes) = req.into_parts();
191
192        loop {
193            let mut req = Request::builder()
194                .method(parts.method.clone())
195                .uri(parts.uri.clone())
196                .version(parts.version)
197                .body(body_as_bytes.clone())?;
198            *req.headers_mut() = parts.headers.clone();
199
200            let request = self.request_fut(req)?;
201            let response = request.await;
202
203            if let Ok(response) = &response {
204                let code = response.status();
205
206                if code == StatusCode::TOO_MANY_REQUESTS {
207                    if let Some(duration) = Self::get_retry_after(response.headers()) {
208                        warn!(
209                            "Rate limited by service, retrying in {} seconds...",
210                            duration.as_secs()
211                        );
212                        tokio::time::sleep(duration).await;
213                        continue;
214                    }
215                }
216
217                if !code.is_success() {
218                    return Err(HttpClientError::StatusCode(code).into());
219                }
220            }
221
222            let response = response?;
223            return Ok(response);
224        }
225    }
226
227    pub async fn request_body(&self, req: Request<Bytes>) -> Result<Bytes, Error> {
228        let response = self.request(req).await?;
229        Ok(response.into_body().collect().await?.to_bytes())
230    }
231
232    pub fn request_stream(&self, req: Request<Bytes>) -> Result<IntoStream<ResponseFuture>, Error> {
233        Ok(self.request_fut(req)?.into_stream())
234    }
235
236    pub fn request_fut(&self, mut req: Request<Bytes>) -> Result<ResponseFuture, Error> {
237        let headers_mut = req.headers_mut();
238        headers_mut.insert(USER_AGENT, self.user_agent.clone());
239
240        // For rate limiting we cannot *just* depend on Spotify sending us HTTP/429
241        // Retry-After headers. For example, when there is a service interruption
242        // and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure.
243        let domain = match req.uri().host() {
244            Some(host) => {
245                // strip the prefix from *.domain.tld (assume rate limit is per domain, not subdomain)
246                let mut parts = host
247                    .split('.')
248                    .map(|s| s.to_string())
249                    .collect::<Vec<String>>();
250                let n = parts.len().saturating_sub(2);
251                parts.drain(n..).collect()
252            }
253            None => String::from(""),
254        };
255        self.rate_limiter.check_key(&domain).map_err(|e| {
256            Error::resource_exhausted(format!(
257                "rate limited for at least another {} seconds",
258                e.wait_time_from(Instant::now()).as_secs()
259            ))
260        })?;
261
262        Ok(self.hyper_client().request(req.map(Full::new)))
263    }
264
265    pub fn get_retry_after(headers: &HeaderMap<HeaderValue>) -> Option<Duration> {
266        let now = Date::now_utc().as_timestamp_ms();
267
268        let mut retry_after_ms = None;
269        if let Some(header_val) = headers.get("X-RateLimit-Next") {
270            // *.akamaized.net (Akamai)
271            if let Ok(date_str) = header_val.to_str() {
272                if let Ok(target) = Date::from_iso8601(date_str) {
273                    retry_after_ms = Some(target.as_timestamp_ms().saturating_sub(now))
274                }
275            }
276        } else if let Some(header_val) = headers.get("Fastly-RateLimit-Reset") {
277            // *.scdn.co (Fastly)
278            if let Ok(timestamp) = header_val.to_str() {
279                if let Ok(target) = timestamp.parse::<i64>() {
280                    retry_after_ms = Some(target.saturating_sub(now))
281                }
282            }
283        } else if let Some(header_val) = headers.get("Retry-After") {
284            // Generic RFC compliant (including *.spotify.com)
285            if let Ok(retry_after) = header_val.to_str() {
286                if let Ok(duration) = retry_after.parse::<i64>() {
287                    retry_after_ms = Some(duration * 1000)
288                }
289            }
290        }
291
292        if let Some(retry_after) = retry_after_ms {
293            let duration = Duration::from_millis(retry_after as u64);
294            if duration <= RATE_LIMIT_MAX_WAIT {
295                return Some(duration);
296            } else {
297                debug!(
298                    "Waiting {} seconds would exceed {} second limit",
299                    duration.as_secs(),
300                    RATE_LIMIT_MAX_WAIT.as_secs()
301                );
302            }
303        }
304
305        None
306    }
307}