icy_metadata/
reader.rs

1use std::collections::{HashMap, VecDeque};
2use std::fmt::Debug;
3use std::io::{self, Read, Seek, SeekFrom};
4use std::num::{NonZero, NonZeroUsize};
5use std::str::FromStr;
6
7use tracing::warn;
8
9use crate::error::{EmptyMetadataError, MetadataParseError};
10use crate::parse::{ParseResult, parse_delimited_string, parse_value_if_valid};
11
/// Reads icy metadata contained within a stream.
///
/// Stream data is passed through to the caller while the metadata blocks interleaved with it are
/// consumed, parsed, and handed to the `on_metadata_read` callback.
///
/// Seeking within the stream is supported with the following limitations:
///
/// - [`SeekFrom::End`](std::io::SeekFrom::End) is not supported since seeking from the end of a
///   stream conceptually doesn't make sense.
/// - Seeking backwards is limited by the size of the metadata cache. Since the metadata values have
///   dynamic sizes, we need to know the size of the previous metadata value to seek past it. In
///   order to prevent unbounded memory growth, we cap the number of previous metadata sizes we keep
///   track of. You can change this limit using [`Self::metadata_cache_size`]. In practice, most
///   metadata is 0-sized except for at the start of each track. We use rudimentary compression so
///   consecutive metadata entries of the same size don't take up additional slots in the cache.
///   This means you shouldn't exceed the default value of `128` unless you're going really far
///   back.
pub struct IcyMetadataReader<T> {
    // The wrapped reader that yields stream data interleaved with metadata blocks.
    inner: T,
    // Number of stream data bytes between metadata blocks. When `None`, reads and seeks are
    // forwarded to `inner` unchanged.
    icy_metadata_interval: Option<usize>,
    // Number of stream data bytes left to read before the next metadata block begins.
    next_metadata: usize,
    // Sizes of the metadata blocks read so far; consulted when seeking backwards.
    metadata_size_queue: MetadataSizeQueue,
    // Position within the stream data as seen by the caller (metadata bytes are not counted).
    current_pos: u64,
    // Callback invoked with the parse result of every non-empty metadata block.
    on_metadata_read: Box<dyn Fn(Result<IcyMetadata, MetadataParseError>) + Send + Sync>,
}
34
35impl<T> Debug for IcyMetadataReader<T> {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("IcyMetadataReader")
38            .field("inner", &"<inner>")
39            .field("icy_metadata_interval", &self.icy_metadata_interval)
40            .field("next_metadata", &self.next_metadata)
41            .field("metadata_size_queue", &self.metadata_size_queue)
42            .field("current_pos", &self.current_pos)
43            .field("on_metadata_read", &"<on_metadata_read>")
44            .finish()
45    }
46}
47
48impl<T> IcyMetadataReader<T> {
49    /// Creates a new `IcyMetadataReader`.
50    /// `icy_metadata_interval` is required in order to figure out the location of the metadata
51    /// blocks. If `icy_metadata_interval` is `None`, it will treat the stream as though the
52    /// metadata is absent. You can retrieve the value from
53    /// [`IcyHeaders::metadata_interval`](crate::IcyHeaders::metadata_interval) or by extracting the
54    /// value from the headers manually.
55    pub fn new<F>(
56        inner: T,
57        icy_metadata_interval: Option<NonZeroUsize>,
58        on_metadata_read: F,
59    ) -> Self
60    where
61        F: Fn(Result<IcyMetadata, MetadataParseError>) + Send + Sync + 'static,
62    {
63        let icy_metadata_interval = icy_metadata_interval.map(NonZero::get);
64        Self {
65            inner,
66            icy_metadata_interval,
67            on_metadata_read: Box::new(on_metadata_read),
68            next_metadata: icy_metadata_interval.unwrap_or(0),
69            metadata_size_queue: MetadataSizeQueue {
70                inner: VecDeque::new(),
71                cache_size: 128,
72            },
73
74            current_pos: 0,
75        }
76    }
77}
78
79impl<T> IcyMetadataReader<T> {
80    /// Set the size of the metadata cache.
81    pub fn metadata_cache_size(mut self, size: usize) -> Self {
82        self.metadata_size_queue.cache_size = size;
83        self
84    }
85}
86
// Each metadata block is prefixed by a single length byte. That byte must be multiplied by 16
// to get the length of the metadata payload that follows it.
// Info taken from here: https://gist.github.com/niko/2a1d7b2d109ebe7f7ca2f860c3505ef0
const ICY_METADATA_MULTIPLIER: usize = 16;
90
91impl<T> IcyMetadataReader<T>
92where
93    T: Read,
94{
95    fn parse_metadata_from_stream(&mut self, buf: &mut [u8], metaint: usize) -> io::Result<usize> {
96        let to_fill = buf.len();
97        let mut total_written = 0;
98        while total_written < to_fill {
99            let prev_written = total_written;
100            self.parse_next_metadata(buf, metaint, &mut total_written)?;
101            // No additional data written, we're at the end of the stream
102            if total_written == prev_written {
103                break;
104            }
105        }
106        self.current_pos += total_written as u64;
107        Ok(total_written)
108    }
109
110    fn parse_next_metadata(
111        &mut self,
112        buf: &mut [u8],
113        metaint: usize,
114        total_written: &mut usize,
115    ) -> io::Result<()> {
116        let to_fill = buf.len();
117
118        if self.next_metadata > 0 {
119            // Read data before next metadata
120            let written = self.inner.read(&mut buf[..self.next_metadata])?;
121            if written == 0 {
122                return Ok(());
123            }
124            *total_written += written;
125        }
126
127        self.read_metadata()?;
128        self.next_metadata = metaint;
129        let start = *total_written;
130
131        // make sure we don't exceed the buffer length
132        let end = (start + self.next_metadata).min(to_fill);
133        let written = self.inner.read(&mut buf[start..end])?;
134        *total_written += written;
135        self.next_metadata = metaint - written;
136        Ok(())
137    }
138
139    fn update_metadata_size(&mut self) -> io::Result<()> {
140        let mut metadata_length_buf = [0u8; 1];
141        self.inner.read_exact(&mut metadata_length_buf)?;
142
143        let metadata_length = metadata_length_buf[0] as usize * ICY_METADATA_MULTIPLIER;
144
145        self.metadata_size_queue.push(metadata_length);
146        Ok(())
147    }
148
149    fn read_metadata(&mut self) -> io::Result<()> {
150        self.update_metadata_size()?;
151        if let Some(last_size) = self.metadata_size_queue.peek() {
152            if last_size > 0 {
153                let mut metadata_buf = vec![0u8; last_size];
154                self.inner.read_exact(&mut metadata_buf)?;
155
156                let callback_val = String::from_utf8(metadata_buf)
157                    .map_err(MetadataParseError::InvalidUtf8)
158                    .and_then(|metadata_str| {
159                        let metadata_str = metadata_str.trim_end_matches(char::from(0));
160                        metadata_str
161                            .parse::<IcyMetadata>()
162                            .map_err(MetadataParseError::Empty)
163                    });
164                (self.on_metadata_read)(callback_val);
165            }
166        }
167        Ok(())
168    }
169}
170
171impl<T> Read for IcyMetadataReader<T>
172where
173    T: Read,
174{
175    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
176        // Default to normal read behavior if metaint is not set
177        let Some(metaint) = self.icy_metadata_interval else {
178            return self.inner.read(buf);
179        };
180
181        if buf.len() > self.next_metadata {
182            self.parse_metadata_from_stream(buf, metaint)
183        } else {
184            let read = self.inner.read(buf)?;
185            self.next_metadata -= read;
186            self.current_pos += read as u64;
187            Ok(read)
188        }
189    }
190}
191
192impl<T> Seek for IcyMetadataReader<T>
193where
194    T: Read + Seek,
195{
196    fn seek(&mut self, seek_from: io::SeekFrom) -> io::Result<u64> {
197        // Default to normal behavior if metaint is not set
198        let Some(metaint) = self.icy_metadata_interval else {
199            return self.inner.seek(seek_from);
200        };
201
202        let (requested_change, requested_pos) = match seek_from {
203            SeekFrom::Start(pos) => (pos as i64 - self.current_pos as i64, pos as i64),
204            SeekFrom::Current(pos) => (pos, self.current_pos as i64 + pos),
205            SeekFrom::End(_) => {
206                return Err(io::Error::new(
207                    io::ErrorKind::Unsupported,
208                    "seek from end not supported",
209                ));
210            }
211        };
212
213        let mut current_absolute_pos = self.inner.stream_position()? as i64;
214        let mut seek_progress = 0i64;
215
216        if requested_change < 0 {
217            let mut last_metadata_offset = (metaint - self.next_metadata) as i64;
218            let mut last_metadata_end_pos = current_absolute_pos - last_metadata_offset;
219
220            // Keep seeking back through the previous metadata entries until we reach a point where
221            // the next seek point is after the next metadata
222            while current_absolute_pos + requested_change - seek_progress < last_metadata_end_pos
223                && last_metadata_end_pos > 0
224            {
225                let Some(last_metadata_size) = self.metadata_size_queue.pop() else {
226                    return Err(io::Error::new(
227                        io::ErrorKind::InvalidData,
228                        "Attempting to seek beyond metadata length cache. You may need to call \
229                         IcyMetadataReader::metadata_cache_size to increase the cache size.",
230                    ));
231                };
232                // +1 for the byte that holds the metadata length
233                let metadata_region_size = last_metadata_size as i64 + 1;
234                let seek_to = (last_metadata_end_pos - metadata_region_size) as u64;
235                // Seek before the metadata entry
236                current_absolute_pos = self.inner.seek(SeekFrom::Start(seek_to))? as i64;
237                seek_progress -= last_metadata_offset;
238
239                last_metadata_offset = metaint as i64;
240                last_metadata_end_pos -= metadata_region_size + metaint as i64;
241            }
242        } else {
243            // Keep seeking forward through each metadata entry until we reach a point where
244            // the next seek point is before the next metadata
245            while requested_change - seek_progress >= self.next_metadata as i64 {
246                self.inner
247                    .seek(SeekFrom::Current(self.next_metadata as i64))?;
248                seek_progress += self.next_metadata as i64;
249                // Read the metadata and continue
250                self.read_metadata()?;
251            }
252        }
253        self.inner
254            .seek(SeekFrom::Current(requested_change - seek_progress))?;
255        self.next_metadata = metaint - ((requested_pos as usize) % metaint);
256        self.current_pos = requested_pos as u64;
257        Ok(self.current_pos)
258    }
259}
260
/// Metadata contained within a stream
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct IcyMetadata {
    // Value of the `StreamTitle` field, if present.
    stream_title: Option<String>,
    // Value of the `StreamUrl` field, if present.
    stream_url: Option<String>,
    // Every other key/value pair found in the metadata.
    custom: HashMap<String, String>,
}

impl IcyMetadata {
    /// Returns the title of the currently playing track, if one was provided.
    /// Corresponds to the `StreamTitle` metadata value.
    pub fn stream_title(&self) -> Option<&str> {
        let Self { stream_title, .. } = self;
        stream_title.as_deref()
    }

    /// Returns the `StreamUrl` metadata value, if one was provided. This could be an album art
    /// URL, an image URL for the stream itself, or some other information.
    pub fn stream_url(&self) -> Option<&str> {
        let Self { stream_url, .. } = self;
        stream_url.as_deref()
    }

    /// Returns any additional fields found in the metadata, keyed by field name.
    pub fn custom_fields(&self) -> &HashMap<String, String> {
        let Self { custom, .. } = self;
        custom
    }
}
288
289impl FromStr for IcyMetadata {
290    type Err = EmptyMetadataError;
291
292    fn from_str(s: &str) -> Result<Self, Self::Err> {
293        let mut metadata = Self {
294            stream_title: None,
295            stream_url: None,
296            custom: HashMap::new(),
297        };
298
299        let ParseResult {
300            map,
301            errors_found,
302            missing_quotes_found,
303        } = parse_delimited_string(s);
304        if map.is_empty() {
305            return Err(EmptyMetadataError(s.to_string()));
306        }
307
308        let mut fields_found = 0;
309        let mut stray_values_found = false;
310        for (key, value) in map {
311            fields_found += 1;
312            match key.to_ascii_lowercase().as_str() {
313                "streamtitle" => {
314                    metadata.stream_title = Some(value.to_string());
315                }
316                "streamurl" => {
317                    metadata.stream_url = Some(value.to_string());
318                }
319                _ => {
320                    metadata.custom.insert(key.to_string(), value.to_string());
321                    stray_values_found = true;
322                }
323            }
324        }
325        // Escaping characters like quotes, semicolons, and equal signs within the metadata string
326        // doesn't seem to be well-defined Here we try to handle the scenario where a stray
327        // semicolon in one of the values messes with the parsing by relying on the fact
328        // that StreamTitle and StreamUrl should be the only valid keys
329        if errors_found || stray_values_found {
330            let semicolon_count = s.chars().filter(|c| *c == ';').count();
331            if semicolon_count > fields_found || missing_quotes_found {
332                warn!(
333                    metadata_string = s,
334                    "found possibly malformed metadata, attempting to resolve any unescaped fields",
335                );
336                handle_unescaped_values(s, &mut metadata);
337            }
338        }
339
340        Ok(metadata)
341    }
342}
343
344fn handle_unescaped_values(s: &str, metadata: &mut IcyMetadata) {
345    let lower_string = s.to_ascii_lowercase();
346    let stream_title_index = lower_string.find("streamtitle=");
347    let stream_url_index = lower_string.find("streamurl=");
348
349    let (stream_title, stream_url) = match (stream_title_index, stream_url_index) {
350        (Some(stream_title_index), Some(stream_url_index)) => {
351            let (stream_title, stream_url) = if stream_title_index < stream_url_index {
352                let stream_title = &s[stream_title_index..stream_url_index];
353                let stream_url = &s[stream_url_index..];
354                (stream_title, stream_url)
355            } else {
356                let stream_url = &s[stream_url_index..stream_title_index];
357                let stream_title = &s[stream_title_index..];
358                (stream_title, stream_url)
359            };
360            (Some(stream_title), Some(stream_url))
361        }
362        (Some(stream_title_index), None) => {
363            let stream_title = &s[stream_title_index..];
364            (Some(stream_title), None)
365        }
366        (None, Some(stream_url_index)) => {
367            let stream_url = &s[stream_url_index..];
368            (None, Some(stream_url))
369        }
370        (None, None) => (None, None),
371    };
372
373    if let Some(stream_title) = stream_title {
374        metadata.stream_title = parse_value_if_valid(stream_title);
375    };
376
377    if let Some(stream_url) = stream_url {
378        metadata.stream_url = parse_value_if_valid(stream_url);
379    };
380}
381
/// A run of consecutive metadata blocks that all had the same size.
#[derive(Debug)]
struct MetadataSize {
    // Size in bytes of each metadata block in the run.
    size: usize,
    // Number of consecutive blocks in the run; always at least 1 while stored in the queue.
    count: usize,
}

/// Bounded, run-length-compressed history of metadata block sizes, newest at the back.
#[derive(Debug)]
struct MetadataSizeQueue {
    inner: VecDeque<MetadataSize>,
    // Maximum number of runs retained in `inner`.
    cache_size: usize,
}

impl MetadataSizeQueue {
    /// Records the size of the most recently read metadata block.
    ///
    /// A block with the same size as the previous one extends the newest run instead of adding
    /// an entry. When a new run pushes the queue past `cache_size` entries, the oldest run is
    /// dropped.
    fn push(&mut self, size: usize) {
        if let Some(last) = self.inner.back_mut() {
            if last.size == size {
                last.count += 1;
                return;
            }
        }
        self.inner.push_back(MetadataSize { size, count: 1 });
        // Evict only once the limit is actually exceeded so that exactly `cache_size` runs are
        // retained (`>=` would keep one fewer and leave a cache of size 1 permanently empty).
        if self.inner.len() > self.cache_size {
            self.inner.pop_front();
        }
    }

    /// Removes the most recently recorded block and returns its size, or `None` if the history
    /// is empty.
    fn pop(&mut self) -> Option<usize> {
        let last = self.inner.back_mut()?;
        last.count -= 1;
        let size = last.size;
        if last.count == 0 {
            self.inner.pop_back();
        }
        Some(size)
    }

    /// Returns the size of the most recently recorded block without removing it.
    fn peek(&self) -> Option<usize> {
        self.inner.back().map(|b| b.size)
    }
}
421}