ffmpeg_common/
process.rs

1use std::path::{Path, PathBuf};
2use std::process::Stdio;
3use std::time::Duration;
4use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
5use tokio::process::{Child, Command};
6use tokio::time::timeout;
7use tracing::{debug, trace};
8use which::which;
9
10use crate::error::{Error, Result};
11
12/// Find the path to an FFmpeg executable
13pub fn find_executable(name: &str) -> Result<PathBuf> {
14    which(name).map_err(|_| Error::ExecutableNotFound(name.to_string()))
15}
16
17/// Process execution configuration
18#[derive(Debug, Clone)]
19pub struct ProcessConfig {
20    /// Executable path
21    pub executable: PathBuf,
22    /// Working directory
23    pub working_dir: Option<PathBuf>,
24    /// Environment variables
25    pub env: Vec<(String, String)>,
26    /// Timeout for the process
27    pub timeout: Option<Duration>,
28    /// Whether to capture stdout
29    pub capture_stdout: bool,
30    /// Whether to capture stderr
31    pub capture_stderr: bool,
32    /// Whether to pipe stdin
33    pub pipe_stdin: bool,
34}
35
36impl ProcessConfig {
37    /// Create a new process configuration
38    pub fn new(executable: impl Into<PathBuf>) -> Self {
39        Self {
40            executable: executable.into(),
41            working_dir: None,
42            env: Vec::new(),
43            timeout: None,
44            capture_stdout: true,
45            capture_stderr: true,
46            pipe_stdin: false,
47        }
48    }
49
50    /// Set working directory
51    pub fn working_dir(mut self, dir: impl Into<PathBuf>) -> Self {
52        self.working_dir = Some(dir.into());
53        self
54    }
55
56    /// Add environment variable
57    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
58        self.env.push((key.into(), value.into()));
59        self
60    }
61
62    /// Set timeout
63    pub fn timeout(mut self, duration: Duration) -> Self {
64        self.timeout = Some(duration);
65        self
66    }
67
68    /// Set stdout capture
69    pub fn capture_stdout(mut self, capture: bool) -> Self {
70        self.capture_stdout = capture;
71        self
72    }
73
74    /// Set stderr capture
75    pub fn capture_stderr(mut self, capture: bool) -> Self {
76        self.capture_stderr = capture;
77        self
78    }
79
80    /// Set stdin piping
81    pub fn pipe_stdin(mut self, pipe: bool) -> Self {
82        self.pipe_stdin = pipe;
83        self
84    }
85}
86
87/// Process handle for running FFmpeg processes
88pub struct Process {
89    child: Child,
90    config: ProcessConfig,
91}
92
93impl Process {
94    /// Spawn a new process with arguments
95    pub async fn spawn(config: ProcessConfig, args: Vec<String>) -> Result<Self> {
96        debug!("Spawning process: {} {:?}", config.executable.display(), args);
97
98        let mut cmd = Command::new(&config.executable);
99
100        // Add arguments
101        for arg in &args {
102            cmd.arg(arg);
103        }
104
105        // Set working directory
106        if let Some(ref dir) = config.working_dir {
107            cmd.current_dir(dir);
108        }
109
110        // Set environment variables
111        for (key, value) in &config.env {
112            cmd.env(key, value);
113        }
114
115        // Configure stdio
116        cmd.stdin(if config.pipe_stdin {
117            Stdio::piped()
118        } else {
119            Stdio::null()
120        });
121
122        cmd.stdout(if config.capture_stdout {
123            Stdio::piped()
124        } else {
125            Stdio::null()
126        });
127
128        cmd.stderr(if config.capture_stderr {
129            Stdio::piped()
130        } else {
131            Stdio::null()
132        });
133
134        // Kill on drop
135        cmd.kill_on_drop(true);
136
137        let child = cmd.spawn().map_err(Error::Io)?;
138
139        Ok(Self { child, config })
140    }
141
142    /// Wait for the process to complete
143    pub async fn wait(mut self) -> Result<ProcessOutput> {
144        // This async block will capture the process output.
145        // We explicitly map `std::io::Error` to our custom `Error::Io` variant
146        // to resolve the compiler's type inference ambiguity.
147        let wait_future = async {
148            let status = self.child.wait().await.map_err(Error::Io)?;
149
150            let stdout = if self.config.capture_stdout {
151                if let Some(mut stdout) = self.child.stdout.take() {
152                    let mut buf = Vec::new();
153                    stdout.read_to_end(&mut buf).await.map_err(Error::Io)?;
154                    Some(buf)
155                } else {
156                    None
157                }
158            } else {
159                None
160            };
161
162            let stderr = if self.config.capture_stderr {
163                if let Some(mut stderr) = self.child.stderr.take() {
164                    let mut buf = Vec::new();
165                    stderr.read_to_end(&mut buf).await.map_err(Error::Io)?;
166                    Some(buf)
167                } else {
168                    None
169                }
170            } else {
171                None
172            };
173
174            Ok(ProcessOutput {
175                status,
176                stdout,
177                stderr,
178            })
179        };
180
181        if let Some(timeout_duration) = self.config.timeout {
182            match timeout(timeout_duration, wait_future).await {
183                // The future completed without timing out. `result` is the `Result` from our future.
184                Ok(result) => result,
185                // The future timed out.
186                Err(_) => {
187                    let _ = self.child.kill().await;
188                    Err(Error::Timeout(timeout_duration))
189                }
190            }
191        } else {
192            // No timeout configured, just await the future.
193            wait_future.await
194        }
195    }
196
197    /// Get a handle to stdin
198    pub fn stdin(&mut self) -> Option<tokio::process::ChildStdin> {
199        self.child.stdin.take()
200    }
201
202    /// Get a handle to stdout
203    pub fn stdout(&mut self) -> Option<tokio::process::ChildStdout> {
204        self.child.stdout.take()
205    }
206
207    /// Get a handle to stderr
208    pub fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
209        self.child.stderr.take()
210    }
211
212    /// Kill the process
213    pub async fn kill(&mut self) -> Result<()> {
214        self.child.kill().await.map_err(Error::Io)
215    }
216
217    /// Get the process ID
218    pub fn id(&self) -> Option<u32> {
219        self.child.id()
220    }
221
222    /// Try to wait for the process without blocking
223    pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
224        // Explicitly map the error to avoid ambiguity with the `?` operator.
225        self.child.try_wait().map_err(Error::Io)
226    }
227}
228
229/// Output from a completed process
230#[derive(Debug)]
231pub struct ProcessOutput {
232    /// Exit status
233    pub status: std::process::ExitStatus,
234    /// Stdout data if captured
235    pub stdout: Option<Vec<u8>>,
236    /// Stderr data if captured
237    pub stderr: Option<Vec<u8>>,
238}
239
240impl ProcessOutput {
241    /// Check if the process succeeded
242    pub fn success(&self) -> bool {
243        self.status.success()
244    }
245
246    /// Get stdout as string
247    pub fn stdout_str(&self) -> Option<String> {
248        self.stdout.as_ref().map(|b| String::from_utf8_lossy(b).into_owned())
249    }
250
251    /// Get stderr as string
252    pub fn stderr_str(&self) -> Option<String> {
253        self.stderr.as_ref().map(|b| String::from_utf8_lossy(b).into_owned())
254    }
255
256    /// Convert to a Result, treating non-zero exit as error
257    pub fn into_result(self) -> Result<Self> {
258        if self.success() {
259            Ok(self)
260        } else {
261            Err(Error::process_failed(
262                format!("Process exited with status: {}", self.status),
263                Some(self.status),
264                self.stderr_str(),
265            ))
266        }
267    }
268}
269
270/// Progress information from FFmpeg
271#[derive(Debug, Clone)]
272pub struct Progress {
273    /// Current frame number
274    pub frame: Option<u64>,
275    /// Frames per second
276    pub fps: Option<f64>,
277    /// Quality factor
278    pub q: Option<f64>,
279    /// Current size in bytes
280    pub size: Option<u64>,
281    /// Current time position
282    pub time: Option<Duration>,
283    /// Bitrate in bits/s
284    pub bitrate: Option<f64>,
285    /// Processing speed
286    pub speed: Option<f64>,
287}
288
289impl Progress {
290    /// Parse progress from FFmpeg stderr line
291    pub fn parse_line(line: &str) -> Option<Self> {
292        if !line.contains("frame=") {
293            return None;
294        }
295
296        let mut progress = Progress {
297            frame: None,
298            fps: None,
299            q: None,
300            size: None,
301            time: None,
302            bitrate: None,
303            speed: None,
304        };
305
306        // This is a more robust way to parse the key-value pairs from FFmpeg,
307        // which can have inconsistent spacing (e.g., "key=value" or "key= value").
308        let parts: Vec<&str> = line.split_whitespace().collect();
309        let mut i = 0;
310        while i < parts.len() {
311            if let Some((key, mut value)) = parts[i].split_once('=') {
312                // If `split_once` gives an empty value, it means the actual value
313                // is the next element in `parts` (e.g., "frame=", "100").
314                if value.is_empty() {
315                    if let Some(next_part) = parts.get(i + 1) {
316                        value = next_part;
317                        i += 1; // Manually advance to skip the value part in the next iteration.
318                    }
319                }
320
321                match key.trim() {
322                    "frame" => progress.frame = value.trim().parse().ok(),
323                    "fps" => progress.fps = value.trim().parse().ok(),
324                    "q" => progress.q = value.trim().parse().ok(),
325                    "size" => {
326                        // Remove "kB" suffix and convert to bytes
327                        if let Some(kb_str) = value.trim().strip_suffix("kB") {
328                            progress.size = kb_str.parse::<u64>().ok().map(|kb| kb * 1024);
329                        }
330                    }
331                    "time" => {
332                        // Parse time in HH:MM:SS.MS format
333                        if let Ok(duration) = crate::types::Duration::from_ffmpeg_format(value.trim()) {
334                            progress.time = Some(duration.into());
335                        }
336                    }
337                    "bitrate" => {
338                        // Remove "kbits/s" suffix
339                        if let Some(kbits_str) = value.trim().strip_suffix("kbits/s") {
340                            progress.bitrate = kbits_str.parse::<f64>().ok().map(|kb| kb * 1000.0);
341                        }
342                    }
343                    "speed" => {
344                        // Remove "x" suffix
345                        if let Some(speed_str) = value.trim().strip_suffix('x') {
346                            progress.speed = speed_str.parse().ok();
347                        }
348                    }
349                    _ => {}
350                }
351            }
352            i += 1;
353        }
354
355        Some(progress)
356    }
357}
358
359/// Progress callback type
360pub type ProgressCallback = Box<dyn Fn(Progress) + Send + Sync>;
361
362/// Stream progress updates from FFmpeg stderr
363pub async fn stream_progress<R: AsyncRead + Unpin + Send + 'static>(
364    stderr: R,
365    mut callback: impl FnMut(Progress) + Send + 'static,
366) {
367    let reader = BufReader::new(stderr);
368    let mut lines = reader.lines();
369
370    while let Ok(Some(line)) = lines.next_line().await {
371        trace!("FFmpeg stderr: {}", line);
372        if let Some(progress) = Progress::parse_line(&line) {
373            callback(progress);
374        }
375    }
376}
377
378/// Command builder with safe argument construction
379#[derive(Debug, Clone)]
380pub struct CommandBuilder {
381    args: Vec<String>,
382}
383
384impl CommandBuilder {
385    /// Create a new command builder
386    pub fn new() -> Self {
387        Self { args: Vec::new() }
388    }
389
390    /// Add a flag (no value)
391    pub fn flag(mut self, flag: impl AsRef<str>) -> Self {
392        self.args.push(flag.as_ref().to_string());
393        self
394    }
395
396    /// Add an option with a value
397    pub fn option(mut self, key: impl AsRef<str>, value: impl ToString) -> Self {
398        self.args.push(key.as_ref().to_string());
399        self.args.push(value.to_string());
400        self
401    }
402
403    /// Add an option only if the value is Some
404    pub fn option_if_some<T: ToString>(self, key: impl AsRef<str>, value: Option<T>) -> Self {
405        if let Some(val) = value {
406            self.option(key, val)
407        } else {
408            self
409        }
410    }
411
412    /// Add a flag only if the condition is true
413    pub fn flag_if(self, flag: impl AsRef<str>, condition: bool) -> Self {
414        if condition {
415            self.flag(flag)
416        } else {
417            self
418        }
419    }
420
421    /// Add raw arguments
422    pub fn args(mut self, args: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
423        for arg in args {
424            self.args.push(arg.as_ref().to_string());
425        }
426        self
427    }
428
429    /// Add raw argument
430    pub fn arg(mut self, arg: impl AsRef<str>) -> Self {
431        self.args.push(arg.as_ref().to_string());
432        self
433    }
434
435    /// Build into a vector of arguments
436    pub fn build(self) -> Vec<String> {
437        self.args
438    }
439}
440
441impl Default for CommandBuilder {
442    fn default() -> Self {
443        Self::new()
444    }
445}
446
447/// Helper to validate paths exist
448pub fn validate_input_path(path: &Path) -> Result<()> {
449    if !path.exists() {
450        return Err(Error::Io(std::io::Error::new(
451            std::io::ErrorKind::NotFound,
452            format!("Input file not found: {}", path.display()),
453        )));
454    }
455    Ok(())
456}
457
458/// Helper to validate output path can be written
459pub fn validate_output_path(path: &Path) -> Result<()> {
460    if let Some(parent) = path.parent() {
461        if !parent.exists() {
462            return Err(Error::Io(std::io::Error::new(
463                std::io::ErrorKind::NotFound,
464                format!("Output directory does not exist: {}", parent.display()),
465            )));
466        }
467    }
468    Ok(())
469}
470
471#[cfg(test)]
472mod tests {
473    use super::*;
474    use std::time::Duration;
475
476    #[test]
477    fn test_command_builder() {
478        let args = CommandBuilder::new()
479            .flag("-y")
480            .option("-i", "input.mp4")
481            .option_if_some("-ss", Some("00:00:10"))
482            .option_if_some("-t", None::<&str>)
483            .flag_if("-n", false)
484            .arg("output.mp4")
485            .build();
486
487        assert_eq!(args, vec!["-y", "-i", "input.mp4", "-ss", "00:00:10", "output.mp4"]);
488    }
489
490    #[test]
491    fn test_progress_parsing() {
492        let line = "frame=  100 fps=25.0 q=28.0 size=    1024kB time=00:00:04.00 bitrate=2097.2kbits/s speed=1.00x";
493        let progress = Progress::parse_line(line).unwrap();
494
495        assert_eq!(progress.frame, Some(100));
496        assert_eq!(progress.fps, Some(25.0));
497        assert_eq!(progress.q, Some(28.0));
498        assert_eq!(progress.size, Some(1024 * 1024));
499        assert_eq!(progress.time, Some(Duration::from_secs(4)));
500        assert_eq!(progress.bitrate, Some(2_097_200.0));
501        assert_eq!(progress.speed, Some(1.0));
502    }
503}