mecomp_analysis/decoder/mod.rs
1use std::{
2 clone::Clone,
3 marker::Send,
4 num::NonZeroUsize,
5 path::{Path, PathBuf},
6 sync::mpsc,
7 thread,
8};
9
10use log::info;
11
12use crate::{errors::AnalysisResult, Analysis, ResampledAudio};
13
14mod mecomp;
15#[allow(clippy::module_name_repetitions)]
16pub use mecomp::MecompDecoder;
17
18/// Trait used to implement your own decoder.
19///
20/// The `decode` function should be implemented so that it
21/// decodes and resample a song to one channel with a sampling rate of 22050 Hz
22/// and a f32le layout.
23/// Once it is implemented, several functions
24/// to perform analysis from path(s) are available, such as
25/// [`song_from_path`](Decoder::song_from_path) and
26/// [`analyze_paths`](Decoder::analyze_paths).
27#[allow(clippy::module_name_repetitions)]
28pub trait Decoder {
29 /// A function that should decode and resample a song, optionally
30 /// extracting the song's metadata such as the artist, the album, etc.
31 ///
32 /// The output sample array should be resampled to f32le, one channel, with a sampling rate
33 /// of 22050 Hz. Anything other than that will yield wrong results.
34 ///
35 /// # Errors
36 ///
37 /// This function will return an error if the file path is invalid, if
38 /// the file path points to a file containing no or corrupted audio stream,
39 /// or if the analysis could not be conducted to the end for some reason.
40 ///
41 /// The error type returned should give a hint as to whether it was a
42 /// decoding or an analysis error.
43 fn decode(path: &Path) -> AnalysisResult<ResampledAudio>;
44
45 /// Returns a decoded song's `Analysis` given a file path, or an error if the song
46 /// could not be analyzed for some reason.
47 ///
48 /// # Arguments
49 ///
50 /// * `path` - A [`Path`] holding a valid file path to a valid audio file.
51 ///
52 /// # Errors
53 ///
54 /// This function will return an error if the file path is invalid, if
55 /// the file path points to a file containing no or corrupted audio stream,
56 /// or if the analysis could not be conducted to the end for some reason.
57 ///
58 /// The error type returned should give a hint as to whether it was a
59 /// decoding or an analysis error.
60 fn analyze_path<P: AsRef<Path>>(path: P) -> AnalysisResult<Analysis> {
61 Self::decode(path.as_ref())?.try_into()
62 }
63
64 /// Analyze songs in `paths`, and return the `Analysis` objects through an
65 /// [`mpsc::IntoIter`].
66 ///
67 /// Returns an iterator, whose items are a tuple made of
68 /// the song path (to display to the user in case the analysis failed),
69 /// and a `Result<Analysis>`.
70 fn analyze_paths<P: Into<PathBuf>, F: IntoIterator<Item = P>>(
71 paths: F,
72 ) -> mpsc::IntoIter<(PathBuf, AnalysisResult<Analysis>)> {
73 let cores = thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
74 Self::analyze_paths_with_cores(paths, cores)
75 }
76
77 /// Analyze songs in `paths`, and return the `Analysis` objects through an
78 /// [`mpsc::IntoIter`]. `number_cores` sets the number of cores the analysis
79 /// will use, capped by your system's capacity. Most of the time, you want to
80 /// use the simpler `analyze_paths` functions, which autodetects the number
81 /// of cores in your system.
82 ///
83 /// Return an iterator, whose items are a tuple made of
84 /// the song path (to display to the user in case the analysis failed),
85 /// and a `Result<Analysis>`.
86 fn analyze_paths_with_cores<P: Into<PathBuf>, F: IntoIterator<Item = P>>(
87 paths: F,
88 number_cores: NonZeroUsize,
89 ) -> mpsc::IntoIter<(PathBuf, AnalysisResult<Analysis>)> {
90 let mut cores = thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
91 if cores > number_cores {
92 cores = number_cores;
93 }
94 let paths: Vec<PathBuf> = paths.into_iter().map(Into::into).collect();
95 let (tx, rx) = mpsc::channel::<(PathBuf, AnalysisResult<Analysis>)>();
96 if paths.is_empty() {
97 return rx.into_iter();
98 }
99 let mut handles = Vec::new();
100 let mut chunk_length = paths.len() / cores;
101 if chunk_length == 0 {
102 chunk_length = paths.len();
103 }
104 for chunk in paths.chunks(chunk_length) {
105 let tx_thread = tx.clone();
106 let owned_chunk = chunk.to_owned();
107 let child = thread::spawn(move || {
108 for path in owned_chunk {
109 info!("Analyzing file '{:?}'", path);
110 let song = Self::analyze_path(&path);
111 tx_thread.send((path.clone(), song)).unwrap();
112 }
113 });
114 handles.push(child);
115 }
116
117 for handle in handles {
118 handle.join().unwrap();
119 }
120
121 rx.into_iter()
122 }
123}
124
125/// This trait implements functions in the [`Decoder`] trait that take a callback to run on the results.
126///
127/// It should not be implemented directly, it will be automatically implemented for any type that implements
128/// the [`Decoder`] trait.
129///
130/// Instead of sending an iterator of results, this trait sends each result over the provided channel as soon as it's ready
131#[allow(clippy::module_name_repetitions)]
132pub trait DecoderWithCallback: Decoder {
133 /// Returns a decoded song's `Analysis` given a file path, or an error if the song
134 /// could not be analyzed for some reason.
135 ///
136 /// # Arguments
137 ///
138 /// * `path` - A [`Path`] holding a valid file path to a valid audio file.
139 /// * `callback` - A function that will be called with the path and the result of the analysis.
140 ///
141 /// # Errors
142 ///
143 /// This function will return an error if the file path is invalid, if
144 /// the file path points to a file containing no or corrupted audio stream,
145 /// or if the analysis could not be conducted to the end for some reason.
146 ///
147 /// The error type returned should give a hint as to whether it was a
148 /// decoding or an analysis error.
149 fn analyze_path_with_callback<P: AsRef<Path>, CallbackState>(
150 path: P,
151 callback: mpsc::Sender<(P, AnalysisResult<Analysis>)>,
152 ) {
153 let song = Self::analyze_path(&path);
154 callback.send((path, song)).unwrap();
155
156 // We don't need to return the result of the send, as the receiver will
157 }
158
159 /// Analyze songs in `paths`, and return the `Analysis` objects through an
160 /// [`mpsc::IntoIter`].
161 ///
162 /// Returns an iterator, whose items are a tuple made of
163 /// the song path (to display to the user in case the analysis failed),
164 /// and a `Result<Analysis>`.
165 fn analyze_paths_with_callback<P: Into<PathBuf>, I: Send + IntoIterator<Item = P>>(
166 paths: I,
167 callback: mpsc::Sender<(PathBuf, AnalysisResult<Analysis>)>,
168 ) {
169 let cores = thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
170 Self::analyze_paths_with_cores_with_callback(paths, cores, callback);
171 }
172
173 /// Analyze songs in `paths`, and return the `Analysis` objects through an
174 /// [`mpsc::IntoIter`]. `number_cores` sets the number of cores the analysis
175 /// will use, capped by your system's capacity. Most of the time, you want to
176 /// use the simpler `analyze_paths_with_callback` functions, which autodetects the number
177 /// of cores in your system.
178 ///
179 /// Return an iterator, whose items are a tuple made of
180 /// the song path (to display to the user in case the analysis failed),
181 /// and a `Result<Analysis>`.
182 fn analyze_paths_with_cores_with_callback<P: Into<PathBuf>, I: IntoIterator<Item = P>>(
183 paths: I,
184 number_cores: NonZeroUsize,
185 callback: mpsc::Sender<(PathBuf, AnalysisResult<Analysis>)>,
186 ) {
187 let mut cores = thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
188 if cores > number_cores {
189 cores = number_cores;
190 }
191 let paths: Vec<PathBuf> = paths.into_iter().map(Into::into).collect();
192 let mut chunk_length = paths.len() / cores;
193 if chunk_length == 0 {
194 chunk_length = paths.len();
195 }
196
197 if paths.is_empty() {
198 return;
199 }
200
201 thread::scope(move |scope| {
202 let mut handles = Vec::new();
203 for chunk in paths.chunks(chunk_length) {
204 let owned_chunk = chunk.to_owned();
205
206 let tx_thread: mpsc::Sender<_> = callback.clone();
207
208 let child = scope.spawn(move || {
209 for path in owned_chunk {
210 info!("Analyzing file '{:?}'", path);
211
212 let song = Self::analyze_path(&path);
213
214 tx_thread.send((path, song)).unwrap();
215 }
216 });
217 handles.push(child);
218 }
219
220 for handle in handles {
221 handle.join().unwrap();
222 }
223 });
224 }
225}
226
227impl<T: Decoder> DecoderWithCallback for T {}