spotify_dl/
download.rs

1use std::fmt::Write;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use anyhow::Result;
6use futures::StreamExt;
7use futures::TryStreamExt;
8use indicatif::MultiProgress;
9use indicatif::ProgressBar;
10use indicatif::ProgressState;
11use indicatif::ProgressStyle;
12use librespot::core::session::Session;
13
14use crate::encoder;
15use crate::encoder::Format;
16use crate::encoder::Samples;
17use crate::stream::Stream;
18use crate::stream::StreamEvent;
19use crate::stream::StreamEventChannel;
20use crate::track::Track;
21use crate::track::TrackMetadata;
22
23pub struct Downloader {
24    session: Session,
25    progress_bar: MultiProgress,
26}
27
28#[derive(Debug, Clone)]
29pub struct DownloadOptions {
30    pub destination: PathBuf,
31    pub parallel: usize,
32    pub format: Format,
33    pub force: bool,
34}
35
36impl DownloadOptions {
37    pub fn new(destination: Option<String>, parallel: usize, format: Format, force: bool) -> Self {
38        let destination =
39            destination.map_or_else(|| std::env::current_dir().unwrap(), PathBuf::from);
40        DownloadOptions {
41            destination,
42            parallel,
43            format,
44            force,
45        }
46    }
47}
48
49impl Downloader {
50    pub fn new(session: Session) -> Self {
51        Downloader {
52            session,
53            progress_bar: MultiProgress::new(),
54        }
55    }
56
57    pub async fn download_tracks(
58        self,
59        tracks: Vec<Track>,
60        options: &DownloadOptions,
61    ) -> Result<()> {
62        futures::stream::iter(tracks)
63            .map(|track| self.download_track(track, options))
64            .buffer_unordered(options.parallel)
65            .try_collect::<Vec<_>>()
66            .await?;
67
68        Ok(())
69    }
70
71    #[tracing::instrument(name = "download_track", skip(self))]
72    async fn download_track(&self, track: Track, options: &DownloadOptions) -> Result<()> {
73        let metadata = track.metadata(&self.session).await?;
74        tracing::info!("Downloading track: {:?}", metadata.track_name);
75
76        let path = options
77            .destination
78            .join(metadata.to_string())
79            .with_extension(options.format.extension())
80            .to_str()
81            .ok_or(anyhow::anyhow!("Could not set the output path"))?
82            .to_string();
83
84        if !options.force && PathBuf::from(&path).exists() {
85            tracing::info!(
86                "Skipping {}, file already exists. Use --force to force re-downloading the track",
87                &metadata.track_name
88            );
89            return Ok(());
90        }
91
92        let pb = self.add_progress_bar(&metadata);
93
94        let stream = Stream::new(self.session.clone());
95        let channel = match stream.stream(track).await {
96            Ok(channel) => channel,
97            Err(e) => {
98                self.fail_with_error(&pb, &metadata.to_string(), e.to_string());
99                return Ok(());
100            }
101        };
102
103        let samples = match self.buffer_track(channel, &pb, &metadata).await {
104            Ok(samples) => samples,
105            Err(e) => {
106                self.fail_with_error(&pb, &metadata.to_string(), e.to_string());
107                return Ok(());
108            }
109        };
110
111        tracing::info!("Encoding track: {}", metadata.to_string());
112        pb.set_message(format!("Encoding {}", metadata.to_string()));
113
114        let encoder = crate::encoder::get_encoder(options.format);
115        let stream = encoder.encode(samples).await?;
116
117        pb.set_message(format!("Writing {}", metadata.to_string()));
118        tracing::info!(
119            "Writing track: {:?} to file: {}",
120            metadata.to_string(),
121            &path
122        );
123        stream.write_to_file(&path).await?;
124
125        let tags = metadata.tags().await?;
126        encoder::tags::store_tags(path, &tags, options.format).await?;
127
128        pb.finish_with_message(format!("Downloaded {}", metadata.to_string()));
129        Ok(())
130    }
131
132    fn add_progress_bar(&self, track: &TrackMetadata) -> ProgressBar {
133        let pb = self
134            .progress_bar
135            .add(ProgressBar::new(track.approx_size() as u64));
136        pb.enable_steady_tick(Duration::from_millis(100));
137        pb.set_style(ProgressStyle::with_template("{spinner:.green} {msg} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
138            // Infallible
139            .unwrap()
140            .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
141            .progress_chars("#>-"));
142        pb.set_message(track.to_string());
143        pb
144    }
145
146    async fn buffer_track(
147        &self,
148        mut rx: StreamEventChannel,
149        pb: &ProgressBar,
150        metadata: &TrackMetadata,
151    ) -> Result<Samples> {
152        let mut samples = Vec::<i32>::new();
153        while let Some(event) = rx.recv().await {
154            match event {
155                StreamEvent::Write {
156                    bytes,
157                    total,
158                    mut content,
159                } => {
160                    tracing::trace!("Written {} bytes out of {}", bytes, total);
161                    pb.set_position(bytes as u64);
162                    samples.append(&mut content);
163                }
164                StreamEvent::Finished => {
165                    tracing::info!("Finished downloading track");
166                    break;
167                }
168                StreamEvent::Error(stream_error) => {
169                    tracing::error!("Error while streaming track: {:?}", stream_error);
170                    return Err(anyhow::anyhow!("Streaming error: {:?}", stream_error));
171                }
172                StreamEvent::Retry {
173                    attempt,
174                    max_attempts,
175                } => {
176                    tracing::warn!(
177                        "Retrying download, attempt {} of {}: {}",
178                        attempt,
179                        max_attempts,
180                        metadata.to_string()
181                    );
182                    pb.set_message(format!(
183                        "Retrying ({}/{}) {}",
184                        attempt,
185                        max_attempts,
186                        metadata.to_string()
187                    ));
188                }
189            }
190        }
191        Ok(Samples {
192            samples,
193            ..Default::default()
194        })
195    }
196
197    fn fail_with_error<S>(&self, pb: &ProgressBar, name: &str, e: S)
198    where
199        S: Into<String>,
200    {
201        tracing::error!("Failed to download {}: {}", name, e.into());
202        pb.finish_with_message(console::style(format!("Failed! {}", name)).red().to_string());
203    }
204}