use std::path::{Path, PathBuf};
use std::sync::Mutex;
use gstreamer as gst;
use gstreamer::prelude::*;
use gstreamer_app::AppSink;
use crate::error::{Error, Result};
// Serializes all GStreamer capture work in this module (screenshots AND
// recording startup): each capture mutates process-wide environment variables
// (PIPEWIRE_REMOTE, XDG_RUNTIME_DIR) before connecting to PipeWire, so
// concurrent captures would race on them.
static GRAB_PNG_LOCK: Mutex<()> = Mutex::new(());
fn validate_pipewire_socket(path: &Path) -> Result<&Path> {
path.parent().ok_or_else(|| {
Error::screenshot(format!(
"pipewire socket path has no parent: {}",
path.display()
))
})
}
/// Builds the gst-launch description for a single-frame PNG capture.
///
/// `num-buffers=5` lets a few frames through so the source can settle;
/// `pngenc snapshot=true` makes the encoder emit exactly one image, which the
/// named appsink (`sink`) hands back to the caller.
fn build_pipeline_str(node_id: u32) -> String {
    let source =
        format!("pipewiresrc path={node_id} always-copy=true do-timestamp=true num-buffers=5");
    // Stages are joined with the gst-launch link operator.
    [
        source.as_str(),
        "videoconvert",
        "pngenc snapshot=true",
        "appsink name=sink",
    ]
    .join(" ! ")
}
/// Captures a single PNG frame from PipeWire node `node_id`.
///
/// Validates the socket path, then runs the blocking GStreamer work on a
/// dedicated thread via `spawn_blocking` so the async runtime is not stalled.
///
/// # Errors
/// Propagates validation, GStreamer, and join failures as [`Error::screenshot`].
pub async fn grab_png(node_id: u32, pipewire_socket: &Path) -> Result<Vec<u8>> {
    let runtime = validate_pipewire_socket(pipewire_socket)?.to_path_buf();
    let socket = pipewire_socket.to_path_buf();
    let task = tokio::task::spawn_blocking(move || grab_png_sync(node_id, &socket, &runtime));
    task.await
        .map_err(|e| Error::screenshot_with("spawn_blocking failed", e))?
}
fn grab_png_sync(node_id: u32, pipewire_socket: &Path, runtime_dir: &Path) -> Result<Vec<u8>> {
let _guard = GRAB_PNG_LOCK
.lock()
.map_err(|e| Error::screenshot(format!("grab_png lock poisoned: {e}")))?;
gst::init().map_err(|e| Error::screenshot_with("gstreamer init failed", e))?;
unsafe {
std::env::set_var("PIPEWIRE_REMOTE", pipewire_socket);
std::env::set_var("XDG_RUNTIME_DIR", runtime_dir);
}
let pipeline_str = build_pipeline_str(node_id);
let pipeline = gst::parse::launch(&pipeline_str)
.map_err(|e| Error::screenshot_with("pipeline parse failed", e))?;
let pipeline = pipeline
.dynamic_cast::<gst::Pipeline>()
.map_err(|_| Error::screenshot("parsed element is not a Pipeline"))?;
let sink = pipeline
.by_name("sink")
.ok_or_else(|| Error::screenshot("appsink not found in pipeline"))?;
let appsink = sink
.dynamic_cast::<AppSink>()
.map_err(|_| Error::screenshot("element 'sink' is not an AppSink"))?;
pipeline
.set_state(gst::State::Playing)
.map_err(|e| Error::screenshot_with("failed to start pipeline", e))?;
let sample = appsink
.try_pull_sample(gst::ClockTime::from_seconds(10))
.ok_or_else(|| Error::screenshot("timed out waiting for PNG frame"))?;
let buffer = sample
.buffer()
.ok_or_else(|| Error::screenshot("sample has no buffer"))?;
let map = buffer
.map_readable()
.map_err(|e| Error::screenshot_with("failed to map buffer", e))?;
let png_bytes = map.as_slice().to_vec();
pipeline
.set_state(gst::State::Null)
.map_err(|e| Error::screenshot_with("failed to stop pipeline", e))?;
tracing::info!(bytes = png_bytes.len(), "screenshot captured");
Ok(png_bytes)
}
/// Default VP8 target bitrate, in bits per second (2 Mbit/s).
pub const DEFAULT_VIDEO_BITRATE: u32 = 2_000_000;
/// Default recording frame rate, in frames per second.
pub const DEFAULT_VIDEO_FPS: u32 = 15;
/// Builds the gst-launch description for a VP8/WebM screen recording.
///
/// The output path is interpolated unquoted into gst-launch syntax, so
/// whitespace in it would split the `filesink location=` token — guarded by a
/// `debug_assert` (callers are expected to control the path).
///
/// `deadline=1 cpu-used=4` trades quality for realtime encoding speed;
/// quantizer bounds cap worst-case quality loss; a keyframe is forced at
/// least every ~2 seconds so seeking/recovery stays reasonable.
fn build_recording_pipeline_str(
    node_id: u32,
    output_path: &Path,
    bitrate: u32,
    fps: u32,
) -> String {
    debug_assert!(
        !output_path.to_string_lossy().contains(char::is_whitespace),
        "recording output path must not contain whitespace: {}",
        output_path.display()
    );
    // Saturate so an absurd fps cannot overflow (debug builds would panic on
    // `fps * 2` at u32::MAX).
    let keyframe_max_dist = fps.saturating_mul(2);
    format!(
        "pipewiresrc path={node_id} always-copy=true do-timestamp=true \
         ! videoconvert \
         ! videorate \
         ! video/x-raw,framerate={fps}/1 \
         ! vp8enc deadline=1 cpu-used=4 \
           target-bitrate={bitrate} \
           min-quantizer=4 max-quantizer=30 \
           keyframe-max-dist={keyframe_max_dist} \
         ! webmmux \
         ! filesink location={path}",
        path = output_path.display()
    )
}
/// Handle to an in-progress screen recording.
///
/// Obtain via [`VideoRecorder::start`]; finalize with [`VideoRecorder::stop`]
/// so the WebM gets a proper EOS/finalization. Dropping without `stop()` only
/// force-stops the pipeline and leaves a truncated file.
pub struct VideoRecorder {
    // `Some` while recording; taken by `stop()` (or by `Drop` as a fallback).
    pipeline: Option<gst::Pipeline>,
    // Destination WebM path, as passed to `start()`.
    output_path: PathBuf,
}
impl VideoRecorder {
pub async fn start(
node_id: u32,
pipewire_socket: &Path,
output_path: &Path,
bitrate: u32,
fps: u32,
) -> Result<VideoRecorder> {
let socket = pipewire_socket.to_path_buf();
let runtime = validate_pipewire_socket(pipewire_socket)?.to_path_buf();
let output = output_path.to_path_buf();
tokio::task::spawn_blocking(move || {
start_recording_sync(node_id, &socket, &runtime, output, bitrate, fps)
})
.await
.map_err(|e| Error::screenshot_with("spawn_blocking failed", e))?
}
pub async fn stop(mut self) -> Result<()> {
let pipeline = self
.pipeline
.take()
.ok_or_else(|| Error::screenshot("recording already stopped"))?;
tokio::task::spawn_blocking(move || stop_recording_sync(&pipeline))
.await
.map_err(|e| Error::screenshot_with("spawn_blocking failed", e))?
}
pub fn output_path(&self) -> &Path {
&self.output_path
}
}
impl Drop for VideoRecorder {
    /// Best-effort cleanup when the recorder is dropped without `stop()`:
    /// force the pipeline to `Null` (no EOS), which leaves the WebM without
    /// its finalization metadata.
    fn drop(&mut self) {
        if let Some(pipeline) = self.pipeline.take() {
            tracing::warn!(
                path = %self.output_path.display(),
                "VideoRecorder dropped without stop(); WebM will be truncated (no seekhead/cues)"
            );
            let _ = pipeline.set_state(gst::State::Null);
        }
    }
}
fn start_recording_sync(
node_id: u32,
pipewire_socket: &Path,
runtime_dir: &Path,
output_path: PathBuf,
bitrate: u32,
fps: u32,
) -> Result<VideoRecorder> {
let _guard = GRAB_PNG_LOCK
.lock()
.map_err(|e| Error::screenshot(format!("grab_png lock poisoned: {e}")))?;
gst::init().map_err(|e| Error::screenshot_with("gstreamer init failed", e))?;
unsafe {
std::env::set_var("PIPEWIRE_REMOTE", pipewire_socket);
std::env::set_var("XDG_RUNTIME_DIR", runtime_dir);
}
let pipeline_str = build_recording_pipeline_str(node_id, &output_path, bitrate, fps);
let pipeline = gst::parse::launch(&pipeline_str)
.map_err(|e| Error::screenshot_with("recording pipeline parse failed", e))?;
let pipeline = pipeline
.dynamic_cast::<gst::Pipeline>()
.map_err(|_| Error::screenshot("parsed element is not a Pipeline"))?;
pipeline
.set_state(gst::State::Playing)
.map_err(|e| Error::screenshot_with("failed to start recording pipeline", e))?;
tracing::info!(path = %output_path.display(), node_id, "video recording started");
Ok(VideoRecorder {
pipeline: Some(pipeline),
output_path,
})
}
/// Blocking, graceful shutdown of a recording pipeline.
///
/// Order matters here: EOS must be sent and observed on the bus BEFORE the
/// pipeline goes to `Null`, otherwise webmmux never writes its finalization
/// metadata (seekhead/cues) and the file is truncated.
///
/// # Errors
/// Returns [`Error::screenshot`] if the pipeline has no bus, reports an error
/// before EOS, or fails the final state change. An EOS timeout is logged but
/// not treated as an error (best-effort finalization).
fn stop_recording_sync(pipeline: &gst::Pipeline) -> Result<()> {
    // Ask the pipeline to drain and finalize rather than cutting it off.
    pipeline.send_event(gst::event::Eos::new());
    let bus = pipeline
        .bus()
        .ok_or_else(|| Error::screenshot("recording pipeline has no bus"))?;
    let timeout = gst::ClockTime::from_seconds(10);
    // Wait for either EOS (clean finalization) or an error from any element.
    if let Some(msg) =
        bus.timed_pop_filtered(timeout, &[gst::MessageType::Eos, gst::MessageType::Error])
    {
        if let gst::MessageView::Error(err) = msg.view() {
            // Still tear down before surfacing the error.
            let _ = pipeline.set_state(gst::State::Null);
            return Err(Error::screenshot(format!(
                "recording pipeline error before EOS: {} ({:?})",
                err.error(),
                err.debug()
            )));
        }
    } else {
        tracing::warn!("recording EOS did not arrive within 10s; file may be truncated");
    }
    pipeline
        .set_state(gst::State::Null)
        .map_err(|e| Error::screenshot_with("failed to stop recording pipeline", e))?;
    tracing::info!("video recording stopped");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_build_pipeline_str_contains_node_id() {
        let s = build_pipeline_str(42);
        assert!(s.contains("path=42"), "expected path=42, got: {s}");
    }
    #[test]
    fn test_build_pipeline_str_contains_appsink() {
        let s = build_pipeline_str(0);
        assert!(s.contains("appsink name=sink"));
    }
    #[test]
    fn test_build_pipeline_str_contains_pngenc() {
        let s = build_pipeline_str(1);
        assert!(s.contains("pngenc snapshot=true"));
    }
    // New: the snapshot pipeline must cap the frame count so it terminates.
    #[test]
    fn test_build_pipeline_str_limits_buffers() {
        let s = build_pipeline_str(1);
        assert!(s.contains("num-buffers=5"), "expected num-buffers=5: {s}");
    }
    #[test]
    fn test_build_pipeline_str_max_node_id() {
        let s = build_pipeline_str(u32::MAX);
        assert!(s.contains("path=4294967295"));
    }
    #[test]
    fn test_build_recording_pipeline_str_contains_node_id() {
        let s = build_recording_pipeline_str(
            42,
            Path::new("/tmp/out.webm"),
            DEFAULT_VIDEO_BITRATE,
            DEFAULT_VIDEO_FPS,
        );
        assert!(s.contains("path=42"));
    }
    #[test]
    fn test_build_recording_pipeline_str_contains_output_path() {
        let s = build_recording_pipeline_str(
            1,
            Path::new("/tmp/abc/abc.webm"),
            DEFAULT_VIDEO_BITRATE,
            DEFAULT_VIDEO_FPS,
        );
        assert!(
            s.contains("location=/tmp/abc/abc.webm"),
            "expected filesink location=..., got: {s}"
        );
    }
    #[test]
    fn test_build_recording_pipeline_str_uses_vp8_webm() {
        let s = build_recording_pipeline_str(
            0,
            Path::new("/tmp/x.webm"),
            DEFAULT_VIDEO_BITRATE,
            DEFAULT_VIDEO_FPS,
        );
        assert!(s.contains("vp8enc"), "expected vp8enc: {s}");
        assert!(s.contains("webmmux"), "expected webmmux: {s}");
    }
    // New: videorate is what enforces the caps framerate downstream.
    #[test]
    fn test_build_recording_pipeline_str_has_videorate() {
        let s = build_recording_pipeline_str(
            0,
            Path::new("/tmp/x.webm"),
            DEFAULT_VIDEO_BITRATE,
            DEFAULT_VIDEO_FPS,
        );
        assert!(s.contains("videorate"), "expected videorate stage: {s}");
    }
    #[test]
    fn test_build_recording_pipeline_str_uses_default_fps() {
        let s = build_recording_pipeline_str(
            0,
            Path::new("/tmp/x.webm"),
            DEFAULT_VIDEO_BITRATE,
            DEFAULT_VIDEO_FPS,
        );
        assert!(
            s.contains(&format!("framerate={DEFAULT_VIDEO_FPS}/1")),
            "expected framerate={DEFAULT_VIDEO_FPS}/1: {s}"
        );
    }
    #[test]
    fn test_build_recording_pipeline_str_honors_custom_fps() {
        let s =
            build_recording_pipeline_str(0, Path::new("/tmp/x.webm"), DEFAULT_VIDEO_BITRATE, 30);
        assert!(s.contains("framerate=30/1"), "expected framerate=30/1: {s}");
    }
    #[test]
    fn test_build_recording_pipeline_str_keyframe_max_dist_scales_with_fps() {
        let s =
            build_recording_pipeline_str(0, Path::new("/tmp/x.webm"), DEFAULT_VIDEO_BITRATE, 30);
        assert!(
            s.contains("keyframe-max-dist=60"),
            "expected keyframe-max-dist=60 at 30 fps: {s}"
        );
    }
    // New: keyframe interval at default fps (~2 s worth of frames).
    #[test]
    fn test_build_recording_pipeline_str_keyframe_max_dist_default_fps() {
        let s = build_recording_pipeline_str(
            0,
            Path::new("/tmp/x.webm"),
            DEFAULT_VIDEO_BITRATE,
            DEFAULT_VIDEO_FPS,
        );
        assert!(
            s.contains("keyframe-max-dist=30"),
            "expected keyframe-max-dist=30 at 15 fps: {s}"
        );
    }
    #[test]
    fn test_build_recording_pipeline_str_embeds_bitrate() {
        let s =
            build_recording_pipeline_str(0, Path::new("/tmp/x.webm"), 1_500_000, DEFAULT_VIDEO_FPS);
        assert!(
            s.contains("target-bitrate=1500000"),
            "expected target-bitrate=1500000, got: {s}"
        );
    }
    #[test]
    fn test_build_recording_pipeline_str_caps_quantizer() {
        let s = build_recording_pipeline_str(
            0,
            Path::new("/tmp/x.webm"),
            DEFAULT_VIDEO_BITRATE,
            DEFAULT_VIDEO_FPS,
        );
        assert!(s.contains("max-quantizer=30"));
        assert!(s.contains("min-quantizer=4"));
    }
    #[test]
    fn default_video_bitrate_is_two_mbps() {
        assert_eq!(DEFAULT_VIDEO_BITRATE, 2_000_000);
    }
    #[test]
    fn default_video_fps_is_fifteen() {
        assert_eq!(DEFAULT_VIDEO_FPS, 15);
    }
    #[test]
    fn test_validate_pipewire_socket_valid() {
        let parent = validate_pipewire_socket(Path::new("/run/user/1000/pipewire-0")).unwrap();
        assert_eq!(parent, Path::new("/run/user/1000"));
    }
    #[test]
    fn test_validate_pipewire_socket_root() {
        let parent = validate_pipewire_socket(Path::new("/pipewire-0")).unwrap();
        assert_eq!(parent, Path::new("/"));
    }
    #[test]
    fn test_validate_pipewire_socket_no_parent() {
        assert!(validate_pipewire_socket(Path::new("")).is_err());
    }
}