ilass_cli/video_decoder/
ffmpeg_binary.rs

1use failure::{Backtrace, Context, Fail, ResultExt};
2use std::ffi::OsString;
3use std::fmt;
4use std::io::Read;
5use std::path::{Path, PathBuf};
6use std::process::Child;
7use std::process::{ChildStdout, Command, Output, Stdio};
8use std::str::from_utf8;
9
10use byteorder::ByteOrder;
11use serde::{Deserialize, Deserializer};
12
13use crate::define_error;
14
/// Kind of a media stream, as named by the `codec_type` entry of the
/// ffprobe JSON output.
#[derive(Debug, PartialEq, Eq)]
pub enum CodecType {
    /// `codec_type` was `"audio"`.
    Audio,
    /// `codec_type` was `"video"`.
    Video,
    /// `codec_type` was `"subtitle"`.
    Subtitle,
    /// Any other `codec_type`; the original string is kept verbatim.
    Other(String),
}
22
23impl<'de> Deserialize<'de> for CodecType {
24    fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
25        let s = String::deserialize(d)?;
26        match &s[..] {
27            "audio" => Ok(CodecType::Audio),
28            "video" => Ok(CodecType::Video),
29            "subtitle" => Ok(CodecType::Subtitle),
30            s => Ok(CodecType::Other(s.to_owned())),
31        }
32    }
33}
34
35#[derive(Debug, Deserialize)]
36struct Stream {
37    pub index: usize,
38    pub codec_long_name: String,
39    pub channels: Option<usize>,
40    /// `.mkv` does not store the duration in the streams; we have to use `format -> duration` instead
41    pub duration: Option<String>,
42    pub codec_type: CodecType,
43}
44
45#[derive(Debug, Deserialize)]
46struct Format {
47    pub duration: Option<String>,
48}
49
50/// Metadata associated with a video.
51#[derive(Debug, Deserialize)]
52struct Metadata {
53    streams: Vec<Stream>,
54    format: Option<Format>,
55}
56
57define_error!(DecoderError, DecoderErrorKind);
58
59#[derive(Debug, Fail)]
60pub enum DecoderErrorKind {
61    FailedToDecodeVideoStreamInfo,
62    ExtractingMetadataFailed {
63        cmd_path: PathBuf,
64        file_path: PathBuf,
65        args: Vec<OsString>,
66    },
67    NoAudioStream {
68        path: PathBuf,
69    },
70    FailedExtractingAudio {
71        file_path: PathBuf,
72        cmd_path: PathBuf,
73        args: Vec<OsString>,
74    },
75    FailedSpawningSubprocess {
76        path: PathBuf,
77        args: Vec<OsString>,
78    },
79    WaitingForProcessFailed {
80        cmd_path: PathBuf,
81    },
82    ProcessErrorCode {
83        cmd_path: PathBuf,
84        code: Option<i32>,
85    },
86    ProcessErrorMessage {
87        msg: String,
88    },
89    DeserializingMetadataFailed {
90        path: PathBuf,
91    },
92    ReadError,
93    FailedToParseDuration {
94        s: String,
95    },
96    AudioSegmentProcessingFailed,
97    NoDurationInformation,
98}
99
/// Renders a command invocation for use in error messages.
///
/// The result is the command path followed by a single space and the
/// arguments joined by single spaces. Arguments that are not valid Unicode
/// are converted lossily. No shell quoting is applied, so the text is meant
/// for humans, not for re-execution. With no arguments the result is the
/// command path followed by one trailing space.
///
/// Takes `&Path` so that both `&Path` and `&PathBuf` (via deref coercion)
/// can be passed.
fn format_cmd(cmd_path: &Path, args: &[OsString]) -> String {
    // `to_string_lossy` yields a `Cow<str>` that only allocates for
    // non-Unicode arguments; `join` accepts it directly.
    let args_string = args
        .iter()
        .map(|arg| arg.to_string_lossy())
        .collect::<Vec<_>>()
        .join(" ");
    format!("{} {}", cmd_path.display(), args_string)
}
108
109impl fmt::Display for DecoderErrorKind {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        match self {
112            DecoderErrorKind::FailedToDecodeVideoStreamInfo => write!(f, "failed to decode video stream info"),
113            DecoderErrorKind::DeserializingMetadataFailed { path } => {
114                write!(f, "failed to deserialize metadata of file '{}'", path.display())
115            }
116            DecoderErrorKind::NoAudioStream { path } => write!(f, "no audio stream in file '{}'", path.display()),
117            DecoderErrorKind::FailedExtractingAudio {
118                file_path,
119                cmd_path,
120                args,
121            } => write!(
122                f,
123                "failed to extract audio from '{}' with '{}' ",
124                file_path.display(),
125                format_cmd(cmd_path, args)
126            ),
127            DecoderErrorKind::FailedSpawningSubprocess { path, args } => {
128                write!(f, "failed to spawn subprocess '{}' ", format_cmd(path, args))
129            }
130            DecoderErrorKind::WaitingForProcessFailed { cmd_path } => {
131                write!(f, "failed to check status of subprocess '{}'", cmd_path.display())
132            }
133            DecoderErrorKind::ProcessErrorCode { cmd_path, code } => write!(
134                f,
135                "process '{}' returned error code '{}'",
136                cmd_path.display(),
137                code.map(|x| x.to_string())
138                    .unwrap_or_else(|| String::from("interrupted?"))
139            ),
140            DecoderErrorKind::ProcessErrorMessage { msg } => write!(f, "stderr: {}", msg),
141            DecoderErrorKind::ExtractingMetadataFailed {
142                file_path,
143                cmd_path,
144                args,
145            } => write!(
146                f,
147                "failed to extract metadata from '{}' using command '{}'",
148                file_path.display(),
149                format_cmd(cmd_path, args)
150            ),
151            DecoderErrorKind::ReadError => write!(f, "error while reading stdout"),
152            DecoderErrorKind::FailedToParseDuration { s } => {
153                write!(f, "failed to parse duration string '{}' from metadata", s)
154            }
155            DecoderErrorKind::AudioSegmentProcessingFailed => write!(f, "processing audio segment failed"),
156            DecoderErrorKind::NoDurationInformation => write!(f, "no audio duration information found"),
157        }
158    }
159}
160
/// Postfix helper that wraps a value in `Ok`, so the end of a method chain
/// can be turned into a `Result` without nesting it in `Ok(...)`.
trait IntoOk<T> {
    /// Returns `Ok(self)`; the error type `E` is chosen by the caller
    /// (usually inferred from the surrounding return type).
    fn into_ok<E>(self) -> Result<T, E>;
}

/// Blanket implementation: every value can be wrapped.
impl<T> IntoOk<T> for T {
    fn into_ok<E>(self) -> Result<T, E> {
        Result::Ok(self)
    }
}
169
/// Audio decoder backed by the external `ffprobe` and `ffmpeg` executables.
///
/// The type carries no state; it only serves as a namespace for the
/// associated decoding functions.
pub struct VideoDecoderFFmpegBinary {}
171
/// Number of decoded samples that make up one progress step: the total
/// handed to the progress handler is the expected sample count divided by
/// this value, and the handler is advanced once per this many samples.
static PROGRESS_PRESCALER: i64 = 200;
173
174impl VideoDecoderFFmpegBinary {
175    /// Samples are pushed in 8kHz mono/single-channel format.
176    pub fn decode<T>(
177        file_path: impl AsRef<Path>,
178        audio_index: Option<usize>,
179        receiver: impl super::AudioReceiver<Output = T>,
180        mut progress_handler: impl super::ProgressHandler,
181    ) -> Result<T, DecoderError> {
182        let file_path_buf: PathBuf = file_path.as_ref().into();
183
184        let args = vec![
185            OsString::from("-v"),
186            OsString::from("error"),
187            OsString::from("-show_entries"),
188            OsString::from("format=duration:stream=index,codec_long_name,channels,duration,codec_type"),
189            OsString::from("-of"),
190            OsString::from("json"),
191            OsString::from(file_path.as_ref()),
192        ];
193
194        let ffprobe_path: PathBuf = std::env::var_os("ILASS_FFPROBE_PATH")
195            .unwrap_or(OsString::from("ffprobe"))
196            .into();
197
198        let metadata: Metadata =
199            Self::get_metadata(file_path_buf.clone(), ffprobe_path.clone(), &args).with_context(|_| {
200                DecoderErrorKind::ExtractingMetadataFailed {
201                    file_path: file_path_buf.clone(),
202                    cmd_path: ffprobe_path.clone(),
203                    args: args,
204                }
205            })?;
206
207        let mut audio_streams = metadata
208            .streams
209            .into_iter()
210            .filter(|s| s.codec_type == CodecType::Audio && s.channels.is_some());
211
212        let best_stream_opt = match audio_index {
213            None => audio_streams
214                .min_by_key(|s| s.channels.unwrap()),
215            Some(ai) => audio_streams
216                .find(|s| s.index == ai)
217        };
218
219        let best_stream: Stream;
220        match best_stream_opt {
221            Some(x) => best_stream = x,
222            None => {
223                return Err(DecoderError::from(DecoderErrorKind::NoAudioStream {
224                    path: file_path.as_ref().into(),
225                }))
226            }
227        }
228
229        let ffmpeg_path: PathBuf = std::env::var_os("ILASS_FFMPEG_PATH")
230            .unwrap_or(OsString::from("ffmpeg"))
231            .into();
232
233        let args: Vec<OsString> = vec![
234            // only print errors
235            OsString::from("-v"),
236            OsString::from("error"),
237            // "yes" -> disables user interaction
238            OsString::from("-y"),
239            // input file
240            OsString::from("-i"),
241            file_path.as_ref().into(),
242            // select stream
243            OsString::from("-map"),
244            format!("0:{}", best_stream.index).into(),
245            // audio codec: 16-bit signed little endian
246            OsString::from("-acodec"),
247            OsString::from("pcm_s16le"),
248            // resample to 8khz
249            OsString::from("-ar"),
250            OsString::from("8000"),
251            // resample to single channel
252            OsString::from("-ac"),
253            OsString::from("1"),
254            // output 16-bit signed little endian stream directly (no wav, etc.)
255            OsString::from("-f"),
256            OsString::from("s16le"),
257            // output to stdout pipe
258            OsString::from("-"),
259        ];
260
261        let format_opt: Option<Format> = metadata.format;
262
263        // `.mkv` containers do not store duration info in streams, only the format information does contain it
264        let duration_str = best_stream
265            .duration
266            .or_else(|| format_opt.and_then(|format| format.duration))
267            .ok_or_else(|| DecoderError::from(DecoderErrorKind::NoDurationInformation))?;
268
269        let duration = duration_str
270            .parse::<f64>()
271            .with_context(|_| DecoderErrorKind::FailedToParseDuration { s: duration_str })?;
272
273        let num_samples: i64 = (duration * 8000.0) as i64 / PROGRESS_PRESCALER;
274
275        progress_handler.init(num_samples);
276
277        return Self::extract_audio_stream(receiver, progress_handler, ffmpeg_path.clone(), &args)
278            .with_context(|_| DecoderErrorKind::FailedExtractingAudio {
279                file_path: file_path_buf.clone(),
280                cmd_path: ffmpeg_path.clone(),
281                args: args,
282            })?
283            .into_ok();
284    }
285
286    fn extract_audio_stream<T>(
287        mut receiver: impl super::AudioReceiver<Output = T>,
288        mut progress_handler: impl super::ProgressHandler,
289        ffmpeg_path: PathBuf,
290        args: &[OsString],
291    ) -> Result<T, DecoderError> {
292        let mut ffmpeg_process: Child = Command::new(ffmpeg_path.clone())
293            .args(args)
294            .stdin(Stdio::null())
295            .stderr(Stdio::piped())
296            .stdout(Stdio::piped())
297            .spawn()
298            .with_context(|_| DecoderErrorKind::FailedSpawningSubprocess {
299                path: ffmpeg_path.clone(),
300                args: args.to_vec(),
301            })?;
302
303        let mut stdout: ChildStdout = ffmpeg_process.stdout.take().unwrap();
304
305        enum ParserState {
306            Start,
307            SingleByte(u8),
308        }
309
310        let mut data: Vec<u8> = std::vec::from_elem(0, 200 * 1024 * 1024);
311        let data2_cap = 1024 * 1024;
312        let mut data2: Vec<i16> = Vec::with_capacity(data2_cap);
313        let mut parser_state: ParserState = ParserState::Start;
314        let mut progress_prescaler_counter = 0;
315
316        loop {
317            // improves performance by allowing ffmpeg to generate more data in pipe
318            // TODO: an async tokio read might also have the same effect (without being as machine dependent)
319            //  -> too low: does not do anything (+some otherhead)
320            //  -> too high: slows down computaton because ffmpeg has to wait for this process to read
321            //std::thread::sleep(Duration::from_nanos(1000));
322
323            let read_bytes = stdout.read(&mut data).with_context(|_| DecoderErrorKind::ReadError)?;
324            //println!("{}", read_bytes);
325
326            if read_bytes == 0 {
327                match ffmpeg_process
328                    .wait()
329                    .with_context(|_| DecoderErrorKind::WaitingForProcessFailed {
330                        cmd_path: ffmpeg_path.clone(),
331                    })?
332                    .code()
333                {
334                    Some(0) => {
335                        receiver
336                            .push_samples(&data2)
337                            .with_context(|_| DecoderErrorKind::AudioSegmentProcessingFailed)?;
338                        data2.clear();
339                        progress_handler.finish();
340                        return Ok(receiver
341                            .finish()
342                            .with_context(|_| DecoderErrorKind::AudioSegmentProcessingFailed)?);
343                    }
344                    code @ Some(_) | code @ None => {
345                        let error_code_err: DecoderErrorKind = DecoderErrorKind::ProcessErrorCode {
346                            cmd_path: ffmpeg_path,
347                            code: code,
348                        };
349
350                        let mut stderr_data = Vec::new();
351                        ffmpeg_process
352                            .stderr
353                            .unwrap()
354                            .read_to_end(&mut stderr_data)
355                            .with_context(|_| DecoderErrorKind::ReadError)?;
356
357                        let stderr_str: String = String::from_utf8_lossy(&stderr_data).into();
358
359                        if stderr_str.is_empty() {
360                            return Err(error_code_err.into());
361                        } else {
362                            return Err(DecoderError::from(DecoderErrorKind::ProcessErrorMessage {
363                                msg: stderr_str,
364                            }))
365                            .with_context(|_| error_code_err)
366                            .map_err(|x| DecoderError::from(x));
367                        }
368                    }
369                }
370            }
371
372            for &byte in &data[0..read_bytes] {
373                match parser_state {
374                    ParserState::Start => parser_state = ParserState::SingleByte(byte),
375                    ParserState::SingleByte(last_byte) => {
376                        let two_bytes = [last_byte, byte];
377                        let sample = byteorder::LittleEndian::read_i16(&two_bytes);
378                        receiver
379                            .push_samples(&[sample])
380                            .with_context(|_| DecoderErrorKind::AudioSegmentProcessingFailed)?;
381
382                        if progress_prescaler_counter == PROGRESS_PRESCALER {
383                            progress_handler.inc();
384                            progress_prescaler_counter = 0;
385                        }
386
387                        progress_prescaler_counter = progress_prescaler_counter + 1;
388
389                        /*data2.push(sample);
390                        if data2.len() == data2_cap {
391                            receiver.push_samples(&data2);
392                            data2.clear();
393                        }*/
394                        parser_state = ParserState::Start;
395                    }
396                }
397            }
398        }
399    }
400
401    fn get_metadata(file_path: PathBuf, ffprobe_path: PathBuf, args: &[OsString]) -> Result<Metadata, DecoderError> {
402        let ffprobe_process: Output = Command::new(ffprobe_path.clone())
403            .args(args)
404            .stdin(Stdio::null())
405            .stderr(Stdio::piped())
406            .stdout(Stdio::piped())
407            .output()
408            .with_context(|_| DecoderErrorKind::FailedSpawningSubprocess {
409                path: ffprobe_path.clone(),
410                args: args.to_vec(),
411            })?;
412
413        if !ffprobe_process.status.success() {
414            let stderr: String = String::from_utf8_lossy(&ffprobe_process.stderr)
415                .to_string()
416                .trim_end()
417                .to_string();
418
419            let err = DecoderErrorKind::ProcessErrorCode {
420                cmd_path: ffprobe_path.clone(),
421                code: ffprobe_process.status.code(),
422            };
423
424            if stderr.is_empty() {
425                return Err(DecoderError::from(err));
426            } else {
427                return Err(DecoderError::from(DecoderErrorKind::ProcessErrorMessage {
428                    msg: stderr,
429                }))
430                .with_context(|_| err)
431                .map_err(|x| DecoderError::from(x));
432            }
433        }
434
435        let stdout =
436            from_utf8(&ffprobe_process.stdout).with_context(|_| DecoderErrorKind::FailedToDecodeVideoStreamInfo)?;
437
438        let metadata: Metadata = serde_json::from_str(stdout)
439            .with_context(|_| DecoderErrorKind::DeserializingMetadataFailed { path: file_path })?;
440
441        Ok(metadata)
442    }
443}