use crate::pixmap::SharedPixmap;
use crate::DaemonResult;
use anyhow::anyhow;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::{Child, Command};
use tokio::task::{AbortHandle, JoinSet};
/// Settings that control how the `ffmpeg` child process of a sink is invoked.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct FfmpegOptions {
    /// Value handed to ffmpeg's `-loglevel` flag.
    pub log_level: String,
    /// Frames per second: used both as the raw-video input `-framerate` and as the pacing
    /// interval with which frames are written to ffmpeg.
    pub framerate: usize,
    /// When `true`, a silent `lavfi` `anullsrc` audio input is added next to the video input.
    pub synthesize_audio: bool,
    /// Arguments appended verbatim after all input arguments (codec, muxer and destination).
    pub output_spec: Vec<String>,
}

impl FfmpegOptions {
    /// Builds an `output_spec` that encodes with libx264/aac and publishes through the
    /// `rtsp` muxer to `server_addr`.
    pub fn make_rtsp_out_spec(server_addr: &str, framerate: usize) -> Vec<String> {
        Self::make_stream_out_spec("rtsp", server_addr, framerate)
    }

    /// Builds an `output_spec` that encodes with libx264/aac and publishes through the
    /// `flv` muxer to `server_addr`.
    pub fn make_rtmp_out_spec(server_addr: &str, framerate: usize) -> Vec<String> {
        Self::make_stream_out_spec("flv", server_addr, framerate)
    }

    /// Shared argument list of the streaming outputs; only the `-f` muxer name differs
    /// between them.
    fn make_stream_out_spec(muxer: &str, server_addr: &str, framerate: usize) -> Vec<String> {
        let framerate = framerate.to_string();
        [
            "-vcodec",
            "libx264",
            "-acodec",
            "aac",
            "-preset",
            "veryfast",
            "-bf",
            "0",
            "-pix_fmt",
            "yuv420p",
            "-framerate",
            framerate.as_str(),
            "-f",
            muxer,
            server_addr,
        ]
        .iter()
        .copied()
        .map(String::from)
        .collect()
    }
}
#[derive(Debug)]
pub struct FfmpegSink {
options: FfmpegOptions,
pixmap: SharedPixmap,
ffmpeg_proc: Option<Child>,
}
impl FfmpegSink {
pub fn new(options: FfmpegOptions, pixmap: SharedPixmap) -> Self {
Self {
options,
pixmap,
ffmpeg_proc: None,
}
}
pub async fn start(mut self, join_set: &mut JoinSet<DaemonResult>) -> anyhow::Result<AbortHandle> {
self.start_ffmpeg()?;
let handle = join_set
.build_task()
.name("ffmpeg")
.spawn(async move { self.run().await })?;
Ok(handle)
}
fn start_ffmpeg(&mut self) -> anyhow::Result<()> {
if self.ffmpeg_proc.is_some() {
return Err(anyhow!("ffmpeg is already running"));
}
let (width, height) = self.pixmap.get_size();
let mut cmd = Command::new("ffmpeg");
cmd.stdin(Stdio::piped()).kill_on_drop(true).env_clear();
cmd.arg("-hide_banner")
.arg("-loglevel")
.arg(&self.options.log_level);
cmd
.arg("-re")
.arg("-f")
.arg("rawvideo")
.arg("-pix_fmt")
.arg("rgb24")
.arg("-video_size")
.arg(&format!("{}x{}", width, height))
.arg("-framerate")
.arg(&self.options.framerate.to_string())
.arg("-i")
.arg("/dev/stdin");
if self.options.synthesize_audio {
cmd.arg("-f")
.arg("lavfi")
.arg("-i")
.arg("anullsrc=channel_layout=stereo:sample_rate=44100");
}
cmd.args(&self.options.output_spec);
tracing::info!("starting ffmpeg sink with args {:?}", cmd.as_std().get_args());
self.ffmpeg_proc = Some(cmd.spawn()?);
Ok(())
}
async fn run(self) -> anyhow::Result<!> {
let mut ffmpeg = self.ffmpeg_proc.ok_or(anyhow!("ffmpeg is not running"))?;
let Some(channel) = &mut ffmpeg.stdin else {
return Err(anyhow!("ffmpegs stdin is not attached"));
};
let mut interval =
tokio::time::interval(Duration::from_secs_f64(1.0 / self.options.framerate as f64));
loop {
let data = unsafe {
self.pixmap
.get_color_data()
.iter()
.flat_map(|c| Into::<[u8; 3]>::into(*c))
.collect::<Vec<_>>()
};
channel.write_all(&data).await.expect("Could not write to ffmpeg");
interval.tick().await;
}
}
}