use chrono::{Duration as ChronoDuration, Utc};
use std::path::Path;
use std::process::{Command, Stdio};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command as TokioCommand;
use tracing::{debug, info};
use vidsage_core::video::metadata::{VideoFormat, VideoQuality};
use vidsage_core::video::processor::CompressionLevel;
use vidsage_core::video::VideoOutput;
use vidsage_core::{
CoreError, ProcessOptions, ProcessingStatus, Result, VideoMetadata, VideoProcessor,
};
use which::which;
/// Video processor backed by the external `ffmpeg` and `ffprobe` executables.
///
/// Instances are created with `FFmpegProcessor::new`, which resolves both
/// executables through `which` and stores their locations here.
pub struct FFmpegProcessor {
    /// Location of the `ffmpeg` executable (stored as a lossily converted string).
    ffmpeg_path: String,
    /// Location of the `ffprobe` executable (stored as a lossily converted string).
    ffprobe_path: String,
}
impl FFmpegProcessor {
pub fn new() -> Result<Self> {
let ffmpeg_path = which("ffmpeg")
.map_err(|e| CoreError::VideoProcessingError(format!("FFmpeg not found: {}", e)))?;
let ffprobe_path = which("ffprobe")
.map_err(|e| CoreError::VideoProcessingError(format!("ffprobe not found: {}", e)))?;
Ok(Self {
ffmpeg_path: ffmpeg_path.to_string_lossy().to_string(),
ffprobe_path: ffprobe_path.to_string_lossy().to_string(),
})
}
async fn extract_metadata_with_ffprobe(&self, input: &Path) -> Result<VideoMetadata> {
info!("Extracting metadata from: {:?}", input);
let output = Command::new(&self.ffprobe_path)
.args([
"-v",
"quiet",
"-print_format",
"json",
"-show_format",
"-show_streams",
input.to_str().unwrap(),
])
.output()
.map_err(|e| {
CoreError::VideoProcessingError(format!("Failed to run ffprobe: {}", e))
})?;
if !output.status.success() {
return Err(CoreError::VideoProcessingError(
String::from_utf8_lossy(&output.stderr).to_string(),
));
}
let json: serde_json::Value = serde_json::from_slice(&output.stdout).map_err(|e| {
CoreError::VideoProcessingError(format!("Failed to parse ffprobe output: {}", e))
})?;
let video_stream = json["streams"]
.as_array()
.ok_or_else(|| {
CoreError::VideoProcessingError("No streams found in video".to_string())
})?
.iter()
.find(|stream| stream["codec_type"].as_str() == Some("video"))
.ok_or_else(|| CoreError::VideoProcessingError("No video stream found".to_string()))?;
let format = json["format"].as_object().ok_or_else(|| {
CoreError::VideoProcessingError("No format information found".to_string())
})?;
let duration_str = format["duration"].as_str().unwrap_or("0");
let duration_secs = duration_str.parse::<f64>().map_err(|e| {
CoreError::VideoProcessingError(format!("Failed to parse duration: {}", e))
})?;
let duration = ChronoDuration::seconds(duration_secs as i64);
let width = video_stream["width"].as_i64().unwrap_or(0) as u32;
let height = video_stream["height"].as_i64().unwrap_or(0) as u32;
let file_size = format["size"].as_i64().unwrap_or(0) as u64;
let bitrate_str = format["bit_rate"].as_str().unwrap_or("0");
let bitrate = bitrate_str.parse::<u32>().map_err(|e| {
CoreError::VideoProcessingError(format!("Failed to parse bitrate: {}", e))
})? / 1000;
let frame_rate_str = video_stream["r_frame_rate"].as_str().unwrap_or("30/1");
let frame_rate = parse_frame_rate(frame_rate_str).map_err(|e| {
CoreError::VideoProcessingError(format!("Failed to parse frame rate: {}", e))
})?;
let video_codec = video_stream["codec_name"]
.as_str()
.unwrap_or("unknown")
.to_string();
let audio_codec = json["streams"]
.as_array()
.unwrap_or(&vec![])
.iter()
.find(|stream| stream["codec_type"].as_str() == Some("audio"))
.and_then(|stream| stream["codec_name"].as_str())
.unwrap_or("unknown")
.to_string();
let audio_tracks = json["streams"]
.as_array()
.unwrap_or(&vec![])
.iter()
.filter(|stream| stream["codec_type"].as_str() == Some("audio"))
.count() as u32;
let audio_bitrate = json["streams"]
.as_array()
.unwrap_or(&vec![])
.iter()
.find(|stream| stream["codec_type"].as_str() == Some("audio"))
.and_then(|stream| stream["bit_rate"].as_str())
.unwrap_or("0")
.parse::<u32>()
.unwrap_or(0)
/ 1000;
let format = Path::new(input)
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| {
let ext = ext.to_lowercase();
match ext.as_str() {
"mp4" => VideoFormat::MP4,
"webm" => VideoFormat::WebM,
"avi" => VideoFormat::AVI,
"mov" => VideoFormat::MOV,
"mkv" => VideoFormat::MKV,
"flv" => VideoFormat::FLV,
_ => VideoFormat::Other,
}
})
.unwrap_or(VideoFormat::Other);
let metadata = VideoMetadata {
id: vidsage_core::utils::helpers::generate_id(),
title: Path::new(input)
.file_stem()
.and_then(|stem| stem.to_str())
.unwrap_or("Unknown Video")
.to_string(),
duration,
resolution: (width, height),
format,
file_size,
bitrate,
frame_rate,
audio_tracks,
audio_bitrate,
video_codec,
audio_codec,
created_at: Utc::now(),
updated_at: Utc::now(),
};
debug!("Extracted metadata: {:?}", metadata);
Ok(metadata)
}
fn build_process_command(
&self,
input: &Path,
output: &Path,
options: &ProcessOptions,
) -> Vec<String> {
let mut args = vec![
"-i".to_string(),
input.to_str().unwrap().to_string(),
"-y".to_string(), ];
match options.format {
VideoFormat::MP4 => args.extend(vec!["-c:v".to_string(), "libx264".to_string()]),
VideoFormat::WebM => args.extend(vec!["-c:v".to_string(), "libvpx-vp9".to_string()]),
_ => args.extend(vec!["-c:v".to_string(), "copy".to_string()]),
}
match options.quality {
VideoQuality::Low => args.extend(vec!["-crf".to_string(), "30".to_string()]),
VideoQuality::Medium => args.extend(vec!["-crf".to_string(), "23".to_string()]),
VideoQuality::High => args.extend(vec!["-crf".to_string(), "17".to_string()]),
VideoQuality::Ultra => args.extend(vec!["-crf".to_string(), "12".to_string()]),
VideoQuality::Custom(bitrate) => args.extend(vec![
"-b:v".to_string(),
format!("{}k", bitrate).to_string(),
]),
}
match options.compression {
CompressionLevel::Low => {
args.extend(vec!["-preset".to_string(), "ultrafast".to_string()])
},
CompressionLevel::Medium => {
args.extend(vec!["-preset".to_string(), "medium".to_string()])
},
CompressionLevel::High => args.extend(vec!["-preset".to_string(), "slow".to_string()]),
CompressionLevel::Ultra => {
args.extend(vec!["-preset".to_string(), "veryslow".to_string()])
},
CompressionLevel::Custom(_) => {
args.extend(vec!["-preset".to_string(), "medium".to_string()])
},
}
if let Some(resolution) = options.resolution {
args.extend(vec![
"-s".to_string(),
format!("{}x{}", resolution.0, resolution.1).to_string(),
]);
}
if let Some(frame_rate) = options.frame_rate {
args.extend(vec!["-r".to_string(), frame_rate.to_string()]);
}
if let Some(threads) = options.threads {
args.extend(vec!["-threads".to_string(), threads.to_string()]);
}
args.push(output.to_str().unwrap().to_string());
args
}
}
#[async_trait::async_trait]
impl VideoProcessor for FFmpegProcessor {
    /// Extracts metadata for `input` by delegating to the ffprobe-based helper.
    async fn extract_metadata(&self, input: &Path) -> Result<VideoMetadata> {
        self.extract_metadata_with_ffprobe(input).await
    }

    /// Converts `input` with ffmpeg according to `options`.
    ///
    /// The result is written next to the input as `<stem>-processed.<ext>`,
    /// where `<ext>` follows `options.format` (`mp4` for `VideoFormat::Other`).
    /// ffmpeg's stderr is forwarded to the debug log, and the metadata of the
    /// produced file is read back with ffprobe.
    ///
    /// # Errors
    ///
    /// Returns `CoreError::VideoProcessingError` when the output directory
    /// cannot be created, ffmpeg cannot be started or waited on, ffmpeg exits
    /// unsuccessfully, or metadata extraction of the output fails.
    async fn process_video(&self, input: &Path, options: ProcessOptions) -> Result<VideoOutput> {
        info!("Processing video: {:?} with options: {:?}", input, options);

        let output_dir = input.parent().unwrap_or(Path::new("."));
        std::fs::create_dir_all(output_dir).map_err(|e| {
            CoreError::VideoProcessingError(format!("Failed to create output directory: {}", e))
        })?;

        let output_ext = match options.format {
            VideoFormat::MP4 => "mp4",
            VideoFormat::WebM => "webm",
            VideoFormat::AVI => "avi",
            VideoFormat::MOV => "mov",
            VideoFormat::MKV => "mkv",
            VideoFormat::FLV => "flv",
            VideoFormat::Other => "mp4",
        };

        // Assemble the file name as an OS string so a non-UTF-8 stem cannot panic.
        let mut output_file_name = input.file_stem().unwrap_or_default().to_os_string();
        output_file_name.push("-processed.");
        output_file_name.push(output_ext);
        let output_path = output_dir.join(output_file_name);

        let args = self.build_process_command(input, &output_path, &options);
        debug!("FFmpeg command: {} {:?}", self.ffmpeg_path, args);

        let mut child = TokioCommand::new(&self.ffmpeg_path)
            .args(args)
            // ffmpeg must not read the parent's stdin, and its stdout is never
            // consumed here, so an unread pipe could eventually stall it.
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::piped())
            .spawn()
            .map_err(|e| {
                CoreError::VideoProcessingError(format!("Failed to start FFmpeg: {}", e))
            })?;

        let stderr = child.stderr.take();
        // Run in a separate task so ffmpeg keeps being drained and is reaped
        // even if the caller drops this future.
        let wait_task = tokio::spawn(async move {
            if let Some(stderr) = stderr {
                let mut reader = BufReader::new(stderr);
                let mut raw_line = Vec::new();
                loop {
                    raw_line.clear();
                    // Read raw bytes and decode lossily: ffmpeg may print
                    // non-UTF-8 data, which a string-based line reader rejects.
                    let read = reader.read_until(b'\n', &mut raw_line).await;
                    match read {
                        Ok(0) => break,
                        Ok(_) => debug!(
                            "FFmpeg stderr: {}",
                            String::from_utf8_lossy(&raw_line)
                                .trim_end_matches(|c: char| c == '\r' || c == '\n')
                        ),
                        Err(e) => {
                            debug!("Failed to read FFmpeg stderr: {}", e);
                            break;
                        }
                    }
                }
                // `reader` is dropped here, closing our end of the pipe
                // before we wait on the child.
            }
            child.wait().await
        });

        let status = wait_task
            .await
            .map_err(|e| CoreError::VideoProcessingError(format!("FFmpeg task failed: {}", e)))?
            .map_err(|e| {
                CoreError::VideoProcessingError(format!("FFmpeg process failed: {}", e))
            })?;

        if !status.success() {
            return Err(CoreError::VideoProcessingError(format!(
                "FFmpeg processing failed with exit code: {}",
                status.code().unwrap_or(-1)
            )));
        }

        let metadata = self.extract_metadata_with_ffprobe(&output_path).await?;
        info!("Video processing completed successfully: {:?}", output_path);

        Ok(VideoOutput {
            metadata,
            processed_path: output_path,
            status: ProcessingStatus::Completed,
            // NOTE(review): elapsed time is not measured; this is always 0.0.
            processing_time: 0.0,
            original_path: input.to_path_buf(),
            audio_path: None,
            extracted_metadata: None,
        })
    }

    /// Reports the status of a job; `_job_id` is ignored and the result is
    /// always `ProcessingStatus::Completed`.
    async fn get_status(&self, _job_id: &str) -> Result<ProcessingStatus> {
        Ok(ProcessingStatus::Completed)
    }

    /// Requests cancellation of a job; `_job_id` is ignored, nothing is
    /// cancelled, and the result is always `true`.
    async fn cancel_job(&self, _job_id: &str) -> Result<bool> {
        Ok(true)
    }
}
/// Parses a frame rate written either as a fraction (`"30000/1001"`) or as a
/// plain number (`"29.97"`).
///
/// * Exactly one `/`: both sides must parse as `f64`, otherwise an error is
///   returned. A zero denominator yields `30.0`.
/// * Anything else (no `/`, or more than one): the whole string is parsed as
///   `f64`, and `30.0` is returned when that fails.
fn parse_frame_rate(frame_rate_str: &str) -> Result<f64> {
    let parse_component = |text: &str| {
        text.parse::<f64>()
            .map_err(|e| CoreError::VideoProcessingError(format!("Invalid frame rate: {}", e)))
    };

    match frame_rate_str.split_once('/') {
        // The guard keeps the "exactly two parts" rule: a second slash sends
        // the input to the plain-number fallback below.
        Some((num_text, den_text)) if !den_text.contains('/') => {
            let numerator = parse_component(num_text)?;
            let denominator = parse_component(den_text)?;
            if denominator == 0.0 {
                Ok(30.0)
            } else {
                Ok(numerator / denominator)
            }
        }
        _ => Ok(frame_rate_str.parse::<f64>().unwrap_or(30.0)),
    }
}
pub fn is_ffmpeg_installed() -> bool {
which("ffmpeg").is_ok() && which("ffprobe").is_ok()
}
/// Returns the first line printed by `ffmpeg -version`.
///
/// Yields `None` when the command cannot be run, exits unsuccessfully, or
/// prints nothing on stdout.
pub fn get_ffmpeg_version() -> Option<String> {
    let result = Command::new("ffmpeg").arg("-version").output().ok()?;
    if !result.status.success() {
        return None;
    }
    let text = String::from_utf8_lossy(&result.stdout);
    let first_line = text.lines().next()?;
    Some(first_line.to_string())
}