1use std::collections::HashMap;
17use std::time::Duration;
18
19use anyhow::Context;
20use base64::engine::general_purpose::STANDARD as B64;
21use base64::Engine as _;
22use reqwest::header::{ACCEPT, CONTENT_TYPE};
23use reqwest::StatusCode;
24use serde::Deserialize;
25use serde_json::json;
26
27use crate::config::Config;
28use crate::services::mediamtx;
29use crate::state::AppState;
30
/// Returns the rendezvous poll endpoint for the given base URL.
///
/// Trailing `/` characters on `rendezvous_url` are stripped before the fixed
/// `/api/v1/rendezvous/poll` path is appended.
fn poll_url(rendezvous_url: &str) -> String {
    let base = rendezvous_url.trim_end_matches('/');
    format!("{base}/api/v1/rendezvous/poll")
}
38
/// Returns the rendezvous answer endpoint for the given base URL.
///
/// Trailing `/` characters on `rendezvous_url` are stripped before the fixed
/// `/api/v1/rendezvous/answer` path is appended.
fn answer_url(rendezvous_url: &str) -> String {
    let mut url = String::from(rendezvous_url.trim_end_matches('/'));
    url.push_str("/api/v1/rendezvous/answer");
    url
}
46
/// A pending session as decoded from the JSON body of a rendezvous poll response.
#[derive(Debug, Deserialize)]
struct PendingSession {
    /// Identifier echoed back to the rendezvous when the answer (or error) is posted.
    session_id: String,
    /// Camera handed to the local WHEP bridge for this session.
    camera_id: String,
    /// SDP offer that is forwarded to the local WHEP endpoint.
    sdp_offer: String,
}
54
55fn build_client(cfg: &Config) -> anyhow::Result<reqwest::Client> {
58 let mut builder = reqwest::Client::builder().timeout(Duration::from_secs(40));
61 if let Some(t) = &cfg.cp_tls {
62 let cert = std::fs::read(&t.client_cert)
63 .with_context(|| format!("reading client cert {}", t.client_cert.display()))?;
64 let key = std::fs::read(&t.client_key)
65 .with_context(|| format!("reading client key {}", t.client_key.display()))?;
66 let ca = std::fs::read(&t.server_ca)
67 .with_context(|| format!("reading control-plane CA {}", t.server_ca.display()))?;
68 let mut identity_pem = key;
69 identity_pem.extend_from_slice(&cert);
70 let identity =
71 reqwest::Identity::from_pem(&identity_pem).context("building client identity")?;
72 let root = reqwest::Certificate::from_pem(&ca).context("parsing control-plane CA")?;
73 builder = builder.identity(identity).add_root_certificate(root);
74 }
75 builder.build().context("building HTTP client")
76}
77
78async fn bridge_to_local_whep(
86 state: &AppState,
87 camera_id: &str,
88 sdp_offer: &str,
89) -> anyhow::Result<String> {
90 let live = mediamtx::ensure_live(state, camera_id, None)
91 .await
92 .map_err(|e| anyhow::anyhow!("ensure_live({camera_id}) failed: {e}"))?;
93 let whep = format!("{}/whep", live.webrtc_url);
94 let answer = state
95 .http
96 .post(&whep)
97 .timeout(Duration::from_secs(25))
100 .header(CONTENT_TYPE, "application/sdp")
101 .header(ACCEPT, "application/sdp")
102 .body(sdp_offer.to_owned())
103 .send()
104 .await
105 .context("posting offer to local WHEP")?
106 .error_for_status()
107 .context("local WHEP rejected the offer")?
108 .text()
109 .await
110 .context("reading WHEP answer")?;
111 Ok(answer)
112}
113
/// Largest SDP offer, in bytes (512 KiB), that `poll_once` will hand to the local WHEP
/// bridge; a longer offer is answered with an error instead.
const MAX_SDP_BYTES: usize = 512 * 1024;
116
117async fn camera_catalog(state: &AppState) -> Vec<serde_json::Value> {
122 sqlx::query_as::<_, (String, Option<String>)>("SELECT id, name FROM cameras ORDER BY id ASC")
123 .fetch_all(&state.pool)
124 .await
125 .unwrap_or_default()
126 .into_iter()
127 .map(|(id, name)| {
128 let name = name.filter(|n| !n.is_empty()).unwrap_or_else(|| id.clone());
129 json!({ "id": id, "name": name })
130 })
131 .collect()
132}
133
134async fn poll_once(
139 state: &AppState,
140 client: &reqwest::Client,
141 rendezvous_url: &str,
142 site_id: &str,
143 token: &str,
144) -> anyhow::Result<bool> {
145 let resp = client
146 .post(poll_url(rendezvous_url))
147 .bearer_auth(token)
148 .json(&json!({ "site_id": site_id, "cameras": camera_catalog(state).await }))
150 .send()
151 .await
152 .context("rendezvous poll request")?;
153 if resp.status() == StatusCode::NO_CONTENT {
154 return Ok(false); }
156 let session: PendingSession = resp
157 .error_for_status()
158 .context("rendezvous poll rejected")?
159 .json()
160 .await
161 .context("decoding pending session")?;
162
163 let result = if session.sdp_offer.len() > MAX_SDP_BYTES {
164 Err(anyhow::anyhow!(
165 "offer too large ({} bytes)",
166 session.sdp_offer.len()
167 ))
168 } else {
169 bridge_to_local_whep(state, &session.camera_id, &session.sdp_offer).await
170 };
171 let body = match &result {
174 Ok(sdp) => {
175 json!({ "site_id": site_id, "session_id": session.session_id, "sdp_answer": sdp })
176 }
177 Err(e) => {
178 json!({ "site_id": site_id, "session_id": session.session_id, "error": e.to_string() })
179 }
180 };
181 if let Err(e) = &result {
182 tracing::warn!(session = %session.session_id, camera = %session.camera_id, error = %e, "rendezvous: bridge to local WHEP failed");
183 }
184 client
185 .post(answer_url(rendezvous_url))
186 .bearer_auth(token)
187 .json(&body)
188 .send()
189 .await
190 .context("posting answer to rendezvous")?
191 .error_for_status()
192 .context("rendezvous rejected the answer")?;
193 Ok(result.is_err())
194}
195
196pub async fn run(state: AppState) {
200 let cfg = state.cfg.clone();
201 let (Some(rendezvous_url), Some(site_id)) =
202 (cfg.rendezvous_url.as_deref(), cfg.site_id.as_deref())
203 else {
204 std::future::pending::<()>().await;
205 return;
206 };
207
208 let client = match build_client(&cfg) {
209 Ok(c) => c,
210 Err(e) => {
211 tracing::error!(error = %e, "webrtc rendezvous disabled: bad mTLS config");
212 std::future::pending::<()>().await;
213 return;
214 }
215 };
216
217 if cfg.cp_token.is_empty() {
218 tracing::warn!(
219 "webrtc rendezvous: HELDAR_CP_TOKEN is empty; the rendezvous will reject polls if it enforces a bearer (BOX_TOKEN)"
220 );
221 }
222 tracing::info!(site = %site_id, rendezvous = %rendezvous_url, "webrtc rendezvous: dialing out for remote viewing");
223 let mut backoff = Duration::from_secs(1);
224 loop {
225 match poll_once(&state, &client, rendezvous_url, site_id, &cfg.cp_token).await {
226 Ok(false) => backoff = Duration::from_secs(1),
227 Ok(true) => tokio::time::sleep(Duration::from_secs(2)).await,
230 Err(e) => {
231 tracing::warn!(site = %site_id, error = %e, "webrtc rendezvous poll failed; backing off");
232 tokio::time::sleep(backoff).await;
233 backoff = (backoff * 2).min(Duration::from_secs(30));
234 }
235 }
236 }
237}
238
/// Returns the rendezvous `box/turn` endpoint for the given base URL, with any
/// trailing `/` characters on the base removed first.
fn box_turn_url(rendezvous_url: &str) -> String {
    [rendezvous_url.trim_end_matches('/'), "/api/v1/box/turn"].concat()
}
243
244async fn fetch_rendezvous_ice(
247 client: &reqwest::Client,
248 rendezvous_url: &str,
249 token: &str,
250) -> anyhow::Result<serde_json::Value> {
251 let data: serde_json::Value = client
252 .get(box_turn_url(rendezvous_url))
253 .bearer_auth(token)
254 .send()
255 .await
256 .context("rendezvous box/turn request")?
257 .error_for_status()
258 .context("rendezvous box/turn rejected")?
259 .json()
260 .await
261 .context("decoding box/turn")?;
262 let ice = data
263 .get("iceServers")
264 .ok_or_else(|| anyhow::anyhow!("box/turn response missing iceServers"))?;
265 let user = ice.get("username").and_then(|v| v.as_str());
266 let cred = ice.get("credential").and_then(|v| v.as_str());
267 let urls = ice
268 .get("urls")
269 .and_then(|v| v.as_array())
270 .ok_or_else(|| anyhow::anyhow!("box/turn response missing iceServers.urls"))?;
271 let list: Vec<serde_json::Value> = urls
272 .iter()
273 .filter_map(|u| u.as_str())
274 .map(|u| {
275 if u.starts_with("stun:") {
276 json!({ "url": u })
277 } else {
278 json!({ "url": u, "username": user, "password": cred })
279 }
280 })
281 .collect();
282 Ok(serde_json::Value::Array(list))
283}
284
285async fn resolve_ice(cfg: &Config, client: &reqwest::Client) -> (serde_json::Value, Duration) {
287 if let Some(raw) = &cfg.webrtc_ice_servers {
289 match serde_json::from_str::<serde_json::Value>(raw) {
290 Ok(v) => return (v, Duration::from_secs(12 * 3600)),
291 Err(e) => {
292 tracing::error!(error = %e, "HELDAR_WEBRTC_ICE_SERVERS is not valid JSON; ignoring")
293 }
294 }
295 }
296 if let Some(url) = cfg.rendezvous_url.as_deref() {
298 match fetch_rendezvous_ice(client, url, &cfg.cp_token).await {
299 Ok(v) => return (v, Duration::from_secs(30 * 60)),
300 Err(e) => {
301 tracing::warn!(error = %e, "webrtc ICE: rendezvous TURN fetch failed; using STUN only")
302 }
303 }
304 }
305 (
307 json!([{ "url": "stun:stun.cloudflare.com:3478" }]),
308 Duration::from_secs(30 * 60),
309 )
310}
311
312pub async fn run_ice(state: AppState) {
316 let cfg = state.cfg.clone();
317 if cfg.webrtc_ice_servers.is_none() && cfg.rendezvous_url.is_none() {
318 std::future::pending::<()>().await;
319 return;
320 }
321 let client = match build_client(&cfg) {
322 Ok(c) => c,
323 Err(e) => {
324 tracing::error!(error = %e, "webrtc ICE disabled: bad mTLS config");
325 std::future::pending::<()>().await;
326 return;
327 }
328 };
329 loop {
330 let (ice, cadence) = resolve_ice(&cfg, &client).await;
331 match mediamtx::set_webrtc_ice_servers(&state, &ice).await {
332 Ok(()) => tracing::info!("webrtc ICE: programmed MediaMTX ICE servers"),
333 Err(e) => tracing::warn!(error = %e, "webrtc ICE: failed to program MediaMTX"),
334 }
335 tokio::time::sleep(cadence).await;
336 }
337}
338
/// Returns the relay poll endpoint for base URL `u`, with trailing `/` characters
/// on the base removed first.
fn relay_poll_url(u: &str) -> String {
    let mut url = String::from(u.trim_end_matches('/'));
    url.push_str("/api/v1/relay/poll");
    url
}
/// Returns the relay respond endpoint for base URL `u`, with trailing `/` characters
/// on the base removed first.
fn relay_respond_url(u: &str) -> String {
    let base = u.trim_end_matches('/');
    format!("{base}/api/v1/relay/respond")
}
356
/// One relayed HTTP request, as decoded from the JSON body of a relay poll response.
#[derive(Debug, Deserialize)]
struct RelayJob {
    /// Identifier echoed back to the rendezvous with the response.
    job_id: String,
    /// HTTP method of the request to replay; checked by `relay_allowed`.
    method: String,
    /// Request path, joined onto the local API base URL before the allowlist check.
    path: String,
    /// Request headers; only names accepted by `forward_request_header` are forwarded.
    /// Defaults to an empty map when the field is absent.
    #[serde(default)]
    headers: HashMap<String, String>,
    /// Base64-encoded request body, if any. Defaults to `None` when the field is absent.
    #[serde(default)]
    body_b64: Option<String>,
}
368
/// Number of concurrent relay poll loops spawned by `run_relay`.
const RELAY_POLLERS: usize = 4;
/// Size limit, in bytes (8 MiB), applied to relayed request and response bodies.
const MAX_RELAY_BODY: usize = 8 * 1024 * 1024;
373
/// Decides whether a relayed request with this `method` and `path` may be replayed.
///
/// A request is allowed only when all of the following hold:
///
/// * `path` starts with `/` and contains none of `..`, `//` or `@`;
/// * `path` is not one of the denied roots (`/api/v1/relay`, `/api/v1/rendezvous`,
///   `/metrics`) nor anything beneath them;
/// * `path` starts with `/api/v1/` or `/media/`;
/// * `method` is exactly one of `GET`, `HEAD`, `POST`, `PUT`, `PATCH`, `DELETE`
///   (the comparison is case-sensitive).
fn relay_allowed(method: &str, path: &str) -> bool {
    const DENIED_ROOTS: [&str; 3] = ["/api/v1/relay", "/api/v1/rendezvous", "/metrics"];
    const ALLOWED_PREFIXES: [&str; 2] = ["/api/v1/", "/media/"];
    const ALLOWED_METHODS: [&str; 6] = ["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"];

    let well_formed = path.starts_with('/')
        && !path.contains("..")
        && !path.contains("//")
        && !path.contains('@');

    // A denied root matches the path itself or any path continuing with `/` after it.
    let denied = DENIED_ROOTS.iter().any(|root| {
        path.strip_prefix(*root)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
    });

    let on_surface = ALLOWED_PREFIXES
        .iter()
        .any(|prefix| path.starts_with(*prefix));
    let method_ok = ALLOWED_METHODS.iter().any(|m| *m == method);

    well_formed && !denied && on_surface && method_ok
}
394
/// Reports whether a request header of the relayed job is forwarded to the local API.
///
/// The name is compared ASCII-case-insensitively against a fixed allowlist.
fn forward_request_header(name: &str) -> bool {
    const FORWARDED: [&str; 6] = [
        "authorization",
        "accept",
        "content-type",
        "range",
        "if-none-match",
        "if-modified-since",
    ];
    FORWARDED
        .iter()
        .any(|allowed| name.eq_ignore_ascii_case(allowed))
}
/// Reports whether a header of the local API's response is passed back through the relay.
///
/// The name is compared ASCII-case-insensitively against a fixed allowlist.
fn forward_response_header(name: &str) -> bool {
    const FORWARDED: [&str; 7] = [
        "content-type",
        "content-length",
        "content-range",
        "accept-ranges",
        "cache-control",
        "etag",
        "last-modified",
    ];
    FORWARDED
        .iter()
        .any(|allowed| name.eq_ignore_ascii_case(allowed))
}
422
423async fn replay_relay_job(
425 state: &AppState,
426 job: &RelayJob,
427) -> (u16, HashMap<String, String>, String) {
428 let base = format!("http://127.0.0.1:{}", state.cfg.api_port);
436 let parsed = match reqwest::Url::parse(&base).and_then(|b| b.join(&job.path)) {
437 Ok(u) => u,
438 Err(_) => {
439 return (
440 400,
441 HashMap::new(),
442 B64.encode(br#"{"error":"bad relay path"}"#),
443 );
444 }
445 };
446 let same_origin = parsed.scheme() == "http"
447 && parsed.host_str() == Some("127.0.0.1")
448 && parsed.port() == Some(state.cfg.api_port);
449 if !same_origin || !relay_allowed(&job.method, parsed.path()) {
450 return (
451 403,
452 HashMap::new(),
453 B64.encode(br#"{"error":"relay path not allowed"}"#),
454 );
455 }
456 let method = reqwest::Method::from_bytes(job.method.as_bytes()).unwrap_or(reqwest::Method::GET);
457 let mut req = state
458 .http
459 .request(method, parsed)
460 .timeout(Duration::from_secs(20));
461 for (k, v) in &job.headers {
462 if forward_request_header(k) {
463 req = req.header(k, v);
464 }
465 }
466 if let Some(b) = &job.body_b64 {
467 if let Ok(bytes) = B64.decode(b) {
468 if bytes.len() <= MAX_RELAY_BODY {
469 req = req.body(bytes);
470 }
471 }
472 }
473 match req.send().await {
474 Ok(resp) => {
475 let status = resp.status().as_u16();
476 let mut headers = HashMap::new();
477 for (k, v) in resp.headers() {
478 if forward_response_header(k.as_str()) {
479 if let Ok(vs) = v.to_str() {
480 headers.insert(k.as_str().to_string(), vs.to_string());
481 }
482 }
483 }
484 if resp
489 .content_length()
490 .is_some_and(|len| len > MAX_RELAY_BODY as u64)
491 {
492 return (
493 413,
494 HashMap::new(),
495 B64.encode(br#"{"error":"relay response too large; use range requests"}"#),
496 );
497 }
498 let body = resp.bytes().await.unwrap_or_default();
499 let slice = if body.len() > MAX_RELAY_BODY {
500 &body[..MAX_RELAY_BODY]
501 } else {
502 &body[..]
503 };
504 (status, headers, B64.encode(slice))
505 }
506 Err(e) => (
507 502,
508 HashMap::new(),
509 B64.encode(format!(r#"{{"error":"relay upstream: {e}"}}"#).as_bytes()),
510 ),
511 }
512}
513
514async fn relay_poll_once(
517 state: &AppState,
518 client: &reqwest::Client,
519 rendezvous_url: &str,
520 site_id: &str,
521 token: &str,
522) -> anyhow::Result<()> {
523 let resp = client
524 .post(relay_poll_url(rendezvous_url))
525 .bearer_auth(token)
526 .json(&json!({ "site_id": site_id, "auth_enforced": true }))
527 .send()
528 .await
529 .context("relay poll request")?;
530 if resp.status() == StatusCode::NO_CONTENT {
531 return Ok(());
532 }
533 let job: RelayJob = resp
534 .error_for_status()
535 .context("relay poll rejected")?
536 .json()
537 .await
538 .context("decoding relay job")?;
539 let (status, headers, body_b64) = replay_relay_job(state, &job).await;
540 client
541 .post(relay_respond_url(rendezvous_url))
542 .bearer_auth(token)
543 .json(&json!({
544 "site_id": site_id,
545 "job_id": job.job_id,
546 "status": status,
547 "headers": headers,
548 "body_b64": body_b64,
549 }))
550 .send()
551 .await
552 .context("posting relay response")?
553 .error_for_status()
554 .context("rendezvous rejected relay response")?;
555 Ok(())
556}
557
558pub async fn run_relay(state: AppState) {
562 let cfg = state.cfg.clone();
563 let (Some(rendezvous_url), Some(site_id)) = (cfg.rendezvous_url.clone(), cfg.site_id.clone())
564 else {
565 std::future::pending::<()>().await;
566 return;
567 };
568 if !cfg.auth_enabled {
569 tracing::warn!(
570 "webrtc relay disabled: kernel auth is OFF (HELDAR_AUTH_ENABLED=false). The remote REST \
571 relay refuses to run until auth is enabled, so the open API is never exposed remotely."
572 );
573 std::future::pending::<()>().await;
574 return;
575 }
576 let users: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE active = 1")
577 .fetch_one(&state.pool)
578 .await
579 .unwrap_or(0);
580 if users == 0 {
581 tracing::warn!("webrtc relay disabled: kernel auth is on but no active users exist yet");
582 std::future::pending::<()>().await;
583 return;
584 }
585 let client = match build_client(&cfg) {
586 Ok(c) => c,
587 Err(e) => {
588 tracing::error!(error = %e, "webrtc relay disabled: bad mTLS config");
589 std::future::pending::<()>().await;
590 return;
591 }
592 };
593 tracing::info!(site = %site_id, "webrtc relay: dialing out for the authenticated remote dashboard (read-only)");
594 let mut tasks = Vec::new();
595 for _ in 0..RELAY_POLLERS {
596 let state = state.clone();
597 let client = client.clone();
598 let rendezvous_url = rendezvous_url.clone();
599 let site_id = site_id.clone();
600 let token = cfg.cp_token.clone();
601 tasks.push(tokio::spawn(async move {
602 let mut backoff = Duration::from_secs(1);
603 loop {
604 match relay_poll_once(&state, &client, &rendezvous_url, &site_id, &token).await {
605 Ok(()) => backoff = Duration::from_secs(1),
606 Err(e) => {
607 tracing::warn!(error = %e, "webrtc relay poll failed; backing off");
608 tokio::time::sleep(backoff).await;
609 backoff = (backoff * 2).min(Duration::from_secs(30));
610 }
611 }
612 }
613 }));
614 }
615 for t in tasks {
616 let _ = t.await;
617 }
618}
619
#[cfg(test)]
mod tests {
    use super::*;

    /// Joins `raw` onto a loopback base URL and returns the path of the resulting URL.
    fn canonical_path(raw: &str) -> String {
        let base = reqwest::Url::parse("http://127.0.0.1:8088").unwrap();
        base.join(raw).unwrap().path().to_string()
    }

    #[test]
    fn relay_allowlist_pins_surface_and_blocks_internal_and_traversal() {
        let allowed = [
            ("GET", "/api/v1/cameras"),
            ("POST", "/api/v1/cameras"),
            ("PATCH", "/api/v1/cameras/cam2"),
            ("DELETE", "/api/v1/cameras/cam2"),
            ("GET", "/media/recordings/x.mp4"),
            ("POST", "/api/v1/auth/login"),
        ];
        for (method, path) in allowed {
            assert!(relay_allowed(method, path), "{method} {path} should be allowed");
        }

        let rejected = [
            ("GET", "/healthz"),
            ("GET", "/api/v1/relay/poll"),
            ("GET", "/api/v1/rendezvous/poll"),
            ("GET", "/metrics"),
            ("GET", "/api/v1/../secrets"),
            ("GET", "/api/v1//cameras"),
            ("TRACE", "/api/v1/cameras"),
        ];
        for (method, path) in rejected {
            assert!(!relay_allowed(method, path), "{method} {path} should be rejected");
        }
    }

    #[test]
    fn relay_allowlist_runs_on_canonical_path_not_raw() {
        // The raw, percent-encoded form has no literal "..", yet it canonicalises to
        // a path outside the allowed surface.
        let encoded_traversal = "/api/v1/%2e%2e/%2e%2e/metrics";
        assert!(!encoded_traversal.contains(".."));
        assert_eq!(canonical_path(encoded_traversal), "/metrics");

        let rejected = [
            ("GET", encoded_traversal),
            ("GET", "/api/v1/cameras/%2e%2e/relay/poll"),
            ("POST", "/api/v1/cameras/%2e%2e/%2e%2e/healthz"),
        ];
        for (method, raw) in rejected {
            assert!(
                !relay_allowed(method, &canonical_path(raw)),
                "{method} {raw} should be rejected"
            );
        }

        let allowed = ["/api/v1/cameras", "/media/recordings/cam2/seg.mp4"];
        for raw in allowed {
            assert!(
                relay_allowed("GET", &canonical_path(raw)),
                "GET {raw} should be allowed"
            );
        }
    }

    #[test]
    fn endpoints_append_paths_and_trim_trailing_slash() {
        let without_slash = poll_url("https://rv.example.com");
        assert_eq!(without_slash, "https://rv.example.com/api/v1/rendezvous/poll");

        let with_slash = answer_url("https://rv.example.com/");
        assert_eq!(with_slash, "https://rv.example.com/api/v1/rendezvous/answer");
    }
}