// alass_cli/video_decoder/ffmpeg_binary.rs

1use failure::{Backtrace, Context, Fail, ResultExt};
2use std::ffi::OsString;
3use std::fmt;
4use std::io::Read;
5use std::path::{Path, PathBuf};
6use std::process::Child;
7use std::process::{ChildStdout, Command, Output, Stdio};
8use std::str::from_utf8;
9
10use byteorder::ByteOrder;
11use serde::{Deserialize, Deserializer};
12
13use crate::define_error;
14
/// Kind of media a stream carries, decoded from a textual codec-type value.
///
/// The three well-known kinds get their own variant; every other value is
/// preserved verbatim in [`CodecType::Other`].
#[derive(Debug, Eq, PartialEq)]
pub enum CodecType {
    /// An audio stream (`"audio"`).
    Audio,
    /// A video stream (`"video"`).
    Video,
    /// A subtitle stream (`"subtitle"`).
    Subtitle,
    /// Any other codec type; holds the original string.
    Other(String),
}
22
23impl<'de> Deserialize<'de> for CodecType {
24    fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
25        let s = String::deserialize(d)?;
26        match &s[..] {
27            "audio" => Ok(CodecType::Audio),
28            "video" => Ok(CodecType::Video),
29            "subtitle" => Ok(CodecType::Subtitle),
30            s => Ok(CodecType::Other(s.to_owned())),
31        }
32    }
33}
34
35#[derive(Debug, Deserialize)]
36struct Stream {
37    pub index: usize,
38    pub codec_long_name: String,
39    pub channels: Option<usize>,
40    /// `.mkv` does not store the duration in the streams; we have to use `format -> duration` instead
41    pub duration: Option<String>,
42    pub codec_type: CodecType,
43}
44
45#[derive(Debug, Deserialize)]
46struct Format {
47    pub duration: Option<String>,
48}
49
50/// Metadata associated with a video.
51#[derive(Debug, Deserialize)]
52struct Metadata {
53    streams: Vec<Stream>,
54    format: Option<Format>,
55}
56
57define_error!(DecoderError, DecoderErrorKind);
58
59#[derive(Debug, Fail)]
60pub enum DecoderErrorKind {
61    FailedToDecodeVideoStreamInfo,
62    ExtractingMetadataFailed {
63        cmd_path: PathBuf,
64        file_path: PathBuf,
65        args: Vec<OsString>,
66    },
67    NoAudioStream {
68        path: PathBuf,
69    },
70    FailedExtractingAudio {
71        file_path: PathBuf,
72        cmd_path: PathBuf,
73        args: Vec<OsString>,
74    },
75    FailedSpawningSubprocess {
76        path: PathBuf,
77        args: Vec<OsString>,
78    },
79    WaitingForProcessFailed {
80        cmd_path: PathBuf,
81    },
82    ProcessErrorCode {
83        cmd_path: PathBuf,
84        code: Option<i32>,
85    },
86    ProcessErrorMessage {
87        msg: String,
88    },
89    DeserializingMetadataFailed {
90        path: PathBuf,
91    },
92    ReadError,
93    FailedToParseDuration {
94        s: String,
95    },
96    AudioSegmentProcessingFailed,
97    NoDurationInformation,
98}
99
/// Renders a command line for error messages: the program path followed by
/// all arguments, separated by single spaces.
///
/// Arguments that are not valid Unicode are converted lossily. No shell
/// quoting is applied, so the result is meant for display only and must not
/// be executed. With an empty `args` the result is the path followed by one
/// space.
fn format_cmd(cmd_path: &Path, args: &[OsString]) -> String {
    let args_string = args
        .iter()
        .map(|arg| arg.to_string_lossy())
        .collect::<Vec<_>>()
        .join(" ");

    format!("{} {}", cmd_path.display(), args_string)
}
108
109impl fmt::Display for DecoderErrorKind {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        match self {
112            DecoderErrorKind::FailedToDecodeVideoStreamInfo => write!(f, "failed to decode video stream info"),
113            DecoderErrorKind::DeserializingMetadataFailed { path } => {
114                write!(f, "failed to deserialize metadata of file '{}'", path.display())
115            }
116            DecoderErrorKind::NoAudioStream { path } => write!(f, "no audio stream in file '{}'", path.display()),
117            DecoderErrorKind::FailedExtractingAudio {
118                file_path,
119                cmd_path,
120                args,
121            } => write!(
122                f,
123                "failed to extract audio from '{}' with '{}' ",
124                file_path.display(),
125                format_cmd(cmd_path, args)
126            ),
127            DecoderErrorKind::FailedSpawningSubprocess { path, args } => {
128                write!(f, "failed to spawn subprocess '{}' ", format_cmd(path, args))
129            }
130            DecoderErrorKind::WaitingForProcessFailed { cmd_path } => {
131                write!(f, "failed to check status of subprocess '{}'", cmd_path.display())
132            }
133            DecoderErrorKind::ProcessErrorCode { cmd_path, code } => write!(
134                f,
135                "process '{}' returned error code '{}'",
136                cmd_path.display(),
137                code.map(|x| x.to_string())
138                    .unwrap_or_else(|| String::from("interrupted?"))
139            ),
140            DecoderErrorKind::ProcessErrorMessage { msg } => write!(f, "stderr: {}", msg),
141            DecoderErrorKind::ExtractingMetadataFailed {
142                file_path,
143                cmd_path,
144                args,
145            } => write!(
146                f,
147                "failed to extract metadata from '{}' using command '{}'",
148                file_path.display(),
149                format_cmd(cmd_path, args)
150            ),
151            DecoderErrorKind::ReadError => write!(f, "error while reading stdout"),
152            DecoderErrorKind::FailedToParseDuration { s } => {
153                write!(f, "failed to parse duration string '{}' from metadata", s)
154            }
155            DecoderErrorKind::AudioSegmentProcessingFailed => write!(f, "processing audio segment failed"),
156            DecoderErrorKind::NoDurationInformation => write!(f, "no audio duration information found"),
157        }
158    }
159}
160
/// Postfix helper that wraps a value in `Ok`, so a method chain can end in a
/// `Result` without an enclosing `Ok(...)`.
trait IntoOk<T> {
    /// Returns `Ok(self)`; the error type is chosen by the caller or inferred.
    fn into_ok<E>(self) -> Result<T, E>;
}

/// Every type can be wrapped into an `Ok` of itself.
impl<T> IntoOk<T> for T {
    fn into_ok<E>(self) -> Result<T, E> {
        Result::Ok(self)
    }
}
169
/// Audio decoder that shells out to the `ffprobe` and `ffmpeg` executables.
///
/// The type carries no state; everything is exposed through associated functions.
pub struct VideoDecoderFFmpegBinary {}

/// Number of decoded samples that make up one progress step: the total
/// reported to the progress handler is divided by this value, and the handler
/// is advanced once per this many samples.
const PROGRESS_PRESCALER: i64 = 200;
173
174impl VideoDecoderFFmpegBinary {
175    /// Samples are pushed in 8kHz mono/single-channel format.
176    pub fn decode<T>(
177        file_path: impl AsRef<Path>,
178        receiver: impl super::AudioReceiver<Output = T>,
179        mut progress_handler: impl super::ProgressHandler,
180    ) -> Result<T, DecoderError> {
181        let file_path_buf: PathBuf = file_path.as_ref().into();
182
183        let args = vec![
184            OsString::from("-v"),
185            OsString::from("error"),
186            OsString::from("-show_entries"),
187            OsString::from("format=duration:stream=index,codec_long_name,channels,duration,codec_type"),
188            OsString::from("-of"),
189            OsString::from("json"),
190            OsString::from(file_path.as_ref()),
191        ];
192
193        let ffprobe_path: PathBuf = std::env::var_os("ALASS_FFPROBE_PATH")
194            .unwrap_or(OsString::from("ffprobe"))
195            .into();
196
197        let metadata: Metadata =
198            Self::get_metadata(file_path_buf.clone(), ffprobe_path.clone(), &args).with_context(|_| {
199                DecoderErrorKind::ExtractingMetadataFailed {
200                    file_path: file_path_buf.clone(),
201                    cmd_path: ffprobe_path.clone(),
202                    args: args,
203                }
204            })?;
205
206        let best_stream_opt: Option<Stream> = metadata
207            .streams
208            .into_iter()
209            .filter(|s| s.codec_type == CodecType::Audio && s.channels.is_some())
210            .min_by_key(|s| s.channels.unwrap());
211
212        let best_stream: Stream;
213        match best_stream_opt {
214            Some(x) => best_stream = x,
215            None => {
216                return Err(DecoderError::from(DecoderErrorKind::NoAudioStream {
217                    path: file_path.as_ref().into(),
218                }))
219            }
220        }
221
222        let ffmpeg_path: PathBuf = std::env::var_os("ALASS_FFMPEG_PATH")
223            .unwrap_or(OsString::from("ffmpeg"))
224            .into();
225
226        let args: Vec<OsString> = vec![
227            // only print errors
228            OsString::from("-v"),
229            OsString::from("error"),
230            // "yes" -> disables user interaction
231            OsString::from("-y"),
232            // input file
233            OsString::from("-i"),
234            file_path.as_ref().into(),
235            // select stream
236            OsString::from("-map"),
237            format!("0:{}", best_stream.index).into(),
238            // audio codec: 16-bit signed little endian
239            OsString::from("-acodec"),
240            OsString::from("pcm_s16le"),
241            // resample to 8khz
242            OsString::from("-ar"),
243            OsString::from("8000"),
244            // resample to single channel
245            OsString::from("-ac"),
246            OsString::from("1"),
247            // output 16-bit signed little endian stream directly (no wav, etc.)
248            OsString::from("-f"),
249            OsString::from("s16le"),
250            // output to stdout pipe
251            OsString::from("-"),
252        ];
253
254        let format_opt: Option<Format> = metadata.format;
255
256        // `.mkv` containers do not store duration info in streams, only the format information does contain it
257        let duration_str = best_stream
258            .duration
259            .or_else(|| format_opt.and_then(|format| format.duration))
260            .ok_or_else(|| DecoderError::from(DecoderErrorKind::NoDurationInformation))?;
261
262        let duration = duration_str
263            .parse::<f64>()
264            .with_context(|_| DecoderErrorKind::FailedToParseDuration { s: duration_str })?;
265
266        let num_samples: i64 = (duration * 8000.0) as i64 / PROGRESS_PRESCALER;
267
268        progress_handler.init(num_samples);
269
270        return Self::extract_audio_stream(receiver, progress_handler, ffmpeg_path.clone(), &args)
271            .with_context(|_| DecoderErrorKind::FailedExtractingAudio {
272                file_path: file_path_buf.clone(),
273                cmd_path: ffmpeg_path.clone(),
274                args: args,
275            })?
276            .into_ok();
277    }
278
279    fn extract_audio_stream<T>(
280        mut receiver: impl super::AudioReceiver<Output = T>,
281        mut progress_handler: impl super::ProgressHandler,
282        ffmpeg_path: PathBuf,
283        args: &[OsString],
284    ) -> Result<T, DecoderError> {
285        let mut ffmpeg_process: Child = Command::new(ffmpeg_path.clone())
286            .args(args)
287            .stdin(Stdio::null())
288            .stderr(Stdio::piped())
289            .stdout(Stdio::piped())
290            .spawn()
291            .with_context(|_| DecoderErrorKind::FailedSpawningSubprocess {
292                path: ffmpeg_path.clone(),
293                args: args.to_vec(),
294            })?;
295
296        let mut stdout: ChildStdout = ffmpeg_process.stdout.take().unwrap();
297
298        enum ParserState {
299            Start,
300            SingleByte(u8),
301        }
302
303        let mut data: Vec<u8> = std::vec::from_elem(0, 200 * 1024 * 1024);
304        let data2_cap = 1024 * 1024;
305        let mut data2: Vec<i16> = Vec::with_capacity(data2_cap);
306        let mut parser_state: ParserState = ParserState::Start;
307        let mut progress_prescaler_counter = 0;
308
309        loop {
310            // improves performance by allowing ffmpeg to generate more data in pipe
311            // TODO: an async tokio read might also have the same effect (without being as machine dependent)
312            //  -> too low: does not do anything (+some otherhead)
313            //  -> too high: slows down computaton because ffmpeg has to wait for this process to read
314            //std::thread::sleep(Duration::from_nanos(1000));
315
316            let read_bytes = stdout.read(&mut data).with_context(|_| DecoderErrorKind::ReadError)?;
317            //println!("{}", read_bytes);
318
319            if read_bytes == 0 {
320                match ffmpeg_process
321                    .wait()
322                    .with_context(|_| DecoderErrorKind::WaitingForProcessFailed {
323                        cmd_path: ffmpeg_path.clone(),
324                    })?
325                    .code()
326                {
327                    Some(0) => {
328                        receiver
329                            .push_samples(&data2)
330                            .with_context(|_| DecoderErrorKind::AudioSegmentProcessingFailed)?;
331                        data2.clear();
332                        progress_handler.finish();
333                        return Ok(receiver
334                            .finish()
335                            .with_context(|_| DecoderErrorKind::AudioSegmentProcessingFailed)?);
336                    }
337                    code @ Some(_) | code @ None => {
338                        let error_code_err: DecoderErrorKind = DecoderErrorKind::ProcessErrorCode {
339                            cmd_path: ffmpeg_path,
340                            code: code,
341                        };
342
343                        let mut stderr_data = Vec::new();
344                        ffmpeg_process
345                            .stderr
346                            .unwrap()
347                            .read_to_end(&mut stderr_data)
348                            .with_context(|_| DecoderErrorKind::ReadError)?;
349
350                        let stderr_str: String = String::from_utf8_lossy(&stderr_data).into();
351
352                        if stderr_str.is_empty() {
353                            return Err(error_code_err.into());
354                        } else {
355                            return Err(DecoderError::from(DecoderErrorKind::ProcessErrorMessage {
356                                msg: stderr_str,
357                            }))
358                            .with_context(|_| error_code_err)
359                            .map_err(|x| DecoderError::from(x));
360                        }
361                    }
362                }
363            }
364
365            for &byte in &data[0..read_bytes] {
366                match parser_state {
367                    ParserState::Start => parser_state = ParserState::SingleByte(byte),
368                    ParserState::SingleByte(last_byte) => {
369                        let two_bytes = [last_byte, byte];
370                        let sample = byteorder::LittleEndian::read_i16(&two_bytes);
371                        receiver
372                            .push_samples(&[sample])
373                            .with_context(|_| DecoderErrorKind::AudioSegmentProcessingFailed)?;
374
375                        if progress_prescaler_counter == PROGRESS_PRESCALER {
376                            progress_handler.inc();
377                            progress_prescaler_counter = 0;
378                        }
379
380                        progress_prescaler_counter = progress_prescaler_counter + 1;
381
382                        /*data2.push(sample);
383                        if data2.len() == data2_cap {
384                            receiver.push_samples(&data2);
385                            data2.clear();
386                        }*/
387                        parser_state = ParserState::Start;
388                    }
389                }
390            }
391        }
392    }
393
394    fn get_metadata(file_path: PathBuf, ffprobe_path: PathBuf, args: &[OsString]) -> Result<Metadata, DecoderError> {
395        let ffprobe_process: Output = Command::new(ffprobe_path.clone())
396            .args(args)
397            .stdin(Stdio::null())
398            .stderr(Stdio::piped())
399            .stdout(Stdio::piped())
400            .output()
401            .with_context(|_| DecoderErrorKind::FailedSpawningSubprocess {
402                path: ffprobe_path.clone(),
403                args: args.to_vec(),
404            })?;
405
406        if !ffprobe_process.status.success() {
407            let stderr: String = String::from_utf8_lossy(&ffprobe_process.stderr)
408                .to_string()
409                .trim_end()
410                .to_string();
411
412            let err = DecoderErrorKind::ProcessErrorCode {
413                cmd_path: ffprobe_path.clone(),
414                code: ffprobe_process.status.code(),
415            };
416
417            if stderr.is_empty() {
418                return Err(DecoderError::from(err));
419            } else {
420                return Err(DecoderError::from(DecoderErrorKind::ProcessErrorMessage {
421                    msg: stderr,
422                }))
423                .with_context(|_| err)
424                .map_err(|x| DecoderError::from(x));
425            }
426        }
427
428        let stdout =
429            from_utf8(&ffprobe_process.stdout).with_context(|_| DecoderErrorKind::FailedToDecodeVideoStreamInfo)?;
430
431        let metadata: Metadata = serde_json::from_str(stdout)
432            .with_context(|_| DecoderErrorKind::DeserializingMetadataFailed { path: file_path })?;
433
434        Ok(metadata)
435    }
436}