1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use dashmap::DashMap;
32use parking_lot::RwLock;
33use serde::{Deserialize, Serialize};
34use tokio::sync::broadcast;
35use tracing::{debug, error, info};
36
37use crate::device::ModbusDevice;
38use crate::error::{ModbusError, ModbusResult};
39use crate::fault_injection::rtu_timing::RtuTimingFaultConfig;
40use crate::fault_injection::{FaultAction, FaultPipeline, ModbusFaultContext};
41use crate::handler::{build_exception_pdu, ExceptionCode, HandlerContext, HandlerRegistry};
42use crate::register::RegisterStore;
43
44use super::codec::RtuTiming;
45use super::frame::{RtuFrame, RtuFrameError};
46use super::transport::{ChannelConfig, ChannelTransport, RtuTransport, TransportConfig, TransportMetrics};
47
/// Configuration for a [`ModbusRtuServer`].
///
/// Every field has a serde default, so an empty document deserializes successfully.
///
/// NOTE(review): the field-level `#[serde(default)]` on `unit_ids` and
/// `simulate_response_delay` falls back to the type defaults (`vec![]` and `false`),
/// while this struct's `Default` impl uses `vec![1]` and `true`. A deserialized empty
/// document therefore differs from `RtuServerConfig::default()` — confirm this is intended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RtuServerConfig {
    /// Transport the server is run on. `ModbusRtuServer::run` only accepts the
    /// `Channel` variant; other variants make it return an error.
    #[serde(default)]
    pub transport: TransportConfig,

    /// Unit IDs the server answers for. An empty list means every non-zero unit ID
    /// is accepted.
    #[serde(default)]
    pub unit_ids: Vec<u8>,

    /// Whether requests addressed to unit ID 0 are processed.
    #[serde(default = "default_broadcast")]
    pub broadcast_enabled: bool,

    /// Upper bound applied to handler dispatch for a single request.
    #[serde(default = "default_request_timeout")]
    pub request_timeout: Duration,

    /// Shutdown timeout.
    /// NOTE(review): not read anywhere in the visible server code — confirm it is used.
    #[serde(default = "default_shutdown_timeout")]
    pub shutdown_timeout: Duration,

    /// When `true`, the server sleeps for the transport's reported transmission delay
    /// (plus `additional_response_delay`) before writing a response.
    #[serde(default)]
    pub simulate_response_delay: bool,

    /// Extra delay added on top of the simulated transmission delay. It is only
    /// applied when `simulate_response_delay` is `true`.
    #[serde(default)]
    pub additional_response_delay: Duration,
}
79
/// Serde fallback for `RtuServerConfig::broadcast_enabled`: broadcast handling is on
/// unless the configuration says otherwise.
fn default_broadcast() -> bool {
    const BROADCAST_ON_BY_DEFAULT: bool = true;
    BROADCAST_ON_BY_DEFAULT
}
83
/// Serde fallback for `RtuServerConfig::request_timeout`: five seconds.
fn default_request_timeout() -> Duration {
    const REQUEST_TIMEOUT_SECS: u64 = 5;
    Duration::new(REQUEST_TIMEOUT_SECS, 0)
}
87
/// Serde fallback for `RtuServerConfig::shutdown_timeout`: ten seconds.
fn default_shutdown_timeout() -> Duration {
    const SHUTDOWN_TIMEOUT_SECS: u64 = 10;
    Duration::new(SHUTDOWN_TIMEOUT_SECS, 0)
}
91
92impl Default for RtuServerConfig {
93 fn default() -> Self {
94 Self {
95 transport: TransportConfig::default(),
96 unit_ids: vec![1], broadcast_enabled: true,
98 request_timeout: default_request_timeout(),
99 shutdown_timeout: default_shutdown_timeout(),
100 simulate_response_delay: true,
101 additional_response_delay: Duration::ZERO,
102 }
103 }
104}
105
106impl RtuServerConfig {
107 pub fn new() -> Self {
109 Self::default()
110 }
111
112 pub fn with_transport(mut self, transport: TransportConfig) -> Self {
114 self.transport = transport;
115 self
116 }
117
118 pub fn with_unit_ids(mut self, ids: Vec<u8>) -> Self {
120 self.unit_ids = ids;
121 self
122 }
123
124 pub fn with_broadcast(mut self, enabled: bool) -> Self {
126 self.broadcast_enabled = enabled;
127 self
128 }
129
130 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
132 self.request_timeout = timeout;
133 self
134 }
135
136 pub fn with_response_delay_simulation(mut self, enabled: bool) -> Self {
138 self.simulate_response_delay = enabled;
139 self
140 }
141
142 pub fn for_testing() -> Self {
144 Self {
145 transport: TransportConfig::Channel(ChannelConfig::default()),
146 unit_ids: vec![1],
147 broadcast_enabled: true,
148 request_timeout: Duration::from_secs(1),
149 shutdown_timeout: Duration::from_secs(1),
150 simulate_response_delay: false,
151 additional_response_delay: Duration::ZERO,
152 }
153 }
154}
155
/// Notifications published on the server's broadcast channel.
///
/// Sends are best-effort: the server ignores the result of every `send`, so events
/// published while nobody is subscribed are lost.
#[derive(Debug, Clone)]
pub enum RtuServerEvent {
    /// Published when `run_with_transport` begins serving.
    Started,

    /// Published after the serve loop has exited and the transport has been closed.
    Stopped,

    /// Published for every decoded request frame, before the unit-ID filter is applied.
    RequestReceived {
        /// Unit ID taken from the request frame.
        unit_id: u8,
        /// Function code of the request, or 0 when the frame does not provide one.
        function_code: u8,
        /// Instant at which processing of the request started.
        timestamp: Instant,
    },

    /// Published once a response frame has been built for a request. It is sent
    /// before the frame is written to the transport.
    ResponseSent {
        /// Unit ID taken from the request frame.
        unit_id: u8,
        /// Function code of the request, or 0 when the frame does not provide one.
        function_code: u8,
        /// Whether the response frame reports itself as an exception.
        is_exception: bool,
        /// Time spent processing the request, in microseconds.
        latency_us: u64,
    },

    /// A transport read failed, or writing a regular (non-faulted) response failed.
    Error { message: String },

    /// A received frame could not be decoded (CRC mismatch or another decode error).
    FrameError { error: String },
}
186
/// Lifecycle state of a `ModbusRtuServer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RtuServerState {
    /// Not serving; the initial state and the only state from which a run may begin.
    Stopped,
    /// Set at the very start of a run, before the `Started` event is published.
    Starting,
    /// The serve loop is active.
    Running,
    /// The serve loop has exited and the transport is being closed.
    Stopping,
}
199
/// Snapshot of the server's request and framing counters.
#[derive(Debug, Clone, Default)]
pub struct RtuServerStats {
    /// Requests for which a response frame was built (successes plus exceptions).
    pub requests_processed: u64,

    /// Requests whose response was not an exception.
    pub requests_success: u64,

    /// Requests whose response was an exception.
    pub requests_exception: u64,

    /// Frames rejected because of a CRC mismatch.
    pub crc_errors: u64,

    /// Frames rejected for any other decode error, plus incomplete frames discarded
    /// after a read timeout.
    pub framing_errors: u64,

    /// Requests whose handler dispatch exceeded the configured request timeout.
    pub timeouts: u64,

    /// NOTE(review): never written in the visible server code; byte counts are
    /// recorded on the transport metrics instead — confirm whether this is populated.
    pub bytes_received: u64,

    /// NOTE(review): never written in the visible server code; byte counts are
    /// recorded on the transport metrics instead — confirm whether this is populated.
    pub bytes_sent: u64,

    /// Mean request latency in microseconds; filled in by `ModbusRtuServer::stats`
    /// and left at 0.0 while no request has been processed.
    pub avg_latency_us: f64,
}
230
/// Modbus RTU server: reads frames from a transport, dispatches them to the
/// registered handlers and writes the responses back.
pub struct ModbusRtuServer {
    /// Server configuration supplied at construction.
    config: RtuServerConfig,

    /// Function-code handlers used to produce response PDUs.
    handlers: Arc<HandlerRegistry>,

    /// Devices keyed by unit ID; a matching device supplies the register store for
    /// a request.
    devices: DashMap<u8, Arc<ModbusDevice>>,

    /// Register store used when no device is registered for the requested unit ID.
    default_registers: Arc<RegisterStore>,

    /// Current lifecycle state.
    state: RwLock<RtuServerState>,

    /// Shutdown flag checked at the top of every serve-loop iteration.
    shutdown: Arc<AtomicBool>,

    /// Sender side of the event channel handed out by `subscribe`.
    event_tx: broadcast::Sender<RtuServerEvent>,

    /// Request and framing counters.
    stats: RwLock<RtuServerStats>,

    /// Byte, frame and error counters recorded at the transport level.
    transport_metrics: RwLock<TransportMetrics>,

    /// Number of requests included in `latency_sum`.
    request_count: AtomicU64,
    /// Sum of request latencies in microseconds; divided by `request_count` to
    /// obtain the average.
    latency_sum: AtomicU64,

    /// Optional fault-injection pipeline consulted for every response.
    fault_pipeline: Option<Arc<FaultPipeline>>,

    /// Optional timing-fault configuration; when active, responses are written as
    /// the segments of a timing plan.
    rtu_timing_fault: Option<Arc<RtuTimingFaultConfig>>,
}
273
274impl ModbusRtuServer {
275 pub fn new(config: RtuServerConfig) -> Self {
277 let (event_tx, _) = broadcast::channel(256);
278
279 Self {
280 config,
281 handlers: Arc::new(HandlerRegistry::with_defaults()),
282 devices: DashMap::new(),
283 default_registers: Arc::new(RegisterStore::with_defaults()),
284 state: RwLock::new(RtuServerState::Stopped),
285 shutdown: Arc::new(AtomicBool::new(false)),
286 event_tx,
287 stats: RwLock::new(RtuServerStats::default()),
288 transport_metrics: RwLock::new(TransportMetrics::new()),
289 request_count: AtomicU64::new(0),
290 latency_sum: AtomicU64::new(0),
291 fault_pipeline: None,
292 rtu_timing_fault: None,
293 }
294 }
295
296 pub fn with_fault_pipeline(mut self, pipeline: FaultPipeline) -> Self {
298 self.fault_pipeline = Some(Arc::new(pipeline));
299 self
300 }
301
302 pub fn with_rtu_timing_fault(mut self, config: RtuTimingFaultConfig) -> Self {
304 self.rtu_timing_fault = Some(Arc::new(config));
305 self
306 }
307
308 pub fn with_handlers(mut self, handlers: HandlerRegistry) -> Self {
310 self.handlers = Arc::new(handlers);
311 self
312 }
313
314 pub fn with_default_registers(mut self, registers: RegisterStore) -> Self {
316 self.default_registers = Arc::new(registers);
317 self
318 }
319
320 pub fn add_device(&self, device: ModbusDevice) {
322 let unit_id = device.unit_id();
323 self.devices.insert(unit_id, Arc::new(device));
324 }
325
326 pub fn remove_device(&self, unit_id: u8) -> Option<Arc<ModbusDevice>> {
328 self.devices.remove(&unit_id).map(|(_, d)| d)
329 }
330
331 pub fn device(&self, unit_id: u8) -> Option<Arc<ModbusDevice>> {
333 self.devices.get(&unit_id).map(|d| d.clone())
334 }
335
336 pub fn default_registers(&self) -> &Arc<RegisterStore> {
338 &self.default_registers
339 }
340
341 pub fn subscribe(&self) -> broadcast::Receiver<RtuServerEvent> {
343 self.event_tx.subscribe()
344 }
345
346 pub fn state(&self) -> RtuServerState {
348 *self.state.read()
349 }
350
351 pub fn is_shutdown(&self) -> bool {
353 self.shutdown.load(Ordering::SeqCst)
354 }
355
356 pub fn shutdown(&self) {
358 if !self.shutdown.swap(true, Ordering::SeqCst) {
359 info!("RTU server shutdown requested");
360 }
361 }
362
363 pub fn stats(&self) -> RtuServerStats {
365 let mut stats = self.stats.read().clone();
366
367 let count = self.request_count.load(Ordering::Relaxed);
369 if count > 0 {
370 let sum = self.latency_sum.load(Ordering::Relaxed);
371 stats.avg_latency_us = sum as f64 / count as f64;
372 }
373
374 stats
375 }
376
377 pub fn transport_metrics(&self) -> TransportMetrics {
379 self.transport_metrics.read().clone()
380 }
381
382 pub async fn run(&self) -> ModbusResult<()> {
384 match &self.config.transport {
387 TransportConfig::Channel(config) => {
388 let (transport, _peer) = ChannelTransport::pair(config.clone());
389 self.run_with_transport(transport).await
390 }
391 _ => {
392 Err(ModbusError::Internal(
393 "Only channel transport is currently supported".into(),
394 ))
395 }
396 }
397 }
398
399 pub async fn run_with_transport<T: RtuTransport + 'static>(
401 &self,
402 mut transport: T,
403 ) -> ModbusResult<()> {
404 {
406 let mut state = self.state.write();
407 if *state != RtuServerState::Stopped {
408 return Err(ModbusError::Internal("Server already running".into()));
409 }
410 *state = RtuServerState::Starting;
411 }
412
413 self.shutdown.store(false, Ordering::SeqCst);
414 let _ = self.event_tx.send(RtuServerEvent::Started);
415
416 {
417 let mut state = self.state.write();
418 *state = RtuServerState::Running;
419 }
420
421 info!("RTU server started");
422
423 let mut read_buffer = vec![0u8; 256];
425 let mut frame_buffer = Vec::with_capacity(256);
426 let mut rtu_request_number: u64 = 0;
427 let serial_config = transport.serial_config().clone();
428 let timing = RtuTiming::from_baud_rate(serial_config.baud_rate);
429
430 loop {
431 if self.shutdown.load(Ordering::SeqCst) {
433 break;
434 }
435
436 let read_result = tokio::time::timeout(
438 timing.inter_frame_timeout * 2,
439 transport.read(&mut read_buffer),
440 )
441 .await;
442
443 match read_result {
444 Ok(Ok(0)) => {
445 tokio::task::yield_now().await;
447 continue;
448 }
449 Ok(Ok(n)) => {
450 frame_buffer.extend_from_slice(&read_buffer[..n]);
452 self.transport_metrics.write().record_bytes_received(n);
453
454 if let Some(frame) = self.try_parse_frame(&mut frame_buffer)? {
456 let response = self.process_request(&frame).await;
458 rtu_request_number += 1;
459
460 let fault_action = if let Some(ref pipeline) = self.fault_pipeline {
462 let unit_id = frame.unit_id;
463 let function_code = frame.function_code().unwrap_or(0);
464 let fault_ctx = ModbusFaultContext::rtu(
465 unit_id,
466 function_code,
467 &frame.pdu,
468 &response.pdu,
469 rtu_request_number,
470 );
471 pipeline.apply(&fault_ctx)
472 } else {
473 None
474 };
475
476 match fault_action {
477 Some(FaultAction::DropResponse) => {
478 debug!("Fault: dropping RTU response");
480 }
481 Some(FaultAction::DelayThenSend { delay, response: fault_pdu }) => {
482 tokio::time::sleep(delay).await;
483 let fault_frame = RtuFrame::response(&frame, fault_pdu);
485 let response_bytes = fault_frame.encode();
486 if self.config.simulate_response_delay {
487 let tx_delay = transport.transmission_delay(response_bytes.len());
488 tokio::time::sleep(tx_delay + self.config.additional_response_delay).await;
489 }
490 if let Err(e) = transport.write(&response_bytes).await {
491 error!("Failed to send delayed response: {}", e);
492 } else {
493 self.transport_metrics.write().record_bytes_sent(response_bytes.len());
494 }
495 }
496 Some(FaultAction::SendRawBytes(raw_bytes)) => {
497 if let Some(ref timing_config) = self.rtu_timing_fault {
500 if timing_config.is_active() {
501 let plan = timing_config.build_timing_plan(&raw_bytes);
502 let mut total_sent = 0usize;
503 for segment in &plan.segments {
504 if !segment.delay_before.is_zero() {
505 tokio::time::sleep(segment.delay_before).await;
506 }
507 if let Err(e) = transport.write(&segment.data).await {
508 error!("Failed to send timing segment (raw): {}", e);
509 break;
510 }
511 total_sent += segment.data.len();
512 }
513 self.transport_metrics.write().record_bytes_sent(total_sent);
514 } else {
515 if self.config.simulate_response_delay {
516 let delay = transport.transmission_delay(raw_bytes.len());
517 tokio::time::sleep(delay + self.config.additional_response_delay).await;
518 }
519 if let Err(e) = transport.write(&raw_bytes).await {
520 error!("Failed to send raw bytes: {}", e);
521 } else {
522 self.transport_metrics.write().record_bytes_sent(raw_bytes.len());
523 }
524 }
525 } else {
526 if self.config.simulate_response_delay {
527 let delay = transport.transmission_delay(raw_bytes.len());
528 tokio::time::sleep(delay + self.config.additional_response_delay).await;
529 }
530 if let Err(e) = transport.write(&raw_bytes).await {
531 error!("Failed to send raw bytes: {}", e);
532 } else {
533 self.transport_metrics.write().record_bytes_sent(raw_bytes.len());
534 }
535 }
536 }
537 Some(FaultAction::SendPartial { bytes }) => {
538 if self.config.simulate_response_delay {
540 let delay = transport.transmission_delay(bytes.len());
541 tokio::time::sleep(delay + self.config.additional_response_delay).await;
542 }
543 if let Err(e) = transport.write(&bytes).await {
544 error!("Failed to send partial frame: {}", e);
545 } else {
546 self.transport_metrics.write().record_bytes_sent(bytes.len());
547 }
548 }
549 Some(FaultAction::SendResponse(fault_pdu)) => {
550 let fault_frame = RtuFrame::response(&frame, fault_pdu);
551 let response_bytes = fault_frame.encode();
552 if self.config.simulate_response_delay {
553 let delay = transport.transmission_delay(response_bytes.len());
554 tokio::time::sleep(delay + self.config.additional_response_delay).await;
555 }
556 if let Err(e) = transport.write(&response_bytes).await {
557 error!("Failed to send faulted response: {}", e);
558 } else {
559 self.transport_metrics.write().record_bytes_sent(response_bytes.len());
560 }
561 }
562 Some(FaultAction::OverrideTransactionId { .. }) => {
563 let response_bytes = response.encode();
565 if self.config.simulate_response_delay {
566 let delay = transport.transmission_delay(response_bytes.len());
567 tokio::time::sleep(delay + self.config.additional_response_delay).await;
568 }
569 if let Err(e) = transport.write(&response_bytes).await {
570 error!("Failed to send response: {}", e);
571 } else {
572 self.transport_metrics.write().record_bytes_sent(response_bytes.len());
573 }
574 }
575 None => {
576 let response_bytes = response.encode();
578
579 if let Some(ref timing_config) = self.rtu_timing_fault {
581 if timing_config.is_active() {
582 let plan = timing_config.build_timing_plan(&response_bytes);
583 let mut total_sent = 0usize;
584 for segment in &plan.segments {
585 if !segment.delay_before.is_zero() {
586 tokio::time::sleep(segment.delay_before).await;
587 }
588 if let Err(e) = transport.write(&segment.data).await {
589 error!("Failed to send timing segment: {}", e);
590 let _ = self.event_tx.send(RtuServerEvent::Error {
591 message: e.to_string(),
592 });
593 break;
594 }
595 total_sent += segment.data.len();
596 }
597 self.transport_metrics.write().record_bytes_sent(total_sent);
598 } else {
599 if self.config.simulate_response_delay {
601 let delay = transport.transmission_delay(response_bytes.len());
602 tokio::time::sleep(delay + self.config.additional_response_delay).await;
603 }
604 if let Err(e) = transport.write(&response_bytes).await {
605 error!("Failed to send response: {}", e);
606 let _ = self.event_tx.send(RtuServerEvent::Error {
607 message: e.to_string(),
608 });
609 } else {
610 self.transport_metrics.write().record_bytes_sent(response_bytes.len());
611 }
612 }
613 } else {
614 if self.config.simulate_response_delay {
616 let delay = transport.transmission_delay(response_bytes.len());
617 tokio::time::sleep(delay + self.config.additional_response_delay).await;
618 }
619 if let Err(e) = transport.write(&response_bytes).await {
620 error!("Failed to send response: {}", e);
621 let _ = self.event_tx.send(RtuServerEvent::Error {
622 message: e.to_string(),
623 });
624 } else {
625 self.transport_metrics
626 .write()
627 .record_bytes_sent(response_bytes.len());
628 }
629 }
630 }
631 }
632 }
633 }
634 Ok(Err(e)) => {
635 error!("Transport read error: {}", e);
636 let _ = self.event_tx.send(RtuServerEvent::Error {
637 message: e.to_string(),
638 });
639 }
640 Err(_) => {
641 if !frame_buffer.is_empty() {
643 debug!("Discarding incomplete frame ({} bytes)", frame_buffer.len());
645 frame_buffer.clear();
646 self.stats.write().framing_errors += 1;
647 }
648 }
649 }
650 }
651
652 {
654 let mut state = self.state.write();
655 *state = RtuServerState::Stopping;
656 }
657
658 let _ = transport.close().await;
659
660 {
661 let mut state = self.state.write();
662 *state = RtuServerState::Stopped;
663 }
664
665 let _ = self.event_tx.send(RtuServerEvent::Stopped);
666 info!("RTU server stopped");
667
668 Ok(())
669 }
670
671 fn try_parse_frame(&self, buffer: &mut Vec<u8>) -> ModbusResult<Option<RtuFrame>> {
673 if buffer.len() < 4 {
674 return Ok(None);
675 }
676
677 match RtuFrame::try_decode(buffer) {
679 Ok(Some(frame)) => {
680 let frame_size = frame.frame_size();
682 buffer.drain(..frame_size);
683
684 self.transport_metrics.write().record_frame_received();
685
686 Ok(Some(frame))
687 }
688 Ok(None) => {
689 Ok(None)
691 }
692 Err(RtuFrameError::CrcMismatch { .. }) => {
693 buffer.clear();
695 self.stats.write().crc_errors += 1;
696 self.transport_metrics.write().record_crc_error();
697
698 let _ = self.event_tx.send(RtuServerEvent::FrameError {
699 error: "CRC mismatch".into(),
700 });
701
702 Ok(None)
703 }
704 Err(e) => {
705 buffer.clear();
707 self.stats.write().framing_errors += 1;
708 self.transport_metrics.write().record_framing_error();
709
710 let _ = self.event_tx.send(RtuServerEvent::FrameError {
711 error: e.to_string(),
712 });
713
714 Ok(None)
715 }
716 }
717 }
718
719 async fn process_request(&self, request: &RtuFrame) -> RtuFrame {
721 let start = Instant::now();
722 let unit_id = request.unit_id;
723 let function_code = request.function_code().unwrap_or(0);
724
725 let _ = self.event_tx.send(RtuServerEvent::RequestReceived {
727 unit_id,
728 function_code,
729 timestamp: start,
730 });
731
732 if !self.should_respond_to_unit(unit_id) {
734 debug!("Ignoring request for unit {}", unit_id);
736 return RtuFrame::new(unit_id, vec![]);
737 }
738
739 let registers = if let Some(device) = self.devices.get(&unit_id) {
741 device.registers().clone()
742 } else if unit_id == 0 {
743 self.default_registers.clone()
745 } else {
746 self.default_registers.clone()
748 };
749
750 let ctx = HandlerContext::new(unit_id, registers, 0);
752
753 let response_pdu = match tokio::time::timeout(
755 self.config.request_timeout,
756 async { self.handlers.dispatch(&request.pdu, &ctx) },
757 )
758 .await
759 {
760 Ok(Ok(pdu)) => pdu,
761 Ok(Err(exception_code)) => {
762 build_exception_pdu(function_code, exception_code)
763 }
764 Err(_) => {
765 self.stats.write().timeouts += 1;
766 build_exception_pdu(function_code, ExceptionCode::SlaveDeviceBusy)
767 }
768 };
769
770 let response = RtuFrame::response(request, response_pdu);
772 let is_exception = response.is_exception();
773
774 let latency_us = start.elapsed().as_micros() as u64;
776 self.request_count.fetch_add(1, Ordering::Relaxed);
777 self.latency_sum.fetch_add(latency_us, Ordering::Relaxed);
778
779 {
780 let mut stats = self.stats.write();
781 stats.requests_processed += 1;
782 if is_exception {
783 stats.requests_exception += 1;
784 } else {
785 stats.requests_success += 1;
786 }
787 }
788
789 let _ = self.event_tx.send(RtuServerEvent::ResponseSent {
791 unit_id,
792 function_code,
793 is_exception,
794 latency_us,
795 });
796
797 response
798 }
799
800 fn should_respond_to_unit(&self, unit_id: u8) -> bool {
802 if unit_id == 0 {
804 return self.config.broadcast_enabled;
805 }
806
807 if self.config.unit_ids.is_empty() {
809 return true;
810 }
811
812 self.config.unit_ids.contains(&unit_id)
814 }
815}
816
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a server that answers for `ids` with broadcast handling set to `broadcast`.
    fn server_for(ids: Vec<u8>, broadcast: bool) -> ModbusRtuServer {
        ModbusRtuServer::new(
            RtuServerConfig::new()
                .with_unit_ids(ids)
                .with_broadcast(broadcast),
        )
    }

    /// The default configuration answers for unit 1 and accepts broadcasts.
    #[test]
    fn test_server_config_default() {
        let defaults = RtuServerConfig::default();

        assert_eq!(defaults.unit_ids, vec![1]);
        assert!(defaults.broadcast_enabled);
    }

    /// Builder methods overwrite exactly the fields they name.
    #[test]
    fn test_server_config_builder() {
        let ten_seconds = Duration::from_secs(10);
        let built = RtuServerConfig::new()
            .with_unit_ids(vec![1, 2, 3])
            .with_broadcast(false)
            .with_request_timeout(ten_seconds);

        assert_eq!(built.unit_ids, vec![1, 2, 3]);
        assert!(!built.broadcast_enabled);
        assert_eq!(built.request_timeout, ten_seconds);
    }

    /// A freshly built server is stopped and has no pending shutdown request.
    #[test]
    fn test_server_creation() {
        let server = ModbusRtuServer::new(RtuServerConfig::for_testing());

        assert_eq!(server.state(), RtuServerState::Stopped);
        assert!(!server.is_shutdown());
    }

    /// Devices can be added, looked up by unit ID and removed again.
    #[test]
    fn test_server_device_management() {
        use crate::config::ModbusDeviceConfig;

        let server = ModbusRtuServer::new(RtuServerConfig::for_testing());
        server.add_device(ModbusDevice::new(ModbusDeviceConfig::new(5, "Test Device")));

        assert!(server.device(5).is_some());
        assert!(server.device(10).is_none());

        let removed = server.remove_device(5);
        assert!(removed.is_some());
        assert!(server.device(5).is_none());
    }

    /// Only listed unit IDs (and broadcast, when enabled) are accepted.
    #[test]
    fn test_should_respond_to_unit() {
        let server = server_for(vec![1, 2, 3], true);

        for &listed in &[1u8, 2, 3] {
            assert!(server.should_respond_to_unit(listed));
        }

        for &unlisted in &[4u8, 255] {
            assert!(!server.should_respond_to_unit(unlisted));
        }

        assert!(server.should_respond_to_unit(0));
    }

    /// Disabling broadcast rejects unit 0 without affecting listed units.
    #[test]
    fn test_should_respond_broadcast_disabled() {
        let server = server_for(vec![1], false);

        assert!(server.should_respond_to_unit(1));
        assert!(!server.should_respond_to_unit(0));
    }

    /// An empty unit-ID list accepts every non-zero unit ID.
    #[test]
    fn test_should_respond_empty_filter() {
        let server = ModbusRtuServer::new(RtuServerConfig::new().with_unit_ids(vec![]));

        for &any_unit in &[1u8, 100, 255] {
            assert!(server.should_respond_to_unit(any_unit));
        }
    }

    /// Counters start at zero on a new server.
    #[tokio::test]
    async fn test_server_stats() {
        let server = ModbusRtuServer::new(RtuServerConfig::for_testing());
        let snapshot = server.stats();

        assert_eq!(snapshot.requests_processed, 0);
        assert_eq!(snapshot.crc_errors, 0);
    }
}