use std::net::SocketAddr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time;
use tokio_modbus::client::tcp;
use tokio_modbus::prelude::*;
use tracing::{debug, info, warn};
use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;
fn unix_ts() -> f64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64()
}
#[derive(Debug, Clone)]
pub struct RegisterDef {
pub address: u16,
pub count: u16,
pub name: String,
pub scale: f64,
}
impl RegisterDef {
pub fn new(address: u16, count: u16, name: impl Into<String>, scale: f64) -> Self {
Self { address, count, name: name.into(), scale }
}
}
pub struct ModbusPoller {
host: String,
slave_id: u8,
registers: Vec<RegisterDef>,
poll_interval: Duration,
topic_prefix: String,
classifier: TopicClassifier,
delta_filter: DeltaFilter,
}
impl ModbusPoller {
pub fn new(host: impl Into<String>) -> Self {
Self {
host: host.into(),
slave_id: 1,
registers: Vec::new(),
poll_interval: Duration::from_secs(5),
topic_prefix: "modbus".to_string(),
classifier: TopicClassifier::new(),
delta_filter: DeltaFilter::new(0.0),
}
}
pub fn slave(mut self, id: u8) -> Self {
self.slave_id = id;
self
}
pub fn poll_interval(mut self, d: Duration) -> Self {
self.poll_interval = d;
self
}
pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
self.topic_prefix = prefix.into();
self
}
pub fn registers(mut self, regs: Vec<RegisterDef>) -> Self {
self.registers = regs;
self
}
pub fn register(mut self, reg: RegisterDef) -> Self {
self.registers.push(reg);
self
}
pub fn delta_threshold(mut self, threshold: f64) -> Self {
self.delta_filter = DeltaFilter::new(threshold);
self
}
pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
&mut self.classifier
}
pub async fn run(mut self, client: WireBandClient) -> Result<()> {
let socket_addr: SocketAddr = self.host.parse()
.map_err(|_| WireBandError::Modbus(
format!("invalid Modbus address: '{}'", self.host)
))?;
info!(
host = %self.host, slave = self.slave_id,
registers = self.registers.len(),
"Modbus poller starting"
);
let mut ctx = tcp::connect_slave(socket_addr, Slave(self.slave_id)).await
.map_err(|e| WireBandError::Modbus(format!("Modbus connect: {e}")))?;
let mut ticker = time::interval(self.poll_interval);
loop {
ticker.tick().await;
for reg in &self.registers {
match ctx.read_holding_registers(reg.address, reg.count).await {
Ok(Ok(words)) => {
let raw = words_to_f64(&words);
let value = raw * reg.scale;
let topic = format!("{}/{}", self.topic_prefix, reg.name);
let payload = serde_json::json!({
"value": value,
"address": reg.address,
"raw": raw,
});
if self.delta_filter.should_suppress(&topic, &payload) {
debug!(topic = %topic, "Modbus: register delta-filtered");
continue;
}
let symbol = self.classifier.classify(&topic, &payload);
let encoded = frame::encode(symbol, &topic, &payload);
client.buffer_event(topic.clone(), symbol, encoded, unix_ts()).await;
debug!(
register = reg.address,
name = %reg.name,
value,
symbol = format!("{symbol:#06x}"),
"Modbus register buffered"
);
}
Ok(Err(e)) => {
warn!(register = reg.address, name = %reg.name, err = ?e, "Modbus exception");
}
Err(e) => {
warn!(register = reg.address, name = %reg.name, err = %e, "Modbus IO error");
}
}
}
}
}
}
fn words_to_f64(words: &[u16]) -> f64 {
match words.len() {
0 => 0.0,
1 => words[0] as f64,
_ => {
let val = ((words[0] as u32) << 16) | (words[1] as u32);
val as f64
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn words_to_f64_single() {
assert_eq!(words_to_f64(&[1234]), 1234.0);
}
#[test]
fn words_to_f64_double() {
assert_eq!(words_to_f64(&[0x0001, 0x86A0]), 100_000.0);
}
#[test]
fn words_to_f64_empty() {
assert_eq!(words_to_f64(&[]), 0.0);
}
#[test]
fn register_def_builder() {
let reg = RegisterDef::new(0x0100, 2, "motor_rpm", 0.1);
assert_eq!(reg.address, 0x0100);
assert_eq!(reg.count, 2);
assert_eq!(reg.name, "motor_rpm");
assert!((reg.scale - 0.1).abs() < 1e-9);
}
}