Skip to main content

heldar_kernel/services/
mediamtx.rs

1//! Live-view gateway integration: registers a camera's stream as a MediaMTX path (server-side,
2//! credentials never exposed to the browser) and returns HLS / WebRTC / RTSP playback URLs.
3
4use serde::Serialize;
5use serde_json::json;
6
7use crate::camera_url;
8use crate::config::Config;
9use crate::error::{AppError, AppResult};
10use crate::models::Camera;
11use crate::state::AppState;
12
/// Encoder flags for the default, CPU-only live preview transcode (HEVC in, H.264 out via libx264).
///
/// Also what `select_codec_args` falls back to when the configured engine is not recognised.
const SOFTWARE_CODEC_ARGS: &str = concat!(
    "-c:v libx264",
    " -preset ultrafast",
    " -tune zerolatency",
    " -profile:v baseline",
    " -pix_fmt yuv420p",
    " -g 30",
);
16
17/// FFmpeg encoder args for the live preview transcode, selected by `HELDAR_LIVE_TRANSCODE_ENGINE`.
18/// `software` uses libx264 (CPU); `vaapi` offloads to an Intel/AMD render node; `nvenc` to an NVIDIA
19/// GPU. An unknown engine warns and falls back to software so a typo never breaks live preview.
20pub fn transcode_codec_args(cfg: &Config) -> String {
21    select_codec_args(&cfg.live_transcode_engine, &cfg.vaapi_device)
22}
23
24fn select_codec_args(engine: &str, vaapi_device: &str) -> String {
25    match engine {
26        "software" => SOFTWARE_CODEC_ARGS.to_string(),
27        // VAAPI: upload the decoded frames to the render node and encode with h264_vaapi.
28        "vaapi" => {
29            format!("-vaapi_device {vaapi_device} -vf format=nv12,hwupload -c:v h264_vaapi -g 30")
30        }
31        // NVENC: low-latency NVIDIA hardware encoder.
32        "nvenc" => "-c:v h264_nvenc -preset p1 -tune ll -profile:v baseline -pix_fmt yuv420p -g 30"
33            .to_string(),
34        other => {
35            tracing::warn!(
36                engine = %other,
37                "unknown HELDAR_LIVE_TRANSCODE_ENGINE; falling back to software (libx264)"
38            );
39            SOFTWARE_CODEC_ARGS.to_string()
40        }
41    }
42}
43
44#[derive(Debug, Serialize)]
45pub struct LiveUrls {
46    pub name: String,
47    pub hls_url: String,
48    pub webrtc_url: String,
49    pub rtsp_url: String,
50}
51
52/// MediaMTX (and our default config) listen on loopback. A playback URL like `http://127.0.0.1:8888/…`
53/// is useless to a REMOTE client — over the WireGuard tunnel (or on the LAN) `127.0.0.1` is the client
54/// itself, not the box. When the configured base points at loopback/unspecified, rewrite its HOST to the
55/// one the client used to reach us (the request's `Host` header), preserving scheme + port. An explicitly
56/// external base (a real hostname/IP, e.g. a CDN) is left untouched so operator overrides still win.
57fn client_facing_base(base: &str, request_host: Option<&str>) -> String {
58    let Some(host) = request_host.and_then(host_only) else {
59        return base.to_string();
60    };
61    let Some((scheme, rest)) = base.split_once("://") else {
62        return base.to_string();
63    };
64    let (authority, tail) = match rest.find('/') {
65        Some(i) => (&rest[..i], &rest[i..]),
66        None => (rest, ""),
67    };
68    let (cur_host, port) = split_host_port(authority);
69    if !is_loopback_host(cur_host) {
70        return base.to_string();
71    }
72    let h = if host.contains(':') {
73        format!("[{host}]")
74    } else {
75        host
76    };
77    match port {
78        Some(p) => format!("{scheme}://{h}:{p}{tail}"),
79        None => format!("{scheme}://{h}{tail}"),
80    }
81}
82
/// Reports whether `h` refers only to the local machine, i.e. is useless to a remote client.
///
/// True for `localhost` (any ASCII case), for any loopback IP (`127.0.0.0/8`, `::1`) and for the
/// unspecified/wildcard addresses (`0.0.0.0`, `::`). IPv6 literals are accepted with or without
/// surrounding brackets. Anything else, including text that is not an IP address, is not loopback.
fn is_loopback_host(h: &str) -> bool {
    if h.eq_ignore_ascii_case("localhost") {
        return true;
    }
    // Tolerate a bracketed IPv6 literal such as `[::1]`; `IpAddr` only parses the bare form.
    let literal = h
        .strip_prefix('[')
        .and_then(|inner| inner.strip_suffix(']'))
        .unwrap_or(h);
    literal
        .parse::<std::net::IpAddr>()
        .map(|ip| ip.is_loopback() || ip.is_unspecified())
        .unwrap_or(false)
}
86
/// Extracts the hostname from a `Host` header value, dropping any port.
///
/// `"10.0.0.1:8000"` → `"10.0.0.1"`, `"[::1]:8000"` → `"::1"`, `"box.local"` → `"box.local"`.
/// A bare (unbracketed) IPv6 literal such as `"fd00::1"` is returned whole, since its last colon is
/// part of the address rather than a port separator. Returns `None` when no hostname is left
/// (blank header, `":8000"`, `"[]"`), so callers never build a URL with an empty host.
fn host_only(host_header: &str) -> Option<String> {
    let h = host_header.trim();
    let host = if let Some(rest) = h.strip_prefix('[') {
        // Bracketed IPv6 literal: keep what is inside the brackets.
        rest.split(']').next().unwrap_or(rest)
    } else if h.matches(':').count() > 1 {
        // Several colons without brackets can only be a bare IPv6 literal; there is no port to strip.
        h
    } else {
        h.rsplit_once(':').map_or(h, |(host, _port)| host)
    };
    if host.is_empty() {
        None
    } else {
        Some(host.to_string())
    }
}
98
/// Breaks a URL authority into its host and optional port.
///
/// A bracketed IPv6 authority (`[::1]:8554`) yields the address without brackets; the port is
/// whatever follows a `:` right after the closing bracket. Any other authority is split at its
/// last `:`; with no `:` present the whole input is the host and there is no port.
fn split_host_port(authority: &str) -> (&str, Option<&str>) {
    let bracketed = authority
        .strip_prefix('[')
        .and_then(|inner| inner.split_once(']'));
    if let Some((host, after_bracket)) = bracketed {
        return (host, after_bracket.strip_prefix(':'));
    }
    match authority.rsplit_once(':') {
        Some((host, port)) => (host, Some(port)),
        None => (authority, None),
    }
}
110
111/// Ensure a MediaMTX path exists for this camera and return its playback URLs. `request_host` is the
112/// `Host` header of the originating request, used to make loopback stream URLs reachable by the client.
113pub async fn ensure_live(
114    state: &AppState,
115    camera_id: &str,
116    request_host: Option<&str>,
117) -> AppResult<LiveUrls> {
118    let cam: Option<Camera> = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
119        .bind(camera_id)
120        .fetch_optional(&state.pool)
121        .await?;
122    let cam = cam.ok_or_else(|| AppError::NotFound(format!("camera {camera_id} not found")))?;
123
124    let source = camera_url::stream_url(&cam, "sub")
125        .or_else(|| camera_url::record_url(&cam))
126        .ok_or_else(|| AppError::BadRequest("camera has no stream URL".into()))?;
127
128    let name = format!("cam_{camera_id}");
129    let api = state.cfg.mediamtx_api_url.trim_end_matches('/');
130
131    let existing = state
132        .http
133        .get(format!("{api}/v3/config/paths/get/{name}"))
134        .send()
135        .await;
136    let already = matches!(existing, Ok(ref r) if r.status().is_success());
137
138    if !already {
139        // Transcode to H.264 on demand: many cameras (e.g. these HikVision units) emit HEVC, which
140        // browsers can't play over HLS/WebRTC. FFmpeg decodes the camera stream and republishes
141        // H.264 to this path, but only while someone is actually watching (runOnDemand). The raw
142        // stream is still recorded untouched by the recorder; this decode is preview-only.
143        // $MTX_PATH / $RTSP_PORT are substituted by MediaMTX; credentials stay server-side. The
144        // video encoder args are selected by HELDAR_LIVE_TRANSCODE_ENGINE (software | vaapi | nvenc).
145        let codec_args = transcode_codec_args(&state.cfg);
146        // Live audio is opt-in per camera, reusing the same `record_audio` intent as the recorder:
147        // a camera you record audio for can also be listened to live (re-encoded to AAC for HLS; a
148        // no-op when the source has no audio track). Cameras without it stay video-only (`-an`).
149        let audio_args = if cam.record_audio {
150            "-c:a aac -b:a 96k"
151        } else {
152            "-an"
153        };
154        let run_on_demand = format!(
155            "ffmpeg -nostdin -rtsp_transport tcp -timeout 10000000 -i {source} {audio_args} \
156{codec_args} \
157-f rtsp rtsp://localhost:$RTSP_PORT/$MTX_PATH"
158        );
159        let body = json!({
160            "runOnDemand": run_on_demand,
161            "runOnDemandRestart": true,
162            // The HEVC→H.264 transcode cold-start (ffmpeg connect + first keyframe) routinely exceeds
163            // MediaMTX's 10s default, which would drop the WHEP/HLS reader before the source is ready.
164            "runOnDemandStartTimeout": "30s",
165            "runOnDemandCloseAfter": "10s",
166        });
167        let resp = state
168            .http
169            .post(format!("{api}/v3/config/paths/add/{name}"))
170            .json(&body)
171            .send()
172            .await
173            .map_err(|e| AppError::Other(anyhow::anyhow!("MediaMTX unreachable at {api}: {e}")))?;
174        let code = resp.status();
175        if !code.is_success() && code.as_u16() != 400 {
176            let txt = resp.text().await.unwrap_or_default();
177            return Err(AppError::Other(anyhow::anyhow!(
178                "MediaMTX add-path failed ({code}): {txt}"
179            )));
180        }
181    }
182
183    // Rewrite loopback bases to the host the client actually reached us on, so streams are reachable
184    // over the tunnel / LAN (not just from the box itself).
185    let hls_base = client_facing_base(&state.cfg.mediamtx_hls_base, request_host);
186    let webrtc_base = client_facing_base(&state.cfg.mediamtx_webrtc_base, request_host);
187    let rtsp_base = client_facing_base(&state.cfg.mediamtx_rtsp_base, request_host);
188    let hls = hls_base.trim_end_matches('/');
189    let webrtc = webrtc_base.trim_end_matches('/');
190    let rtsp = rtsp_base.trim_end_matches('/');
191    Ok(LiveUrls {
192        hls_url: format!("{hls}/{name}/index.m3u8"),
193        webrtc_url: format!("{webrtc}/{name}"),
194        rtsp_url: format!("{rtsp}/{name}"),
195        name,
196    })
197}
198
199/// Program MediaMTX's WebRTC ICE servers (STUN/TURN) so it gathers reachable candidates for remote
200/// viewing — needed for symmetric-NAT traversal. `ice` is a MediaMTX `webrtcICEServers2` array
201/// (`[{"url":..,"username"?:..,"password"?:..}]`). Patches the RUNNING MediaMTX over its API (no restart).
202pub async fn set_webrtc_ice_servers(state: &AppState, ice: &serde_json::Value) -> AppResult<()> {
203    let api = state.cfg.mediamtx_api_url.trim_end_matches('/');
204    let resp = state
205        .http
206        .patch(format!("{api}/v3/config/global/patch"))
207        .json(&json!({ "webrtcICEServers2": ice }))
208        .send()
209        .await
210        .map_err(|e| AppError::Other(anyhow::anyhow!("MediaMTX unreachable at {api}: {e}")))?;
211    if !resp.status().is_success() {
212        let code = resp.status();
213        let txt = resp.text().await.unwrap_or_default();
214        return Err(AppError::Other(anyhow::anyhow!(
215            "MediaMTX set-ice failed ({code}): {txt}"
216        )));
217    }
218    Ok(())
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Render node passed where the engine under test does not depend on the device.
    const DEFAULT_RENDER_NODE: &str = "/dev/dri/renderD128";

    #[test]
    fn loopback_base_is_rewritten_to_the_request_host() {
        // (configured base, Host header, expected client-facing base)
        let cases = [
            // tunnel client: dashboard reached at 10.200.0.1:8000 -> stream at 10.200.0.1:8888
            ("http://127.0.0.1:8888", "10.200.0.1:8000", "http://10.200.0.1:8888"),
            // LAN client, localhost base, trailing path preserved
            ("http://localhost:8889/", "192.168.1.50:8000", "http://192.168.1.50:8889/"),
            // rtsp scheme + 0.0.0.0 base, hostname Host without a port
            ("rtsp://0.0.0.0:8554", "box.local", "rtsp://box.local:8554"),
        ];
        for (base, host, expected) in cases {
            assert_eq!(client_facing_base(base, Some(host)), expected);
        }
    }

    #[test]
    fn non_loopback_base_and_missing_host_are_left_untouched() {
        // (configured base, Host header if any) — each base must come back unchanged
        let cases = [
            // operator set a real external base -> respected
            ("https://cdn.example.com:8888", Some("10.200.0.1:8000")),
            // no Host header -> nothing to rewrite to
            ("http://127.0.0.1:8888", None),
        ];
        for (base, host) in cases {
            assert_eq!(client_facing_base(base, host), base);
        }
    }

    #[test]
    fn ipv6_request_host_is_bracketed() {
        let rewritten = client_facing_base("http://127.0.0.1:8888", Some("[fd00::1]:8000"));
        assert_eq!(rewritten, "http://[fd00::1]:8888");
    }

    #[test]
    fn codec_args_select_by_engine() {
        let software = select_codec_args("software", DEFAULT_RENDER_NODE);
        assert_eq!(software, SOFTWARE_CODEC_ARGS);

        // The VAAPI args must name both the encoder and the configured render node.
        let vaapi = select_codec_args("vaapi", "/dev/dri/renderD129");
        for needle in ["h264_vaapi", "/dev/dri/renderD129"] {
            assert!(vaapi.contains(needle));
        }

        let nvenc = select_codec_args("nvenc", DEFAULT_RENDER_NODE);
        assert!(nvenc.contains("h264_nvenc"));

        // Unknown engine falls back to software (libx264).
        let fallback = select_codec_args("bogus", DEFAULT_RENDER_NODE);
        assert_eq!(fallback, SOFTWARE_CODEC_ARGS);
    }
}