use crate::CommandClient;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
Idle,
Reading,
}
pub struct ReadTaskTiming {
task_fqdn: String,
actual_sample_rate: f64,
time_increment: f64,
busy: bool,
error: bool,
error_message: String,
pending_rate_tid: Option<u32>,
pending_ti_tid: Option<u32>,
state: State,
start_time: Option<std::time::Instant>,
}
impl ReadTaskTiming {
pub fn new(task_fqdn: &str) -> Self {
Self {
task_fqdn: task_fqdn.to_string(),
actual_sample_rate: 0.0,
time_increment: 0.0,
busy: false,
error: false,
error_message: String::new(),
pending_rate_tid: None,
pending_ti_tid: None,
state: State::Idle,
start_time: None,
}
}
pub fn is_busy(&self) -> bool { self.busy }
pub fn is_error(&self) -> bool { self.error }
pub fn error_message(&self) -> &str { &self.error_message }
pub fn actual_sample_rate(&self) -> f64 { self.actual_sample_rate }
pub fn time_increment(&self) -> f64 { self.time_increment }
pub fn start(&mut self, client: &mut CommandClient) {
self.error = false;
self.error_message.clear();
self.actual_sample_rate = 0.0;
self.time_increment = 0.0;
let rate_tid = client.send(
&format!("{}.actual_sample_rate", self.task_fqdn),
serde_json::json!({}),
);
let ti_tid = client.send(
&format!("{}.time_increment", self.task_fqdn),
serde_json::json!({}),
);
self.pending_rate_tid = Some(rate_tid);
self.pending_ti_tid = Some(ti_tid);
self.start_time = Some(std::time::Instant::now());
self.busy = true;
self.state = State::Reading;
}
pub fn reset(&mut self) {
self.state = State::Idle;
self.busy = false;
self.pending_rate_tid = None;
self.pending_ti_tid = None;
self.start_time = None;
}
pub fn tick(&mut self, timeout_ms: u32, client: &mut CommandClient) {
if self.state != State::Reading {
return;
}
if self.check_timeout(timeout_ms) {
return;
}
if let Some(tid) = self.pending_rate_tid {
if let Some(resp) = client.take_response(tid) {
self.pending_rate_tid = None;
if resp.success {
self.actual_sample_rate = resp.data.as_f64().unwrap_or(0.0);
} else {
self.set_error(&format!(
"actual_sample_rate read failed: {}", resp.error_message
));
return;
}
}
}
if let Some(tid) = self.pending_ti_tid {
if let Some(resp) = client.take_response(tid) {
self.pending_ti_tid = None;
if resp.success {
self.time_increment = resp.data.as_f64().unwrap_or(0.0);
} else {
self.set_error(&format!(
"time_increment read failed: {}", resp.error_message
));
return;
}
}
}
if self.pending_rate_tid.is_none() && self.pending_ti_tid.is_none() {
self.busy = false;
self.state = State::Idle;
self.start_time = None;
}
}
fn check_timeout(&mut self, timeout_ms: u32) -> bool {
if let Some(t) = self.start_time {
if t.elapsed().as_millis() as u32 > timeout_ms {
self.set_error("ReadTaskTiming: timed out waiting for responses");
return true;
}
}
false
}
fn set_error(&mut self, msg: &str) {
self.error = true;
self.error_message = msg.to_string();
self.busy = false;
self.state = State::Idle;
self.pending_rate_tid = None;
self.pending_ti_tid = None;
self.start_time = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
use mechutil::ipc::CommandMessage;
use tokio::sync::mpsc;
fn test_client() -> (
CommandClient,
mpsc::UnboundedReceiver<String>,
mpsc::UnboundedSender<CommandMessage>,
) {
let (write_tx, write_rx) = mpsc::unbounded_channel::<String>();
let (response_tx, response_rx) = mpsc::unbounded_channel::<CommandMessage>();
(CommandClient::new(write_tx, response_rx), write_rx, response_tx)
}
fn drain_sent(rx: &mut mpsc::UnboundedReceiver<String>) -> Vec<CommandMessage> {
let mut out = Vec::new();
while let Ok(s) = rx.try_recv() {
if let Ok(m) = serde_json::from_str::<CommandMessage>(&s) {
out.push(m);
}
}
out
}
#[test]
fn start_fires_two_reads_and_marks_busy() {
let mut fb = ReadTaskTiming::new("ni.AnalogTask");
let (mut client, mut write_rx, _resp_tx) = test_client();
assert!(!fb.is_busy());
fb.start(&mut client);
assert!(fb.is_busy());
let sent = drain_sent(&mut write_rx);
assert_eq!(sent.len(), 2);
assert_eq!(sent[0].topic, "ni.AnalogTask.actual_sample_rate");
assert_eq!(sent[1].topic, "ni.AnalogTask.time_increment");
}
#[test]
fn both_successes_populate_values_and_clear_busy() {
let mut fb = ReadTaskTiming::new("ni.AnalogTask");
let (mut client, mut write_rx, resp_tx) = test_client();
fb.start(&mut client);
let sent = drain_sent(&mut write_rx);
let rate_tid = sent[0].transaction_id;
let ti_tid = sent[1].transaction_id;
resp_tx.send(CommandMessage::response(rate_tid, serde_json::json!(1024.75))).unwrap();
resp_tx.send(CommandMessage::response(ti_tid, serde_json::json!(0.0009759))).unwrap();
client.poll();
fb.tick(2000, &mut client);
assert!(!fb.is_busy());
assert!(!fb.is_error());
assert!((fb.actual_sample_rate() - 1024.75).abs() < 1e-6);
assert!((fb.time_increment() - 0.0009759).abs() < 1e-9);
}
#[test]
fn one_failure_short_circuits_and_sets_error() {
let mut fb = ReadTaskTiming::new("ni.BadTask");
let (mut client, mut write_rx, resp_tx) = test_client();
fb.start(&mut client);
let sent = drain_sent(&mut write_rx);
let mut err = CommandMessage::response(sent[0].transaction_id, serde_json::json!({}));
err.success = false;
err.error_message = "Unknown subtopic: badtask.actual_sample_rate".into();
resp_tx.send(err).unwrap();
client.poll();
fb.tick(2000, &mut client);
assert!(fb.is_error());
assert!(!fb.is_busy());
assert!(fb.error_message().contains("actual_sample_rate read failed"));
}
#[test]
fn timeout_sets_error() {
let mut fb = ReadTaskTiming::new("ni.SlowTask");
let (mut client, _write_rx, _resp_tx) = test_client();
fb.start(&mut client);
fb.start_time = Some(std::time::Instant::now() - std::time::Duration::from_secs(5));
fb.tick(100, &mut client);
assert!(fb.is_error());
assert!(!fb.is_busy());
assert!(fb.error_message().contains("timed out"));
}
#[test]
fn reset_drops_pending_state() {
let mut fb = ReadTaskTiming::new("ni.AnalogTask");
let (mut client, _write_rx, _resp_tx) = test_client();
fb.start(&mut client);
assert!(fb.is_busy());
fb.reset();
assert!(!fb.is_busy());
assert!(fb.pending_rate_tid.is_none());
assert!(fb.pending_ti_tid.is_none());
}
}