use std::collections::HashMap;
use std::time::Duration;
use anyhow::Context;
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine as _;
use reqwest::header::{ACCEPT, CONTENT_TYPE};
use reqwest::StatusCode;
use serde::Deserialize;
use serde_json::json;
use crate::config::Config;
use crate::services::mediamtx;
use crate::state::AppState;
fn poll_url(rendezvous_url: &str) -> String {
format!(
"{}/api/v1/rendezvous/poll",
rendezvous_url.trim_end_matches('/')
)
}
fn answer_url(rendezvous_url: &str) -> String {
format!(
"{}/api/v1/rendezvous/answer",
rendezvous_url.trim_end_matches('/')
)
}
#[derive(Debug, Deserialize)]
struct PendingSession {
session_id: String,
camera_id: String,
sdp_offer: String,
}
fn build_client(cfg: &Config) -> anyhow::Result<reqwest::Client> {
let mut builder = reqwest::Client::builder().timeout(Duration::from_secs(40));
if let Some(t) = &cfg.cp_tls {
let cert = std::fs::read(&t.client_cert)
.with_context(|| format!("reading client cert {}", t.client_cert.display()))?;
let key = std::fs::read(&t.client_key)
.with_context(|| format!("reading client key {}", t.client_key.display()))?;
let ca = std::fs::read(&t.server_ca)
.with_context(|| format!("reading control-plane CA {}", t.server_ca.display()))?;
let mut identity_pem = key;
identity_pem.extend_from_slice(&cert);
let identity =
reqwest::Identity::from_pem(&identity_pem).context("building client identity")?;
let root = reqwest::Certificate::from_pem(&ca).context("parsing control-plane CA")?;
builder = builder.identity(identity).add_root_certificate(root);
}
builder.build().context("building HTTP client")
}
async fn bridge_to_local_whep(
state: &AppState,
camera_id: &str,
sdp_offer: &str,
) -> anyhow::Result<String> {
let live = mediamtx::ensure_live(state, camera_id, None)
.await
.map_err(|e| anyhow::anyhow!("ensure_live({camera_id}) failed: {e}"))?;
let whep = format!("{}/whep", live.webrtc_url);
let answer = state
.http
.post(&whep)
.timeout(Duration::from_secs(25))
.header(CONTENT_TYPE, "application/sdp")
.header(ACCEPT, "application/sdp")
.body(sdp_offer.to_owned())
.send()
.await
.context("posting offer to local WHEP")?
.error_for_status()
.context("local WHEP rejected the offer")?
.text()
.await
.context("reading WHEP answer")?;
Ok(answer)
}
const MAX_SDP_BYTES: usize = 512 * 1024;
async fn camera_catalog(state: &AppState) -> Vec<serde_json::Value> {
sqlx::query_as::<_, (String, Option<String>)>("SELECT id, name FROM cameras ORDER BY id ASC")
.fetch_all(&state.pool)
.await
.unwrap_or_default()
.into_iter()
.map(|(id, name)| {
let name = name.filter(|n| !n.is_empty()).unwrap_or_else(|| id.clone());
json!({ "id": id, "name": name })
})
.collect()
}
async fn poll_once(
state: &AppState,
client: &reqwest::Client,
rendezvous_url: &str,
site_id: &str,
token: &str,
) -> anyhow::Result<bool> {
let resp = client
.post(poll_url(rendezvous_url))
.bearer_auth(token)
.json(&json!({ "site_id": site_id, "cameras": camera_catalog(state).await }))
.send()
.await
.context("rendezvous poll request")?;
if resp.status() == StatusCode::NO_CONTENT {
return Ok(false); }
let session: PendingSession = resp
.error_for_status()
.context("rendezvous poll rejected")?
.json()
.await
.context("decoding pending session")?;
let result = if session.sdp_offer.len() > MAX_SDP_BYTES {
Err(anyhow::anyhow!(
"offer too large ({} bytes)",
session.sdp_offer.len()
))
} else {
bridge_to_local_whep(state, &session.camera_id, &session.sdp_offer).await
};
let body = match &result {
Ok(sdp) => {
json!({ "site_id": site_id, "session_id": session.session_id, "sdp_answer": sdp })
}
Err(e) => {
json!({ "site_id": site_id, "session_id": session.session_id, "error": e.to_string() })
}
};
if let Err(e) = &result {
tracing::warn!(session = %session.session_id, camera = %session.camera_id, error = %e, "rendezvous: bridge to local WHEP failed");
}
client
.post(answer_url(rendezvous_url))
.bearer_auth(token)
.json(&body)
.send()
.await
.context("posting answer to rendezvous")?
.error_for_status()
.context("rendezvous rejected the answer")?;
Ok(result.is_err())
}
pub async fn run(state: AppState) {
let cfg = state.cfg.clone();
let (Some(rendezvous_url), Some(site_id)) =
(cfg.rendezvous_url.as_deref(), cfg.site_id.as_deref())
else {
std::future::pending::<()>().await;
return;
};
let client = match build_client(&cfg) {
Ok(c) => c,
Err(e) => {
tracing::error!(error = %e, "webrtc rendezvous disabled: bad mTLS config");
std::future::pending::<()>().await;
return;
}
};
if cfg.cp_token.is_empty() {
tracing::warn!(
"webrtc rendezvous: HELDAR_CP_TOKEN is empty; the rendezvous will reject polls if it enforces a bearer (BOX_TOKEN)"
);
}
tracing::info!(site = %site_id, rendezvous = %rendezvous_url, "webrtc rendezvous: dialing out for remote viewing");
let mut backoff = Duration::from_secs(1);
loop {
match poll_once(&state, &client, rendezvous_url, site_id, &cfg.cp_token).await {
Ok(false) => backoff = Duration::from_secs(1),
Ok(true) => tokio::time::sleep(Duration::from_secs(2)).await,
Err(e) => {
tracing::warn!(site = %site_id, error = %e, "webrtc rendezvous poll failed; backing off");
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(30));
}
}
}
}
fn box_turn_url(rendezvous_url: &str) -> String {
format!("{}/api/v1/box/turn", rendezvous_url.trim_end_matches('/'))
}
async fn fetch_rendezvous_ice(
client: &reqwest::Client,
rendezvous_url: &str,
token: &str,
) -> anyhow::Result<serde_json::Value> {
let data: serde_json::Value = client
.get(box_turn_url(rendezvous_url))
.bearer_auth(token)
.send()
.await
.context("rendezvous box/turn request")?
.error_for_status()
.context("rendezvous box/turn rejected")?
.json()
.await
.context("decoding box/turn")?;
let ice = data
.get("iceServers")
.ok_or_else(|| anyhow::anyhow!("box/turn response missing iceServers"))?;
let user = ice.get("username").and_then(|v| v.as_str());
let cred = ice.get("credential").and_then(|v| v.as_str());
let urls = ice
.get("urls")
.and_then(|v| v.as_array())
.ok_or_else(|| anyhow::anyhow!("box/turn response missing iceServers.urls"))?;
let list: Vec<serde_json::Value> = urls
.iter()
.filter_map(|u| u.as_str())
.map(|u| {
if u.starts_with("stun:") {
json!({ "url": u })
} else {
json!({ "url": u, "username": user, "password": cred })
}
})
.collect();
Ok(serde_json::Value::Array(list))
}
async fn resolve_ice(cfg: &Config, client: &reqwest::Client) -> (serde_json::Value, Duration) {
if let Some(raw) = &cfg.webrtc_ice_servers {
match serde_json::from_str::<serde_json::Value>(raw) {
Ok(v) => return (v, Duration::from_secs(12 * 3600)),
Err(e) => {
tracing::error!(error = %e, "HELDAR_WEBRTC_ICE_SERVERS is not valid JSON; ignoring")
}
}
}
if let Some(url) = cfg.rendezvous_url.as_deref() {
match fetch_rendezvous_ice(client, url, &cfg.cp_token).await {
Ok(v) => return (v, Duration::from_secs(30 * 60)),
Err(e) => {
tracing::warn!(error = %e, "webrtc ICE: rendezvous TURN fetch failed; using STUN only")
}
}
}
(
json!([{ "url": "stun:stun.cloudflare.com:3478" }]),
Duration::from_secs(30 * 60),
)
}
pub async fn run_ice(state: AppState) {
let cfg = state.cfg.clone();
if cfg.webrtc_ice_servers.is_none() && cfg.rendezvous_url.is_none() {
std::future::pending::<()>().await;
return;
}
let client = match build_client(&cfg) {
Ok(c) => c,
Err(e) => {
tracing::error!(error = %e, "webrtc ICE disabled: bad mTLS config");
std::future::pending::<()>().await;
return;
}
};
loop {
let (ice, cadence) = resolve_ice(&cfg, &client).await;
match mediamtx::set_webrtc_ice_servers(&state, &ice).await {
Ok(()) => tracing::info!("webrtc ICE: programmed MediaMTX ICE servers"),
Err(e) => tracing::warn!(error = %e, "webrtc ICE: failed to program MediaMTX"),
}
tokio::time::sleep(cadence).await;
}
}
fn relay_poll_url(u: &str) -> String {
format!("{}/api/v1/relay/poll", u.trim_end_matches('/'))
}
fn relay_respond_url(u: &str) -> String {
format!("{}/api/v1/relay/respond", u.trim_end_matches('/'))
}
#[derive(Debug, Deserialize)]
struct RelayJob {
job_id: String,
method: String,
path: String,
#[serde(default)]
headers: HashMap<String, String>,
#[serde(default)]
body_b64: Option<String>,
}
const RELAY_POLLERS: usize = 4;
const MAX_RELAY_BODY: usize = 8 * 1024 * 1024;
fn relay_allowed(method: &str, path: &str) -> bool {
if !path.starts_with('/') || path.contains("..") || path.contains("//") || path.contains('@') {
return false;
}
const DENY: &[&str] = &["/api/v1/relay", "/api/v1/rendezvous", "/metrics"];
if DENY
.iter()
.any(|d| path == *d || path.starts_with(&format!("{d}/")))
{
return false;
}
if !(path.starts_with("/api/v1/") || path.starts_with("/media/")) {
return false;
}
matches!(method, "GET" | "HEAD" | "POST" | "PUT" | "PATCH" | "DELETE")
}
fn forward_request_header(name: &str) -> bool {
matches!(
name.to_ascii_lowercase().as_str(),
"authorization"
| "accept"
| "content-type"
| "range"
| "if-none-match"
| "if-modified-since"
)
}
fn forward_response_header(name: &str) -> bool {
matches!(
name.to_ascii_lowercase().as_str(),
"content-type"
| "content-length"
| "content-range"
| "accept-ranges"
| "cache-control"
| "etag"
| "last-modified"
)
}
async fn replay_relay_job(
state: &AppState,
job: &RelayJob,
) -> (u16, HashMap<String, String>, String) {
let base = format!("http://127.0.0.1:{}", state.cfg.api_port);
let parsed = match reqwest::Url::parse(&base).and_then(|b| b.join(&job.path)) {
Ok(u) => u,
Err(_) => {
return (
400,
HashMap::new(),
B64.encode(br#"{"error":"bad relay path"}"#),
);
}
};
let same_origin = parsed.scheme() == "http"
&& parsed.host_str() == Some("127.0.0.1")
&& parsed.port() == Some(state.cfg.api_port);
if !same_origin || !relay_allowed(&job.method, parsed.path()) {
return (
403,
HashMap::new(),
B64.encode(br#"{"error":"relay path not allowed"}"#),
);
}
let method = reqwest::Method::from_bytes(job.method.as_bytes()).unwrap_or(reqwest::Method::GET);
let mut req = state
.http
.request(method, parsed)
.timeout(Duration::from_secs(20));
for (k, v) in &job.headers {
if forward_request_header(k) {
req = req.header(k, v);
}
}
if let Some(b) = &job.body_b64 {
if let Ok(bytes) = B64.decode(b) {
if bytes.len() <= MAX_RELAY_BODY {
req = req.body(bytes);
}
}
}
match req.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let mut headers = HashMap::new();
for (k, v) in resp.headers() {
if forward_response_header(k.as_str()) {
if let Ok(vs) = v.to_str() {
headers.insert(k.as_str().to_string(), vs.to_string());
}
}
}
if resp
.content_length()
.is_some_and(|len| len > MAX_RELAY_BODY as u64)
{
return (
413,
HashMap::new(),
B64.encode(br#"{"error":"relay response too large; use range requests"}"#),
);
}
let body = resp.bytes().await.unwrap_or_default();
let slice = if body.len() > MAX_RELAY_BODY {
&body[..MAX_RELAY_BODY]
} else {
&body[..]
};
(status, headers, B64.encode(slice))
}
Err(e) => (
502,
HashMap::new(),
B64.encode(format!(r#"{{"error":"relay upstream: {e}"}}"#).as_bytes()),
),
}
}
async fn relay_poll_once(
state: &AppState,
client: &reqwest::Client,
rendezvous_url: &str,
site_id: &str,
token: &str,
) -> anyhow::Result<()> {
let resp = client
.post(relay_poll_url(rendezvous_url))
.bearer_auth(token)
.json(&json!({ "site_id": site_id, "auth_enforced": true }))
.send()
.await
.context("relay poll request")?;
if resp.status() == StatusCode::NO_CONTENT {
return Ok(());
}
let job: RelayJob = resp
.error_for_status()
.context("relay poll rejected")?
.json()
.await
.context("decoding relay job")?;
let (status, headers, body_b64) = replay_relay_job(state, &job).await;
client
.post(relay_respond_url(rendezvous_url))
.bearer_auth(token)
.json(&json!({
"site_id": site_id,
"job_id": job.job_id,
"status": status,
"headers": headers,
"body_b64": body_b64,
}))
.send()
.await
.context("posting relay response")?
.error_for_status()
.context("rendezvous rejected relay response")?;
Ok(())
}
pub async fn run_relay(state: AppState) {
let cfg = state.cfg.clone();
let (Some(rendezvous_url), Some(site_id)) = (cfg.rendezvous_url.clone(), cfg.site_id.clone())
else {
std::future::pending::<()>().await;
return;
};
if !cfg.auth_enabled {
tracing::warn!(
"webrtc relay disabled: kernel auth is OFF (HELDAR_AUTH_ENABLED=false). The remote REST \
relay refuses to run until auth is enabled, so the open API is never exposed remotely."
);
std::future::pending::<()>().await;
return;
}
let users: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE active = 1")
.fetch_one(&state.pool)
.await
.unwrap_or(0);
if users == 0 {
tracing::warn!("webrtc relay disabled: kernel auth is on but no active users exist yet");
std::future::pending::<()>().await;
return;
}
let client = match build_client(&cfg) {
Ok(c) => c,
Err(e) => {
tracing::error!(error = %e, "webrtc relay disabled: bad mTLS config");
std::future::pending::<()>().await;
return;
}
};
tracing::info!(site = %site_id, "webrtc relay: dialing out for the authenticated remote dashboard (read-only)");
let mut tasks = Vec::new();
for _ in 0..RELAY_POLLERS {
let state = state.clone();
let client = client.clone();
let rendezvous_url = rendezvous_url.clone();
let site_id = site_id.clone();
let token = cfg.cp_token.clone();
tasks.push(tokio::spawn(async move {
let mut backoff = Duration::from_secs(1);
loop {
match relay_poll_once(&state, &client, &rendezvous_url, &site_id, &token).await {
Ok(()) => backoff = Duration::from_secs(1),
Err(e) => {
tracing::warn!(error = %e, "webrtc relay poll failed; backing off");
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(30));
}
}
}
}));
}
for t in tasks {
let _ = t.await;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn relay_allowlist_pins_surface_and_blocks_internal_and_traversal() {
assert!(relay_allowed("GET", "/api/v1/cameras"));
assert!(relay_allowed("POST", "/api/v1/cameras"));
assert!(relay_allowed("PATCH", "/api/v1/cameras/cam2"));
assert!(relay_allowed("DELETE", "/api/v1/cameras/cam2"));
assert!(relay_allowed("GET", "/media/recordings/x.mp4"));
assert!(relay_allowed("POST", "/api/v1/auth/login"));
assert!(!relay_allowed("GET", "/healthz"));
assert!(!relay_allowed("GET", "/api/v1/relay/poll"));
assert!(!relay_allowed("GET", "/api/v1/rendezvous/poll"));
assert!(!relay_allowed("GET", "/metrics"));
assert!(!relay_allowed("GET", "/api/v1/../secrets"));
assert!(!relay_allowed("GET", "/api/v1//cameras"));
assert!(!relay_allowed("TRACE", "/api/v1/cameras"));
}
#[test]
fn relay_allowlist_runs_on_canonical_path_not_raw() {
let canon = |p: &str| {
reqwest::Url::parse("http://127.0.0.1:8088")
.unwrap()
.join(p)
.unwrap()
.path()
.to_string()
};
assert!(!"/api/v1/%2e%2e/%2e%2e/metrics".contains(".."));
assert_eq!(canon("/api/v1/%2e%2e/%2e%2e/metrics"), "/metrics");
assert!(!relay_allowed(
"GET",
&canon("/api/v1/%2e%2e/%2e%2e/metrics")
));
assert!(!relay_allowed(
"GET",
&canon("/api/v1/cameras/%2e%2e/relay/poll")
));
assert!(!relay_allowed(
"POST",
&canon("/api/v1/cameras/%2e%2e/%2e%2e/healthz")
));
assert!(relay_allowed("GET", &canon("/api/v1/cameras")));
assert!(relay_allowed(
"GET",
&canon("/media/recordings/cam2/seg.mp4")
));
}
#[test]
fn endpoints_append_paths_and_trim_trailing_slash() {
assert_eq!(
poll_url("https://rv.example.com"),
"https://rv.example.com/api/v1/rendezvous/poll"
);
assert_eq!(
answer_url("https://rv.example.com/"),
"https://rv.example.com/api/v1/rendezvous/answer"
);
}
}