librespot_core/
http_client.rs

1use std::{
2    collections::HashMap,
3    time::{Duration, Instant},
4};
5
6use bytes::Bytes;
7use futures_util::{future::IntoStream, FutureExt};
8use governor::{
9    clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState, Quota, RateLimiter,
10};
11use http::{header::HeaderValue, Uri};
12use http_body_util::{BodyExt, Full};
13use hyper::{body::Incoming, header::USER_AGENT, HeaderMap, Request, Response, StatusCode};
14use hyper_proxy2::{Intercept, Proxy, ProxyConnector};
15use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
16use hyper_util::{
17    client::legacy::{connect::HttpConnector, Client, ResponseFuture},
18    rt::TokioExecutor,
19};
20use nonzero_ext::nonzero;
21use once_cell::sync::OnceCell;
22use parking_lot::Mutex;
23use thiserror::Error;
24use url::Url;
25
26use crate::{
27    config::{os_version, OS},
28    date::Date,
29    version::{spotify_version, FALLBACK_USER_AGENT, VERSION_STRING},
30    Error,
31};
32
/// Length of the window over which outgoing calls are counted.
///
/// The 30 second interval is documented by Spotify.
pub const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(30);

/// Longest server-requested delay that is still waited out before retrying;
/// anything longer is not retried.
pub const RATE_LIMIT_MAX_WAIT: Duration = Duration::from_secs(10);

/// Number of calls allowed per [`RATE_LIMIT_INTERVAL`].
///
/// This figure is a guesstimate: it is probably subject to licensing
/// (purchasing extra calls) and may change at any time.
pub const RATE_LIMIT_CALLS_PER_INTERVAL: u32 = 300;
39
40#[derive(Debug, Error)]
41pub enum HttpClientError {
42    #[error("Response status code: {0}")]
43    StatusCode(hyper::StatusCode),
44}
45
46impl From<HttpClientError> for Error {
47    fn from(err: HttpClientError) -> Self {
48        match err {
49            HttpClientError::StatusCode(code) => {
50                // not exhaustive, but what reasonably could be expected
51                match code {
52                    StatusCode::GATEWAY_TIMEOUT | StatusCode::REQUEST_TIMEOUT => {
53                        Error::deadline_exceeded(err)
54                    }
55                    StatusCode::GONE
56                    | StatusCode::NOT_FOUND
57                    | StatusCode::MOVED_PERMANENTLY
58                    | StatusCode::PERMANENT_REDIRECT
59                    | StatusCode::TEMPORARY_REDIRECT => Error::not_found(err),
60                    StatusCode::FORBIDDEN | StatusCode::PAYMENT_REQUIRED => {
61                        Error::permission_denied(err)
62                    }
63                    StatusCode::NETWORK_AUTHENTICATION_REQUIRED
64                    | StatusCode::PROXY_AUTHENTICATION_REQUIRED
65                    | StatusCode::UNAUTHORIZED => Error::unauthenticated(err),
66                    StatusCode::EXPECTATION_FAILED
67                    | StatusCode::PRECONDITION_FAILED
68                    | StatusCode::PRECONDITION_REQUIRED => Error::failed_precondition(err),
69                    StatusCode::RANGE_NOT_SATISFIABLE => Error::out_of_range(err),
70                    StatusCode::INTERNAL_SERVER_ERROR
71                    | StatusCode::MISDIRECTED_REQUEST
72                    | StatusCode::SERVICE_UNAVAILABLE
73                    | StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS => Error::unavailable(err),
74                    StatusCode::BAD_REQUEST
75                    | StatusCode::HTTP_VERSION_NOT_SUPPORTED
76                    | StatusCode::LENGTH_REQUIRED
77                    | StatusCode::METHOD_NOT_ALLOWED
78                    | StatusCode::NOT_ACCEPTABLE
79                    | StatusCode::PAYLOAD_TOO_LARGE
80                    | StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE
81                    | StatusCode::UNSUPPORTED_MEDIA_TYPE
82                    | StatusCode::URI_TOO_LONG => Error::invalid_argument(err),
83                    StatusCode::TOO_MANY_REQUESTS => Error::resource_exhausted(err),
84                    StatusCode::NOT_IMPLEMENTED => Error::unimplemented(err),
85                    _ => Error::unknown(err),
86                }
87            }
88        }
89    }
90}
91
92type HyperClient = Client<ProxyConnector<HttpsConnector<HttpConnector>>, Full<bytes::Bytes>>;
93
94pub struct HttpClient {
95    user_agent: HeaderValue,
96    proxy_url: Option<Url>,
97    hyper_client: OnceCell<HyperClient>,
98
99    // while the DashMap variant is more performant, our level of concurrency
100    // is pretty low so we can save pulling in that extra dependency
101    rate_limiter:
102        RateLimiter<String, Mutex<HashMap<String, InMemoryState>>, MonotonicClock, NoOpMiddleware>,
103}
104
105impl HttpClient {
106    pub fn new(proxy_url: Option<&Url>) -> Self {
107        let zero_str = String::from("0");
108        let os_version = os_version();
109
110        let (spotify_platform, os_version) = match OS {
111            "android" => ("Android", os_version),
112            "ios" => ("iOS", os_version),
113            "macos" => ("OSX", zero_str),
114            "windows" => ("Win32", zero_str),
115            _ => ("Linux", zero_str),
116        };
117
118        let user_agent_str = &format!(
119            "Spotify/{} {}/{} ({})",
120            spotify_version(),
121            spotify_platform,
122            os_version,
123            VERSION_STRING
124        );
125
126        let user_agent = HeaderValue::from_str(user_agent_str).unwrap_or_else(|err| {
127            error!("Invalid user agent <{}>: {}", user_agent_str, err);
128            HeaderValue::from_static(FALLBACK_USER_AGENT)
129        });
130
131        let replenish_interval_ns =
132            RATE_LIMIT_INTERVAL.as_nanos() / RATE_LIMIT_CALLS_PER_INTERVAL as u128;
133        let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
134            .expect("replenish interval should be valid")
135            .allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
136        let rate_limiter = RateLimiter::keyed(quota);
137
138        Self {
139            user_agent,
140            proxy_url: proxy_url.cloned(),
141            hyper_client: OnceCell::new(),
142            rate_limiter,
143        }
144    }
145
146    fn try_create_hyper_client(proxy_url: Option<&Url>) -> Result<HyperClient, Error> {
147        // configuring TLS is expensive and should be done once per process
148        let https_connector = HttpsConnectorBuilder::new()
149            .with_native_roots()?
150            .https_or_http()
151            .enable_http1()
152            .enable_http2()
153            .build();
154
155        // When not using a proxy a dummy proxy is configured that will not intercept any traffic.
156        // This prevents needing to carry the Client Connector generics through the whole project
157        let proxy = match &proxy_url {
158            Some(proxy_url) => Proxy::new(Intercept::All, proxy_url.to_string().parse()?),
159            None => Proxy::new(Intercept::None, Uri::from_static("0.0.0.0")),
160        };
161        let proxy_connector = ProxyConnector::from_proxy(https_connector, proxy)?;
162
163        let client = Client::builder(TokioExecutor::new())
164            .http2_adaptive_window(true)
165            .build(proxy_connector);
166        Ok(client)
167    }
168
169    fn hyper_client(&self) -> Result<&HyperClient, Error> {
170        self.hyper_client
171            .get_or_try_init(|| Self::try_create_hyper_client(self.proxy_url.as_ref()))
172    }
173
174    pub async fn request(&self, req: Request<Bytes>) -> Result<Response<Incoming>, Error> {
175        debug!("Requesting {}", req.uri().to_string());
176
177        // `Request` does not implement `Clone` because its `Body` may be a single-shot stream.
178        // As correct as that may be technically, we now need all this boilerplate to clone it
179        // ourselves, as any `Request` is moved in the loop.
180        let (parts, body_as_bytes) = req.into_parts();
181
182        loop {
183            let mut req = Request::builder()
184                .method(parts.method.clone())
185                .uri(parts.uri.clone())
186                .version(parts.version)
187                .body(body_as_bytes.clone())?;
188            *req.headers_mut() = parts.headers.clone();
189
190            let request = self.request_fut(req)?;
191            let response = request.await;
192
193            if let Ok(response) = &response {
194                let code = response.status();
195
196                if code == StatusCode::TOO_MANY_REQUESTS {
197                    if let Some(duration) = Self::get_retry_after(response.headers()) {
198                        warn!(
199                            "Rate limited by service, retrying in {} seconds...",
200                            duration.as_secs()
201                        );
202                        tokio::time::sleep(duration).await;
203                        continue;
204                    }
205                }
206
207                if code != StatusCode::OK {
208                    return Err(HttpClientError::StatusCode(code).into());
209                }
210            }
211
212            let response = response?;
213            return Ok(response);
214        }
215    }
216
217    pub async fn request_body(&self, req: Request<Bytes>) -> Result<Bytes, Error> {
218        let response = self.request(req).await?;
219        Ok(response.into_body().collect().await?.to_bytes())
220    }
221
222    pub fn request_stream(&self, req: Request<Bytes>) -> Result<IntoStream<ResponseFuture>, Error> {
223        Ok(self.request_fut(req)?.into_stream())
224    }
225
226    pub fn request_fut(&self, mut req: Request<Bytes>) -> Result<ResponseFuture, Error> {
227        let headers_mut = req.headers_mut();
228        headers_mut.insert(USER_AGENT, self.user_agent.clone());
229
230        // For rate limiting we cannot *just* depend on Spotify sending us HTTP/429
231        // Retry-After headers. For example, when there is a service interruption
232        // and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure.
233        let domain = match req.uri().host() {
234            Some(host) => {
235                // strip the prefix from *.domain.tld (assume rate limit is per domain, not subdomain)
236                let mut parts = host
237                    .split('.')
238                    .map(|s| s.to_string())
239                    .collect::<Vec<String>>();
240                let n = parts.len().saturating_sub(2);
241                parts.drain(n..).collect()
242            }
243            None => String::from(""),
244        };
245        self.rate_limiter.check_key(&domain).map_err(|e| {
246            Error::resource_exhausted(format!(
247                "rate limited for at least another {} seconds",
248                e.wait_time_from(Instant::now()).as_secs()
249            ))
250        })?;
251
252        Ok(self.hyper_client()?.request(req.map(Full::new)))
253    }
254
255    pub fn get_retry_after(headers: &HeaderMap<HeaderValue>) -> Option<Duration> {
256        let now = Date::now_utc().as_timestamp_ms();
257
258        let mut retry_after_ms = None;
259        if let Some(header_val) = headers.get("X-RateLimit-Next") {
260            // *.akamaized.net (Akamai)
261            if let Ok(date_str) = header_val.to_str() {
262                if let Ok(target) = Date::from_iso8601(date_str) {
263                    retry_after_ms = Some(target.as_timestamp_ms().saturating_sub(now))
264                }
265            }
266        } else if let Some(header_val) = headers.get("Fastly-RateLimit-Reset") {
267            // *.scdn.co (Fastly)
268            if let Ok(timestamp) = header_val.to_str() {
269                if let Ok(target) = timestamp.parse::<i64>() {
270                    retry_after_ms = Some(target.saturating_sub(now))
271                }
272            }
273        } else if let Some(header_val) = headers.get("Retry-After") {
274            // Generic RFC compliant (including *.spotify.com)
275            if let Ok(retry_after) = header_val.to_str() {
276                if let Ok(duration) = retry_after.parse::<i64>() {
277                    retry_after_ms = Some(duration * 1000)
278                }
279            }
280        }
281
282        if let Some(retry_after) = retry_after_ms {
283            let duration = Duration::from_millis(retry_after as u64);
284            if duration <= RATE_LIMIT_MAX_WAIT {
285                return Some(duration);
286            } else {
287                debug!(
288                    "Waiting {} seconds would exceed {} second limit",
289                    duration.as_secs(),
290                    RATE_LIMIT_MAX_WAIT.as_secs()
291                );
292            }
293        }
294
295        None
296    }
297}