Skip to main content

cog_pose_estimation/
runtime.rs

1//! Long-running inference loop. Polls the appliance's sensing-server,
2//! runs a CSI window through the engine, emits `pose.frame` events.
3
4use crate::config::CogConfig;
5use crate::inference::{CsiWindow, InferenceEngine, INPUT_SUBCARRIERS, INPUT_TIMESTEPS};
6use crate::publisher::{emit_event, Event};
7use std::time::Duration;
8use tokio::time::sleep;
9
10pub async fn run_loop(
11    cfg: CogConfig,
12    engine: InferenceEngine,
13) -> Result<(), Box<dyn std::error::Error>> {
14    let mut buffer: Vec<f32> = Vec::with_capacity(INPUT_SUBCARRIERS * INPUT_TIMESTEPS);
15    let mut tick: u64 = 0;
16
17    loop {
18        // Poll one frame from the sensing-server. On error, sleep and retry —
19        // we expect transient blips when the server restarts.
20        match fetch_frame(&cfg.sensing_url).await {
21            Ok(amplitudes) => {
22                tick += 1;
23                buffer.extend(amplitudes);
24                // Slide-window: keep only the most recent N*T values
25                let cap = INPUT_SUBCARRIERS * INPUT_TIMESTEPS;
26                if buffer.len() >= cap {
27                    let window = CsiWindow {
28                        data: buffer.split_off(buffer.len() - cap),
29                    };
30                    if let Ok(out) = engine.infer(&window) {
31                        if out.confidence >= cfg.min_confidence {
32                            // Flatten persons array (single-person v0.0.1)
33                            let persons = serde_json::json!([{
34                                "keypoints": chunk_pairs(&out.keypoints),
35                                "confidence": out.confidence,
36                            }]);
37                            emit_event(&Event::pose_frame(tick, 1, persons));
38                        }
39                    }
40                }
41            }
42            Err(e) => {
43                tracing::warn!(error = %e, "sensing-server fetch failed");
44            }
45        }
46        sleep(Duration::from_millis(cfg.poll_ms)).await;
47    }
48}
49
50async fn fetch_frame(url: &str) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
51    // Synchronous ureq inside an async fn — we accept the blocking call
52    // here because the per-frame cost (~1 ms loopback) is dwarfed by the
53    // inference cost. Replace with a proper async client if we ever poll
54    // remote sensing-servers over the wire.
55    let url = url.to_string();
56    let body = tokio::task::spawn_blocking(move || -> Result<String, ureq::Error> {
57        Ok(ureq::get(&url).call()?.into_string()?)
58    })
59    .await??;
60    let json: serde_json::Value = serde_json::from_str(&body)?;
61    let snapshot = json.get("snapshot").unwrap_or(&json);
62    let nodes = snapshot
63        .get("nodes")
64        .and_then(|v| v.as_array())
65        .ok_or("missing nodes[]")?;
66    // Take node 0's amplitude vector — we'll add multi-node fusion later.
67    let amplitude = nodes
68        .first()
69        .and_then(|n| n.get("amplitude"))
70        .and_then(|v| v.as_array())
71        .ok_or("missing nodes[0].amplitude[]")?;
72    Ok(amplitude
73        .iter()
74        .filter_map(|v| v.as_f64().map(|f| f as f32))
75        .collect())
76}
77
/// Groups a flat slice into consecutive two-element arrays, in order.
///
/// If `flat` has an odd length, the final unpaired element is left out.
fn chunk_pairs(flat: &[f32]) -> Vec<[f32; 2]> {
    let mut pairs = Vec::with_capacity(flat.len() / 2);
    let mut values = flat.iter().copied();
    // Pull two values at a time; stop as soon as a full pair is not available.
    while let (Some(first), Some(second)) = (values.next(), values.next()) {
        pairs.push([first, second]);
    }
    pairs
}