1use std::collections::{HashMap, VecDeque};
2use std::fmt::Debug;
3use std::io::{self, Read, Seek, SeekFrom};
4use std::num::NonZeroUsize;
5use std::str::FromStr;
6
7use tracing::warn;
8
9use crate::error::{EmptyMetadataError, MetadataParseError};
10use crate::parse::{ParseResult, parse_delimited_string, parse_value_if_valid};
11
/// Reader wrapper that strips interleaved ICY metadata blocks out of the bytes produced by
/// `inner`.
///
/// Payload bytes are handed to the caller unchanged, while every non-empty metadata block is
/// parsed and passed to the `on_metadata_read` callback.
pub struct IcyMetadataReader<T> {
    /// The wrapped stream, containing payload bytes and metadata blocks.
    inner: T,
    /// Number of payload bytes between two metadata blocks. `None` disables metadata handling
    /// and forwards reads and seeks straight to `inner`.
    icy_metadata_interval: Option<usize>,
    /// Payload bytes remaining before the next metadata block begins.
    next_metadata: usize,
    /// Sizes of the metadata blocks read so far; consumed again when seeking backwards.
    metadata_size_queue: MetadataSizeQueue,
    /// Current position counted in payload bytes only (metadata bytes are not included).
    current_pos: u64,
    /// Callback invoked with the parse result of every non-empty metadata block.
    on_metadata_read: Box<dyn Fn(Result<IcyMetadata, MetadataParseError>) + Send + Sync>,
}
34
35impl<T> Debug for IcyMetadataReader<T> {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("IcyMetadataReader")
38 .field("inner", &"<inner>")
39 .field("icy_metadata_interval", &self.icy_metadata_interval)
40 .field("next_metadata", &self.next_metadata)
41 .field("metadata_size_queue", &self.metadata_size_queue)
42 .field("current_pos", &self.current_pos)
43 .field("on_metadata_read", &"<on_metadata_read>")
44 .finish()
45 }
46}
47
48impl<T> IcyMetadataReader<T> {
49 pub fn new<F>(
56 inner: T,
57 icy_metadata_interval: Option<NonZeroUsize>,
58 on_metadata_read: F,
59 ) -> Self
60 where
61 F: Fn(Result<IcyMetadata, MetadataParseError>) + Send + Sync + 'static,
62 {
63 let icy_metadata_interval = icy_metadata_interval.map(|i| i.get());
64 Self {
65 inner,
66 icy_metadata_interval,
67 on_metadata_read: Box::new(on_metadata_read),
68 next_metadata: icy_metadata_interval.unwrap_or(0),
69 metadata_size_queue: MetadataSizeQueue {
70 inner: VecDeque::new(),
71 cache_size: 128,
72 },
73
74 current_pos: 0,
75 }
76 }
77}
78
79impl<T> IcyMetadataReader<T> {
80 pub fn metadata_cache_size(mut self, size: usize) -> Self {
82 self.metadata_size_queue.cache_size = size;
83 self
84 }
85}
86
/// Factor applied to the single length byte that precedes each metadata block to obtain the
/// block's size in bytes.
const ICY_METADATA_MULTIPLIER: usize = 16;
90
91impl<T> IcyMetadataReader<T>
92where
93 T: Read,
94{
95 fn parse_metadata_from_stream(&mut self, buf: &mut [u8], metaint: usize) -> io::Result<usize> {
96 let to_fill = buf.len();
97 let mut total_written = 0;
98 while total_written < to_fill {
99 let prev_written = total_written;
100 self.parse_next_metadata(buf, metaint, &mut total_written)?;
101 if total_written == prev_written {
103 break;
104 }
105 }
106 self.current_pos += total_written as u64;
107 Ok(total_written)
108 }
109
110 fn parse_next_metadata(
111 &mut self,
112 buf: &mut [u8],
113 metaint: usize,
114 total_written: &mut usize,
115 ) -> io::Result<()> {
116 let to_fill = buf.len();
117
118 if self.next_metadata > 0 {
119 let written = self.inner.read(&mut buf[..self.next_metadata])?;
121 if written == 0 {
122 return Ok(());
123 }
124 *total_written += written;
125 }
126
127 self.read_metadata()?;
128 self.next_metadata = metaint;
129 let start = *total_written;
130
131 let end = (start + self.next_metadata).min(to_fill);
133 let written = self.inner.read(&mut buf[start..end])?;
134 *total_written += written;
135 self.next_metadata = metaint - written;
136 Ok(())
137 }
138
139 fn update_metadata_size(&mut self) -> io::Result<()> {
140 let mut metadata_length_buf = [0u8; 1];
141 self.inner.read_exact(&mut metadata_length_buf)?;
142
143 let metadata_length = metadata_length_buf[0] as usize * ICY_METADATA_MULTIPLIER;
144
145 self.metadata_size_queue.push(metadata_length);
146 Ok(())
147 }
148
149 fn read_metadata(&mut self) -> io::Result<()> {
150 self.update_metadata_size()?;
151 if let Some(last_size) = self.metadata_size_queue.peek() {
152 if last_size > 0 {
153 let mut metadata_buf = vec![0u8; last_size];
154 self.inner.read_exact(&mut metadata_buf)?;
155
156 let callback_val = String::from_utf8(metadata_buf)
157 .map_err(MetadataParseError::InvalidUtf8)
158 .and_then(|metadata_str| {
159 let metadata_str = metadata_str.trim_end_matches(char::from(0));
160 metadata_str
161 .parse::<IcyMetadata>()
162 .map_err(MetadataParseError::Empty)
163 });
164 (self.on_metadata_read)(callback_val);
165 }
166 }
167 Ok(())
168 }
169}
170
171impl<T> Read for IcyMetadataReader<T>
172where
173 T: Read,
174{
175 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
176 let Some(metaint) = self.icy_metadata_interval else {
178 return self.inner.read(buf);
179 };
180
181 if buf.len() > self.next_metadata {
182 self.parse_metadata_from_stream(buf, metaint)
183 } else {
184 let read = self.inner.read(buf)?;
185 self.next_metadata -= read;
186 self.current_pos += read as u64;
187 Ok(read)
188 }
189 }
190}
191
192impl<T> Seek for IcyMetadataReader<T>
193where
194 T: Read + Seek,
195{
196 fn seek(&mut self, seek_from: io::SeekFrom) -> io::Result<u64> {
197 let Some(metaint) = self.icy_metadata_interval else {
199 return self.inner.seek(seek_from);
200 };
201
202 let (requested_change, requested_pos) = match seek_from {
203 SeekFrom::Start(pos) => (pos as i64 - self.current_pos as i64, pos as i64),
204 SeekFrom::Current(pos) => (pos, self.current_pos as i64 + pos),
205 SeekFrom::End(_) => {
206 return Err(io::Error::new(
207 io::ErrorKind::Unsupported,
208 "seek from end not supported",
209 ));
210 }
211 };
212
213 let mut current_absolute_pos = self.inner.stream_position()? as i64;
214 let mut seek_progress = 0i64;
215
216 if requested_change < 0 {
217 let mut last_metadata_offset = (metaint - self.next_metadata) as i64;
218 let mut last_metadata_end_pos = current_absolute_pos - last_metadata_offset;
219
220 while current_absolute_pos + requested_change - seek_progress < last_metadata_end_pos
223 && last_metadata_end_pos > 0
224 {
225 let Some(last_metadata_size) = self.metadata_size_queue.pop() else {
226 return Err(io::Error::new(
227 io::ErrorKind::InvalidData,
228 "Attempting to seek beyond metadata length cache. You may need to call \
229 IcyMetadataReader::metadata_cache_size to increase the cache size.",
230 ));
231 };
232 let metadata_region_size = last_metadata_size as i64 + 1;
234 let seek_to = (last_metadata_end_pos - metadata_region_size) as u64;
235 current_absolute_pos = self.inner.seek(SeekFrom::Start(seek_to))? as i64;
237 seek_progress -= last_metadata_offset;
238
239 last_metadata_offset = metaint as i64;
240 last_metadata_end_pos -= metadata_region_size + metaint as i64;
241 }
242 } else {
243 while requested_change - seek_progress >= self.next_metadata as i64 {
246 self.inner
247 .seek(SeekFrom::Current(self.next_metadata as i64))?;
248 seek_progress += self.next_metadata as i64;
249 self.read_metadata()?;
251 }
252 }
253 self.inner
254 .seek(SeekFrom::Current(requested_change - seek_progress))?;
255 self.next_metadata = metaint - ((requested_pos as usize) % metaint);
256 self.current_pos = requested_pos as u64;
257 Ok(self.current_pos)
258 }
259}
260
/// Key/value metadata parsed from the text of a single ICY metadata block.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct IcyMetadata {
    /// Value of the `StreamTitle` field (key compared ASCII case-insensitively), if present.
    stream_title: Option<String>,
    /// Value of the `StreamUrl` field (key compared ASCII case-insensitively), if present.
    stream_url: Option<String>,
    /// All other fields, stored under their original key.
    custom: HashMap<String, String>,
}
269
270impl IcyMetadata {
271 pub fn stream_title(&self) -> Option<&str> {
274 self.stream_title.as_deref()
275 }
276
277 pub fn stream_url(&self) -> Option<&str> {
280 self.stream_url.as_deref()
281 }
282
283 pub fn custom_fields(&self) -> &HashMap<String, String> {
285 &self.custom
286 }
287}
288
289impl FromStr for IcyMetadata {
290 type Err = EmptyMetadataError;
291
292 fn from_str(s: &str) -> Result<Self, Self::Err> {
293 let mut metadata = Self {
294 stream_title: None,
295 stream_url: None,
296 custom: HashMap::new(),
297 };
298
299 let ParseResult {
300 map,
301 errors_found,
302 missing_quotes_found,
303 } = parse_delimited_string(s);
304 if map.is_empty() {
305 return Err(EmptyMetadataError(s.to_string()));
306 }
307
308 let mut fields_found = 0;
309 let mut stray_values_found = false;
310 for (key, value) in map {
311 fields_found += 1;
312 match key.to_ascii_lowercase().as_str() {
313 "streamtitle" => {
314 metadata.stream_title = Some(value.to_string());
315 }
316 "streamurl" => {
317 metadata.stream_url = Some(value.to_string());
318 }
319 _ => {
320 metadata.custom.insert(key.to_string(), value.to_string());
321 stray_values_found = true;
322 }
323 }
324 }
325 if errors_found || stray_values_found {
330 let semicolon_count = s.chars().filter(|c| *c == ';').count();
331 if semicolon_count > fields_found || missing_quotes_found {
332 warn!(
333 metadata_string = s,
334 "found possibly malformed metadata, attempting to resolve any unescaped fields",
335 );
336 handle_unescaped_values(s, &mut metadata);
337 }
338 }
339
340 Ok(metadata)
341 }
342}
343
344fn handle_unescaped_values(s: &str, metadata: &mut IcyMetadata) {
345 let lower_string = s.to_ascii_lowercase();
346 let stream_title_index = lower_string.find("streamtitle=");
347 let stream_url_index = lower_string.find("streamurl=");
348
349 let (stream_title, stream_url) = match (stream_title_index, stream_url_index) {
350 (Some(stream_title_index), Some(stream_url_index)) => {
351 let (stream_title, stream_url) = if stream_title_index < stream_url_index {
352 let stream_title = &s[stream_title_index..stream_url_index];
353 let stream_url = &s[stream_url_index..];
354 (stream_title, stream_url)
355 } else {
356 let stream_url = &s[stream_url_index..stream_title_index];
357 let stream_title = &s[stream_title_index..];
358 (stream_title, stream_url)
359 };
360 (Some(stream_title), Some(stream_url))
361 }
362 (Some(stream_title_index), None) => {
363 let stream_title = &s[stream_title_index..];
364 (Some(stream_title), None)
365 }
366 (None, Some(stream_url_index)) => {
367 let stream_url = &s[stream_url_index..];
368 (None, Some(stream_url))
369 }
370 (None, None) => (None, None),
371 };
372
373 if let Some(stream_title) = stream_title {
374 metadata.stream_title = parse_value_if_valid(stream_title);
375 };
376
377 if let Some(stream_url) = stream_url {
378 metadata.stream_url = parse_value_if_valid(stream_url);
379 };
380}
381
/// Run-length entry of the metadata size cache: `count` consecutive metadata blocks that all
/// had the same `size`.
#[derive(Debug)]
struct MetadataSize {
    /// Size of each block in this run, in bytes.
    size: usize,
    /// Number of consecutive blocks with this size.
    count: usize,
}
387
/// Cache of the sizes of recently read metadata blocks, most recent at the back.
///
/// Consecutive blocks of equal size share a single [`MetadataSize`] entry.
#[derive(Debug)]
struct MetadataSizeQueue {
    /// Run-length encoded block sizes, oldest first.
    inner: VecDeque<MetadataSize>,
    /// Bound on the number of entries in `inner`; `push` uses it to decide when to evict the
    /// oldest entry.
    cache_size: usize,
}
393
394impl MetadataSizeQueue {
395 fn push(&mut self, size: usize) {
396 if let Some(last) = self.inner.back_mut() {
397 if last.size == size {
398 last.count += 1;
399 return;
400 }
401 }
402 self.inner.push_back(MetadataSize { size, count: 1 });
403 if self.inner.len() >= self.cache_size {
404 self.inner.pop_front();
405 }
406 }
407
408 fn pop(&mut self) -> Option<usize> {
409 let last = self.inner.back_mut()?;
410 last.count -= 1;
411 let size = last.size;
412 if last.count == 0 {
413 self.inner.pop_back();
414 }
415 Some(size)
416 }
417
418 fn peek(&self) -> Option<usize> {
419 self.inner.back().map(|b| b.size)
420 }
421}