1use crate::source::SourceStream;
2use async_trait::async_trait;
3use bytes::Bytes;
4use futures::Stream;
5use parking_lot::Mutex;
6use reqwest::Client;
7use std::sync::Arc;
8use std::{
9 io,
10 pin::Pin,
11 str::FromStr,
12 task::{self, Poll},
13};
14use tracing::{info, warn};
15
/// Placeholder text written into the shared radio title when the ICY metadata
/// request fails or the response is not served as `audio/mpeg`.
///
/// NOTE(review): the identifier looks like a typo for `SONG_TITLE_ERROR`;
/// renaming it needs a coordinated change wherever it is used.
const STONG_TITLE_ERROR: &str = "Error Please Try Again";
/// Initial value of the per-chunk countdown kept by the radio metadata task;
/// once the countdown is exhausted the accumulated title text is cleared one
/// single time.
const CHUNKS_BEFORE_START: u8 = 20;
18
/// Stream of HTTP response body chunks for a single URL.
///
/// The body stream is replaced whenever `seek_range` issues a new ranged
/// request against the same URL.
pub struct HttpStream {
    /// Body chunks of the most recent response (initial request or last seek).
    stream: Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Unpin + Send + Sync>,
    /// Client used for the initial request and reused for ranged requests.
    client: Client,
    /// Value of the `Content-Length` header of the initial response, if it was sent.
    content_length: Option<u64>,
    /// URL the stream was created from; requested again on every seek.
    url: reqwest::Url,
}
25
26impl Stream for HttpStream {
27 type Item = Result<Bytes, reqwest::Error>;
28
29 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
30 Pin::new(&mut self.stream).poll_next(cx)
31 }
32}
33
34#[async_trait]
35impl SourceStream for HttpStream {
36 type Url = reqwest::Url;
37 type Error = reqwest::Error;
38
39 async fn create(
40 url: Self::Url,
41 is_radio: bool,
42 radio_title: Arc<Mutex<String>>,
43 ) -> io::Result<Self> {
44 let client = Client::new();
45 info!("Requesting content length");
46 let response = client
47 .get(url.as_str())
48 .send()
49 .await
50 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
51 let mut content_length = None;
52 if let Some(length) = response.headers().get(reqwest::header::CONTENT_LENGTH) {
53 let length = u64::from_str(
54 length
55 .to_str()
56 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?,
57 )
58 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
59 info!("Got content length {length}");
60 content_length = Some(length);
61 } else {
62 warn!("Content length header missing");
63 }
64 let stream = response.bytes_stream();
65 let mut count_down = CHUNKS_BEFORE_START;
66 let url_inside = url.clone();
67 if is_radio {
68 let mut should_restart = true;
69 let client_inside = Client::new();
70 tokio::spawn(async move {
71 loop {
72 let mut response = match client_inside
73 .get(url_inside.as_str())
74 .header("icy-metadata", "1")
75 .send()
76 .await
77 {
78 Ok(t) => t,
79 Err(_) => {
80 *radio_title.lock() = STONG_TITLE_ERROR.to_string();
81 continue;
82 }
83 };
84 if let Some(header_value) = response.headers().get("content-type") {
85 if header_value.to_str().unwrap_or_default() != "audio/mpeg" {
86 *radio_title.lock() = STONG_TITLE_ERROR.to_string();
87 continue;
88 }
89 } else {
90 *radio_title.lock() = STONG_TITLE_ERROR.to_string();
91 continue;
92 }
93 let meta_interval: usize =
94 if let Some(header_value) = response.headers().get("icy-metaint") {
95 header_value
96 .to_str()
97 .unwrap_or_default()
98 .parse()
99 .unwrap_or_default()
100 } else {
101 0
102 };
103 let mut counter = meta_interval;
104 let mut awaiting_metadata_size = false;
105 let mut metadata_size: u8 = 0;
106 let mut awaiting_metadata = false;
107 let mut metadata: Vec<u8> = Vec::new();
108 let mut title_string = String::new();
109 while let Some(chunk) = response.chunk().await.expect("Couldn't get next chunk")
110 {
111 for byte in &chunk {
112 if meta_interval != 0 {
113 if awaiting_metadata_size {
114 awaiting_metadata_size = false;
115 metadata_size = *byte * 16;
116 if metadata_size == 0 {
117 counter = meta_interval;
118 } else {
119 awaiting_metadata = true;
120 }
121 } else if awaiting_metadata {
122 metadata.push(*byte);
123 metadata_size = metadata_size.saturating_sub(1);
124 if metadata_size == 0 {
125 awaiting_metadata = false;
126 let metadata_string =
127 std::str::from_utf8(&metadata).unwrap_or("");
128 if !metadata_string.is_empty() {
129 const STREAM_TITLE_KEYWORD: &str = "StreamTitle='";
130 if let Some(index) =
131 metadata_string.find(STREAM_TITLE_KEYWORD)
132 {
133 let left_index = index + 13;
134 let stream_title_substring =
135 &metadata_string[left_index..];
136 if let Some(right_index) =
137 stream_title_substring.find('\'')
138 {
139 let trimmed_song_title =
140 &stream_title_substring[..right_index];
141 title_string += " ";
142 title_string += trimmed_song_title;
143 *radio_title.lock() =
144 format!("Current playing: {title_string}");
145 }
146 }
147 }
148 metadata.clear();
149 counter = meta_interval;
150 }
151 } else {
152 counter = counter.saturating_sub(1);
154 if counter == 0 {
155 awaiting_metadata_size = true;
156 }
157 }
158 } else {
159 }
161 }
162 if should_restart {
163 if count_down == 0 {
164 should_restart = false;
165 title_string = String::new();
166 } else {
167 count_down -= 1;
168 }
169 }
170 }
171 }
172 });
173 }
174 Ok(Self {
175 stream: Box::new(stream),
176 client,
177 content_length,
178 url,
179 })
180 }
181
182 async fn content_length(&self) -> Option<u64> {
183 self.content_length
184 }
185
186 async fn seek_range(&mut self, start: u64, end: Option<u64>) -> io::Result<()> {
187 info!("Seeking: {start}-{end:?}");
188 let response = self
189 .client
190 .get(self.url.as_str())
191 .header(
192 "Range",
193 format!(
194 "bytes={start}-{}",
195 end.map(|e| e.to_string()).unwrap_or_default()
196 ),
197 )
198 .send()
199 .await
200 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
201 if !response.status().is_success() {
202 return response
203 .error_for_status()
204 .map(|_| ())
205 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()));
206 }
207 self.stream = Box::new(response.bytes_stream());
208 info!("Done seeking");
209 Ok(())
210 }
211}