use crate::CommandClient;
const TARE_SAFETY_MARGIN_MS: u128 = 150;
const DEFAULT_TARE_DURATION_MS: u32 = 1000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
Idle,
Arming,
WaitingForCapture,
ReadingData,
}
struct PendingTare {
channel: String,
deadline: std::time::Instant,
response_tid: Option<u32>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CaptureData {
pub channels: Vec<Vec<f64>>,
pub channel_count: usize,
pub capture_length: usize,
pub pre_trigger_samples: usize,
pub actual_samples: usize,
pub sample_rate: f64,
pub timestamp_ns: u64,
pub sequence: u64,
}
pub struct DaqCapture {
daq_fqdn: String,
pub busy: bool,
pub active: bool,
pub error: bool,
pub error_message: String,
pub data: Option<CaptureData>,
state: State,
arm_time: Option<std::time::Instant>,
pending_tid: Option<u32>,
pending_tares: Vec<PendingTare>,
}
impl DaqCapture {
pub fn new(daq_fqdn: &str) -> Self {
Self {
daq_fqdn: daq_fqdn.to_string(),
busy: false,
active: false,
error: false,
error_message: String::new(),
data: None,
state: State::Idle,
arm_time: None,
pending_tid: None,
pending_tares: Vec::new(),
}
}
pub fn is_busy(&self) -> bool {
self.busy || !self.pending_tares.is_empty()
}
pub fn is_taring(&self) -> bool {
!self.pending_tares.is_empty()
}
pub fn is_error(&self) -> bool {
self.error
}
pub fn start(&mut self, client: &mut CommandClient) {
self.error = false;
self.error_message.clear();
self.data = None;
self.active = false;
let tid = client.send(
&format!("{}.arm", self.daq_fqdn),
serde_json::json!({}),
);
self.pending_tid = Some(tid);
self.arm_time = Some(std::time::Instant::now());
self.busy = true;
self.state = State::Arming;
}
pub fn reset(&mut self) {
self.state = State::Idle;
self.busy = false;
self.active = false;
self.pending_tid = None;
self.pending_tares.clear();
}
pub fn tare<S: AsRef<str>>(
&mut self,
channels: &[S],
duration_ms: Option<u32>,
client: &mut CommandClient,
) {
if self.state != State::Idle {
return;
}
let duration = duration_ms.unwrap_or(DEFAULT_TARE_DURATION_MS);
let module = self.module_prefix().to_string();
let now = std::time::Instant::now();
let deadline = now
+ std::time::Duration::from_millis(duration as u64)
+ std::time::Duration::from_millis(TARE_SAFETY_MARGIN_MS as u64);
for ch in channels {
let ch = ch.as_ref();
let tid = client.send(
&format!("{}.{}.tare", module, ch),
serde_json::json!({ "duration_ms": duration }),
);
self.pending_tares.push(PendingTare {
channel: ch.to_string(),
deadline,
response_tid: Some(tid),
});
}
}
pub fn clear_tare<S: AsRef<str>>(
&mut self,
channels: &[S],
client: &mut CommandClient,
) {
let module = self.module_prefix().to_string();
for ch in channels {
let _ = client.send(
&format!("{}.{}.clear_tare", module, ch.as_ref()),
serde_json::json!({}),
);
}
}
fn module_prefix(&self) -> &str {
self.daq_fqdn.split_once('.')
.map(|(m, _)| m)
.unwrap_or(&self.daq_fqdn)
}
pub fn set_trigger_delay(&mut self, client: &mut CommandClient, delay_ms: u32) {
if self.state != State::Idle {
return;
}
let _ = client.send(
&format!("{}.set_trigger_delay", self.daq_fqdn),
serde_json::json!({ "delay_ms": delay_ms }),
);
}
pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
if !self.pending_tares.is_empty() {
for pt in self.pending_tares.iter_mut() {
if let Some(tid) = pt.response_tid {
if let Some(resp) = client.take_response(tid) {
pt.response_tid = None;
if !resp.success {
log::warn!(
"DaqCapture tare on '{}' rejected by module: {}",
pt.channel, resp.error_message,
);
pt.deadline = std::time::Instant::now();
}
}
}
}
let now = std::time::Instant::now();
self.pending_tares.retain(|pt| pt.deadline > now);
}
match self.state {
State::Idle => {}
State::Arming => {
if self.check_timeout(timeout_ms) { return; }
if let Some(tid) = self.pending_tid {
if let Some(resp) = client.take_response(tid) {
self.pending_tid = None;
if resp.success {
self.active = true;
self.state = State::WaitingForCapture;
let tid = client.send(
&format!("{}.capture_status", self.daq_fqdn),
serde_json::json!({}),
);
self.pending_tid = Some(tid);
} else {
self.set_error(&resp.error_message);
}
}
}
}
State::WaitingForCapture => {
if self.check_timeout(timeout_ms) { return; }
if let Some(tid) = self.pending_tid {
if let Some(resp) = client.take_response(tid) {
self.pending_tid = None;
if resp.success {
let data_ready = resp.data.get("data_ready")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if data_ready {
self.active = false;
let tid = client.send(
&format!("{}.read_capture", self.daq_fqdn),
serde_json::json!({}),
);
self.pending_tid = Some(tid);
self.state = State::ReadingData;
} else {
let tid = client.send(
&format!("{}.capture_status", self.daq_fqdn),
serde_json::json!({}),
);
self.pending_tid = Some(tid);
}
} else {
self.set_error(&resp.error_message);
}
}
}
}
State::ReadingData => {
if self.check_timeout(timeout_ms) { return; }
if let Some(tid) = self.pending_tid {
if let Some(resp) = client.take_response(tid) {
self.pending_tid = None;
if resp.success {
match Self::parse_capture_data(&resp.data) {
Ok(capture) => {
self.data = Some(capture);
self.busy = false;
self.state = State::Idle;
}
Err(e) => {
self.set_error(&e);
}
}
} else {
self.set_error(&resp.error_message);
}
}
}
}
}
}
fn check_timeout(&mut self, timeout_ms: u32) -> bool {
if let Some(t) = self.arm_time {
if t.elapsed().as_millis() as u32 > timeout_ms {
self.set_error("Capture timeout");
return true;
}
}
false
}
fn set_error(&mut self, message: &str) {
self.state = State::Idle;
self.busy = false;
self.active = false;
self.error = true;
self.error_message = message.to_string();
self.pending_tid = None;
}
fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
let channel_count = data.get("channel_count")
.and_then(|v| v.as_u64())
.ok_or("Missing channel_count")? as usize;
let capture_length = data.get("capture_length")
.and_then(|v| v.as_u64())
.ok_or("Missing capture_length")? as usize;
let pre_trigger_samples = data.get("pre_trigger_samples")
.and_then(|v| v.as_u64())
.ok_or("Missing pre_trigger_samples")? as usize;
let actual_samples = data.get("actual_samples")
.and_then(|v| v.as_u64())
.ok_or("Missing actual_samples")? as usize;
let sample_rate = data.get("sample_rate")
.and_then(|v| v.as_f64())
.ok_or("Missing sample_rate")?;
let timestamp_ns = data.get("timestamp_ns")
.and_then(|v| v.as_u64())
.ok_or("Missing timestamp_ns")?;
let sequence = data.get("sequence")
.and_then(|v| v.as_u64())
.ok_or("Missing sequence")?;
let channels_arr = data.get("channels")
.and_then(|v| v.as_array())
.ok_or("Missing channels array")?;
if channels_arr.len() != channel_count {
return Err(format!(
"channel_count mismatch: header says {} but got {} arrays",
channel_count, channels_arr.len()
));
}
let channels: Vec<Vec<f64>> = channels_arr.iter()
.map(|ch| {
ch.as_array()
.map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
.unwrap_or_default()
})
.collect();
Ok(CaptureData {
channels,
channel_count,
capture_length,
pre_trigger_samples,
actual_samples,
sample_rate,
timestamp_ns,
sequence,
})
}
}
impl Default for DaqCapture {
fn default() -> Self {
Self::new("ni.capture")
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_capture_data() {
let data = serde_json::json!({
"channel_count": 2,
"capture_length": 100,
"pre_trigger_samples": 10,
"actual_samples": 110,
"sample_rate": 1000.0,
"timestamp_ns": 1234567890u64,
"sequence": 1u64,
"channels": [
[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0],
],
});
let capture = DaqCapture::parse_capture_data(&data).unwrap();
assert_eq!(capture.channel_count, 2);
assert_eq!(capture.capture_length, 100);
assert_eq!(capture.pre_trigger_samples, 10);
assert_eq!(capture.actual_samples, 110);
assert_eq!(capture.sample_rate, 1000.0);
assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
}
#[test]
fn test_parse_capture_data_missing_field() {
let data = serde_json::json!({"channel_count": 1});
assert!(DaqCapture::parse_capture_data(&data).is_err());
}
use tokio::sync::mpsc;
use mechutil::ipc::CommandMessage;
fn test_client() -> (
CommandClient,
mpsc::UnboundedReceiver<String>,
mpsc::UnboundedSender<CommandMessage>,
) {
let (write_tx, write_rx) = mpsc::unbounded_channel::<String>();
let (response_tx, response_rx) = mpsc::unbounded_channel::<CommandMessage>();
(CommandClient::new(write_tx, response_rx), write_rx, response_tx)
}
fn drain_sent(rx: &mut mpsc::UnboundedReceiver<String>) -> Vec<CommandMessage> {
let mut out = Vec::new();
while let Ok(s) = rx.try_recv() {
if let Ok(m) = serde_json::from_str::<CommandMessage>(&s) {
out.push(m);
}
}
out
}
#[test]
fn test_tare_dispatches_per_channel() {
let mut daq = DaqCapture::new("ni.traction");
let (mut client, mut write_rx, _resp_tx) = test_client();
daq.tare(&["tsdr_fx", "tsdr_fz", "enc_x"], Some(200), &mut client);
assert!(daq.is_busy(), "FB must be busy during tare");
assert!(daq.is_taring(), "is_taring() must report true");
assert_eq!(daq.pending_tares.len(), 3);
let sent = drain_sent(&mut write_rx);
assert_eq!(sent.len(), 3);
assert_eq!(sent[0].topic, "ni.tsdr_fx.tare");
assert_eq!(sent[1].topic, "ni.tsdr_fz.tare");
assert_eq!(sent[2].topic, "ni.enc_x.tare");
for msg in &sent {
assert_eq!(msg.data.get("duration_ms").and_then(|v| v.as_u64()), Some(200));
}
}
#[test]
fn test_tare_default_duration() {
let mut daq = DaqCapture::new("ni.traction");
let (mut client, mut write_rx, _resp_tx) = test_client();
daq.tare(&["tsdr_fz"], None, &mut client);
let sent = drain_sent(&mut write_rx);
assert_eq!(sent[0].data.get("duration_ms").and_then(|v| v.as_u64()), Some(1000));
}
#[test]
fn test_tare_deadline_clears_busy() {
let mut daq = DaqCapture::new("ni.traction");
let (mut client, mut write_rx, _resp_tx) = test_client();
daq.tare(&["tsdr_fz"], Some(0), &mut client);
let _ = drain_sent(&mut write_rx);
assert!(daq.is_busy());
daq.tick(5000, &mut client);
assert!(daq.is_busy(), "too soon — deadline not reached yet");
std::thread::sleep(std::time::Duration::from_millis(200));
daq.tick(5000, &mut client);
assert!(!daq.is_busy(), "tare should have cleared after deadline");
assert!(!daq.is_taring());
}
#[test]
fn test_tare_error_response_clears_channel_early() {
let mut daq = DaqCapture::new("ni.traction");
let (mut client, mut write_rx, resp_tx) = test_client();
daq.tare(&["good_ch", "bogus_ch"], Some(5000), &mut client);
let sent = drain_sent(&mut write_rx);
let bogus_tid = sent[1].transaction_id;
let mut err = CommandMessage::response(bogus_tid, serde_json::json!({}));
err.success = false;
err.error_message = "Tare: channel 'bogus_ch' not found".into();
resp_tx.send(err).unwrap();
client.poll();
daq.tick(5000, &mut client);
assert_eq!(daq.pending_tares.len(), 1);
assert_eq!(daq.pending_tares[0].channel, "good_ch");
}
#[test]
fn test_tare_is_rejected_while_capture_running() {
let mut daq = DaqCapture::new("ni.traction");
let (mut client, mut write_rx, _resp_tx) = test_client();
daq.state = State::Arming;
daq.busy = true;
daq.tare(&["tsdr_fz"], Some(100), &mut client);
let sent = drain_sent(&mut write_rx);
assert!(sent.is_empty(), "tare must not fire while a capture is in progress");
assert!(daq.pending_tares.is_empty());
}
#[test]
fn test_clear_tare_dispatches_per_channel() {
let mut daq = DaqCapture::new("ni.traction");
let (mut client, mut write_rx, _resp_tx) = test_client();
daq.clear_tare(&["tsdr_fz", "enc_x"], &mut client);
assert!(!daq.is_busy());
assert!(daq.pending_tares.is_empty());
let sent = drain_sent(&mut write_rx);
assert_eq!(sent.len(), 2);
assert_eq!(sent[0].topic, "ni.tsdr_fz.clear_tare");
assert_eq!(sent[1].topic, "ni.enc_x.clear_tare");
}
#[test]
fn test_reset_drops_pending_tares() {
let mut daq = DaqCapture::new("ni.traction");
let (mut client, mut write_rx, _resp_tx) = test_client();
daq.tare(&["a", "b"], Some(5000), &mut client);
let _ = drain_sent(&mut write_rx);
assert!(daq.is_busy());
daq.reset();
assert!(!daq.is_busy());
assert!(daq.pending_tares.is_empty());
}
}