termusic_stream/
http.rs

1use crate::source::SourceStream;
2use async_trait::async_trait;
3use bytes::Bytes;
4use futures::Stream;
5use parking_lot::Mutex;
6use reqwest::Client;
7use std::sync::Arc;
8use std::{
9    io,
10    pin::Pin,
11    str::FromStr,
12    task::{self, Poll},
13};
14use tracing::{info, warn};
15
/// Text written into the shared radio title when the metadata request fails, or when the
/// response has no `content-type` header or one other than `audio/mpeg`.
// NOTE(review): "STONG" is presumably a typo for "SONG"; the identifier is left as-is because
// other code in this file refers to it.
16const STONG_TITLE_ERROR: &str = "Error Please Try Again";
/// Initial value of the chunk countdown used by the radio metadata task in
/// `HttpStream::create`: the countdown is decremented once per received chunk and, when a chunk
/// arrives with the countdown at zero, the accumulated title text is cleared a single time.
17const CHUNKS_BEFORE_START: u8 = 20;
18
/// Byte stream over the body of an HTTP response obtained with `reqwest`.
///
/// Created by `SourceStream::create` and re-pointed at a new range request by `seek_range`.
19pub struct HttpStream {
    /// Body stream of the most recent response; polled by the `Stream` impl and replaced on seek.
20    stream: Box<dyn Stream<Item = Result<Bytes, reqwest::Error>> + Unpin + Send + Sync>,
    /// Client that issued the initial request; reused for the range requests made when seeking.
21    client: Client,
    /// Parsed `Content-Length` header of the initial response, or `None` if the header was absent.
22    content_length: Option<u64>,
    /// URL of the resource; requested again with a `Range` header on every seek.
23    url: reqwest::Url,
24}
25
26impl Stream for HttpStream {
27    type Item = Result<Bytes, reqwest::Error>;
28
29    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
30        Pin::new(&mut self.stream).poll_next(cx)
31    }
32}
33
34#[async_trait]
35impl SourceStream for HttpStream {
36    type Url = reqwest::Url;
37    type Error = reqwest::Error;
38
39    async fn create(
40        url: Self::Url,
41        is_radio: bool,
42        radio_title: Arc<Mutex<String>>,
43    ) -> io::Result<Self> {
44        let client = Client::new();
45        info!("Requesting content length");
46        let response = client
47            .get(url.as_str())
48            .send()
49            .await
50            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
51        let mut content_length = None;
52        if let Some(length) = response.headers().get(reqwest::header::CONTENT_LENGTH) {
53            let length = u64::from_str(
54                length
55                    .to_str()
56                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?,
57            )
58            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
59            info!("Got content length {length}");
60            content_length = Some(length);
61        } else {
62            warn!("Content length header missing");
63        }
64        let stream = response.bytes_stream();
65        let mut count_down = CHUNKS_BEFORE_START;
66        let url_inside = url.clone();
67        if is_radio {
68            let mut should_restart = true;
69            let client_inside = Client::new();
70            tokio::spawn(async move {
71                loop {
72                    let mut response = match client_inside
73                        .get(url_inside.as_str())
74                        .header("icy-metadata", "1")
75                        .send()
76                        .await
77                    {
78                        Ok(t) => t,
79                        Err(_) => {
80                            *radio_title.lock() = STONG_TITLE_ERROR.to_string();
81                            continue;
82                        }
83                    };
84                    if let Some(header_value) = response.headers().get("content-type") {
85                        if header_value.to_str().unwrap_or_default() != "audio/mpeg" {
86                            *radio_title.lock() = STONG_TITLE_ERROR.to_string();
87                            continue;
88                        }
89                    } else {
90                        *radio_title.lock() = STONG_TITLE_ERROR.to_string();
91                        continue;
92                    }
93                    let meta_interval: usize =
94                        if let Some(header_value) = response.headers().get("icy-metaint") {
95                            header_value
96                                .to_str()
97                                .unwrap_or_default()
98                                .parse()
99                                .unwrap_or_default()
100                        } else {
101                            0
102                        };
103                    let mut counter = meta_interval;
104                    let mut awaiting_metadata_size = false;
105                    let mut metadata_size: u8 = 0;
106                    let mut awaiting_metadata = false;
107                    let mut metadata: Vec<u8> = Vec::new();
108                    let mut title_string = String::new();
109                    while let Some(chunk) = response.chunk().await.expect("Couldn't get next chunk")
110                    {
111                        for byte in &chunk {
112                            if meta_interval != 0 {
113                                if awaiting_metadata_size {
114                                    awaiting_metadata_size = false;
115                                    metadata_size = *byte * 16;
116                                    if metadata_size == 0 {
117                                        counter = meta_interval;
118                                    } else {
119                                        awaiting_metadata = true;
120                                    }
121                                } else if awaiting_metadata {
122                                    metadata.push(*byte);
123                                    metadata_size = metadata_size.saturating_sub(1);
124                                    if metadata_size == 0 {
125                                        awaiting_metadata = false;
126                                        let metadata_string =
127                                            std::str::from_utf8(&metadata).unwrap_or("");
128                                        if !metadata_string.is_empty() {
129                                            const STREAM_TITLE_KEYWORD: &str = "StreamTitle='";
130                                            if let Some(index) =
131                                                metadata_string.find(STREAM_TITLE_KEYWORD)
132                                            {
133                                                let left_index = index + 13;
134                                                let stream_title_substring =
135                                                    &metadata_string[left_index..];
136                                                if let Some(right_index) =
137                                                    stream_title_substring.find('\'')
138                                                {
139                                                    let trimmed_song_title =
140                                                        &stream_title_substring[..right_index];
141                                                    title_string += " ";
142                                                    title_string += trimmed_song_title;
143                                                    *radio_title.lock() =
144                                                        format!("Current playing: {title_string}");
145                                                }
146                                            }
147                                        }
148                                        metadata.clear();
149                                        counter = meta_interval;
150                                    }
151                                } else {
152                                    // file.write_all(&[*byte]).expect("Couldn't write to file");
153                                    counter = counter.saturating_sub(1);
154                                    if counter == 0 {
155                                        awaiting_metadata_size = true;
156                                    }
157                                }
158                            } else {
159                                // file.write_all(&[*byte]).expect("Couldn't write to file");
160                            }
161                        }
162                        if should_restart {
163                            if count_down == 0 {
164                                should_restart = false;
165                                title_string = String::new();
166                            } else {
167                                count_down -= 1;
168                            }
169                        }
170                    }
171                }
172            });
173        }
174        Ok(Self {
175            stream: Box::new(stream),
176            client,
177            content_length,
178            url,
179        })
180    }
181
182    async fn content_length(&self) -> Option<u64> {
183        self.content_length
184    }
185
186    async fn seek_range(&mut self, start: u64, end: Option<u64>) -> io::Result<()> {
187        info!("Seeking: {start}-{end:?}");
188        let response = self
189            .client
190            .get(self.url.as_str())
191            .header(
192                "Range",
193                format!(
194                    "bytes={start}-{}",
195                    end.map(|e| e.to_string()).unwrap_or_default()
196                ),
197            )
198            .send()
199            .await
200            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
201        if !response.status().is_success() {
202            return response
203                .error_for_status()
204                .map(|_| ())
205                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()));
206        }
207        self.stream = Box::new(response.bytes_stream());
208        info!("Done seeking");
209        Ok(())
210    }
211}