1use std::collections::{HashMap, VecDeque};
24use std::sync::Arc;
25use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
26
27use anyhow::Result;
28use axum::{
29 extract::{Query, State},
30 http::StatusCode,
31 response::IntoResponse,
32 routing::{get, post},
33 Json, Router,
34};
35use clap::Args;
36use serde::{Deserialize, Serialize};
37use tokio::net::UdpSocket;
38use tokio::sync::{mpsc, oneshot, RwLock};
39use tower_http::cors::CorsLayer;
40use wifi_densepose_calibration::extract::{AnchorFeature, Features};
41use wifi_densepose_calibration::{
42 AnchorLabel, AnchorQualityGate, AnchorRecorder, MixtureOfSpecialists, NodeGeometry,
43 SpecialistBank,
44};
45use wifi_densepose_core::types::CsiFrame;
46use wifi_densepose_signal::{BaselineCalibration, CalibrationRecorder};
47
48use crate::calibrate::{parse_csi_packet, tier_config};
49
/// Maximum number of per-frame scalars kept in the live rolling window.
/// `ingest_loop` trims its local window to this length after every parsed
/// frame, and the shared window is pre-allocated with this capacity.
const LIVE_WINDOW: usize = 256;
53
54fn frame_scalar(frame: &CsiFrame) -> f32 {
56 let a = &frame.amplitude;
57 if a.is_empty() {
58 0.0
59 } else {
60 (a.sum() / a.len() as f64) as f32
61 }
62}
63
/// Size in bytes of the buffer `ingest_loop` hands to `UdpSocket::recv` for
/// each incoming datagram.
const RECV_BUF: usize = 2048;
65
// Command-line arguments consumed by `execute`.
//
// NOTE(review): plain `//` comments are used here on purpose. With clap's
// derive macros, `///` doc comments are turned into help text, so adding them
// would change the program's `--help` output.
#[derive(Args, Debug, Clone)]
pub struct CalibrateServeArgs {
    // TCP port the HTTP API listener binds to.
    #[arg(long, default_value_t = 8090)]
    pub http_port: u16,

    // Host/address the HTTP API listener binds to. `execute` prints a warning
    // when this is not loopback and no token is configured.
    #[arg(long, default_value = "127.0.0.1")]
    pub http_bind: String,

    // UDP port on which CSI datagrams are received.
    #[arg(long, default_value_t = 5005)]
    pub udp_port: u16,

    // Host/address the UDP ingest socket binds to.
    #[arg(long, default_value = "0.0.0.0")]
    pub udp_bind: String,

    // Default tier: used when a start request carries no `tier`, and for
    // parsing packets while no session is active.
    #[arg(long, default_value = "ht20")]
    pub tier: String,

    // Directory (created at startup) where baselines and banks are written.
    #[arg(long, default_value = "./baselines")]
    pub output_dir: String,

    // Optional bearer token (also read from the CALIBRATE_TOKEN environment
    // variable). When set, every HTTP request must present it.
    #[arg(long, env = "CALIBRATE_TOKEN")]
    pub token: Option<String>,
}
103
/// Turns an untrusted identifier into a safe file-name component.
///
/// Keeps only ASCII letters, digits, `_` and `-`, stops after 64 kept
/// characters, and falls back to `"default"` when nothing survives. Path
/// separators and dots are therefore always removed.
fn sanitize_room_id(raw: &str) -> String {
    const MAX_LEN: usize = 64;

    let mut cleaned = String::new();
    for ch in raw.chars() {
        // Every kept character is ASCII, so byte length equals character count.
        if cleaned.len() == MAX_LEN {
            break;
        }
        if matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-') {
            cleaned.push(ch);
        }
    }

    if cleaned.is_empty() {
        String::from("default")
    } else {
        cleaned
    }
}
119
/// JSON body of the calibration start request. Every field is optional on the
/// wire: `#[serde(default)]` fills missing fields from the `Default` impl below.
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct StartParams {
    /// Tier override; the server's default tier is used when absent.
    pub tier: Option<String>,
    /// Capture time limit in seconds (the ingest loop treats 0 as 1).
    pub duration_s: u32,
    /// Room identifier; sanitized before use, `"default"` when absent.
    pub room_id: Option<String>,
    /// Frame target override; `0` keeps the tier configuration's value.
    pub min_frames: u32,
}

impl Default for StartParams {
    /// Defaults: no tier override, 30 second window, no room id, no frame override.
    fn default() -> Self {
        Self { tier: None, duration_s: 30, room_id: None, min_frames: 0 }
    }
}
143
/// Serializable snapshot of a calibration session, as built by
/// `session_snapshot` and returned by the status/start endpoints.
#[derive(Debug, Clone, Serialize)]
pub struct SessionStatus {
    /// Lifecycle label; this file sets "recording", "finalizing", "complete"
    /// and "aborted".
    pub state: String,
    /// Sanitized room identifier of the session.
    pub room_id: String,
    /// Tier the session records with.
    pub tier: String,
    /// Frames the recorder has accepted so far.
    pub frames_recorded: usize,
    /// Frame count at which the session finalizes automatically.
    pub target_frames: usize,
    /// `frames_recorded / target_frames`, clamped to `[0, 1]` (0 when the target is 0).
    pub progress: f32,
    /// Median amplitude z-score reported by the recorder for the latest frame.
    pub z_median: f32,
    /// Maximum amplitude z-score reported by the recorder for the latest frame.
    pub z_max: f32,
    /// Motion flag reported by the recorder for the latest frame.
    pub motion_flagged: bool,
    /// Seconds elapsed since the session started.
    pub elapsed_s: f32,
    /// Estimated seconds remaining (see `session_snapshot` for the estimate).
    pub eta_s: f32,
    /// Optional human-readable note, e.g. the reason a session was aborted.
    pub note: Option<String>,
}
163
/// Summary of a finalized and persisted baseline, produced by `finalize` and
/// served by the result/stop endpoints.
#[derive(Debug, Clone, Serialize)]
pub struct ResultSummary {
    /// String form of the baseline's calibration UUID.
    pub calibration_id: String,
    /// Room identifier of the session that produced the baseline.
    pub room_id: String,
    /// Tier the session recorded with.
    pub tier: String,
    /// Frame count stored in the baseline.
    pub frame_count: u64,
    /// Number of subcarrier entries in the baseline.
    pub subcarriers: usize,
    /// Capture timestamp stored in the baseline (Unix seconds).
    pub captured_at_unix_s: i64,
    /// Mean of `amp_mean` over all subcarriers.
    pub amp_mean_avg: f32,
    /// Mean of `amp_variance` over all subcarriers.
    pub amp_variance_avg: f32,
    /// Mean of `phase_dispersion` over all subcarriers.
    pub phase_dispersion_avg: f32,
    /// Path of the written baseline file.
    pub output_path: String,
    /// Size in bytes of the serialized baseline.
    pub saved_bytes: usize,
}
179
/// State shared between the ingest task (writer) and the HTTP handlers
/// (readers) behind an `Arc<RwLock<_>>`.
#[derive(Default)]
struct SharedStatus {
    /// UDP port the ingest socket listens on (reported by health and abort notes).
    udp_port: u16,
    /// Tier used when a start request does not name one.
    default_tier: String,
    /// Directory baselines and banks are written to / read from.
    output_dir: String,
    /// Datagrams received so far; refreshed by the ingest loop on each tick.
    frames_seen: u64,
    /// Wall-clock time (Unix ms) of the last received datagram; 0 = none yet.
    last_frame_unix_ms: u64,
    /// Latest session snapshot, if a session has been started.
    session: Option<SessionStatus>,
    /// Summary of the most recently finalized baseline; cleared on session start.
    last_result: Option<ResultSummary>,
}
191
/// Commands sent from HTTP handlers to the ingest task. Each variant carries a
/// oneshot sender on which the ingest task reports the outcome.
enum CalCommand {
    /// Begin a calibration session with the given parameters.
    Start { params: StartParams, reply: oneshot::Sender<Result<SessionStatus, String>> },
    /// Finalize the running calibration session now.
    Stop { reply: oneshot::Sender<Result<ResultSummary, String>> },
    /// Capture one anchor for `room_id` against the named baseline file; the
    /// reply is sent once the capture window has elapsed.
    EnrollAnchor {
        room_id: String,
        baseline_name: String,
        label: AnchorLabel,
        duration_s: u32,
        reply: oneshot::Sender<Result<AnchorVerdict, String>>,
    },
}
204
/// In-memory enrollment state for one room, keyed by room id in `ApiState::enroll`.
#[derive(Default)]
struct RoomEnroll {
    /// Baseline UUID recorded with the first accepted anchor (empty until then).
    baseline_id: String,
    /// Sampling rate recorded together with `baseline_id`.
    fs_hz: f32,
    /// Accepted anchors; a new capture with the same label replaces the old one.
    anchors: Vec<AnchorFeature>,
    /// Transceiver geometry set by the geometry endpoint (latest submission wins).
    geometry: Vec<NodeGeometry>,
}
215
/// Outcome of one anchor capture, returned by the anchor enrollment endpoint.
#[derive(Debug, Clone, Serialize)]
pub struct AnchorVerdict {
    /// String form of the captured anchor label.
    pub label: String,
    /// Whether the quality gate accepted the capture.
    pub accepted: bool,
    /// Reason returned by the recorder's finalize step, if any.
    pub reason: Option<String>,
    /// `presence_z` from the anchor's quality record.
    pub presence_z: f32,
    /// `motion_rate` from the anchor's quality record.
    pub motion_rate: f32,
    /// `frames` from the anchor's quality record.
    pub frames: u32,
    /// Number of accepted anchors stored for the room after this capture.
    pub accepted_count: usize,
    /// Next label in `AnchorLabel::SEQUENCE` not yet enrolled; only computed
    /// when this capture was accepted.
    pub next: Option<String>,
}
236
/// State of the anchor capture currently in progress inside the ingest loop.
struct EnrollCapture {
    /// Recorder fed with every parsed frame during the capture.
    recorder: AnchorRecorder,
    /// Baseline loaded from disk that frames are recorded against.
    baseline: BaselineCalibration,
    /// Label being captured.
    label: AnchorLabel,
    /// Room the anchor is stored under.
    room_id: String,
    /// UUID string of `baseline`.
    baseline_id: String,
    /// Sampling rate passed to feature extraction.
    fs_hz: f32,
    /// Per-frame scalars (`frame_scalar`) collected during the capture.
    series: Vec<f32>,
    /// Instant at which the capture is finalized.
    deadline: Instant,
    /// Reply channel to the waiting HTTP handler; taken when the verdict is sent.
    reply: Option<oneshot::Sender<Result<AnchorVerdict, String>>>,
}
249
/// Handler state shared by all routes (cheap to clone: channels and `Arc`s).
#[derive(Clone)]
struct ApiState {
    /// Command channel into the ingest task.
    cmd_tx: mpsc::Sender<CalCommand>,
    /// Status published by the ingest task.
    status: Arc<RwLock<SharedStatus>>,
    /// Copy of the live rolling window, refreshed by the ingest task on each tick.
    window: Arc<RwLock<VecDeque<f32>>>,
    /// Sampling rate used by `room_state` when the query does not supply `fs`.
    fs_hz: f32,
    /// Per-room enrollment state (anchors and geometry).
    enroll: Arc<RwLock<HashMap<String, RoomEnroll>>>,
}
261
262async fn require_bearer(
266 axum::extract::State(token): axum::extract::State<String>,
267 req: axum::extract::Request,
268 next: axum::middleware::Next,
269) -> axum::response::Response {
270 let authorized = req
271 .headers()
272 .get(axum::http::header::AUTHORIZATION)
273 .and_then(|v| v.to_str().ok())
274 .and_then(|h| h.strip_prefix("Bearer "))
275 .map(|t| t == token)
276 .unwrap_or(false);
277 if authorized {
278 next.run(req).await
279 } else {
280 (
281 StatusCode::UNAUTHORIZED,
282 Json(serde_json::json!({"error": "missing or invalid bearer token"})),
283 )
284 .into_response()
285 }
286}
287
288fn build_router(state: ApiState) -> Router {
295 Router::new()
296 .route("/", get(descriptor))
297 .route("/api/v1/calibration/health", get(health))
298 .route("/api/v1/calibration/start", post(start))
299 .route("/api/v1/calibration/status", get(status_handler))
300 .route("/api/v1/calibration/stop", post(stop))
301 .route("/api/v1/calibration/result", get(result))
302 .route("/api/v1/calibration/baselines", get(baselines))
303 .route("/api/v1/room/state", get(room_state))
304 .route("/api/v1/room/train", post(train_room))
305 .route("/api/v1/enroll/anchor", post(enroll_anchor))
306 .route("/api/v1/enroll/geometry", post(enroll_geometry))
307 .route("/api/v1/enroll/status", get(enroll_status))
308 .layer(CorsLayer::permissive())
309 .with_state(state)
310}
311
312pub async fn execute(args: CalibrateServeArgs) -> Result<()> {
314 std::fs::create_dir_all(&args.output_dir)
315 .map_err(|e| anyhow::anyhow!("cannot create output dir {}: {e}", args.output_dir))?;
316
317 let udp_addr = format!("{}:{}", args.udp_bind, args.udp_port);
318 let socket = UdpSocket::bind(&udp_addr)
319 .await
320 .map_err(|e| anyhow::anyhow!("cannot bind UDP socket on {udp_addr}: {e}"))?;
321 eprintln!("[calibrate-serve] CSI ingest on udp://{udp_addr}");
322
323 let status = Arc::new(RwLock::new(SharedStatus {
324 udp_port: args.udp_port,
325 default_tier: args.tier.clone(),
326 output_dir: args.output_dir.clone(),
327 ..Default::default()
328 }));
329
330 let (cmd_tx, cmd_rx) = mpsc::channel::<CalCommand>(8);
331 let window = Arc::new(RwLock::new(VecDeque::<f32>::with_capacity(LIVE_WINDOW)));
332 let enroll = Arc::new(RwLock::new(HashMap::<String, RoomEnroll>::new()));
333
334 {
336 let status = status.clone();
337 let default_tier = args.tier.clone();
338 let output_dir = args.output_dir.clone();
339 let window = window.clone();
340 let enroll = enroll.clone();
341 tokio::spawn(async move {
342 ingest_loop(socket, cmd_rx, status, default_tier, output_dir, window, enroll).await;
343 });
344 }
345
346 let state = ApiState { cmd_tx, status, window, fs_hz: 15.0, enroll };
347 let mut app = build_router(state);
348
349 if let Some(token) = args.token.clone() {
351 app = app.layer(axum::middleware::from_fn_with_state(token, require_bearer));
352 eprintln!("[calibrate-serve] bearer auth ENABLED");
353 } else if args.http_bind != "127.0.0.1" && args.http_bind != "localhost" {
354 eprintln!(
355 "[calibrate-serve] WARNING: bound to {} with NO --token — anyone on the network can drive calibration",
356 args.http_bind
357 );
358 }
359
360 let http_addr = format!("{}:{}", args.http_bind, args.http_port);
361 let listener = tokio::net::TcpListener::bind(&http_addr)
362 .await
363 .map_err(|e| anyhow::anyhow!("cannot bind HTTP listener on {http_addr}: {e}"))?;
364 eprintln!("[calibrate-serve] HTTP API on http://{http_addr} (GET / for the route list)");
365
366 axum::serve(listener, app)
367 .await
368 .map_err(|e| anyhow::anyhow!("HTTP server error: {e}"))?;
369 Ok(())
370}
371
/// State of the calibration session currently recording inside the ingest loop.
struct ActiveSession {
    /// Recorder that accumulates frames into a baseline.
    recorder: CalibrationRecorder,
    /// Sanitized room identifier.
    room_id: String,
    /// Tier used for configuration and packet parsing.
    tier: String,
    /// Instant the session started (basis for elapsed time).
    started: Instant,
    /// Instant at which the session is finalized or aborted.
    deadline: Instant,
    /// Frame count that triggers automatic finalization.
    target_frames: usize,
    /// Latest median amplitude z-score reported by the recorder.
    z_median: f32,
    /// Latest maximum amplitude z-score reported by the recorder.
    z_max: f32,
    /// Latest motion flag reported by the recorder.
    motion_flagged: bool,
}
387
388async fn ingest_loop(
389 socket: UdpSocket,
390 mut cmd_rx: mpsc::Receiver<CalCommand>,
391 status: Arc<RwLock<SharedStatus>>,
392 default_tier: String,
393 output_dir: String,
394 window: Arc<RwLock<VecDeque<f32>>>,
395 enroll: Arc<RwLock<HashMap<String, RoomEnroll>>>,
396) {
397 let mut buf = vec![0u8; RECV_BUF];
398 let mut active: Option<ActiveSession> = None;
399 let mut active_enroll: Option<EnrollCapture> = None;
400 let mut tick = tokio::time::interval(Duration::from_millis(200));
401 let mut frames_seen: u64 = 0;
404 let mut last_frame_ms: u64 = 0;
405 let mut win_local: VecDeque<f32> = VecDeque::with_capacity(LIVE_WINDOW);
407
408 loop {
409 tokio::select! {
410 Some(cmd) = cmd_rx.recv() => match cmd {
412 CalCommand::Start { params, reply } => {
413 if active.is_some() {
414 let _ = reply.send(Err("a calibration session is already running".into()));
415 continue;
416 }
417 let tier = params.tier.unwrap_or_else(|| default_tier.clone());
418 if !["ht20", "ht40", "he20", "he40"].contains(&tier.to_ascii_lowercase().as_str()) {
419 let _ = reply.send(Err(format!("invalid tier {tier:?}")));
420 continue;
421 }
422 let mut config = tier_config(&tier);
423 if params.min_frames > 0 {
424 config.min_frames = params.min_frames;
425 }
426 let target_frames = config.min_frames as usize;
427 let dur = params.duration_s.max(1) as u64;
428 let room_id = sanitize_room_id(¶ms.room_id.unwrap_or_else(|| "default".into()));
430 let sess = ActiveSession {
431 recorder: CalibrationRecorder::new(config),
432 room_id: room_id.clone(),
433 tier: tier.clone(),
434 started: Instant::now(),
435 deadline: Instant::now() + Duration::from_secs(dur),
436 target_frames,
437 z_median: 0.0,
438 z_max: 0.0,
439 motion_flagged: false,
440 };
441 let snap = session_snapshot(&sess, "recording", None);
442 active = Some(sess);
443 {
444 let mut s = status.write().await;
445 s.session = Some(snap.clone());
446 s.last_result = None;
447 }
448 eprintln!("[calibrate-serve] session start room={room_id} tier={tier} target={target_frames}");
449 let _ = reply.send(Ok(snap));
450 }
451 CalCommand::Stop { reply } => {
452 match active.take() {
453 Some(sess) => {
454 let res = finalize(sess, &output_dir, &status).await;
455 let _ = reply.send(res);
456 }
457 None => { let _ = reply.send(Err("no active calibration session".into())); }
458 }
459 }
460 CalCommand::EnrollAnchor { room_id, baseline_name, label, duration_s, reply } => {
461 if active.is_some() || active_enroll.is_some() {
462 let _ = reply.send(Err("a capture is already running".into()));
463 continue;
464 }
465 let bname = sanitize_room_id(&baseline_name);
467 let bpath = format!("{output_dir}/{bname}.bin");
468 let baseline = match tokio::fs::read(&bpath).await {
469 Ok(bytes) => match BaselineCalibration::from_bytes(&bytes) {
470 Ok(b) => b,
471 Err(e) => { let _ = reply.send(Err(format!("invalid baseline {bname}: {e}"))); continue; }
472 },
473 Err(e) => { let _ = reply.send(Err(format!("baseline {bname} not found: {e}"))); continue; }
474 };
475 let baseline_id = baseline.calibration_uuid().to_string();
476 eprintln!("[calibrate-serve] enroll anchor room={room_id} label={} ({}s)", label.as_str(), duration_s);
477 active_enroll = Some(EnrollCapture {
478 recorder: AnchorRecorder::new(label),
479 baseline,
480 label,
481 room_id,
482 baseline_id,
483 fs_hz: 15.0,
484 series: Vec::new(),
485 deadline: Instant::now() + Duration::from_secs(duration_s.max(1) as u64),
486 reply: Some(reply),
487 });
488 }
489 },
490
491 Ok(n) = socket.recv(&mut buf) => {
493 frames_seen += 1;
494 last_frame_ms = unix_ms();
495 let parse_tier = active.as_ref().map(|s| s.tier.clone()).unwrap_or_else(|| default_tier.clone());
496 if let Some(frame) = parse_csi_packet(&buf[..n], &parse_tier) {
497 win_local.push_back(frame_scalar(&frame));
499 while win_local.len() > LIVE_WINDOW {
500 win_local.pop_front();
501 }
502 if let Some(sess) = active.as_mut() {
503 if let Ok(score) = sess.recorder.record(&frame) {
504 sess.z_median = score.amplitude_z_median;
505 sess.z_max = score.amplitude_z_max;
506 sess.motion_flagged = score.motion_flagged;
507 }
508 if sess.recorder.frames_recorded() as usize >= sess.target_frames {
509 if let Some(done) = active.take() {
510 let _ = finalize(done, &output_dir, &status).await;
511 }
512 }
513 }
514 if let Some(ec) = active_enroll.as_mut() {
515 ec.recorder.record_frame(&ec.baseline, &frame);
516 ec.series.push(frame_scalar(&frame));
517 }
518 }
519 },
520
521 _ = tick.tick() => {
523 {
524 let mut s = status.write().await;
525 s.frames_seen = frames_seen;
526 s.last_frame_unix_ms = last_frame_ms;
527 if let Some(sess) = active.as_ref() {
528 s.session = Some(session_snapshot(sess, "recording", None));
529 }
530 }
531 {
532 let mut w = window.write().await;
533 w.clear();
534 w.extend(win_local.iter().copied());
535 }
536 if let Some(sess) = active.as_ref() {
537 if Instant::now() >= sess.deadline {
538 let frames = sess.recorder.frames_recorded() as usize;
539 if frames >= 10 {
540 if let Some(done) = active.take() {
541 let _ = finalize(done, &output_dir, &status).await;
542 }
543 } else if let Some(mut done) = active.take() {
544 done.motion_flagged = false;
546 let note = format!(
547 "aborted: only {frames} frames in the time window (need >=10) — \
548 is the ESP32 streaming to udp:{}? ",
549 status.read().await.udp_port
550 );
551 let snap = session_snapshot(&done, "aborted", Some(note.clone()));
552 status.write().await.session = Some(snap);
553 eprintln!("[calibrate-serve] {note}");
554 }
555 }
556 }
557 let enroll_done = active_enroll.as_ref().map(|ec| Instant::now() >= ec.deadline).unwrap_or(false);
559 if enroll_done {
560 if let Some(mut ec) = active_enroll.take() {
561 let gate = AnchorQualityGate::default();
562 let (anchor, reason) = ec.recorder.finalize(&gate, (unix_ms() / 1000) as i64);
563 let mut verdict = AnchorVerdict {
564 label: ec.label.as_str().into(),
565 accepted: anchor.quality.accepted,
566 reason,
567 presence_z: anchor.quality.presence_z,
568 motion_rate: anchor.quality.motion_rate,
569 frames: anchor.quality.frames,
570 accepted_count: 0,
571 next: None,
572 };
573 if anchor.quality.accepted {
574 let feat = AnchorFeature::from_series(&ec.room_id, ec.label, &ec.series, ec.fs_hz);
575 let mut map = enroll.write().await;
576 let re = map.entry(ec.room_id.clone()).or_insert_with(RoomEnroll::default);
577 if re.baseline_id.is_empty() {
578 re.baseline_id = ec.baseline_id.clone();
579 re.fs_hz = ec.fs_hz;
580 }
581 if let Some(slot) = re.anchors.iter_mut().find(|a| a.label == ec.label) {
582 *slot = feat;
583 } else {
584 re.anchors.push(feat);
585 }
586 verdict.accepted_count = re.anchors.len();
587 verdict.next = AnchorLabel::SEQUENCE.iter().copied()
588 .find(|l| !re.anchors.iter().any(|a| a.label == *l))
589 .map(|l| l.as_str().to_string());
590 } else {
591 verdict.accepted_count = enroll.read().await.get(&ec.room_id).map(|re| re.anchors.len()).unwrap_or(0);
592 }
593 eprintln!("[calibrate-serve] enroll anchor {} accepted={} ({} total)", verdict.label, verdict.accepted, verdict.accepted_count);
594 if let Some(tx) = ec.reply.take() {
595 let _ = tx.send(Ok(verdict));
596 }
597 }
598 }
599 },
600 }
601 }
602}
603
604async fn finalize(
606 sess: ActiveSession,
607 output_dir: &str,
608 status: &Arc<RwLock<SharedStatus>>,
609) -> Result<ResultSummary, String> {
610 let room_id = sess.room_id.clone();
611 let tier = sess.tier.clone();
612 {
614 let snap = session_snapshot(&sess, "finalizing", None);
615 status.write().await.session = Some(snap);
616 }
617
618 let baseline: BaselineCalibration = sess
619 .recorder
620 .finalize()
621 .map_err(|e| format!("finalize failed: {e}"))?;
622
623 let (amp_mean_avg, amp_var_avg, disp_avg) = baseline_averages(&baseline);
624 let uuid = baseline.calibration_uuid().to_string();
625 let path = format!("{output_dir}/{room_id}-{uuid}.bin");
626 let bytes = baseline.to_bytes();
627 tokio::fs::write(&path, &bytes)
629 .await
630 .map_err(|e| format!("cannot write {path}: {e}"))?;
631
632 let summary = ResultSummary {
633 calibration_id: uuid,
634 room_id: room_id.clone(),
635 tier,
636 frame_count: baseline.frame_count,
637 subcarriers: baseline.subcarriers.len(),
638 captured_at_unix_s: baseline.captured_at_unix_s,
639 amp_mean_avg,
640 amp_variance_avg: amp_var_avg,
641 phase_dispersion_avg: disp_avg,
642 output_path: path.clone(),
643 saved_bytes: bytes.len(),
644 };
645
646 {
647 let mut s = status.write().await;
648 if let Some(sess_status) = s.session.as_mut() {
650 sess_status.state = "complete".into();
651 sess_status.progress = 1.0;
652 }
653 s.last_result = Some(summary.clone());
654 }
655 eprintln!(
656 "[calibrate-serve] session complete room={room_id} frames={} -> {path} ({} bytes)",
657 summary.frame_count, summary.saved_bytes
658 );
659 Ok(summary)
660}
661
662async fn descriptor() -> impl IntoResponse {
667 Json(serde_json::json!({
668 "service": "wifi-densepose calibration API",
669 "adr": "ADR-135 (baseline) / ADR-151 (room calibration & training)",
670 "endpoints": {
671 "GET /api/v1/calibration/health": "liveness + UDP ingest stats",
672 "POST /api/v1/calibration/start": "{ tier?, duration_s?, room_id?, min_frames? }",
673 "GET /api/v1/calibration/status": "live session progress (poll for UI)",
674 "POST /api/v1/calibration/stop": "finalize current session early",
675 "GET /api/v1/calibration/result": "last finalized baseline summary",
676 "GET /api/v1/calibration/baselines": "list persisted baseline files",
677 "GET /api/v1/room/state?bank=<name>": "live mixture-of-specialists RoomState over the CSI window",
678 "POST /api/v1/room/train": "{ room_id, baseline_id, anchors[]?, geometry[]? } → train + persist a specialist bank (anchors[]/geometry[] optional if enrolled in-server)",
679 "POST /api/v1/enroll/anchor": "{ room_id, baseline, label, duration_s? } → capture one guided anchor (blocks for the capture)",
680 "POST /api/v1/enroll/geometry": "{ room_id, geometry: [NodeGeometry…] } → record transceiver geometry for the room (ADR-152 §2.1.1; latest wins)",
681 "GET /api/v1/enroll/status?room=<id>": "enrollment progress (accepted anchors, next, complete)"
682 }
683 }))
684}
685
686async fn health(State(st): State<ApiState>) -> impl IntoResponse {
687 let s = st.status.read().await;
688 let age = if s.last_frame_unix_ms == 0 { None } else { Some(unix_ms().saturating_sub(s.last_frame_unix_ms)) };
689 Json(serde_json::json!({
690 "status": "ok",
691 "udp_port": s.udp_port,
692 "frames_seen": s.frames_seen,
693 "last_frame_age_ms": age,
694 "streaming": age.map(|a| a < 2000).unwrap_or(false),
695 "default_tier": s.default_tier,
696 "output_dir": s.output_dir,
697 "session_active": s.session.as_ref().map(|x| x.state == "recording").unwrap_or(false),
698 }))
699}
700
701async fn start(State(st): State<ApiState>, Json(params): Json<StartParams>) -> impl IntoResponse {
702 let (tx, rx) = oneshot::channel();
703 if st.cmd_tx.send(CalCommand::Start { params, reply: tx }).await.is_err() {
704 return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"ingest task unavailable"}))).into_response();
705 }
706 match rx.await {
707 Ok(Ok(snap)) => (StatusCode::ACCEPTED, Json(serde_json::to_value(snap).unwrap())).into_response(),
708 Ok(Err(e)) => (StatusCode::CONFLICT, Json(serde_json::json!({"error": e}))).into_response(),
709 Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"no reply"}))).into_response(),
710 }
711}
712
713async fn status_handler(State(st): State<ApiState>) -> impl IntoResponse {
714 let s = st.status.read().await;
715 match &s.session {
716 Some(sess) => (StatusCode::OK, Json(serde_json::to_value(sess).unwrap())).into_response(),
717 None => (StatusCode::OK, Json(serde_json::json!({"state":"idle"}))).into_response(),
718 }
719}
720
721async fn stop(State(st): State<ApiState>) -> impl IntoResponse {
722 let (tx, rx) = oneshot::channel();
723 if st.cmd_tx.send(CalCommand::Stop { reply: tx }).await.is_err() {
724 return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"ingest task unavailable"}))).into_response();
725 }
726 match rx.await {
727 Ok(Ok(summary)) => (StatusCode::OK, Json(serde_json::to_value(summary).unwrap())).into_response(),
728 Ok(Err(e)) => (StatusCode::CONFLICT, Json(serde_json::json!({"error": e}))).into_response(),
729 Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"no reply"}))).into_response(),
730 }
731}
732
733async fn result(State(st): State<ApiState>) -> impl IntoResponse {
734 let s = st.status.read().await;
735 match &s.last_result {
736 Some(r) => (StatusCode::OK, Json(serde_json::to_value(r).unwrap())).into_response(),
737 None => (StatusCode::NOT_FOUND, Json(serde_json::json!({"error":"no finalized baseline yet"}))).into_response(),
738 }
739}
740
/// JSON body of the room training request.
#[derive(Deserialize)]
struct TrainRequest {
    /// Room to train; also the lookup key for in-server enrollment state.
    room_id: String,
    /// Baseline id used when `anchors` is supplied in the request.
    baseline_id: String,
    /// Anchors to train on; when empty, the anchors enrolled in-server are used.
    #[serde(default)]
    anchors: Vec<AnchorFeature>,
    /// Transceiver geometry; when empty, geometry enrolled in-server is used.
    #[serde(default)]
    geometry: Vec<NodeGeometry>,
}
755
756async fn train_room(State(st): State<ApiState>, Json(req): Json<TrainRequest>) -> impl IntoResponse {
762 let (anchors, baseline_id) = if !req.anchors.is_empty() {
763 (req.anchors.clone(), req.baseline_id.clone())
764 } else {
765 match st.enroll.read().await.get(&req.room_id) {
766 Some(re) if !re.anchors.is_empty() => (re.anchors.clone(), re.baseline_id.clone()),
767 _ => {
768 return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error":"no anchors in request and none enrolled for this room"}))).into_response();
769 }
770 }
771 };
772 let geometry = if !req.geometry.is_empty() {
773 req.geometry.clone()
774 } else {
775 st.enroll.read().await.get(&req.room_id).map(|re| re.geometry.clone()).unwrap_or_default()
776 };
777 let at = (unix_ms() / 1000) as i64;
778 let bank = match SpecialistBank::train(&req.room_id, &baseline_id, &anchors, at) {
779 Ok(b) => b,
780 Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": format!("training failed: {e}")}))).into_response(),
781 };
782 let bank = if geometry.is_empty() {
783 eprintln!(
784 "[calibrate-serve] no transceiver geometry recorded for room '{}' — bank will not support geometry conditioning (ADR-152 §2.1.2)",
785 req.room_id
786 );
787 bank
788 } else {
789 bank.with_geometry(geometry)
790 };
791 let name = sanitize_room_id(&req.room_id);
792 let dir = { st.status.read().await.output_dir.clone() };
793 let path = format!("{dir}/{name}.json");
794 let json = match bank.to_json() {
795 Ok(j) => j,
796 Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("serialize: {e}")}))).into_response(),
797 };
798 if let Err(e) = tokio::fs::write(&path, json).await {
799 return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": format!("cannot write {path}: {e}")}))).into_response();
800 }
801 let kinds: Vec<String> = bank.trained_kinds().iter().map(|k| format!("{k:?}")).collect();
802 (StatusCode::OK, Json(serde_json::json!({
803 "room_id": bank.room_id,
804 "bank": name, "anchor_count": bank.anchor_count,
806 "specialists": kinds,
807 "geometry_nodes": bank.geometry.len(),
808 "path": path,
809 }))).into_response()
810}
811
/// JSON body of the geometry enrollment request.
#[derive(Deserialize)]
struct EnrollGeometryBody {
    /// Room whose geometry is being recorded.
    room_id: String,
    /// Geometry records; must be non-empty (the handler rejects an empty array).
    geometry: Vec<NodeGeometry>,
}
819
820async fn enroll_geometry(State(st): State<ApiState>, Json(b): Json<EnrollGeometryBody>) -> impl IntoResponse {
825 if b.geometry.is_empty() {
826 return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error":"geometry must be a non-empty array of NodeGeometry records"}))).into_response();
827 }
828 let nodes = b.geometry.len();
829 {
830 let mut map = st.enroll.write().await;
831 let re = map.entry(b.room_id.clone()).or_insert_with(RoomEnroll::default);
832 re.geometry = b.geometry;
833 }
834 eprintln!("[calibrate-serve] enroll geometry room={} nodes={nodes}", b.room_id);
835 (StatusCode::OK, Json(serde_json::json!({"room_id": b.room_id, "geometry_nodes": nodes}))).into_response()
836}
837
/// JSON body of the anchor enrollment request.
#[derive(Deserialize)]
struct EnrollAnchorBody {
    /// Room the anchor is enrolled for.
    room_id: String,
    /// Name of the baseline file (without `.bin`) to record against; sanitized
    /// by the ingest task before it is used in a path.
    baseline: String,
    /// Anchor label text, parsed with `AnchorLabel::from_str`.
    label: String,
    /// Capture duration in seconds; defaults to the label's own duration.
    duration_s: Option<u32>,
}
849
850async fn enroll_anchor(State(st): State<ApiState>, Json(b): Json<EnrollAnchorBody>) -> impl IntoResponse {
853 let label = match AnchorLabel::from_str(&b.label) {
854 Some(l) => l,
855 None => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": format!("unknown anchor label {:?}", b.label)}))).into_response(),
856 };
857 let duration_s = b.duration_s.unwrap_or_else(|| label.duration_s());
858 let (tx, rx) = oneshot::channel();
859 let cmd = CalCommand::EnrollAnchor {
860 room_id: b.room_id,
861 baseline_name: b.baseline,
862 label,
863 duration_s,
864 reply: tx,
865 };
866 if st.cmd_tx.send(cmd).await.is_err() {
867 return (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"ingest task unavailable"}))).into_response();
868 }
869 match rx.await {
870 Ok(Ok(v)) => (StatusCode::OK, Json(serde_json::to_value(v).unwrap())).into_response(),
871 Ok(Err(e)) => (StatusCode::CONFLICT, Json(serde_json::json!({"error": e}))).into_response(),
872 Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error":"no reply"}))).into_response(),
873 }
874}
875
/// Query string of the enrollment status request (`?room=<id>`).
#[derive(Deserialize)]
struct EnrollStatusQuery {
    /// Room id to report on; used verbatim as the enrollment map key.
    room: String,
}
881
882async fn enroll_status(State(st): State<ApiState>, Query(q): Query<EnrollStatusQuery>) -> impl IntoResponse {
884 let map = st.enroll.read().await;
885 let (accepted, baseline_id): (Vec<String>, String) = match map.get(&q.room) {
886 Some(re) => (
887 re.anchors.iter().map(|a| a.label.as_str().to_string()).collect(),
888 re.baseline_id.clone(),
889 ),
890 None => (Vec::new(), String::new()),
891 };
892 let next = AnchorLabel::SEQUENCE
893 .iter()
894 .copied()
895 .find(|l| !accepted.iter().any(|a| a == l.as_str()))
896 .map(|l| l.as_str().to_string());
897 Json(serde_json::json!({
898 "room": q.room,
899 "baseline_id": baseline_id,
900 "accepted": accepted,
901 "count": accepted.len(),
902 "total": AnchorLabel::SEQUENCE.len(),
903 "next": next,
904 "complete": next.is_none() && !accepted.is_empty(),
905 }))
906}
907
/// Query string of the room state request (`?bank=<name>[&fs=<hz>]`).
#[derive(Deserialize)]
struct RoomStateQuery {
    /// Bank name; sanitized before it is used to build the file path.
    bank: String,
    /// Optional sampling-rate override; the server default is used when absent.
    fs: Option<f32>,
}
916
917async fn room_state(State(st): State<ApiState>, Query(q): Query<RoomStateQuery>) -> impl IntoResponse {
919 let name = sanitize_room_id(&q.bank);
921 let dir = { st.status.read().await.output_dir.clone() };
922 let path = format!("{dir}/{name}.json");
923 let raw = match tokio::fs::read_to_string(&path).await {
924 Ok(r) => r,
925 Err(e) => {
926 return (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": format!("bank '{name}' not found: {e}")}))).into_response();
927 }
928 };
929 let bank = match SpecialistBank::from_json(&raw) {
930 Ok(b) => b,
931 Err(e) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": format!("invalid bank: {e}")}))).into_response(),
932 };
933
934 let series: Vec<f32> = { st.window.read().await.iter().copied().collect() };
935 if series.len() < 32 {
936 return (StatusCode::OK, Json(serde_json::json!({"state":"warming_up","frames":series.len()}))).into_response();
937 }
938 let fs = q.fs.unwrap_or(st.fs_hz);
939 let features = Features::from_series(&series, fs);
940 let baseline_id = bank.baseline_id.clone();
941 let mix = MixtureOfSpecialists::new(bank);
942 let room = mix.infer(&features, &baseline_id);
943 (StatusCode::OK, Json(serde_json::to_value(room).unwrap())).into_response()
944}
945
946async fn baselines(State(st): State<ApiState>) -> impl IntoResponse {
947 let dir = { st.status.read().await.output_dir.clone() };
948 let mut out = Vec::new();
949 if let Ok(rd) = std::fs::read_dir(&dir) {
950 for entry in rd.flatten() {
951 let path = entry.path();
952 if path.extension().and_then(|e| e.to_str()) == Some("bin") {
953 let bytes = entry.metadata().map(|m| m.len()).unwrap_or(0);
954 out.push(serde_json::json!({
955 "file": path.file_name().and_then(|n| n.to_str()).unwrap_or(""),
956 "path": path.to_string_lossy(),
957 "bytes": bytes,
958 }));
959 }
960 }
961 }
962 Json(serde_json::json!({ "dir": dir, "baselines": out }))
963}
964
965fn session_snapshot(sess: &ActiveSession, state: &str, note: Option<String>) -> SessionStatus {
970 let frames = sess.recorder.frames_recorded() as usize;
971 let progress = if sess.target_frames == 0 {
972 0.0
973 } else {
974 (frames as f32 / sess.target_frames as f32).clamp(0.0, 1.0)
975 };
976 let elapsed = sess.started.elapsed().as_secs_f32();
977 let eta = if frames == 0 {
978 sess.deadline.saturating_duration_since(Instant::now()).as_secs_f32()
979 } else {
980 let per = elapsed / frames as f32;
981 (per * (sess.target_frames.saturating_sub(frames)) as f32).max(0.0)
982 };
983 SessionStatus {
984 state: state.into(),
985 room_id: sess.room_id.clone(),
986 tier: sess.tier.clone(),
987 frames_recorded: frames,
988 target_frames: sess.target_frames,
989 progress,
990 z_median: sess.z_median,
991 z_max: sess.z_max,
992 motion_flagged: sess.motion_flagged,
993 elapsed_s: elapsed,
994 eta_s: eta,
995 note,
996 }
997}
998
999fn baseline_averages(b: &BaselineCalibration) -> (f32, f32, f32) {
1000 let n = b.subcarriers.len().max(1) as f32;
1001 let mut amp = 0.0f32;
1002 let mut var = 0.0f32;
1003 let mut disp = 0.0f32;
1004 for s in &b.subcarriers {
1005 amp += s.amp_mean;
1006 var += s.amp_variance;
1007 disp += s.phase_dispersion;
1008 }
1009 (amp / n, var / n, disp / n)
1010}
1011
/// Current wall-clock time in milliseconds since the Unix epoch, or 0 when the
/// system clock reports a time before the epoch.
fn unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1018
#[cfg(test)]
mod tests {
    use super::*;

    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    // Brings `oneshot` into scope so a `Router` can be driven without a socket.
    use tower::ServiceExt;

    // ---------------------------------------------------------------------
    // Plain unit tests (no router involved)
    // ---------------------------------------------------------------------

    /// `StartParams::default()` must report a 30 s duration, a zero frame
    /// minimum and no tier override.
    #[test]
    fn start_params_defaults() {
        let params = StartParams::default();
        assert_eq!(params.duration_s, 30);
        assert_eq!(params.min_frames, 0);
        assert!(params.tier.is_none());
    }

    /// A JSON body that names only some fields still deserializes; the
    /// fields it omits take their default values.
    #[test]
    fn start_params_partial_json() {
        let payload = r#"{"room_id":"living-room","tier":"he20"}"#;
        let params: StartParams = serde_json::from_str(payload).unwrap();
        assert_eq!(params.room_id.as_deref(), Some("living-room"));
        assert_eq!(params.tier.as_deref(), Some("he20"));
        // `duration_s` is absent from the payload above.
        assert_eq!(params.duration_s, 30);
    }

    /// Builds `CalibrateServeArgs` by hand and reads two of its ports back.
    #[test]
    fn args_defaults() {
        let args = CalibrateServeArgs {
            token: None,
            output_dir: "./baselines".into(),
            tier: "ht20".into(),
            udp_bind: "0.0.0.0".into(),
            udp_port: 5005,
            http_bind: "127.0.0.1".into(),
            http_port: 8090,
        };
        assert_eq!((args.http_port, args.udp_port), (8090, 5005));
    }

    /// `sanitize_room_id` must strip every path separator and dot, and fall
    /// back to `"default"` when nothing is left.
    #[test]
    fn sanitize_blocks_path_traversal() {
        let cases = [
            ("../../etc/passwd", "etcpasswd"),
            ("/abs/path", "abspath"),
            ("living-room_1", "living-room_1"),
            ("", "default"),
            ("..\\..\\win", "win"),
        ];
        for (raw, expected) in cases {
            assert_eq!(sanitize_room_id(raw), expected);
        }
        assert!(!sanitize_room_id("a/b/c").contains('/'));
    }

    // ---------------------------------------------------------------------
    // Router-level tests
    // ---------------------------------------------------------------------

    /// Assembles an `ApiState` whose status points at `dir` as output
    /// directory, with an empty sample window, an empty enrollment map and a
    /// 15 Hz sample rate.
    fn test_state(dir: &str) -> ApiState {
        let (cmd_tx, cmd_rx) = mpsc::channel::<CalCommand>(8);
        // Nothing consumes commands in these tests; close the receiving half.
        drop(cmd_rx);
        let status = SharedStatus {
            output_dir: dir.to_string(),
            ..Default::default()
        };
        ApiState {
            cmd_tx,
            status: Arc::new(RwLock::new(status)),
            window: Arc::new(RwLock::new(VecDeque::<f32>::new())),
            fs_hz: 15.0,
            enroll: Arc::new(RwLock::new(HashMap::<String, RoomEnroll>::new())),
        }
    }

    /// Sends a single request through `app` and returns the response status.
    ///
    /// The request always carries a JSON content type; `body` of `None`
    /// sends an empty body.
    async fn req(app: Router, method: &str, uri: &str, body: Option<&str>) -> StatusCode {
        let payload = match body {
            Some(text) => Body::from(text.to_string()),
            None => Body::empty(),
        };
        let request = Request::builder()
            .method(method)
            .uri(uri)
            .header("content-type", "application/json")
            .body(payload)
            .unwrap();
        let response = app.oneshot(request).await.unwrap();
        response.status()
    }

    /// Both the root route and the health route answer 200.
    #[tokio::test]
    async fn health_and_descriptor_ok() {
        let tmp = tempfile::tempdir().unwrap();
        let app = build_router(test_state(tmp.path().to_str().unwrap()));
        for uri in ["/", "/api/v1/calibration/health"] {
            assert_eq!(req(app.clone(), "GET", uri, None).await, StatusCode::OK);
        }
    }

    /// Trains a bank, reads room state from it, then checks that a
    /// traversal-style bank name and an empty anchor list are both rejected.
    #[tokio::test]
    async fn train_then_state_and_traversal_defense() {
        let tmp = tempfile::tempdir().unwrap();
        let state = test_state(tmp.path().to_str().unwrap());
        {
            // Pre-fill the live window with 200 samples of a 0.3 Hz sine at
            // the 15 Hz rate configured by `test_state`.
            let mut samples = state.window.write().await;
            samples.extend(
                (0..200).map(|i| (2.0 * std::f32::consts::PI * 0.3 * i as f32 / 15.0).sin()),
            );
        }
        let app = build_router(state);

        let train_body = r#"{"room_id":"t","baseline_id":"b","anchors":[
            {"room_id":"t","label":"empty","features":{"mean":1.0,"variance":1.0,"motion":0.1,"breathing_score":0.0,"breathing_hz":0.0,"heart_score":0.0,"heart_hz":0.0}},
            {"room_id":"t","label":"stand_still","features":{"mean":1.0,"variance":10.0,"motion":0.2,"breathing_score":0.0,"breathing_hz":0.0,"heart_score":0.0,"heart_hz":0.0}}
        ]}"#;
        let trained = req(app.clone(), "POST", "/api/v1/room/train", Some(train_body)).await;
        assert_eq!(trained, StatusCode::OK);
        assert!(tmp.path().join("t.json").exists(), "bank file written");

        let state_status = req(app.clone(), "GET", "/api/v1/room/state?bank=t", None).await;
        assert_eq!(state_status, StatusCode::OK);

        // A bank name shaped like a path traversal must not resolve to a file.
        let traversal_status =
            req(app.clone(), "GET", "/api/v1/room/state?bank=../../etc/passwd", None).await;
        assert_eq!(traversal_status, StatusCode::NOT_FOUND);

        // Training with no anchors at all is a client error.
        let empty_anchors = r#"{"room_id":"none","baseline_id":"b","anchors":[]}"#;
        let empty_status = req(app, "POST", "/api/v1/room/train", Some(empty_anchors)).await;
        assert_eq!(empty_status, StatusCode::BAD_REQUEST);
    }

    /// Checks how node geometry ends up in the bank file written by training:
    /// supplied inline, enrolled beforehand, or absent altogether.
    #[tokio::test]
    async fn train_threads_geometry_into_bank() {
        let tmp = tempfile::tempdir().unwrap();
        let app = build_router(test_state(tmp.path().to_str().unwrap()));
        let anchors = r#"[
            {"room_id":"g","label":"empty","features":{"mean":1.0,"variance":1.0,"motion":0.1,"breathing_score":0.0,"breathing_hz":0.0,"heart_score":0.0,"heart_hz":0.0}},
            {"room_id":"g","label":"stand_still","features":{"mean":1.0,"variance":10.0,"motion":0.2,"breathing_score":0.0,"breathing_hz":0.0,"heart_score":0.0,"heart_hz":0.0}}
        ]"#;
        // Reads `<name>.json` back from the output directory and parses it.
        let load_bank = |name: &str| {
            let bank_path = tmp.path().join(format!("{name}.json"));
            let contents = std::fs::read_to_string(bank_path).unwrap();
            SpecialistBank::from_json(&contents).unwrap()
        };

        // Case 1: geometry supplied directly in the train request.
        let inline_body = format!(
            r#"{{"room_id":"g1","baseline_id":"b","anchors":{anchors},
            "geometry":[{{"node_id":1,"position":{{"x_m":0.0,"y_m":0.0,"z_m":1.0}},"method":"tape-measure"}},{{"node_id":2}}]}}"#
        );
        let inline_status =
            req(app.clone(), "POST", "/api/v1/room/train", Some(inline_body.as_str())).await;
        assert_eq!(inline_status, StatusCode::OK);
        let inline_bank = load_bank("g1");
        assert_eq!(inline_bank.geometry.len(), 2);
        assert_eq!(inline_bank.geometry[0].method, "tape-measure");
        assert_eq!(inline_bank.geometry[1].node_id, 2);

        // Case 2: geometry enrolled first, then a train request without any.
        let enroll_body = r#"{"room_id":"g2","geometry":[{"node_id":7,"method":"floor-plan"}]}"#;
        let enroll_status =
            req(app.clone(), "POST", "/api/v1/enroll/geometry", Some(enroll_body)).await;
        assert_eq!(enroll_status, StatusCode::OK);
        let enrolled_body = format!(r#"{{"room_id":"g2","baseline_id":"b","anchors":{anchors}}}"#);
        let enrolled_status =
            req(app.clone(), "POST", "/api/v1/room/train", Some(enrolled_body.as_str())).await;
        assert_eq!(enrolled_status, StatusCode::OK);
        let enrolled_bank = load_bank("g2");
        assert_eq!(enrolled_bank.geometry.len(), 1);
        assert_eq!(enrolled_bank.geometry[0].node_id, 7);

        // Case 3: no geometry anywhere; training must still succeed.
        let bare_body = format!(r#"{{"room_id":"g3","baseline_id":"b","anchors":{anchors}}}"#);
        let bare_status =
            req(app.clone(), "POST", "/api/v1/room/train", Some(bare_body.as_str())).await;
        assert_eq!(bare_status, StatusCode::OK);
        let bare_bank = load_bank("g3");
        assert!(bare_bank.geometry.is_empty());
        assert!(bare_bank.presence.is_some(), "bank still trains without geometry");

        // Case 4: enrolling an empty geometry list is a client error.
        let empty_geometry = r#"{"room_id":"g4","geometry":[]}"#;
        let empty_status = req(app, "POST", "/api/v1/enroll/geometry", Some(empty_geometry)).await;
        assert_eq!(empty_status, StatusCode::BAD_REQUEST);
    }

    /// Enrollment status for a room with no prior enrollment answers 200,
    /// while an anchor with an unrecognized label answers 400.
    #[tokio::test]
    async fn enroll_status_empty_and_bad_label() {
        let tmp = tempfile::tempdir().unwrap();
        let app = build_router(test_state(tmp.path().to_str().unwrap()));

        let status_code = req(app.clone(), "GET", "/api/v1/enroll/status?room=x", None).await;
        assert_eq!(status_code, StatusCode::OK);

        let bad_anchor = r#"{"room_id":"x","baseline":"b","label":"nope"}"#;
        let anchor_code = req(app, "POST", "/api/v1/enroll/anchor", Some(bad_anchor)).await;
        assert_eq!(anchor_code, StatusCode::BAD_REQUEST);
    }
}