use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use super::super::RecordingConfig;
use crate::error::Result;
use crate::events::DownloadEvent;
use crate::events::types::RecordingMethod;
use crate::executor::Executor;
/// Records a live stream by running an external ffmpeg process that copies
/// the stream into a single output file.
///
/// Progress is reported through the event bus, and the recording can be
/// interrupted through the cancellation token.
#[derive(Debug)]
pub struct FfmpegLiveRecorder {
/// URL handed to ffmpeg as its `-i` input.
stream_url: String,
/// File ffmpeg writes to; its parent directory is created before recording.
output_path: PathBuf,
/// Location of the ffmpeg executable to run.
ffmpeg_path: PathBuf,
/// Identifier attached to log lines and emitted events.
video_id: String,
/// Optional recording limit, passed to ffmpeg through `-t` when set.
max_duration: Option<Duration>,
/// Cancelling this token stops the running ffmpeg process.
cancellation_token: CancellationToken,
/// Bus used to emit the recording started/stopped events.
event_bus: crate::events::EventBus,
/// Quality label reported in the started event and in `Display` output.
quality: String,
}
impl FfmpegLiveRecorder {
    /// Creates a recorder from `config` that will run the ffmpeg binary found
    /// at `ffmpeg_path`.
    pub fn new(config: RecordingConfig, ffmpeg_path: impl Into<PathBuf>) -> Self {
        Self {
            stream_url: config.stream_url,
            output_path: config.output_path,
            ffmpeg_path: ffmpeg_path.into(),
            video_id: config.video_id,
            quality: config.quality,
            max_duration: config.max_duration,
            cancellation_token: config.cancellation_token,
            event_bus: config.event_bus,
        }
    }

    /// Records the stream with ffmpeg (`-c copy`) until ffmpeg exits or the
    /// cancellation token fires, then reports what was written.
    ///
    /// A `LiveRecordingStarted` event is emitted before ffmpeg is launched and
    /// a `LiveRecordingStopped` event once it has stopped. The returned result
    /// carries the output path, the size of the output file (0 when its
    /// metadata cannot be read) and the elapsed wall-clock time;
    /// `segments_downloaded` is always 0 for this recorder.
    ///
    /// # Errors
    ///
    /// Returns an error when the output's parent directory cannot be created
    /// or when the ffmpeg process cannot be started. A non-zero ffmpeg exit
    /// code or a failure while waiting on the process is *not* returned as an
    /// error: it is logged and reported through the stop reason instead.
    pub async fn record(&self) -> Result<super::super::RecordingResult> {
        let start = Instant::now();
        tracing::info!(
            url = self.stream_url,
            video_id = self.video_id,
            output = ?self.output_path,
            max_duration = ?self.max_duration,
            "📥 Starting live recording (ffmpeg)"
        );
        self.event_bus.emit_if_subscribed(DownloadEvent::LiveRecordingStarted {
            video_id: self.video_id.clone(),
            url: self.stream_url.clone(),
            quality: self.quality.clone(),
            method: RecordingMethod::Fallback,
        });

        // ffmpeg does not create missing directories for its output file.
        if let Some(parent) = self.output_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        let args = self.build_ffmpeg_args();
        // NOTE(review): a zero timeout presumably means "no timeout" for
        // `Executor` — confirm against its implementation.
        let executor = Executor::new(&self.ffmpeg_path, &args, Duration::from_secs(0));
        let mut process = executor.execute_streaming().await?;

        // Whichever happens first decides the stop reason: an external
        // cancellation request or ffmpeg terminating on its own.
        let stop_reason = tokio::select! {
            _ = self.cancellation_token.cancelled() => {
                tracing::info!(video_id = self.video_id, "📥 Cancellation requested, stopping ffmpeg");
                match process.stop().await {
                    Ok(_) => "cancelled".to_string(),
                    Err(e) => {
                        tracing::warn!(error = %e, "Failed to stop ffmpeg gracefully, killing");
                        // Best effort: nothing more can be done if the kill fails too.
                        let _ = process.kill().await;
                        "cancelled (killed)".to_string()
                    }
                }
            }
            result = process.wait() => {
                match result {
                    Ok(output) if output.code == 0 => "stream ended".to_string(),
                    Ok(output) => {
                        // Only the last stderr line is kept for the reason text.
                        let reason = format!("ffmpeg exited with code {}: {}", output.code, output.stderr.lines().last().unwrap_or(""));
                        tracing::warn!(video_id = self.video_id, exit_code = output.code, "FFmpeg exited with non-zero code");
                        reason
                    }
                    Err(e) => {
                        let reason = format!("ffmpeg process error: {e}");
                        tracing::warn!(video_id = self.video_id, error = %e, "FFmpeg process error");
                        reason
                    }
                }
            }
        };

        let total_duration = start.elapsed();
        // A missing or unreadable output file is reported as zero bytes rather
        // than failing the whole recording.
        let total_bytes = tokio::fs::metadata(&self.output_path)
            .await
            .map(|m| m.len())
            .unwrap_or(0);
        tracing::info!(
            video_id = self.video_id,
            total_bytes = total_bytes,
            duration = ?total_duration,
            reason = stop_reason,
            "✅ FFmpeg live recording stopped"
        );
        self.event_bus.emit_if_subscribed(DownloadEvent::LiveRecordingStopped {
            video_id: self.video_id.clone(),
            reason: stop_reason,
            output_path: self.output_path.clone(),
            total_bytes,
            total_duration,
        });
        Ok(super::super::RecordingResult {
            output_path: self.output_path.clone(),
            total_bytes,
            total_duration,
            segments_downloaded: 0,
        })
    }

    /// Assembles the ffmpeg command line: input URL, stream copy, optional
    /// `-t` duration limit, overwrite flag and output file.
    fn build_ffmpeg_args(&self) -> Vec<String> {
        let mut args: Vec<String> = vec![
            "-i".to_string(),
            self.stream_url.clone(),
            "-c".to_string(),
            "copy".to_string(),
        ];
        if let Some(max) = self.max_duration {
            args.push("-t".to_string());
            args.push(Self::ffmpeg_duration_arg(max));
        }
        args.push("-y".to_string());
        // `display()` is lossy for paths that are not valid UTF-8.
        args.push(self.output_path.display().to_string());
        args
    }

    /// Formats `duration` as a value for ffmpeg's `-t` option, in seconds.
    ///
    /// Whole-second durations are written as a plain integer. Anything else
    /// keeps microsecond precision, so a limit such as 500 ms is passed as
    /// `0.500000` instead of being truncated to `0`.
    fn ffmpeg_duration_arg(duration: Duration) -> String {
        match duration.subsec_micros() {
            0 => duration.as_secs().to_string(),
            micros => format!("{}.{:06}", duration.as_secs(), micros),
        }
    }
}
/// One-line, human-readable summary of the recorder: video id, quality label
/// and output file.
impl std::fmt::Display for FfmpegLiveRecorder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pull out just the fields that appear in the summary.
        let Self {
            video_id,
            quality,
            output_path,
            ..
        } = self;
        let output = output_path.display();
        write!(
            f,
            "FfmpegLiveRecorder(video_id={}, quality={}, output={})",
            video_id, quality, output
        )
    }
}