1use std::fmt::Write;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use anyhow::Result;
6use futures::StreamExt;
7use futures::TryStreamExt;
8use indicatif::MultiProgress;
9use indicatif::ProgressBar;
10use indicatif::ProgressState;
11use indicatif::ProgressStyle;
12use librespot::core::session::Session;
13
14use crate::encoder;
15use crate::encoder::Format;
16use crate::encoder::Samples;
17use crate::stream::Stream;
18use crate::stream::StreamEvent;
19use crate::stream::StreamEventChannel;
20use crate::track::Track;
21use crate::track::TrackMetadata;
22
23pub struct Downloader {
24 session: Session,
25 progress_bar: MultiProgress,
26}
27
28#[derive(Debug, Clone)]
29pub struct DownloadOptions {
30 pub destination: PathBuf,
31 pub parallel: usize,
32 pub format: Format,
33 pub force: bool,
34}
35
36impl DownloadOptions {
37 pub fn new(destination: Option<String>, parallel: usize, format: Format, force: bool) -> Self {
38 let destination =
39 destination.map_or_else(|| std::env::current_dir().unwrap(), PathBuf::from);
40 DownloadOptions {
41 destination,
42 parallel,
43 format,
44 force,
45 }
46 }
47}
48
49impl Downloader {
50 pub fn new(session: Session) -> Self {
51 Downloader {
52 session,
53 progress_bar: MultiProgress::new(),
54 }
55 }
56
57 pub async fn download_tracks(
58 self,
59 tracks: Vec<Track>,
60 options: &DownloadOptions,
61 ) -> Result<()> {
62 futures::stream::iter(tracks)
63 .map(|track| self.download_track(track, options))
64 .buffer_unordered(options.parallel)
65 .try_collect::<Vec<_>>()
66 .await?;
67
68 Ok(())
69 }
70
71 #[tracing::instrument(name = "download_track", skip(self))]
72 async fn download_track(&self, track: Track, options: &DownloadOptions) -> Result<()> {
73 let metadata = track.metadata(&self.session).await?;
74 tracing::info!("Downloading track: {:?}", metadata.track_name);
75
76 let path = options
77 .destination
78 .join(metadata.to_string())
79 .with_extension(options.format.extension())
80 .to_str()
81 .ok_or(anyhow::anyhow!("Could not set the output path"))?
82 .to_string();
83
84 if !options.force && PathBuf::from(&path).exists() {
85 tracing::info!(
86 "Skipping {}, file already exists. Use --force to force re-downloading the track",
87 &metadata.track_name
88 );
89 return Ok(());
90 }
91
92 let pb = self.add_progress_bar(&metadata);
93
94 let stream = Stream::new(self.session.clone());
95 let channel = match stream.stream(track).await {
96 Ok(channel) => channel,
97 Err(e) => {
98 self.fail_with_error(&pb, &metadata.to_string(), e.to_string());
99 return Ok(());
100 }
101 };
102
103 let samples = match self.buffer_track(channel, &pb, &metadata).await {
104 Ok(samples) => samples,
105 Err(e) => {
106 self.fail_with_error(&pb, &metadata.to_string(), e.to_string());
107 return Ok(());
108 }
109 };
110
111 tracing::info!("Encoding track: {}", metadata.to_string());
112 pb.set_message(format!("Encoding {}", metadata.to_string()));
113
114 let encoder = crate::encoder::get_encoder(options.format);
115 let stream = encoder.encode(samples).await?;
116
117 pb.set_message(format!("Writing {}", metadata.to_string()));
118 tracing::info!(
119 "Writing track: {:?} to file: {}",
120 metadata.to_string(),
121 &path
122 );
123 stream.write_to_file(&path).await?;
124
125 let tags = metadata.tags().await?;
126 encoder::tags::store_tags(path, &tags, options.format).await?;
127
128 pb.finish_with_message(format!("Downloaded {}", metadata.to_string()));
129 Ok(())
130 }
131
132 fn add_progress_bar(&self, track: &TrackMetadata) -> ProgressBar {
133 let pb = self
134 .progress_bar
135 .add(ProgressBar::new(track.approx_size() as u64));
136 pb.enable_steady_tick(Duration::from_millis(100));
137 pb.set_style(ProgressStyle::with_template("{spinner:.green} {msg} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
138 .unwrap()
140 .with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
141 .progress_chars("#>-"));
142 pb.set_message(track.to_string());
143 pb
144 }
145
146 async fn buffer_track(
147 &self,
148 mut rx: StreamEventChannel,
149 pb: &ProgressBar,
150 metadata: &TrackMetadata,
151 ) -> Result<Samples> {
152 let mut samples = Vec::<i32>::new();
153 while let Some(event) = rx.recv().await {
154 match event {
155 StreamEvent::Write {
156 bytes,
157 total,
158 mut content,
159 } => {
160 tracing::trace!("Written {} bytes out of {}", bytes, total);
161 pb.set_position(bytes as u64);
162 samples.append(&mut content);
163 }
164 StreamEvent::Finished => {
165 tracing::info!("Finished downloading track");
166 break;
167 }
168 StreamEvent::Error(stream_error) => {
169 tracing::error!("Error while streaming track: {:?}", stream_error);
170 return Err(anyhow::anyhow!("Streaming error: {:?}", stream_error));
171 }
172 StreamEvent::Retry {
173 attempt,
174 max_attempts,
175 } => {
176 tracing::warn!(
177 "Retrying download, attempt {} of {}: {}",
178 attempt,
179 max_attempts,
180 metadata.to_string()
181 );
182 pb.set_message(format!(
183 "Retrying ({}/{}) {}",
184 attempt,
185 max_attempts,
186 metadata.to_string()
187 ));
188 }
189 }
190 }
191 Ok(Samples {
192 samples,
193 ..Default::default()
194 })
195 }
196
197 fn fail_with_error<S>(&self, pb: &ProgressBar, name: &str, e: S)
198 where
199 S: Into<String>,
200 {
201 tracing::error!("Failed to download {}: {}", name, e.into());
202 pb.finish_with_message(console::style(format!("Failed! {}", name)).red().to_string());
203 }
204}