1extern crate chromaprint_rust;
2extern crate ffmpeg_next;
3#[cfg(feature = "rayon")]
4extern crate rayon;
5
6use chromaprint_rust as chromaprint;
7
8use std::path::Path;
9use std::time::Duration;
10
11#[cfg(feature = "rayon")]
12use rayon::prelude::*;
13use serde::{Deserialize, Serialize};
14
15use crate::{Error, Result};
16
/// Result of analyzing one video: a sequence of perceptual audio hashes
/// plus the parameters used to produce them. Serialized to/from disk with
/// `bincode` (see `FrameHashes::from_path` and `Analyzer::run_single`).
#[derive(Debug, Deserialize, Serialize)]
pub struct FrameHashes {
    // Seconds between successive hashes.
    pub(crate) hash_period: f32,
    // Seconds of audio covered by each hash.
    pub(crate) hash_duration: f32,
    // (hash, timestamp) pairs in stream order.
    pub(crate) data: Vec<(u32, Duration)>,
    // Size in bytes of the source video; used by `run_single` to detect a
    // stale cache file (sizes must match for cached data to be reused).
    pub(crate) video_size: usize,
}
33
34impl FrameHashes {
35 fn from_path(path: impl AsRef<Path>) -> Result<Self> {
37 let path = path.as_ref();
38 if !path.exists() {
39 return Err(Error::FrameHashDataNotFound(path.to_owned()).into());
40 }
41 let f = std::fs::File::open(path)?;
42 Ok(bincode::deserialize_from(&f)?)
43 }
44
45 pub fn from_video(video: impl AsRef<Path>, analyze: bool) -> Result<Self> {
50 let video = video.as_ref();
51
52 if !analyze {
53 let path = video
54 .to_owned()
55 .with_extension(super::FRAME_HASH_DATA_FILE_EXT);
56 Self::from_path(&path)
57 } else {
58 tracing::debug!(
59 "starting in-place video analysis for {}...",
60 video.display()
61 );
62 let analyzer = super::Analyzer::<&Path>::default().with_force(true);
63 let frame_hashes = analyzer.run_single(
64 video,
65 super::DEFAULT_HASH_PERIOD,
66 super::DEFAULT_HASH_DURATION,
67 false,
68 )?;
69 tracing::debug!("completed in-place video analysis for {}", video.display());
70 Ok(frame_hashes)
71 }
72 }
73}
74
// Thin wrapper around an ffmpeg audio decoder that hides packet/frame
// plumbing behind two infallible-to-misuse methods.
struct Decoder {
    decoder: ffmpeg_next::codec::decoder::Audio,
}
79
80impl Decoder {
81 fn build_threading_config() -> ffmpeg_next::codec::threading::Config {
82 let mut config = ffmpeg_next::codec::threading::Config::default();
83 config.count = std::thread::available_parallelism()
84 .expect("unable to determine available parallelism")
85 .get();
86 config.kind = ffmpeg_next::codec::threading::Type::Frame;
87 config
88 }
89
90 fn from_stream(stream: ffmpeg_next::format::stream::Stream, threaded: bool) -> Result<Self> {
91 let ctx = ffmpeg_next::codec::context::Context::from_parameters(stream.parameters())?;
92 let mut decoder = ctx.decoder();
93
94 if threaded {
95 decoder.set_threading(Self::build_threading_config());
96 }
97
98 let decoder = decoder.audio()?;
99
100 Ok(Self { decoder })
101 }
102
103 fn send_packet(&mut self, packet: &ffmpeg_next::packet::Packet) -> Result<()> {
104 Ok(self.decoder.send_packet(packet)?)
105 }
106
107 fn receive_frame(&mut self, frame: &mut ffmpeg_next::frame::Audio) -> Result<()> {
108 Ok(self.decoder.receive_frame(frame)?)
109 }
110}
111
/// Analyzes a set of videos, producing [`FrameHashes`] for each one.
///
/// Configure with [`Analyzer::with_force`] and
/// [`Analyzer::with_threaded_decoding`]; run with `run`/`run_single`.
#[derive(Debug)]
pub struct Analyzer<P: AsRef<Path>> {
    // Paths of the videos to analyze.
    pub(crate) videos: Vec<P>,
    // Whether ffmpeg decoding uses multiple threads.
    threaded_decoding: bool,
    // Whether to re-analyze even when a valid cached hash file exists.
    force: bool,
}
149
150impl<P: AsRef<Path>> Default for Analyzer<P> {
151 fn default() -> Self {
152 Self {
153 videos: Default::default(),
154 threaded_decoding: false,
155 force: false,
156 }
157 }
158}
159
160impl<P: AsRef<Path>> Analyzer<P> {
161 pub fn from_files(videos: impl Into<Vec<P>>, threaded_decoding: bool, force: bool) -> Self {
163 Self {
164 videos: videos.into(),
165 threaded_decoding,
166 force,
167 }
168 }
169
170 pub fn videos(&self) -> &[P] {
172 &self.videos
173 }
174
175 pub fn with_force(mut self, force: bool) -> Self {
177 self.force = force;
178 self
179 }
180
181 pub fn with_threaded_decoding(mut self, threaded_decoding: bool) -> Self {
183 self.threaded_decoding = threaded_decoding;
184 self
185 }
186
187 fn find_best_audio_stream(
188 input: &ffmpeg_next::format::context::Input,
189 ) -> ffmpeg_next::format::stream::Stream {
190 input
191 .streams()
192 .best(ffmpeg_next::media::Type::Audio)
193 .expect("unable to find an audio stream")
194 }
195
196 fn process_frames(
200 ctx: &mut ffmpeg_next::format::context::Input,
201 stream_idx: usize,
202 hash_duration: Duration,
203 hash_period: Duration,
204 threaded: bool,
205 ) -> Result<Vec<(u32, Duration)>> {
206 let span = tracing::span!(tracing::Level::TRACE, "process_frames");
207 let _enter = span.enter();
208
209 let stream = ctx.stream(stream_idx).unwrap();
210 let mut decoder = Decoder::from_stream(stream, threaded).unwrap();
211
212 let mut hashes = Vec::new();
213 let mut frame = ffmpeg_next::frame::Audio::empty();
214 let mut frame_resampled = ffmpeg_next::frame::Audio::empty();
215
216 let n = f32::ceil(hash_duration.as_secs_f32() / hash_period.as_secs_f32()) as usize;
218 let mut fingerprinter =
219 chromaprint::DelayedFingerprinter::new(n, hash_duration, hash_period, None, 2, None);
220
221 let target_sample_rate = fingerprinter.sample_rate();
223 let mut resampler = decoder
224 .decoder
225 .resampler(
226 ffmpeg_next::format::Sample::I16(ffmpeg_next::format::sample::Type::Packed),
227 ffmpeg_next::ChannelLayout::STEREO,
228 target_sample_rate,
229 )
230 .unwrap();
231
232 let audio_packets = ctx
234 .packets()
235 .filter(|(s, _)| s.index() == stream_idx)
236 .map(|(_, p)| p);
237
238 for p in audio_packets {
239 decoder.send_packet(&p).unwrap();
240 while decoder.receive_frame(&mut frame).is_ok() {
241 let mut delay = match resampler.run(&frame, &mut frame_resampled) {
243 Ok(v) => v,
244 Err(ffmpeg_next::Error::InputChanged) => {
247 let mut local_resampler = frame
248 .resampler(
249 ffmpeg_next::format::Sample::I16(
250 ffmpeg_next::format::sample::Type::Packed,
251 ),
252 ffmpeg_next::ChannelLayout::STEREO,
253 target_sample_rate,
254 )
255 .unwrap();
256 let delay = local_resampler
257 .run(&frame, &mut frame_resampled)
258 .expect("failed to resample frame");
259
260 resampler = local_resampler;
261
262 delay
263 }
264 Err(_) => panic!("unexpected error"),
266 };
267
268 loop {
269 let raw_samples = &frame_resampled.data(0)
275 [..frame_resampled.samples() * frame_resampled.channels() as usize * 2];
276
277 let (_, samples, _) = unsafe { raw_samples.align_to() };
283
284 for (raw_fingerprint, ts) in fingerprinter.feed(samples).unwrap() {
287 let hash = chromaprint::simhash::simhash32(raw_fingerprint.get());
288 hashes.push((hash, ts));
289 }
290
291 if delay.is_none() {
292 break;
293 } else {
294 delay = resampler.flush(&mut frame_resampled).unwrap();
295 }
296 }
297 }
298 }
299
300 Ok(hashes)
301 }
302
303 pub(crate) fn run_single(
304 &self,
305 path: impl AsRef<Path>,
306 hash_period: f32,
307 hash_duration: f32,
308 persist: bool,
309 ) -> Result<FrameHashes> {
310 let span = tracing::span!(tracing::Level::TRACE, "run");
311 let _enter = span.enter();
312
313 let path = path.as_ref();
314 let frame_hash_path = path.with_extension(super::FRAME_HASH_DATA_FILE_EXT);
315
316 let video_size = std::fs::File::open(&path)?.metadata()?.len() as usize;
318 if !self.force {
319 if let Ok(f) = std::fs::File::open(&frame_hash_path) {
320 let data: FrameHashes = bincode::deserialize_from(&f).unwrap();
321 if data.video_size == video_size {
322 println!("Skipping analysis for {}...", path.display());
323 return Ok(data);
324 }
325 }
326 }
327
328 let mut ctx = ffmpeg_next::format::input(&path)?;
329 let stream = Self::find_best_audio_stream(&ctx);
330 let stream_idx = stream.index();
331 let threaded = self.threaded_decoding;
332
333 tracing::debug!("starting frame processing for {}", path.display());
334 let frame_hashes = Self::process_frames(
335 &mut ctx,
336 stream_idx,
337 Duration::from_secs_f32(hash_duration),
338 Duration::from_secs_f32(hash_period),
339 threaded,
340 )?;
341 tracing::debug!(
342 num_hashes = frame_hashes.len(),
343 "completed frame processing for {}",
344 path.display(),
345 );
346
347 let frame_hashes = FrameHashes {
348 hash_period,
349 hash_duration,
350 data: frame_hashes,
351 video_size,
352 };
353
354 if persist {
356 let mut f = std::fs::File::create(&frame_hash_path)?;
357 bincode::serialize_into(&mut f, &frame_hashes)?;
358 }
359
360 Ok(frame_hashes)
361 }
362}
363
364impl<P: AsRef<Path> + Sync> Analyzer<P> {
365 pub fn run(
367 &self,
368 hash_period: f32,
369 hash_duration: f32,
370 persist: bool,
371 threading: bool,
372 ) -> Result<Vec<FrameHashes>> {
373 if self.videos.len() == 0 {
374 return Err(Error::AnalyzerMissingPaths.into());
375 }
376
377 let mut data = Vec::new();
378
379 if cfg!(feature = "rayon") && threading {
380 #[cfg(feature = "rayon")]
381 {
382 data = self
383 .videos
384 .par_iter()
385 .map(|path| {
386 self.run_single(path, hash_period, hash_duration, persist)
387 .unwrap()
388 })
389 .collect::<Vec<_>>();
390 }
391 } else {
392 data.extend(self.videos.iter().map(|path| {
393 self.run_single(path, hash_period, hash_duration, persist)
394 .unwrap()
395 }));
396 }
397
398 Ok(data)
399 }
400}
401
#[cfg(test)]
mod test {
    use std::path::PathBuf;

    use super::*;

    // Sample videos bundled under the crate's `resources/` directory.
    fn get_sample_paths() -> Vec<PathBuf> {
        let resources = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("resources");
        vec![
            resources.join("sample-5s.mp4"),
            resources.join("sample-shifted-4s.mp4"),
        ]
    }

    // End-to-end analysis of the sample videos, pinned with an insta
    // snapshot (no persistence, no parallel threading).
    #[test]
    fn test_analyzer() {
        let paths = get_sample_paths();
        let analyzer = Analyzer::from_files(paths.clone(), false, false);
        let data = analyzer.run(0.3, 3.0, false, false).unwrap();
        insta::assert_debug_snapshot!(data);
    }
}