Skip to main content

mabi_modbus/rtu/
server.rs

1//! Modbus RTU server implementation.
2//!
3//! This module provides a high-performance RTU server with support for:
4//!
5//! - Multiple transport types (virtual serial, TCP bridge, channel)
6//! - Multiple device simulation
7//! - Extensible handler architecture
8//! - Metrics and monitoring
9//! - Graceful shutdown
10//!
11//! # Example
12//!
13//! ```rust,no_run
14//! use mabi_modbus::rtu::{ModbusRtuServer, RtuServerConfig, VirtualSerialConfig};
15//!
16//! #[tokio::main]
17//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
18//!     let config = RtuServerConfig::default()
19//!         .with_unit_ids(vec![1, 2, 3]);
20//!
21//!     let server = ModbusRtuServer::new(config);
22//!     server.run().await?;
23//!     Ok(())
24//! }
25//! ```
26
27use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use dashmap::DashMap;
32use parking_lot::RwLock;
33use serde::{Deserialize, Serialize};
34use tokio::sync::broadcast;
35use tracing::{debug, error, info};
36
37use crate::device::ModbusDevice;
38use crate::error::{ModbusError, ModbusResult};
39use crate::fault_injection::rtu_timing::RtuTimingFaultConfig;
40use crate::fault_injection::{FaultAction, FaultPipeline, ModbusFaultContext};
41use crate::handler::{build_exception_pdu, ExceptionCode, HandlerContext, HandlerRegistry};
42use crate::register::RegisterStore;
43
44use super::codec::RtuTiming;
45use super::frame::{RtuFrame, RtuFrameError};
46use super::transport::{ChannelConfig, ChannelTransport, RtuTransport, TransportConfig, TransportMetrics};
47
48/// RTU server configuration.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct RtuServerConfig {
51    /// Transport configuration.
52    #[serde(default)]
53    pub transport: TransportConfig,
54
55    /// Supported unit IDs (empty = all).
56    #[serde(default)]
57    pub unit_ids: Vec<u8>,
58
59    /// Enable broadcast (unit ID 0).
60    #[serde(default = "default_broadcast")]
61    pub broadcast_enabled: bool,
62
63    /// Request processing timeout.
64    #[serde(default = "default_request_timeout")]
65    pub request_timeout: Duration,
66
67    /// Shutdown timeout.
68    #[serde(default = "default_shutdown_timeout")]
69    pub shutdown_timeout: Duration,
70
71    /// Enable response delay simulation.
72    #[serde(default)]
73    pub simulate_response_delay: bool,
74
75    /// Additional response delay (beyond transmission time).
76    #[serde(default)]
77    pub additional_response_delay: Duration,
78}
79
80fn default_broadcast() -> bool {
81    true
82}
83
84fn default_request_timeout() -> Duration {
85    Duration::from_secs(5)
86}
87
88fn default_shutdown_timeout() -> Duration {
89    Duration::from_secs(10)
90}
91
92impl Default for RtuServerConfig {
93    fn default() -> Self {
94        Self {
95            transport: TransportConfig::default(),
96            unit_ids: vec![1], // Default to unit ID 1
97            broadcast_enabled: true,
98            request_timeout: default_request_timeout(),
99            shutdown_timeout: default_shutdown_timeout(),
100            simulate_response_delay: true,
101            additional_response_delay: Duration::ZERO,
102        }
103    }
104}
105
106impl RtuServerConfig {
107    /// Create a new configuration with default settings.
108    pub fn new() -> Self {
109        Self::default()
110    }
111
112    /// Set transport configuration.
113    pub fn with_transport(mut self, transport: TransportConfig) -> Self {
114        self.transport = transport;
115        self
116    }
117
118    /// Set supported unit IDs.
119    pub fn with_unit_ids(mut self, ids: Vec<u8>) -> Self {
120        self.unit_ids = ids;
121        self
122    }
123
124    /// Enable or disable broadcast support.
125    pub fn with_broadcast(mut self, enabled: bool) -> Self {
126        self.broadcast_enabled = enabled;
127        self
128    }
129
130    /// Set request timeout.
131    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
132        self.request_timeout = timeout;
133        self
134    }
135
136    /// Enable response delay simulation.
137    pub fn with_response_delay_simulation(mut self, enabled: bool) -> Self {
138        self.simulate_response_delay = enabled;
139        self
140    }
141
142    /// Create a configuration for testing with channel transport.
143    pub fn for_testing() -> Self {
144        Self {
145            transport: TransportConfig::Channel(ChannelConfig::default()),
146            unit_ids: vec![1],
147            broadcast_enabled: true,
148            request_timeout: Duration::from_secs(1),
149            shutdown_timeout: Duration::from_secs(1),
150            simulate_response_delay: false,
151            additional_response_delay: Duration::ZERO,
152        }
153    }
154}
155
156/// RTU server events.
157#[derive(Debug, Clone)]
158pub enum RtuServerEvent {
159    /// Server started.
160    Started,
161
162    /// Server stopped.
163    Stopped,
164
165    /// Request received.
166    RequestReceived {
167        unit_id: u8,
168        function_code: u8,
169        timestamp: Instant,
170    },
171
172    /// Response sent.
173    ResponseSent {
174        unit_id: u8,
175        function_code: u8,
176        is_exception: bool,
177        latency_us: u64,
178    },
179
180    /// Error occurred.
181    Error { message: String },
182
183    /// Frame error (CRC, framing, etc.).
184    FrameError { error: String },
185}
186
187/// RTU server state.
188#[derive(Debug, Clone, Copy, PartialEq, Eq)]
189pub enum RtuServerState {
190    /// Server is stopped.
191    Stopped,
192    /// Server is starting.
193    Starting,
194    /// Server is running.
195    Running,
196    /// Server is stopping.
197    Stopping,
198}
199
200/// RTU server statistics.
201#[derive(Debug, Clone, Default)]
202pub struct RtuServerStats {
203    /// Total requests processed.
204    pub requests_processed: u64,
205
206    /// Successful requests.
207    pub requests_success: u64,
208
209    /// Exception responses.
210    pub requests_exception: u64,
211
212    /// CRC errors.
213    pub crc_errors: u64,
214
215    /// Framing errors.
216    pub framing_errors: u64,
217
218    /// Timeouts.
219    pub timeouts: u64,
220
221    /// Total bytes received.
222    pub bytes_received: u64,
223
224    /// Total bytes sent.
225    pub bytes_sent: u64,
226
227    /// Average latency (microseconds).
228    pub avg_latency_us: f64,
229}
230
231/// Modbus RTU server.
232///
233/// Provides a high-performance RTU server implementation with
234/// support for multiple devices and extensible handlers.
235pub struct ModbusRtuServer {
236    /// Server configuration.
237    config: RtuServerConfig,
238
239    /// Handler registry.
240    handlers: Arc<HandlerRegistry>,
241
242    /// Devices by unit ID.
243    devices: DashMap<u8, Arc<ModbusDevice>>,
244
245    /// Default register store (for unknown units).
246    default_registers: Arc<RegisterStore>,
247
248    /// Server state.
249    state: RwLock<RtuServerState>,
250
251    /// Shutdown flag.
252    shutdown: Arc<AtomicBool>,
253
254    /// Event broadcaster.
255    event_tx: broadcast::Sender<RtuServerEvent>,
256
257    /// Statistics.
258    stats: RwLock<RtuServerStats>,
259
260    /// Transport metrics.
261    transport_metrics: RwLock<TransportMetrics>,
262
263    /// Request counter for latency tracking.
264    request_count: AtomicU64,
265    latency_sum: AtomicU64,
266
267    /// Optional fault injection pipeline.
268    fault_pipeline: Option<Arc<FaultPipeline>>,
269
270    /// Optional RTU timing fault configuration.
271    rtu_timing_fault: Option<Arc<RtuTimingFaultConfig>>,
272}
273
274impl ModbusRtuServer {
275    /// Create a new RTU server.
276    pub fn new(config: RtuServerConfig) -> Self {
277        let (event_tx, _) = broadcast::channel(256);
278
279        Self {
280            config,
281            handlers: Arc::new(HandlerRegistry::with_defaults()),
282            devices: DashMap::new(),
283            default_registers: Arc::new(RegisterStore::with_defaults()),
284            state: RwLock::new(RtuServerState::Stopped),
285            shutdown: Arc::new(AtomicBool::new(false)),
286            event_tx,
287            stats: RwLock::new(RtuServerStats::default()),
288            transport_metrics: RwLock::new(TransportMetrics::new()),
289            request_count: AtomicU64::new(0),
290            latency_sum: AtomicU64::new(0),
291            fault_pipeline: None,
292            rtu_timing_fault: None,
293        }
294    }
295
296    /// Set fault injection pipeline.
297    pub fn with_fault_pipeline(mut self, pipeline: FaultPipeline) -> Self {
298        self.fault_pipeline = Some(Arc::new(pipeline));
299        self
300    }
301
302    /// Set RTU timing fault configuration.
303    pub fn with_rtu_timing_fault(mut self, config: RtuTimingFaultConfig) -> Self {
304        self.rtu_timing_fault = Some(Arc::new(config));
305        self
306    }
307
308    /// Create with custom handler registry.
309    pub fn with_handlers(mut self, handlers: HandlerRegistry) -> Self {
310        self.handlers = Arc::new(handlers);
311        self
312    }
313
314    /// Create with custom default registers.
315    pub fn with_default_registers(mut self, registers: RegisterStore) -> Self {
316        self.default_registers = Arc::new(registers);
317        self
318    }
319
320    /// Add a device to the server.
321    pub fn add_device(&self, device: ModbusDevice) {
322        let unit_id = device.unit_id();
323        self.devices.insert(unit_id, Arc::new(device));
324    }
325
326    /// Remove a device from the server.
327    pub fn remove_device(&self, unit_id: u8) -> Option<Arc<ModbusDevice>> {
328        self.devices.remove(&unit_id).map(|(_, d)| d)
329    }
330
331    /// Get a device by unit ID.
332    pub fn device(&self, unit_id: u8) -> Option<Arc<ModbusDevice>> {
333        self.devices.get(&unit_id).map(|d| d.clone())
334    }
335
336    /// Get default registers.
337    pub fn default_registers(&self) -> &Arc<RegisterStore> {
338        &self.default_registers
339    }
340
341    /// Subscribe to server events.
342    pub fn subscribe(&self) -> broadcast::Receiver<RtuServerEvent> {
343        self.event_tx.subscribe()
344    }
345
346    /// Get current server state.
347    pub fn state(&self) -> RtuServerState {
348        *self.state.read()
349    }
350
351    /// Check if shutdown has been requested.
352    pub fn is_shutdown(&self) -> bool {
353        self.shutdown.load(Ordering::SeqCst)
354    }
355
356    /// Request server shutdown.
357    pub fn shutdown(&self) {
358        if !self.shutdown.swap(true, Ordering::SeqCst) {
359            info!("RTU server shutdown requested");
360        }
361    }
362
363    /// Get server statistics.
364    pub fn stats(&self) -> RtuServerStats {
365        let mut stats = self.stats.read().clone();
366
367        // Calculate average latency
368        let count = self.request_count.load(Ordering::Relaxed);
369        if count > 0 {
370            let sum = self.latency_sum.load(Ordering::Relaxed);
371            stats.avg_latency_us = sum as f64 / count as f64;
372        }
373
374        stats
375    }
376
377    /// Get transport metrics.
378    pub fn transport_metrics(&self) -> TransportMetrics {
379        self.transport_metrics.read().clone()
380    }
381
382    /// Run the server with the configured transport.
383    pub async fn run(&self) -> ModbusResult<()> {
384        // For now, we'll use the channel transport for testing
385        // In production, this would create the appropriate transport
386        match &self.config.transport {
387            TransportConfig::Channel(config) => {
388                let (transport, _peer) = ChannelTransport::pair(config.clone());
389                self.run_with_transport(transport).await
390            }
391            _ => {
392                Err(ModbusError::Internal(
393                    "Only channel transport is currently supported".into(),
394                ))
395            }
396        }
397    }
398
399    /// Run with a specific transport.
400    pub async fn run_with_transport<T: RtuTransport + 'static>(
401        &self,
402        mut transport: T,
403    ) -> ModbusResult<()> {
404        // Update state
405        {
406            let mut state = self.state.write();
407            if *state != RtuServerState::Stopped {
408                return Err(ModbusError::Internal("Server already running".into()));
409            }
410            *state = RtuServerState::Starting;
411        }
412
413        self.shutdown.store(false, Ordering::SeqCst);
414        let _ = self.event_tx.send(RtuServerEvent::Started);
415
416        {
417            let mut state = self.state.write();
418            *state = RtuServerState::Running;
419        }
420
421        info!("RTU server started");
422
423        // Main processing loop
424        let mut read_buffer = vec![0u8; 256];
425        let mut frame_buffer = Vec::with_capacity(256);
426        let mut rtu_request_number: u64 = 0;
427        let serial_config = transport.serial_config().clone();
428        let timing = RtuTiming::from_baud_rate(serial_config.baud_rate);
429
430        loop {
431            // Check shutdown
432            if self.shutdown.load(Ordering::SeqCst) {
433                break;
434            }
435
436            // Read with timeout
437            let read_result = tokio::time::timeout(
438                timing.inter_frame_timeout * 2,
439                transport.read(&mut read_buffer),
440            )
441            .await;
442
443            match read_result {
444                Ok(Ok(0)) => {
445                    // No data available or connection closed
446                    tokio::task::yield_now().await;
447                    continue;
448                }
449                Ok(Ok(n)) => {
450                    // Data received
451                    frame_buffer.extend_from_slice(&read_buffer[..n]);
452                    self.transport_metrics.write().record_bytes_received(n);
453
454                    // Try to parse frame
455                    if let Some(frame) = self.try_parse_frame(&mut frame_buffer)? {
456                        // Process request
457                        let response = self.process_request(&frame).await;
458                        rtu_request_number += 1;
459
460                        // Apply fault injection pipeline (if configured)
461                        let fault_action = if let Some(ref pipeline) = self.fault_pipeline {
462                            let unit_id = frame.unit_id;
463                            let function_code = frame.function_code().unwrap_or(0);
464                            let fault_ctx = ModbusFaultContext::rtu(
465                                unit_id,
466                                function_code,
467                                &frame.pdu,
468                                &response.pdu,
469                                rtu_request_number,
470                            );
471                            pipeline.apply(&fault_ctx)
472                        } else {
473                            None
474                        };
475
476                        match fault_action {
477                            Some(FaultAction::DropResponse) => {
478                                // Silent drop - no response sent
479                                debug!("Fault: dropping RTU response");
480                            }
481                            Some(FaultAction::DelayThenSend { delay, response: fault_pdu }) => {
482                                tokio::time::sleep(delay).await;
483                                // Re-encode with faulted PDU
484                                let fault_frame = RtuFrame::response(&frame, fault_pdu);
485                                let response_bytes = fault_frame.encode();
486                                if self.config.simulate_response_delay {
487                                    let tx_delay = transport.transmission_delay(response_bytes.len());
488                                    tokio::time::sleep(tx_delay + self.config.additional_response_delay).await;
489                                }
490                                if let Err(e) = transport.write(&response_bytes).await {
491                                    error!("Failed to send delayed response: {}", e);
492                                } else {
493                                    self.transport_metrics.write().record_bytes_sent(response_bytes.len());
494                                }
495                            }
496                            Some(FaultAction::SendRawBytes(raw_bytes)) => {
497                                // Send raw wire bytes (used for CRC corruption, wrong unit_id)
498                                // Apply RTU timing faults if configured
499                                if let Some(ref timing_config) = self.rtu_timing_fault {
500                                    if timing_config.is_active() {
501                                        let plan = timing_config.build_timing_plan(&raw_bytes);
502                                        let mut total_sent = 0usize;
503                                        for segment in &plan.segments {
504                                            if !segment.delay_before.is_zero() {
505                                                tokio::time::sleep(segment.delay_before).await;
506                                            }
507                                            if let Err(e) = transport.write(&segment.data).await {
508                                                error!("Failed to send timing segment (raw): {}", e);
509                                                break;
510                                            }
511                                            total_sent += segment.data.len();
512                                        }
513                                        self.transport_metrics.write().record_bytes_sent(total_sent);
514                                    } else {
515                                        if self.config.simulate_response_delay {
516                                            let delay = transport.transmission_delay(raw_bytes.len());
517                                            tokio::time::sleep(delay + self.config.additional_response_delay).await;
518                                        }
519                                        if let Err(e) = transport.write(&raw_bytes).await {
520                                            error!("Failed to send raw bytes: {}", e);
521                                        } else {
522                                            self.transport_metrics.write().record_bytes_sent(raw_bytes.len());
523                                        }
524                                    }
525                                } else {
526                                    if self.config.simulate_response_delay {
527                                        let delay = transport.transmission_delay(raw_bytes.len());
528                                        tokio::time::sleep(delay + self.config.additional_response_delay).await;
529                                    }
530                                    if let Err(e) = transport.write(&raw_bytes).await {
531                                        error!("Failed to send raw bytes: {}", e);
532                                    } else {
533                                        self.transport_metrics.write().record_bytes_sent(raw_bytes.len());
534                                    }
535                                }
536                            }
537                            Some(FaultAction::SendPartial { bytes }) => {
538                                // Send partial frame bytes
539                                if self.config.simulate_response_delay {
540                                    let delay = transport.transmission_delay(bytes.len());
541                                    tokio::time::sleep(delay + self.config.additional_response_delay).await;
542                                }
543                                if let Err(e) = transport.write(&bytes).await {
544                                    error!("Failed to send partial frame: {}", e);
545                                } else {
546                                    self.transport_metrics.write().record_bytes_sent(bytes.len());
547                                }
548                            }
549                            Some(FaultAction::SendResponse(fault_pdu)) => {
550                                let fault_frame = RtuFrame::response(&frame, fault_pdu);
551                                let response_bytes = fault_frame.encode();
552                                if self.config.simulate_response_delay {
553                                    let delay = transport.transmission_delay(response_bytes.len());
554                                    tokio::time::sleep(delay + self.config.additional_response_delay).await;
555                                }
556                                if let Err(e) = transport.write(&response_bytes).await {
557                                    error!("Failed to send faulted response: {}", e);
558                                } else {
559                                    self.transport_metrics.write().record_bytes_sent(response_bytes.len());
560                                }
561                            }
562                            Some(FaultAction::OverrideTransactionId { .. }) => {
563                                // TID override is TCP-only, send normal response for RTU
564                                let response_bytes = response.encode();
565                                if self.config.simulate_response_delay {
566                                    let delay = transport.transmission_delay(response_bytes.len());
567                                    tokio::time::sleep(delay + self.config.additional_response_delay).await;
568                                }
569                                if let Err(e) = transport.write(&response_bytes).await {
570                                    error!("Failed to send response: {}", e);
571                                } else {
572                                    self.transport_metrics.write().record_bytes_sent(response_bytes.len());
573                                }
574                            }
575                            None => {
576                                // Normal response path (no fault)
577                                let response_bytes = response.encode();
578
579                                // Apply RTU timing faults if configured
580                                if let Some(ref timing_config) = self.rtu_timing_fault {
581                                    if timing_config.is_active() {
582                                        let plan = timing_config.build_timing_plan(&response_bytes);
583                                        let mut total_sent = 0usize;
584                                        for segment in &plan.segments {
585                                            if !segment.delay_before.is_zero() {
586                                                tokio::time::sleep(segment.delay_before).await;
587                                            }
588                                            if let Err(e) = transport.write(&segment.data).await {
589                                                error!("Failed to send timing segment: {}", e);
590                                                let _ = self.event_tx.send(RtuServerEvent::Error {
591                                                    message: e.to_string(),
592                                                });
593                                                break;
594                                            }
595                                            total_sent += segment.data.len();
596                                        }
597                                        self.transport_metrics.write().record_bytes_sent(total_sent);
598                                    } else {
599                                        // Timing config present but not active
600                                        if self.config.simulate_response_delay {
601                                            let delay = transport.transmission_delay(response_bytes.len());
602                                            tokio::time::sleep(delay + self.config.additional_response_delay).await;
603                                        }
604                                        if let Err(e) = transport.write(&response_bytes).await {
605                                            error!("Failed to send response: {}", e);
606                                            let _ = self.event_tx.send(RtuServerEvent::Error {
607                                                message: e.to_string(),
608                                            });
609                                        } else {
610                                            self.transport_metrics.write().record_bytes_sent(response_bytes.len());
611                                        }
612                                    }
613                                } else {
614                                    // No timing config
615                                    if self.config.simulate_response_delay {
616                                        let delay = transport.transmission_delay(response_bytes.len());
617                                        tokio::time::sleep(delay + self.config.additional_response_delay).await;
618                                    }
619                                    if let Err(e) = transport.write(&response_bytes).await {
620                                        error!("Failed to send response: {}", e);
621                                        let _ = self.event_tx.send(RtuServerEvent::Error {
622                                            message: e.to_string(),
623                                        });
624                                    } else {
625                                        self.transport_metrics
626                                            .write()
627                                            .record_bytes_sent(response_bytes.len());
628                                    }
629                                }
630                            }
631                        }
632                    }
633                }
634                Ok(Err(e)) => {
635                    error!("Transport read error: {}", e);
636                    let _ = self.event_tx.send(RtuServerEvent::Error {
637                        message: e.to_string(),
638                    });
639                }
640                Err(_) => {
641                    // Timeout - check for partial frame
642                    if !frame_buffer.is_empty() {
643                        // Incomplete frame, discard
644                        debug!("Discarding incomplete frame ({} bytes)", frame_buffer.len());
645                        frame_buffer.clear();
646                        self.stats.write().framing_errors += 1;
647                    }
648                }
649            }
650        }
651
652        // Shutdown
653        {
654            let mut state = self.state.write();
655            *state = RtuServerState::Stopping;
656        }
657
658        let _ = transport.close().await;
659
660        {
661            let mut state = self.state.write();
662            *state = RtuServerState::Stopped;
663        }
664
665        let _ = self.event_tx.send(RtuServerEvent::Stopped);
666        info!("RTU server stopped");
667
668        Ok(())
669    }
670
671    /// Try to parse a complete frame from the buffer.
672    fn try_parse_frame(&self, buffer: &mut Vec<u8>) -> ModbusResult<Option<RtuFrame>> {
673        if buffer.len() < 4 {
674            return Ok(None);
675        }
676
677        // Try to decode
678        match RtuFrame::try_decode(buffer) {
679            Ok(Some(frame)) => {
680                // Remove parsed bytes from buffer
681                let frame_size = frame.frame_size();
682                buffer.drain(..frame_size);
683
684                self.transport_metrics.write().record_frame_received();
685
686                Ok(Some(frame))
687            }
688            Ok(None) => {
689                // Need more data
690                Ok(None)
691            }
692            Err(RtuFrameError::CrcMismatch { .. }) => {
693                // CRC error - discard frame
694                buffer.clear();
695                self.stats.write().crc_errors += 1;
696                self.transport_metrics.write().record_crc_error();
697
698                let _ = self.event_tx.send(RtuServerEvent::FrameError {
699                    error: "CRC mismatch".into(),
700                });
701
702                Ok(None)
703            }
704            Err(e) => {
705                // Other error
706                buffer.clear();
707                self.stats.write().framing_errors += 1;
708                self.transport_metrics.write().record_framing_error();
709
710                let _ = self.event_tx.send(RtuServerEvent::FrameError {
711                    error: e.to_string(),
712                });
713
714                Ok(None)
715            }
716        }
717    }
718
719    /// Process a request and generate a response.
720    async fn process_request(&self, request: &RtuFrame) -> RtuFrame {
721        let start = Instant::now();
722        let unit_id = request.unit_id;
723        let function_code = request.function_code().unwrap_or(0);
724
725        // Emit request event
726        let _ = self.event_tx.send(RtuServerEvent::RequestReceived {
727            unit_id,
728            function_code,
729            timestamp: start,
730        });
731
732        // Check unit ID
733        if !self.should_respond_to_unit(unit_id) {
734            // Silent ignore for non-matching unit IDs
735            debug!("Ignoring request for unit {}", unit_id);
736            return RtuFrame::new(unit_id, vec![]);
737        }
738
739        // Get registers for this unit
740        let registers = if let Some(device) = self.devices.get(&unit_id) {
741            device.registers().clone()
742        } else if unit_id == 0 {
743            // Broadcast - use default
744            self.default_registers.clone()
745        } else {
746            // Unknown unit but in our filter - use default
747            self.default_registers.clone()
748        };
749
750        // Create handler context
751        let ctx = HandlerContext::new(unit_id, registers, 0);
752
753        // Process with timeout
754        let response_pdu = match tokio::time::timeout(
755            self.config.request_timeout,
756            async { self.handlers.dispatch(&request.pdu, &ctx) },
757        )
758        .await
759        {
760            Ok(Ok(pdu)) => pdu,
761            Ok(Err(exception_code)) => {
762                build_exception_pdu(function_code, exception_code)
763            }
764            Err(_) => {
765                self.stats.write().timeouts += 1;
766                build_exception_pdu(function_code, ExceptionCode::SlaveDeviceBusy)
767            }
768        };
769
770        // Create response frame
771        let response = RtuFrame::response(request, response_pdu);
772        let is_exception = response.is_exception();
773
774        // Update statistics
775        let latency_us = start.elapsed().as_micros() as u64;
776        self.request_count.fetch_add(1, Ordering::Relaxed);
777        self.latency_sum.fetch_add(latency_us, Ordering::Relaxed);
778
779        {
780            let mut stats = self.stats.write();
781            stats.requests_processed += 1;
782            if is_exception {
783                stats.requests_exception += 1;
784            } else {
785                stats.requests_success += 1;
786            }
787        }
788
789        // Emit response event
790        let _ = self.event_tx.send(RtuServerEvent::ResponseSent {
791            unit_id,
792            function_code,
793            is_exception,
794            latency_us,
795        });
796
797        response
798    }
799
800    /// Check if we should respond to a given unit ID.
801    fn should_respond_to_unit(&self, unit_id: u8) -> bool {
802        // Broadcast
803        if unit_id == 0 {
804            return self.config.broadcast_enabled;
805        }
806
807        // Empty filter = accept all
808        if self.config.unit_ids.is_empty() {
809            return true;
810        }
811
812        // Check filter
813        self.config.unit_ids.contains(&unit_id)
814    }
815}
816
817#[cfg(test)]
818mod tests {
819    use super::*;
820
821    #[test]
822    fn test_server_config_default() {
823        let config = RtuServerConfig::default();
824        assert_eq!(config.unit_ids, vec![1]);
825        assert!(config.broadcast_enabled);
826    }
827
828    #[test]
829    fn test_server_config_builder() {
830        let config = RtuServerConfig::new()
831            .with_unit_ids(vec![1, 2, 3])
832            .with_broadcast(false)
833            .with_request_timeout(Duration::from_secs(10));
834
835        assert_eq!(config.unit_ids, vec![1, 2, 3]);
836        assert!(!config.broadcast_enabled);
837        assert_eq!(config.request_timeout, Duration::from_secs(10));
838    }
839
840    #[test]
841    fn test_server_creation() {
842        let config = RtuServerConfig::for_testing();
843        let server = ModbusRtuServer::new(config);
844
845        assert_eq!(server.state(), RtuServerState::Stopped);
846        assert!(!server.is_shutdown());
847    }
848
849    #[test]
850    fn test_server_device_management() {
851        use crate::config::ModbusDeviceConfig;
852
853        let server = ModbusRtuServer::new(RtuServerConfig::for_testing());
854
855        // Add device
856        let device = ModbusDevice::new(ModbusDeviceConfig::new(5, "Test Device"));
857        server.add_device(device);
858
859        assert!(server.device(5).is_some());
860        assert!(server.device(10).is_none());
861
862        // Remove device
863        let removed = server.remove_device(5);
864        assert!(removed.is_some());
865        assert!(server.device(5).is_none());
866    }
867
868    #[test]
869    fn test_should_respond_to_unit() {
870        let config = RtuServerConfig::new()
871            .with_unit_ids(vec![1, 2, 3])
872            .with_broadcast(true);
873        let server = ModbusRtuServer::new(config);
874
875        // Matching units
876        assert!(server.should_respond_to_unit(1));
877        assert!(server.should_respond_to_unit(2));
878        assert!(server.should_respond_to_unit(3));
879
880        // Non-matching
881        assert!(!server.should_respond_to_unit(4));
882        assert!(!server.should_respond_to_unit(255));
883
884        // Broadcast
885        assert!(server.should_respond_to_unit(0));
886    }
887
888    #[test]
889    fn test_should_respond_broadcast_disabled() {
890        let config = RtuServerConfig::new()
891            .with_unit_ids(vec![1])
892            .with_broadcast(false);
893        let server = ModbusRtuServer::new(config);
894
895        assert!(server.should_respond_to_unit(1));
896        assert!(!server.should_respond_to_unit(0)); // Broadcast disabled
897    }
898
899    #[test]
900    fn test_should_respond_empty_filter() {
901        let config = RtuServerConfig::new().with_unit_ids(vec![]);
902        let server = ModbusRtuServer::new(config);
903
904        // Empty filter = accept all
905        assert!(server.should_respond_to_unit(1));
906        assert!(server.should_respond_to_unit(100));
907        assert!(server.should_respond_to_unit(255));
908    }
909
910    #[tokio::test]
911    async fn test_server_stats() {
912        let server = ModbusRtuServer::new(RtuServerConfig::for_testing());
913        let stats = server.stats();
914
915        assert_eq!(stats.requests_processed, 0);
916        assert_eq!(stats.crc_errors, 0);
917    }
918}