use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::time::timeout;
use tracing::{debug, trace};
use which::which;
use crate::error::{Error, Result};
pub fn find_executable(name: &str) -> Result<PathBuf> {
which(name).map_err(|_| Error::ExecutableNotFound(name.to_string()))
}
/// Launch settings for a child process, assembled with chained builder calls.
#[derive(Clone, Debug)]
pub struct ProcessConfig {
    /// Program to run.
    pub executable: PathBuf,
    /// Directory the child starts in; `None` leaves it unset.
    pub working_dir: Option<PathBuf>,
    /// Extra environment variables as `(key, value)` pairs, in insertion order.
    pub env: Vec<(String, String)>,
    /// Upper bound applied when waiting for the child; `None` means no limit.
    pub timeout: Option<Duration>,
    /// Pipe the child's stdout so it can be captured (otherwise it is nulled).
    pub capture_stdout: bool,
    /// Pipe the child's stderr so it can be captured (otherwise it is nulled).
    pub capture_stderr: bool,
    /// Pipe the child's stdin so the caller can write to it (otherwise it is nulled).
    pub pipe_stdin: bool,
}

impl ProcessConfig {
    /// Creates a configuration for `executable` with the defaults: no working
    /// directory, no extra environment, no timeout, stdout and stderr
    /// captured, stdin not piped.
    pub fn new(executable: impl Into<PathBuf>) -> Self {
        let executable = executable.into();
        Self {
            executable,
            working_dir: None,
            env: Vec::new(),
            timeout: None,
            capture_stdout: true,
            capture_stderr: true,
            pipe_stdin: false,
        }
    }

    /// Sets the directory the child is started in.
    pub fn working_dir(self, dir: impl Into<PathBuf>) -> Self {
        Self {
            working_dir: Some(dir.into()),
            ..self
        }
    }

    /// Appends one environment variable; earlier entries are kept.
    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let entry = (key.into(), value.into());
        self.env.push(entry);
        self
    }

    /// Sets the maximum time to wait for the child.
    pub fn timeout(self, duration: Duration) -> Self {
        Self {
            timeout: Some(duration),
            ..self
        }
    }

    /// Chooses whether stdout is piped for capture.
    pub fn capture_stdout(self, capture: bool) -> Self {
        Self {
            capture_stdout: capture,
            ..self
        }
    }

    /// Chooses whether stderr is piped for capture.
    pub fn capture_stderr(self, capture: bool) -> Self {
        Self {
            capture_stderr: capture,
            ..self
        }
    }

    /// Chooses whether stdin is piped to the caller.
    pub fn pipe_stdin(self, pipe: bool) -> Self {
        Self {
            pipe_stdin: pipe,
            ..self
        }
    }
}
pub struct Process {
child: Child,
config: ProcessConfig,
}
impl Process {
pub async fn spawn(config: ProcessConfig, args: Vec<String>) -> Result<Self> {
debug!("Spawning process: {} {:?}", config.executable.display(), args);
let mut cmd = Command::new(&config.executable);
for arg in &args {
cmd.arg(arg);
}
if let Some(ref dir) = config.working_dir {
cmd.current_dir(dir);
}
for (key, value) in &config.env {
cmd.env(key, value);
}
cmd.stdin(if config.pipe_stdin {
Stdio::piped()
} else {
Stdio::null()
});
cmd.stdout(if config.capture_stdout {
Stdio::piped()
} else {
Stdio::null()
});
cmd.stderr(if config.capture_stderr {
Stdio::piped()
} else {
Stdio::null()
});
cmd.kill_on_drop(true);
let child = cmd.spawn().map_err(Error::Io)?;
Ok(Self { child, config })
}
pub async fn wait(mut self) -> Result<ProcessOutput> {
let wait_future = async {
let status = self.child.wait().await.map_err(Error::Io)?;
let stdout = if self.config.capture_stdout {
if let Some(mut stdout) = self.child.stdout.take() {
let mut buf = Vec::new();
stdout.read_to_end(&mut buf).await.map_err(Error::Io)?;
Some(buf)
} else {
None
}
} else {
None
};
let stderr = if self.config.capture_stderr {
if let Some(mut stderr) = self.child.stderr.take() {
let mut buf = Vec::new();
stderr.read_to_end(&mut buf).await.map_err(Error::Io)?;
Some(buf)
} else {
None
}
} else {
None
};
Ok(ProcessOutput {
status,
stdout,
stderr,
})
};
if let Some(timeout_duration) = self.config.timeout {
match timeout(timeout_duration, wait_future).await {
Ok(result) => result,
Err(_) => {
let _ = self.child.kill().await;
Err(Error::Timeout(timeout_duration))
}
}
} else {
wait_future.await
}
}
pub fn stdin(&mut self) -> Option<tokio::process::ChildStdin> {
self.child.stdin.take()
}
pub fn stdout(&mut self) -> Option<tokio::process::ChildStdout> {
self.child.stdout.take()
}
pub fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
self.child.stderr.take()
}
pub async fn kill(&mut self) -> Result<()> {
self.child.kill().await.map_err(Error::Io)
}
pub fn id(&self) -> Option<u32> {
self.child.id()
}
pub fn try_wait(&mut self) -> Result<Option<std::process::ExitStatus>> {
self.child.try_wait().map_err(Error::Io)
}
}
#[derive(Debug)]
pub struct ProcessOutput {
pub status: std::process::ExitStatus,
pub stdout: Option<Vec<u8>>,
pub stderr: Option<Vec<u8>>,
}
impl ProcessOutput {
pub fn success(&self) -> bool {
self.status.success()
}
pub fn stdout_str(&self) -> Option<String> {
self.stdout.as_ref().map(|b| String::from_utf8_lossy(b).into_owned())
}
pub fn stderr_str(&self) -> Option<String> {
self.stderr.as_ref().map(|b| String::from_utf8_lossy(b).into_owned())
}
pub fn into_result(self) -> Result<Self> {
if self.success() {
Ok(self)
} else {
Err(Error::process_failed(
format!("Process exited with status: {}", self.status),
Some(self.status),
self.stderr_str(),
))
}
}
}
/// One status report parsed from an FFmpeg-style `key=value` progress line.
///
/// Every field is optional: it is `None` when the key is missing from the
/// line or its value could not be parsed.
#[derive(Debug, Clone)]
pub struct Progress {
    /// Value of the `frame=` field.
    pub frame: Option<u64>,
    /// Value of the `fps=` field.
    pub fps: Option<f64>,
    /// Value of the `q=` field (presumably the encoder quantizer — confirm).
    pub q: Option<f64>,
    /// Output size in bytes (the reported kilobyte count times 1024).
    pub size: Option<u64>,
    /// Value of the `time=` field.
    pub time: Option<Duration>,
    /// Bitrate in bits per second (the reported `kbits/s` value times 1000).
    pub bitrate: Option<f64>,
    /// Value of the `speed=` field with its trailing `x` removed.
    pub speed: Option<f64>,
}

impl Progress {
    /// Parses one status line such as
    /// `frame= 100 fps=25.0 q=28.0 size= 1024kB time=00:00:04.00 bitrate=2097.2kbits/s speed=1.00x`.
    ///
    /// Returns `None` when the line does not contain `frame=`. Otherwise every
    /// recognised key is parsed independently; a value that fails to parse
    /// leaves that field `None` instead of failing the whole line.
    ///
    /// Values may be attached to the key (`fps=25.0`) or, because of column
    /// padding, arrive as the next whitespace-separated token (`frame= 100`).
    pub fn parse_line(line: &str) -> Option<Self> {
        if !line.contains("frame=") {
            return None;
        }

        let mut progress = Progress {
            frame: None,
            fps: None,
            q: None,
            size: None,
            time: None,
            bitrate: None,
            speed: None,
        };

        let mut tokens = line.split_whitespace().peekable();
        while let Some(token) = tokens.next() {
            let (key, inline) = match token.split_once('=') {
                Some(pair) => pair,
                None => continue,
            };

            // `key= value`: the value is the following token. A following
            // token that is itself `key=value` is left alone so that an
            // empty field cannot swallow (and thereby drop) its neighbour.
            let value = if inline.is_empty() {
                tokens
                    .next_if(|next| !next.contains('='))
                    .unwrap_or(inline)
            } else {
                inline
            };

            match key {
                "frame" => progress.frame = value.parse().ok(),
                "fps" => progress.fps = value.parse().ok(),
                "q" => progress.q = value.parse().ok(),
                // `Lsize` is the key used on the final status line.
                "size" | "Lsize" => {
                    if let Some(bytes) = Self::parse_size_bytes(value) {
                        progress.size = Some(bytes);
                    }
                }
                "time" => {
                    if let Ok(duration) = crate::types::Duration::from_ffmpeg_format(value) {
                        progress.time = Some(duration.into());
                    }
                }
                "bitrate" => {
                    if let Some(kbits) = value.strip_suffix("kbits/s") {
                        progress.bitrate = kbits.parse::<f64>().ok().map(|kb| kb * 1000.0);
                    }
                }
                "speed" => {
                    if let Some(factor) = value.strip_suffix('x') {
                        progress.speed = factor.parse().ok();
                    }
                }
                _ => {}
            }
        }

        Some(progress)
    }

    /// Converts a size value such as `1024kB` or `1024KiB` to bytes.
    ///
    /// Both suffixes are treated as units of 1024 bytes. `KiB` is the
    /// spelling printed by newer FFmpeg releases (NOTE(review): confirm
    /// against the FFmpeg versions this crate targets). Returns `None` for
    /// any other suffix (e.g. `N/A`), a non-integer count, or overflow.
    fn parse_size_bytes(value: &str) -> Option<u64> {
        let count = value
            .strip_suffix("KiB")
            .or_else(|| value.strip_suffix("kB"))?;
        count.parse::<u64>().ok()?.checked_mul(1024)
    }
}
/// Boxed callback that receives [`Progress`] values by value.
///
/// The `Send + Sync` bounds allow the boxed closure to be moved to and shared
/// between threads.
pub type ProgressCallback = Box<dyn Fn(Progress) + Send + Sync>;
/// Reads FFmpeg's stderr until end-of-file and invokes `callback` for every
/// status update that [`Progress::parse_line`] recognises.
///
/// The stream is split on **both** `\r` and `\n`. FFmpeg rewrites its status
/// line in place by ending each update with a carriage return, so splitting
/// on newlines alone would withhold every update until a newline finally
/// arrives. Bytes are decoded lossily, so output that is not valid UTF-8
/// (for example odd metadata or file names) does not stop the stream from
/// being drained.
///
/// Each non-empty segment is logged at trace level. Reading stops silently at
/// end-of-file or on the first read error; whatever partial segment was
/// buffered at that point is still processed.
pub async fn stream_progress<R: AsyncRead + Unpin + Send + 'static>(
    stderr: R,
    mut callback: impl FnMut(Progress) + Send + 'static,
) {
    let mut reader = BufReader::new(stderr);
    let mut segment: Vec<u8> = Vec::new();

    loop {
        let consumed = match reader.fill_buf().await {
            Ok(chunk) if !chunk.is_empty() => {
                for &byte in chunk {
                    if byte == b'\r' || byte == b'\n' {
                        dispatch_progress_segment(&mut segment, &mut callback);
                    } else {
                        segment.push(byte);
                    }
                }
                chunk.len()
            }
            // Empty buffer means end-of-file; errors end the stream as well.
            _ => break,
        };
        reader.consume(consumed);
    }

    // Output that ended without a trailing delimiter.
    dispatch_progress_segment(&mut segment, &mut callback);
}

/// Decodes one delimiter-free segment, reports it if it is a progress line,
/// and clears the buffer for reuse. Empty segments are ignored.
fn dispatch_progress_segment(segment: &mut Vec<u8>, callback: &mut impl FnMut(Progress)) {
    if segment.is_empty() {
        return;
    }
    {
        let line = String::from_utf8_lossy(segment);
        trace!("FFmpeg stderr: {}", line);
        if let Some(progress) = Progress::parse_line(&line) {
            callback(progress);
        }
    }
    segment.clear();
}
/// Incrementally assembles a command-line argument list.
#[derive(Clone, Debug)]
pub struct CommandBuilder {
    /// Arguments collected so far, in the order they were added.
    args: Vec<String>,
}

impl CommandBuilder {
    /// Starts with an empty argument list.
    pub fn new() -> Self {
        let args = Vec::new();
        Self { args }
    }

    /// Appends a stand-alone flag such as `-y`.
    pub fn flag(self, flag: impl AsRef<str>) -> Self {
        self.arg(flag)
    }

    /// Appends `key` followed by the string form of `value`.
    pub fn option(self, key: impl AsRef<str>, value: impl ToString) -> Self {
        self.arg(key).arg(value.to_string())
    }

    /// Appends `key` and the value only when `value` is `Some`.
    pub fn option_if_some<T: ToString>(self, key: impl AsRef<str>, value: Option<T>) -> Self {
        match value {
            Some(present) => self.option(key, present),
            None => self,
        }
    }

    /// Appends `flag` only when `condition` holds.
    pub fn flag_if(self, flag: impl AsRef<str>, condition: bool) -> Self {
        if !condition {
            return self;
        }
        self.flag(flag)
    }

    /// Appends every item of `args`, preserving order.
    pub fn args(mut self, args: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
        self.args
            .extend(args.into_iter().map(|item| item.as_ref().to_string()));
        self
    }

    /// Appends a single positional argument.
    pub fn arg(mut self, arg: impl AsRef<str>) -> Self {
        let owned = arg.as_ref().to_string();
        self.args.push(owned);
        self
    }

    /// Finishes the builder and returns the collected arguments.
    pub fn build(self) -> Vec<String> {
        let Self { args } = self;
        args
    }
}

impl Default for CommandBuilder {
    /// Same as [`CommandBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
pub fn validate_input_path(path: &Path) -> Result<()> {
if !path.exists() {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Input file not found: {}", path.display()),
)));
}
Ok(())
}
pub fn validate_output_path(path: &Path) -> Result<()> {
if let Some(parent) = path.parent() {
if !parent.exists() {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Output directory does not exist: {}", parent.display()),
)));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Flags, options and conditional entries come out in call order, and
    /// the `None` / `false` variants contribute nothing.
    #[test]
    fn test_command_builder() {
        let builder = CommandBuilder::new()
            .flag("-y")
            .option("-i", "input.mp4")
            .option_if_some("-ss", Some("00:00:10"))
            .option_if_some("-t", None::<&str>)
            .flag_if("-n", false)
            .arg("output.mp4");

        let built = builder.build();
        let expected = vec!["-y", "-i", "input.mp4", "-ss", "00:00:10", "output.mp4"];
        assert_eq!(built, expected);
    }

    /// A complete status line populates every field, with size and bitrate
    /// scaled to bytes and bits per second.
    #[test]
    fn test_progress_parsing() {
        let Progress {
            frame,
            fps,
            q,
            size,
            time,
            bitrate,
            speed,
        } = Progress::parse_line(
            "frame= 100 fps=25.0 q=28.0 size= 1024kB time=00:00:04.00 bitrate=2097.2kbits/s speed=1.00x",
        )
        .unwrap();

        assert_eq!(frame, Some(100));
        assert_eq!(fps, Some(25.0));
        assert_eq!(q, Some(28.0));
        assert_eq!(size, Some(1024 * 1024));
        assert_eq!(time, Some(Duration::from_secs(4)));
        assert_eq!(bitrate, Some(2_097_200.0));
        assert_eq!(speed, Some(1.0));
    }
}