use std::collections::HashMap;
use std::time::{Duration, Instant};
use crate::bytes::ByteOrder;
use crate::codec::registers_for_type;
use crate::value::ModbusValue;
/// Default batch window, in milliseconds, applied by `CommandBatcher::new`.
pub const DEFAULT_BATCH_WINDOW_MS: u64 = 20;
/// Default pending-command count at which `CommandBatcher::new` considers a batch due.
pub const DEFAULT_MAX_BATCH_SIZE: usize = 100;
/// A single queued register command together with the data needed to group
/// and address it.
///
/// `CommandBatcher` groups commands by `(slave_id, function_code)` and uses
/// `register_address` plus `data_type` to decide whether a group covers one
/// contiguous register range.
#[derive(Debug, Clone)]
pub struct BatchCommand {
    /// Caller-assigned identifier; the batcher in this file never interprets it.
    pub point_id: u32,
    /// Payload carried by the command (presumably the value to write — confirm
    /// against the code that consumes batches).
    pub value: ModbusValue,
    /// First component of the grouping key.
    pub slave_id: u8,
    /// Second component of the grouping key.
    pub function_code: u8,
    /// Address of the first register this command occupies.
    pub register_address: u16,
    /// Type name passed to `registers_for_type` to learn how many registers
    /// the value spans.
    pub data_type: &'static str,
    /// Byte-order tag carried with the value; not read by the batcher itself.
    pub byte_order: ByteOrder,
}
/// Collects `BatchCommand`s into groups and tracks when the next batch is due.
#[derive(Debug)]
pub struct CommandBatcher {
    /// Queued commands, keyed by `(slave_id, function_code)`.
    pending_commands: HashMap<(u8, u8), Vec<BatchCommand>>,
    /// Instant of construction or of the most recent `take_commands` call.
    last_batch_time: Instant,
    /// Running total of queued commands across every group.
    total_pending: usize,
    /// Time after `last_batch_time` at which a batch becomes due.
    batch_window: Duration,
    /// Pending-command count at which a batch becomes due regardless of time.
    max_batch_size: usize,
}
impl CommandBatcher {
    /// Creates an empty batcher using `DEFAULT_BATCH_WINDOW_MS` and
    /// `DEFAULT_MAX_BATCH_SIZE`.
    pub fn new() -> Self {
        Self::with_config(DEFAULT_BATCH_WINDOW_MS, DEFAULT_MAX_BATCH_SIZE)
    }

    /// Creates an empty batcher with a custom window and size threshold.
    ///
    /// * `batch_window_ms` - batch window in milliseconds.
    /// * `max_batch_size` - pending-command count at which a batch is due.
    ///
    /// The window starts counting at construction time.
    pub fn with_config(batch_window_ms: u64, max_batch_size: usize) -> Self {
        Self {
            pending_commands: HashMap::new(),
            last_batch_time: Instant::now(),
            total_pending: 0,
            batch_window: Duration::from_millis(batch_window_ms),
            max_batch_size,
        }
    }

    /// Number of commands currently queued across all groups.
    #[inline]
    pub fn pending_count(&self) -> usize {
        self.total_pending
    }

    /// Time elapsed since construction or since the last `take_commands` call.
    #[inline]
    pub fn elapsed_since_last_batch(&self) -> Duration {
        self.last_batch_time.elapsed()
    }

    /// Returns `true` when a batch is due: either the batch window has elapsed
    /// or the pending count has reached `max_batch_size`.
    ///
    /// The time condition does not inspect the queue, so this can return
    /// `true` while nothing is pending; pair it with `is_empty` if that matters.
    pub fn should_execute(&self) -> bool {
        self.last_batch_time.elapsed() >= self.batch_window
            || self.total_pending >= self.max_batch_size
    }

    /// Drains every queued command, grouped by `(slave_id, function_code)`.
    ///
    /// Also restarts the batch window and resets the pending count to zero.
    pub fn take_commands(&mut self) -> HashMap<(u8, u8), Vec<BatchCommand>> {
        self.last_batch_time = Instant::now();
        self.total_pending = 0;
        std::mem::take(&mut self.pending_commands)
    }

    /// Queues `command` under its `(slave_id, function_code)` group.
    pub fn add_command(&mut self, command: BatchCommand) {
        let key = (command.slave_id, command.function_code);
        self.pending_commands.entry(key).or_default().push(command);
        self.total_pending += 1;
    }

    /// Reports whether `commands` cover one gap-free, non-overlapping register
    /// range once ordered by address.
    ///
    /// Each command must start exactly where the previous one ends, where a
    /// command's length is `get_register_count(data_type)`. Input order is
    /// irrelevant. Fewer than two commands yields `false`.
    pub fn are_strictly_consecutive(commands: &[BatchCommand]) -> bool {
        if commands.len() < 2 {
            return false;
        }

        // Stable sort keeps the relative order of equal addresses.
        let mut ordered: Vec<&BatchCommand> = commands.iter().collect();
        ordered.sort_by_key(|cmd| cmd.register_address);

        // Track the next expected address in u32: a run that ends on the last
        // 16-bit register would otherwise overflow `u16` after its final
        // element (panic with overflow checks, silent wrap without them).
        let mut expected_addr = u32::from(ordered[0].register_address);
        for cmd in ordered {
            if u32::from(cmd.register_address) != expected_addr {
                return false;
            }
            expected_addr += u32::from(Self::get_register_count(cmd.data_type));
        }
        true
    }

    /// Number of 16-bit registers occupied by a value of `data_type`, as
    /// reported by `registers_for_type`.
    #[inline]
    pub fn get_register_count(data_type: &str) -> u16 {
        registers_for_type(data_type) as u16
    }

    /// Discards every queued command and zeroes the pending count.
    ///
    /// Unlike `take_commands`, this leaves the batch window timer untouched.
    pub fn clear(&mut self) {
        self.pending_commands.clear();
        self.total_pending = 0;
    }

    /// Returns `true` when no commands are queued.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.total_pending == 0
    }
}
impl Default for CommandBatcher {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a command whose payload (`F32(100.0)`) and byte order
    /// (big-endian) are fixed; tests vary only routing and addressing fields.
    fn create_test_command(
        point_id: u32,
        slave_id: u8,
        function_code: u8,
        register_address: u16,
        data_type: &'static str,
    ) -> BatchCommand {
        BatchCommand {
            point_id,
            slave_id,
            function_code,
            register_address,
            data_type,
            value: ModbusValue::F32(100.0),
            byte_order: ByteOrder::BigEndian,
        }
    }

    /// Queues every command from `commands` into a freshly built batcher.
    fn batcher_with(commands: Vec<BatchCommand>) -> CommandBatcher {
        let mut batcher = CommandBatcher::new();
        for command in commands {
            batcher.add_command(command);
        }
        batcher
    }

    /// Size of the group stored under `key`, or `None` when there is no such group.
    fn group_len(
        groups: &HashMap<(u8, u8), Vec<BatchCommand>>,
        key: (u8, u8),
    ) -> Option<usize> {
        groups.get(&key).map(Vec::len)
    }

    #[test]
    fn test_new_creates_empty_batcher() {
        let batcher = CommandBatcher::new();
        assert_eq!(batcher.pending_count(), 0);
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_default_is_equivalent_to_new() {
        let from_new = CommandBatcher::new();
        let from_default = CommandBatcher::default();
        assert_eq!(from_new.pending_count(), from_default.pending_count());
    }

    #[test]
    fn test_with_config() {
        let batcher = CommandBatcher::with_config(50, 200);
        assert_eq!(batcher.batch_window, Duration::from_millis(50));
        assert_eq!(batcher.max_batch_size, 200);
    }

    #[test]
    fn test_pending_count_after_add() {
        let mut batcher = CommandBatcher::new();

        batcher.add_command(create_test_command(1, 1, 6, 100, "uint16"));
        assert_eq!(batcher.pending_count(), 1);
        assert!(!batcher.is_empty());

        batcher.add_command(create_test_command(2, 1, 6, 101, "uint16"));
        assert_eq!(batcher.pending_count(), 2);
    }

    #[test]
    fn test_pending_count_resets_after_take() {
        let mut batcher = batcher_with(vec![
            create_test_command(1, 1, 6, 100, "uint16"),
            create_test_command(2, 1, 6, 101, "uint16"),
        ]);
        assert_eq!(batcher.pending_count(), 2);

        drop(batcher.take_commands());
        assert_eq!(batcher.pending_count(), 0);
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_should_execute_false_when_empty_and_recent() {
        let batcher = CommandBatcher::new();
        assert!(!batcher.should_execute());
    }

    #[test]
    fn test_should_execute_true_at_max_batch_size() {
        let full_queue: Vec<BatchCommand> = (0..DEFAULT_MAX_BATCH_SIZE)
            .map(|i| create_test_command(i as u32, 1, 6, i as u16, "uint16"))
            .collect();
        let batcher = batcher_with(full_queue);
        assert!(batcher.should_execute());
    }

    #[test]
    fn test_take_commands_returns_all_pending() {
        let mut batcher = batcher_with(vec![
            create_test_command(1, 1, 6, 100, "uint16"),
            create_test_command(2, 1, 6, 101, "uint16"),
            create_test_command(3, 2, 6, 200, "uint16"),
        ]);

        let groups = batcher.take_commands();
        assert_eq!(groups.len(), 2);
        assert_eq!(group_len(&groups, (1, 6)), Some(2));
        assert_eq!(group_len(&groups, (2, 6)), Some(1));
    }

    #[test]
    fn test_add_command_groups_by_slave_and_function() {
        let mut batcher = batcher_with(vec![
            create_test_command(1, 1, 6, 100, "uint16"),
            create_test_command(2, 1, 6, 101, "uint16"),
            create_test_command(3, 2, 6, 100, "uint16"),
            create_test_command(4, 1, 16, 100, "uint16"),
        ]);

        let groups = batcher.take_commands();
        assert_eq!(groups.len(), 3);
        assert_eq!(group_len(&groups, (1, 6)), Some(2));
        assert_eq!(group_len(&groups, (2, 6)), Some(1));
        assert_eq!(group_len(&groups, (1, 16)), Some(1));
    }

    #[test]
    fn test_consecutive_single_command_returns_false() {
        let lone = [create_test_command(1, 1, 6, 100, "uint16")];
        assert!(!CommandBatcher::are_strictly_consecutive(&lone));
    }

    #[test]
    fn test_consecutive_empty_returns_false() {
        let nothing: [BatchCommand; 0] = [];
        assert!(!CommandBatcher::are_strictly_consecutive(&nothing));
    }

    #[test]
    fn test_consecutive_uint16_sequence() {
        let run = [
            create_test_command(1, 1, 6, 100, "uint16"),
            create_test_command(2, 1, 6, 101, "uint16"),
            create_test_command(3, 1, 6, 102, "uint16"),
        ];
        assert!(CommandBatcher::are_strictly_consecutive(&run));
    }

    #[test]
    fn test_consecutive_float32_sequence() {
        let run = [
            create_test_command(1, 1, 6, 100, "float32"),
            create_test_command(2, 1, 6, 102, "float32"),
            create_test_command(3, 1, 6, 104, "float32"),
        ];
        assert!(CommandBatcher::are_strictly_consecutive(&run));
    }

    #[test]
    fn test_consecutive_mixed_types() {
        let run = [
            create_test_command(1, 1, 6, 100, "uint16"),
            create_test_command(2, 1, 6, 101, "float32"),
            create_test_command(3, 1, 6, 103, "uint16"),
        ];
        assert!(CommandBatcher::are_strictly_consecutive(&run));
    }

    #[test]
    fn test_non_consecutive_gap() {
        let gapped = [
            create_test_command(1, 1, 6, 100, "uint16"),
            create_test_command(2, 1, 6, 105, "uint16"),
        ];
        assert!(!CommandBatcher::are_strictly_consecutive(&gapped));
    }

    #[test]
    fn test_consecutive_out_of_order_input() {
        let shuffled = [
            create_test_command(3, 1, 6, 102, "uint16"),
            create_test_command(1, 1, 6, 100, "uint16"),
            create_test_command(2, 1, 6, 101, "uint16"),
        ];
        assert!(CommandBatcher::are_strictly_consecutive(&shuffled));
    }

    #[test]
    fn test_register_count() {
        // (type name, registers it is expected to span)
        let expectations: [(&str, u16); 7] = [
            ("uint16", 1),
            ("int16", 1),
            ("uint32", 2),
            ("float32", 2),
            ("uint64", 4),
            ("float64", 4),
            ("unknown", 1),
        ];
        for &(data_type, registers) in expectations.iter() {
            assert_eq!(CommandBatcher::get_register_count(data_type), registers);
        }
    }

    #[test]
    fn test_clear() {
        let mut batcher = batcher_with(vec![
            create_test_command(1, 1, 6, 100, "uint16"),
            create_test_command(2, 1, 6, 101, "uint16"),
        ]);
        assert_eq!(batcher.pending_count(), 2);

        batcher.clear();
        assert_eq!(batcher.pending_count(), 0);
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_batch_workflow() {
        let mut batcher = CommandBatcher::new();
        assert_eq!(batcher.pending_count(), 0);

        (0..5u32).for_each(|i| {
            batcher.add_command(create_test_command(i, 1, 6, 100 + i as u16, "uint16"));
        });
        assert_eq!(batcher.pending_count(), 5);

        let batch = batcher.take_commands();
        assert_eq!(batch[&(1, 6)].len(), 5);
        assert_eq!(batcher.pending_count(), 0);
        assert!(batcher.is_empty());
    }
}