wireband-edge 0.4.1

Lightweight Wire.Band client — semantic data middleware for any domain (IoT, AI/ML, DeFi, legal, geospatial, supply chain, and more)
Documentation
//! Modbus TCP poller — reads holding registers from PLCs and industrial sensors.
//!
//! Polls a configurable set of register ranges at a fixed interval, converts
//! raw 16-bit words to scaled engineering values, and buffers them into
//! [`WireBandClient`] for compression and upload.
//!
//! Supports Modbus TCP (primary). RTU over serial is planned.
//!
//! Feature-gated: `--features modbus`.

use std::net::SocketAddr;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use tokio::time;
use tokio_modbus::client::tcp;
use tokio_modbus::prelude::*;
use tracing::{debug, info, warn};

use crate::classifier::{DeltaFilter, TopicClassifier};
use crate::client::WireBandClient;
use crate::error::{Result, WireBandError};
use crate::frame;

/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Yields `0.0` when the system clock reports a time before the epoch.
fn unix_ts() -> f64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    // A pre-epoch clock is mapped to zero rather than treated as an error.
    since_epoch.map_or(0.0, |elapsed| elapsed.as_secs_f64())
}

/// One contiguous run of Modbus registers that is polled and reported as a single value.
#[derive(Debug, Clone)]
pub struct RegisterDef {
    /// Address of the first register in the range (Modbus 0-based).
    pub address: u16,
    /// How many consecutive registers to read (1 = u16, 2 = u32 big-endian).
    pub count: u16,
    /// Human-readable label, used as the Wire.Band sub-topic for this reading.
    pub name: String,
    /// Multiplier applied to the raw integer value.
    pub scale: f64,
}

impl RegisterDef {
    /// Build a definition from its four parts.
    ///
    /// `name` accepts anything convertible into a `String` (e.g. `&str`).
    pub fn new(address: u16, count: u16, name: impl Into<String>, scale: f64) -> Self {
        let name = name.into();
        RegisterDef {
            address,
            count,
            name,
            scale,
        }
    }
}

/// Polls Modbus TCP registers at a fixed interval and feeds the Wire.Band pipeline.
///
/// Configure the poller with the builder-style methods, then call `run` with a
/// [`WireBandClient`]; each successful register read is buffered into that client.
///
/// # Example
///
/// ```ignore
/// use std::time::Duration;
/// use wireband_edge::modbus::{ModbusPoller, RegisterDef};
///
/// ModbusPoller::new("192.168.1.100:502")
///     .slave(1)
///     .poll_interval(Duration::from_secs(5))
///     .topic_prefix("factory/plc")
///     .registers(vec![
///         RegisterDef::new(0x0100, 1, "motor_rpm",   1.0),
///         RegisterDef::new(0x0101, 1, "motor_temp",  0.1),  // 0.1 °C per LSB
///         RegisterDef::new(0x0102, 2, "energy_kwh",  0.001),
///     ])
///     .run(client)
///     .await?;
/// ```
pub struct ModbusPoller {
    /// Target address exactly as passed to `new`; parsed as a socket address when `run` starts.
    host:          String,
    /// Modbus slave / unit ID used for the connection (default: 1).
    slave_id:      u8,
    /// Register ranges read, in order, on every polling cycle.
    registers:     Vec<RegisterDef>,
    /// Period of the polling ticker (default: 5 s).
    poll_interval: Duration,
    /// Prefix joined with each register name as `{prefix}/{name}` (default: `"modbus"`).
    topic_prefix:  String,
    /// Assigns the symbol attached to each reading before it is encoded.
    classifier:    TopicClassifier,
    /// Decides whether a reading is suppressed instead of buffered (default threshold: 0.0).
    delta_filter:  DeltaFilter,
}

impl ModbusPoller {
    /// Create a poller targeting `host` (e.g. `"192.168.1.100:502"`).
    ///
    /// `host` is stored as given and only parsed (as an `ip:port` socket
    /// address) when [`run`](Self::run) is called.
    ///
    /// Defaults: slave ID 1, 5 s poll interval, topic prefix `"modbus"`,
    /// no registers, delta threshold 0.0.
    pub fn new(host: impl Into<String>) -> Self {
        Self {
            host:          host.into(),
            slave_id:      1,
            registers:     Vec::new(),
            poll_interval: Duration::from_secs(5),
            topic_prefix:  "modbus".to_string(),
            classifier:    TopicClassifier::new(),
            delta_filter:  DeltaFilter::new(0.0),
        }
    }

    /// Modbus slave / unit ID (default: 1).
    pub fn slave(mut self, id: u8) -> Self {
        self.slave_id = id;
        self
    }

    /// How often to poll each register (default: 5 s).
    ///
    /// The interval must be non-zero; [`run`](Self::run) rejects a zero
    /// interval with an error.
    pub fn poll_interval(mut self, d: Duration) -> Self {
        self.poll_interval = d;
        self
    }

    /// Topic prefix for events from this device (default: `"modbus"`).
    ///
    /// Each register is published to `{prefix}/{register.name}`.
    pub fn topic_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.topic_prefix = prefix.into();
        self
    }

    /// Set the register definitions to poll, replacing any set previously.
    pub fn registers(mut self, regs: Vec<RegisterDef>) -> Self {
        self.registers = regs;
        self
    }

    /// Add a single register definition to the end of the poll list.
    pub fn register(mut self, reg: RegisterDef) -> Self {
        self.registers.push(reg);
        self
    }

    /// Suppress readings that change less than this fraction (0.0 = forward all).
    ///
    /// Replaces the current delta filter with a fresh one.
    pub fn delta_threshold(mut self, threshold: f64) -> Self {
        self.delta_filter = DeltaFilter::new(threshold);
        self
    }

    /// Access the topic classifier to register custom keyword/topic mappings.
    pub fn classifier_mut(&mut self) -> &mut TopicClassifier {
        &mut self.classifier
    }

    /// Connect to the Modbus TCP server and poll registers indefinitely.
    ///
    /// On every tick each configured register range is read with a
    /// "read holding registers" request, scaled, delta-filtered, classified,
    /// encoded and buffered into `client`. Modbus exceptions are logged and
    /// the remaining registers are still polled. A transport/protocol error
    /// is logged, the rest of that cycle is skipped, and a reconnect is
    /// attempted at the start of each following cycle until it succeeds.
    ///
    /// # Errors
    ///
    /// Returns [`WireBandError::Modbus`] if the host is not a valid socket
    /// address, if the poll interval is zero, or if the initial connection
    /// fails. Once polling has started this function does not return.
    pub async fn run(mut self, client: WireBandClient) -> Result<()> {
        let socket_addr: SocketAddr = self.host.parse()
            .map_err(|_| WireBandError::Modbus(
                format!("invalid Modbus address: '{}'", self.host)
            ))?;

        // `tokio::time::interval` panics on a zero period; report it as an
        // ordinary error instead of crashing the task.
        if self.poll_interval.is_zero() {
            return Err(WireBandError::Modbus(
                "Modbus poll interval must be greater than zero".to_string(),
            ));
        }

        info!(
            host = %self.host, slave = self.slave_id,
            registers = self.registers.len(),
            "Modbus poller starting"
        );

        let mut ctx = tcp::connect_slave(socket_addr, Slave(self.slave_id)).await
            .map_err(|e| WireBandError::Modbus(format!("Modbus connect: {e}")))?;

        let mut ticker = time::interval(self.poll_interval);
        // If a cycle overruns (slow or unreachable device), resume the normal
        // cadence instead of firing a burst of catch-up polls at the device.
        ticker.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

        // Set after a transport failure; the connection is rebuilt on the next tick.
        let mut needs_reconnect = false;

        loop {
            ticker.tick().await;

            if needs_reconnect {
                match tcp::connect_slave(socket_addr, Slave(self.slave_id)).await {
                    Ok(fresh) => {
                        info!(host = %self.host, slave = self.slave_id, "Modbus reconnected");
                        ctx = fresh;
                        needs_reconnect = false;
                    }
                    Err(e) => {
                        warn!(host = %self.host, err = %e, "Modbus reconnect failed");
                        // Try again on the next tick.
                        continue;
                    }
                }
            }

            for reg in &self.registers {
                // tokio-modbus 0.14 returns Result<Result<Vec<u16>, ExceptionCode>>:
                // outer = IO error, inner = Modbus application exception
                match ctx.read_holding_registers(reg.address, reg.count).await {
                    Ok(Ok(words)) => {
                        let raw   = words_to_f64(&words);
                        let value = raw * reg.scale;
                        let topic = format!("{}/{}", self.topic_prefix, reg.name);
                        let payload = serde_json::json!({
                            "value":   value,
                            "address": reg.address,
                            "raw":     raw,
                        });

                        if self.delta_filter.should_suppress(&topic, &payload) {
                            debug!(topic = %topic, "Modbus: register delta-filtered");
                            continue;
                        }

                        let symbol  = self.classifier.classify(&topic, &payload);
                        let encoded = frame::encode(symbol, &topic, &payload);
                        client.buffer_event(topic.clone(), symbol, encoded, unix_ts()).await;
                        debug!(
                            register = reg.address,
                            name = %reg.name,
                            value,
                            symbol = format!("{symbol:#06x}"),
                            "Modbus register buffered"
                        );
                    }
                    Ok(Err(e)) => {
                        // The device answered with an exception for this range;
                        // the connection itself is fine, so keep polling the rest.
                        warn!(register = reg.address, name = %reg.name, err = ?e, "Modbus exception");
                    }
                    Err(e) => {
                        warn!(register = reg.address, name = %reg.name, err = %e, "Modbus IO error");
                        // The transport is presumed broken: reusing it would make
                        // every later read fail, so stop this cycle and reconnect.
                        needs_reconnect = true;
                        break;
                    }
                }
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Convert 0–N u16 words to an f64 value, treating them as one big-endian
/// unsigned integer (first word is the most significant).
///
/// * empty slice → `0.0`
/// * one word → that word
/// * two words → the big-endian u32 they form
/// * more words → every word contributes, e.g. four words form a u64
///
/// Values wider than 53 bits cannot be represented exactly in an `f64` and
/// are rounded.
fn words_to_f64(words: &[u16]) -> f64 {
    // Shift the accumulator left by one 16-bit word (×65 536) and append the
    // next word. Up to three words this is exact in f64.
    words
        .iter()
        .fold(0.0, |acc, &word| acc * 65_536.0 + f64::from(word))
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// A lone word comes back unchanged as a float.
    #[test]
    fn words_to_f64_single() {
        let decoded = words_to_f64(&[1234]);
        assert_eq!(decoded, 1234.0);
    }

    /// Two words are joined high-word-first: 0x0001_86A0 = 100_000.
    #[test]
    fn words_to_f64_double() {
        let pair: [u16; 2] = [0x0001, 0x86A0];
        let decoded = words_to_f64(&pair);
        assert_eq!(decoded, 100_000.0);
    }

    /// No words at all decode to zero.
    #[test]
    fn words_to_f64_empty() {
        let nothing: [u16; 0] = [];
        assert_eq!(words_to_f64(&nothing), 0.0);
    }

    /// `RegisterDef::new` stores each argument in the matching field.
    #[test]
    fn register_def_builder() {
        let RegisterDef { address, count, name, scale } =
            RegisterDef::new(0x0100, 2, "motor_rpm", 0.1);
        assert_eq!(address, 0x0100);
        assert_eq!(count, 2);
        assert_eq!(name, "motor_rpm");
        assert!((scale - 0.1).abs() < 1e-9);
    }
}