1use async_trait::async_trait;
6use std::net::SocketAddr;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::net::{TcpListener, TcpStream};
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::sync::{broadcast, Mutex};
12use tokio::time::timeout;
13use log::{info, error, debug, warn};
14use tokio_serial;
15
16use crate::error::{ModbusError, ModbusResult};
17use crate::protocol::{ModbusRequest, ModbusResponse, ModbusFunction};
18use crate::register_bank::{ModbusRegisterBank, RegisterBankStats};
19
/// Size in bytes of the per-connection receive buffer used for a single TCP read.
// NOTE(review): 260 presumably matches the maximum Modbus TCP ADU
// (7-byte MBAP header + 253-byte PDU) — confirm against the spec.
const MAX_TCP_FRAME_SIZE: usize = 260;

/// Number of bytes that precede the unit identifier in a TCP frame:
/// transaction id (2), protocol id (2) and length (2).
///
/// The `length` field counts the unit id and everything after it, so a
/// complete frame occupies `MBAP_HEADER_SIZE + length` bytes.
const MBAP_HEADER_SIZE: usize = 6;
25
/// Lifecycle and introspection interface shared by the Modbus server
/// implementations in this file.
#[async_trait]
pub trait ModbusServer: Send + Sync {
    /// Starts the server, returning an error if it cannot be started.
    async fn start(&mut self) -> ModbusResult<()>;

    /// Stops the server.
    async fn stop(&mut self) -> ModbusResult<()>;

    /// Reports whether the server is currently running.
    fn is_running(&self) -> bool;

    /// Returns a snapshot of the server statistics.
    fn get_stats(&self) -> ServerStats;

    /// Returns a shared handle to the register bank the server operates on, if any.
    fn get_register_bank(&self) -> Option<Arc<ModbusRegisterBank>>;
}
44
/// Counters describing server activity; all values start at zero.
#[derive(Debug, Clone, Default)]
pub struct ServerStats {
    /// Number of client connections handled so far.
    pub connections_count: u64,
    /// Number of requests (TCP reads / accumulated RTU frames) received.
    pub total_requests: u64,
    /// Requests for which a response was produced and written successfully.
    pub successful_requests: u64,
    /// Requests that could not be processed or answered.
    pub failed_requests: u64,
    /// Total bytes read from clients.
    pub bytes_received: u64,
    /// Total bytes of successful responses written.
    pub bytes_sent: u64,
    /// Seconds elapsed since the server was started.
    pub uptime_seconds: u64,
    /// Statistics reported by the register bank, when available.
    pub register_bank_stats: Option<RegisterBankStats>,
}
57
58
/// Configuration for [`ModbusTcpServer`].
#[derive(Debug, Clone)]
pub struct ModbusTcpServerConfig {
    /// Socket address the TCP listener binds to.
    pub bind_address: SocketAddr,
    /// Intended upper bound on simultaneous client connections.
    pub max_connections: usize,
    /// Maximum time to wait for data from a client before dropping the connection.
    pub request_timeout: Duration,
    /// Register bank to serve; when `None` the server creates its own.
    pub register_bank: Option<Arc<ModbusRegisterBank>>,
}
67
68impl Default for ModbusTcpServerConfig {
69 fn default() -> Self {
70 Self {
71 bind_address: "127.0.0.1:502".parse().unwrap(),
72 max_connections: 100,
73 request_timeout: Duration::from_secs(30),
74 register_bank: None,
75 }
76 }
77}
78
/// Modbus TCP server that answers requests out of a shared register bank.
pub struct ModbusTcpServer {
    /// Settings the server was created with.
    config: ModbusTcpServerConfig,
    /// Register bank requests are served from.
    register_bank: Arc<ModbusRegisterBank>,
    /// Activity counters shared with the connection tasks.
    stats: Arc<Mutex<ServerStats>>,
    /// Sender used to broadcast the shutdown signal; set by `start`.
    shutdown_tx: Option<broadcast::Sender<()>>,
    /// Running flag shared with the background accept task.
    is_running: Arc<Mutex<bool>>,
    /// Instant at which `start` last succeeded; used to compute uptime.
    start_time: Option<std::time::Instant>,
}
88
89impl ModbusTcpServer {
90 pub fn new(bind_address: &str) -> ModbusResult<Self> {
92 let addr = bind_address.parse()
93 .map_err(|e| ModbusError::invalid_data(format!("Invalid bind address: {}", e)))?;
94
95 let config = ModbusTcpServerConfig {
96 bind_address: addr,
97 ..Default::default()
98 };
99
100 Self::with_config(config)
101 }
102
103 pub fn with_config(config: ModbusTcpServerConfig) -> ModbusResult<Self> {
105 let register_bank = config.register_bank.clone()
106 .unwrap_or_else(|| Arc::new(ModbusRegisterBank::new()));
107
108 Ok(Self {
109 config,
110 register_bank,
111 stats: Arc::new(Mutex::new(ServerStats::default())),
112 shutdown_tx: None,
113 is_running: Arc::new(Mutex::new(false)),
114 start_time: None,
115 })
116 }
117
/// Replaces the register bank used by this server handle.
// NOTE(review): `start` clones the bank into its tasks, so a server that is
// already running presumably keeps using the previous bank — confirm.
pub fn set_register_bank(&mut self, register_bank: Arc<ModbusRegisterBank>) {
    self.register_bank = register_bank;
}
122
123 async fn handle_client(
125 stream: TcpStream,
126 register_bank: Arc<ModbusRegisterBank>,
127 stats: Arc<Mutex<ServerStats>>,
128 mut shutdown_rx: broadcast::Receiver<()>,
129 request_timeout: Duration,
130 ) {
131 let peer_addr = stream.peer_addr().unwrap_or_else(|_| "unknown".parse().unwrap());
132 info!("đĄ New client connected: {}", peer_addr);
133
134 {
136 let mut stats = stats.lock().await;
137 stats.connections_count += 1;
138 }
139
140 let mut stream = stream;
141 let mut buffer = vec![0u8; MAX_TCP_FRAME_SIZE];
142
143 loop {
144 tokio::select! {
145 _ = shutdown_rx.recv() => {
147 debug!("Shutdown signal received for client {}", peer_addr);
148 break;
149 }
150
151 result = timeout(request_timeout, stream.read(&mut buffer)) => {
153 match result {
154 Ok(Ok(0)) => {
155 debug!("Client {} disconnected", peer_addr);
156 break;
157 }
158 Ok(Ok(bytes_read)) => {
159 {
161 let mut stats = stats.lock().await;
162 stats.total_requests += 1;
163 stats.bytes_received += bytes_read as u64;
164 }
165
166 match Self::process_request(&buffer[..bytes_read], ®ister_bank).await {
168 Ok(response_data) => {
169 if let Err(e) = stream.write_all(&response_data).await {
170 error!("Failed to send response to {}: {}", peer_addr, e);
171 break;
172 } else {
173 let mut stats = stats.lock().await;
175 stats.successful_requests += 1;
176 stats.bytes_sent += response_data.len() as u64;
177 }
178 }
179 Err(e) => {
180 error!("Error processing request from {}: {}", peer_addr, e);
181
182 if let Ok(error_response) = Self::create_error_response(&buffer[..bytes_read], 0x01) {
184 let _ = stream.write_all(&error_response).await;
185 }
186
187 let mut stats = stats.lock().await;
189 stats.failed_requests += 1;
190 }
191 }
192 }
193 Ok(Err(e)) => {
194 error!("Read error from {}: {}", peer_addr, e);
195 break;
196 }
197 Err(_) => {
198 warn!("Read timeout from {}", peer_addr);
199 break;
200 }
201 }
202 }
203 }
204 }
205
206 info!("đ Client {} disconnected", peer_addr);
207 }
208
209 async fn process_request(
211 frame: &[u8],
212 register_bank: &Arc<ModbusRegisterBank>,
213 ) -> ModbusResult<Vec<u8>> {
214 if frame.len() < MBAP_HEADER_SIZE + 2 {
215 return Err(ModbusError::frame("Frame too short"));
216 }
217
218 let transaction_id = u16::from_be_bytes([frame[0], frame[1]]);
220 let protocol_id = u16::from_be_bytes([frame[2], frame[3]]);
221 let length = u16::from_be_bytes([frame[4], frame[5]]);
222 let unit_id = frame[6];
223 let function_code = frame[7];
224
225 if protocol_id != 0 {
226 return Err(ModbusError::frame("Invalid protocol ID"));
227 }
228
229 if frame.len() < MBAP_HEADER_SIZE + length as usize {
230 return Err(ModbusError::frame("Incomplete frame"));
231 }
232
233 debug!("Processing request: TID={}, Function=0x{:02x}, Unit={}",
234 transaction_id, function_code, unit_id);
235
236 let data = &frame[MBAP_HEADER_SIZE + 2..MBAP_HEADER_SIZE + length as usize];
238
239 let response_data = match function_code {
241 0x01 => Self::handle_read_coils(data, register_bank).await?,
242 0x02 => Self::handle_read_discrete_inputs(data, register_bank).await?,
243 0x03 => Self::handle_read_holding_registers(data, register_bank).await?,
244 0x04 => Self::handle_read_input_registers(data, register_bank).await?,
245 0x05 => Self::handle_write_single_coil(data, register_bank).await?,
246 0x06 => Self::handle_write_single_register(data, register_bank).await?,
247 0x0F => Self::handle_write_multiple_coils(data, register_bank).await?,
248 0x10 => Self::handle_write_multiple_registers(data, register_bank).await?,
249 _ => {
250 return Err(ModbusError::protocol(format!("Unsupported function code: 0x{:02x}", function_code)));
251 }
252 };
253
254 let response_length = response_data.len() + 2; let mut response = Vec::with_capacity(MBAP_HEADER_SIZE + response_length);
257
258 response.extend_from_slice(&transaction_id.to_be_bytes());
260 response.extend_from_slice(&protocol_id.to_be_bytes());
261 response.extend_from_slice(&(response_length as u16).to_be_bytes());
262
263 response.push(unit_id);
265 response.push(function_code);
266 response.extend_from_slice(&response_data);
267
268 Ok(response)
269 }
270
271 async fn handle_read_coils(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
273 if data.len() < 4 {
274 return Err(ModbusError::frame("Invalid read coils request"));
275 }
276
277 let address = u16::from_be_bytes([data[0], data[1]]);
278 let quantity = u16::from_be_bytes([data[2], data[3]]);
279
280 if quantity == 0 || quantity > 2000 {
281 return Err(ModbusError::invalid_data("Invalid quantity"));
282 }
283
284 let coils = register_bank.read_coils(address, quantity)?;
285
286 let byte_count = (quantity + 7) / 8;
288 let mut response = vec![byte_count as u8];
289
290 for i in 0..byte_count {
291 let mut byte_value = 0u8;
292 for bit in 0..8 {
293 let coil_index = (i * 8 + bit) as usize;
294 if coil_index < coils.len() && coils[coil_index] {
295 byte_value |= 1 << bit;
296 }
297 }
298 response.push(byte_value);
299 }
300
301 Ok(response)
302 }
303
304 async fn handle_read_discrete_inputs(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
306 if data.len() < 4 {
307 return Err(ModbusError::frame("Invalid read discrete inputs request"));
308 }
309
310 let address = u16::from_be_bytes([data[0], data[1]]);
311 let quantity = u16::from_be_bytes([data[2], data[3]]);
312
313 if quantity == 0 || quantity > 2000 {
314 return Err(ModbusError::invalid_data("Invalid quantity"));
315 }
316
317 let inputs = register_bank.read_discrete_inputs(address, quantity)?;
318
319 let byte_count = (quantity + 7) / 8;
321 let mut response = vec![byte_count as u8];
322
323 for i in 0..byte_count {
324 let mut byte_value = 0u8;
325 for bit in 0..8 {
326 let input_index = (i * 8 + bit) as usize;
327 if input_index < inputs.len() && inputs[input_index] {
328 byte_value |= 1 << bit;
329 }
330 }
331 response.push(byte_value);
332 }
333
334 Ok(response)
335 }
336
337 async fn handle_read_holding_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
339 if data.len() < 4 {
340 return Err(ModbusError::frame("Invalid read holding registers request"));
341 }
342
343 let address = u16::from_be_bytes([data[0], data[1]]);
344 let quantity = u16::from_be_bytes([data[2], data[3]]);
345
346 if quantity == 0 || quantity > 125 {
347 return Err(ModbusError::invalid_data("Invalid quantity"));
348 }
349
350 let registers = register_bank.read_holding_registers(address, quantity)?;
351
352 let byte_count = quantity * 2;
353 let mut response = vec![byte_count as u8];
354
355 for register in registers {
356 response.extend_from_slice(®ister.to_be_bytes());
357 }
358
359 Ok(response)
360 }
361
362 async fn handle_read_input_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
364 if data.len() < 4 {
365 return Err(ModbusError::frame("Invalid read input registers request"));
366 }
367
368 let address = u16::from_be_bytes([data[0], data[1]]);
369 let quantity = u16::from_be_bytes([data[2], data[3]]);
370
371 if quantity == 0 || quantity > 125 {
372 return Err(ModbusError::invalid_data("Invalid quantity"));
373 }
374
375 let registers = register_bank.read_input_registers(address, quantity)?;
376
377 let byte_count = quantity * 2;
378 let mut response = vec![byte_count as u8];
379
380 for register in registers {
381 response.extend_from_slice(®ister.to_be_bytes());
382 }
383
384 Ok(response)
385 }
386
387 async fn handle_write_single_coil(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
389 if data.len() < 4 {
390 return Err(ModbusError::frame("Invalid write single coil request"));
391 }
392
393 let address = u16::from_be_bytes([data[0], data[1]]);
394 let value = u16::from_be_bytes([data[2], data[3]]);
395
396 let coil_value = match value {
397 0x0000 => false,
398 0xFF00 => true,
399 _ => return Err(ModbusError::invalid_data("Invalid coil value")),
400 };
401
402 register_bank.write_single_coil(address, coil_value)?;
403
404 Ok(data.to_vec())
406 }
407
408 async fn handle_write_single_register(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
410 if data.len() < 4 {
411 return Err(ModbusError::frame("Invalid write single register request"));
412 }
413
414 let address = u16::from_be_bytes([data[0], data[1]]);
415 let value = u16::from_be_bytes([data[2], data[3]]);
416
417 register_bank.write_single_register(address, value)?;
418
419 Ok(data.to_vec())
421 }
422
423 async fn handle_write_multiple_coils(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
425 if data.len() < 5 {
426 return Err(ModbusError::frame("Invalid write multiple coils request"));
427 }
428
429 let address = u16::from_be_bytes([data[0], data[1]]);
430 let quantity = u16::from_be_bytes([data[2], data[3]]);
431 let byte_count = data[4] as usize;
432
433 if data.len() < 5 + byte_count {
434 return Err(ModbusError::frame("Incomplete write multiple coils request"));
435 }
436
437 let mut coils = Vec::with_capacity(quantity as usize);
439 for i in 0..quantity {
440 let byte_index = (i / 8) as usize;
441 let bit_index = i % 8;
442 let byte_value = data[5 + byte_index];
443 let coil_value = (byte_value & (1 << bit_index)) != 0;
444 coils.push(coil_value);
445 }
446
447 register_bank.write_multiple_coils(address, &coils)?;
448
449 Ok(data[0..4].to_vec())
451 }
452
453 async fn handle_write_multiple_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
455 if data.len() < 5 {
456 return Err(ModbusError::frame("Invalid write multiple registers request"));
457 }
458
459 let address = u16::from_be_bytes([data[0], data[1]]);
460 let quantity = u16::from_be_bytes([data[2], data[3]]);
461 let byte_count = data[4] as usize;
462
463 if data.len() < 5 + byte_count || byte_count != (quantity as usize * 2) {
464 return Err(ModbusError::frame("Incomplete write multiple registers request"));
465 }
466
467 let mut registers = Vec::with_capacity(quantity as usize);
469 for i in 0..quantity {
470 let offset = 5 + (i as usize * 2);
471 let value = u16::from_be_bytes([data[offset], data[offset + 1]]);
472 registers.push(value);
473 }
474
475 register_bank.write_multiple_registers(address, ®isters)?;
476
477 Ok(data[0..4].to_vec())
479 }
480
481 fn create_error_response(request: &[u8], exception_code: u8) -> ModbusResult<Vec<u8>> {
483 if request.len() < MBAP_HEADER_SIZE + 2 {
484 return Err(ModbusError::frame("Request too short for error response"));
485 }
486
487 let transaction_id = u16::from_be_bytes([request[0], request[1]]);
488 let protocol_id = 0u16;
489 let length = 3u16; let unit_id = request[6];
491 let function_code = request[7] | 0x80; let mut response = Vec::with_capacity(MBAP_HEADER_SIZE + 3);
494
495 response.extend_from_slice(&transaction_id.to_be_bytes());
497 response.extend_from_slice(&protocol_id.to_be_bytes());
498 response.extend_from_slice(&length.to_be_bytes());
499
500 response.push(unit_id);
502 response.push(function_code);
503 response.push(exception_code);
504
505 Ok(response)
506 }
507}
508
509#[async_trait]
510impl ModbusServer for ModbusTcpServer {
511 async fn start(&mut self) -> ModbusResult<()> {
512 let mut is_running = self.is_running.lock().await;
513 if *is_running {
514 return Err(ModbusError::protocol("Server is already running"));
515 }
516
517 info!("đ Starting Modbus TCP server on {}", self.config.bind_address);
518
519 let listener = TcpListener::bind(self.config.bind_address).await
520 .map_err(|e| ModbusError::connection(format!("Failed to bind to {}: {}", self.config.bind_address, e)))?;
521
522 let (shutdown_tx, _) = broadcast::channel(1);
523 self.shutdown_tx = Some(shutdown_tx.clone());
524 self.start_time = Some(std::time::Instant::now());
525
526 *is_running = true;
527 drop(is_running);
528
529 info!("â
Modbus TCP server started successfully");
530 info!("đ Server configuration:");
531 info!(" - Bind address: {}", self.config.bind_address);
532 info!(" - Max connections: {}", self.config.max_connections);
533 info!(" - Request timeout: {:?}", self.config.request_timeout);
534
535 let register_bank = self.register_bank.clone();
536 let stats = self.stats.clone();
537 let request_timeout = self.config.request_timeout;
538 let is_running_flag = self.is_running.clone();
539 let mut shutdown_rx = shutdown_tx.subscribe();
540
541 tokio::spawn(async move {
542 loop {
543 tokio::select! {
544 result = listener.accept() => {
545 match result {
546 Ok((stream, addr)) => {
547 debug!("Accepted connection from {}", addr);
548
549 let register_bank = register_bank.clone();
550 let stats = stats.clone();
551 let shutdown_rx = shutdown_tx.subscribe();
552
553 tokio::spawn(async move {
554 Self::handle_client(stream, register_bank, stats, shutdown_rx, request_timeout).await;
555 });
556 }
557 Err(e) => {
558 error!("Failed to accept connection: {}", e);
559 }
560 }
561 }
562 _ = shutdown_rx.recv() => {
563 info!("Shutdown signal received, stopping server");
564 break;
565 }
566 }
567 }
568
569 let mut is_running = is_running_flag.lock().await;
570 *is_running = false;
571 });
572
573 Ok(())
574 }
575
576 async fn stop(&mut self) -> ModbusResult<()> {
577 if let Some(shutdown_tx) = &self.shutdown_tx {
578 let _ = shutdown_tx.send(());
579 }
580
581 let mut is_running = self.is_running.lock().await;
582 *is_running = false;
583
584 info!("âšī¸ Modbus TCP server stopped");
585 Ok(())
586 }
587
588 fn is_running(&self) -> bool {
589 false }
593
594 fn get_stats(&self) -> ServerStats {
595 let mut stats = ServerStats::default();
598
599 if let Some(start_time) = self.start_time {
600 stats.uptime_seconds = start_time.elapsed().as_secs();
601 }
602
603 stats.register_bank_stats = Some(self.register_bank.get_stats());
604 stats
605 }
606
607 fn get_register_bank(&self) -> Option<Arc<ModbusRegisterBank>> {
608 Some(self.register_bank.clone())
609 }
610}
611
/// Configuration for [`ModbusRtuServer`].
#[derive(Debug, Clone)]
pub struct ModbusRtuServerConfig {
    /// Serial device to open, e.g. `/dev/ttyUSB0`.
    pub port: String,
    /// Serial baud rate.
    pub baud_rate: u32,
    /// Number of data bits per character.
    pub data_bits: tokio_serial::DataBits,
    /// Number of stop bits per character.
    pub stop_bits: tokio_serial::StopBits,
    /// Parity mode.
    pub parity: tokio_serial::Parity,
    /// Timeout handed to the serial port builder.
    pub timeout: Duration,
    /// Idle time after which the bytes received so far are treated as one frame.
    pub frame_gap: Duration,
    /// Register bank to serve; when `None` the server creates its own.
    pub register_bank: Option<Arc<ModbusRegisterBank>>,
}
624
impl Default for ModbusRtuServerConfig {
    /// Defaults: `/dev/ttyUSB0` at 9600 baud, 8 data bits, 1 stop bit, no
    /// parity, a 1-second port timeout, a 4 ms frame gap and no pre-supplied
    /// register bank.
    fn default() -> Self {
        Self {
            port: "/dev/ttyUSB0".to_string(),
            baud_rate: 9600,
            data_bits: tokio_serial::DataBits::Eight,
            stop_bits: tokio_serial::StopBits::One,
            parity: tokio_serial::Parity::None,
            timeout: Duration::from_secs(1),
            // NOTE(review): 4 ms presumably approximates the 3.5-character
            // silent interval at 9600 baud — confirm.
            frame_gap: Duration::from_millis(4),
            register_bank: None,
        }
    }
}
639
/// Modbus RTU (serial) server that answers requests out of a shared register bank.
pub struct ModbusRtuServer {
    /// Settings the server was created with.
    config: ModbusRtuServerConfig,
    /// Register bank requests are served from.
    register_bank: Arc<ModbusRegisterBank>,
    /// Activity counters shared with the communication task.
    stats: Arc<Mutex<ServerStats>>,
    /// Sender used to broadcast the shutdown signal; set by `start`.
    shutdown_tx: Option<broadcast::Sender<()>>,
    /// Running flag shared with the background communication task.
    is_running: Arc<Mutex<bool>>,
    /// Instant at which `start` last succeeded; used to compute uptime.
    start_time: Option<std::time::Instant>,
}
649
650impl ModbusRtuServer {
651 pub fn new(port: &str, baud_rate: u32) -> ModbusResult<Self> {
653 let config = ModbusRtuServerConfig {
654 port: port.to_string(),
655 baud_rate,
656 ..Default::default()
657 };
658
659 Self::with_config(config)
660 }
661
662 pub fn with_config(config: ModbusRtuServerConfig) -> ModbusResult<Self> {
664 let register_bank = config.register_bank.clone()
665 .unwrap_or_else(|| Arc::new(ModbusRegisterBank::new()));
666
667 Ok(Self {
668 config,
669 register_bank,
670 stats: Arc::new(Mutex::new(ServerStats::default())),
671 shutdown_tx: None,
672 is_running: Arc::new(Mutex::new(false)),
673 start_time: None,
674 })
675 }
676
/// Replaces the register bank used by this server handle.
// NOTE(review): `start` clones the bank into its task, so a server that is
// already running presumably keeps using the previous bank — confirm.
pub fn set_register_bank(&mut self, register_bank: Arc<ModbusRegisterBank>) {
    self.register_bank = register_bank;
}
681
/// Computes the CRC of `data` using the `crc` crate's `CRC_16_MODBUS` algorithm.
///
/// Callers in this file place the result on the wire low byte first.
fn calculate_crc(data: &[u8]) -> u16 {
    use crc::{Crc, CRC_16_MODBUS};
    const CRC_MODBUS: Crc<u16> = Crc::<u16>::new(&CRC_16_MODBUS);
    CRC_MODBUS.checksum(data)
}
688
689 async fn process_rtu_frame(
691 frame: &[u8],
692 register_bank: &Arc<ModbusRegisterBank>,
693 ) -> ModbusResult<Vec<u8>> {
694 if frame.len() < 4 {
695 return Err(ModbusError::frame("RTU frame too short"));
696 }
697
698 let data_len = frame.len() - 2;
700 let data = &frame[..data_len];
701 let received_crc = u16::from_le_bytes([frame[data_len], frame[data_len + 1]]);
702
703 let calculated_crc = Self::calculate_crc(data);
705 if received_crc != calculated_crc {
706 return Err(ModbusError::frame("Invalid CRC"));
707 }
708
709 if data.len() < 2 {
710 return Err(ModbusError::frame("Invalid RTU frame"));
711 }
712
713 let slave_id = data[0];
714 let function_code = data[1];
715 let pdu_data = &data[2..];
716
717 debug!("Processing RTU request: Slave={}, Function=0x{:02x}", slave_id, function_code);
718
719 let response_data = match function_code {
721 0x01 => Self::handle_read_coils(pdu_data, register_bank).await?,
722 0x02 => Self::handle_read_discrete_inputs(pdu_data, register_bank).await?,
723 0x03 => Self::handle_read_holding_registers(pdu_data, register_bank).await?,
724 0x04 => Self::handle_read_input_registers(pdu_data, register_bank).await?,
725 0x05 => Self::handle_write_single_coil(pdu_data, register_bank).await?,
726 0x06 => Self::handle_write_single_register(pdu_data, register_bank).await?,
727 0x0F => Self::handle_write_multiple_coils(pdu_data, register_bank).await?,
728 0x10 => Self::handle_write_multiple_registers(pdu_data, register_bank).await?,
729 _ => {
730 return Self::create_rtu_error_response(slave_id, function_code, 0x01);
731 }
732 };
733
734 let mut response = Vec::new();
736 response.push(slave_id);
737 response.push(function_code);
738 response.extend_from_slice(&response_data);
739
740 let crc = Self::calculate_crc(&response);
742 response.extend_from_slice(&crc.to_le_bytes());
743
744 Ok(response)
745 }
746
747 fn create_rtu_error_response(slave_id: u8, function_code: u8, exception_code: u8) -> ModbusResult<Vec<u8>> {
749 let mut response = Vec::new();
750 response.push(slave_id);
751 response.push(function_code | 0x80); response.push(exception_code);
753
754 let crc = Self::calculate_crc(&response);
755 response.extend_from_slice(&crc.to_le_bytes());
756
757 Ok(response)
758 }
759
760 async fn handle_rtu_communication(
762 mut port: tokio_serial::SerialStream,
763 register_bank: Arc<ModbusRegisterBank>,
764 stats: Arc<Mutex<ServerStats>>,
765 mut shutdown_rx: broadcast::Receiver<()>,
766 frame_gap: Duration,
767 ) {
768 info!("đ RTU server communication started");
769
770 let mut buffer = vec![0u8; 256];
771 let mut frame_buffer = Vec::new();
772 let mut last_activity = std::time::Instant::now();
773
774 loop {
775 tokio::select! {
776 _ = shutdown_rx.recv() => {
777 debug!("Shutdown signal received for RTU server");
778 break;
779 }
780
781 result = tokio::time::timeout(Duration::from_millis(100), port.read(&mut buffer)) => {
782 match result {
783 Ok(Ok(bytes_read)) if bytes_read > 0 => {
784 let now = std::time::Instant::now();
785
786 if now.duration_since(last_activity) > frame_gap && !frame_buffer.is_empty() {
788 Self::process_accumulated_frame(
790 &frame_buffer,
791 &mut port,
792 ®ister_bank,
793 &stats
794 ).await;
795 frame_buffer.clear();
796 }
797
798 frame_buffer.extend_from_slice(&buffer[..bytes_read]);
800 last_activity = now;
801
802 {
804 let mut stats = stats.lock().await;
805 stats.bytes_received += bytes_read as u64;
806 }
807 }
808 Ok(Ok(_)) => {
809 }
811 Ok(Err(e)) => {
812 error!("RTU read error: {}", e);
813 break;
814 }
815 Err(_) => {
816 let now = std::time::Instant::now();
818 if !frame_buffer.is_empty() && now.duration_since(last_activity) > frame_gap {
819 Self::process_accumulated_frame(
820 &frame_buffer,
821 &mut port,
822 ®ister_bank,
823 &stats
824 ).await;
825 frame_buffer.clear();
826 }
827 }
828 }
829 }
830 }
831 }
832
833 info!("đ RTU server communication stopped");
834 }
835
836 async fn process_accumulated_frame(
838 frame: &[u8],
839 port: &mut tokio_serial::SerialStream,
840 register_bank: &Arc<ModbusRegisterBank>,
841 stats: &Arc<Mutex<ServerStats>>,
842 ) {
843 {
845 let mut stats = stats.lock().await;
846 stats.total_requests += 1;
847 }
848
849 match Self::process_rtu_frame(frame, register_bank).await {
850 Ok(response) => {
851 if let Err(e) = port.write_all(&response).await {
852 error!("Failed to send RTU response: {}", e);
853 let mut stats = stats.lock().await;
854 stats.failed_requests += 1;
855 } else {
856 let mut stats = stats.lock().await;
857 stats.successful_requests += 1;
858 stats.bytes_sent += response.len() as u64;
859 }
860 }
861 Err(e) => {
862 error!("Error processing RTU request: {}", e);
863
864 if frame.len() >= 2 {
866 if let Ok(error_response) = Self::create_rtu_error_response(frame[0], frame[1], 0x01) {
867 let _ = port.write_all(&error_response).await;
868 }
869 }
870
871 let mut stats = stats.lock().await;
872 stats.failed_requests += 1;
873 }
874 }
875 }
876
// The PDU handlers below delegate to the `ModbusTcpServer` functions of the
// same name, so both servers share one implementation per function code.

/// Function `0x01`: delegates to `ModbusTcpServer::handle_read_coils`.
async fn handle_read_coils(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
    ModbusTcpServer::handle_read_coils(data, register_bank).await
}

/// Function `0x02`: delegates to `ModbusTcpServer::handle_read_discrete_inputs`.
async fn handle_read_discrete_inputs(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
    ModbusTcpServer::handle_read_discrete_inputs(data, register_bank).await
}

/// Function `0x03`: delegates to `ModbusTcpServer::handle_read_holding_registers`.
async fn handle_read_holding_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
    ModbusTcpServer::handle_read_holding_registers(data, register_bank).await
}

/// Function `0x04`: delegates to `ModbusTcpServer::handle_read_input_registers`.
async fn handle_read_input_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
    ModbusTcpServer::handle_read_input_registers(data, register_bank).await
}

/// Function `0x05`: delegates to `ModbusTcpServer::handle_write_single_coil`.
async fn handle_write_single_coil(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
    ModbusTcpServer::handle_write_single_coil(data, register_bank).await
}

/// Function `0x06`: delegates to `ModbusTcpServer::handle_write_single_register`.
async fn handle_write_single_register(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
    ModbusTcpServer::handle_write_single_register(data, register_bank).await
}

/// Function `0x0F`: delegates to `ModbusTcpServer::handle_write_multiple_coils`.
async fn handle_write_multiple_coils(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
    ModbusTcpServer::handle_write_multiple_coils(data, register_bank).await
}

/// Function `0x10`: delegates to `ModbusTcpServer::handle_write_multiple_registers`.
async fn handle_write_multiple_registers(data: &[u8], register_bank: &Arc<ModbusRegisterBank>) -> ModbusResult<Vec<u8>> {
    ModbusTcpServer::handle_write_multiple_registers(data, register_bank).await
}
909}
910
911#[async_trait]
915impl ModbusServer for ModbusRtuServer {
916 async fn start(&mut self) -> ModbusResult<()> {
917 let mut is_running = self.is_running.lock().await;
918 if *is_running {
919 return Err(ModbusError::protocol("RTU Server is already running"));
920 }
921
922 info!("đ Starting Modbus RTU server on {}", self.config.port);
923
924 let port = tokio_serial::SerialStream::open(&tokio_serial::new(&self.config.port, self.config.baud_rate)
926 .data_bits(self.config.data_bits)
927 .stop_bits(self.config.stop_bits)
928 .parity(self.config.parity)
929 .timeout(self.config.timeout))
930 .map_err(|e| ModbusError::connection(format!("Failed to open serial port {}: {}", self.config.port, e)))?;
931
932 let (shutdown_tx, _) = broadcast::channel(1);
933 self.shutdown_tx = Some(shutdown_tx.clone());
934 self.start_time = Some(std::time::Instant::now());
935
936 *is_running = true;
937 drop(is_running);
938
939 info!("â
Modbus RTU server started successfully");
940 info!("đ Server configuration:");
941 info!(" - Port: {}", self.config.port);
942 info!(" - Baud rate: {}", self.config.baud_rate);
943 info!(" - Data bits: {:?}", self.config.data_bits);
944 info!(" - Stop bits: {:?}", self.config.stop_bits);
945 info!(" - Parity: {:?}", self.config.parity);
946 info!(" - Timeout: {:?}", self.config.timeout);
947
948 let register_bank = self.register_bank.clone();
949 let stats = self.stats.clone();
950 let frame_gap = self.config.frame_gap;
951 let is_running_flag = self.is_running.clone();
952 let shutdown_rx = shutdown_tx.subscribe();
953
954 tokio::spawn(async move {
955 Self::handle_rtu_communication(port, register_bank, stats, shutdown_rx, frame_gap).await;
956
957 let mut is_running = is_running_flag.lock().await;
958 *is_running = false;
959 });
960
961 Ok(())
962 }
963
964 async fn stop(&mut self) -> ModbusResult<()> {
965 if let Some(shutdown_tx) = &self.shutdown_tx {
966 let _ = shutdown_tx.send(());
967 }
968
969 let mut is_running = self.is_running.lock().await;
970 *is_running = false;
971
972 info!("âšī¸ Modbus RTU server stopped");
973 Ok(())
974 }
975
976 fn is_running(&self) -> bool {
977 false }
981
982 fn get_stats(&self) -> ServerStats {
983 let mut stats = ServerStats::default();
986
987 if let Some(start_time) = self.start_time {
988 stats.uptime_seconds = start_time.elapsed().as_secs();
989 }
990
991 stats.register_bank_stats = Some(self.register_bank.get_stats());
992 stats
993 }
994
995 fn get_register_bank(&self) -> Option<Arc<ModbusRegisterBank>> {
996 Some(self.register_bank.clone())
997 }
998}
999
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A freshly constructed TCP server is stopped and owns a register bank.
    #[test]
    fn test_tcp_server_creation() {
        let result = ModbusTcpServer::new("127.0.0.1:5020");
        assert!(result.is_ok());

        let server = result.unwrap();
        assert!(!server.is_running());
        assert!(server.get_register_bank().is_some());
    }

    /// Constructing an RTU server does not open the port, so it succeeds
    /// even without hardware.
    #[test]
    fn test_rtu_server_creation() {
        let result = ModbusRtuServer::new("/dev/ttyUSB0", 9600);
        assert!(result.is_ok());

        let server = result.unwrap();
        assert!(!server.is_running());
        assert!(server.get_register_bank().is_some());
    }

    /// A fully specified RTU configuration is accepted.
    #[test]
    fn test_rtu_server_configuration() {
        let config = ModbusRtuServerConfig {
            port: "/dev/ttyUSB0".to_string(),
            baud_rate: 19200,
            data_bits: tokio_serial::DataBits::Eight,
            stop_bits: tokio_serial::StopBits::Two,
            parity: tokio_serial::Parity::Even,
            timeout: Duration::from_secs(2),
            frame_gap: Duration::from_millis(5),
            register_bank: None,
        };

        let result = ModbusRtuServer::with_config(config);
        assert!(result.is_ok());

        let server = result.unwrap();
        assert!(!server.is_running());
    }

    /// Start/stop round trip; starting is allowed to fail when no serial
    /// port is attached to the test machine.
    #[tokio::test]
    async fn test_rtu_server_lifecycle() {
        let mut server = ModbusRtuServer::new("/dev/ttyUSB0", 9600).unwrap();

        assert!(!server.is_running());

        let start_result = server.start().await;

        if start_result.is_ok() {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let stop_result = server.stop().await;
            assert!(stop_result.is_ok());
        } else {
            println!("RTU server start failed (expected without serial port): {:?}", start_result.err());
        }
    }

    /// The CRC matches a known Modbus vector, is deterministic and depends
    /// on the input.
    #[test]
    fn test_crc_calculation() {
        let test_data = vec![0x01, 0x03, 0x00, 0x00, 0x00, 0x02];
        let crc = ModbusRtuServer::calculate_crc(&test_data);

        // Known vector: this request goes on the wire as
        // `01 03 00 00 00 02 C4 0B`, CRC low byte first.
        assert_eq!(crc, 0x0BC4);
        assert_eq!(crc.to_le_bytes(), [0xC4, 0x0B]);
        assert_eq!(crc, ModbusRtuServer::calculate_crc(&test_data));

        let test_data2 = vec![0x01, 0x04, 0x00, 0x00, 0x00, 0x01];
        let crc2 = ModbusRtuServer::calculate_crc(&test_data2);
        assert_ne!(crc, crc2);
    }

    /// An exception frame is slave id, function code with the high bit set,
    /// exception code, and a CRC trailer that matches those three bytes.
    #[test]
    fn test_rtu_error_response() {
        let result = ModbusRtuServer::create_rtu_error_response(0x01, 0x03, 0x01);
        assert!(result.is_ok());

        let response = result.unwrap();
        assert_eq!(response.len(), 5);
        assert_eq!(response[0], 0x01);
        assert_eq!(response[1], 0x83);
        assert_eq!(response[2], 0x01);

        let expected_crc = ModbusRtuServer::calculate_crc(&response[..3]).to_le_bytes();
        assert_eq!(&response[3..], &expected_crc[..]);
    }

    /// A server that has handled no traffic reports zeroed counters.
    #[tokio::test]
    async fn test_server_stats() {
        let server = ModbusTcpServer::new("127.0.0.1:5021").unwrap();
        let stats = server.get_stats();

        assert_eq!(stats.connections_count, 0);
        assert_eq!(stats.total_requests, 0);
        assert_eq!(stats.successful_requests, 0);
        assert_eq!(stats.failed_requests, 0);
    }

    /// Values written to a bank before it is handed to the server are
    /// visible through the server's handle.
    #[test]
    fn test_register_bank_integration() {
        let register_bank = Arc::new(ModbusRegisterBank::new());

        register_bank.write_single_coil(0, true).unwrap();
        register_bank.write_single_register(0, 0x1234).unwrap();

        let mut server = ModbusTcpServer::new("127.0.0.1:5022").unwrap();
        server.set_register_bank(register_bank.clone());

        let server_bank = server.get_register_bank().unwrap();
        let coils = server_bank.read_coils(0, 1).unwrap();
        let registers = server_bank.read_holding_registers(0, 1).unwrap();

        assert!(coils[0]);
        assert_eq!(registers[0], 0x1234);
    }
}