use crate::CommandClient;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
Idle,
Arming,
WaitingForCapture,
ReadingData,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CaptureData {
pub channels: Vec<Vec<f64>>,
pub channel_count: usize,
pub capture_length: usize,
pub pre_trigger_samples: usize,
pub actual_samples: usize,
pub sample_rate: f64,
pub timestamp_ns: u64,
pub sequence: u64,
}
pub struct DaqCapture {
daq_fqdn: String,
pub busy: bool,
pub active: bool,
pub error: bool,
pub error_message: String,
pub data: Option<CaptureData>,
state: State,
arm_time: Option<std::time::Instant>,
pending_tid: Option<u32>,
}
impl DaqCapture {
pub fn new(daq_fqdn: &str) -> Self {
Self {
daq_fqdn: daq_fqdn.to_string(),
busy: false,
active: false,
error: false,
error_message: String::new(),
data: None,
state: State::Idle,
arm_time: None,
pending_tid: None,
}
}
pub fn is_busy(&self) -> bool {
self.busy
}
pub fn is_error(&self) -> bool {
self.error
}
pub fn start(&mut self, client: &mut CommandClient) {
self.error = false;
self.error_message.clear();
self.data = None;
self.active = false;
let tid = client.send(
&format!("{}.arm", self.daq_fqdn),
serde_json::json!({}),
);
self.pending_tid = Some(tid);
self.arm_time = Some(std::time::Instant::now());
self.busy = true;
self.state = State::Arming;
}
pub fn reset(&mut self) {
self.state = State::Idle;
self.busy = false;
self.active = false;
self.pending_tid = None;
}
pub fn set_trigger_delay(&mut self, client: &mut CommandClient, delay_ms: u32) {
if self.state != State::Idle {
return;
}
let _ = client.send(
&format!("{}.set_trigger_delay", self.daq_fqdn),
serde_json::json!({ "delay_ms": delay_ms }),
);
}
pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
match self.state {
State::Idle => {}
State::Arming => {
if self.check_timeout(timeout_ms) { return; }
if let Some(tid) = self.pending_tid {
if let Some(resp) = client.take_response(tid) {
self.pending_tid = None;
if resp.success {
self.active = true;
self.state = State::WaitingForCapture;
let tid = client.send(
&format!("{}.capture_status", self.daq_fqdn),
serde_json::json!({}),
);
self.pending_tid = Some(tid);
} else {
self.set_error(&resp.error_message);
}
}
}
}
State::WaitingForCapture => {
if self.check_timeout(timeout_ms) { return; }
if let Some(tid) = self.pending_tid {
if let Some(resp) = client.take_response(tid) {
self.pending_tid = None;
if resp.success {
let data_ready = resp.data.get("data_ready")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if data_ready {
self.active = false;
let tid = client.send(
&format!("{}.read_capture", self.daq_fqdn),
serde_json::json!({}),
);
self.pending_tid = Some(tid);
self.state = State::ReadingData;
} else {
let tid = client.send(
&format!("{}.capture_status", self.daq_fqdn),
serde_json::json!({}),
);
self.pending_tid = Some(tid);
}
} else {
self.set_error(&resp.error_message);
}
}
}
}
State::ReadingData => {
if self.check_timeout(timeout_ms) { return; }
if let Some(tid) = self.pending_tid {
if let Some(resp) = client.take_response(tid) {
self.pending_tid = None;
if resp.success {
match Self::parse_capture_data(&resp.data) {
Ok(capture) => {
self.data = Some(capture);
self.busy = false;
self.state = State::Idle;
}
Err(e) => {
self.set_error(&e);
}
}
} else {
self.set_error(&resp.error_message);
}
}
}
}
}
}
fn check_timeout(&mut self, timeout_ms: u32) -> bool {
if let Some(t) = self.arm_time {
if t.elapsed().as_millis() as u32 > timeout_ms {
self.set_error("Capture timeout");
return true;
}
}
false
}
fn set_error(&mut self, message: &str) {
self.state = State::Idle;
self.busy = false;
self.active = false;
self.error = true;
self.error_message = message.to_string();
self.pending_tid = None;
}
fn parse_capture_data(data: &serde_json::Value) -> Result<CaptureData, String> {
let channel_count = data.get("channel_count")
.and_then(|v| v.as_u64())
.ok_or("Missing channel_count")? as usize;
let capture_length = data.get("capture_length")
.and_then(|v| v.as_u64())
.ok_or("Missing capture_length")? as usize;
let pre_trigger_samples = data.get("pre_trigger_samples")
.and_then(|v| v.as_u64())
.ok_or("Missing pre_trigger_samples")? as usize;
let actual_samples = data.get("actual_samples")
.and_then(|v| v.as_u64())
.ok_or("Missing actual_samples")? as usize;
let sample_rate = data.get("sample_rate")
.and_then(|v| v.as_f64())
.ok_or("Missing sample_rate")?;
let timestamp_ns = data.get("timestamp_ns")
.and_then(|v| v.as_u64())
.ok_or("Missing timestamp_ns")?;
let sequence = data.get("sequence")
.and_then(|v| v.as_u64())
.ok_or("Missing sequence")?;
let channels_arr = data.get("channels")
.and_then(|v| v.as_array())
.ok_or("Missing channels array")?;
if channels_arr.len() != channel_count {
return Err(format!(
"channel_count mismatch: header says {} but got {} arrays",
channel_count, channels_arr.len()
));
}
let channels: Vec<Vec<f64>> = channels_arr.iter()
.map(|ch| {
ch.as_array()
.map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
.unwrap_or_default()
})
.collect();
Ok(CaptureData {
channels,
channel_count,
capture_length,
pre_trigger_samples,
actual_samples,
sample_rate,
timestamp_ns,
sequence,
})
}
}
impl Default for DaqCapture {
fn default() -> Self {
Self::new("ni.capture")
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_capture_data() {
let data = serde_json::json!({
"channel_count": 2,
"capture_length": 100,
"pre_trigger_samples": 10,
"actual_samples": 110,
"sample_rate": 1000.0,
"timestamp_ns": 1234567890u64,
"sequence": 1u64,
"channels": [
[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0],
],
});
let capture = DaqCapture::parse_capture_data(&data).unwrap();
assert_eq!(capture.channel_count, 2);
assert_eq!(capture.capture_length, 100);
assert_eq!(capture.pre_trigger_samples, 10);
assert_eq!(capture.actual_samples, 110);
assert_eq!(capture.sample_rate, 1000.0);
assert_eq!(capture.channels[0], vec![1.0, 2.0, 3.0]);
assert_eq!(capture.channels[1], vec![4.0, 5.0, 6.0]);
}
#[test]
fn test_parse_capture_data_missing_field() {
let data = serde_json::json!({"channel_count": 1});
assert!(DaqCapture::parse_capture_data(&data).is_err());
}
}