use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tracing::{debug, error, info};
use crate::context::{BroadcastPolicy, ServerContext, SharedAddressSpace};
use crate::device::ModbusDevice;
use crate::error::{ModbusError, ModbusResult};
use crate::fault_injection::rtu_timing::RtuTimingFaultConfig;
use crate::fault_injection::{FaultAction, FaultPipeline, ModbusFaultContext};
use crate::handler::HandlerRegistry;
use crate::register::RegisterStore;
use crate::service::{
execute_transport_request, ExtensionRegistry, StandardModbusService, TransportDisposition,
TransportServicePolicy, UnknownUnitBehavior,
};
use crate::transport_runtime::TransportHookBundle;
use super::codec::RtuTiming;
use super::frame::{RtuFrame, RtuFrameError};
use super::transport::{
ChannelConfig, RtuTransport, TransportConfig, TransportFactory, TransportMetrics, TransportType,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum PerformancePreset {
#[default]
Default,
HighThroughput,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventEmissionMode {
Always,
SubscriberAware,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct RtuRuntimePolicy {
enforce_request_timeout: bool,
event_mode: EventEmissionMode,
record_transport_metrics: bool,
}
impl RtuRuntimePolicy {
fn resolve(
preset: PerformancePreset,
transport_type: TransportType,
has_fault_pipeline: bool,
has_timing_fault: bool,
simulate_response_delay: bool,
) -> Self {
match preset {
PerformancePreset::Default => Self {
enforce_request_timeout: true,
event_mode: EventEmissionMode::Always,
record_transport_metrics: true,
},
PerformancePreset::HighThroughput => match transport_type {
TransportType::Channel => Self {
enforce_request_timeout: true,
event_mode: EventEmissionMode::Always,
record_transport_metrics: true,
},
TransportType::TcpBridge => Self {
enforce_request_timeout: false,
event_mode: EventEmissionMode::SubscriberAware,
record_transport_metrics: true,
},
TransportType::VirtualSerial => {
let keep_timeout =
has_fault_pipeline || has_timing_fault || simulate_response_delay;
Self {
enforce_request_timeout: keep_timeout,
event_mode: EventEmissionMode::SubscriberAware,
record_transport_metrics: true,
}
}
},
}
}
#[inline]
fn request_timeout(self, timeout: Duration) -> Option<Duration> {
self.enforce_request_timeout.then_some(timeout)
}
#[inline]
fn should_emit_events(self, subscriber_count: usize) -> bool {
match self.event_mode {
EventEmissionMode::Always => true,
EventEmissionMode::SubscriberAware => subscriber_count > 0,
}
}
#[inline]
fn should_record_transport_metrics(self) -> bool {
self.record_transport_metrics
}
}
#[derive(Debug, Clone, Copy)]
struct RtuHookBundle {
transport: TransportHookBundle,
simulate_response_delay: bool,
additional_response_delay: Duration,
apply_timing_faults: bool,
}
impl RtuHookBundle {
fn new(
policy: RtuRuntimePolicy,
request_timeout: Duration,
simulate_response_delay: bool,
additional_response_delay: Duration,
apply_timing_faults: bool,
) -> Self {
Self {
transport: TransportHookBundle::new()
.with_request_timeout(policy.request_timeout(request_timeout))
.with_transport_metrics(policy.should_record_transport_metrics()),
simulate_response_delay,
additional_response_delay,
apply_timing_faults,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RtuServerConfig {
#[serde(default)]
pub transport: TransportConfig,
#[serde(default)]
pub unit_ids: Vec<u8>,
#[serde(default = "default_broadcast")]
pub broadcast_enabled: bool,
#[serde(default = "default_request_timeout")]
pub request_timeout: Duration,
#[serde(default = "default_shutdown_timeout")]
pub shutdown_timeout: Duration,
#[serde(default)]
pub simulate_response_delay: bool,
#[serde(default)]
pub additional_response_delay: Duration,
#[serde(default)]
pub performance_preset: PerformancePreset,
}
fn default_broadcast() -> bool {
true
}
fn default_request_timeout() -> Duration {
Duration::from_secs(5)
}
fn default_shutdown_timeout() -> Duration {
Duration::from_secs(10)
}
impl Default for RtuServerConfig {
fn default() -> Self {
Self {
transport: TransportConfig::default(),
unit_ids: vec![1], broadcast_enabled: true,
request_timeout: default_request_timeout(),
shutdown_timeout: default_shutdown_timeout(),
simulate_response_delay: true,
additional_response_delay: Duration::ZERO,
performance_preset: PerformancePreset::Default,
}
}
}
impl RtuServerConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_transport(mut self, transport: TransportConfig) -> Self {
self.transport = transport;
self
}
pub fn with_unit_ids(mut self, ids: Vec<u8>) -> Self {
self.unit_ids = ids;
self
}
pub fn with_broadcast(mut self, enabled: bool) -> Self {
self.broadcast_enabled = enabled;
self
}
pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
pub fn with_performance_preset(mut self, preset: PerformancePreset) -> Self {
self.performance_preset = preset;
self
}
pub fn with_response_delay_simulation(mut self, enabled: bool) -> Self {
self.simulate_response_delay = enabled;
self
}
pub fn for_testing() -> Self {
Self {
transport: TransportConfig::Channel(ChannelConfig::default()),
unit_ids: vec![1],
broadcast_enabled: true,
request_timeout: Duration::from_secs(1),
shutdown_timeout: Duration::from_secs(1),
simulate_response_delay: false,
additional_response_delay: Duration::ZERO,
performance_preset: PerformancePreset::Default,
}
}
}
#[derive(Debug, Clone)]
pub enum RtuServerEvent {
Started,
Stopped,
RequestReceived {
unit_id: u8,
function_code: u8,
timestamp: Instant,
},
ResponseSent {
unit_id: u8,
function_code: u8,
is_exception: bool,
latency_us: u64,
},
Error { message: String },
FrameError { error: String },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RtuServerState {
Stopped,
Starting,
Running,
Stopping,
}
#[derive(Debug, Clone, Default)]
pub struct RtuServerStats {
pub requests_processed: u64,
pub requests_success: u64,
pub requests_exception: u64,
pub crc_errors: u64,
pub framing_errors: u64,
pub timeouts: u64,
pub bytes_received: u64,
pub bytes_sent: u64,
pub avg_latency_us: f64,
}
#[derive(Debug, Default)]
struct RtuStatsCounters {
requests_processed: AtomicU64,
requests_success: AtomicU64,
requests_exception: AtomicU64,
crc_errors: AtomicU64,
framing_errors: AtomicU64,
timeouts: AtomicU64,
bytes_received: AtomicU64,
bytes_sent: AtomicU64,
}
impl RtuStatsCounters {
#[inline]
fn record_request(
&self,
is_exception: bool,
latency_us: u64,
bytes_received: u64,
bytes_sent: u64,
) {
let _ = latency_us;
self.requests_processed.fetch_add(1, Ordering::Relaxed);
if is_exception {
self.requests_exception.fetch_add(1, Ordering::Relaxed);
} else {
self.requests_success.fetch_add(1, Ordering::Relaxed);
}
self.bytes_received
.fetch_add(bytes_received, Ordering::Relaxed);
self.bytes_sent.fetch_add(bytes_sent, Ordering::Relaxed);
}
#[inline]
fn record_crc_error(&self) {
self.crc_errors.fetch_add(1, Ordering::Relaxed);
}
#[inline]
fn record_framing_error(&self) {
self.framing_errors.fetch_add(1, Ordering::Relaxed);
}
#[inline]
fn record_timeout(&self) {
self.timeouts.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self, request_count: u64, latency_sum: u64) -> RtuServerStats {
let avg_latency_us = if request_count > 0 {
latency_sum as f64 / request_count as f64
} else {
0.0
};
RtuServerStats {
requests_processed: self.requests_processed.load(Ordering::Relaxed),
requests_success: self.requests_success.load(Ordering::Relaxed),
requests_exception: self.requests_exception.load(Ordering::Relaxed),
crc_errors: self.crc_errors.load(Ordering::Relaxed),
framing_errors: self.framing_errors.load(Ordering::Relaxed),
timeouts: self.timeouts.load(Ordering::Relaxed),
bytes_received: self.bytes_received.load(Ordering::Relaxed),
bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
avg_latency_us,
}
}
}
#[derive(Debug, Default)]
struct AtomicTransportMetrics {
bytes_received: AtomicU64,
bytes_sent: AtomicU64,
frames_received: AtomicU64,
frames_sent: AtomicU64,
crc_errors: AtomicU64,
framing_errors: AtomicU64,
timeouts: AtomicU64,
}
impl AtomicTransportMetrics {
#[inline]
fn record_bytes_received(&self, bytes: usize) {
self.bytes_received
.fetch_add(bytes as u64, Ordering::Relaxed);
}
#[inline]
fn record_bytes_sent(&self, bytes: usize) {
self.bytes_sent.fetch_add(bytes as u64, Ordering::Relaxed);
self.frames_sent.fetch_add(1, Ordering::Relaxed);
}
#[inline]
fn record_frame_received(&self) {
self.frames_received.fetch_add(1, Ordering::Relaxed);
}
#[inline]
fn record_crc_error(&self) {
self.crc_errors.fetch_add(1, Ordering::Relaxed);
}
#[inline]
fn record_framing_error(&self) {
self.framing_errors.fetch_add(1, Ordering::Relaxed);
}
#[inline]
fn record_timeout(&self) {
self.timeouts.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self) -> TransportMetrics {
TransportMetrics {
bytes_received: self.bytes_received.load(Ordering::Relaxed),
bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
frames_received: self.frames_received.load(Ordering::Relaxed),
frames_sent: self.frames_sent.load(Ordering::Relaxed),
crc_errors: self.crc_errors.load(Ordering::Relaxed),
framing_errors: self.framing_errors.load(Ordering::Relaxed),
timeouts: self.timeouts.load(Ordering::Relaxed),
}
}
}
#[derive(Debug, Clone)]
enum UnitFilter {
All,
Selected(Box<[bool; 256]>),
}
impl UnitFilter {
fn new(unit_ids: &[u8]) -> Self {
if unit_ids.is_empty() {
Self::All
} else {
let mut selected = Box::new([false; 256]);
for unit_id in unit_ids {
selected[*unit_id as usize] = true;
}
Self::Selected(selected)
}
}
#[inline]
fn allows(&self, unit_id: u8) -> bool {
match self {
Self::All => true,
Self::Selected(selected) => selected[unit_id as usize],
}
}
}
pub struct ModbusRtuServer {
config: RtuServerConfig,
service: Arc<StandardModbusService>,
devices: DashMap<u8, Arc<ModbusDevice>>,
server_context: Arc<ServerContext>,
unit_filter: UnitFilter,
state: RwLock<RtuServerState>,
shutdown: Arc<AtomicBool>,
event_tx: broadcast::Sender<RtuServerEvent>,
stats: RtuStatsCounters,
transport_metrics: AtomicTransportMetrics,
request_count: AtomicU64,
latency_sum: AtomicU64,
fault_pipeline: Option<Arc<FaultPipeline>>,
rtu_timing_fault: Option<Arc<RtuTimingFaultConfig>>,
}
impl ModbusRtuServer {
pub fn new(config: RtuServerConfig) -> Self {
let (event_tx, _) = broadcast::channel(256);
let server_context = Arc::new(ServerContext::new(Arc::new(RegisterStore::with_defaults())));
server_context.set_broadcast_enabled(config.broadcast_enabled);
let unit_filter = UnitFilter::new(&config.unit_ids);
Self {
config,
service: Arc::new(StandardModbusService::default()),
devices: DashMap::new(),
server_context,
unit_filter,
state: RwLock::new(RtuServerState::Stopped),
shutdown: Arc::new(AtomicBool::new(false)),
event_tx,
stats: RtuStatsCounters::default(),
transport_metrics: AtomicTransportMetrics::default(),
request_count: AtomicU64::new(0),
latency_sum: AtomicU64::new(0),
fault_pipeline: None,
rtu_timing_fault: None,
}
}
pub fn with_fault_pipeline(mut self, pipeline: FaultPipeline) -> Self {
self.fault_pipeline = Some(Arc::new(pipeline));
self
}
pub fn with_rtu_timing_fault(mut self, config: RtuTimingFaultConfig) -> Self {
self.rtu_timing_fault = Some(Arc::new(config));
self
}
pub fn with_handlers(mut self, handlers: HandlerRegistry) -> Self {
self.service = Arc::new(StandardModbusService::new(handlers));
self
}
pub fn with_extensions(mut self, extensions: ExtensionRegistry) -> Self {
self.service = Arc::new(StandardModbusService::with_extensions(extensions));
self
}
pub fn with_default_registers(self, registers: RegisterStore) -> Self {
self.server_context.set_default_space(Arc::new(registers));
self
}
pub fn add_device(&self, device: ModbusDevice) {
let unit_id = device.unit_id();
let device = Arc::new(device);
self.server_context.register(device.context().clone());
self.devices.insert(unit_id, device);
}
pub fn remove_device(&self, unit_id: u8) -> Option<Arc<ModbusDevice>> {
self.server_context.remove(unit_id);
self.devices.remove(&unit_id).map(|(_, d)| d)
}
pub fn device(&self, unit_id: u8) -> Option<Arc<ModbusDevice>> {
self.devices.get(&unit_id).map(|d| d.clone())
}
pub fn device_ids(&self) -> Vec<u8> {
self.devices.iter().map(|entry| *entry.key()).collect()
}
pub fn default_registers(&self) -> SharedAddressSpace {
self.server_context.default_space()
}
pub fn set_broadcast_enabled(&self, enabled: bool) {
self.server_context.set_broadcast_enabled(enabled);
}
pub fn set_broadcast_policy(&self, policy: BroadcastPolicy) {
self.server_context.set_broadcast_policy(policy);
}
pub fn subscribe(&self) -> broadcast::Receiver<RtuServerEvent> {
self.event_tx.subscribe()
}
fn runtime_policy(&self, transport_type: TransportType) -> RtuRuntimePolicy {
RtuRuntimePolicy::resolve(
self.config.performance_preset,
transport_type,
self.fault_pipeline.is_some(),
self.rtu_timing_fault
.as_ref()
.map(|config| config.is_active())
.unwrap_or(false),
self.config.simulate_response_delay,
)
}
#[inline]
fn should_emit_events(&self, policy: RtuRuntimePolicy) -> bool {
policy.should_emit_events(self.event_tx.receiver_count())
}
#[inline]
fn emit_event(&self, policy: RtuRuntimePolicy, event: RtuServerEvent) {
if self.should_emit_events(policy) {
let _ = self.event_tx.send(event);
}
}
#[inline]
fn record_transport_bytes_received(&self, policy: RtuRuntimePolicy, bytes: usize) {
if policy.should_record_transport_metrics() {
self.transport_metrics.record_bytes_received(bytes);
}
}
#[inline]
fn record_transport_bytes_sent(&self, policy: RtuRuntimePolicy, bytes: usize) {
if policy.should_record_transport_metrics() {
self.transport_metrics.record_bytes_sent(bytes);
}
}
#[inline]
fn record_transport_frame_received(&self, policy: RtuRuntimePolicy) {
if policy.should_record_transport_metrics() {
self.transport_metrics.record_frame_received();
}
}
#[inline]
fn record_transport_crc_error(&self, policy: RtuRuntimePolicy) {
if policy.should_record_transport_metrics() {
self.transport_metrics.record_crc_error();
}
}
#[inline]
fn record_transport_framing_error(&self, policy: RtuRuntimePolicy) {
if policy.should_record_transport_metrics() {
self.transport_metrics.record_framing_error();
}
}
#[inline]
fn record_request_observation(
&self,
is_exception: bool,
latency_us: u64,
request_bytes: u64,
response_bytes: u64,
) {
self.request_count.fetch_add(1, Ordering::Relaxed);
self.latency_sum.fetch_add(latency_us, Ordering::Relaxed);
self.stats
.record_request(is_exception, latency_us, request_bytes, response_bytes);
}
async fn send_response_bytes(
&self,
transport: &mut dyn RtuTransport,
policy: RtuRuntimePolicy,
hooks: RtuHookBundle,
bytes: &[u8],
allow_timing_faults: bool,
error_context: &str,
) -> bool {
if allow_timing_faults && hooks.apply_timing_faults {
if let Some(ref timing_config) = self.rtu_timing_fault {
let plan = timing_config.build_timing_plan(bytes);
let mut total_sent = 0usize;
for segment in &plan.segments {
if !segment.delay_before.is_zero() {
tokio::time::sleep(segment.delay_before).await;
}
if let Err(error) = transport.write(&segment.data).await {
error!("{error_context}: {error}");
self.emit_event(
policy,
RtuServerEvent::Error {
message: error.to_string(),
},
);
return false;
}
total_sent += segment.data.len();
}
self.record_transport_bytes_sent(policy, total_sent);
return true;
}
}
if hooks.simulate_response_delay {
let delay = transport.transmission_delay(bytes.len()) + hooks.additional_response_delay;
tokio::time::sleep(delay).await;
}
match transport.write(bytes).await {
Ok(_) => {
self.record_transport_bytes_sent(policy, bytes.len());
true
}
Err(error) => {
error!("{error_context}: {error}");
self.emit_event(
policy,
RtuServerEvent::Error {
message: error.to_string(),
},
);
false
}
}
}
pub fn state(&self) -> RtuServerState {
*self.state.read()
}
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::SeqCst)
}
pub fn shutdown(&self) {
if !self.shutdown.swap(true, Ordering::SeqCst) {
info!("RTU server shutdown requested");
}
}
pub fn stats(&self) -> RtuServerStats {
let count = self.request_count.load(Ordering::Relaxed);
let sum = self.latency_sum.load(Ordering::Relaxed);
self.stats.snapshot(count, sum)
}
pub fn transport_metrics(&self) -> TransportMetrics {
self.transport_metrics.snapshot()
}
pub async fn run(&self) -> ModbusResult<()> {
let transport = TransportFactory::create(self.config.transport.clone()).await?;
self.run_with_boxed_transport(transport).await
}
pub async fn run_with_transport<T: RtuTransport + 'static>(
&self,
transport: T,
) -> ModbusResult<()> {
self.run_with_boxed_transport(Box::new(transport)).await
}
async fn run_with_boxed_transport(
&self,
mut transport: Box<dyn RtuTransport>,
) -> ModbusResult<()> {
let policy = self.runtime_policy(transport.transport_type());
let hooks = RtuHookBundle::new(
policy,
self.config.request_timeout,
self.config.simulate_response_delay,
self.config.additional_response_delay,
self.rtu_timing_fault
.as_ref()
.map(|config| config.is_active())
.unwrap_or(false),
);
{
let mut state = self.state.write();
if *state != RtuServerState::Stopped {
return Err(ModbusError::Internal("Server already running".into()));
}
*state = RtuServerState::Starting;
}
self.shutdown.store(false, Ordering::SeqCst);
self.emit_event(policy, RtuServerEvent::Started);
{
let mut state = self.state.write();
*state = RtuServerState::Running;
}
info!("RTU server started");
let mut read_buffer = vec![0u8; 256];
let mut frame_buffer = Vec::with_capacity(256);
let mut rtu_request_number: u64 = 0;
let serial_config = transport.serial_config().clone();
let timing = RtuTiming::from_baud_rate(serial_config.baud_rate);
loop {
if self.shutdown.load(Ordering::SeqCst) {
break;
}
let read_result = tokio::time::timeout(
timing.inter_frame_timeout * 2,
transport.read(&mut read_buffer),
)
.await;
match read_result {
Ok(Ok(0)) => {
tokio::task::yield_now().await;
continue;
}
Ok(Ok(n)) => {
frame_buffer.extend_from_slice(&read_buffer[..n]);
self.record_transport_bytes_received(policy, n);
if let Some(frame) = self.try_parse_frame(&mut frame_buffer, policy)? {
let emit_events = self.should_emit_events(policy);
let response = self.process_request(&frame, hooks, emit_events).await;
rtu_request_number += 1;
if response.pdu.is_empty() {
continue;
}
let fault_action = if let Some(ref pipeline) = self.fault_pipeline {
let unit_id = frame.unit_id;
let function_code = frame.function_code().unwrap_or(0);
let fault_ctx = ModbusFaultContext::rtu(
unit_id,
function_code,
&frame.pdu,
&response.pdu,
rtu_request_number,
);
pipeline.apply(&fault_ctx)
} else {
None
};
match fault_action {
Some(FaultAction::DropResponse) => {
debug!("Fault: dropping RTU response");
}
Some(FaultAction::DelayThenSend {
delay,
response: fault_pdu,
}) => {
tokio::time::sleep(delay).await;
let response_bytes = RtuFrame::response(&frame, fault_pdu).encode();
let _ = self
.send_response_bytes(
transport.as_mut(),
policy,
hooks,
&response_bytes,
false,
"Failed to send delayed response",
)
.await;
}
Some(FaultAction::SendRawBytes(raw_bytes)) => {
let _ = self
.send_response_bytes(
transport.as_mut(),
policy,
hooks,
&raw_bytes,
true,
"Failed to send raw bytes",
)
.await;
}
Some(FaultAction::SendPartial { bytes }) => {
let _ = self
.send_response_bytes(
transport.as_mut(),
policy,
hooks,
&bytes,
false,
"Failed to send partial frame",
)
.await;
}
Some(FaultAction::SendResponse(fault_pdu)) => {
let response_bytes = RtuFrame::response(&frame, fault_pdu).encode();
let _ = self
.send_response_bytes(
transport.as_mut(),
policy,
hooks,
&response_bytes,
false,
"Failed to send faulted response",
)
.await;
}
Some(FaultAction::OverrideTransactionId { .. }) => {
let response_bytes = response.encode();
let _ = self
.send_response_bytes(
transport.as_mut(),
policy,
hooks,
&response_bytes,
false,
"Failed to send response",
)
.await;
}
None => {
let response_bytes = response.encode();
let _ = self
.send_response_bytes(
transport.as_mut(),
policy,
hooks,
&response_bytes,
true,
"Failed to send response",
)
.await;
}
}
}
}
Ok(Err(e)) => {
error!("Transport read error: {}", e);
self.emit_event(
policy,
RtuServerEvent::Error {
message: e.to_string(),
},
);
}
Err(_) => {
if !frame_buffer.is_empty() {
debug!("Discarding incomplete frame ({} bytes)", frame_buffer.len());
frame_buffer.clear();
self.stats.record_framing_error();
}
}
}
}
{
let mut state = self.state.write();
*state = RtuServerState::Stopping;
}
let _ = transport.close().await;
{
let mut state = self.state.write();
*state = RtuServerState::Stopped;
}
self.emit_event(policy, RtuServerEvent::Stopped);
info!("RTU server stopped");
Ok(())
}
fn try_parse_frame(
&self,
buffer: &mut Vec<u8>,
policy: RtuRuntimePolicy,
) -> ModbusResult<Option<RtuFrame>> {
if buffer.len() < 4 {
return Ok(None);
}
match RtuFrame::try_decode(buffer) {
Ok(Some(frame)) => {
let frame_size = frame.frame_size();
buffer.drain(..frame_size);
self.record_transport_frame_received(policy);
Ok(Some(frame))
}
Ok(None) => {
Ok(None)
}
Err(RtuFrameError::CrcMismatch { .. }) => {
buffer.clear();
self.stats.record_crc_error();
self.record_transport_crc_error(policy);
self.emit_event(
policy,
RtuServerEvent::FrameError {
error: "CRC mismatch".into(),
},
);
Ok(None)
}
Err(e) => {
buffer.clear();
self.stats.record_framing_error();
self.record_transport_framing_error(policy);
self.emit_event(
policy,
RtuServerEvent::FrameError {
error: e.to_string(),
},
);
Ok(None)
}
}
}
async fn process_request(
&self,
request: &RtuFrame,
hooks: RtuHookBundle,
emit_events: bool,
) -> RtuFrame {
let start = Instant::now();
let unit_id = request.unit_id;
let function_code = request.function_code().unwrap_or(0);
let is_broadcast = unit_id == 0;
if emit_events {
let _ = self.event_tx.send(RtuServerEvent::RequestReceived {
unit_id,
function_code,
timestamp: start,
});
}
if !self.should_respond_to_unit(unit_id) {
debug!("Ignoring request for unit {}", unit_id);
return RtuFrame::new(unit_id, vec![]);
}
let execution = execute_transport_request(
self.service.as_ref(),
self.server_context.as_ref(),
unit_id,
0,
request.pdu.as_slice(),
TransportServicePolicy::new(UnknownUnitBehavior::Ignore)
.with_request_timeout(hooks.transport.request_timeout),
)
.await;
if execution.timed_out {
self.stats.record_timeout();
if hooks.transport.record_transport_metrics {
self.transport_metrics.record_timeout();
}
}
let (is_exception, response) = match execution.disposition {
TransportDisposition::Ignore => return RtuFrame::new(unit_id, vec![]),
TransportDisposition::BroadcastSuppressed(response) => {
(response.is_exception(), RtuFrame::new(unit_id, vec![]))
}
TransportDisposition::Reply(response) => {
let is_exception = response.is_exception();
(
is_exception,
RtuFrame::response(request, response.into_bytes()),
)
}
};
let latency_us = start.elapsed().as_micros() as u64;
self.record_request_observation(
is_exception,
latency_us,
request.frame_size() as u64,
response.frame_size() as u64,
);
if emit_events && !is_broadcast {
let _ = self.event_tx.send(RtuServerEvent::ResponseSent {
unit_id,
function_code,
is_exception,
latency_us,
});
}
response
}
fn should_respond_to_unit(&self, unit_id: u8) -> bool {
if unit_id == 0 {
return self.server_context.broadcast_enabled();
}
self.unit_filter.allows(unit_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_server_config_default() {
let config = RtuServerConfig::default();
assert_eq!(config.unit_ids, vec![1]);
assert!(config.broadcast_enabled);
}
#[test]
fn test_server_config_builder() {
let config = RtuServerConfig::new()
.with_unit_ids(vec![1, 2, 3])
.with_broadcast(false)
.with_request_timeout(Duration::from_secs(10));
assert_eq!(config.unit_ids, vec![1, 2, 3]);
assert!(!config.broadcast_enabled);
assert_eq!(config.request_timeout, Duration::from_secs(10));
}
#[test]
fn test_server_creation() {
let config = RtuServerConfig::for_testing();
let server = ModbusRtuServer::new(config);
assert_eq!(server.state(), RtuServerState::Stopped);
assert!(!server.is_shutdown());
}
#[test]
fn test_server_device_management() {
use crate::config::ModbusDeviceConfig;
let server = ModbusRtuServer::new(RtuServerConfig::for_testing());
let device = ModbusDevice::new(ModbusDeviceConfig::new(5, "Test Device"));
server.add_device(device);
assert!(server.device(5).is_some());
assert!(server.device(10).is_none());
let removed = server.remove_device(5);
assert!(removed.is_some());
assert!(server.device(5).is_none());
}
#[test]
fn test_should_respond_to_unit() {
let config = RtuServerConfig::new()
.with_unit_ids(vec![1, 2, 3])
.with_broadcast(true);
let server = ModbusRtuServer::new(config);
assert!(server.should_respond_to_unit(1));
assert!(server.should_respond_to_unit(2));
assert!(server.should_respond_to_unit(3));
assert!(!server.should_respond_to_unit(4));
assert!(!server.should_respond_to_unit(255));
assert!(server.should_respond_to_unit(0));
}
#[test]
fn test_should_respond_broadcast_disabled() {
let config = RtuServerConfig::new()
.with_unit_ids(vec![1])
.with_broadcast(false);
let server = ModbusRtuServer::new(config);
assert!(server.should_respond_to_unit(1));
assert!(!server.should_respond_to_unit(0)); }
#[test]
fn test_should_respond_empty_filter() {
let config = RtuServerConfig::new().with_unit_ids(vec![]);
let server = ModbusRtuServer::new(config);
assert!(server.should_respond_to_unit(1));
assert!(server.should_respond_to_unit(100));
assert!(server.should_respond_to_unit(255));
}
#[test]
fn test_runtime_policy_default_is_fully_observable() {
let policy = RtuRuntimePolicy::resolve(
PerformancePreset::Default,
TransportType::Channel,
false,
false,
false,
);
assert_eq!(
policy.request_timeout(Duration::from_secs(1)),
Some(Duration::from_secs(1))
);
assert!(policy.should_emit_events(0));
assert!(policy.should_record_transport_metrics());
}
#[test]
fn test_runtime_policy_channel_high_throughput_matches_default() {
let policy = RtuRuntimePolicy::resolve(
PerformancePreset::HighThroughput,
TransportType::Channel,
false,
false,
false,
);
assert_eq!(
policy.request_timeout(Duration::from_secs(1)),
Some(Duration::from_secs(1))
);
assert!(policy.should_emit_events(0));
}
#[test]
fn test_runtime_policy_tcp_bridge_high_throughput_is_subscriber_aware() {
let policy = RtuRuntimePolicy::resolve(
PerformancePreset::HighThroughput,
TransportType::TcpBridge,
false,
false,
false,
);
assert_eq!(policy.request_timeout(Duration::from_secs(1)), None);
assert!(!policy.should_emit_events(0));
assert!(policy.should_emit_events(1));
}
#[test]
fn test_runtime_policy_virtual_serial_keeps_timeout_when_timing_semantics_are_active() {
let policy = RtuRuntimePolicy::resolve(
PerformancePreset::HighThroughput,
TransportType::VirtualSerial,
true,
true,
true,
);
assert_eq!(
policy.request_timeout(Duration::from_secs(1)),
Some(Duration::from_secs(1))
);
assert!(!policy.should_emit_events(0));
assert!(policy.should_emit_events(2));
}
#[tokio::test]
async fn test_server_stats() {
let server = ModbusRtuServer::new(RtuServerConfig::for_testing());
let stats = server.stats();
assert_eq!(stats.requests_processed, 0);
assert_eq!(stats.crc_errors, 0);
}
}