1use std::path::{Path, PathBuf};
2use std::process::Stdio;
3use std::time::Duration;
4use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
5use tokio::process::{Child, Command};
6use tokio::time::timeout;
7use tracing::{debug, trace};
8use which::which;
9
10use crate::error::{Error, Result};
11
12pub fn find_executable(name: &str) -> Result<PathBuf> {
14 which(name).map_err(|_| Error::ExecutableNotFound(name.to_string()))
15}
16
/// Describes how a child process should be launched and supervised.
///
/// Build one with [`ProcessConfig::new`] and refine it through the chained
/// setter methods; each setter consumes the config and returns the updated
/// value.
#[derive(Debug, Clone)]
pub struct ProcessConfig {
    /// Program to execute.
    pub executable: PathBuf,
    /// Working directory for the child; `None` leaves it unset.
    pub working_dir: Option<PathBuf>,
    /// Extra environment variables as `(name, value)` pairs, in insertion order.
    pub env: Vec<(String, String)>,
    /// Time limit applied while waiting for the child; `None` means no limit.
    pub timeout: Option<Duration>,
    /// Whether the child's stdout is piped back (otherwise it goes to null).
    pub capture_stdout: bool,
    /// Whether the child's stderr is piped back (otherwise it goes to null).
    pub capture_stderr: bool,
    /// Whether the child's stdin is a pipe (otherwise it is null).
    pub pipe_stdin: bool,
}

impl ProcessConfig {
    /// Creates a config for `executable` with the defaults: no working
    /// directory, no extra environment, no timeout, stdout and stderr
    /// captured, stdin not piped.
    pub fn new(executable: impl Into<PathBuf>) -> Self {
        let executable = executable.into();
        Self {
            executable,
            working_dir: None,
            env: Vec::new(),
            timeout: None,
            capture_stdout: true,
            capture_stderr: true,
            pipe_stdin: false,
        }
    }

    /// Sets the directory the child is started in.
    pub fn working_dir(self, dir: impl Into<PathBuf>) -> Self {
        Self {
            working_dir: Some(dir.into()),
            ..self
        }
    }

    /// Appends one environment variable for the child.
    pub fn env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let mut updated = self;
        updated.env.push((key.into(), value.into()));
        updated
    }

    /// Sets the time limit used when waiting for the child.
    pub fn timeout(self, duration: Duration) -> Self {
        Self {
            timeout: Some(duration),
            ..self
        }
    }

    /// Chooses whether stdout is captured.
    pub fn capture_stdout(self, capture: bool) -> Self {
        Self {
            capture_stdout: capture,
            ..self
        }
    }

    /// Chooses whether stderr is captured.
    pub fn capture_stderr(self, capture: bool) -> Self {
        Self {
            capture_stderr: capture,
            ..self
        }
    }

    /// Chooses whether stdin is connected to a pipe.
    pub fn pipe_stdin(self, pipe: bool) -> Self {
        Self {
            pipe_stdin: pipe,
            ..self
        }
    }
}
86
87pub struct Process {
89 child: Child,
90 config: ProcessConfig,
91}
92
93impl Process {
94 pub async fn spawn(config: ProcessConfig, args: Vec<String>) -> Result<Self> {
96 debug!("Spawning process: {} {:?}", config.executable.display(), args);
97
98 let mut cmd = Command::new(&config.executable);
99
100 for arg in &args {
102 cmd.arg(arg);
103 }
104
105 if let Some(ref dir) = config.working_dir {
107 cmd.current_dir(dir);
108 }
109
110 for (key, value) in &config.env {
112 cmd.env(key, value);
113 }
114
115 cmd.stdin(if config.pipe_stdin {
117 Stdio::piped()
118 } else {
119 Stdio::null()
120 });
121
122 cmd.stdout(if config.capture_stdout {
123 Stdio::piped()
124 } else {
125 Stdio::null()
126 });
127
128 cmd.stderr(if config.capture_stderr {
129 Stdio::piped()
130 } else {
131 Stdio::null()
132 });
133
134 cmd.kill_on_drop(true);
136
137 let child = cmd.spawn().map_err(Error::Io)?;
138
139 Ok(Self { child, config })
140 }
141
142 pub async fn wait(mut self) -> Result<ProcessOutput> {
144 let wait_future = async {
148 let status = self.child.wait().await.map_err(Error::Io)?;
149
150 let stdout = if self.config.capture_stdout {
151 if let Some(mut stdout) = self.child.stdout.take() {
152 let mut buf = Vec::new();
153 stdout.read_to_end(&mut buf).await.map_err(Error::Io)?;
154 Some(buf)
155 } else {
156 None
157 }
158 } else {
159 None
160 };
161
162 let stderr = if self.config.capture_stderr {
163 if let Some(mut stderr) = self.child.stderr.take() {
164 let mut buf = Vec::new();
165 stderr.read_to_end(&mut buf).await.map_err(Error::Io)?;
166 Some(buf)
167 } else {
168 None
169 }
170 } else {
171 None
172 };
173
174 Ok(ProcessOutput {
175 status,
176 stdout,
177 stderr,
178 })
179 };
180
181 if let Some(timeout_duration) = self.config.timeout {
182 match timeout(timeout_duration, wait_future).await {
183 Ok(result) => result,
185 Err(_) => {
187 let _ = self.child.kill().await;
188 Err(Error::Timeout(timeout_duration))
189 }
190 }
191 } else {
192 wait_future.await
194 }
195 }
196
197 pub fn stdin(&mut self) -> Option<tokio::process::ChildStdin> {
199 self.child.stdin.take()
200 }
201
202 pub fn stdout(&mut self) -> Option<tokio::process::ChildStdout> {
204 self.child.stdout.take()
205 }
206
207 pub fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
209 self.child.stderr.take()
210 }
211
212 pub async fn kill(&mut self) -> Result<()> {
214 self.child.kill().await.map_err(Error::Io)
215 }
216
217 pub fn id(&self) -> Option<u32> {
219 self.child.id()
220 }
221
222 pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
224 self.child.try_wait().map_err(Error::Io)
226 }
227}
228
229#[derive(Debug)]
231pub struct ProcessOutput {
232 pub status: std::process::ExitStatus,
234 pub stdout: Option<Vec<u8>>,
236 pub stderr: Option<Vec<u8>>,
238}
239
240impl ProcessOutput {
241 pub fn success(&self) -> bool {
243 self.status.success()
244 }
245
246 pub fn stdout_str(&self) -> Option<String> {
248 self.stdout.as_ref().map(|b| String::from_utf8_lossy(b).into_owned())
249 }
250
251 pub fn stderr_str(&self) -> Option<String> {
253 self.stderr.as_ref().map(|b| String::from_utf8_lossy(b).into_owned())
254 }
255
256 pub fn into_result(self) -> Result<Self> {
258 if self.success() {
259 Ok(self)
260 } else {
261 Err(Error::process_failed(
262 format!("Process exited with status: {}", self.status),
263 Some(self.status),
264 self.stderr_str(),
265 ))
266 }
267 }
268}
269
270#[derive(Debug, Clone)]
272pub struct Progress {
273 pub frame: Option<u64>,
275 pub fps: Option<f64>,
277 pub q: Option<f64>,
279 pub size: Option<u64>,
281 pub time: Option<Duration>,
283 pub bitrate: Option<f64>,
285 pub speed: Option<f64>,
287}
288
289impl Progress {
290 pub fn parse_line(line: &str) -> Option<Self> {
292 if !line.contains("frame=") {
293 return None;
294 }
295
296 let mut progress = Progress {
297 frame: None,
298 fps: None,
299 q: None,
300 size: None,
301 time: None,
302 bitrate: None,
303 speed: None,
304 };
305
306 let parts: Vec<&str> = line.split_whitespace().collect();
309 let mut i = 0;
310 while i < parts.len() {
311 if let Some((key, mut value)) = parts[i].split_once('=') {
312 if value.is_empty() {
315 if let Some(next_part) = parts.get(i + 1) {
316 value = next_part;
317 i += 1; }
319 }
320
321 match key.trim() {
322 "frame" => progress.frame = value.trim().parse().ok(),
323 "fps" => progress.fps = value.trim().parse().ok(),
324 "q" => progress.q = value.trim().parse().ok(),
325 "size" => {
326 if let Some(kb_str) = value.trim().strip_suffix("kB") {
328 progress.size = kb_str.parse::<u64>().ok().map(|kb| kb * 1024);
329 }
330 }
331 "time" => {
332 if let Ok(duration) = crate::types::Duration::from_ffmpeg_format(value.trim()) {
334 progress.time = Some(duration.into());
335 }
336 }
337 "bitrate" => {
338 if let Some(kbits_str) = value.trim().strip_suffix("kbits/s") {
340 progress.bitrate = kbits_str.parse::<f64>().ok().map(|kb| kb * 1000.0);
341 }
342 }
343 "speed" => {
344 if let Some(speed_str) = value.trim().strip_suffix('x') {
346 progress.speed = speed_str.parse().ok();
347 }
348 }
349 _ => {}
350 }
351 }
352 i += 1;
353 }
354
355 Some(progress)
356 }
357}
358
359pub type ProgressCallback = Box<dyn Fn(Progress) + Send + Sync>;
361
362pub async fn stream_progress<R: AsyncRead + Unpin + Send + 'static>(
364 stderr: R,
365 mut callback: impl FnMut(Progress) + Send + 'static,
366) {
367 let reader = BufReader::new(stderr);
368 let mut lines = reader.lines();
369
370 while let Ok(Some(line)) = lines.next_line().await {
371 trace!("FFmpeg stderr: {}", line);
372 if let Some(progress) = Progress::parse_line(&line) {
373 callback(progress);
374 }
375 }
376}
377
/// Fluent builder for a list of command-line arguments.
///
/// Every method consumes the builder and returns it, so calls can be
/// chained; [`CommandBuilder::build`] yields the arguments in the order
/// they were added.
#[derive(Debug, Clone, Default)]
pub struct CommandBuilder {
    args: Vec<String>,
}

impl CommandBuilder {
    /// Creates a builder with no arguments.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends a stand-alone flag such as `-y`.
    pub fn flag(self, flag: impl AsRef<str>) -> Self {
        self.arg(flag)
    }

    /// Appends `key` followed by the string form of `value`.
    pub fn option(self, key: impl AsRef<str>, value: impl ToString) -> Self {
        self.arg(key).arg(value.to_string())
    }

    /// Appends `key` and `value` only when `value` is `Some`.
    pub fn option_if_some<T: ToString>(self, key: impl AsRef<str>, value: Option<T>) -> Self {
        match value {
            Some(present) => self.option(key, present),
            None => self,
        }
    }

    /// Appends `flag` only when `condition` is true.
    pub fn flag_if(self, flag: impl AsRef<str>, condition: bool) -> Self {
        if !condition {
            return self;
        }
        self.flag(flag)
    }

    /// Appends every item of `args`, preserving their order.
    pub fn args(mut self, args: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
        self.args
            .extend(args.into_iter().map(|item| item.as_ref().to_string()));
        self
    }

    /// Appends a single positional argument.
    pub fn arg(mut self, arg: impl AsRef<str>) -> Self {
        let owned = arg.as_ref().to_string();
        self.args.push(owned);
        self
    }

    /// Finishes the builder and returns the collected arguments.
    pub fn build(self) -> Vec<String> {
        let Self { args } = self;
        args
    }
}
446
447pub fn validate_input_path(path: &Path) -> Result<()> {
449 if !path.exists() {
450 return Err(Error::Io(std::io::Error::new(
451 std::io::ErrorKind::NotFound,
452 format!("Input file not found: {}", path.display()),
453 )));
454 }
455 Ok(())
456}
457
458pub fn validate_output_path(path: &Path) -> Result<()> {
460 if let Some(parent) = path.parent() {
461 if !parent.exists() {
462 return Err(Error::Io(std::io::Error::new(
463 std::io::ErrorKind::NotFound,
464 format!("Output directory does not exist: {}", parent.display()),
465 )));
466 }
467 }
468 Ok(())
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Arguments come out in insertion order; skipped options and flags
    /// leave no trace.
    #[test]
    fn test_command_builder() {
        let builder = CommandBuilder::new()
            .flag("-y")
            .option("-i", "input.mp4")
            .option_if_some("-ss", Some("00:00:10"))
            .option_if_some("-t", None::<&str>)
            .flag_if("-n", false)
            .arg("output.mp4");
        let built = builder.build();

        let expected = vec!["-y", "-i", "input.mp4", "-ss", "00:00:10", "output.mp4"];
        assert_eq!(built, expected);
    }

    /// A representative status line populates every field.
    #[test]
    fn test_progress_parsing() {
        let sample = "frame= 100 fps=25.0 q=28.0 size= 1024kB time=00:00:04.00 bitrate=2097.2kbits/s speed=1.00x";
        let parsed = Progress::parse_line(sample).unwrap();

        assert_eq!(parsed.frame, Some(100));
        assert_eq!(parsed.fps, Some(25.0));
        assert_eq!(parsed.q, Some(28.0));
        assert_eq!(parsed.size, Some(1024 * 1024));
        assert_eq!(parsed.time, Some(Duration::from_secs(4)));
        assert_eq!(parsed.bitrate, Some(2_097_200.0));
        assert_eq!(parsed.speed, Some(1.0));
    }
}
503}