Skip to main content

heldar_kernel/services/
webrtc_rendezvous.rs

1//! Box-side WebRTC rendezvous client (ADR 0003, P2).
2//!
3//! For universal remote viewing the box must be reachable from any browser on any network, but it is
4//! typically behind CGNAT (no inbound). The fix — like the rest of the kernel's cloud seams — is to dial
5//! OUT: this loop maintains an outbound HTTP long-poll to a public rendezvous (the private `heldar`
6//! Cloudflare Worker + Durable Object — `apps/edge/`). When a browser asks to view a camera, the
7//! rendezvous hands the box the browser's WebRTC SDP offer; the box bridges it to its OWN local MediaMTX
8//! WHEP endpoint and returns the answer. Media then flows browser ⇄ TURN ⇄ MediaMTX (DTLS-SRTP) — never
9//! through the rendezvous, never re-encoded here. The box only shuttles two SDP blobs per session.
10//!
11//! Pure outbound HTTP, no new crates — the only seam is `HELDAR_REMOTE_RENDEZVOUS_URL`. Strictly opt-in:
12//! unset (the default) and this loop parks forever, the same posture as `fleet_register`. Reuses the
13//! `HELDAR_CP_TLS_*` mTLS identity when configured (not needed for the Cloudflare Worker — it uses the
14//! `HELDAR_CP_TOKEN` bearer).
15
16use std::collections::HashMap;
17use std::time::Duration;
18
19use anyhow::Context;
20use base64::engine::general_purpose::STANDARD as B64;
21use base64::Engine as _;
22use reqwest::header::{ACCEPT, CONTENT_TYPE};
23use reqwest::StatusCode;
24use serde::Deserialize;
25use serde_json::json;
26
27use crate::config::Config;
28use crate::services::mediamtx;
29use crate::state::AppState;
30
/// URL of the rendezvous long-poll. The box asks here for its next pending viewing session, and every
/// request doubles as a liveness beat. A trailing `/` on the configured base is ignored.
fn poll_url(rendezvous_url: &str) -> String {
    let base = rendezvous_url.trim_end_matches('/');
    format!("{base}/api/v1/rendezvous/poll")
}
38
/// URL the box POSTs a session's WHEP answer (or its bridge error) to; the body names the session id.
/// A trailing `/` on the configured base is ignored.
fn answer_url(rendezvous_url: &str) -> String {
    let base = rendezvous_url.trim_end_matches('/');
    format!("{base}/api/v1/rendezvous/answer")
}
46
/// A pending browser viewing session the rendezvous handed us: the camera and its recvonly SDP offer.
///
/// Decoded from the JSON body of a non-204 poll response (see `poll_once`).
#[derive(Debug, Deserialize)]
struct PendingSession {
    /// Rendezvous-assigned id, echoed back in the answer body so it reaches the waiting browser.
    session_id: String,
    /// Kernel camera id, handed to `mediamtx::ensure_live` to select the local stream.
    camera_id: String,
    /// The browser's SDP offer, POSTed verbatim to the local WHEP endpoint.
    sdp_offer: String,
}
54
55/// Build the outbound client, configuring mTLS (client identity + control-plane CA) when
56/// `HELDAR_CP_TLS_*` is set — same material the fleet registration uses. Errors only on bad cert files.
57fn build_client(cfg: &Config) -> anyhow::Result<reqwest::Client> {
58    // A generous timeout: the poll is a long-poll the rendezvous holds open until work arrives or it
59    // times out server-side.
60    let mut builder = reqwest::Client::builder().timeout(Duration::from_secs(40));
61    if let Some(t) = &cfg.cp_tls {
62        let cert = std::fs::read(&t.client_cert)
63            .with_context(|| format!("reading client cert {}", t.client_cert.display()))?;
64        let key = std::fs::read(&t.client_key)
65            .with_context(|| format!("reading client key {}", t.client_key.display()))?;
66        let ca = std::fs::read(&t.server_ca)
67            .with_context(|| format!("reading control-plane CA {}", t.server_ca.display()))?;
68        let mut identity_pem = key;
69        identity_pem.extend_from_slice(&cert);
70        let identity =
71            reqwest::Identity::from_pem(&identity_pem).context("building client identity")?;
72        let root = reqwest::Certificate::from_pem(&ca).context("parsing control-plane CA")?;
73        builder = builder.identity(identity).add_root_certificate(root);
74    }
75    builder.build().context("building HTTP client")
76}
77
78/// Bridge a browser SDP offer to the local MediaMTX WHEP endpoint and return the answer. Reuses
79/// `ensure_live` (which creates the `cam_<id>` path on demand) with `request_host = None`, so the
80/// returned `webrtc_url` keeps its loopback base — exactly the address the box POSTs to its own MediaMTX.
81///
82/// Authorization note: the rendezvous (the `heldar` Worker, `apps/edge/`) is the sole authority on WHO may
83/// view WHICH camera — it verifies a signed ticket before relaying a session here. The box only talks to
84/// the rendezvous it dialed OUT to, so it trusts the session it is handed; it does not re-check the ticket.
85async fn bridge_to_local_whep(
86    state: &AppState,
87    camera_id: &str,
88    sdp_offer: &str,
89) -> anyhow::Result<String> {
90    let live = mediamtx::ensure_live(state, camera_id, None)
91        .await
92        .map_err(|e| anyhow::anyhow!("ensure_live({camera_id}) failed: {e}"))?;
93    let whep = format!("{}/whep", live.webrtc_url);
94    let answer = state
95        .http
96        .post(&whep)
97        // MediaMTX answers a WHEP offer only once its on-demand HEVC→H.264 transcode has started, which
98        // can exceed `state.http`'s default 10s timeout — give the cold start room (still under the poll).
99        .timeout(Duration::from_secs(25))
100        .header(CONTENT_TYPE, "application/sdp")
101        .header(ACCEPT, "application/sdp")
102        .body(sdp_offer.to_owned())
103        .send()
104        .await
105        .context("posting offer to local WHEP")?
106        .error_for_status()
107        .context("local WHEP rejected the offer")?
108        .text()
109        .await
110        .context("reading WHEP answer")?;
111    Ok(answer)
112}
113
/// Upper bound (512 KiB) on a browser SDP offer this box will bridge. Purely defensive: the
/// rendezvous already enforces a much smaller cap.
const MAX_SDP_BYTES: usize = 512 << 10;
116
117/// The box's camera list (id + display name) advertised to the rendezvous on each poll, so the grid
118/// viewer can enumerate cameras without reaching the box's REST API (that is the Stage C relay). Read
119/// straight from local state (no self-HTTP, so it is unaffected by whether the REST API requires auth);
120/// names fall back to the id. Exposes only id+name — never a stream URL or credential.
121async fn camera_catalog(state: &AppState) -> Vec<serde_json::Value> {
122    sqlx::query_as::<_, (String, Option<String>)>("SELECT id, name FROM cameras ORDER BY id ASC")
123        .fetch_all(&state.pool)
124        .await
125        .unwrap_or_default()
126        .into_iter()
127        .map(|(id, name)| {
128            let name = name.filter(|n| !n.is_empty()).unwrap_or_else(|| id.clone());
129            json!({ "id": id, "name": name })
130        })
131        .collect()
132}
133
134/// One long-poll cycle: ask for the next session; if one arrives, bridge it and report the answer (or the
135/// error) back. Returns `Ok(true)` when a bridge FAILED (so the caller can rate-limit a persistent local
136/// failure), `Ok(false)` on a clean cycle (work handled or nothing pending). `Err` only on a transport
137/// failure talking to the rendezvous, which the caller backs off on.
138async fn poll_once(
139    state: &AppState,
140    client: &reqwest::Client,
141    rendezvous_url: &str,
142    site_id: &str,
143    token: &str,
144) -> anyhow::Result<bool> {
145    let resp = client
146        .post(poll_url(rendezvous_url))
147        .bearer_auth(token)
148        // Piggy-back the camera list so the grid viewer can enumerate cameras (refreshed every poll).
149        .json(&json!({ "site_id": site_id, "cameras": camera_catalog(state).await }))
150        .send()
151        .await
152        .context("rendezvous poll request")?;
153    if resp.status() == StatusCode::NO_CONTENT {
154        return Ok(false); // long-poll timed out with no work — re-poll
155    }
156    let session: PendingSession = resp
157        .error_for_status()
158        .context("rendezvous poll rejected")?
159        .json()
160        .await
161        .context("decoding pending session")?;
162
163    let result = if session.sdp_offer.len() > MAX_SDP_BYTES {
164        Err(anyhow::anyhow!(
165            "offer too large ({} bytes)",
166            session.sdp_offer.len()
167        ))
168    } else {
169        bridge_to_local_whep(state, &session.camera_id, &session.sdp_offer).await
170    };
171    // `site_id` lets the rendezvous route the answer back to this box's session (the Durable Object
172    // keyed by site id). `session_id` matches it to the waiting browser request.
173    let body = match &result {
174        Ok(sdp) => {
175            json!({ "site_id": site_id, "session_id": session.session_id, "sdp_answer": sdp })
176        }
177        Err(e) => {
178            json!({ "site_id": site_id, "session_id": session.session_id, "error": e.to_string() })
179        }
180    };
181    if let Err(e) = &result {
182        tracing::warn!(session = %session.session_id, camera = %session.camera_id, error = %e, "rendezvous: bridge to local WHEP failed");
183    }
184    client
185        .post(answer_url(rendezvous_url))
186        .bearer_auth(token)
187        .json(&body)
188        .send()
189        .await
190        .context("posting answer to rendezvous")?
191        .error_for_status()
192        .context("rendezvous rejected the answer")?;
193    Ok(result.is_err())
194}
195
196/// The dial-out loop. Parks forever unless `HELDAR_REMOTE_RENDEZVOUS_URL` + `HELDAR_SITE_ID` are set
197/// (remote access is opt-in). Otherwise long-polls the rendezvous, bridging each viewing session to the
198/// local MediaMTX, with exponential backoff on transport failure. Never returns.
199pub async fn run(state: AppState) {
200    let cfg = state.cfg.clone();
201    let (Some(rendezvous_url), Some(site_id)) =
202        (cfg.rendezvous_url.as_deref(), cfg.site_id.as_deref())
203    else {
204        std::future::pending::<()>().await;
205        return;
206    };
207
208    let client = match build_client(&cfg) {
209        Ok(c) => c,
210        Err(e) => {
211            tracing::error!(error = %e, "webrtc rendezvous disabled: bad mTLS config");
212            std::future::pending::<()>().await;
213            return;
214        }
215    };
216
217    if cfg.cp_token.is_empty() {
218        tracing::warn!(
219            "webrtc rendezvous: HELDAR_CP_TOKEN is empty; the rendezvous will reject polls if it enforces a bearer (BOX_TOKEN)"
220        );
221    }
222    tracing::info!(site = %site_id, rendezvous = %rendezvous_url, "webrtc rendezvous: dialing out for remote viewing");
223    let mut backoff = Duration::from_secs(1);
224    loop {
225        match poll_once(&state, &client, rendezvous_url, site_id, &cfg.cp_token).await {
226            Ok(false) => backoff = Duration::from_secs(1),
227            // A bridge to the local MediaMTX failed (e.g. camera/transcode down) — the answer/error was
228            // already reported to the browser; pause briefly so a persistent failure can't tight-loop.
229            Ok(true) => tokio::time::sleep(Duration::from_secs(2)).await,
230            Err(e) => {
231                tracing::warn!(site = %site_id, error = %e, "webrtc rendezvous poll failed; backing off");
232                tokio::time::sleep(backoff).await;
233                backoff = (backoff * 2).min(Duration::from_secs(30));
234            }
235        }
236    }
237}
238
/// Rendezvous endpoint that mints short-lived TURN/ICE credentials for the box's own MediaMTX.
/// A trailing `/` on the configured base is ignored.
fn box_turn_url(rendezvous_url: &str) -> String {
    let mut url = rendezvous_url.trim_end_matches('/').to_owned();
    url.push_str("/api/v1/box/turn");
    url
}
243
244/// Fetch short-lived TURN credentials from the rendezvous and shape them into a MediaMTX
245/// `webrtcICEServers2` array (`[{url, username?, password?}]`).
246async fn fetch_rendezvous_ice(
247    client: &reqwest::Client,
248    rendezvous_url: &str,
249    token: &str,
250) -> anyhow::Result<serde_json::Value> {
251    let data: serde_json::Value = client
252        .get(box_turn_url(rendezvous_url))
253        .bearer_auth(token)
254        .send()
255        .await
256        .context("rendezvous box/turn request")?
257        .error_for_status()
258        .context("rendezvous box/turn rejected")?
259        .json()
260        .await
261        .context("decoding box/turn")?;
262    let ice = data
263        .get("iceServers")
264        .ok_or_else(|| anyhow::anyhow!("box/turn response missing iceServers"))?;
265    let user = ice.get("username").and_then(|v| v.as_str());
266    let cred = ice.get("credential").and_then(|v| v.as_str());
267    let urls = ice
268        .get("urls")
269        .and_then(|v| v.as_array())
270        .ok_or_else(|| anyhow::anyhow!("box/turn response missing iceServers.urls"))?;
271    let list: Vec<serde_json::Value> = urls
272        .iter()
273        .filter_map(|u| u.as_str())
274        .map(|u| {
275            if u.starts_with("stun:") {
276                json!({ "url": u })
277            } else {
278                json!({ "url": u, "username": user, "password": cred })
279            }
280        })
281        .collect();
282    Ok(serde_json::Value::Array(list))
283}
284
285/// Resolve the ICE servers to program into MediaMTX, and how long until the next refresh.
286async fn resolve_ice(cfg: &Config, client: &reqwest::Client) -> (serde_json::Value, Duration) {
287    // 1) Operator-provided (their own STUN/TURN) — static, refresh rarely.
288    if let Some(raw) = &cfg.webrtc_ice_servers {
289        match serde_json::from_str::<serde_json::Value>(raw) {
290            Ok(v) => return (v, Duration::from_secs(12 * 3600)),
291            Err(e) => {
292                tracing::error!(error = %e, "HELDAR_WEBRTC_ICE_SERVERS is not valid JSON; ignoring")
293            }
294        }
295    }
296    // 2) Heldar-hosted: short-lived TURN from the rendezvous (creds expire → refresh often).
297    if let Some(url) = cfg.rendezvous_url.as_deref() {
298        match fetch_rendezvous_ice(client, url, &cfg.cp_token).await {
299            Ok(v) => return (v, Duration::from_secs(30 * 60)),
300            Err(e) => {
301                tracing::warn!(error = %e, "webrtc ICE: rendezvous TURN fetch failed; using STUN only")
302            }
303        }
304    }
305    // 3) Fallback: STUN only (works for non-symmetric NAT).
306    (
307        json!([{ "url": "stun:stun.cloudflare.com:3478" }]),
308        Duration::from_secs(30 * 60),
309    )
310}
311
312/// Periodically program MediaMTX's WebRTC ICE servers for remote viewing — the operator's own
313/// `HELDAR_WEBRTC_ICE_SERVERS`, else short-lived TURN fetched from the rendezvous, else STUN. Parks when
314/// remote viewing is not configured (neither ICE config nor a rendezvous URL set).
315pub async fn run_ice(state: AppState) {
316    let cfg = state.cfg.clone();
317    if cfg.webrtc_ice_servers.is_none() && cfg.rendezvous_url.is_none() {
318        std::future::pending::<()>().await;
319        return;
320    }
321    let client = match build_client(&cfg) {
322        Ok(c) => c,
323        Err(e) => {
324            tracing::error!(error = %e, "webrtc ICE disabled: bad mTLS config");
325            std::future::pending::<()>().await;
326            return;
327        }
328    };
329    loop {
330        let (ice, cadence) = resolve_ice(&cfg, &client).await;
331        match mediamtx::set_webrtc_ice_servers(&state, &ice).await {
332            Ok(()) => tracing::info!("webrtc ICE: programmed MediaMTX ICE servers"),
333            Err(e) => tracing::warn!(error = %e, "webrtc ICE: failed to program MediaMTX"),
334        }
335        tokio::time::sleep(cadence).await;
336    }
337}
338
// ---- Stage C: authenticated HTTP relay ----
//
// A SECOND outbound channel so an authenticated remote browser can drive the kernel's REST API
// (the allowlisted `/api/v1/*` + `/media/*` surface, all standard methods — see `relay_allowed`).
// The rendezvous hands the box a `RelayJob` — an HTTP request the browser made,
343// carrying the user's REAL kernel Bearer — and the box REPLAYS it against its own local kernel
344// (`127.0.0.1:api_port`). The kernel runs its NORMAL auth + RBAC, so the relay is a dumb, allowlisted
345// pipe — never an auth-bypass, never a fabricated principal. Independent of the WHEP channel (separate
346// poll → no head-of-line blocking). FAIL-SAFE: this loop refuses to run unless kernel auth is ENABLED
347// and a real user exists, so the REST API is never exposed remotely while it would answer as the
348// synthetic auth-off admin.
349
/// Rendezvous endpoint the relay pollers long-poll for their next `RelayJob`.
fn relay_poll_url(u: &str) -> String {
    [u.trim_end_matches('/'), "/api/v1/relay/poll"].concat()
}
/// Rendezvous endpoint a relay poller POSTs the replayed response back to.
fn relay_respond_url(u: &str) -> String {
    [u.trim_end_matches('/'), "/api/v1/relay/respond"].concat()
}
356
/// An HTTP request the rendezvous asks the box to replay against its local kernel.
///
/// Decoded from the JSON body of a non-204 relay poll response (see `relay_poll_once`).
#[derive(Debug, Deserialize)]
struct RelayJob {
    /// Rendezvous-assigned id, echoed back with the replayed response.
    job_id: String,
    /// HTTP method; only the verbs accepted by `relay_allowed` are replayed.
    method: String,
    /// Request path, joined onto the loopback kernel base URL and canonicalized before the allowlist
    /// check.
    path: String,
    /// The browser's request headers; only those accepted by `forward_request_header` are forwarded.
    /// Empty when absent from the job.
    #[serde(default)]
    headers: HashMap<String, String>,
    /// Request body, standard base64; `None` when the job has no body.
    #[serde(default)]
    body_b64: Option<String>,
}
368
/// Number of relay pollers run concurrently, so several dashboard requests can be in flight at once
/// instead of being fully serialized.
const RELAY_POLLERS: usize = 4;
/// Defensive cap (8 MiB) on a relayed request or response body. Stage C traffic is small JSON plus the
/// occasional snapshot.
const MAX_RELAY_BODY: usize = 8 << 20;
373
/// Whether the box may replay `method path` for the remote dashboard.
///
/// The allowed surface is the full REST + media API (`/api/v1/*` and `/media/*`) with the standard
/// verbs GET/HEAD/POST/PUT/PATCH/DELETE. The kernel's own auth + RBAC, run on the forwarded per-user
/// Bearer, is the real authorization gate. This allowlist is defense in depth:
/// - it pins the path to that surface;
/// - it refuses traversal/smuggling shapes (`..`, `//`, `@`, or no leading `/`);
/// - it never relays the Worker-internal (`/api/v1/relay`, `/api/v1/rendezvous`) or `/metrics`
///   surfaces.
///
/// `method` is matched case-sensitively.
fn relay_allowed(method: &str, path: &str) -> bool {
    // Worker-internal and metrics surfaces: refused as an exact path or as a `<prefix>/…` subtree.
    const DENY: &[&str] = &["/api/v1/relay", "/api/v1/rendezvous", "/metrics"];
    const ALLOWED_METHODS: &[&str] = &["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"];

    let well_formed = path.starts_with('/')
        && !path.contains("..")
        && !path.contains("//")
        && !path.contains('@');
    let denied = DENY.iter().any(|prefix| {
        path.strip_prefix(prefix)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
    });
    let on_surface = path.starts_with("/api/v1/") || path.starts_with("/media/");

    well_formed && !denied && on_surface && ALLOWED_METHODS.contains(&method)
}
394
/// Whether a browser request header is forwarded to the local kernel (case-insensitive match).
/// Everything not listed is stripped, so a client cannot smuggle X-Forwarded-For or other trust headers.
fn forward_request_header(name: &str) -> bool {
    const FORWARDED: [&str; 6] = [
        "authorization",
        "accept",
        "content-type",
        "range",
        "if-none-match",
        "if-modified-since",
    ];
    FORWARDED
        .iter()
        .any(|allowed| allowed.eq_ignore_ascii_case(name))
}
/// Whether a kernel response header is passed back through the relay (case-insensitive match).
/// Set-Cookie is deliberately NOT forwarded: the box's own cookie is meaningless cross-origin, and the
/// Worker manages the browser session.
fn forward_response_header(name: &str) -> bool {
    const PASSED_BACK: [&str; 7] = [
        "content-type",
        "content-length",
        "content-range",
        "accept-ranges",
        "cache-control",
        "etag",
        "last-modified",
    ];
    PASSED_BACK
        .iter()
        .any(|allowed| allowed.eq_ignore_ascii_case(name))
}
422
423/// Replay one relay job against the local kernel; returns (status, response headers, base64 body).
424async fn replay_relay_job(
425    state: &AppState,
426    job: &RelayJob,
427) -> (u16, HashMap<String, String>, String) {
428    // Canonicalize the path exactly as the HTTP client will before authorizing it. `url` resolves
429    // `.`/`..`/`%2e%2e` dot-segments while parsing, so the raw `job.path` the allowlist sees can differ
430    // from what actually goes on the wire. We therefore parse first, then (a) confirm the result still
431    // points at our own loopback origin — `join()` on a `//host`/absolute-URL path could otherwise swap
432    // the host (SSRF) — and (b) run the allowlist on the CANONICAL path, forwarding the parsed URL so
433    // "authorized" is byte-for-byte "sent". Without this, `/api/v1/%2e%2e/%2e%2e/metrics` passes the
434    // raw-string check yet is sent as `/metrics`, escaping the `/api/v1/`+`/media/` pin and the DENY list.
435    let base = format!("http://127.0.0.1:{}", state.cfg.api_port);
436    let parsed = match reqwest::Url::parse(&base).and_then(|b| b.join(&job.path)) {
437        Ok(u) => u,
438        Err(_) => {
439            return (
440                400,
441                HashMap::new(),
442                B64.encode(br#"{"error":"bad relay path"}"#),
443            );
444        }
445    };
446    let same_origin = parsed.scheme() == "http"
447        && parsed.host_str() == Some("127.0.0.1")
448        && parsed.port() == Some(state.cfg.api_port);
449    if !same_origin || !relay_allowed(&job.method, parsed.path()) {
450        return (
451            403,
452            HashMap::new(),
453            B64.encode(br#"{"error":"relay path not allowed"}"#),
454        );
455    }
456    let method = reqwest::Method::from_bytes(job.method.as_bytes()).unwrap_or(reqwest::Method::GET);
457    let mut req = state
458        .http
459        .request(method, parsed)
460        .timeout(Duration::from_secs(20));
461    for (k, v) in &job.headers {
462        if forward_request_header(k) {
463            req = req.header(k, v);
464        }
465    }
466    if let Some(b) = &job.body_b64 {
467        if let Ok(bytes) = B64.decode(b) {
468            if bytes.len() <= MAX_RELAY_BODY {
469                req = req.body(bytes);
470            }
471        }
472    }
473    match req.send().await {
474        Ok(resp) => {
475            let status = resp.status().as_u16();
476            let mut headers = HashMap::new();
477            for (k, v) in resp.headers() {
478                if forward_response_header(k.as_str()) {
479                    if let Ok(vs) = v.to_str() {
480                        headers.insert(k.as_str().to_string(), vs.to_string());
481                    }
482                }
483            }
484            // Refuse to buffer an over-large upstream response: a forwarded request for a big media
485            // object (e.g. a full recording fetched without a Range header) would otherwise make each
486            // poller buffer the whole body in memory and could OOM the box. Large media is served with
487            // a Content-Length, so reject before reading; the browser fetches video via Range requests.
488            if resp
489                .content_length()
490                .is_some_and(|len| len > MAX_RELAY_BODY as u64)
491            {
492                return (
493                    413,
494                    HashMap::new(),
495                    B64.encode(br#"{"error":"relay response too large; use range requests"}"#),
496                );
497            }
498            let body = resp.bytes().await.unwrap_or_default();
499            let slice = if body.len() > MAX_RELAY_BODY {
500                &body[..MAX_RELAY_BODY]
501            } else {
502                &body[..]
503            };
504            (status, headers, B64.encode(slice))
505        }
506        Err(e) => (
507            502,
508            HashMap::new(),
509            B64.encode(format!(r#"{{"error":"relay upstream: {e}"}}"#).as_bytes()),
510        ),
511    }
512}
513
514/// One relay poller cycle: long-poll for a job, replay it, post the response. `Err` only on a transport
515/// failure with the rendezvous (the caller backs off).
516async fn relay_poll_once(
517    state: &AppState,
518    client: &reqwest::Client,
519    rendezvous_url: &str,
520    site_id: &str,
521    token: &str,
522) -> anyhow::Result<()> {
523    let resp = client
524        .post(relay_poll_url(rendezvous_url))
525        .bearer_auth(token)
526        .json(&json!({ "site_id": site_id, "auth_enforced": true }))
527        .send()
528        .await
529        .context("relay poll request")?;
530    if resp.status() == StatusCode::NO_CONTENT {
531        return Ok(());
532    }
533    let job: RelayJob = resp
534        .error_for_status()
535        .context("relay poll rejected")?
536        .json()
537        .await
538        .context("decoding relay job")?;
539    let (status, headers, body_b64) = replay_relay_job(state, &job).await;
540    client
541        .post(relay_respond_url(rendezvous_url))
542        .bearer_auth(token)
543        .json(&json!({
544            "site_id": site_id,
545            "job_id": job.job_id,
546            "status": status,
547            "headers": headers,
548            "body_b64": body_b64,
549        }))
550        .send()
551        .await
552        .context("posting relay response")?
553        .error_for_status()
554        .context("rendezvous rejected relay response")?;
555    Ok(())
556}
557
558/// The relay dial-out loop (Stage C). Parks unless remote viewing is configured AND kernel auth is
559/// enabled AND a real (active) user exists — so the REST API is never relayed while auth is off. Runs a
560/// small pool of concurrent pollers for responsiveness.
561pub async fn run_relay(state: AppState) {
562    let cfg = state.cfg.clone();
563    let (Some(rendezvous_url), Some(site_id)) = (cfg.rendezvous_url.clone(), cfg.site_id.clone())
564    else {
565        std::future::pending::<()>().await;
566        return;
567    };
568    if !cfg.auth_enabled {
569        tracing::warn!(
570            "webrtc relay disabled: kernel auth is OFF (HELDAR_AUTH_ENABLED=false). The remote REST \
571             relay refuses to run until auth is enabled, so the open API is never exposed remotely."
572        );
573        std::future::pending::<()>().await;
574        return;
575    }
576    let users: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM users WHERE active = 1")
577        .fetch_one(&state.pool)
578        .await
579        .unwrap_or(0);
580    if users == 0 {
581        tracing::warn!("webrtc relay disabled: kernel auth is on but no active users exist yet");
582        std::future::pending::<()>().await;
583        return;
584    }
585    let client = match build_client(&cfg) {
586        Ok(c) => c,
587        Err(e) => {
588            tracing::error!(error = %e, "webrtc relay disabled: bad mTLS config");
589            std::future::pending::<()>().await;
590            return;
591        }
592    };
593    tracing::info!(site = %site_id, "webrtc relay: dialing out for the authenticated remote dashboard (read-only)");
594    let mut tasks = Vec::new();
595    for _ in 0..RELAY_POLLERS {
596        let state = state.clone();
597        let client = client.clone();
598        let rendezvous_url = rendezvous_url.clone();
599        let site_id = site_id.clone();
600        let token = cfg.cp_token.clone();
601        tasks.push(tokio::spawn(async move {
602            let mut backoff = Duration::from_secs(1);
603            loop {
604                match relay_poll_once(&state, &client, &rendezvous_url, &site_id, &token).await {
605                    Ok(()) => backoff = Duration::from_secs(1),
606                    Err(e) => {
607                        tracing::warn!(error = %e, "webrtc relay poll failed; backing off");
608                        tokio::time::sleep(backoff).await;
609                        backoff = (backoff * 2).min(Duration::from_secs(30));
610                    }
611                }
612            }
613        }));
614    }
615    for t in tasks {
616        let _ = t.await;
617    }
618}
619
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn relay_allowlist_pins_surface_and_blocks_internal_and_traversal() {
        // the full REST + media surface, all methods (kernel RBAC is the real gate)
        assert!(relay_allowed("GET", "/api/v1/cameras"));
        assert!(relay_allowed("POST", "/api/v1/cameras"));
        assert!(relay_allowed("PATCH", "/api/v1/cameras/cam2"));
        assert!(relay_allowed("DELETE", "/api/v1/cameras/cam2"));
        assert!(relay_allowed("GET", "/media/recordings/x.mp4"));
        assert!(relay_allowed("POST", "/api/v1/auth/login"));
        // off-surface, Worker-internal, metrics, traversal, smuggling are refused
        assert!(!relay_allowed("GET", "/healthz"));
        assert!(!relay_allowed("GET", "/api/v1/relay/poll"));
        assert!(!relay_allowed("GET", "/api/v1/rendezvous/poll"));
        assert!(!relay_allowed("GET", "/metrics"));
        assert!(!relay_allowed("GET", "/api/v1/../secrets"));
        assert!(!relay_allowed("GET", "/api/v1//cameras"));
        assert!(!relay_allowed("TRACE", "/api/v1/cameras"));
    }

    /// Regression for the relay allowlist bypass. The HTTP client normalizes `%2e%2e` to `..` and
    /// removes dot-segments, so a raw path with no literal ".." can still be SENT as an escaped path.
    /// `replay_relay_job` defends by canonicalizing via `Url::join` and running the allowlist on the
    /// canonical path. This test pins that the canonical form of the known bypasses is refused.
    #[test]
    fn relay_allowlist_runs_on_canonical_path_not_raw() {
        let canon = |p: &str| {
            reqwest::Url::parse("http://127.0.0.1:8088")
                .unwrap()
                .join(p)
                .unwrap()
                .path()
                .to_string()
        };
        // The attack path passes the naive raw-string check (no literal "..") ...
        assert!(!"/api/v1/%2e%2e/%2e%2e/metrics".contains(".."));
        // ... but the client canonicalizes it to an off-surface path the allowlist rejects.
        assert_eq!(canon("/api/v1/%2e%2e/%2e%2e/metrics"), "/metrics");
        assert!(!relay_allowed(
            "GET",
            &canon("/api/v1/%2e%2e/%2e%2e/metrics")
        ));
        assert!(!relay_allowed(
            "GET",
            &canon("/api/v1/cameras/%2e%2e/relay/poll")
        ));
        assert!(!relay_allowed(
            "POST",
            &canon("/api/v1/cameras/%2e%2e/%2e%2e/healthz")
        ));
        // Legitimate paths still pass after canonicalization.
        assert!(relay_allowed("GET", &canon("/api/v1/cameras")));
        assert!(relay_allowed(
            "GET",
            &canon("/media/recordings/cam2/seg.mp4")
        ));
    }

    /// The header filters are the only barrier against trust-header smuggling into the kernel and
    /// cookie leakage out of it, so pin both directions (case-insensitively).
    #[test]
    fn header_filters_pass_only_the_allowlisted_headers() {
        assert!(forward_request_header("Authorization"));
        assert!(forward_request_header("RANGE"));
        assert!(forward_request_header("if-none-match"));
        assert!(!forward_request_header("X-Forwarded-For"));
        assert!(!forward_request_header("Cookie"));
        assert!(!forward_request_header("Host"));
        assert!(forward_response_header("Content-Type"));
        assert!(forward_response_header("ETag"));
        assert!(!forward_response_header("Set-Cookie"));
        assert!(!forward_response_header("Location"));
    }

    #[test]
    fn endpoints_append_paths_and_trim_trailing_slash() {
        assert_eq!(
            poll_url("https://rv.example.com"),
            "https://rv.example.com/api/v1/rendezvous/poll"
        );
        assert_eq!(
            answer_url("https://rv.example.com/"),
            "https://rv.example.com/api/v1/rendezvous/answer"
        );
        assert_eq!(
            box_turn_url("https://rv.example.com/"),
            "https://rv.example.com/api/v1/box/turn"
        );
        assert_eq!(
            relay_poll_url("https://rv.example.com/"),
            "https://rv.example.com/api/v1/relay/poll"
        );
        assert_eq!(
            relay_respond_url("https://rv.example.com"),
            "https://rv.example.com/api/v1/relay/respond"
        );
    }
}