retry_on_failure/
retry_on_failure.rs1use std::error::Error;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5
6use icy_metadata::{IcyHeaders, IcyMetadataReader, RequestIcyMetadata};
7use stream_download::http::HttpStream;
8use stream_download::http::reqwest::Client;
9use stream_download::source::DecodeError;
10use stream_download::storage::bounded::BoundedStorageProvider;
11use stream_download::storage::memory::MemoryStorageProvider;
12use stream_download::{Settings, StreamDownload};
13
14#[tokio::main]
15async fn main() -> Result<(), Box<dyn Error>> {
16 let (_stream, handle) = rodio::OutputStream::try_default()?;
17
18 let restart = Arc::new(AtomicBool::new(true));
19
20 loop {
21 if !restart.swap(false, Ordering::Relaxed) {
22 return Ok(());
23 }
24
25 let sink = rodio::Sink::try_new(&handle)?;
26
27 let client = Client::builder().request_icy_metadata().build()?;
30
31 let stream =
32 HttpStream::new(client, "https://ice6.somafm.com/insound-128-mp3".parse()?).await?;
33
34 let icy_headers = IcyHeaders::parse_from_headers(stream.headers());
35 println!("Icecast headers: {icy_headers:#?}\n");
36 println!("content type={:?}\n", stream.content_type());
37
38 let prefetch_bytes = icy_headers.bitrate().unwrap() / 8 * 1024 * 5;
41
42 let reader = match StreamDownload::from_stream(
43 stream,
44 BoundedStorageProvider::new(
46 MemoryStorageProvider,
47 NonZeroUsize::new(512 * 1024).unwrap(),
50 ),
51 Settings::default()
52 .prefetch_bytes(prefetch_bytes as u64)
53 .on_reconnect({
54 let restart = restart.clone();
55 move |_stream, cancellation_token| {
56 cancellation_token.cancel();
61 restart.store(true, Ordering::Relaxed);
62 }
63 }),
64 )
65 .await
66 {
67 Ok(reader) => reader,
68 Err(e) => return Err(e.decode_error().await)?,
69 };
70
71 sink.append(rodio::Decoder::new(IcyMetadataReader::new(
72 reader,
73 icy_headers.metadata_interval(),
76 |metadata| {
78 println!("{metadata:#?}\n");
79 },
80 ))?);
81
82 let handle = tokio::task::spawn_blocking(move || {
83 sink.sleep_until_end();
84 });
85 handle.await?;
86 }
87}