retry_on_failure/
retry_on_failure.rs

1use std::error::Error;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5
6use icy_metadata::{IcyHeaders, IcyMetadataReader, RequestIcyMetadata};
7use stream_download::http::HttpStream;
8use stream_download::http::reqwest::Client;
9use stream_download::source::DecodeError;
10use stream_download::storage::bounded::BoundedStorageProvider;
11use stream_download::storage::memory::MemoryStorageProvider;
12use stream_download::{Settings, StreamDownload};
13
14#[tokio::main]
15async fn main() -> Result<(), Box<dyn Error>> {
16    let restart = Arc::new(AtomicBool::new(true));
17
18    loop {
19        if !restart.swap(false, Ordering::Relaxed) {
20            return Ok(());
21        }
22
23        let stream_handle = rodio::OutputStreamBuilder::open_default_stream()?;
24        let sink = rodio::Sink::connect_new(stream_handle.mixer());
25
26        // We need to add a header to tell the Icecast server that we can parse the metadata
27        // embedded within the stream itself.
28        let client = Client::builder().request_icy_metadata().build()?;
29
30        let stream =
31            HttpStream::new(client, "https://ice6.somafm.com/insound-128-mp3".parse()?).await?;
32
33        let icy_headers = IcyHeaders::parse_from_headers(stream.headers());
34        println!("Icecast headers: {icy_headers:#?}\n");
35        println!("content type={:?}\n", stream.content_type());
36
37        // buffer 5 seconds of audio
38        // bitrate (in kilobits) / bits per byte * bytes per kilobyte * 5 seconds
39        let prefetch_bytes = icy_headers.bitrate().unwrap() / 8 * 1024 * 5;
40
41        let reader = match StreamDownload::from_stream(
42            stream,
43            // use bounded storage to keep the underlying size from growing indefinitely
44            BoundedStorageProvider::new(
45                MemoryStorageProvider,
46                // be liberal with the buffer size, you need to make sure it holds enough space to
47                // prevent any out-of-bounds reads
48                NonZeroUsize::new(512 * 1024).unwrap(),
49            ),
50            Settings::default()
51                .prefetch_bytes(prefetch_bytes as u64)
52                .on_reconnect({
53                    let restart = restart.clone();
54                    move |_stream, cancellation_token| {
55                        // If the stream reconnects after a network failure, the internal state of
56                        // the metadata parser is likely invalid.
57                        // We should cancel the current download task and re-instantiate it in order
58                        // to reset everything.
59                        cancellation_token.cancel();
60                        restart.store(true, Ordering::Relaxed);
61                    }
62                }),
63        )
64        .await
65        {
66            Ok(reader) => reader,
67            Err(e) => return Err(e.decode_error().await)?,
68        };
69
70        sink.append(rodio::Decoder::new(IcyMetadataReader::new(
71            reader,
72            // Since we requested icy metadata, the metadata interval header should be present in
73            // the response. This will allow us to parse the metadata within the stream
74            icy_headers.metadata_interval(),
75            // Print the stream metadata whenever we receive new values
76            |metadata| {
77                println!("{metadata:#?}\n");
78            },
79        ))?);
80
81        let handle = tokio::task::spawn_blocking(move || {
82            sink.sleep_until_end();
83        });
84        handle.await?;
85    }
86}