mecomp_analysis/decoder/
mod.rs

1use std::{
2    clone::Clone,
3    marker::Send,
4    num::NonZeroUsize,
5    path::{Path, PathBuf},
6    sync::mpsc,
7    thread,
8};
9
10use log::info;
11
12use crate::{errors::AnalysisResult, Analysis, ResampledAudio};
13
14mod mecomp;
15#[allow(clippy::module_name_repetitions)]
16pub use mecomp::MecompDecoder;
17
18/// Trait used to implement your own decoder.
19///
20/// The `decode` function should be implemented so that it
21/// decodes and resample a song to one channel with a sampling rate of 22050 Hz
22/// and a f32le layout.
23/// Once it is implemented, several functions
24/// to perform analysis from path(s) are available, such as
25/// [`song_from_path`](Decoder::song_from_path) and
26/// [`analyze_paths`](Decoder::analyze_paths).
27#[allow(clippy::module_name_repetitions)]
28pub trait Decoder {
29    /// A function that should decode and resample a song, optionally
30    /// extracting the song's metadata such as the artist, the album, etc.
31    ///
32    /// The output sample array should be resampled to f32le, one channel, with a sampling rate
33    /// of 22050 Hz. Anything other than that will yield wrong results.
34    ///
35    /// # Errors
36    ///
37    /// This function will return an error if the file path is invalid, if
38    /// the file path points to a file containing no or corrupted audio stream,
39    /// or if the analysis could not be conducted to the end for some reason.
40    ///
41    /// The error type returned should give a hint as to whether it was a
42    /// decoding or an analysis error.
43    fn decode(path: &Path) -> AnalysisResult<ResampledAudio>;
44
45    /// Returns a decoded song's `Analysis` given a file path, or an error if the song
46    /// could not be analyzed for some reason.
47    ///
48    /// # Arguments
49    ///
50    /// * `path` - A [`Path`] holding a valid file path to a valid audio file.
51    ///
52    /// # Errors
53    ///
54    /// This function will return an error if the file path is invalid, if
55    /// the file path points to a file containing no or corrupted audio stream,
56    /// or if the analysis could not be conducted to the end for some reason.
57    ///
58    /// The error type returned should give a hint as to whether it was a
59    /// decoding or an analysis error.
60    fn analyze_path<P: AsRef<Path>>(path: P) -> AnalysisResult<Analysis> {
61        Self::decode(path.as_ref())?.try_into()
62    }
63
64    /// Analyze songs in `paths`, and return the `Analysis` objects through an
65    /// [`mpsc::IntoIter`].
66    ///
67    /// Returns an iterator, whose items are a tuple made of
68    /// the song path (to display to the user in case the analysis failed),
69    /// and a `Result<Analysis>`.
70    fn analyze_paths<P: Into<PathBuf>, F: IntoIterator<Item = P>>(
71        paths: F,
72    ) -> mpsc::IntoIter<(PathBuf, AnalysisResult<Analysis>)> {
73        let cores = thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
74        Self::analyze_paths_with_cores(paths, cores)
75    }
76
77    /// Analyze songs in `paths`, and return the `Analysis` objects through an
78    /// [`mpsc::IntoIter`]. `number_cores` sets the number of cores the analysis
79    /// will use, capped by your system's capacity. Most of the time, you want to
80    /// use the simpler `analyze_paths` functions, which autodetects the number
81    /// of cores in your system.
82    ///
83    /// Return an iterator, whose items are a tuple made of
84    /// the song path (to display to the user in case the analysis failed),
85    /// and a `Result<Analysis>`.
86    fn analyze_paths_with_cores<P: Into<PathBuf>, F: IntoIterator<Item = P>>(
87        paths: F,
88        number_cores: NonZeroUsize,
89    ) -> mpsc::IntoIter<(PathBuf, AnalysisResult<Analysis>)> {
90        let mut cores = thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
91        if cores > number_cores {
92            cores = number_cores;
93        }
94        let paths: Vec<PathBuf> = paths.into_iter().map(Into::into).collect();
95        let (tx, rx) = mpsc::channel::<(PathBuf, AnalysisResult<Analysis>)>();
96        if paths.is_empty() {
97            return rx.into_iter();
98        }
99        let mut handles = Vec::new();
100        let mut chunk_length = paths.len() / cores;
101        if chunk_length == 0 {
102            chunk_length = paths.len();
103        }
104        for chunk in paths.chunks(chunk_length) {
105            let tx_thread = tx.clone();
106            let owned_chunk = chunk.to_owned();
107            let child = thread::spawn(move || {
108                for path in owned_chunk {
109                    info!("Analyzing file '{:?}'", path);
110                    let song = Self::analyze_path(&path);
111                    tx_thread.send((path.clone(), song)).unwrap();
112                }
113            });
114            handles.push(child);
115        }
116
117        for handle in handles {
118            handle.join().unwrap();
119        }
120
121        rx.into_iter()
122    }
123}
124
125/// This trait implements functions in the [`Decoder`] trait that take a callback to run on the results.
126///
127/// It should not be implemented directly, it will be automatically implemented for any type that implements
128/// the [`Decoder`] trait.
129///
130/// Instead of sending an iterator of results, this trait sends each result over the provided channel as soon as it's ready
131#[allow(clippy::module_name_repetitions)]
132pub trait DecoderWithCallback: Decoder {
133    /// Returns a decoded song's `Analysis` given a file path, or an error if the song
134    /// could not be analyzed for some reason.
135    ///
136    /// # Arguments
137    ///
138    /// * `path` - A [`Path`] holding a valid file path to a valid audio file.
139    /// * `callback` - A function that will be called with the path and the result of the analysis.
140    ///
141    /// # Errors
142    ///
143    /// This function will return an error if the file path is invalid, if
144    /// the file path points to a file containing no or corrupted audio stream,
145    /// or if the analysis could not be conducted to the end for some reason.
146    ///
147    /// The error type returned should give a hint as to whether it was a
148    /// decoding or an analysis error.
149    fn analyze_path_with_callback<P: AsRef<Path>, CallbackState>(
150        path: P,
151        callback: mpsc::Sender<(P, AnalysisResult<Analysis>)>,
152    ) {
153        let song = Self::analyze_path(&path);
154        callback.send((path, song)).unwrap();
155
156        // We don't need to return the result of the send, as the receiver will
157    }
158
159    /// Analyze songs in `paths`, and return the `Analysis` objects through an
160    /// [`mpsc::IntoIter`].
161    ///
162    /// Returns an iterator, whose items are a tuple made of
163    /// the song path (to display to the user in case the analysis failed),
164    /// and a `Result<Analysis>`.
165    fn analyze_paths_with_callback<P: Into<PathBuf>, I: Send + IntoIterator<Item = P>>(
166        paths: I,
167        callback: mpsc::Sender<(PathBuf, AnalysisResult<Analysis>)>,
168    ) {
169        let cores = thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
170        Self::analyze_paths_with_cores_with_callback(paths, cores, callback);
171    }
172
173    /// Analyze songs in `paths`, and return the `Analysis` objects through an
174    /// [`mpsc::IntoIter`]. `number_cores` sets the number of cores the analysis
175    /// will use, capped by your system's capacity. Most of the time, you want to
176    /// use the simpler `analyze_paths_with_callback` functions, which autodetects the number
177    /// of cores in your system.
178    ///
179    /// Return an iterator, whose items are a tuple made of
180    /// the song path (to display to the user in case the analysis failed),
181    /// and a `Result<Analysis>`.
182    fn analyze_paths_with_cores_with_callback<P: Into<PathBuf>, I: IntoIterator<Item = P>>(
183        paths: I,
184        number_cores: NonZeroUsize,
185        callback: mpsc::Sender<(PathBuf, AnalysisResult<Analysis>)>,
186    ) {
187        let mut cores = thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap());
188        if cores > number_cores {
189            cores = number_cores;
190        }
191        let paths: Vec<PathBuf> = paths.into_iter().map(Into::into).collect();
192        let mut chunk_length = paths.len() / cores;
193        if chunk_length == 0 {
194            chunk_length = paths.len();
195        }
196
197        if paths.is_empty() {
198            return;
199        }
200
201        thread::scope(move |scope| {
202            let mut handles = Vec::new();
203            for chunk in paths.chunks(chunk_length) {
204                let owned_chunk = chunk.to_owned();
205
206                let tx_thread: mpsc::Sender<_> = callback.clone();
207
208                let child = scope.spawn(move || {
209                    for path in owned_chunk {
210                        info!("Analyzing file '{:?}'", path);
211
212                        let song = Self::analyze_path(&path);
213
214                        tx_thread.send((path, song)).unwrap();
215                    }
216                });
217                handles.push(child);
218            }
219
220            for handle in handles {
221                handle.join().unwrap();
222            }
223        });
224    }
225}
226
227impl<T: Decoder> DecoderWithCallback for T {}