// openlibspot_core/http_client.rs

1use std::{
2    collections::HashMap,
3    env::consts::OS,
4    time::{Duration, Instant},
5};
6
7use bytes::Bytes;
8use futures_util::{future::IntoStream, FutureExt};
9use governor::{
10    clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState, Quota, RateLimiter,
11};
12use http::{header::HeaderValue, Uri};
13use hyper::{
14    client::{HttpConnector, ResponseFuture},
15    header::USER_AGENT,
16    Body, Client, HeaderMap, Request, Response, StatusCode,
17};
18use hyper_proxy::{Intercept, Proxy, ProxyConnector};
19use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
20use nonzero_ext::nonzero;
21use once_cell::sync::OnceCell;
22use parking_lot::Mutex;
23use sysinfo::{System, SystemExt};
24use thiserror::Error;
25use url::Url;
26
27use crate::{
28    date::Date,
29    version::{spotify_version, FALLBACK_USER_AGENT, VERSION_STRING},
30    Error,
31};
32
/// Length of the client-side rate-limiting window. This 30 second interval is documented
/// by Spotify.
pub const RATE_LIMIT_INTERVAL: Duration = Duration::from_secs(30);

/// Number of calls allowed per [`RATE_LIMIT_INTERVAL`]. This figure is a guesstimate: it is
/// probably subject to licensing (purchasing extra calls) and may change at any time.
pub const RATE_LIMIT_CALLS_PER_INTERVAL: u32 = 300;

/// Longest server-requested back-off that will be slept through before retrying a request;
/// longer back-offs are not waited for.
pub const RATE_LIMIT_MAX_WAIT: Duration = Duration::from_secs(10);
39
/// Errors produced by [`HttpClient`] when a response arrives but is rejected.
#[derive(Debug, Error)]
pub enum HttpClientError {
    /// The final response carried this status code instead of `200 OK`
    /// (see `HttpClient::request`).
    #[error("Response status code: {0}")]
    StatusCode(hyper::StatusCode),
}
45
46impl From<HttpClientError> for Error {
47    fn from(err: HttpClientError) -> Self {
48        match err {
49            HttpClientError::StatusCode(code) => {
50                // not exhaustive, but what reasonably could be expected
51                match code {
52                    StatusCode::GATEWAY_TIMEOUT | StatusCode::REQUEST_TIMEOUT => {
53                        Error::deadline_exceeded(err)
54                    }
55                    StatusCode::GONE
56                    | StatusCode::NOT_FOUND
57                    | StatusCode::MOVED_PERMANENTLY
58                    | StatusCode::PERMANENT_REDIRECT
59                    | StatusCode::TEMPORARY_REDIRECT => Error::not_found(err),
60                    StatusCode::FORBIDDEN | StatusCode::PAYMENT_REQUIRED => {
61                        Error::permission_denied(err)
62                    }
63                    StatusCode::NETWORK_AUTHENTICATION_REQUIRED
64                    | StatusCode::PROXY_AUTHENTICATION_REQUIRED
65                    | StatusCode::UNAUTHORIZED => Error::unauthenticated(err),
66                    StatusCode::EXPECTATION_FAILED
67                    | StatusCode::PRECONDITION_FAILED
68                    | StatusCode::PRECONDITION_REQUIRED => Error::failed_precondition(err),
69                    StatusCode::RANGE_NOT_SATISFIABLE => Error::out_of_range(err),
70                    StatusCode::INTERNAL_SERVER_ERROR
71                    | StatusCode::MISDIRECTED_REQUEST
72                    | StatusCode::SERVICE_UNAVAILABLE
73                    | StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS => Error::unavailable(err),
74                    StatusCode::BAD_REQUEST
75                    | StatusCode::HTTP_VERSION_NOT_SUPPORTED
76                    | StatusCode::LENGTH_REQUIRED
77                    | StatusCode::METHOD_NOT_ALLOWED
78                    | StatusCode::NOT_ACCEPTABLE
79                    | StatusCode::PAYLOAD_TOO_LARGE
80                    | StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE
81                    | StatusCode::UNSUPPORTED_MEDIA_TYPE
82                    | StatusCode::URI_TOO_LONG => Error::invalid_argument(err),
83                    StatusCode::TOO_MANY_REQUESTS => Error::resource_exhausted(err),
84                    StatusCode::NOT_IMPLEMENTED => Error::unimplemented(err),
85                    _ => Error::unknown(err),
86                }
87            }
88        }
89    }
90}
91
92type HyperClient = Client<ProxyConnector<HttpsConnector<HttpConnector>>, Body>;
93
94pub struct HttpClient {
95    user_agent: HeaderValue,
96    proxy_url: Option<Url>,
97    hyper_client: OnceCell<HyperClient>,
98
99    // while the DashMap variant is more performant, our level of concurrency
100    // is pretty low so we can save pulling in that extra dependency
101    rate_limiter:
102        RateLimiter<String, Mutex<HashMap<String, InMemoryState>>, MonotonicClock, NoOpMiddleware>,
103}
104
105impl HttpClient {
106    pub fn new(proxy_url: Option<&Url>) -> Self {
107        let zero_str = String::from("0");
108        let os_version = System::new()
109            .os_version()
110            .unwrap_or_else(|| zero_str.clone());
111
112        let (spotify_platform, os_version) = match OS {
113            "android" => ("Android", os_version),
114            "ios" => ("iOS", os_version),
115            "macos" => ("OSX", zero_str),
116            "windows" => ("Win32", zero_str),
117            _ => ("Linux", zero_str),
118        };
119
120        let user_agent_str = &format!(
121            "Spotify/{} {}/{} ({})",
122            spotify_version(),
123            spotify_platform,
124            os_version,
125            VERSION_STRING
126        );
127
128        let user_agent = HeaderValue::from_str(user_agent_str).unwrap_or_else(|err| {
129            error!("Invalid user agent <{}>: {}", user_agent_str, err);
130            HeaderValue::from_static(FALLBACK_USER_AGENT)
131        });
132
133        let replenish_interval_ns =
134            RATE_LIMIT_INTERVAL.as_nanos() / RATE_LIMIT_CALLS_PER_INTERVAL as u128;
135        let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
136            .expect("replenish interval should be valid")
137            .allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
138        let rate_limiter = RateLimiter::keyed(quota);
139
140        Self {
141            user_agent,
142            proxy_url: proxy_url.cloned(),
143            hyper_client: OnceCell::new(),
144            rate_limiter,
145        }
146    }
147
148    fn try_create_hyper_client(proxy_url: Option<&Url>) -> Result<HyperClient, Error> {
149        // configuring TLS is expensive and should be done once per process
150        let https_connector = HttpsConnectorBuilder::new()
151            .with_native_roots()
152            .https_or_http()
153            .enable_http1()
154            .enable_http2()
155            .build();
156
157        // When not using a proxy a dummy proxy is configured that will not intercept any traffic.
158        // This prevents needing to carry the Client Connector generics through the whole project
159        let proxy = match &proxy_url {
160            Some(proxy_url) => Proxy::new(Intercept::All, proxy_url.to_string().parse()?),
161            None => Proxy::new(Intercept::None, Uri::from_static("0.0.0.0")),
162        };
163        let proxy_connector = ProxyConnector::from_proxy(https_connector, proxy)?;
164
165        let client = Client::builder()
166            .http2_adaptive_window(true)
167            .build(proxy_connector);
168        Ok(client)
169    }
170
171    fn hyper_client(&self) -> Result<&HyperClient, Error> {
172        self.hyper_client
173            .get_or_try_init(|| Self::try_create_hyper_client(self.proxy_url.as_ref()))
174    }
175
176    pub async fn request(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
177        debug!("Requesting {}", req.uri().to_string());
178
179        // `Request` does not implement `Clone` because its `Body` may be a single-shot stream.
180        // As correct as that may be technically, we now need all this boilerplate to clone it
181        // ourselves, as any `Request` is moved in the loop.
182        let (parts, body) = req.into_parts();
183        let body_as_bytes = hyper::body::to_bytes(body)
184            .await
185            .unwrap_or_else(|_| Bytes::new());
186
187        loop {
188            let mut req = Request::builder()
189                .method(parts.method.clone())
190                .uri(parts.uri.clone())
191                .version(parts.version)
192                .body(Body::from(body_as_bytes.clone()))?;
193            *req.headers_mut() = parts.headers.clone();
194
195            let request = self.request_fut(req)?;
196            let response = request.await;
197
198            if let Ok(response) = &response {
199                let code = response.status();
200
201                if code == StatusCode::TOO_MANY_REQUESTS {
202                    if let Some(duration) = Self::get_retry_after(response.headers()) {
203                        warn!(
204                            "Rate limited by service, retrying in {} seconds...",
205                            duration.as_secs()
206                        );
207                        tokio::time::sleep(duration).await;
208                        continue;
209                    }
210                }
211
212                if code != StatusCode::OK {
213                    return Err(HttpClientError::StatusCode(code).into());
214                }
215            }
216
217            return Ok(response?);
218        }
219    }
220
221    pub async fn request_body(&self, req: Request<Body>) -> Result<Bytes, Error> {
222        let response = self.request(req).await?;
223        Ok(hyper::body::to_bytes(response.into_body()).await?)
224    }
225
226    pub fn request_stream(&self, req: Request<Body>) -> Result<IntoStream<ResponseFuture>, Error> {
227        Ok(self.request_fut(req)?.into_stream())
228    }
229
230    pub fn request_fut(&self, mut req: Request<Body>) -> Result<ResponseFuture, Error> {
231        let headers_mut = req.headers_mut();
232        headers_mut.insert(USER_AGENT, self.user_agent.clone());
233
234        // For rate limiting we cannot *just* depend on Spotify sending us HTTP/429
235        // Retry-After headers. For example, when there is a service interruption
236        // and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure.
237        let domain = match req.uri().host() {
238            Some(host) => {
239                // strip the prefix from *.domain.tld (assume rate limit is per domain, not subdomain)
240                let mut parts = host
241                    .split('.')
242                    .map(|s| s.to_string())
243                    .collect::<Vec<String>>();
244                let n = parts.len().saturating_sub(2);
245                parts.drain(n..).collect()
246            }
247            None => String::from(""),
248        };
249        self.rate_limiter.check_key(&domain).map_err(|e| {
250            Error::resource_exhausted(format!(
251                "rate limited for at least another {} seconds",
252                e.wait_time_from(Instant::now()).as_secs()
253            ))
254        })?;
255
256        Ok(self.hyper_client()?.request(req))
257    }
258
259    pub fn get_retry_after(headers: &HeaderMap<HeaderValue>) -> Option<Duration> {
260        let now = Date::now_utc().as_timestamp_ms();
261
262        let mut retry_after_ms = None;
263        if let Some(header_val) = headers.get("X-RateLimit-Next") {
264            // *.akamaized.net (Akamai)
265            if let Ok(date_str) = header_val.to_str() {
266                if let Ok(target) = Date::from_iso8601(date_str) {
267                    retry_after_ms = Some(target.as_timestamp_ms().saturating_sub(now))
268                }
269            }
270        } else if let Some(header_val) = headers.get("Fastly-RateLimit-Reset") {
271            // *.scdn.co (Fastly)
272            if let Ok(timestamp) = header_val.to_str() {
273                if let Ok(target) = timestamp.parse::<i64>() {
274                    retry_after_ms = Some(target.saturating_sub(now))
275                }
276            }
277        } else if let Some(header_val) = headers.get("Retry-After") {
278            // Generic RFC compliant (including *.spotify.com)
279            if let Ok(retry_after) = header_val.to_str() {
280                if let Ok(duration) = retry_after.parse::<i64>() {
281                    retry_after_ms = Some(duration * 1000)
282                }
283            }
284        }
285
286        if let Some(retry_after) = retry_after_ms {
287            let duration = Duration::from_millis(retry_after as u64);
288            if duration <= RATE_LIMIT_MAX_WAIT {
289                return Some(duration);
290            } else {
291                debug!(
292                    "Waiting {} seconds would exceed {} second limit",
293                    duration.as_secs(),
294                    RATE_LIMIT_MAX_WAIT.as_secs()
295                );
296            }
297        }
298
299        None
300    }
301}