1use crate::errors::*;
2use bytes::Bytes;
3use reqwest::{
4 header::{HeaderMap, HeaderValue, RANGE},
5 Response, StatusCode,
6};
7use std::time::Duration;
8use tokio::time;
9
10pub struct Client {
11 client: reqwest::Client,
12 timeout: Option<Duration>,
13}
14
15impl Client {
16 pub fn new(timeout: Option<u64>) -> Result<Client> {
17 let client = reqwest::ClientBuilder::new()
18 .user_agent(format!("spotify-launcher/{}", env!("CARGO_PKG_VERSION")))
19 .redirect(reqwest::redirect::Policy::limited(8))
20 .build()
21 .context("Failed to create http client")?;
22
23 let timeout = match timeout {
24 Some(0) => None,
25 Some(secs) => Some(Duration::from_secs(secs)),
26 None => Some(Duration::from_secs(30)),
27 };
28
29 Ok(Client { client, timeout })
30 }
31
32 async fn send_get(&self, url: &str, offset: Option<u64>) -> Result<Response> {
33 let future = async {
34 let mut headers = HeaderMap::new();
35 if let Some(offset) = offset {
36 headers.insert(RANGE, HeaderValue::from_str(&format!("bytes={offset}-"))?);
37 }
38
39 let resp = self
40 .client
41 .get(url)
42 .headers(headers)
43 .send()
44 .await
45 .context("Failed to send http request")?;
46
47 let status = resp.status();
48 if !status.is_success() {
49 bail!("Unexpected http status code: {:?}", status);
50 }
51
52 Ok(resp)
53 };
54 if let Some(timeout) = self.timeout {
55 time::timeout(timeout, future)
56 .await
57 .context("Request timed out")?
58 } else {
59 future.await
60 }
61 }
62
63 pub async fn fetch(&self, url: &str) -> Result<Vec<u8>> {
64 debug!("Fetching {:?}...", url);
65 let resp = self.send_get(url, None).await?;
66
67 let body = resp.bytes();
68 let body = if let Some(timeout) = self.timeout {
69 time::timeout(timeout, body)
70 .await
71 .context("Reading http response timed out")?
72 } else {
73 body.await
74 }
75 .context("Failed to read http response")?;
76
77 debug!("Fetched {} bytes", body.len());
78 Ok(body.to_vec())
79 }
80
81 pub async fn fetch_stream(&self, url: &str, offset: Option<u64>) -> Result<Download> {
82 debug!("Downloading {:?}...", url);
83 let resp = self.send_get(url, offset).await?;
84
85 if offset.is_some() && resp.status() != StatusCode::PARTIAL_CONTENT {
86 bail!("Download server does not support resumption");
87 }
88
89 let progress = offset.unwrap_or(0);
90 let total = resp.content_length().unwrap_or(0) + progress;
91
92 Ok(Download {
93 resp,
94 timeout: self.timeout,
95 progress,
96 total,
97 })
98 }
99}
100
101pub struct Download {
102 resp: reqwest::Response,
103 timeout: Option<Duration>,
104 pub progress: u64,
105 pub total: u64,
106}
107
108impl Download {
109 pub async fn chunk(&mut self) -> Result<Option<Bytes>> {
110 let future = self.resp.chunk();
111 let bytes = if let Some(timeout) = self.timeout {
112 if let Ok(bytes) = time::timeout(timeout, future).await {
113 bytes?
114 } else {
115 bail!("Download timed out due to inactivity");
116 }
117 } else {
118 future.await?
119 };
120 if let Some(bytes) = bytes {
121 self.progress += bytes.len() as u64;
122 Ok(Some(bytes))
123 } else {
124 Ok(None)
125 }
126 }
127}