Skip to main content

spotify_launcher/
http.rs

1use crate::errors::*;
2use bytes::Bytes;
3use reqwest::{
4    header::{HeaderMap, HeaderValue, RANGE},
5    Response, StatusCode,
6};
7use std::time::Duration;
8use tokio::time;
9
10pub struct Client {
11    client: reqwest::Client,
12    timeout: Option<Duration>,
13}
14
15impl Client {
16    pub fn new(timeout: Option<u64>) -> Result<Client> {
17        let client = reqwest::ClientBuilder::new()
18            .user_agent(format!("spotify-launcher/{}", env!("CARGO_PKG_VERSION")))
19            .redirect(reqwest::redirect::Policy::limited(8))
20            .build()
21            .context("Failed to create http client")?;
22
23        let timeout = match timeout {
24            Some(0) => None,
25            Some(secs) => Some(Duration::from_secs(secs)),
26            None => Some(Duration::from_secs(30)),
27        };
28
29        Ok(Client { client, timeout })
30    }
31
32    async fn send_get(&self, url: &str, offset: Option<u64>) -> Result<Response> {
33        let future = async {
34            let mut headers = HeaderMap::new();
35            if let Some(offset) = offset {
36                headers.insert(RANGE, HeaderValue::from_str(&format!("bytes={offset}-"))?);
37            }
38
39            let resp = self
40                .client
41                .get(url)
42                .headers(headers)
43                .send()
44                .await
45                .context("Failed to send http request")?;
46
47            let status = resp.status();
48            if !status.is_success() {
49                bail!("Unexpected http status code: {:?}", status);
50            }
51
52            Ok(resp)
53        };
54        if let Some(timeout) = self.timeout {
55            time::timeout(timeout, future)
56                .await
57                .context("Request timed out")?
58        } else {
59            future.await
60        }
61    }
62
63    pub async fn fetch(&self, url: &str) -> Result<Vec<u8>> {
64        debug!("Fetching {:?}...", url);
65        let resp = self.send_get(url, None).await?;
66
67        let body = resp.bytes();
68        let body = if let Some(timeout) = self.timeout {
69            time::timeout(timeout, body)
70                .await
71                .context("Reading http response timed out")?
72        } else {
73            body.await
74        }
75        .context("Failed to read http response")?;
76
77        debug!("Fetched {} bytes", body.len());
78        Ok(body.to_vec())
79    }
80
81    pub async fn fetch_stream(&self, url: &str, offset: Option<u64>) -> Result<Download> {
82        debug!("Downloading {:?}...", url);
83        let resp = self.send_get(url, offset).await?;
84
85        if offset.is_some() && resp.status() != StatusCode::PARTIAL_CONTENT {
86            bail!("Download server does not support resumption");
87        }
88
89        let progress = offset.unwrap_or(0);
90        let total = resp.content_length().unwrap_or(0) + progress;
91
92        Ok(Download {
93            resp,
94            timeout: self.timeout,
95            progress,
96            total,
97        })
98    }
99}
100
101pub struct Download {
102    resp: reqwest::Response,
103    timeout: Option<Duration>,
104    pub progress: u64,
105    pub total: u64,
106}
107
108impl Download {
109    pub async fn chunk(&mut self) -> Result<Option<Bytes>> {
110        let future = self.resp.chunk();
111        let bytes = if let Some(timeout) = self.timeout {
112            if let Ok(bytes) = time::timeout(timeout, future).await {
113                bytes?
114            } else {
115                bail!("Download timed out due to inactivity");
116            }
117        } else {
118            future.await?
119        };
120        if let Some(bytes) = bytes {
121            self.progress += bytes.len() as u64;
122            Ok(Some(bytes))
123        } else {
124            Ok(None)
125        }
126    }
127}