heldar_kernel/services/
mediamtx.rs1use serde::Serialize;
5use serde_json::json;
6
7use crate::camera_url;
8use crate::config::Config;
9use crate::error::{AppError, AppResult};
10use crate::models::Camera;
11use crate::state::AppState;
12
/// ffmpeg video-encoder arguments for the software (libx264) engine.
///
/// Returned by `select_codec_args` for the `"software"` engine and reused as
/// the fallback when the configured engine name is not recognised.
const SOFTWARE_CODEC_ARGS: &str = concat!(
    "-c:v libx264",
    " -preset ultrafast",
    " -tune zerolatency",
    " -profile:v baseline",
    " -pix_fmt yuv420p",
    " -g 30",
);
16
17pub fn transcode_codec_args(cfg: &Config) -> String {
21 select_codec_args(&cfg.live_transcode_engine, &cfg.vaapi_device)
22}
23
24fn select_codec_args(engine: &str, vaapi_device: &str) -> String {
25 match engine {
26 "software" => SOFTWARE_CODEC_ARGS.to_string(),
27 "vaapi" => {
29 format!("-vaapi_device {vaapi_device} -vf format=nv12,hwupload -c:v h264_vaapi -g 30")
30 }
31 "nvenc" => "-c:v h264_nvenc -preset p1 -tune ll -profile:v baseline -pix_fmt yuv420p -g 30"
33 .to_string(),
34 other => {
35 tracing::warn!(
36 engine = %other,
37 "unknown HELDAR_LIVE_TRANSCODE_ENGINE; falling back to software (libx264)"
38 );
39 SOFTWARE_CODEC_ARGS.to_string()
40 }
41 }
42}
43
44#[derive(Debug, Serialize)]
45pub struct LiveUrls {
46 pub name: String,
47 pub hls_url: String,
48 pub webrtc_url: String,
49 pub rtsp_url: String,
50}
51
52fn client_facing_base(base: &str, request_host: Option<&str>) -> String {
58 let Some(host) = request_host.and_then(host_only) else {
59 return base.to_string();
60 };
61 let Some((scheme, rest)) = base.split_once("://") else {
62 return base.to_string();
63 };
64 let (authority, tail) = match rest.find('/') {
65 Some(i) => (&rest[..i], &rest[i..]),
66 None => (rest, ""),
67 };
68 let (cur_host, port) = split_host_port(authority);
69 if !is_loopback_host(cur_host) {
70 return base.to_string();
71 }
72 let h = if host.contains(':') {
73 format!("[{host}]")
74 } else {
75 host
76 };
77 match port {
78 Some(p) => format!("{scheme}://{h}:{p}{tail}"),
79 None => format!("{scheme}://{h}{tail}"),
80 }
81}
82
/// Reports whether `h` names this machine itself rather than a routable host.
///
/// Accepted values:
/// * `localhost`, compared case-insensitively (host names are not
///   case-sensitive);
/// * any IPv4 or IPv6 literal that is a loopback address (`127.0.0.0/8`,
///   `::1`) or the unspecified address (`0.0.0.0`, `::`);
/// * the same IPv6 literals wrapped in brackets, e.g. `[::1]`.
///
/// Everything else, including the empty string and host names that merely
/// contain `localhost`, yields `false`.
fn is_loopback_host(h: &str) -> bool {
    if h.eq_ignore_ascii_case("localhost") {
        return true;
    }

    // Brackets are only valid around IPv6 literals, so a bracketed value is
    // parsed strictly as IPv6; anything else may be either address family.
    let bracketed = h.strip_prefix('[').and_then(|s| s.strip_suffix(']'));
    let ip: Option<std::net::IpAddr> = match bracketed {
        Some(inner) => inner
            .parse::<std::net::Ipv6Addr>()
            .ok()
            .map(std::net::IpAddr::V6),
        None => h.parse().ok(),
    };

    matches!(ip, Some(addr) if addr.is_loopback() || addr.is_unspecified())
}
86
/// Extracts the host part of an HTTP `Host` header value, dropping any port.
///
/// * `"10.0.0.1:8000"` -> `"10.0.0.1"`, `"box.local"` -> `"box.local"`.
/// * Bracketed IPv6 (`"[fd00::1]:8000"`) -> the address without brackets.
/// * A bare, unbracketed IPv6 literal (`"fd00::1"`) is returned whole instead
///   of having its last group mistaken for a port.
///
/// Returns `None` when the value is blank or when no host remains after
/// removing the port (e.g. `":8000"` or `"[]:8000"`), so callers never build a
/// URL with an empty host.
fn host_only(host_header: &str) -> Option<String> {
    let h = host_header.trim();

    let host = match h.strip_prefix('[') {
        // Bracketed IPv6: take everything up to the closing bracket (or the
        // rest of the string if the bracket is missing).
        Some(rest) => rest.split(']').next().unwrap_or(rest),
        // Unbracketed IPv6 literal: every ':' belongs to the address.
        None if h.parse::<std::net::Ipv6Addr>().is_ok() => h,
        // host[:port]
        None => h.rsplit_once(':').map_or(h, |(host, _port)| host),
    };

    (!host.is_empty()).then(|| host.to_string())
}
98
/// Splits a URL authority into `(host, port)`.
///
/// A bracketed IPv6 authority (`[::1]:8554`) yields the address without its
/// brackets; the port is whatever follows a `:` directly after the closing
/// bracket, or `None` if there is none. Any other authority is split at its
/// last `:`; without a `:` the whole input is the host and the port is `None`.
fn split_host_port(authority: &str) -> (&str, Option<&str>) {
    let bracketed = authority
        .strip_prefix('[')
        .and_then(|inner| inner.split_once(']'));

    match bracketed {
        Some((host, after_bracket)) => (host, after_bracket.strip_prefix(':')),
        // Not bracketed (or the closing bracket is missing): plain host[:port].
        None => match authority.rsplit_once(':') {
            Some((host, port)) => (host, Some(port)),
            None => (authority, None),
        },
    }
}
110
111pub async fn ensure_live(
114 state: &AppState,
115 camera_id: &str,
116 request_host: Option<&str>,
117) -> AppResult<LiveUrls> {
118 let cam: Option<Camera> = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
119 .bind(camera_id)
120 .fetch_optional(&state.pool)
121 .await?;
122 let cam = cam.ok_or_else(|| AppError::NotFound(format!("camera {camera_id} not found")))?;
123
124 let source = camera_url::stream_url(&cam, "sub")
125 .or_else(|| camera_url::record_url(&cam))
126 .ok_or_else(|| AppError::BadRequest("camera has no stream URL".into()))?;
127
128 let name = format!("cam_{camera_id}");
129 let api = state.cfg.mediamtx_api_url.trim_end_matches('/');
130
131 let existing = state
132 .http
133 .get(format!("{api}/v3/config/paths/get/{name}"))
134 .send()
135 .await;
136 let already = matches!(existing, Ok(ref r) if r.status().is_success());
137
138 if !already {
139 let codec_args = transcode_codec_args(&state.cfg);
146 let audio_args = if cam.record_audio {
150 "-c:a aac -b:a 96k"
151 } else {
152 "-an"
153 };
154 let run_on_demand = format!(
155 "ffmpeg -nostdin -rtsp_transport tcp -timeout 10000000 -i {source} {audio_args} \
156{codec_args} \
157-f rtsp rtsp://localhost:$RTSP_PORT/$MTX_PATH"
158 );
159 let body = json!({
160 "runOnDemand": run_on_demand,
161 "runOnDemandRestart": true,
162 "runOnDemandStartTimeout": "30s",
165 "runOnDemandCloseAfter": "10s",
166 });
167 let resp = state
168 .http
169 .post(format!("{api}/v3/config/paths/add/{name}"))
170 .json(&body)
171 .send()
172 .await
173 .map_err(|e| AppError::Other(anyhow::anyhow!("MediaMTX unreachable at {api}: {e}")))?;
174 let code = resp.status();
175 if !code.is_success() && code.as_u16() != 400 {
176 let txt = resp.text().await.unwrap_or_default();
177 return Err(AppError::Other(anyhow::anyhow!(
178 "MediaMTX add-path failed ({code}): {txt}"
179 )));
180 }
181 }
182
183 let hls_base = client_facing_base(&state.cfg.mediamtx_hls_base, request_host);
186 let webrtc_base = client_facing_base(&state.cfg.mediamtx_webrtc_base, request_host);
187 let rtsp_base = client_facing_base(&state.cfg.mediamtx_rtsp_base, request_host);
188 let hls = hls_base.trim_end_matches('/');
189 let webrtc = webrtc_base.trim_end_matches('/');
190 let rtsp = rtsp_base.trim_end_matches('/');
191 Ok(LiveUrls {
192 hls_url: format!("{hls}/{name}/index.m3u8"),
193 webrtc_url: format!("{webrtc}/{name}"),
194 rtsp_url: format!("{rtsp}/{name}"),
195 name,
196 })
197}
198
199pub async fn set_webrtc_ice_servers(state: &AppState, ice: &serde_json::Value) -> AppResult<()> {
203 let api = state.cfg.mediamtx_api_url.trim_end_matches('/');
204 let resp = state
205 .http
206 .patch(format!("{api}/v3/config/global/patch"))
207 .json(&json!({ "webrtcICEServers2": ice }))
208 .send()
209 .await
210 .map_err(|e| AppError::Other(anyhow::anyhow!("MediaMTX unreachable at {api}: {e}")))?;
211 if !resp.status().is_success() {
212 let code = resp.status();
213 let txt = resp.text().await.unwrap_or_default();
214 return Err(AppError::Other(anyhow::anyhow!(
215 "MediaMTX set-ice failed ({code}): {txt}"
216 )));
217 }
218 Ok(())
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `client_facing_base` against `(base, request_host, expected)` rows.
    fn assert_bases(rows: &[(&str, Option<&str>, &str)]) {
        for &(base, request_host, expected) in rows {
            assert_eq!(
                client_facing_base(base, request_host),
                expected,
                "base={base:?} request_host={request_host:?}"
            );
        }
    }

    #[test]
    fn loopback_base_is_rewritten_to_the_request_host() {
        assert_bases(&[
            (
                "http://127.0.0.1:8888",
                Some("10.200.0.1:8000"),
                "http://10.200.0.1:8888",
            ),
            (
                "http://localhost:8889/",
                Some("192.168.1.50:8000"),
                "http://192.168.1.50:8889/",
            ),
            (
                "rtsp://0.0.0.0:8554",
                Some("box.local"),
                "rtsp://box.local:8554",
            ),
        ]);
    }

    #[test]
    fn non_loopback_base_and_missing_host_are_left_untouched() {
        assert_bases(&[
            (
                "https://cdn.example.com:8888",
                Some("10.200.0.1:8000"),
                "https://cdn.example.com:8888",
            ),
            ("http://127.0.0.1:8888", None, "http://127.0.0.1:8888"),
        ]);
    }

    #[test]
    fn ipv6_request_host_is_bracketed() {
        assert_bases(&[(
            "http://127.0.0.1:8888",
            Some("[fd00::1]:8000"),
            "http://[fd00::1]:8888",
        )]);
    }

    #[test]
    fn codec_args_select_by_engine() {
        let default_device = "/dev/dri/renderD128";

        // Known fixed-string engine and the unknown-engine fallback both give libx264.
        for engine in ["software", "bogus"].iter() {
            assert_eq!(
                select_codec_args(engine, default_device),
                SOFTWARE_CODEC_ARGS,
                "engine={engine}"
            );
        }

        let vaapi = select_codec_args("vaapi", "/dev/dri/renderD129");
        assert!(vaapi.contains("h264_vaapi"));
        assert!(vaapi.contains("/dev/dri/renderD129"));

        let nvenc = select_codec_args("nvenc", default_device);
        assert!(nvenc.contains("h264_nvenc"));
    }
}