1use parking_lot::Mutex;
2use source::{Source, SourceHandle, SourceStream};
3use std::sync::Arc;
4use std::{
5 io::{self, BufReader, Read, Seek, SeekFrom},
6 thread,
7};
8use symphonia::core::io::MediaSource;
9use tap::{Tap, TapFallible};
10use tempfile::NamedTempFile;
11use tracing::{debug, error};
12
13pub mod http;
14pub mod source;
15
16#[derive(Debug)]
17pub struct StreamDownload {
18 output_reader: BufReader<NamedTempFile>,
19 handle: SourceHandle,
20 pub radio_title: Arc<Mutex<String>>,
21}
22
23impl StreamDownload {
24 pub fn new_http(
25 url: reqwest::Url,
26 is_radio: bool,
27 radio_title: Arc<Mutex<String>>,
28 radio_downloaded: Arc<Mutex<u64>>,
29 ) -> io::Result<Self> {
30 Self::new::<http::HttpStream>(url, is_radio, radio_title, radio_downloaded)
31 }
32
33 pub fn new<S: SourceStream>(
34 url: S::Url,
35 is_radio: bool,
36 radio_title: Arc<Mutex<String>>,
37 radio_downloaded: Arc<Mutex<u64>>,
38 ) -> io::Result<Self> {
39 let tempfile = tempfile::Builder::new().tempfile()?;
40 let source = Source::new(tempfile.reopen()?);
41 let handle = source.source_handle();
42 let radio_title_inside = radio_title.clone();
43 let radio_downloaded_inside1 = radio_downloaded.clone();
44 if let Ok(handle) = tokio::runtime::Handle::try_current() {
45 handle.spawn(async move {
46 let stream = S::create(url, is_radio, radio_title_inside)
47 .await
48 .tap_err(|e| error!("Error creating stream: {e}"))?;
49 source.download(stream, radio_downloaded_inside1).await?;
50 Ok::<_, io::Error>(())
51 });
52 } else {
53 thread::spawn(move || {
54 let rt = tokio::runtime::Builder::new_current_thread()
55 .enable_all()
56 .build()
57 .tap_err(|e| error!("Error creating tokio runtime: {e}"))?;
58 rt.block_on(async move {
59 let stream = S::create(url, is_radio, radio_title_inside)
60 .await
61 .tap_err(|e| error!("Error creating stream {e}"))?;
62 source.download(stream, radio_downloaded).await?;
63 Ok::<_, io::Error>(())
64 })?;
65 Ok::<_, io::Error>(())
66 });
67 };
68 Ok(Self {
69 output_reader: BufReader::new(tempfile),
70 handle,
71 radio_title,
72 })
73 }
74
75 pub fn from_stream<S: SourceStream>(
76 stream: S,
77 radio_title: Arc<Mutex<String>>,
78 radio_downloaded: Arc<Mutex<u64>>,
79 ) -> Result<Self, io::Error> {
80 let tempfile = tempfile::Builder::new().tempfile()?;
81 let source = Source::new(tempfile.reopen()?);
82 let handle = source.source_handle();
83 let radio_downloaded_inside1 = radio_downloaded.clone();
84 if let Ok(handle) = tokio::runtime::Handle::try_current() {
85 handle.spawn(async move {
86 source
87 .download(stream, radio_downloaded_inside1)
88 .await
89 .tap_err(|e| error!("Error downloading stream: {e}"))?;
90 Ok::<_, io::Error>(())
91 });
92 } else {
93 thread::spawn(move || {
94 let rt = tokio::runtime::Builder::new_current_thread()
95 .enable_all()
96 .build()
97 .tap_err(|e| error!("Error creating tokio runtime: {e}"))?;
98 rt.block_on(async move {
99 source
100 .download(stream, radio_downloaded)
101 .await
102 .tap_err(|e| error!("Error downloading stream: {e}"))?;
103 Ok::<_, io::Error>(())
104 })?;
105 Ok::<_, io::Error>(())
106 });
107 };
108 Ok(Self {
109 output_reader: BufReader::new(tempfile),
110 handle,
111 radio_title,
112 })
113 }
114}
115
116impl Read for StreamDownload {
117 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
118 debug!("Read request buf len: {}", buf.len());
119 let stream_position = self.output_reader.stream_position()?;
120 let requested_position = stream_position + buf.len() as u64;
121 debug!(
122 "read: current position: {} requested position: {requested_position}",
123 stream_position
124 );
125 if let Some(closest_set) = self.handle.downloaded().get(&stream_position) {
126 debug!("Already downloaded {closest_set:?}");
127 if closest_set.end >= requested_position {
128 return self
129 .output_reader
130 .read(buf)
131 .tap(|l| debug!("Returning read length {l:?}"));
132 }
133 }
134 self.handle.request_position(requested_position);
135 debug!("waiting for position");
136 self.handle.wait_for_requested_position();
137 debug!(
138 "reached requested position {requested_position}: stream position: {}",
139 stream_position
140 );
141 self.output_reader
142 .read(buf)
143 .tap(|l| debug!("Returning read length {l:?}"))
144 }
145}
146
147impl Seek for StreamDownload {
148 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
149 let seek_pos = match pos {
150 SeekFrom::Start(pos) => {
151 debug!("Seek from start: {pos}");
152 pos
153 }
154 SeekFrom::End(pos) => {
155 debug!("Seek from end: {pos}");
156 if let Some(length) = self.handle.content_length() {
157 (length as i64 - 1 + pos) as u64
158 } else {
159 return Err(io::Error::new(
160 io::ErrorKind::Unsupported,
161 "Cannot seek from end when content length is unknown",
162 ));
163 }
164 }
165 SeekFrom::Current(pos) => {
166 debug!("Seek from current: {pos}");
167 (self.output_reader.stream_position()? as i64 + pos) as u64
168 }
169 };
170 if let Some(closest_set) = self.handle.downloaded().get(&seek_pos) {
171 if closest_set.end >= seek_pos {
172 return self.output_reader.seek(pos);
173 }
174 }
175 self.handle.request_position(seek_pos);
176 debug!(
177 "seek: current position {seek_pos} requested position {:?}. waiting",
178 seek_pos
179 );
180 self.handle.seek(seek_pos);
181 self.handle.wait_for_requested_position();
182 debug!("reached seek position");
183 self.output_reader.seek(pos)
184 }
185}
186
187impl MediaSource for StreamDownload {
188 fn is_seekable(&self) -> bool {
189 true
190 }
192
193 fn byte_len(&self) -> Option<u64> {
194 self.handle.content_length()
195 }
196}