termusic_stream/
lib.rs

1use parking_lot::Mutex;
2use source::{Source, SourceHandle, SourceStream};
3use std::sync::Arc;
4use std::{
5    io::{self, BufReader, Read, Seek, SeekFrom},
6    thread,
7};
8use symphonia::core::io::MediaSource;
9use tap::{Tap, TapFallible};
10use tempfile::NamedTempFile;
11use tracing::{debug, error};
12
13pub mod http;
14pub mod source;
15
16#[derive(Debug)]
17pub struct StreamDownload {
18    output_reader: BufReader<NamedTempFile>,
19    handle: SourceHandle,
20    pub radio_title: Arc<Mutex<String>>,
21}
22
23impl StreamDownload {
24    pub fn new_http(
25        url: reqwest::Url,
26        is_radio: bool,
27        radio_title: Arc<Mutex<String>>,
28        radio_downloaded: Arc<Mutex<u64>>,
29    ) -> io::Result<Self> {
30        Self::new::<http::HttpStream>(url, is_radio, radio_title, radio_downloaded)
31    }
32
33    pub fn new<S: SourceStream>(
34        url: S::Url,
35        is_radio: bool,
36        radio_title: Arc<Mutex<String>>,
37        radio_downloaded: Arc<Mutex<u64>>,
38    ) -> io::Result<Self> {
39        let tempfile = tempfile::Builder::new().tempfile()?;
40        let source = Source::new(tempfile.reopen()?);
41        let handle = source.source_handle();
42        let radio_title_inside = radio_title.clone();
43        let radio_downloaded_inside1 = radio_downloaded.clone();
44        if let Ok(handle) = tokio::runtime::Handle::try_current() {
45            handle.spawn(async move {
46                let stream = S::create(url, is_radio, radio_title_inside)
47                    .await
48                    .tap_err(|e| error!("Error creating stream: {e}"))?;
49                source.download(stream, radio_downloaded_inside1).await?;
50                Ok::<_, io::Error>(())
51            });
52        } else {
53            thread::spawn(move || {
54                let rt = tokio::runtime::Builder::new_current_thread()
55                    .enable_all()
56                    .build()
57                    .tap_err(|e| error!("Error creating tokio runtime: {e}"))?;
58                rt.block_on(async move {
59                    let stream = S::create(url, is_radio, radio_title_inside)
60                        .await
61                        .tap_err(|e| error!("Error creating stream {e}"))?;
62                    source.download(stream, radio_downloaded).await?;
63                    Ok::<_, io::Error>(())
64                })?;
65                Ok::<_, io::Error>(())
66            });
67        };
68        Ok(Self {
69            output_reader: BufReader::new(tempfile),
70            handle,
71            radio_title,
72        })
73    }
74
75    pub fn from_stream<S: SourceStream>(
76        stream: S,
77        radio_title: Arc<Mutex<String>>,
78        radio_downloaded: Arc<Mutex<u64>>,
79    ) -> Result<Self, io::Error> {
80        let tempfile = tempfile::Builder::new().tempfile()?;
81        let source = Source::new(tempfile.reopen()?);
82        let handle = source.source_handle();
83        let radio_downloaded_inside1 = radio_downloaded.clone();
84        if let Ok(handle) = tokio::runtime::Handle::try_current() {
85            handle.spawn(async move {
86                source
87                    .download(stream, radio_downloaded_inside1)
88                    .await
89                    .tap_err(|e| error!("Error downloading stream: {e}"))?;
90                Ok::<_, io::Error>(())
91            });
92        } else {
93            thread::spawn(move || {
94                let rt = tokio::runtime::Builder::new_current_thread()
95                    .enable_all()
96                    .build()
97                    .tap_err(|e| error!("Error creating tokio runtime: {e}"))?;
98                rt.block_on(async move {
99                    source
100                        .download(stream, radio_downloaded)
101                        .await
102                        .tap_err(|e| error!("Error downloading stream: {e}"))?;
103                    Ok::<_, io::Error>(())
104                })?;
105                Ok::<_, io::Error>(())
106            });
107        };
108        Ok(Self {
109            output_reader: BufReader::new(tempfile),
110            handle,
111            radio_title,
112        })
113    }
114}
115
116impl Read for StreamDownload {
117    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
118        debug!("Read request buf len: {}", buf.len());
119        let stream_position = self.output_reader.stream_position()?;
120        let requested_position = stream_position + buf.len() as u64;
121        debug!(
122            "read: current position: {} requested position: {requested_position}",
123            stream_position
124        );
125        if let Some(closest_set) = self.handle.downloaded().get(&stream_position) {
126            debug!("Already downloaded {closest_set:?}");
127            if closest_set.end >= requested_position {
128                return self
129                    .output_reader
130                    .read(buf)
131                    .tap(|l| debug!("Returning read length {l:?}"));
132            }
133        }
134        self.handle.request_position(requested_position);
135        debug!("waiting for position");
136        self.handle.wait_for_requested_position();
137        debug!(
138            "reached requested position {requested_position}: stream position: {}",
139            stream_position
140        );
141        self.output_reader
142            .read(buf)
143            .tap(|l| debug!("Returning read length {l:?}"))
144    }
145}
146
147impl Seek for StreamDownload {
148    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
149        let seek_pos = match pos {
150            SeekFrom::Start(pos) => {
151                debug!("Seek from start: {pos}");
152                pos
153            }
154            SeekFrom::End(pos) => {
155                debug!("Seek from end: {pos}");
156                if let Some(length) = self.handle.content_length() {
157                    (length as i64 - 1 + pos) as u64
158                } else {
159                    return Err(io::Error::new(
160                        io::ErrorKind::Unsupported,
161                        "Cannot seek from end when content length is unknown",
162                    ));
163                }
164            }
165            SeekFrom::Current(pos) => {
166                debug!("Seek from current: {pos}");
167                (self.output_reader.stream_position()? as i64 + pos) as u64
168            }
169        };
170        if let Some(closest_set) = self.handle.downloaded().get(&seek_pos) {
171            if closest_set.end >= seek_pos {
172                return self.output_reader.seek(pos);
173            }
174        }
175        self.handle.request_position(seek_pos);
176        debug!(
177            "seek: current position {seek_pos} requested position {:?}. waiting",
178            seek_pos
179        );
180        self.handle.seek(seek_pos);
181        self.handle.wait_for_requested_position();
182        debug!("reached seek position");
183        self.output_reader.seek(pos)
184    }
185}
186
187impl MediaSource for StreamDownload {
188    fn is_seekable(&self) -> bool {
189        true
190        // !self.is_radio
191    }
192
193    fn byte_len(&self) -> Option<u64> {
194        self.handle.content_length()
195    }
196}