use serde::Serialize;
use serde_json::json;
use crate::camera_url;
use crate::config::Config;
use crate::error::{AppError, AppResult};
use crate::models::Camera;
use crate::state::AppState;
/// ffmpeg video-encoder arguments for CPU encoding with libx264.
///
/// Returned for the `"software"` engine and also used as the fallback when the
/// configured engine name is not recognised.
const SOFTWARE_CODEC_ARGS: &str =
"-c:v libx264 -preset ultrafast -tune zerolatency -profile:v baseline -pix_fmt yuv420p -g 30";
pub fn transcode_codec_args(cfg: &Config) -> String {
select_codec_args(&cfg.live_transcode_engine, &cfg.vaapi_device)
}
/// Maps an engine name to the ffmpeg video-encoder argument string.
///
/// * `"software"` → [`SOFTWARE_CODEC_ARGS`]
/// * `"vaapi"` → VAAPI H.264 encoding on `vaapi_device`
/// * `"nvenc"` → NVENC H.264 encoding
///
/// Any other name logs a warning and falls back to the software arguments.
/// Matching is exact (case- and whitespace-sensitive).
fn select_codec_args(engine: &str, vaapi_device: &str) -> String {
    if engine == "vaapi" {
        return format!(
            "-vaapi_device {vaapi_device} -vf format=nv12,hwupload -c:v h264_vaapi -g 30"
        );
    }
    if engine == "nvenc" {
        return "-c:v h264_nvenc -preset p1 -tune ll -profile:v baseline -pix_fmt yuv420p -g 30"
            .to_string();
    }
    // Everything else ends up on libx264; only an unrecognised name is worth a warning.
    if engine != "software" {
        tracing::warn!(
            engine = %engine,
            "unknown HELDAR_LIVE_TRANSCODE_ENGINE; falling back to software (libx264)"
        );
    }
    SOFTWARE_CODEC_ARGS.to_string()
}
/// Playback endpoints for one camera's live stream, as returned by `ensure_live`.
#[derive(Debug, Serialize)]
pub struct LiveUrls {
    /// MediaMTX path name; `ensure_live` sets it to `cam_<camera_id>`.
    pub name: String,
    /// HLS playlist URL: `<hls base>/<name>/index.m3u8`.
    pub hls_url: String,
    /// WebRTC endpoint URL: `<webrtc base>/<name>`.
    pub webrtc_url: String,
    /// RTSP URL: `<rtsp base>/<name>`.
    pub rtsp_url: String,
}
/// Rewrites a loopback `base` URL so that it points at the host the client used.
///
/// When `base` is `scheme://authority[/tail]`, its host is a loopback/wildcard
/// address (see `is_loopback_host`) and `request_host` yields a host name, the
/// host part of `base` is swapped for the request host while scheme, port and
/// tail are kept. An IPv6 request host (one containing `:`) is wrapped in
/// brackets. In every other case `base` is returned unchanged.
fn client_facing_base(base: &str, request_host: Option<&str>) -> String {
    // Both pieces are required before any rewriting can happen.
    let (Some(client_host), Some((scheme, remainder))) =
        (request_host.and_then(host_only), base.split_once("://"))
    else {
        return base.to_owned();
    };

    // Everything from the first '/' onwards is carried over verbatim.
    let path_start = remainder.find('/').unwrap_or(remainder.len());
    let (authority, tail) = remainder.split_at(path_start);

    let (configured_host, port) = split_host_port(authority);
    if !is_loopback_host(configured_host) {
        return base.to_owned();
    }

    let shown_host = if client_host.contains(':') {
        format!("[{client_host}]")
    } else {
        client_host
    };
    let port_suffix = port.map(|p| format!(":{p}")).unwrap_or_default();
    format!("{scheme}://{shown_host}{port_suffix}{tail}")
}
/// Reports whether `h` names an address that only makes sense on the local
/// machine and therefore must not be handed to a remote client.
///
/// Accepts `localhost` (ASCII case-insensitive) and any IP literal — optionally
/// wrapped in `[...]` — that is a loopback address (`127.0.0.0/8`, `::1`) or the
/// unspecified/wildcard address (`0.0.0.0`, `::`). Anything else, including
/// strings that are not IP literals, yields `false`.
fn is_loopback_host(h: &str) -> bool {
    if h.eq_ignore_ascii_case("localhost") {
        return true;
    }
    // Tolerate a bracketed IPv6 literal such as "[::1]".
    let bare = h
        .strip_prefix('[')
        .and_then(|inner| inner.strip_suffix(']'))
        .unwrap_or(h);
    matches!(
        bare.parse::<std::net::IpAddr>(),
        Ok(ip) if ip.is_loopback() || ip.is_unspecified()
    )
}
/// Extracts the host part of a `Host`-header style value, dropping any port.
///
/// * `"[v6]:port"` / `"[v6]"` → the address between the brackets.
/// * `"host:port"` / `"host"` → `host`.
/// * An unbracketed value with more than one `:` cannot be `host:port`, so it is
///   treated as a bare IPv6 literal and returned whole.
///
/// Returns `None` when the input is blank or the resulting host is empty
/// (e.g. `":8000"` or `"[]"`), so callers never build a URL with an empty host.
fn host_only(host_header: &str) -> Option<String> {
    let h = host_header.trim();
    let host = match h.strip_prefix('[') {
        // Bracketed IPv6 literal; a missing ']' keeps everything after '['.
        Some(rest) => rest.split(']').next().unwrap_or(rest),
        None => match h.rsplit_once(':') {
            // A second ':' to the left means this is an IPv6 literal, not host:port.
            Some((left, _)) if left.contains(':') => h,
            Some((left, _)) => left,
            None => h,
        },
    };
    (!host.is_empty()).then(|| host.to_string())
}
/// Splits a URL authority into `(host, port)`.
///
/// A bracketed IPv6 authority (`[addr]` or `[addr]:port`) yields the address
/// without brackets; the port is whatever follows `]:`, if present. Otherwise
/// the authority is split at its last `:`. When there is no `:` the whole input
/// is the host and the port is `None`.
fn split_host_port(authority: &str) -> (&str, Option<&str>) {
    let bracketed = authority
        .strip_prefix('[')
        .and_then(|inner| inner.split_once(']'));
    if let Some((host, after)) = bracketed {
        return (host, after.strip_prefix(':'));
    }
    match authority.rsplit_once(':') {
        Some((host, port)) => (host, Some(port)),
        None => (authority, None),
    }
}
pub async fn ensure_live(
state: &AppState,
camera_id: &str,
request_host: Option<&str>,
) -> AppResult<LiveUrls> {
let cam: Option<Camera> = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
.bind(camera_id)
.fetch_optional(&state.pool)
.await?;
let cam = cam.ok_or_else(|| AppError::NotFound(format!("camera {camera_id} not found")))?;
let source = camera_url::stream_url(&cam, "sub")
.or_else(|| camera_url::record_url(&cam))
.ok_or_else(|| AppError::BadRequest("camera has no stream URL".into()))?;
let name = format!("cam_{camera_id}");
let api = state.cfg.mediamtx_api_url.trim_end_matches('/');
let existing = state
.http
.get(format!("{api}/v3/config/paths/get/{name}"))
.send()
.await;
let already = matches!(existing, Ok(ref r) if r.status().is_success());
if !already {
let codec_args = transcode_codec_args(&state.cfg);
let audio_args = if cam.record_audio {
"-c:a aac -b:a 96k"
} else {
"-an"
};
let run_on_demand = format!(
"ffmpeg -nostdin -rtsp_transport tcp -timeout 10000000 -i {source} {audio_args} \
{codec_args} \
-f rtsp rtsp://localhost:$RTSP_PORT/$MTX_PATH"
);
let body = json!({
"runOnDemand": run_on_demand,
"runOnDemandRestart": true,
"runOnDemandStartTimeout": "30s",
"runOnDemandCloseAfter": "10s",
});
let resp = state
.http
.post(format!("{api}/v3/config/paths/add/{name}"))
.json(&body)
.send()
.await
.map_err(|e| AppError::Other(anyhow::anyhow!("MediaMTX unreachable at {api}: {e}")))?;
let code = resp.status();
if !code.is_success() && code.as_u16() != 400 {
let txt = resp.text().await.unwrap_or_default();
return Err(AppError::Other(anyhow::anyhow!(
"MediaMTX add-path failed ({code}): {txt}"
)));
}
}
let hls_base = client_facing_base(&state.cfg.mediamtx_hls_base, request_host);
let webrtc_base = client_facing_base(&state.cfg.mediamtx_webrtc_base, request_host);
let rtsp_base = client_facing_base(&state.cfg.mediamtx_rtsp_base, request_host);
let hls = hls_base.trim_end_matches('/');
let webrtc = webrtc_base.trim_end_matches('/');
let rtsp = rtsp_base.trim_end_matches('/');
Ok(LiveUrls {
hls_url: format!("{hls}/{name}/index.m3u8"),
webrtc_url: format!("{webrtc}/{name}"),
rtsp_url: format!("{rtsp}/{name}"),
name,
})
}
/// Sends `ice` to MediaMTX as the global `webrtcICEServers2` setting.
///
/// Issues a `PATCH {api}/v3/config/global/patch` request.
///
/// # Errors
///
/// Returns `AppError::Other` when the API cannot be reached or answers with a
/// non-success status (the status and response body are included in the message).
pub async fn set_webrtc_ice_servers(state: &AppState, ice: &serde_json::Value) -> AppResult<()> {
    let api = state.cfg.mediamtx_api_url.trim_end_matches('/');
    let payload = json!({ "webrtcICEServers2": ice });

    let resp = state
        .http
        .patch(format!("{api}/v3/config/global/patch"))
        .json(&payload)
        .send()
        .await
        .map_err(|e| AppError::Other(anyhow::anyhow!("MediaMTX unreachable at {api}: {e}")))?;

    let code = resp.status();
    if code.is_success() {
        return Ok(());
    }

    let txt = resp.text().await.unwrap_or_default();
    Err(AppError::Other(anyhow::anyhow!(
        "MediaMTX set-ice failed ({code}): {txt}"
    )))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Loopback/wildcard bases take the request's host but keep scheme, port and tail.
    #[test]
    fn loopback_base_is_rewritten_to_the_request_host() {
        let cases = [
            (
                "http://127.0.0.1:8888",
                "10.200.0.1:8000",
                "http://10.200.0.1:8888",
            ),
            (
                "http://localhost:8889/",
                "192.168.1.50:8000",
                "http://192.168.1.50:8889/",
            ),
            ("rtsp://0.0.0.0:8554", "box.local", "rtsp://box.local:8554"),
        ];
        for (base, host, expected) in cases {
            assert_eq!(client_facing_base(base, Some(host)), expected);
        }
    }

    /// A routable base, or a request without a host, leaves the base as configured.
    #[test]
    fn non_loopback_base_and_missing_host_are_left_untouched() {
        let cases = [
            ("https://cdn.example.com:8888", Some("10.200.0.1:8000")),
            ("http://127.0.0.1:8888", None),
        ];
        for (base, host) in cases {
            assert_eq!(client_facing_base(base, host), base);
        }
    }

    /// An IPv6 request host is re-bracketed in the rewritten URL.
    #[test]
    fn ipv6_request_host_is_bracketed() {
        let rewritten = client_facing_base("http://127.0.0.1:8888", Some("[fd00::1]:8000"));
        assert_eq!(rewritten, "http://[fd00::1]:8888");
    }

    /// Each known engine gets its own encoder; unknown names fall back to software.
    #[test]
    fn codec_args_select_by_engine() {
        for engine in ["software", "bogus"] {
            assert_eq!(
                select_codec_args(engine, "/dev/dri/renderD128"),
                SOFTWARE_CODEC_ARGS
            );
        }

        let vaapi = select_codec_args("vaapi", "/dev/dri/renderD129");
        for needle in ["h264_vaapi", "/dev/dri/renderD129"] {
            assert!(vaapi.contains(needle));
        }

        let nvenc = select_codec_args("nvenc", "/dev/dri/renderD128");
        assert!(nvenc.contains("h264_nvenc"));
    }
}