retry_on_failure/
retry_on_failure.rs1use std::error::Error;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5
6use icy_metadata::{IcyHeaders, IcyMetadataReader, RequestIcyMetadata};
7use stream_download::http::HttpStream;
8use stream_download::http::reqwest::Client;
9use stream_download::source::DecodeError;
10use stream_download::storage::bounded::BoundedStorageProvider;
11use stream_download::storage::memory::MemoryStorageProvider;
12use stream_download::{Settings, StreamDownload};
13
14#[tokio::main]
15async fn main() -> Result<(), Box<dyn Error>> {
16 let restart = Arc::new(AtomicBool::new(true));
17
18 loop {
19 if !restart.swap(false, Ordering::Relaxed) {
20 return Ok(());
21 }
22
23 let stream_handle = rodio::OutputStreamBuilder::open_default_stream()?;
24 let sink = rodio::Sink::connect_new(stream_handle.mixer());
25
26 let client = Client::builder().request_icy_metadata().build()?;
29
30 let stream =
31 HttpStream::new(client, "https://ice6.somafm.com/insound-128-mp3".parse()?).await?;
32
33 let icy_headers = IcyHeaders::parse_from_headers(stream.headers());
34 println!("Icecast headers: {icy_headers:#?}\n");
35 println!("content type={:?}\n", stream.content_type());
36
37 let prefetch_bytes = icy_headers.bitrate().unwrap() / 8 * 1024 * 5;
40
41 let reader = match StreamDownload::from_stream(
42 stream,
43 BoundedStorageProvider::new(
45 MemoryStorageProvider,
46 NonZeroUsize::new(512 * 1024).unwrap(),
49 ),
50 Settings::default()
51 .prefetch_bytes(prefetch_bytes as u64)
52 .on_reconnect({
53 let restart = restart.clone();
54 move |_stream, cancellation_token| {
55 cancellation_token.cancel();
60 restart.store(true, Ordering::Relaxed);
61 }
62 }),
63 )
64 .await
65 {
66 Ok(reader) => reader,
67 Err(e) => return Err(e.decode_error().await)?,
68 };
69
70 sink.append(rodio::Decoder::new(IcyMetadataReader::new(
71 reader,
72 icy_headers.metadata_interval(),
75 |metadata| {
77 println!("{metadata:#?}\n");
78 },
79 ))?);
80
81 let handle = tokio::task::spawn_blocking(move || {
82 sink.sleep_until_end();
83 });
84 handle.await?;
85 }
86}