needle/audio/analyzer.rs

extern crate chromaprint_rust;
extern crate ffmpeg_next;
#[cfg(feature = "rayon")]
extern crate rayon;

use chromaprint_rust as chromaprint;

use std::path::Path;
use std::time::Duration;

#[cfg(feature = "rayon")]
use rayon::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{Error, Result};
/// Represents frame hash data for a single video file. This is the result of running
/// an [Analyzer] on a video file.
///
/// The struct contains the raw data as well as metadata about how the data was generated. The
/// original video size is included to allow for primitive duplicate checks when deciding whether
/// or not to skip analyzing a file.
#[derive(Debug, Deserialize, Serialize)]
pub struct FrameHashes {
    pub(crate) hash_period: f32,
    pub(crate) hash_duration: f32,
    pub(crate) data: Vec<(u32, Duration)>,
    /// Size of the video, in bytes. This is used as a primitive hash
    /// to detect if the video file has changed since this data was
    /// generated.
    pub(crate) video_size: usize,
}

impl FrameHashes {
    /// Load frame hashes from a path.
    fn from_path(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();
        if !path.exists() {
            return Err(Error::FrameHashDataNotFound(path.to_owned()).into());
        }
        let f = std::fs::File::open(path)?;
        Ok(bincode::deserialize_from(&f)?)
    }

    /// Load frame hash data using a video path.
    ///
    /// If `analyze` is set, the video is analyzed in-place. Otherwise, the frame hash data is
    /// loaded from a file alongside the video.
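    ///
    /// # Example
    ///
    /// A minimal sketch; assumes a hypothetical `video.mp4` that has pre-computed frame
    /// hash data stored alongside it:
    ///
    /// ```no_run
    /// use needle::audio::FrameHashes;
    ///
    /// let frame_hashes = FrameHashes::from_video("video.mp4", false).unwrap();
    /// ```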
    pub fn from_video(video: impl AsRef<Path>, analyze: bool) -> Result<Self> {
        let video = video.as_ref();

        if !analyze {
            let path = video
                .to_owned()
                .with_extension(super::FRAME_HASH_DATA_FILE_EXT);
            Self::from_path(&path)
        } else {
            tracing::debug!(
                "starting in-place video analysis for {}...",
                video.display()
            );
            let analyzer = super::Analyzer::<&Path>::default().with_force(true);
            let frame_hashes = analyzer.run_single(
                video,
                super::DEFAULT_HASH_PERIOD,
                super::DEFAULT_HASH_DURATION,
                false,
            )?;
            tracing::debug!("completed in-place video analysis for {}", video.display());
            Ok(frame_hashes)
        }
    }
}

/// Thin wrapper around the native `FFmpeg` audio decoder.
struct Decoder {
    decoder: ffmpeg_next::codec::decoder::Audio,
}

impl Decoder {
    fn build_threading_config() -> ffmpeg_next::codec::threading::Config {
        let mut config = ffmpeg_next::codec::threading::Config::default();
        config.count = std::thread::available_parallelism()
            .expect("unable to determine available parallelism")
            .get();
        config.kind = ffmpeg_next::codec::threading::Type::Frame;
        config
    }

    fn from_stream(stream: ffmpeg_next::format::stream::Stream, threaded: bool) -> Result<Self> {
        let ctx = ffmpeg_next::codec::context::Context::from_parameters(stream.parameters())?;
        let mut decoder = ctx.decoder();

        if threaded {
            decoder.set_threading(Self::build_threading_config());
        }

        let decoder = decoder.audio()?;

        Ok(Self { decoder })
    }

    fn send_packet(&mut self, packet: &ffmpeg_next::packet::Packet) -> Result<()> {
        Ok(self.decoder.send_packet(packet)?)
    }

    fn receive_frame(&mut self, frame: &mut ffmpeg_next::frame::Audio) -> Result<()> {
        Ok(self.decoder.receive_frame(frame)?)
    }
}

/// Analyzes one or more videos and converts them into [FrameHashes].
///
/// If `threaded_decoding` is set to `true`, the FFmpeg decoder will decode frames using
/// multiple threads, based on the number of CPUs available. If `force` is set, any existing
/// frame hash data on disk will be **ignored**.
///
/// At a high level, the analyzer does the following for a given video:
///
/// 1. Extracts the most suitable audio stream
/// 2. Decodes the audio frame-by-frame and resamples it for fingerprinting
/// 3. Builds a fingerprint (or hash) covering `hash_duration`, emitted every `hash_period`
/// 4. Returns a [FrameHashes] instance that contains the raw data and (optionally) writes it to disk
///    alongside the video
///
/// # Example
///
/// ```
/// use std::path::PathBuf;
/// use needle::audio::Analyzer;
/// # fn get_sample_paths() -> Vec<PathBuf> {
/// #     let resources = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("resources");
/// #     vec![
/// #         resources.join("sample-5s.mp4"),
/// #         resources.join("sample-shifted-4s.mp4"),
/// #     ]
/// # }
///
/// let video_paths: Vec<PathBuf> = get_sample_paths();
/// let analyzer = Analyzer::from_files(video_paths, false, false);
/// let frame_hashes = analyzer.run(1.0, 3.0, false, false).unwrap();
/// ```
#[derive(Debug)]
pub struct Analyzer<P: AsRef<Path>> {
    pub(crate) videos: Vec<P>,
    threaded_decoding: bool,
    force: bool,
}

impl<P: AsRef<Path>> Default for Analyzer<P> {
    fn default() -> Self {
        Self {
            videos: Default::default(),
            threaded_decoding: false,
            force: false,
        }
    }
}

impl<P: AsRef<Path>> Analyzer<P> {
    /// Constructs a new [Analyzer] from a list of video paths.
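    ///
    /// # Example
    ///
    /// A minimal sketch using hypothetical paths (no files are touched until the
    /// analyzer is run):
    ///
    /// ```
    /// use std::path::PathBuf;
    /// use needle::audio::Analyzer;
    ///
    /// let videos = vec![PathBuf::from("one.mp4"), PathBuf::from("two.mp4")];
    /// let analyzer = Analyzer::from_files(videos, false, false);
    /// assert_eq!(analyzer.videos().len(), 2);
    /// ```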
    pub fn from_files(videos: impl Into<Vec<P>>, threaded_decoding: bool, force: bool) -> Self {
        Self {
            videos: videos.into(),
            threaded_decoding,
            force,
        }
    }

    /// Returns the video paths used by this analyzer.
    pub fn videos(&self) -> &[P] {
        &self.videos
    }

    /// Returns a new [Analyzer] with `force` set to the provided value.
    pub fn with_force(mut self, force: bool) -> Self {
        self.force = force;
        self
    }

    /// Returns a new [Analyzer] with `threaded_decoding` set to the provided value.
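    ///
    /// # Example
    ///
    /// A minimal sketch showing the builder-style setters:
    ///
    /// ```
    /// use std::path::Path;
    /// use needle::audio::Analyzer;
    ///
    /// let analyzer = Analyzer::<&Path>::default()
    ///     .with_force(true)
    ///     .with_threaded_decoding(true);
    /// ```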
    pub fn with_threaded_decoding(mut self, threaded_decoding: bool) -> Self {
        self.threaded_decoding = threaded_decoding;
        self
    }

    fn find_best_audio_stream(
        input: &ffmpeg_next::format::context::Input,
    ) -> ffmpeg_next::format::stream::Stream {
        input
            .streams()
            .best(ffmpeg_next::media::Type::Audio)
            .expect("unable to find an audio stream")
    }

    // Computes a stream of fingerprints (hashes) for the raw audio in the given stream,
    // emitting one hash every `hash_period`, with each hash covering `hash_duration` of audio.
    fn process_frames(
        ctx: &mut ffmpeg_next::format::context::Input,
        stream_idx: usize,
        hash_duration: Duration,
        hash_period: Duration,
        threaded: bool,
    ) -> Result<Vec<(u32, Duration)>> {
        let span = tracing::span!(tracing::Level::TRACE, "process_frames");
        let _enter = span.enter();

        let stream = ctx.stream(stream_idx).unwrap();
        let mut decoder = Decoder::from_stream(stream, threaded).unwrap();

        let mut hashes = Vec::new();
        let mut frame = ffmpeg_next::frame::Audio::empty();
        let mut frame_resampled = ffmpeg_next::frame::Audio::empty();

        // Set up the audio fingerprinter. `n` staggered fingerprinters are needed so that
        // each hash covers `hash_duration` while a new hash is still emitted every `hash_period`.
        let n = f32::ceil(hash_duration.as_secs_f32() / hash_period.as_secs_f32()) as usize;
        let mut fingerprinter =
            chromaprint::DelayedFingerprinter::new(n, hash_duration, hash_period, None, 2, None);

        // Set up the audio resampler to convert decoded frames into packed S16 stereo at
        // the fingerprinter's expected sample rate.
        let target_sample_rate = fingerprinter.sample_rate();
        let mut resampler = decoder
            .decoder
            .resampler(
                ffmpeg_next::format::Sample::I16(ffmpeg_next::format::sample::Type::Packed),
                ffmpeg_next::ChannelLayout::STEREO,
                target_sample_rate,
            )
            .unwrap();

        // Build an iterator over packets in the stream.
        let audio_packets = ctx
            .packets()
            .filter(|(s, _)| s.index() == stream_idx)
            .map(|(_, p)| p);

        for p in audio_packets {
            decoder.send_packet(&p).unwrap();
            while decoder.receive_frame(&mut frame).is_ok() {
                // Resample the frame to S16 stereo and return the frame delay.
                let mut delay = match resampler.run(&frame, &mut frame_resampled) {
                    Ok(v) => v,
                    // If resampling fails due to changed input, construct a new local resampler for this frame
                    // and swap out the global resampler.
                    Err(ffmpeg_next::Error::InputChanged) => {
                        let mut local_resampler = frame
                            .resampler(
                                ffmpeg_next::format::Sample::I16(
                                    ffmpeg_next::format::sample::Type::Packed,
                                ),
                                ffmpeg_next::ChannelLayout::STEREO,
                                target_sample_rate,
                            )
                            .unwrap();
                        let delay = local_resampler
                            .run(&frame, &mut frame_resampled)
                            .expect("failed to resample frame");

                        resampler = local_resampler;

                        delay
                    }
                    // We don't expect any other errors to occur.
                    Err(e) => panic!("unexpected resampling error: {}", e),
                };

                loop {
                    // Obtain a slice of raw bytes in interleaved format.
                    // We have two channels, so the bytes look like this: c1, c1, c2, c2, c1, c1, c2, c2, ...
                    //
                    // Note that `data` is a fixed-size buffer. To get the _actual_ sample bytes, we need to use:
                    // a) sample count, b) channel count, and c) number of bytes per S16 sample.
                    let raw_samples = &frame_resampled.data(0)[..frame_resampled.samples()
                        * frame_resampled.channels() as usize
                        * std::mem::size_of::<i16>()];

                    // Transmute the raw byte slice into a slice of i16 samples.
                    // This looks like: c1, c2, c1, c2, ...
                    //
                    // SAFETY: We know for a fact that the returned buffer contains i16 samples
                    // because we explicitly told the resampler to return S16 samples (see above).
                    let (_, samples, _) = unsafe { raw_samples.align_to::<i16>() };

                    // Feed the i16 samples to Chromaprint. Since we are using the default sampling rate,
                    // Chromaprint will _not_ do any resampling internally.
                    for (raw_fingerprint, ts) in fingerprinter.feed(samples).unwrap() {
                        let hash = chromaprint::simhash::simhash32(raw_fingerprint.get());
                        hashes.push((hash, ts));
                    }

                    // If the resampler reported a delay, it is still buffering samples:
                    // flush it and run the drained samples through the loop again.
                    if delay.is_none() {
                        break;
                    } else {
                        delay = resampler.flush(&mut frame_resampled).unwrap();
                    }
                }
            }
        }

        Ok(hashes)
    }

    pub(crate) fn run_single(
        &self,
        path: impl AsRef<Path>,
        hash_period: f32,
        hash_duration: f32,
        persist: bool,
    ) -> Result<FrameHashes> {
        let span = tracing::span!(tracing::Level::TRACE, "run");
        let _enter = span.enter();

        let path = path.as_ref();
        let frame_hash_path = path.with_extension(super::FRAME_HASH_DATA_FILE_EXT);

        // Check if we've already analyzed this video by comparing the on-disk video size
        // with the size recorded in any existing frame hash data.
        let video_size = std::fs::File::open(path)?.metadata()?.len() as usize;
        if !self.force {
            if let Ok(f) = std::fs::File::open(&frame_hash_path) {
                if let Ok(data) = bincode::deserialize_from::<_, FrameHashes>(&f) {
                    if data.video_size == video_size {
                        tracing::info!("skipping analysis for {}", path.display());
                        return Ok(data);
                    }
                }
            }
        }

        let mut ctx = ffmpeg_next::format::input(&path)?;
        let stream = Self::find_best_audio_stream(&ctx);
        let stream_idx = stream.index();
        let threaded = self.threaded_decoding;

        tracing::debug!("starting frame processing for {}", path.display());
        let frame_hashes = Self::process_frames(
            &mut ctx,
            stream_idx,
            Duration::from_secs_f32(hash_duration),
            Duration::from_secs_f32(hash_period),
            threaded,
        )?;
        tracing::debug!(
            num_hashes = frame_hashes.len(),
            "completed frame processing for {}",
            path.display(),
        );

        let frame_hashes = FrameHashes {
            hash_period,
            hash_duration,
            data: frame_hashes,
            video_size,
        };

        // Write results to disk.
        if persist {
            let mut f = std::fs::File::create(&frame_hash_path)?;
            bincode::serialize_into(&mut f, &frame_hashes)?;
        }

        Ok(frame_hashes)
    }
}

impl<P: AsRef<Path> + Sync> Analyzer<P> {
    /// Runs this analyzer on all of the provided videos, returning one [FrameHashes]
    /// per video. If `threading` is set and the `rayon` feature is enabled, the videos
    /// are analyzed in parallel.
    pub fn run(
        &self,
        hash_period: f32,
        hash_duration: f32,
        persist: bool,
        threading: bool,
    ) -> Result<Vec<FrameHashes>> {
        if self.videos.is_empty() {
            return Err(Error::AnalyzerMissingPaths.into());
        }

        let mut data = Vec::new();

        if cfg!(feature = "rayon") && threading {
            #[cfg(feature = "rayon")]
            {
                data = self
                    .videos
                    .par_iter()
                    .map(|path| self.run_single(path, hash_period, hash_duration, persist))
                    .collect::<Result<Vec<_>>>()?;
            }
        } else {
            for path in &self.videos {
                data.push(self.run_single(path, hash_period, hash_duration, persist)?);
            }
        }

        Ok(data)
    }
}

#[cfg(test)]
mod test {
    use std::path::PathBuf;

    use super::*;

    fn get_sample_paths() -> Vec<PathBuf> {
        let resources = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("resources");
        vec![
            resources.join("sample-5s.mp4"),
            resources.join("sample-shifted-4s.mp4"),
        ]
    }

    #[test]
    fn test_analyzer() {
        let paths = get_sample_paths();
        let analyzer = Analyzer::from_files(paths.clone(), false, false);
        let data = analyzer.run(0.3, 3.0, false, false).unwrap();
        insta::assert_debug_snapshot!(data);
    }
}