Skip to main content

wifi_densepose_cli/
calibrate_api.rs

1//! `wifi-densepose calibrate-serve` — HTTP API around ADR-135 baseline calibration.
2//!
3//! Wraps the same [`wifi_densepose_signal::CalibrationRecorder`] used by the
4//! `calibrate` subcommand in a small Axum server so a UI (or any client) can
5//! drive an empty-room baseline capture remotely:
6//!
7//! | Method | Path                              | Purpose                                   |
8//! |--------|-----------------------------------|-------------------------------------------|
9//! | GET    | `/`                               | API descriptor (discovery)                |
10//! | GET    | `/api/v1/calibration/health`      | liveness + UDP ingest stats               |
11//! | POST   | `/api/v1/calibration/start`       | begin a baseline capture session          |
12//! | GET    | `/api/v1/calibration/status`      | live session progress (poll this for UI)  |
13//! | POST   | `/api/v1/calibration/stop`        | finalize the current session early        |
14//! | GET    | `/api/v1/calibration/result`      | summary of the last finalized baseline    |
15//! | GET    | `/api/v1/calibration/baselines`   | list persisted baseline files             |
16//!
17//! A single background task owns the UDP socket (ESP32 `0xC511_0001` frames) and
18//! the optional active recorder; the HTTP handlers communicate with it over an
19//! mpsc command channel and read a shared status snapshot. This keeps the
20//! `&mut` recorder lock-free and the API non-blocking. CORS is permissive so a
21//! browser UI served from any origin can call it during development.
22
23use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
26
27use anyhow::Result;
28use axum::{
29    extract::{Query, State},
30    http::StatusCode,
31    response::IntoResponse,
32    routing::{get, post},
33    Json, Router,
34};
35use clap::Args;
36use serde::{Deserialize, Serialize};
37use tokio::net::UdpSocket;
38use tokio::sync::{mpsc, oneshot, RwLock};
39use tower_http::cors::CorsLayer;
40use wifi_densepose_calibration::extract::{AnchorFeature, Features};
41use wifi_densepose_calibration::{
42    AnchorLabel, AnchorQualityGate, AnchorRecorder, MixtureOfSpecialists, NodeGeometry,
43    SpecialistBank,
44};
45use wifi_densepose_core::types::CsiFrame;
46use wifi_densepose_signal::{BaselineCalibration, CalibrationRecorder};
47
48use crate::calibrate::{parse_csi_packet, tier_config};
49
50/// Rolling window of per-frame scalars (mean amplitude) for live `room-state`
51/// inference. Maintained by the ingest task regardless of any baseline session.
52const LIVE_WINDOW: usize = 256;
53
54/// One scalar per frame: mean amplitude across subcarriers/streams.
55fn frame_scalar(frame: &CsiFrame) -> f32 {
56    let a = &frame.amplitude;
57    if a.is_empty() {
58        0.0
59    } else {
60        (a.sum() / a.len() as f64) as f32
61    }
62}
63
64const RECV_BUF: usize = 2048;
65
66// ---------------------------------------------------------------------------
67// CLI arguments
68// ---------------------------------------------------------------------------
69
70/// Arguments for the `calibrate-serve` subcommand.
71#[derive(Args, Debug, Clone)]
72pub struct CalibrateServeArgs {
73    /// TCP port for the HTTP API.
74    #[arg(long, default_value_t = 8090)]
75    pub http_port: u16,
76
77    /// Bind address for the HTTP API. Default 127.0.0.1 (localhost only);
78    /// use 0.0.0.0 to expose the API to the LAN for a remote UI.
79    #[arg(long, default_value = "127.0.0.1")]
80    pub http_bind: String,
81
82    /// UDP port to receive CSI frames from the ESP32 (must match provisioned target-port).
83    #[arg(long, default_value_t = 5005)]
84    pub udp_port: u16,
85
86    /// Bind address for the UDP CSI socket.
87    #[arg(long, default_value = "0.0.0.0")]
88    pub udp_bind: String,
89
90    /// Default PHY tier when a start request omits one (ht20 / ht40 / he20 / he40).
91    #[arg(long, default_value = "ht20")]
92    pub tier: String,
93
94    /// Directory where finalized baseline `.bin` files are written.
95    #[arg(long, default_value = "./baselines")]
96    pub output_dir: String,
97
98    /// Require `Authorization: Bearer <token>` on every API request. Strongly
99    /// recommended before binding to anything other than 127.0.0.1.
100    #[arg(long, env = "CALIBRATE_TOKEN")]
101    pub token: Option<String>,
102}
103
104/// Sanitize a client-supplied `room_id` for use in a filename (defends the
105/// baseline write path against `../` / absolute-path traversal). Keeps only
106/// `[A-Za-z0-9_-]`; empty result falls back to `default`.
107fn sanitize_room_id(raw: &str) -> String {
108    let cleaned: String = raw
109        .chars()
110        .filter(|c| c.is_ascii_alphanumeric() || *c == '_' || *c == '-')
111        .take(64)
112        .collect();
113    if cleaned.is_empty() {
114        "default".into()
115    } else {
116        cleaned
117    }
118}
119
120// ---------------------------------------------------------------------------
121// Wire types (request / response bodies)
122// ---------------------------------------------------------------------------
123
124/// Body for `POST /start`. All fields optional — sensible defaults applied.
125#[derive(Debug, Deserialize)]
126#[serde(default)]
127pub struct StartParams {
128    /// PHY tier override (falls back to the server default).
129    pub tier: Option<String>,
130    /// Capture duration in seconds (also bounded by the tier's min-frame target).
131    pub duration_s: u32,
132    /// Optional room label, used in the persisted filename and status.
133    pub room_id: Option<String>,
134    /// Override the tier's minimum frame count (0 = use tier default).
135    pub min_frames: u32,
136}
137
138impl Default for StartParams {
139    fn default() -> Self {
140        Self { tier: None, duration_s: 30, room_id: None, min_frames: 0 }
141    }
142}
143
144/// Live per-session status snapshot returned by `GET /status`.
145#[derive(Debug, Clone, Serialize)]
146pub struct SessionStatus {
147    /// `recording` | `finalizing` | `complete` | `aborted`.
148    pub state: String,
149    pub room_id: String,
150    pub tier: String,
151    pub frames_recorded: usize,
152    pub target_frames: usize,
153    /// 0.0..=1.0 capture progress.
154    pub progress: f32,
155    pub z_median: f32,
156    pub z_max: f32,
157    pub motion_flagged: bool,
158    pub elapsed_s: f32,
159    pub eta_s: f32,
160    /// Optional human-readable note (e.g. abort reason).
161    pub note: Option<String>,
162}
163
164/// Summary of a finalized baseline, returned by `GET /result` and `POST /stop`.
165#[derive(Debug, Clone, Serialize)]
166pub struct ResultSummary {
167    pub calibration_id: String,
168    pub room_id: String,
169    pub tier: String,
170    pub frame_count: u64,
171    pub subcarriers: usize,
172    pub captured_at_unix_s: i64,
173    pub amp_mean_avg: f32,
174    pub amp_variance_avg: f32,
175    pub phase_dispersion_avg: f32,
176    pub output_path: String,
177    pub saved_bytes: usize,
178}
179
180/// Shared status the HTTP handlers read.
181#[derive(Default)]
182struct SharedStatus {
183    udp_port: u16,
184    default_tier: String,
185    output_dir: String,
186    frames_seen: u64,
187    last_frame_unix_ms: u64,
188    session: Option<SessionStatus>,
189    last_result: Option<ResultSummary>,
190}
191
192/// Commands sent from HTTP handlers to the ingest task.
193enum CalCommand {
194    Start { params: StartParams, reply: oneshot::Sender<Result<SessionStatus, String>> },
195    Stop { reply: oneshot::Sender<Result<ResultSummary, String>> },
196    EnrollAnchor {
197        room_id: String,
198        baseline_name: String,
199        label: AnchorLabel,
200        duration_s: u32,
201        reply: oneshot::Sender<Result<AnchorVerdict, String>>,
202    },
203}
204
205/// Accumulated in-server enrollment for one room (not persisted until train).
206#[derive(Default)]
207struct RoomEnroll {
208    baseline_id: String,
209    fs_hz: f32,
210    anchors: Vec<AnchorFeature>,
211    /// Transceiver geometry recorded via `POST /enroll/geometry` (ADR-152
212    /// §2.1.1); latest recording wins. Snapshotted into the bank at train time.
213    geometry: Vec<NodeGeometry>,
214}
215
216/// Result of capturing one anchor (`POST /enroll/anchor`).
217#[derive(Debug, Clone, Serialize)]
218pub struct AnchorVerdict {
219    /// Anchor label (snake_case).
220    pub label: String,
221    /// Passed the quality gate.
222    pub accepted: bool,
223    /// Rejection reason, if any.
224    pub reason: Option<String>,
225    /// Mean amplitude z-score vs baseline.
226    pub presence_z: f32,
227    /// Fraction of frames flagged as motion.
228    pub motion_rate: f32,
229    /// Frames captured.
230    pub frames: u32,
231    /// Accepted anchors so far for this room.
232    pub accepted_count: usize,
233    /// Next anchor in the sequence, if any.
234    pub next: Option<String>,
235}
236
237/// In-flight anchor capture owned by the ingest task.
238struct EnrollCapture {
239    recorder: AnchorRecorder,
240    baseline: BaselineCalibration,
241    label: AnchorLabel,
242    room_id: String,
243    baseline_id: String,
244    fs_hz: f32,
245    series: Vec<f32>,
246    deadline: Instant,
247    reply: Option<oneshot::Sender<Result<AnchorVerdict, String>>>,
248}
249
250#[derive(Clone)]
251struct ApiState {
252    cmd_tx: mpsc::Sender<CalCommand>,
253    status: Arc<RwLock<SharedStatus>>,
254    /// Rolling per-frame scalars for live `room-state` inference.
255    window: Arc<RwLock<VecDeque<f32>>>,
256    /// Default sample rate for periodicity extraction.
257    fs_hz: f32,
258    /// In-server enrollment accumulator, keyed by `room_id`.
259    enroll: Arc<RwLock<HashMap<String, RoomEnroll>>>,
260}
261
262/// Bearer-token gate (applied only when `--token` is set). Constant-time-ish
263/// compare is unnecessary here (local appliance), but reject anything that
264/// isn't an exact `Bearer <token>` match.
265async fn require_bearer(
266    axum::extract::State(token): axum::extract::State<String>,
267    req: axum::extract::Request,
268    next: axum::middleware::Next,
269) -> axum::response::Response {
270    let authorized = req
271        .headers()
272        .get(axum::http::header::AUTHORIZATION)
273        .and_then(|v| v.to_str().ok())
274        .and_then(|h| h.strip_prefix("Bearer "))
275        .map(|t| t == token)
276        .unwrap_or(false);
277    if authorized {
278        next.run(req).await
279    } else {
280        (
281            StatusCode::UNAUTHORIZED,
282            Json(serde_json::json!({"error": "missing or invalid bearer token"})),
283        )
284            .into_response()
285    }
286}
287
288// ---------------------------------------------------------------------------
289// Public entry point
290// ---------------------------------------------------------------------------
291
292/// Build the API router (without the optional auth layer). Shared by `execute`
293/// and the integration tests.
294fn build_router(state: ApiState) -> Router {
295    Router::new()
296        .route("/", get(descriptor))
297        .route("/api/v1/calibration/health", get(health))
298        .route("/api/v1/calibration/start", post(start))
299        .route("/api/v1/calibration/status", get(status_handler))
300        .route("/api/v1/calibration/stop", post(stop))
301        .route("/api/v1/calibration/result", get(result))
302        .route("/api/v1/calibration/baselines", get(baselines))
303        .route("/api/v1/room/state", get(room_state))
304        .route("/api/v1/room/train", post(train_room))
305        .route("/api/v1/enroll/anchor", post(enroll_anchor))
306        .route("/api/v1/enroll/geometry", post(enroll_geometry))
307        .route("/api/v1/enroll/status", get(enroll_status))
308        .layer(CorsLayer::permissive())
309        .with_state(state)
310}
311
312/// Run the calibration HTTP API server (blocks until Ctrl-C).
313pub async fn execute(args: CalibrateServeArgs) -> Result<()> {
314    std::fs::create_dir_all(&args.output_dir)
315        .map_err(|e| anyhow::anyhow!("cannot create output dir {}: {e}", args.output_dir))?;
316
317    let udp_addr = format!("{}:{}", args.udp_bind, args.udp_port);
318    let socket = UdpSocket::bind(&udp_addr)
319        .await
320        .map_err(|e| anyhow::anyhow!("cannot bind UDP socket on {udp_addr}: {e}"))?;
321    eprintln!("[calibrate-serve] CSI ingest on udp://{udp_addr}");
322
323    let status = Arc::new(RwLock::new(SharedStatus {
324        udp_port: args.udp_port,
325        default_tier: args.tier.clone(),
326        output_dir: args.output_dir.clone(),
327        ..Default::default()
328    }));
329
330    let (cmd_tx, cmd_rx) = mpsc::channel::<CalCommand>(8);
331    let window = Arc::new(RwLock::new(VecDeque::<f32>::with_capacity(LIVE_WINDOW)));
332    let enroll = Arc::new(RwLock::new(HashMap::<String, RoomEnroll>::new()));
333
334    // Background ingest task owns the socket + recorder.
335    {
336        let status = status.clone();
337        let default_tier = args.tier.clone();
338        let output_dir = args.output_dir.clone();
339        let window = window.clone();
340        let enroll = enroll.clone();
341        tokio::spawn(async move {
342            ingest_loop(socket, cmd_rx, status, default_tier, output_dir, window, enroll).await;
343        });
344    }
345
346    let state = ApiState { cmd_tx, status, window, fs_hz: 15.0, enroll };
347    let mut app = build_router(state);
348
349    // Optional bearer auth — required before any non-loopback exposure.
350    if let Some(token) = args.token.clone() {
351        app = app.layer(axum::middleware::from_fn_with_state(token, require_bearer));
352        eprintln!("[calibrate-serve] bearer auth ENABLED");
353    } else if args.http_bind != "127.0.0.1" && args.http_bind != "localhost" {
354        eprintln!(
355            "[calibrate-serve] WARNING: bound to {} with NO --token — anyone on the network can drive calibration",
356            args.http_bind
357        );
358    }
359
360    let http_addr = format!("{}:{}", args.http_bind, args.http_port);
361    let listener = tokio::net::TcpListener::bind(&http_addr)
362        .await
363        .map_err(|e| anyhow::anyhow!("cannot bind HTTP listener on {http_addr}: {e}"))?;
364    eprintln!("[calibrate-serve] HTTP API on http://{http_addr}  (GET / for the route list)");
365
366    axum::serve(listener, app)
367        .await
368        .map_err(|e| anyhow::anyhow!("HTTP server error: {e}"))?;
369    Ok(())
370}
371
372// ---------------------------------------------------------------------------
373// Ingest task — owns the UDP socket and the optional active recorder
374// ---------------------------------------------------------------------------
375
376struct ActiveSession {
377    recorder: CalibrationRecorder,
378    room_id: String,
379    tier: String,
380    started: Instant,
381    deadline: Instant,
382    target_frames: usize,
383    z_median: f32,
384    z_max: f32,
385    motion_flagged: bool,
386}
387
388async fn ingest_loop(
389    socket: UdpSocket,
390    mut cmd_rx: mpsc::Receiver<CalCommand>,
391    status: Arc<RwLock<SharedStatus>>,
392    default_tier: String,
393    output_dir: String,
394    window: Arc<RwLock<VecDeque<f32>>>,
395    enroll: Arc<RwLock<HashMap<String, RoomEnroll>>>,
396) {
397    let mut buf = vec![0u8; RECV_BUF];
398    let mut active: Option<ActiveSession> = None;
399    let mut active_enroll: Option<EnrollCapture> = None;
400    let mut tick = tokio::time::interval(Duration::from_millis(200));
401    // Counters mirrored to shared status only on the 200 ms tick — avoids a lock
402    // + SessionStatus clone on every UDP frame (CPU starvation under flood).
403    let mut frames_seen: u64 = 0;
404    let mut last_frame_ms: u64 = 0;
405    // Live rolling window, flushed to the shared `window` on the tick.
406    let mut win_local: VecDeque<f32> = VecDeque::with_capacity(LIVE_WINDOW);
407
408    loop {
409        tokio::select! {
410            // --- incoming command ---
411            Some(cmd) = cmd_rx.recv() => match cmd {
412                CalCommand::Start { params, reply } => {
413                    if active.is_some() {
414                        let _ = reply.send(Err("a calibration session is already running".into()));
415                        continue;
416                    }
417                    let tier = params.tier.unwrap_or_else(|| default_tier.clone());
418                    if !["ht20", "ht40", "he20", "he40"].contains(&tier.to_ascii_lowercase().as_str()) {
419                        let _ = reply.send(Err(format!("invalid tier {tier:?}")));
420                        continue;
421                    }
422                    let mut config = tier_config(&tier);
423                    if params.min_frames > 0 {
424                        config.min_frames = params.min_frames;
425                    }
426                    let target_frames = config.min_frames as usize;
427                    let dur = params.duration_s.max(1) as u64;
428                    // Sanitize: room_id is interpolated into the baseline write path.
429                    let room_id = sanitize_room_id(&params.room_id.unwrap_or_else(|| "default".into()));
430                    let sess = ActiveSession {
431                        recorder: CalibrationRecorder::new(config),
432                        room_id: room_id.clone(),
433                        tier: tier.clone(),
434                        started: Instant::now(),
435                        deadline: Instant::now() + Duration::from_secs(dur),
436                        target_frames,
437                        z_median: 0.0,
438                        z_max: 0.0,
439                        motion_flagged: false,
440                    };
441                    let snap = session_snapshot(&sess, "recording", None);
442                    active = Some(sess);
443                    {
444                        let mut s = status.write().await;
445                        s.session = Some(snap.clone());
446                        s.last_result = None;
447                    }
448                    eprintln!("[calibrate-serve] session start room={room_id} tier={tier} target={target_frames}");
449                    let _ = reply.send(Ok(snap));
450                }
451                CalCommand::Stop { reply } => {
452                    match active.take() {
453                        Some(sess) => {
454                            let res = finalize(sess, &output_dir, &status).await;
455                            let _ = reply.send(res);
456                        }
457                        None => { let _ = reply.send(Err("no active calibration session".into())); }
458                    }
459                }
460                CalCommand::EnrollAnchor { room_id, baseline_name, label, duration_s, reply } => {
461                    if active.is_some() || active_enroll.is_some() {
462                        let _ = reply.send(Err("a capture is already running".into()));
463                        continue;
464                    }
465                    // Resolve the baseline as a sanitized name under output_dir.
466                    let bname = sanitize_room_id(&baseline_name);
467                    let bpath = format!("{output_dir}/{bname}.bin");
468                    let baseline = match tokio::fs::read(&bpath).await {
469                        Ok(bytes) => match BaselineCalibration::from_bytes(&bytes) {
470                            Ok(b) => b,
471                            Err(e) => { let _ = reply.send(Err(format!("invalid baseline {bname}: {e}"))); continue; }
472                        },
473                        Err(e) => { let _ = reply.send(Err(format!("baseline {bname} not found: {e}"))); continue; }
474                    };
475                    let baseline_id = baseline.calibration_uuid().to_string();
476                    eprintln!("[calibrate-serve] enroll anchor room={room_id} label={} ({}s)", label.as_str(), duration_s);
477                    active_enroll = Some(EnrollCapture {
478                        recorder: AnchorRecorder::new(label),
479                        baseline,
480                        label,
481                        room_id,
482                        baseline_id,
483                        fs_hz: 15.0,
484                        series: Vec::new(),
485                        deadline: Instant::now() + Duration::from_secs(duration_s.max(1) as u64),
486                        reply: Some(reply),
487                    });
488                }
489            },
490
491            // --- incoming CSI frame (no shared-status lock here; flushed on tick) ---
492            Ok(n) = socket.recv(&mut buf) => {
493                frames_seen += 1;
494                last_frame_ms = unix_ms();
495                let parse_tier = active.as_ref().map(|s| s.tier.clone()).unwrap_or_else(|| default_tier.clone());
496                if let Some(frame) = parse_csi_packet(&buf[..n], &parse_tier) {
497                    // Always maintain the live window (drives /room/state).
498                    win_local.push_back(frame_scalar(&frame));
499                    while win_local.len() > LIVE_WINDOW {
500                        win_local.pop_front();
501                    }
502                    if let Some(sess) = active.as_mut() {
503                        if let Ok(score) = sess.recorder.record(&frame) {
504                            sess.z_median = score.amplitude_z_median;
505                            sess.z_max = score.amplitude_z_max;
506                            sess.motion_flagged = score.motion_flagged;
507                        }
508                        if sess.recorder.frames_recorded() as usize >= sess.target_frames {
509                            if let Some(done) = active.take() {
510                                let _ = finalize(done, &output_dir, &status).await;
511                            }
512                        }
513                    }
514                    if let Some(ec) = active_enroll.as_mut() {
515                        ec.recorder.record_frame(&ec.baseline, &frame);
516                        ec.series.push(frame_scalar(&frame));
517                    }
518                }
519            },
520
521            // --- 200 ms tick: flush counters + window + session snapshot, deadline check ---
522            _ = tick.tick() => {
523                {
524                    let mut s = status.write().await;
525                    s.frames_seen = frames_seen;
526                    s.last_frame_unix_ms = last_frame_ms;
527                    if let Some(sess) = active.as_ref() {
528                        s.session = Some(session_snapshot(sess, "recording", None));
529                    }
530                }
531                {
532                    let mut w = window.write().await;
533                    w.clear();
534                    w.extend(win_local.iter().copied());
535                }
536                if let Some(sess) = active.as_ref() {
537                    if Instant::now() >= sess.deadline {
538                        let frames = sess.recorder.frames_recorded() as usize;
539                        if frames >= 10 {
540                            if let Some(done) = active.take() {
541                                let _ = finalize(done, &output_dir, &status).await;
542                            }
543                        } else if let Some(mut done) = active.take() {
544                            // not enough frames — abort honestly rather than emit a bad baseline
545                            done.motion_flagged = false;
546                            let note = format!(
547                                "aborted: only {frames} frames in the time window (need >=10) — \
548                                 is the ESP32 streaming to udp:{}? ",
549                                status.read().await.udp_port
550                            );
551                            let snap = session_snapshot(&done, "aborted", Some(note.clone()));
552                            status.write().await.session = Some(snap);
553                            eprintln!("[calibrate-serve] {note}");
554                        }
555                    }
556                }
557                // Enroll-anchor capture finished?
558                let enroll_done = active_enroll.as_ref().map(|ec| Instant::now() >= ec.deadline).unwrap_or(false);
559                if enroll_done {
560                    if let Some(mut ec) = active_enroll.take() {
561                        let gate = AnchorQualityGate::default();
562                        let (anchor, reason) = ec.recorder.finalize(&gate, (unix_ms() / 1000) as i64);
563                        let mut verdict = AnchorVerdict {
564                            label: ec.label.as_str().into(),
565                            accepted: anchor.quality.accepted,
566                            reason,
567                            presence_z: anchor.quality.presence_z,
568                            motion_rate: anchor.quality.motion_rate,
569                            frames: anchor.quality.frames,
570                            accepted_count: 0,
571                            next: None,
572                        };
573                        if anchor.quality.accepted {
574                            let feat = AnchorFeature::from_series(&ec.room_id, ec.label, &ec.series, ec.fs_hz);
575                            let mut map = enroll.write().await;
576                            let re = map.entry(ec.room_id.clone()).or_insert_with(RoomEnroll::default);
577                            if re.baseline_id.is_empty() {
578                                re.baseline_id = ec.baseline_id.clone();
579                                re.fs_hz = ec.fs_hz;
580                            }
581                            if let Some(slot) = re.anchors.iter_mut().find(|a| a.label == ec.label) {
582                                *slot = feat;
583                            } else {
584                                re.anchors.push(feat);
585                            }
586                            verdict.accepted_count = re.anchors.len();
587                            verdict.next = AnchorLabel::SEQUENCE.iter().copied()
588                                .find(|l| !re.anchors.iter().any(|a| a.label == *l))
589                                .map(|l| l.as_str().to_string());
590                        } else {
591                            verdict.accepted_count = enroll.read().await.get(&ec.room_id).map(|re| re.anchors.len()).unwrap_or(0);
592                        }
593                        eprintln!("[calibrate-serve] enroll anchor {} accepted={} ({} total)", verdict.label, verdict.accepted, verdict.accepted_count);
594                        if let Some(tx) = ec.reply.take() {
595                            let _ = tx.send(Ok(verdict));
596                        }
597                    }
598                }
599            },
600        }
601    }
602}
603
604/// Finalize a session: persist the baseline and publish the result summary.
605async fn finalize(
606    sess: ActiveSession,
607    output_dir: &str,
608    status: &Arc<RwLock<SharedStatus>>,
609) -> Result<ResultSummary, String> {
610    let room_id = sess.room_id.clone();
611    let tier = sess.tier.clone();
612    // mark finalizing
613    {
614        let snap = session_snapshot(&sess, "finalizing", None);
615        status.write().await.session = Some(snap);
616    }
617
618    let baseline: BaselineCalibration = sess
619        .recorder
620        .finalize()
621        .map_err(|e| format!("finalize failed: {e}"))?;
622
623    let (amp_mean_avg, amp_var_avg, disp_avg) = baseline_averages(&baseline);
624    let uuid = baseline.calibration_uuid().to_string();
625    let path = format!("{output_dir}/{room_id}-{uuid}.bin");
626    let bytes = baseline.to_bytes();
627    // Async write — never block the ingest task's UDP/command path.
628    tokio::fs::write(&path, &bytes)
629        .await
630        .map_err(|e| format!("cannot write {path}: {e}"))?;
631
632    let summary = ResultSummary {
633        calibration_id: uuid,
634        room_id: room_id.clone(),
635        tier,
636        frame_count: baseline.frame_count,
637        subcarriers: baseline.subcarriers.len(),
638        captured_at_unix_s: baseline.captured_at_unix_s,
639        amp_mean_avg,
640        amp_variance_avg: amp_var_avg,
641        phase_dispersion_avg: disp_avg,
642        output_path: path.clone(),
643        saved_bytes: bytes.len(),
644    };
645
646    {
647        let mut s = status.write().await;
648        // reflect completion in the session snapshot, then store the result
649        if let Some(sess_status) = s.session.as_mut() {
650            sess_status.state = "complete".into();
651            sess_status.progress = 1.0;
652        }
653        s.last_result = Some(summary.clone());
654    }
655    eprintln!(
656        "[calibrate-serve] session complete room={room_id} frames={} -> {path} ({} bytes)",
657        summary.frame_count, summary.saved_bytes
658    );
659    Ok(summary)
660}
661
662// ---------------------------------------------------------------------------
663// HTTP handlers
664// ---------------------------------------------------------------------------
665
666async fn descriptor() -> impl IntoResponse {
667    Json(serde_json::json!({
668        "service": "wifi-densepose calibration API",
669        "adr": "ADR-135 (baseline) / ADR-151 (room calibration & training)",
670        "endpoints": {
671            "GET  /api/v1/calibration/health": "liveness + UDP ingest stats",
672            "POST /api/v1/calibration/start": "{ tier?, duration_s?, room_id?, min_frames? }",
673            "GET  /api/v1/calibration/status": "live session progress (poll for UI)",
674            "POST /api/v1/calibration/stop": "finalize current session early",
675            "GET  /api/v1/calibration/result": "last finalized baseline summary",
676            "GET  /api/v1/calibration/baselines": "list persisted baseline files",
677            "GET  /api/v1/room/state?bank=<name>": "live mixture-of-specialists RoomState over the CSI window",
678            "POST /api/v1/room/train": "{ room_id, baseline_id, anchors[]?, geometry[]? } → train + persist a specialist bank (anchors[]/geometry[] optional if enrolled in-server)",
679            "POST /api/v1/enroll/anchor": "{ room_id, baseline, label, duration_s? } → capture one guided anchor (blocks for the capture)",
680            "POST /api/v1/enroll/geometry": "{ room_id, geometry: [NodeGeometry…] } → record transceiver geometry for the room (ADR-152 §2.1.1; latest wins)",
681            "GET  /api/v1/enroll/status?room=<id>": "enrollment progress (accepted anchors, next, complete)"
682        }
683    }))
684}
685
686async fn health(State(st): State<ApiState>) -> impl IntoResponse {
687    let s = st.status.read().await;
688    let age = if s.last_frame_unix_ms == 0 { None } else { Some(unix_ms().saturating_sub(s.last_frame_unix_ms)) };
689    Json(serde_json::json!({
690        "status": "ok",
691        "udp_port": s.udp_port,
692        "frames_seen": s.frames_seen,
693        "last_frame_age_ms": age,
694        "streaming": age.map(|a| a < 2000).unwrap_or(false),
695        "default_tier": s.default_tier,
696        "output_dir": s.output_dir,
697        "session_active": s.session.as_ref().map(|x| x.state == "recording").unwrap_or(false),
698    }))
699}
700
701async fn start(State(st): State<ApiState>, Json(params): Json<StartParams>) -> impl IntoResponse {
702    let (tx, rx) = oneshot::channel();
703    if st.cmd_tx.send(CalCommand::Start { params, reply: tx }).await.is_err() {
704        return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"ingest task unavailable"}))).into_response();
705    }
706    match rx.await {
707        Ok(Ok(snap)) => (StatusCode::ACCEPTED, Json(serde_json::to_value(snap).unwrap())).into_response(),
708        Ok(Err(e)) => (StatusCode::CONFLICT, Json(serde_json::json!({"error": e}))).into_response(),
709        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"no reply"}))).into_response(),
710    }
711}
712
713async fn status_handler(State(st): State<ApiState>) -> impl IntoResponse {
714    let s = st.status.read().await;
715    match &s.session {
716        Some(sess) => (StatusCode::OK, Json(serde_json::to_value(sess).unwrap())).into_response(),
717        None => (StatusCode::OK, Json(serde_json::json!({"state":"idle"}))).into_response(),
718    }
719}
720
721async fn stop(State(st): State<ApiState>) -> impl IntoResponse {
722    let (tx, rx) = oneshot::channel();
723    if st.cmd_tx.send(CalCommand::Stop { reply: tx }).await.is_err() {
724        return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"ingest task unavailable"}))).into_response();
725    }
726    match rx.await {
727        Ok(Ok(summary)) => (StatusCode::OK, Json(serde_json::to_value(summary).unwrap())).into_response(),
728        Ok(Err(e)) => (StatusCode::CONFLICT, Json(serde_json::json!({"error": e}))).into_response(),
729        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"no reply"}))).into_response(),
730    }
731}
732
733async fn result(State(st): State<ApiState>) -> impl IntoResponse {
734    let s = st.status.read().await;
735    match &s.last_result {
736        Some(r) => (StatusCode::OK, Json(serde_json::to_value(r).unwrap())).into_response(),
737        None => (StatusCode::NOT_FOUND, Json(serde_json::json!({"error":"no finalized baseline yet"}))).into_response(),
738    }
739}
740
741/// Body for `POST /api/v1/room/train` — an enrollment (CLI `enroll` output or
742/// any client that gathered labelled anchor features).
743#[derive(Deserialize)]
744struct TrainRequest {
745    room_id: String,
746    baseline_id: String,
747    #[serde(default)]
748    anchors: Vec<AnchorFeature>,
749    /// Optional transceiver geometry (ADR-152 §2.1.1). Falls back to the
750    /// geometry recorded in-server via `POST /enroll/geometry`; absent both,
751    /// the bank trains geometry-free (valid, but no geometry conditioning).
752    #[serde(default)]
753    geometry: Vec<NodeGeometry>,
754}
755
756/// Train a per-room specialist bank and persist it as `<output_dir>/<room_id>.json`
757/// (the name `room-state` reads back). Uses the posted `anchors` if present, else
758/// falls back to the in-server enrollment accumulated via `POST /enroll/anchor`.
759/// The enrollment's transceiver-geometry snapshot (posted `geometry` or the
760/// `POST /enroll/geometry` record) is threaded into the bank (ADR-152 §2.1.1).
761async fn train_room(State(st): State<ApiState>, Json(req): Json<TrainRequest>) -> impl IntoResponse {
762    let (anchors, baseline_id) = if !req.anchors.is_empty() {
763        (req.anchors.clone(), req.baseline_id.clone())
764    } else {
765        match st.enroll.read().await.get(&req.room_id) {
766            Some(re) if !re.anchors.is_empty() => (re.anchors.clone(), re.baseline_id.clone()),
767            _ => {
768                return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error":"no anchors in request and none enrolled for this room"}))).into_response();
769            }
770        }
771    };
772    let geometry = if !req.geometry.is_empty() {
773        req.geometry.clone()
774    } else {
775        st.enroll.read().await.get(&req.room_id).map(|re| re.geometry.clone()).unwrap_or_default()
776    };
777    let at = (unix_ms() / 1000) as i64;
778    let bank = match SpecialistBank::train(&req.room_id, &baseline_id, &anchors, at) {
779        Ok(b) => b,
780        Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": format!("training failed: {e}")}))).into_response(),
781    };
782    let bank = if geometry.is_empty() {
783        eprintln!(
784            "[calibrate-serve] no transceiver geometry recorded for room '{}' — bank will not support geometry conditioning (ADR-152 §2.1.2)",
785            req.room_id
786        );
787        bank
788    } else {
789        bank.with_geometry(geometry)
790    };
791    let name = sanitize_room_id(&req.room_id);
792    let dir = { st.status.read().await.output_dir.clone() };
793    let path = format!("{dir}/{name}.json");
794    let json = match bank.to_json() {
795        Ok(j) => j,
796        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("serialize: {e}")}))).into_response(),
797    };
798    if let Err(e) = tokio::fs::write(&path, json).await {
799        return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("cannot write {path}: {e}")}))).into_response();
800    }
801    let kinds: Vec<String> = bank.trained_kinds().iter().map(|k| format!("{k:?}")).collect();
802    (StatusCode::OK, Json(serde_json::json!({
803        "room_id": bank.room_id,
804        "bank": name,                  // pass as ?bank=<name> to /room/state
805        "anchor_count": bank.anchor_count,
806        "specialists": kinds,
807        "geometry_nodes": bank.geometry.len(),
808        "path": path,
809    }))).into_response()
810}
811
812/// Body for `POST /api/v1/enroll/geometry`.
813#[derive(Deserialize)]
814struct EnrollGeometryBody {
815    room_id: String,
816    /// Per-node transceiver geometry records (ADR-152 §2.1.1).
817    geometry: Vec<NodeGeometry>,
818}
819
820/// Record the room's transceiver geometry (ADR-152 §2.1.1) into the in-server
821/// enrollment; the next `POST /room/train` snapshots it into the bank. A later
822/// POST supersedes an earlier one (latest wins), mirroring
823/// `EnrollmentSession::record_geometry`.
824async fn enroll_geometry(State(st): State<ApiState>, Json(b): Json<EnrollGeometryBody>) -> impl IntoResponse {
825    if b.geometry.is_empty() {
826        return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error":"geometry must be a non-empty array of NodeGeometry records"}))).into_response();
827    }
828    let nodes = b.geometry.len();
829    {
830        let mut map = st.enroll.write().await;
831        let re = map.entry(b.room_id.clone()).or_insert_with(RoomEnroll::default);
832        re.geometry = b.geometry;
833    }
834    eprintln!("[calibrate-serve] enroll geometry room={} nodes={nodes}", b.room_id);
835    (StatusCode::OK, Json(serde_json::json!({"room_id": b.room_id, "geometry_nodes": nodes}))).into_response()
836}
837
838/// Body for `POST /api/v1/enroll/anchor`.
839#[derive(Deserialize)]
840struct EnrollAnchorBody {
841    room_id: String,
842    /// Baseline name (without `.bin`), resolved under `output_dir`.
843    baseline: String,
844    /// Anchor label (snake_case, e.g. `stand_still`).
845    label: String,
846    /// Capture duration (s); defaults to the anchor's recommended length.
847    duration_s: Option<u32>,
848}
849
850/// Capture one guided anchor against a baseline. Blocks for the capture
851/// duration, then returns the gate verdict (accept/reject + progress).
852async fn enroll_anchor(State(st): State<ApiState>, Json(b): Json<EnrollAnchorBody>) -> impl IntoResponse {
853    let label = match AnchorLabel::from_str(&b.label) {
854        Some(l) => l,
855        None => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": format!("unknown anchor label {:?}", b.label)}))).into_response(),
856    };
857    let duration_s = b.duration_s.unwrap_or_else(|| label.duration_s());
858    let (tx, rx) = oneshot::channel();
859    let cmd = CalCommand::EnrollAnchor {
860        room_id: b.room_id,
861        baseline_name: b.baseline,
862        label,
863        duration_s,
864        reply: tx,
865    };
866    if st.cmd_tx.send(cmd).await.is_err() {
867        return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"ingest task unavailable"}))).into_response();
868    }
869    match rx.await {
870        Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::to_value(v).unwrap())).into_response(),
871        Ok(Err(e)) => (StatusCode::CONFLICT, Json(serde_json::json!({"error": e}))).into_response(),
872        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"no reply"}))).into_response(),
873    }
874}
875
876/// Query for `GET /api/v1/enroll/status`.
877#[derive(Deserialize)]
878struct EnrollStatusQuery {
879    room: String,
880}
881
882/// Enrollment progress for a room.
883async fn enroll_status(State(st): State<ApiState>, Query(q): Query<EnrollStatusQuery>) -> impl IntoResponse {
884    let map = st.enroll.read().await;
885    let (accepted, baseline_id): (Vec<String>, String) = match map.get(&q.room) {
886        Some(re) => (
887            re.anchors.iter().map(|a| a.label.as_str().to_string()).collect(),
888            re.baseline_id.clone(),
889        ),
890        None => (Vec::new(), String::new()),
891    };
892    let next = AnchorLabel::SEQUENCE
893        .iter()
894        .copied()
895        .find(|l| !accepted.iter().any(|a| a == l.as_str()))
896        .map(|l| l.as_str().to_string());
897    Json(serde_json::json!({
898        "room": q.room,
899        "baseline_id": baseline_id,
900        "accepted": accepted,
901        "count": accepted.len(),
902        "total": AnchorLabel::SEQUENCE.len(),
903        "next": next,
904        "complete": next.is_none() && !accepted.is_empty(),
905    }))
906}
907
908/// Query for `GET /api/v1/room/state`.
909#[derive(Deserialize)]
910struct RoomStateQuery {
911    /// Bank name (sanitized; resolved as `<output_dir>/<bank>.json`).
912    bank: String,
913    /// Sample rate override (Hz).
914    fs: Option<f32>,
915}
916
917/// Live mixture-of-specialists readout over the current CSI window.
918async fn room_state(State(st): State<ApiState>, Query(q): Query<RoomStateQuery>) -> impl IntoResponse {
919    // Resolve the bank as a sanitized name under output_dir — no arbitrary file read.
920    let name = sanitize_room_id(&q.bank);
921    let dir = { st.status.read().await.output_dir.clone() };
922    let path = format!("{dir}/{name}.json");
923    let raw = match tokio::fs::read_to_string(&path).await {
924        Ok(r) => r,
925        Err(e) => {
926            return (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("bank '{name}' not found: {e}")}))).into_response();
927        }
928    };
929    let bank = match SpecialistBank::from_json(&raw) {
930        Ok(b) => b,
931        Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": format!("invalid bank: {e}")}))).into_response(),
932    };
933
934    let series: Vec<f32> = { st.window.read().await.iter().copied().collect() };
935    if series.len() < 32 {
936        return (StatusCode::OK, Json(serde_json::json!({"state":"warming_up","frames":series.len()}))).into_response();
937    }
938    let fs = q.fs.unwrap_or(st.fs_hz);
939    let features = Features::from_series(&series, fs);
940    let baseline_id = bank.baseline_id.clone();
941    let mix = MixtureOfSpecialists::new(bank);
942    let room = mix.infer(&features, &baseline_id);
943    (StatusCode::OK, Json(serde_json::to_value(room).unwrap())).into_response()
944}
945
946async fn baselines(State(st): State<ApiState>) -> impl IntoResponse {
947    let dir = { st.status.read().await.output_dir.clone() };
948    let mut out = Vec::new();
949    if let Ok(rd) = std::fs::read_dir(&dir) {
950        for entry in rd.flatten() {
951            let path = entry.path();
952            if path.extension().and_then(|e| e.to_str()) == Some("bin") {
953                let bytes = entry.metadata().map(|m| m.len()).unwrap_or(0);
954                out.push(serde_json::json!({
955                    "file": path.file_name().and_then(|n| n.to_str()).unwrap_or(""),
956                    "path": path.to_string_lossy(),
957                    "bytes": bytes,
958                }));
959            }
960        }
961    }
962    Json(serde_json::json!({ "dir": dir, "baselines": out }))
963}
964
965// ---------------------------------------------------------------------------
966// Helpers
967// ---------------------------------------------------------------------------
968
969fn session_snapshot(sess: &ActiveSession, state: &str, note: Option<String>) -> SessionStatus {
970    let frames = sess.recorder.frames_recorded() as usize;
971    let progress = if sess.target_frames == 0 {
972        0.0
973    } else {
974        (frames as f32 / sess.target_frames as f32).clamp(0.0, 1.0)
975    };
976    let elapsed = sess.started.elapsed().as_secs_f32();
977    let eta = if frames == 0 {
978        sess.deadline.saturating_duration_since(Instant::now()).as_secs_f32()
979    } else {
980        let per = elapsed / frames as f32;
981        (per * (sess.target_frames.saturating_sub(frames)) as f32).max(0.0)
982    };
983    SessionStatus {
984        state: state.into(),
985        room_id: sess.room_id.clone(),
986        tier: sess.tier.clone(),
987        frames_recorded: frames,
988        target_frames: sess.target_frames,
989        progress,
990        z_median: sess.z_median,
991        z_max: sess.z_max,
992        motion_flagged: sess.motion_flagged,
993        elapsed_s: elapsed,
994        eta_s: eta,
995        note,
996    }
997}
998
999fn baseline_averages(b: &BaselineCalibration) -> (f32, f32, f32) {
1000    let n = b.subcarriers.len().max(1) as f32;
1001    let mut amp = 0.0f32;
1002    let mut var = 0.0f32;
1003    let mut disp = 0.0f32;
1004    for s in &b.subcarriers {
1005        amp += s.amp_mean;
1006        var += s.amp_variance;
1007        disp += s.phase_dispersion;
1008    }
1009    (amp / n, var / n, disp / n)
1010}
1011
1012fn unix_ms() -> u64 {
1013    SystemTime::now()
1014        .duration_since(UNIX_EPOCH)
1015        .map(|d| d.as_millis() as u64)
1016        .unwrap_or(0)
1017}
1018
1019// ---------------------------------------------------------------------------
1020// Tests
1021// ---------------------------------------------------------------------------
1022
1023#[cfg(test)]
1024mod tests {
1025    use super::*;
1026
1027    #[test]
1028    fn start_params_defaults() {
1029        let p = StartParams::default();
1030        assert_eq!(p.duration_s, 30);
1031        assert_eq!(p.min_frames, 0);
1032        assert!(p.tier.is_none());
1033    }
1034
1035    #[test]
1036    fn start_params_partial_json() {
1037        let p: StartParams = serde_json::from_str(r#"{"room_id":"living-room","tier":"he20"}"#).unwrap();
1038        assert_eq!(p.room_id.as_deref(), Some("living-room"));
1039        assert_eq!(p.tier.as_deref(), Some("he20"));
1040        assert_eq!(p.duration_s, 30); // default applied
1041    }
1042
1043    #[test]
1044    fn args_defaults() {
1045        let a = CalibrateServeArgs {
1046            http_port: 8090,
1047            http_bind: "127.0.0.1".into(),
1048            udp_port: 5005,
1049            udp_bind: "0.0.0.0".into(),
1050            tier: "ht20".into(),
1051            output_dir: "./baselines".into(),
1052            token: None,
1053        };
1054        assert_eq!(a.http_port, 8090);
1055        assert_eq!(a.udp_port, 5005);
1056    }
1057
1058    #[test]
1059    fn sanitize_blocks_path_traversal() {
1060        assert_eq!(sanitize_room_id("../../etc/passwd"), "etcpasswd");
1061        assert_eq!(sanitize_room_id("/abs/path"), "abspath");
1062        assert_eq!(sanitize_room_id("living-room_1"), "living-room_1");
1063        assert_eq!(sanitize_room_id(""), "default");
1064        assert_eq!(sanitize_room_id("..\\..\\win"), "win");
1065        assert!(!sanitize_room_id("a/b/c").contains('/'));
1066    }
1067
1068    // ---- HTTP integration tests (router via tower oneshot, no network/ingest) ----
1069
1070    use axum::body::Body;
1071    use axum::http::{Request, StatusCode};
1072    use tower::ServiceExt; // for `oneshot`
1073
1074    fn test_state(dir: &str) -> ApiState {
1075        let (cmd_tx, _rx) = mpsc::channel::<CalCommand>(8);
1076        let status = Arc::new(RwLock::new(SharedStatus {
1077            output_dir: dir.to_string(),
1078            ..Default::default()
1079        }));
1080        let window = Arc::new(RwLock::new(VecDeque::<f32>::new()));
1081        let enroll = Arc::new(RwLock::new(HashMap::<String, RoomEnroll>::new()));
1082        // Tested handlers never use cmd_tx; drop the receiver.
1083        drop(_rx);
1084        ApiState { cmd_tx, status, window, fs_hz: 15.0, enroll }
1085    }
1086
1087    async fn req(app: Router, method: &str, uri: &str, body: Option<&str>) -> StatusCode {
1088        let b = body.map(|s| Body::from(s.to_string())).unwrap_or_else(Body::empty);
1089        let r = Request::builder()
1090            .method(method)
1091            .uri(uri)
1092            .header("content-type", "application/json")
1093            .body(b)
1094            .unwrap();
1095        app.oneshot(r).await.unwrap().status()
1096    }
1097
1098    #[tokio::test]
1099    async fn health_and_descriptor_ok() {
1100        let dir = tempfile::tempdir().unwrap();
1101        let app = build_router(test_state(dir.path().to_str().unwrap()));
1102        assert_eq!(req(app.clone(), "GET", "/", None).await, StatusCode::OK);
1103        assert_eq!(req(app, "GET", "/api/v1/calibration/health", None).await, StatusCode::OK);
1104    }
1105
1106    #[tokio::test]
1107    async fn train_then_state_and_traversal_defense() {
1108        let dir = tempfile::tempdir().unwrap();
1109        let state = test_state(dir.path().to_str().unwrap());
1110        // Fill the live window with a 0.3 Hz breathing sine.
1111        {
1112            let mut w = state.window.write().await;
1113            for i in 0..200 {
1114                w.push_back((2.0 * std::f32::consts::PI * 0.3 * i as f32 / 15.0).sin());
1115            }
1116        }
1117        let app = build_router(state);
1118
1119        // POST /room/train with two anchors → bank persisted as t.json.
1120        let body = r#"{"room_id":"t","baseline_id":"b","anchors":[
1121            {"room_id":"t","label":"empty","features":{"mean":1.0,"variance":1.0,"motion":0.1,"breathing_score":0.0,"breathing_hz":0.0,"heart_score":0.0,"heart_hz":0.0}},
1122            {"room_id":"t","label":"stand_still","features":{"mean":1.0,"variance":10.0,"motion":0.2,"breathing_score":0.0,"breathing_hz":0.0,"heart_score":0.0,"heart_hz":0.0}}
1123        ]}"#;
1124        assert_eq!(req(app.clone(), "POST", "/api/v1/room/train", Some(body)).await, StatusCode::OK);
1125        assert!(dir.path().join("t.json").exists(), "bank file written");
1126
1127        // GET /room/state?bank=t → 200 (trained bank loaded, window present).
1128        assert_eq!(req(app.clone(), "GET", "/api/v1/room/state?bank=t", None).await, StatusCode::OK);
1129
1130        // Path-traversal: ?bank=../../etc/passwd is sanitized → NOT_FOUND, never reads outside dir.
1131        assert_eq!(
1132            req(app.clone(), "GET", "/api/v1/room/state?bank=../../etc/passwd", None).await,
1133            StatusCode::NOT_FOUND
1134        );
1135
1136        // Train with no anchors and nothing enrolled → 400.
1137        assert_eq!(
1138            req(app, "POST", "/api/v1/room/train", Some(r#"{"room_id":"none","baseline_id":"b","anchors":[]}"#)).await,
1139            StatusCode::BAD_REQUEST
1140        );
1141    }
1142
1143    /// ADR-152 §2.1.1: geometry threads into the trained bank through both API
1144    /// paths — inline in the train request, or recorded via /enroll/geometry —
1145    /// and a geometry-free train still produces a valid (unconditioned) bank.
1146    #[tokio::test]
1147    async fn train_threads_geometry_into_bank() {
1148        let dir = tempfile::tempdir().unwrap();
1149        let app = build_router(test_state(dir.path().to_str().unwrap()));
1150        let anchors = r#"[
1151            {"room_id":"g","label":"empty","features":{"mean":1.0,"variance":1.0,"motion":0.1,"breathing_score":0.0,"breathing_hz":0.0,"heart_score":0.0,"heart_hz":0.0}},
1152            {"room_id":"g","label":"stand_still","features":{"mean":1.0,"variance":10.0,"motion":0.2,"breathing_score":0.0,"breathing_hz":0.0,"heart_score":0.0,"heart_hz":0.0}}
1153        ]"#;
1154        let load_bank = |name: &str| {
1155            let raw = std::fs::read_to_string(dir.path().join(format!("{name}.json"))).unwrap();
1156            SpecialistBank::from_json(&raw).unwrap()
1157        };
1158
1159        // (1) geometry inline in the train request.
1160        let body = format!(
1161            r#"{{"room_id":"g1","baseline_id":"b","anchors":{anchors},
1162                "geometry":[{{"node_id":1,"position":{{"x_m":0.0,"y_m":0.0,"z_m":1.0}},"method":"tape-measure"}},{{"node_id":2}}]}}"#
1163        );
1164        assert_eq!(req(app.clone(), "POST", "/api/v1/room/train", Some(&body)).await, StatusCode::OK);
1165        let bank = load_bank("g1");
1166        assert_eq!(bank.geometry.len(), 2);
1167        assert_eq!(bank.geometry[0].method, "tape-measure");
1168        assert_eq!(bank.geometry[1].node_id, 2);
1169
1170        // (2) geometry recorded via /enroll/geometry; train body omits it.
1171        assert_eq!(
1172            req(app.clone(), "POST", "/api/v1/enroll/geometry",
1173                Some(r#"{"room_id":"g2","geometry":[{"node_id":7,"method":"floor-plan"}]}"#)).await,
1174            StatusCode::OK
1175        );
1176        let body2 = format!(r#"{{"room_id":"g2","baseline_id":"b","anchors":{anchors}}}"#);
1177        assert_eq!(req(app.clone(), "POST", "/api/v1/room/train", Some(&body2)).await, StatusCode::OK);
1178        let bank2 = load_bank("g2");
1179        assert_eq!(bank2.geometry.len(), 1);
1180        assert_eq!(bank2.geometry[0].node_id, 7);
1181
1182        // (3) no geometry anywhere → valid geometry-free bank (note logged).
1183        let body3 = format!(r#"{{"room_id":"g3","baseline_id":"b","anchors":{anchors}}}"#);
1184        assert_eq!(req(app.clone(), "POST", "/api/v1/room/train", Some(&body3)).await, StatusCode::OK);
1185        let bank3 = load_bank("g3");
1186        assert!(bank3.geometry.is_empty());
1187        assert!(bank3.presence.is_some(), "bank still trains without geometry");
1188
1189        // (4) empty geometry array is rejected.
1190        assert_eq!(
1191            req(app, "POST", "/api/v1/enroll/geometry", Some(r#"{"room_id":"g4","geometry":[]}"#)).await,
1192            StatusCode::BAD_REQUEST
1193        );
1194    }
1195
1196    #[tokio::test]
1197    async fn enroll_status_empty_and_bad_label() {
1198        let dir = tempfile::tempdir().unwrap();
1199        let app = build_router(test_state(dir.path().to_str().unwrap()));
1200        // No enrollment yet → 200 with next=empty.
1201        assert_eq!(req(app.clone(), "GET", "/api/v1/enroll/status?room=x", None).await, StatusCode::OK);
1202        // Unknown anchor label → 400.
1203        assert_eq!(
1204            req(app, "POST", "/api/v1/enroll/anchor", Some(r#"{"room_id":"x","baseline":"b","label":"nope"}"#)).await,
1205            StatusCode::BAD_REQUEST
1206        );
1207    }
1208}