use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc, watch};
use crate::Page;
use crate::error::FerriError;
const FPS: u32 = 25;
pub use crate::ffmpeg::{video_content_type, video_extension};
pub struct VideoRecordingHandle {
pump_task: tokio::task::JoinHandle<()>,
encoder_task: tokio::task::JoinHandle<crate::error::Result<()>>,
shutdown_tx: tokio::sync::oneshot::Sender<()>,
output_path: PathBuf,
}
pub async fn start_recording(
page: &Page,
output_path: PathBuf,
width: u32,
height: u32,
quality: u8,
) -> crate::error::Result<VideoRecordingHandle> {
let w = width & !1;
let h = height & !1;
let (frame_tx, frame_rx) = mpsc::channel::<(Vec<u8>, f64)>(64);
let (cdp_rx, shutdown_tx) = page.start_screencast(quality, w, h).await?;
let pump_task = tokio::spawn(async move {
let mut rx = cdp_rx;
while let Some((jpeg, ts)) = rx.recv().await {
if frame_tx.send((jpeg, ts)).await.is_err() {
break; }
}
});
let path = output_path.clone();
let encoder_task = tokio::task::spawn_blocking(move || crate::ffmpeg::encode_stream(frame_rx, &path, w, h, FPS));
Ok(VideoRecordingHandle {
pump_task,
encoder_task,
shutdown_tx,
output_path,
})
}
impl VideoRecordingHandle {
pub async fn stop(self, page: &Page) -> crate::error::Result<PathBuf> {
if !page.is_closed() {
let _ = page.stop_screencast().await;
}
let _ = self.shutdown_tx.send(());
let _ = self.pump_task.await;
self
.encoder_task
.await
.map_err(|e| FerriError::Backend(format!("encoder join: {e}")))??;
Ok(self.output_path)
}
}
type FrameBuffer = Arc<Mutex<Vec<(Vec<u8>, f64)>>>;
pub struct BufferedRecordingHandle {
frames: FrameBuffer,
pump_task: tokio::task::JoinHandle<()>,
shutdown_tx: tokio::sync::oneshot::Sender<()>,
width: u32,
height: u32,
}
pub async fn start_buffered_recording(
page: &Page,
width: u32,
height: u32,
quality: u8,
) -> crate::error::Result<BufferedRecordingHandle> {
let w = width & !1;
let h = height & !1;
let (cdp_rx, shutdown_tx) = page.start_screencast(quality, w, h).await?;
let frames: FrameBuffer = Arc::new(Mutex::new(Vec::with_capacity(64)));
let frames_clone = Arc::clone(&frames);
let pump_task = tokio::spawn(async move {
let mut rx = cdp_rx;
while let Some((jpeg, ts)) = rx.recv().await {
frames_clone.lock().await.push((jpeg, ts));
}
});
Ok(BufferedRecordingHandle {
frames,
pump_task,
shutdown_tx,
width: w,
height: h,
})
}
impl BufferedRecordingHandle {
pub async fn encode(self, page: &Page, output_path: PathBuf) -> crate::error::Result<PathBuf> {
let _ = page.stop_screencast().await;
let _ = self.shutdown_tx.send(());
let _ = self.pump_task.await;
let frames = self.frames.lock().await;
if frames.is_empty() {
return Err(FerriError::backend("no frames captured"));
}
let w = self.width;
let h = self.height;
let frames_owned: Vec<(Vec<u8>, f64)> = frames.clone();
drop(frames);
let path = output_path.clone();
tokio::task::spawn_blocking(move || crate::ffmpeg::encode_frames(&frames_owned, &path, w, h, FPS))
.await
.map_err(|e| FerriError::backend(format!("encode join: {e}")))??;
Ok(output_path)
}
pub async fn discard(self, page: &Page) {
let _ = page.stop_screencast().await;
let _ = self.shutdown_tx.send(());
let _ = self.pump_task.await;
}
}
type FinalPath = std::result::Result<PathBuf, FerriError>;
#[derive(Clone)]
pub struct Video {
state: Arc<VideoState>,
}
struct VideoState {
path_rx: watch::Receiver<Option<FinalPath>>,
}
impl Video {
#[must_use]
pub fn new() -> (Self, VideoSink) {
let (tx, rx) = watch::channel(None);
let video = Self {
state: Arc::new(VideoState { path_rx: rx }),
};
let sink = VideoSink { tx };
(video, sink)
}
pub async fn path(&self) -> crate::error::Result<PathBuf> {
self.await_terminal_state().await?
}
pub async fn save_as(&self, dest: impl AsRef<Path>) -> crate::error::Result<()> {
let source = self.path().await?;
let dest = dest.as_ref().to_path_buf();
if let Some(parent) = dest.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)
.map_err(|e| FerriError::Backend(format!("video.saveAs create parent {}: {e}", parent.display())))?;
}
}
tokio::task::spawn_blocking(move || std::fs::copy(&source, &dest).map(|_| ()))
.await
.map_err(|e| FerriError::Backend(format!("video.saveAs join: {e}")))?
.map_err(|e| FerriError::Backend(format!("video.saveAs copy: {e}")))
}
pub async fn delete(&self) -> crate::error::Result<()> {
let Ok(path) = self.path().await else {
return Ok(());
};
tokio::task::spawn_blocking(move || match std::fs::remove_file(&path) {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(format!("video.delete remove {}: {e}", path.display())),
})
.await
.map_err(|e| FerriError::Backend(format!("video.delete join: {e}")))?
.map_err(FerriError::Backend)
}
async fn await_terminal_state(&self) -> crate::error::Result<FinalPath> {
let mut rx = self.state.path_rx.clone();
loop {
if let Some(val) = rx.borrow_and_update().clone() {
return Ok(val);
}
if rx.changed().await.is_err() {
return Err(FerriError::target_closed(Some(
"video recording was cancelled before finalisation".into(),
)));
}
}
}
}
pub struct VideoSink {
tx: watch::Sender<Option<FinalPath>>,
}
impl VideoSink {
pub fn finish_ok(self, path: PathBuf) {
let _ = self.tx.send_replace(Some(Ok(path)));
}
pub fn finish_err(self, error: FerriError) {
let _ = self.tx.send_replace(Some(Err(error)));
}
}